MQTT to Kafka to InfluxDB to view data in Kafka.
Time-series data stores are of particular interest these days and influxDB is a popular open source distributed time-series database. In this tutorial we will integrate Kafka with InfluxDB using Kafka Connect and implement a Scala Avro message producer to test the setup.
The steps we are going to follow are:
Setup a docker development environment
Run an InfluxDB Sink Kafka Connector
Create a Kafka Avro producer in Scala (use the schema registry)
Generate some messages in Kafka
Finally, we will verify the data in influxDB and visualize them in Chronograph.
Data in influxDb is organized in time series
where each time series has points
, one for each discrete sample of the metric. Points consists of:
time
: the timestamp
measurement
: which conceptually matches the idea of a SQL table
tags
: key-value pairs in order to store index values, usually metadata.
fields
: key-value pairs, containing the value itself, non indexed.
null
values aren’t stored.
The structure is of the data is:
measurement,tagKey1=tagVal1,tagKey2=tagVal2 fieldK1=fieldV1,fieldK1=fieldV1
Example:
For both the InfluxDB and Kafka ecosystem setup we are going to use Docker. If you don’t have docker installed on your machine, follow the instructions on their website.
Run the docker image:
$ docker run -p 8086:8086 -v influxdb:/var/lib/influxdb influxdb
Enter the InfluxDB Shell and create a database and a user:
You may find more info on this docker image here.
In order to set up Kafka we will need a set of systems to be installed. We are going to use fast-data-dev docker image, which includes everything we need for this tutorial:
Kafka Broker
ZooKeeper
Kafka Connect
Kafka REST Proxy
Schema Registry
Landoop’s UI tools
DataMountaineer Connectors (in our case InfluxDB will be used)
Embedded integration tests with examples.
Run the docker image:
You may find more on fast-data-dev documentation here.
While this is running for the first time it will run some integration tests and create some topics and connectors for you. You are able to access the landing page for the container at http://localhost:3030
Once it’s up and running you may access:
the Kafka Broker logs: docker exec <ID> tail -f /var/log/broker.log
the Kafka Connect logs: docker exec <ID> tail -f /var/log/connect-distributed.log
the docker bash: docker run --rm -it --net=host landoop/fast-data-dev bash
Data usually originate from somewhere. For example, an embedded sensor can produce data at frequent intervals. For the shake of the example we are going to create a simple Kafka Producer in Scala to transmit some metrics
kafka-avro-console-producer
can also be used instead of the Scala app
The data structure looks like this:
We are going to send these messages in a Kafka topic named as device-measurements-topic
Our fast-data-dev
docker image provides Kafka Connect and has already added the influxDB connector in the classpath
of the available plugins
.
Let’s find the IP address of the docker used by the influxDB container (usually 172.17.0.x)
docker inspect <ID or NAME> | grep IPA
We will now use the kafka-connect-ui to configure the connector:
or from command line:
InfluxDB Sink connector supports KCQL (Kafka Connect Query Language) and this allows us to filter fields from a Kafka topic without needing to do any extra processing. Let’s say in our case we are only interested in storing the temperature for a particular device in a time series within InfluxDB.
The query looks like this:
deviceMeasurement
is the time series namespace (measurement
) for influxDB
WITHTIMESTAMP
will add thetime
in the point. If we omit it from our query then influx will add a system timestamp.
Now lets create a simple producer that sends these types of event messages into Kafka. We are going to use Avro format so that we can maintain the schema of the data in the Schema Registry. In this example both the key
and the value
of the Kafka message will be in Avro format. Of course you may use any other Serializer.
We create the case classes that describe the schemas for the key
and the value
and then use call the producer with some random values.
In your schema registry you will find 2 automatically registered schemas: one for the key called device-measurement-topic-key
and one for the value device-measurement-topic-value
.
We can see the schemas created in http://localhost:3030/schema-registry-ui
We can see the Kafka messages generated in http://localhost:3030/kafka-topics-ui
and also InfluxDB should also be getting data:
InfluxDB provides a visualization tool called Chronograf.
Here is how our time series chart may look like:
Find the code for this example in GitHub