New Apache Kafka to AWS S3 Connector
Setting up the Kafka to AWS S3 connector to optimize data storage.
Many in the community have been asking us to develop a new Kafka to S3 connector for some time. So we’re pleased to announce it's now available.
It’s been designed to deliver a number of benefits over existing S3 connectors:
Completely free and open source
Optimized data storage (costs) and read access
Simpler management via GitOps, with governance
Reduced need for pre-data-processing workloads
Efficient data archiving and a lower Kafka platform TCO
Like our other Stream Reactors, the connector extends the standard connect config adding a parameter for a SQL command (Lenses Kafka Connect Query Language or “KCQL”). This defines how to map data from the source (in this case Kafka) to the target (S3).
Importantly, it also includes how data should be partitioned into S3, the bucket names and the serialization format (support includes JSON, Avro, Parquet, Text, CSV and binary).
The syntax looks like this:
INSERT INTO bucketAddress:pathPrefix
[PARTITIONBY (partition[, partition] ...)]
Full docs for the syntax can be found here.
The connector supports a number of different authentication mechanisms to the S3 bucket including standard AWS Credentials (using an access and secret key) and via an IAM role for EC2.
Where credentials are needed, the Lenses secret manager plugins allow you to integrate with common key management solutions such as Hashicorp Vault and AWS Secret Manager.
Increased control & pipeline as code
Having control over how the data is structured as it is sinked allows you to optimize for cost and for the specific use cases, workloads and read patterns of downstream applications (such as Snowflake, AWS Athena or Glue). It also means you avoid having to build and deploy complex stream processing workloads before sending data to S3.
Like all Stream Reactor and Kafka Connect connectors, the connector can be managed via your traditional deployment tools and frameworks or can be managed through Lenses. This adds a layer of RBAC, governance, auditing, error handling and monitoring to how to manage Kafka connectors.
With the whole workload defined as Connect configuration and SQL, you can manage data pipelines as code via GitOps.
S3 sink walkthrough
Let’s assume a scenario where we have records in a Kafka topic
backblaze_smart related to manufacturing process failures for certain products, and these events need to be stored in an S3 bucket.
Downstream analytics solutions (such as AWS Athena) run by a different Data team for each model need access to this data.
For read-performance and cost reasons, the Data teams ask that messages related to each model be partitioned into different objects within the S3 bucket, with a particular naming convention, and placed in a dedicated path prefix.
The KCQL we will use in our connector configuration is therefore:
connect.s3.kcql=INSERT INTO Manufacturing:manufacturing_failures SELECT serial_number, model, capacity_bytes, failure FROM backblaze_smart PARTITIONBY model STOREAS `Json` WITH_FLUSH_COUNT = 300
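To make the PARTITIONBY and WITH_FLUSH_COUNT semantics concrete, here is a rough sketch in Python of how records could be grouped into S3 objects. This is purely illustrative: the function, key layout and file naming are assumptions for explanation, not the connector’s actual implementation.

```python
# Illustrative sketch (NOT the connector's actual code): group records by the
# PARTITIONBY field ("model") and split each group into flushes of at most
# flush_count records, each flush becoming one S3 object.
from collections import defaultdict


def partition_records(records, flush_count):
    """Return a dict mapping a hypothetical S3 object key to a batch of records."""
    by_model = defaultdict(list)
    for rec in records:
        by_model[rec["model"]].append(rec)

    objects = {}
    for model, recs in by_model.items():
        for i in range(0, len(recs), flush_count):
            # Hypothetical key layout; the real connector's naming differs.
            key = f"manufacturing_failures/model={model}/part-{i // flush_count}.json"
            objects[key] = recs[i:i + flush_count]
    return objects


records = [
    {"serial_number": "S1", "model": "A", "capacity_bytes": 1, "failure": 0},
    {"serial_number": "S2", "model": "B", "capacity_bytes": 2, "failure": 1},
    {"serial_number": "S3", "model": "A", "capacity_bytes": 3, "failure": 0},
]
objects = partition_records(records, flush_count=2)
```

With a flush count of 300 as in the KCQL above, the connector would accumulate up to 300 records per partition before writing an object.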
Setting the environment
Here's a walkthrough of testing out the connector. It will involve the following processes:
Creating an S3 Bucket
Creating an IAM role with access to bucket
Running the Lenses Box docker on EC2
Configuring the connector in Lenses Box with EC2 authentication.
The Lenses.io Box is a free all-in-one Kafka + Lenses development docker. It’s a perfect environment for developing real-time applications and testing connectors and it comes with sample data.
If you prefer to use your own Lenses or Kafka Connect environment, you can download the connector from here and request a free trial of Lenses from lenses.io/start.
We will create an IAM role in order to configure EC2 authentication to access the S3 bucket. If you’re not running Lenses Box on an EC2 instance, this won’t be necessary but you’ll need to provide an Access and Secret key to write to S3 instead.
For this walkthrough, you should have the following prerequisites:
Access to the AWS management console with permissions to manage applications.
A host to run a docker container with 4GB RAM
Step 1: Create an S3 bucket
1. Sign in to the AWS Management Console and open the Amazon S3 console
2. Choose Create bucket.
3. In Bucket name, enter a name for your bucket (referred to as <BUCKETNAME> below). Do not use hyphens or other special characters; this is a requirement of the connector later.
4. Leave Block All Public Access checked and other settings as default.
5. Choose Create bucket
Take note of your AWS region of your bucket as you’ll need this later.
Step 2: Create an IAM role
For the EC2 authentication to work, you must create an IAM role before you can launch the instance that will run Kafka Connect (the Lenses Box docker is packaged with Kafka Connect).
With the IAM role created, it will be attached to an instance running Kafka Connect/Box.
Alternatively, if you will authenticate to the S3 bucket with AWS Secret and Access keys instead, follow the processes described here.
To create an IAM role using the IAM console
1. Open the IAM console
2. In the navigation pane, choose Roles, Create role.
3. On the Select role type page, choose EC2 and the EC2 use case. Choose Next: Permissions.
4. Choose Create policy to open the policy creation browser tab.
5. Open the JSON editor and paste the following policy, replacing the bucket name with the name of your bucket.
This policy allows access to the S3 bucket and also to the Kafka functions we will need later.
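The policy itself is not reproduced in this post, so here is a minimal sketch of what it could look like: it grants read/write access to the bucket (replace <BUCKETNAME> with your bucket name). The original policy may have included additional actions.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
      "Resource": "arn:aws:s3:::<BUCKETNAME>"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::<BUCKETNAME>/*"
    }
  ]
}
```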
6. On the Review page, enter the name S3DataAccessPolicy for the policy and choose Create policy.
7. Return to the previous browser tab for Create Role, refresh the policy list, then find and check the newly created policy.
8. Choose Next: Add Tags.
9. Choose Next: Review.
10. Name the role S3DataAccessRole and choose Create Role.
Step 3: Deploy Lenses
If you’re deploying on an EC2 instance running docker, we recommend an instance with at least 4GB of RAM.
For the EC2 authentication to work, you’ll need to ensure the instance running Kafka Connect/Box is assigned the S3DataAccessRole IAM role just created.
Port 3030 will need to be opened in the Security Group in order to access Lenses. Depending on which other services you want to access, you may want to open up other ports too (not essential for this walkthrough).
1. Request a Box license key for free from: https://lenses.io/box/. You’ll receive an email with a docker run command
2. Run the command on your EC2 instance. It should look like this:
docker run -e ADV_HOST=127.0.0.1 \
-e EULA="https://dl.lenses.io/d/?id=REGISTER_FOR_KEY" \
--rm -p 3030:3030 -p 9092:9092 lensesio/box:latest
Note: if you want to connect your own external producers/consumers to this instance, set the ADV_HOST address accordingly. You’ll also need to open up port 9092 in the security group.
Step 4: Access Lenses and verify data
1. Access Lenses through your browser on port 3030.
2. The “Explore” page acts as a real-time data catalog. Search for the backblaze_smart topic. This topic includes a real-time flow from a sample producer within the Box environment.
This will be the data we sink to S3.
3. To be more precise about which fields to sink to S3, from within the SQL Studio run the statement
SELECT serial_number, model, capacity_bytes, failure FROM backblaze_smart
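The SELECT above projects each record down to four fields, which is the same shape the connector will write to S3. As an illustration only (field names from the topic, values made up), the projection behaves like this sketch:

```python
# Illustrative only: the projection the SELECT statement applies per record.
FIELDS = ["serial_number", "model", "capacity_bytes", "failure"]


def project(record):
    """Keep only the selected fields, mirroring the SQL projection."""
    return {k: record[k] for k in FIELDS if k in record}


# Hypothetical raw record with an extra field the query drops.
raw = {
    "serial_number": "S1",
    "model": "M1",
    "capacity_bytes": 12000138625024,
    "failure": 0,
    "smart_1_raw": 12345,
}
projected = project(raw)
```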
Step 5: Configure the S3 Connector through Lenses.io
1. It’s often a good idea to ensure you have access to the S3 bucket from within your environment using the AWS CLI.
Configure the CLI client (typically via aws configure), and then at least ensure you can ls the contents of your bucket:
aws s3 ls MyBucketName
Full instructions can be found here.
2. Once you can access the bucket, select “Connectors” from the top level menu within Lenses. A number of sample connectors should already be configured in Box.
3. Click New Connector. Lenses will list all the available Connectors that are packaged by default in Box.
4. Select the S3 Connector
5. Enter the details of the Connector. Take care to update the region of your S3 bucket in the parameters, and replace MyBucketName with the bucket created earlier in the KCQL statement:
connect.s3.kcql=INSERT INTO MyBucketName:manufacturing_failures SELECT serial_number, model, capacity_bytes, failure FROM backblaze_smart PARTITIONBY model STOREAS `Json` WITH_FLUSH_COUNT = 3
If you're using credentials to authenticate instead of EC2, enter a secret and access key.
6. Click Create Connector & ensure there are no errors in the runners.
7. Verify the data is sinked correctly into the MyBucketName/manufacturing_failures folder in S3.
If you were successful, get ready to deploy the connector against your own Kafka Connect & Kafka environment with a free trial of Lenses available from lenses.io/start.
See our other Open Source connectors at https://lenses.io/connect/