By David Sloan | Nov 18, 2020


New Apache Kafka to AWS S3 Connector


Many in the community have been asking us to develop a new Kafka to S3 connector for some time, so we’re pleased to announce that it’s now available.

It’s been designed to deliver a number of benefits over existing S3 connectors:

  • To be completely free and Open Source

  • To optimize data storage costs and read access

  • To be simpler to manage via GitOps and to introduce governance

  • To reduce the need for data pre-processing workloads

  • To archive data efficiently and reduce Kafka platform TCO

Like our other Stream Reactor connectors, the S3 connector extends the standard Kafka Connect configuration with a parameter for a SQL-like command (the Lenses Kafka Connect Query Language, or “KCQL”). This command defines how data is mapped from the source (in this case Kafka) to the target (S3).

Importantly, the KCQL statement also defines how data is partitioned in S3, the bucket name and the serialization format (JSON, Avro, Parquet, Text, CSV and binary are supported).

The syntax looks like this:

INSERT INTO bucketAddress:pathPrefix 
SELECT * 
FROM kafka-topic
[PARTITIONBY (partition[, partition] ...)]
[STOREAS storage_format]
[WITHPARTITIONER partitioner]
[WITH_FLUSH_SIZE flush_size]
[WITH_FLUSH_INTERVAL flush_interval]
[WITH_FLUSH_COUNT flush_count]
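The following minimal Python sketch assembles a statement from its optional clauses to make the grammar concrete. `build_kcql` is a hypothetical helper and not part of the connector. It covers only PARTITIONBY, STOREAS and WITH_FLUSH_COUNT, and writes them the way the walkthrough example later in this post does (for instance `WITH_FLUSH_COUNT = 300`). Joining several partition fields with commas is our reading of the grammar.

```python
from typing import Optional, Sequence


def build_kcql(bucket: str, prefix: str, topic: str,
               fields: Sequence[str] = ("*",),
               partition_by: Sequence[str] = (),
               store_as: Optional[str] = None,
               flush_count: Optional[int] = None) -> str:
    """Assemble an S3 sink KCQL statement (hypothetical helper).

    Only a subset of the grammar is covered: WITHPARTITIONER, WITH_FLUSH_SIZE
    and WITH_FLUSH_INTERVAL are left out.
    """
    parts = [f"INSERT INTO {bucket}:{prefix}",
             f"SELECT {', '.join(fields)}",
             f"FROM {topic}"]
    if partition_by:
        # Written without parentheses, as in the walkthrough's example
        parts.append(f"PARTITIONBY {', '.join(partition_by)}")
    if store_as:
        # The walkthrough quotes the format name in backticks, e.g. `Json`
        parts.append(f"STOREAS `{store_as}`")
    if flush_count is not None:
        parts.append(f"WITH_FLUSH_COUNT = {flush_count}")
    return " ".join(parts)


kcql = build_kcql("manufacturing", "manufacturing_failures", "backblaze_smart",
                  fields=["serial_number", "model", "capacity_bytes", "failure"],
                  partition_by=["model"], store_as="Json", flush_count=300)
print(f"connect.s3.kcql={kcql}")
```

The bucket name is lowercase in this sketch because S3 bucket names may not contain uppercase letters.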

Full docs for the syntax can be found here.

The connector supports several mechanisms for authenticating to the S3 bucket, including standard AWS credentials (an access key and secret key) and an IAM role attached to an EC2 instance.

Where credentials are needed, the Lenses secret manager plugins let you integrate with common key management solutions such as HashiCorp Vault and AWS Secrets Manager.
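Kafka Connect lets a connector configuration reference secrets indirectly through config providers. These are `${provider:path:key}` placeholders that the worker resolves at runtime, with providers registered in the worker's `config.providers` setting. Our understanding is that the Lenses secret manager plugins plug into this mechanism. The Python sketch below models only the substitution step, using an in-memory dictionary in place of a real backend. The provider name `vault`, the path `secret/s3`, the key names and the secret values are all hypothetical.

```python
import re
from typing import Dict, Mapping

# Kafka-style config provider reference: ${provider:path:key}, path optional
_REFERENCE = re.compile(r"\$\{([^:}]+):(?:([^:}]*):)?([^}]+)\}")

# provider name -> path -> key -> secret value (stand-in for Vault etc.)
Secrets = Mapping[str, Mapping[str, Mapping[str, str]]]


def resolve_references(config: Mapping[str, str], secrets: Secrets) -> Dict[str, str]:
    """Substitute ${provider:path:key} references with values from `secrets`.

    An unknown provider, path or key raises KeyError.
    """
    def lookup(match: "re.Match[str]") -> str:
        provider, path, key = match.groups()
        return secrets[provider][path or ""][key]

    return {name: _REFERENCE.sub(lookup, value) for name, value in config.items()}


connector_config = {
    "aws.access.key": "${vault:secret/s3:access_key}",
    "aws.secret.key": "${vault:secret/s3:secret_key}",
    "aws.region": "eu-west-3",
}
secrets = {"vault": {"secret/s3": {"access_key": "EXAMPLE_ACCESS_KEY",
                                   "secret_key": "EXAMPLE_SECRET_KEY"}}}
resolved = resolve_references(connector_config, secrets)
```

Because the stored configuration contains only the placeholders, it can be committed and reviewed without exposing the keys themselves.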

Increased control & pipeline as code

Controlling how data is structured as it is written to S3 lets you optimize for cost and for the specific use cases, workloads and read patterns of downstream applications (such as Snowflake, AWS Athena or AWS Glue). It also spares you from building and deploying complex stream processing workloads before the data reaches S3.

Like all Stream Reactor and Kafka Connect connectors, the connector can be managed with your existing deployment tools and frameworks, or through Lenses. Lenses adds a layer of RBAC, governance, auditing, error handling and monitoring to Kafka connector management.

[Image: Lenses.io Apache Kafka Connector for S3]

Because the whole workload is defined as Connect configuration plus SQL, you can manage data pipelines as configuration and deploy them via GitOps.
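The sketch below illustrates the GitOps angle. It builds a request against the standard Kafka Connect REST API, but does not send it. That API's `PUT /connectors/{name}/config` endpoint creates a connector if it is missing and updates it otherwise, so a CI job could re-apply every connector config stored in Git. The helper name and the Connect URL `http://localhost:8083` (Kafka Connect's default REST port) are assumptions. The `docker run` command later in this post does not publish that port, and with Lenses the same configuration can be applied through Lenses instead.

```python
import json
import urllib.request
from typing import Mapping


def connector_put_request(connect_url: str, name: str,
                          config: Mapping[str, str]) -> urllib.request.Request:
    """Build a create-or-update call for the Kafka Connect REST API.

    PUT /connectors/{name}/config creates the connector if it is missing and
    replaces its configuration otherwise, so re-applying a file kept in Git
    is idempotent.
    """
    body = json.dumps(dict(config), sort_keys=True).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


config = {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "topics": "backblaze_smart",
    "tasks.max": "1",
}
request = connector_put_request("http://localhost:8083", "S3Example", config)
# To apply it against a running Connect cluster:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Sorting the keys keeps the request body deterministic, which makes diffs between runs easy to compare.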

S3 sink walkthrough

Let’s assume a scenario where a Kafka topic, backblaze_smart, holds records about manufacturing process failures for certain products, and these events need to be stored in an S3 bucket.

[Screenshot: manufacturing failure events in Lenses.io]

Downstream analytics solutions (such as AWS Athena), run by a different Data team for each model, need access to this data.

For read-performance and cost reasons, the Data team asks that messages for each model be partitioned into separate objects within the S3 bucket. The objects must also follow a particular naming convention and be placed in a manufacturing_failures folder.

The KCQL we will use in our connector configuration is therefore:

connect.s3.kcql=INSERT INTO manufacturing:manufacturing_failures SELECT serial_number, model, capacity_bytes, failure FROM backblaze_smart PARTITIONBY model STOREAS `Json` WITH_FLUSH_COUNT = 300
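Here is a rough mental model of `PARTITIONBY model` combined with `WITH_FLUSH_COUNT = 300`. Records are buffered separately for each model, and a buffer is written out once it holds the flush count. The Python sketch below is a deliberate simplification, not the connector's implementation. It ignores the size- and interval-based flush options, offset handling and the actual object naming. The model values in the usage example are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Record = Dict[str, object]


def flush_by_count(records: Iterable[Record], partition_field: str,
                   flush_count: int) -> Tuple[List[Tuple[str, List[Record]]], Dict[str, List[Record]]]:
    """Toy model of PARTITIONBY <field> with WITH_FLUSH_COUNT = <n>.

    Records are buffered per partition value; once a buffer holds
    `flush_count` records it is emitted as one batch (one S3 object in
    this model). Returns (flushed_batches, still_buffered).
    """
    if flush_count < 1:
        raise ValueError("flush_count must be at least 1")
    buffers: Dict[str, List[Record]] = defaultdict(list)
    flushed: List[Tuple[str, List[Record]]] = []
    for record in records:
        value = str(record[partition_field])
        buffers[value].append(record)
        if len(buffers[value]) == flush_count:
            # The buffer is full: emit it and start a fresh one for this value
            flushed.append((value, buffers.pop(value)))
    return flushed, dict(buffers)


events = [{"serial_number": f"SN{i}", "model": m}
          for i, m in enumerate(["A", "B", "A", "A", "B"])]
batches, pending = flush_by_count(events, "model", flush_count=2)
```

With a flush count of 2, model A's first two records and model B's two records are flushed as separate batches. Model A's third record stays buffered until more records for that model arrive.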

Setting up the environment

Here's a walkthrough for testing the connector. It involves the following steps:

  • Creating an S3 bucket

  • Creating an IAM role with access to the bucket

  • Running the Lenses Box Docker container on EC2

  • Configuring the connector in Lenses Box with EC2 authentication

Lenses.io Box is a free, all-in-one Kafka + Lenses development Docker image. It’s a perfect environment for developing real-time applications and testing connectors, and it comes with sample data.

[Image: Lenses.io AWS S3 Connector for Kafka Connect with Lenses Box]

If you prefer to use your own Lenses or Kafka Connect environment, you can download the connector from here and request a free trial of Lenses from lenses.io/start. 

We will create an IAM role so that the connector can use EC2 authentication to access the S3 bucket. If you’re not running Lenses Box on an EC2 instance, you won’t need the role. Instead, you’ll need to provide an access key and secret key to write to S3.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account

  • Access to the AWS Management Console with permissions to create S3 buckets, IAM roles and IAM policies

  • A host with 4GB of RAM to run a Docker container

Step 1: Create an S3 bucket

1. Sign in to the AWS Management Console and open the Amazon S3 console.

2. Choose Create bucket.

3. In Bucket name, enter a name for your bucket <BUCKETNAME>. Do not use hyphens or other special characters, because the connector used later requires this.

4. Leave Block all public access checked and keep the other settings at their defaults.

5. Choose Create bucket.

Take note of your bucket’s AWS region, as you’ll need it later.
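You can check the naming constraint before creating the bucket. This minimal Python sketch interprets the connector's restriction as allowing only lowercase letters and digits. That restriction sits on top of S3's own rules, which allow 3 to 63 characters of lowercase letters, digits, dots and hyphens. The function name is hypothetical.

```python
import re

# S3 itself accepts 3-63 characters of lowercase letters, digits, dots and
# hyphens. Dropping hyphens and other special characters, as the walkthrough
# requires for this connector, leaves lowercase letters and digits only.
_CONNECTOR_SAFE_BUCKET = re.compile(r"[a-z0-9]{3,63}")


def is_connector_safe_bucket(name: str) -> bool:
    """True if `name` is a valid S3 bucket name with no special characters."""
    return _CONNECTOR_SAFE_BUCKET.fullmatch(name) is not None


for candidate in ["manufacturing", "manufacturing-failures", "MyBucketName"]:
    print(candidate, is_connector_safe_bucket(candidate))
```

Of the three candidates, only `manufacturing` passes. The second contains a hyphen, and the third contains uppercase letters, which S3 does not allow in bucket names.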

Step 2: Create an IAM role

For EC2 authentication to work, the instance that runs Kafka Connect needs an IAM role (the Lenses Box Docker image is packaged with Kafka Connect). Create the role first, then attach it to the instance running Kafka Connect/Box.

Alternatively, if you will authenticate to the S3 bucket with an AWS access key and secret key instead, follow the process described here.

To create an IAM role using the IAM console

1. Open the IAM console

2. In the navigation pane, choose Roles, then Create role.

3. On the Select role type page, choose EC2 and the EC2 use case. Choose Next: Permissions.

[Screenshot: creating the IAM role in the IAM console]

4. Choose Create policy to open the policy creation browser tab.

5. Open the JSON editor and paste the following policy, replacing <BUCKETNAME> with the name of your bucket.

This policy grants the S3 actions the connector needs (PutObject, GetObject, ListBucket and DeleteObject) on the bucket and the objects within it.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "1",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::<BUCKETNAME>",
                "arn:aws:s3:::<BUCKETNAME>/*"
            ]
        }
    ]
}

6. On the Review page, enter S3DataAccessPolicy as the policy name and choose Create policy.

[Screenshot: reviewing the S3DataAccessPolicy policy]

7. Return to the Create role browser tab, refresh the policy list, then find and select the newly created policy.

[Screenshot: selecting the policy for the new IAM role]

8. Choose Next: Add Tags.

9. Choose Next: Review.

10. Name the role S3DataAccessRole and choose Create role.

[Screenshot: creating S3DataAccessRole for the Lenses.io Kafka Connect connector]
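You may prefer to keep the policy from step 5 in version control rather than pasting it into the console. In that case, a small Python sketch can generate the same document for any bucket name. The function name is hypothetical. One way to use it is to save the output to a file, for example `python make_policy.py > s3-data-access-policy.json` (script name hypothetical). You can then register the file with `aws iam create-policy --policy-name S3DataAccessPolicy --policy-document file://s3-data-access-policy.json`.

```python
import json


def s3_data_access_policy(bucket: str) -> dict:
    """Return the S3DataAccessPolicy document from step 5 for `bucket`."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "1",
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject",
                           "s3:ListBucket", "s3:DeleteObject"],
                # Bucket ARN (used by ListBucket) and object ARNs (the rest)
                "Resource": [f"arn:aws:s3:::{bucket}",
                             f"arn:aws:s3:::{bucket}/*"],
            }
        ],
    }


if __name__ == "__main__":
    # Print the policy as JSON so it can be redirected into a file
    print(json.dumps(s3_data_access_policy("manufacturing"), indent=4))
```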

Step 3: Deploy Lenses

If you’re deploying on an EC2 instance running Docker, we recommend a t2.large instance.

For EC2 authentication to work, make sure the IAM role attached to the instance is the S3DataAccessRole you just created.

[Screenshot: Lenses.io Box instance in the AWS console]

To access Lenses, allow inbound traffic on port 3030 in the instance's security group. Depending on which other services you want to reach, you may want to open additional ports, though this isn't needed for this walkthrough.

1. Request a free Box license key from https://lenses.io/box/. You’ll receive an email with a docker run command.

2. Run the command on your EC2 instance. It should look like this:

docker run -e ADV_HOST=127.0.0.1 \
       -e EULA="https://dl.lenses.io/d/?id=REGISTER_FOR_KEY" \
       --rm -p 3030:3030 -p 9092:9092 lensesio/box:latest

Note: if you want your own external producers and consumers to connect to this instance, set ADV_HOST to an address they can reach, such as the instance's public IP address or DNS name. You’ll also need to open port 9092 in the security group.
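Box can take a little while to start its services after `docker run`. The minimal Python sketch below waits until the Lenses port (3030 in the command above) accepts TCP connections before you open the browser. The helper name, timeout and polling interval are arbitrary assumptions. An open port only means something is listening; it does not guarantee that every service inside Box is ready.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 300.0,
                  interval_s: float = 5.0) -> bool:
    """Poll host:port until a TCP connection succeeds or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # Succeeds as soon as something is listening on the port
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)
```

On the EC2 instance, `wait_for_port("localhost", 3030)` returns True once Lenses is listening, or False after five minutes without success.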

Step 4: Access Lenses and verify data

1. Access Lenses through your browser on port 3030.

2. The “Explore” page acts as a real-time data catalog. Search for the backblaze_smart topic. This topic receives a real-time stream of data from a sample producer within the Box environment.

This will be the data we sink to S3. 

3. To see exactly which fields we will sink to S3, run the following statement in SQL Studio:

SELECT serial_number, model, capacity_bytes, failure FROM backblaze_smart

[Screenshot: querying backblaze_smart in Lenses.io SQL Studio]

Step 5: Configure the S3 Connector through Lenses.io

1. It’s a good idea to first confirm, using the AWS CLI, that you can access the S3 bucket from within your environment.

Configure the CLI client with the command:

aws configure

Then, at a minimum, check that you can list the contents of your bucket:

aws s3 ls MyBucketName

Full instructions can be found here.

2. Once you can access the bucket, select “Connectors” from the top-level menu in Lenses. A number of sample connectors are already configured in Box.

[Screenshot: sample connectors in Lenses.io Box]

3. Click New Connector. Lenses will list all the available Connectors that are packaged by default in Box.

4. Select the S3 Connector

5. Enter the connector details. Make sure aws.region and aws.custom.endpoint match the region of your S3 bucket. In the connect.s3.kcql parameter, replace MyBucketName with the name of the bucket you created earlier.

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
errors.retry.timeout=0
errors.log.include.messages=false
tasks.max=1
topics=backblaze_smart
errors.retry.delay.max.ms=60000
errors.deadletterqueue.context.headers.enable=false
key.converter.schemas.enable=false
connect.s3.kcql=INSERT INTO MyBucketName:manufacturing_failures SELECT serial_number, model, capacity_bytes, failure FROM backblaze_smart PARTITIONBY model STOREAS `Json` WITH_FLUSH_COUNT = 3 
aws.region=eu-west-3
errors.deadletterqueue.topic.name=s3dlq
aws.custom.endpoint=https://s3.eu-west-3.amazonaws.com/
value.converter.schemas.enable=false
name=S3Example
errors.tolerance=none
errors.deadletterqueue.topic.replication.factor=1
aws.vhost.bucket=false
value.converter=org.apache.kafka.connect.json.JsonConverter
config.action.reload=restart
errors.log.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
schemas.enable=false

[Screenshot: S3 connector configuration in Lenses.io]

If you're authenticating with credentials instead of an EC2 IAM role, also add your access key and secret key:

aws.secret.key=SECRET_KEY
aws.access.key=ACCESS_KEY

6. Click Create Connector and check that the runners report no errors.

7. Verify that the data has been written to the manufacturing_failures folder of MyBucketName in S3.

[Screenshot: S3 console showing the data written by the Lenses.io Kafka Connect connector]
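Step 5 calls out two easy-to-miss edits. The first is keeping aws.region and aws.custom.endpoint in line with the bucket's region. The second is replacing MyBucketName in the KCQL. The Python sketch below is a hypothetical pre-flight check you could run on a saved copy of the configuration before clicking Create Connector. It also verifies that the KCQL source topic appears in `topics`. The endpoint check assumes the `https://s3.<region>.amazonaws.com/` form used in the example configuration. The bucket check applies the lowercase-letters-and-digits rule from step 1. The properties reader handles only simple `key=value` lines.

```python
import re
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Minimal key=value reader: skips blank lines and #/! comments.

    Not a full java.util.Properties parser (no escapes or line continuations).
    """
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def preflight(props: Dict[str, str]) -> List[str]:
    """Return a list of likely mistakes in an S3 sink configuration."""
    problems: List[str] = []
    region = props.get("aws.region", "")
    endpoint = props.get("aws.custom.endpoint", "")
    if region and endpoint and f"s3.{region}.amazonaws.com" not in endpoint:
        problems.append(f"aws.custom.endpoint {endpoint!r} does not match aws.region {region!r}")

    kcql = props.get("connect.s3.kcql", "")
    bucket = re.search(r"\bINSERT\s+INTO\s+([^:\s]+):", kcql, re.IGNORECASE)
    if bucket and not re.fullmatch(r"[a-z0-9]{3,63}", bucket.group(1)):
        problems.append(f"bucket {bucket.group(1)!r} is not lowercase letters/digits only")

    topics = {t.strip() for t in props.get("topics", "").split(",") if t.strip()}
    source = re.search(r"\bFROM\s+(\S+)", kcql, re.IGNORECASE)
    if source and source.group(1) not in topics:
        problems.append(f"KCQL reads from {source.group(1)!r}, which is not listed in topics")
    return problems
```

Consider a configuration like the one in step 5, saved as `s3-sink.properties` (hypothetical file name). Once MyBucketName has been replaced with a lowercase bucket name, `preflight(parse_properties(open("s3-sink.properties").read()))` should return an empty list. Left unchanged, the placeholder is reported.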

Next steps

If everything worked, you’re ready to deploy the connector against your own Kafka Connect and Kafka environment with a free trial of Lenses, available from lenses.io/start.

See our other open source connectors at https://lenses.io/connect/.
