Antonios Chalkiopoulos
An FTP server together with a pair of credentials is a common way for data providers to expose data as a service.
In this article we are going to implement custom file transformers to efficiently load files over FTP and, using Kafka Connect, convert them to meaningful events in Avro format.
Depending on data subscriptions we might get access to FTP locations with files updated daily, weekly or monthly. File structures might be positional, CSV, JSON, XML or even binary.
In IoT use cases we might need to flatten multiple events arriving in a single line or apply other transformations before allowing the data to enter the Kafka highway as a stream of meaningful messages.
Kafka Connect distributed workers can provide a reliable and straightforward way of ingesting data over FTP. Let’s now look at some real IoT cases with data delivered over FTP and how to load them into Kafka:
XML
CSV
Binary
The first data set is available over FTP as multiple XML files, each containing a day of data for numerous geo-locations around the world:
Note the last line in the above block. The siteID, lat and lng are metadata about this time-series.
The XML file continues with the entries:
All we need to do is define a case class and provide a class that implements SourceRecordConverter and encapsulates the logic of flattening the XML-packed data into messages:
In no more than 30 lines of code, we have encapsulated the entire parsing logic into a SourceRecordConverter. The data contract lives inside a case class that provides access to the schema and a converter to an Avro structure.
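To give a feel for the flattening step on its own, here is a minimal sketch that uses only the JDK XML parser. The XML layout (a root element carrying siteID, lat and lng attributes, with nested entry elements holding time and value attributes), the object name IrradianceXmlSketch and the Irradiance case class are all assumptions made for illustration. The sketch also leaves out the Kafka Connect and Avro wiring that a real SourceRecordConverter needs.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

// Illustrative only: names and XML layout are assumptions, not the connector's API.
object IrradianceXmlSketch {

  // One self-contained event: the site metadata is repeated on every reading.
  final case class Irradiance(siteID: String, lat: Double, lng: Double,
                              time: String, value: Double)

  // Flattens one XML document into one event per <entry> element.
  def flatten(xml: String): Seq[Irradiance] = {
    val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    val doc     = builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
    val site    = doc.getDocumentElement

    // Metadata about the whole time-series (assumed to be attributes of the root).
    val siteID = site.getAttribute("siteID")
    val lat    = site.getAttribute("lat").toDouble
    val lng    = site.getAttribute("lng").toDouble

    val entries = site.getElementsByTagName("entry")
    (0 until entries.getLength).map { i =>
      val e = entries.item(i).asInstanceOf[Element]
      Irradiance(siteID, lat, lng, e.getAttribute("time"), e.getAttribute("value").toDouble)
    }
  }
}
```

Because every event repeats the site metadata, each message makes sense on its own once it lands in a topic, at the cost of some redundancy.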
Get the code from GitHub, build the JAR file with sbt assembly and add it to the classpath of Kafka Connect.
Next we can instruct the connector to use it by setting the property sourcerecordconverter to com.landoop.IradianceXML:
What we have achieved is to set up and post to Kafka Connect distributed a connector that tails all files newer than 14 days (P14D) in a remote FTP server at the location iradiance/*.xml and refreshes the tailing every minute (PT1M).
Timings are provided in the ISO 8601 duration format.
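If the ISO 8601 notation is unfamiliar, the two values above can be checked with java.time.Duration, which parses this format. The isRecent helper below is a hypothetical illustration of how a maximum file age could be applied. It is not the connector's own code.

```scala
import java.time.{Duration, Instant}

object FtpTimings {

  // Hypothetical helper: true when a file last modified at `lastModified`
  // is no older than `maxAge`, measured from `now`.
  def isRecent(lastModified: Instant, maxAge: Duration, now: Instant): Boolean =
    !lastModified.isBefore(now.minus(maxAge))

  def main(args: Array[String]): Unit = {
    val maxAge  = Duration.parse("P14D") // 14 days
    val refresh = Duration.parse("PT1M") // 1 minute
    println(s"max age: ${maxAge.toDays} days, refresh: ${refresh.getSeconds} seconds")
  }
}
```

Note the T in PT1M: it separates the date part from the time part, so PT1M is one minute whereas P1M would mean one month.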
When the first XML file is consumed, a number of events are generated in Avro format. The first message is checked against the schema-registry, and as no such Avro subject is yet registered, it will register it automatically:
By using the kafka-topics-ui we can also see the data landing into the topic:
As instructed, a single XML file is parsed into multiple messages, each of which makes sense in isolation and can therefore be treated as a streaming event. The solar irradiance seems to be picking up in the morning hours.
Every time an XML file is consumed and multiple new Avro messages are generated into Kafka, a record is automatically added to the connect-offsets topic:
The above record acts as the high watermark, so that on the next poll of the connector only new files and files that have increased in size will be consumed. So, similarly to Camel and other FTP pollers, the FTP connector is a stateless microservice that preserves state and data in Kafka.
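The watermark idea can be sketched in a few lines. The example assumes the stored state is simply a map from file path to the last size seen. The RemoteFile type and both function names are hypothetical, and the connector's actual offset format is not reproduced here.

```scala
object FtpWatermark {

  // Hypothetical view of one entry in a remote directory listing.
  final case class RemoteFile(path: String, size: Long)

  // Files worth fetching on this poll: never seen before, or larger than
  // the size recorded in the watermark.
  def changed(listing: Seq[RemoteFile], watermark: Map[String, Long]): Seq[RemoteFile] =
    listing.filter(f => watermark.get(f.path).forall(_ < f.size))

  // Watermark to store once the given files have been consumed.
  def advance(watermark: Map[String, Long], consumed: Seq[RemoteFile]): Map[String, Long] =
    watermark ++ consumed.map(f => f.path -> f.size)
}
```

Since the poller itself holds no state between runs, restarting a worker only requires reading the last stored watermark back before the next listing is compared.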
Let’s look at some CSV files delivered over FTP. Horizontal files come with some metadata columns, followed by a date column and then by a set of 24 or 48 comma-separated numbers, one reading for every 60-minute or 30-minute interval of that day.
DeviceID_1234_foo,21/01/2017,1.5,1.6 … 10.2,10.4,10.2,12.6,11.2,9.5,8.8
This compacted time-series in plain sight requires only a simple transformation to break it down into individual events, which are then sent to Kafka as records ready to be consumed by downstream apps.
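One way to sketch such a transformation uses only the standard library. It assumes a single metadata column (the device id), a dd/MM/yyyy date, and that each reading is stamped with the start of its interval. The HorizontalCsv and Reading names are hypothetical, and producing Avro records is left out.

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.util.Try

object HorizontalCsv {

  // One event per reading, stamped with the start of its interval (an assumption).
  final case class Reading(deviceId: String, timestamp: LocalDateTime, value: Double)

  private val dateFormat = DateTimeFormatter.ofPattern("dd/MM/yyyy")

  // Breaks one horizontal line into individual readings.
  // Blank or non-numeric values are skipped; a malformed date throws.
  def flatten(line: String): Seq[Reading] = {
    val cols = line.split(",", -1).map(_.trim) // -1 keeps trailing empty columns
    if (cols.length < 3) Seq.empty
    else {
      val deviceId    = cols(0)
      val dayStart    = LocalDate.parse(cols(1), dateFormat).atStartOfDay()
      val values      = cols.drop(2)
      val stepMinutes = (24 * 60) / values.length // 24 values -> 60, 48 values -> 30
      values.toSeq.zipWithIndex.flatMap { case (raw, i) =>
        Try(raw.toDouble).toOption.map { v =>
          Reading(deviceId, dayStart.plusMinutes(i.toLong * stepMinutes), v)
        }
      }
    }
  }
}
```

Deriving the interval from the number of value columns lets the same function handle both the 24-reading and the 48-reading files without extra configuration.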
The above code will cater for entries coming with missing values. We have defined the specifications of the connectors as Scala spec tests:
All we need to do is send a request for a new connector for the CSV (horizontal) files with:
We will now look at another use case, where an embedded device captures multiple data points and interpolates them into discrete channels. For example, with Channel A and Channel B, a CSV file could have the following columns:
The implementation, available on GitHub, completely ignores the Channel B data; for every line it emits 1 record with the (Column 4) data to topic `` … and 1 record for each measurement.
We can inspect our schemas:
And using the kafka-topics-ui we can also see the data landing into the topic:
You will have noticed that currently all topics have 1 partition and a replication factor of 1.
Setting up a connector to fetch binary files is supported by default, by using the
In the above configuration we have selected the ftp.monitor.update capability of the connector.
No development is required, and all we need to do is post a connector with the appropriate configuration.
We can now have a unified view of our Connect topology using the kafka-connect-ui tool:
In this article we have presented how to use Kafka Connect to set up connectors that poll remote FTP locations, pick up new data (in a variety of file formats), transform it into Avro messages and transmit those messages to Apache Kafka.
In the second part of this blog we will present how to run a setup such as the above operationally, with associated metrics, monitoring and alerting.
Happy coding - Landoop team.