New Lenses Multi-Kafka Developer Experience

Download

Mission-critical data flows with the open-source Lenses Kafka Connector for Amazon S3

By

Stefan Bocutiu

Sep 25, 2023

An effective data platform thrives on solid data integration, and for Kafka, S3 data flows are paramount.

Data engineers often grapple with diverse data requests related to S3. Enter Lenses. By partnering with major enterprises, we've levelled up our S3 connector, making it the market's leading choice.

We've also incorporated it into our Lenses 5.3 release, boosting Kafka topic backup/restore.

This post will delve into the connector's expanded capabilities and the latest features it brings to the table.

Kafka & S3 data integration: the challenges

Given the flexible nature of S3 and the sheer number of use cases it supports, it’s no wonder that data engineers struggle to come up with a catch-all solution that addresses all the different challenges between Kafka and S3:

  • Kafka topic backups need context preservation.

  • Athena prefers Parquet storage with specific partitioning.

  • Storing images in S3? One message per file is crucial.

And these are just the issues with data sinking.

Fetching data from S3 to Kafka often means decoding intricate formats, be it multiline XML or niche industry text protocols.

____

Watch: Kafka with Amazon S3 - Architecture and top use cases

____

A connector that meets multiple needs

We need versatile Source and Sink connectors packed with features to cater to a wide range of scenarios:

  • An Envelope data structure that efficiently preserves key/value/headers/metadata

  • Support for storage formats, Parquet, AVRO, JSON and text

  • Sources complex formats such as XML and industry protocols

  • Supports writing raw events into separate files (e.g. images)

  • Flexible structure and partitioning in S3

  • Control flush timing & sizing of the files

  • Dynamically detects new objects at source

  • Avoids data loss and exactly-once semantics

Moreover, these connectors:

  • Are open source

  • Come with optional enterprise support

  • Have been stress-tested in critical production environments alongside our enterprise partners.

With the Lenses S3 connectors, you can:

  • Backup and restore Kafka topics.

  • Archive topic histories for security and disaster recovery.

  • Transport Kafka data to data lakes, from Athena to Databricks or Snowflake.

  • Power AI models/LLM.

  • Seamlessly merge non-Kafka native apps by sourcing from S3.

Achieve this on a large scale, tweaking throughput and latency as per your needs.

To cap it off, the connector uses the Kafka Connect Query Language (KCQL) that operates like SQL to govern its functions. The complete syntax is as follows:

The main features of the connector are driven by this configuration property and we will walk through examples in the rest of this blog.

Message Envelope

A Kafka message includes components like the key, value, headers, and metadata (topic, partition, offset, timestamp).

The connector wraps these messages in an "envelope", streamlining backup and restore without relying on complex Kafka Connect transformations.

Here's how the envelope is structured:

In this format, all parts of the Kafka message are retained. This is beneficial for backup, restoration, and to run analytical queries.

The Source connector uses this format to rebuild the original Kafka message and send it to the specified topic.

Storage Formats

Anything can be stored in S3, and the connector does its best to support the major formats, offering support for:

  • AVRO

  • Parquet

  • JSON

  • CSV (including headers)

  • Text

  • BYTES

This format is decoupled from the format in Kafka. The translation from Kafka to Connect happens via the key.converter and value.converter connector properties.

The data storage format in S3 is simply set using the STOREAS keyword in the connector configuration. For example, to store single images, we can set it to bytes and the flush count to 1.

Object partitioning in S3

Partitioning is crucial for organizing data and improving query speeds. In S3, this is achieved using the Object Key. By default, the connector reflects the structure of the Kafka topic it is sending to. For instance, a three-partition topic would use this configuration:

Would result in:

The connector allows for customised partitioning, which has its perks:

  • Better performance in subsequent data queries due to organized partitions.

  • Easy data management through time intervals, like year or month.

  • Keeping sensitive data in distinct partitions for tighter access controls.

  • To adjust partitioning, use the PARTITIONBY clause in the KCQL configuration. This can use the Kafka message's key, value, or headers for partitioning.

For instance, for a "sales" Kafka topic with transaction messages, the KCQL can partition data by transaction year, product type, and customer region.

The Kafka Connect S3 Sink Connector will create custom object keys in your S3 bucket that incorporate the customer ID, transaction year, product category, and customer region, resulting in a coarser partitioning strategy. For instance, an object key might look like this:

To achieve more structured object key naming, similar to Athena Hive-like key names where field names are part of the object key, modify the KCQL syntax as follows:

This will result in object keys like:

Organizing data into time-based intervals within custom object keys can be highly beneficial. To achieve time-based intervals with a custom object key naming, the connector supports a complementary Kafka Connect Single Message Transformer (SMT) plugin designed to streamline this process. You can find the transformer plugin and documentation here.

Consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called `timestamp`. Here's the connector configuration to achieve this:

In this configuration:

  • The TimestampConverter SMT is used to convert the timestamp field in the Kafka message's value into a string value based on the specified format pattern (yyyy-MM-dd-HH). This allows us to format the timestamp to represent an hourly interval.

  • The InsertWallclock SMT incorporates the current wallclock time in the specified format (yyyy-MM-dd-HH).

  • The PARTITIONBY clause leverages the timestamp field and the wallclock header to craft the object key, providing precise control over data partitioning.

Sourcing multi-line events & XML

While not all data formats adhere to Avro or Protobuf standards, text-based data remains prevalent. In such cases, leveraging regular expressions (regex) becomes a powerful strategy for filtering and extracting specific records from this textual data.

To illustrate when it makes sense to apply regex to text data, consider the following example:

Imagine you are tasked with processing a log file from a web server. This log file contains a multitude of entries, including requests, errors, and other information. By applying regex to this text data, you can selectively extract only the HTTP request entries that match a specific pattern, such as successful GET requests to a particular endpoint.

This targeted filtering allows you to isolate and analyze relevant records, forwarding them to Kafka for further processing or monitoring, while disregarding irrelevant information. In summary, regex offers a valuable means to parse and extract pertinent data from text sources like log files, enabling you to focus on specific records of interest and streamline data processing workflows.

The following section applies a regex to each line and considers it a record if it starts with a number greater than zero. This is an example from an airline customer of Lenses which extracts specific entries about the flight data:

Occasionally, data records within a text file extend across multiple lines, posing a unique challenge for extraction. To address this scenario, the source connector offers a versatile multi-line mode.

To enable this feature, simply set the read.text.mode to StartEndLine and specify the starting and ending lines to identify and encompass the desired records. Let's delve into a practical example to demonstrate its effectiveness:

Suppose you're dealing with a log file that contains multi-line entries for system status messages (SSM), where each SSM entry begins with the line 'SSM' and concludes with an empty line ('').

To configure this specific use case, while also ensuring any trailing whitespace is trimmed, follow this configuration:

And finally, we introduce the Start-End Tag mode, a dynamic text-reading feature that selectively extracts text content sandwiched between specified start and end tags, inclusively.

This mode is powerful when dealing with scenarios where a single line of text in an S3 source equates to multiple Kafka messages in the output.

For example, given XML records nestled between '<SSM>' and '</SSM>' tags, with the objective of reading and transforming the records into Kafka messages. This can be achieved with effortless precision by configuring the connector as shown below:

What next?