Kafka Connect for FTP data

Learn how to implement custom file transformers to efficiently load files over FTP and use Kafka Connect to convert them to meaningful events in Avro format.

Antonios Chalkiopoulos
An FTP server, together with a pair of credentials, is a common way for data providers to expose data as a service.

In this article we are going to implement custom file transformers to efficiently load files over FTP and, using Kafka Connect, convert them to meaningful events in Avro format.

[Figure: FTP to Kafka]

Depending on data subscriptions we might get access to FTP locations with files updated daily, weekly or monthly. File structures might be positional, CSV, JSON, XML or even binary.

In IoT use cases we might need to flatten multiple events arriving in a single line, or apply other transformations, before allowing the data to enter the Kafka highway as a stream of meaningful messages.

Kafka Connect distributed workers can provide a reliable and straightforward way of ingesting data over FTP. Let’s now look at some real IoT cases with data delivered over FTP and how to load them into Kafka:

  • XML

  • CSV

  • Binary

XML: Solar irradiance data

The first data set is available over FTP as multiple XML files, each containing one day of data for numerous geo-locations around the world:

Note the last line in the above block. The siteID, lat and lng are metadata about this time-series.

The XML file continues with the entries:

All we need to do is define a case class and provide a class that implements SourceRecordConverter and encapsulates the logic of flattening XML-packed data into messages:

In no more than 30 lines of code, we have encapsulated the entire parsing logic into a RecordConverter. The data contract is inside a case class that provides access to the schema and a converter to an Avro structure.
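As a rough illustration of the flattening step only, the sketch below turns one XML document into one case class instance per reading, using the XML parser that ships with the JDK. The XML layout is an assumption made for this example: the element names `site` and `reading` and the attributes `time` and `value` are hypothetical, and only siteID, lat and lng come from the description above. It does not implement the connector's SourceRecordConverter interface or the Avro conversion.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

// Hypothetical data contract: one flattened reading per message.
final case class IrradianceReading(siteId: String, lat: Double, lng: Double,
                                   timestamp: String, value: Double)

object IrradianceFlattener {
  // Assumed (hypothetical) layout:
  // <site siteID=".." lat=".." lng=".."><reading time=".." value=".."/>...</site>
  def flatten(xml: String): List[IrradianceReading] = {
    val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    val doc = builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
    val site = doc.getDocumentElement

    // Metadata shared by every reading in the file.
    val siteId = site.getAttribute("siteID")
    val lat = site.getAttribute("lat").toDouble
    val lng = site.getAttribute("lng").toDouble

    // Emit one record per <reading> element, each carrying the site metadata.
    val readings = site.getElementsByTagName("reading")
    (0 until readings.getLength).toList.map { i =>
      val r = readings.item(i).asInstanceOf[Element]
      IrradianceReading(siteId, lat, lng, r.getAttribute("time"), r.getAttribute("value").toDouble)
    }
  }
}
```

Copying the site metadata onto every reading is what lets each message make sense in isolation once it lands in a topic.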

Get the code from GitHub, build the JAR file with sbt assembly and add it to the classpath of Kafka Connect.

Next we can instruct the connector to use it by setting the property sourcerecordconverter to com.landoop.IradianceXML:

What we have achieved is setting up, and posting to Kafka Connect distributed, a connector that tails all files newer than 14 days (P14D) on a remote FTP server at the location iradiance/*.xml and refreshes the tailing every minute (PT1M).

Timings are provided in the ISO 8601 duration format.

When the first XML file is consumed, a number of events are generated in Avro format. The first message is checked against the schema-registry and, as no such Avro subject is registered yet, the schema is registered automatically:
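To make the two duration strings concrete, here is a small sketch that parses them with the JDK's `java.time.Duration`. The `FtpTimings` object and its `isFresh` helper are hypothetical names used for illustration; how the connector itself interprets these values is not shown here.

```scala
import java.time.{Duration, Instant}

object FtpTimings {
  // ISO 8601 durations as used in the connector configuration above.
  val maxAge: Duration  = Duration.parse("P14D") // 14 days
  val refresh: Duration = Duration.parse("PT1M") // 1 minute

  // Hypothetical helper: was a file modified within `maxAge` of `now`?
  def isFresh(lastModified: Instant, now: Instant): Boolean =
    !lastModified.isBefore(now.minus(maxAge))
}
```

Note that `Duration.parse` only accepts day and time components (days, hours, minutes, seconds); calendar-based values such as months or years are not valid input for it.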

[Figure: iradiance schema]

By using the kafka-topics-ui we can also see the data landing into the topic:

[Figure: iradiance topics]

As instructed, a single XML file is parsed into multiple messages, each of which makes sense in isolation and can therefore be treated as a streaming event. The solar irradiance seems to be picking up in the morning hours.

Every time an XML file is consumed and multiple new Avro messages are generated into Kafka, a record is automatically added to the connect-offsets topic:

The above record acts as the high watermark, so that on the next poll of the connector only new files and files that have increased in size will be consumed. So, similarly to Camel and other FTP pollers, the FTP connector is a stateless microservice that keeps its state and data in Kafka.
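The high-watermark idea can be sketched in a few lines. This is a simplified model under stated assumptions, not the connector's actual offset format: `FileMeta`, `Watermark`, and the choice of tracking only the file size per path are all hypothetical.

```scala
// Hypothetical file metadata as reported by an FTP directory listing.
final case class FileMeta(path: String, size: Long)

object Watermark {
  // `seen` maps each path to the size recorded at the previous poll.
  // A file is selected if it is new, or if it has grown since then.
  def changed(listing: Seq[FileMeta], seen: Map[String, Long]): Seq[FileMeta] =
    listing.filter(f => seen.get(f.path).forall(prev => f.size > prev))

  // After consuming files, move the watermark forward for those paths.
  def advance(seen: Map[String, Long], consumed: Seq[FileMeta]): Map[String, Long] =
    seen ++ consumed.map(f => f.path -> f.size)
}
```

Because the watermark is just data, storing it in a Kafka topic is enough for a restarted worker to resume where the previous one stopped.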

Horizontal CSV files (monthly)

Let’s look at some CSV files delivered over FTP. Horizontal files come with some metadata columns, followed by a date column and then by a set of 24 or 48 comma-separated numbers, each indicating a reading for a 60-minute or 30-minute interval of that day.

DeviceID_1234_foo,21/01/2017,1.5,1.6 … 10.2,10.4,10.2,12.6,11.2,9.5,8.8

A compacted time-series in plain sight requires a simple transformation to break it down into simple events and then send them to Kafka as records, ready to be consumed by downstream apps.
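A minimal sketch of such a transformation follows, assuming a single metadata column (the device ID), a dd/MM/yyyy date column, and readings stamped at the start of their interval. `Reading` and `HorizontalCsv` are hypothetical names, and the sketch stops at plain case classes rather than Kafka Connect records.

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.util.Try

// Hypothetical event type: one reading of one device at one point in time.
final case class Reading(deviceId: String, time: LocalDateTime, value: Double)

object HorizontalCsv {
  private val dateFormat = DateTimeFormatter.ofPattern("dd/MM/yyyy")

  def flatten(line: String): List[Reading] = {
    // Limit -1 keeps trailing empty fields, so missing readings at the
    // end of a line still occupy their time slot.
    val cols = line.split(",", -1).map(_.trim)
    val deviceId = cols(0)
    val day = LocalDate.parse(cols(1), dateFormat).atStartOfDay()
    val values = cols.drop(2)

    if (values.isEmpty) Nil
    else {
      // 24 values give 60-minute steps, 48 values give 30-minute steps.
      val stepMinutes = 24 * 60 / values.length
      values.toList.zipWithIndex.flatMap { case (raw, i) =>
        // Empty or non-numeric slots are skipped instead of failing the line.
        Try(raw.toDouble).toOption.map { v =>
          Reading(deviceId, day.plusMinutes(i.toLong * stepMinutes), v)
        }
      }
    }
  }
}
```

Deriving the timestamp from the column position, rather than from the count of valid values, keeps later readings correctly placed when an earlier slot is empty.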

The above code caters for entries with missing values. We have defined the specifications of the connectors as Scala spec tests:

All we need to do is send a request for a new connector for horizontal CSV files with:

CSV: Multi-channel files

We will now look at another use case, where an embedded device captures multiple data points and interpolates them into discrete channels. For example, with Channel A and Channel B, a CSV file could have the following columns:

The implementation, available on GitHub, completely ignores the Channel B data; for every line it emits 1 record with the (Column 4) data to topic `` … and 1 record for each measurement.

We can inspect our schemas:

[Figure: multichannel schema]

And using the kafka-topics-ui we can also see the data landing into the topic:

[Figure: multichannel topics]

You will have noticed that all topics currently have 1 partition and a replication factor of 1.

Binary compressed files

Setting up a connector to fetch binary files is supported by default. In the above configuration we have selected the ftp.monitor.update capability of the connector.

No development is required, and all we need to do is post a connector with the appropriate configuration.

Connect Topology

We can now have a unified view of our Connect topology using the kafka-connect-ui tool:

[Figure: kafka-connect-ui]

Conclusions

In this article we have shown how to use Kafka Connect to set up connectors that poll remote FTP locations, pick up new data in a variety of file formats, transform it into Avro messages and send those messages to Apache Kafka.

In the second part of this blog we will show how to run a setup such as the above operationally, with the associated metrics, monitoring and alerting.

Happy coding - Landoop team.
