Mihalis Tsoukalos

8 Oct 2019

In this blog post you will see how to use the Debezium Connector to capture row-level changes from a MySQL database, and then use the Snowflake Kafka Connector to send the resulting Kafka records to Snowflake.

We will configure, deploy and monitor the whole flow using Lenses.io.


What is Lenses

Lenses is a lightweight but powerful solution that delivers DataOps on your data infrastructure, such as Apache Kafka and Kubernetes. It provides a unified UI and API to explore data in your streams and microservices, and to monitor, troubleshoot and secure your flows and data infrastructure. Finally, it lets you build and configure code-free applications, deploy them via your existing infrastructure such as Kubernetes and Kafka Connect, and manage them via GitOps.


What is Debezium

Debezium is a distributed platform that can turn your existing databases into event streams. This means that your applications can see and respond to row-level changes in databases immediately.

Debezium is built on top of Apache Kafka and provides Kafka connectors that monitor existing database management systems, including MySQL, the database that will be used in this tutorial. Debezium uses Kafka logs to record the history of data changes in a database.

You can find more information about the Debezium Connector for MySQL here.


What is CDC

CDC stands for Change Data Capture. It is a long-established software design pattern in which a system monitors and captures changes in data so that other software can respond to those changes.

In our case, CDC will capture the changes made to a single MySQL database and pass the relevant change events to Kafka with the help of Debezium.
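
To make the idea more concrete, here is a minimal Python sketch of software responding to change events. It assumes the Debezium event envelope with its op, before and after fields. The apply_change_event helper and the sample product row are illustrative and are not part of this tutorial.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def apply_change_event(table: Dict[Any, Row], event: Dict[str, Any], key_field: str) -> None:
    """Apply one Debezium-style change event to an in-memory copy of a table."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # create, snapshot read, or update: the new row state is in "after"
        row = event["after"]
        table[row[key_field]] = row
    elif op == "d":
        # delete: only the old row state ("before") is available
        row = event["before"]
        table.pop(row[key_field], None)
    else:
        raise ValueError(f"unknown operation: {op!r}")


# Hypothetical events for a table keyed by "id"
replica: Dict[Any, Row] = {}
apply_change_event(
    replica,
    {"op": "c", "before": None, "after": {"id": 101, "name": "scooter"}},
    key_field="id",
)
```

Keeping the replica keyed by the primary key means that replaying the same event twice leaves the replica unchanged, which is convenient because Kafka consumers may see a record more than once.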


What is Snowflake

Snowflake is a data warehouse that lives in the cloud. Snowflake can store both structured relational data and semi-structured data. You can connect to Snowflake using either the Snowflake CLI or the Snowflake web interface.


Pre-requisites

You are going to need to have Lenses for this tutorial.

Lenses is just a small 4GB JVM that you point to your Kafka and Kubernetes (if you have one) clusters. To make things easier for this guide, we are going to use the Lenses+Kafka all-in-one free “Lenses Box” container. Download it for free here.

Additionally, you will need a valid Snowflake account.

The Debezium Connector for MySQL is already installed on Lenses Box so you will just need to configure it in order to communicate with your MySQL database.


The scenario that will be implemented

The scenario that is going to be implemented has the following steps:

  • Set up a MySQL database with sample data.
  • Import data from MySQL into Kafka on a Lenses Box using Debezium.
  • Export the data into Snowflake.


Setting up MySQL

You do not need to do anything special in the MySQL database that you are using. What is important is that MySQL is running and that you know the TCP port, username and password required to connect. For the purposes of this tutorial, MySQL will run in a Docker container. You can get the MySQL Docker image as follows:

docker pull debezium/example-mysql:0.10

Notice that this Docker image contains sample data in a database named inventory, which is the one that is going to be monitored.

After that you can start a container from that Docker image as follows:

docker run -it --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=dbz-pass debezium/example-mysql:0.10

The username that we will be using is root and the password is debezium; both will be used when setting up the Debezium Connector. Also notice that the container is named mysql, as defined by the --name mysql parameter.

You can also skip the docker pull step, because docker run downloads the image if it is not already present. If you do not want to keep the stopped container on your computer, execute the docker run command with the --rm parameter, which tells Docker to remove the container when it stops.

If you want to connect to that MySQL server using the mysql command line client, you can execute the following command, which runs the mysql utility inside a container started from the mysql:5.7 image:

docker run -it --rm --name mysqlterm --link mysql mysql:5.7 \
sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" \
-uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

From there you can inspect the contents of the inventory MySQL database and make changes to the contents of its tables.


Setting Up Lenses

Setting up the Lenses Box container is as simple as executing the command:

docker run --name=lenses-dev --link mysql:mysql -e ADV_HOST=127.0.0.1 \
-e EULA="https://dl.lenses.stream/d/?id=xxxxxxxxxxxxxx" \
--rm -p 3030:3030 -p 9092:9092 -p 2181:2181 -p 8081:8081 -p 9581:9581 \
-p 9582:9582 -p 9584:9584 -p 9585:9585 lensesio/box

IMPORTANT: When you registered for the “Lenses Box”, you received the above docker command with a valid free license (instead of the “xxxxxxxxx” in the id parameter). Check your emails for your unique command.

You need the --link mysql:mysql parameter so that the Lenses Box container can find the MySQL container under the hostname mysql. Notice that a container named mysql should already be running for the above docker run command to work.

You might need to wait a little for the Lenses Box container to load all the necessary components. After that you will be able to connect to Lenses Box from your favorite web browser on port 3030. The username and the password are both admin.
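
If you would rather script the waiting, the following Python sketch polls a TCP port until it accepts connections. The wait_for_port helper and its default timeout values are our own choices for illustration. An open port only tells you that something is listening, not that Lenses has finished loading.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 120.0, interval: float = 2.0) -> bool:
    """Return True once host:port accepts a TCP connection, False after timeout seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For the setup in this tutorial you would call wait_for_port("127.0.0.1", 3030) before opening the browser, and wait_for_port("127.0.0.1", 3306) to check that MySQL is reachable.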

After connecting, you will see the image that follows on your web browser.

Setting up Lenses


Setting Up Snowflake

You will need a Snowflake account in order to be able to use it. The easiest way is to visit https://trial.snowflake.com/ and create a trial account.

For the purposes of this tutorial the Snowflake username is mihalis. Notice that the password will not be needed for creating the connection between Lenses and Snowflake. However, you will still need your Snowflake password for connecting to the Snowflake UI.

The name of the Snowflake server is re91701.eu-west-1.snowflakecomputing.com. Yours will be different, so if you follow this tutorial you will have to replace re91701.eu-west-1.snowflakecomputing.com with your own Snowflake server address.


Installing Snowflake Kafka Connector

We will need to install the Snowflake Kafka Connector in Lenses Box. We will be using version 0.4.0 of the Snowflake Kafka Connector. Please visit this page for the full list of available Snowflake Kafka Connectors.

You will need to open a shell inside the running Lenses Box container with the following command:

docker exec -it lenses-dev bash

Notice that the name of the Lenses Box container is lenses-dev, as specified by the --name=lenses-dev parameter.

The previous step is required because you will need to install the Snowflake Connector inside the running Lenses Box.

You will need to download the Snowflake Kafka Connector with the command:

wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/0.4.0/snowflake-kafka-connector-0.4.0.jar

Put the downloaded file in the right directory with:

mv snowflake-kafka-connector-0.4.0.jar /run/connect/connectors/third-party/

Congratulations, you have just installed the Snowflake Kafka Connector jar file by putting it in the /run/connect/connectors/third-party/ directory of Lenses Box.

However, you will need to edit /run/lenses/lenses.conf and add the following text at the end of the Lenses configuration file in order for Lenses to understand the Snowflake Connector:

lenses {
  connectors.info = [
      {
           class.name = "com.snowflake.kafka.connector.SnowflakeSinkConnector"
           name = "Snowflake Kafka Connector"
           sink = true
           description = "A description for the connector"
           author = "The connector author"
      }
  ]
}

Execute the following commands, which are needed for the Snowflake Connector to work:

mkdir -p /.cache/snowflake
chmod -R 777 /.cache/snowflake

The last commands that you need to execute are the following:

supervisorctl restart connect-distributed
supervisorctl restart lenses

The former command restarts Kafka Connect so that it picks up the new Connector, whereas the latter command restarts Lenses so that it rereads the new configuration from /run/lenses/lenses.conf.


Using the Snowflake Connector for Kafka

In order to make sure that the Snowflake Kafka Connector is properly installed, you will need to visit the Connectors link inside the Lenses Box and press the + New Connector button.

Lenses Snowflake connector for Kafka

The following image proves that the Snowflake Connector is recognized by the Lenses Box and therefore is properly installed.

Snowflake connector in Lenses Box

You can find more information about the Snowflake Kafka Connector here.


Getting data from MySQL to Lenses and Kafka

As stated before, we are going to use the Debezium Connector to get row-level changes from MySQL to Kafka.

Using Debezium to put data into Lenses Box

We will now create a new Connector using the Debezium MySQL Connector available in Lenses Box. The Debezium configuration will be the following:

name=MySQL-CDC
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=mysql
database.port=3306
database.user=root
database.password=debezium
database.server.name=dbserver1
database.whitelist=inventory
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=schema-changes.inventory
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter

Debezium MySQL connector in Lenses Box

The name of the MySQL database that we are going to watch is inventory and exists in the debezium/example-mysql:0.10 Docker image.
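
Lenses accepts the configuration above as it is. If you prefer to script the deployment, the Kafka Connect REST API expects a JSON document with a name and a config object on POST /connectors. The Python sketch below converts key=value text into that shape. The properties_to_connect_payload helper is hypothetical, and the address of the Connect REST endpoint depends on your setup, so it is not shown.

```python
import json
from typing import Dict


def properties_to_connect_payload(text: str) -> Dict[str, object]:
    """Turn key=value lines into the {"name": ..., "config": {...}} shape used by Kafka Connect."""
    config: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        config[key.strip()] = value.strip()
    name = config.pop("name")  # a KeyError here means the name line is missing
    return {"name": name, "config": config}


# A shortened version of the Debezium configuration from this tutorial
example = """
name=MySQL-CDC
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=mysql
"""
payload = properties_to_connect_payload(example)
body = json.dumps(payload, indent=2)  # the text you would send as the request body
```

Only the first = on each line is treated as the separator, so values that contain = themselves are kept intact.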

Now, connect to the MySQL database using the command we talked about earlier:

docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 \ 
sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" \ 
-uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

Then, create a new table named profits in the inventory database as follows:

use inventory;
CREATE TABLE `profits` ( `product_id` int(11) NOT NULL, `amount` int(11) NOT NULL, PRIMARY KEY (`product_id`)) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Verify data published to Kafka

Within Lenses, go to Topics >> schema-changes.inventory.

Successful connection with Debezium and Lenses

There will be a new record in the schema-changes.inventory Kafka topic, which in JSON format will look similar to the following:

{
    "value": {
        "source": {
            "server": "dbserver1"
        },
        "position": {
            "ts_sec": 1569607738,
            "file": "mysql-bin.000003",
            "pos": 450,
            "server_id": 223344
        },
        "databaseName": "inventory",
        "ddl": "CREATE TABLE `profits` ( `product_id` int(11) NOT NULL, `amount` int(11) NOT NULL, PRIMARY KEY (`product_id`)) ENGINE=InnoDB DEFAULT CHARSET=latin1"
    },
    "metadata": {
        "offset": 16,
        "partition": 0,
        "timestamp": 1569607738666,
        "__keysize": 0,
        "__valsize": 366
    }
}

This proves that everything works well with Debezium and Lenses.
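
If you export such records and want to check them programmatically, a small Python sketch like the following pulls out the DDL statements. It assumes each record is a JSON string shaped like the one above, with the payload under value. The extract_ddl helper is illustrative only.

```python
import json
from typing import Iterable, List, Tuple


def extract_ddl(records: Iterable[str]) -> List[Tuple[str, str]]:
    """Return (database name, DDL statement) pairs from schema-change records."""
    found: List[Tuple[str, str]] = []
    for raw in records:
        value = json.loads(raw)["value"]
        ddl = value.get("ddl")
        if ddl:  # ignore records that carry no DDL statement
            found.append((value["databaseName"], ddl))
    return found


# A trimmed copy of the record shown above
sample = json.dumps({
    "value": {
        "source": {"server": "dbserver1"},
        "databaseName": "inventory",
        "ddl": "CREATE TABLE `profits` ( `product_id` int(11) NOT NULL )",
    },
    "metadata": {"offset": 16, "partition": 0},
})
statements = extract_ddl([sample])
```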

Stream-processing events before sending them to Snowflake

So, imagine that you want to transfer data from dbserver1.inventory.products to Snowflake.

In this case, you will need to copy that topic into one with a simpler name, because the Snowflake Connector does not seem to accept dot characters in topic names. The easiest way to do that is to use a SQL processor (SQL Processor >> New Processor) that continuously processes events and transforms them via a SQL statement.

For the purposes of this tutorial, the code for the SQL processor will be the following:

set autocreate=true;
INSERT INTO mytopic SELECT * FROM dbserver1.inventory.products

The processor copies every record from dbserver1.inventory.products into a new topic named mytopic; the original dbserver1.inventory.products topic still exists. The following figure shows that in a graphical way.
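
Debezium names its topics after the logical server name, the database and the table, which is where the dots come from. If you have many tables, you might derive the simpler names instead of picking them by hand. The Python sketch below shows one way to do that. Both helpers are hypothetical, and the rule of keeping only letters, digits and underscores is our assumption of a safe naming scheme, not a documented Snowflake requirement.

```python
import re


def debezium_topic(server: str, database: str, table: str) -> str:
    """Build the topic name Debezium uses for a table: server.database.table."""
    return f"{server}.{database}.{table}"


def simple_name(topic: str) -> str:
    """Replace every character that is not a letter, digit or underscore with an underscore."""
    name = re.sub(r"[^A-Za-z0-9_]", "_", topic)
    if not re.match(r"[A-Za-z_]", name):
        name = "_" + name  # make sure the name does not start with a digit
    return name


topic = debezium_topic("dbserver1", "inventory", "products")
target = simple_name(topic)
```

With this scheme, the topic used in this tutorial would become dbserver1_inventory_products rather than mytopic.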

SQL processor to SQL statement


Connecting Lenses to Snowflake

After the installation of the Snowflake Kafka Connector, the process of connecting Lenses and Snowflake is straightforward.

Create a new Sink using the Snowflake Connector in Connectors >> New Connector >> Snowflake. The configuration for the Snowflake Sink in the Lenses Box will be the following:

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
snowflake.topic2table.map=mytopic:external
tasks.max=1
topics=mytopic
snowflake.url.name=https://re91701.eu-west-1.snowflakecomputing.com
snowflake.database.name=mydb
snowflake.schema.name=public
value.converter.schema.registry.url=http://localhost:8081
buffer.count.records=10000
snowflake.user.name=mihalis
snowflake.private.key=REALLY_LOOOOOOOONG_STRING_READ_HOW_TO_FIND_IT
name=snowflake
snowflake.user.role=sysadmin
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
buffer.size.bytes=5242880

In order to get the correct value for snowflake.private.key you will need to visit this page and follow the instructions.
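
As far as we understand the Snowflake instructions, the value is the body of the private key file written as one long line, without the header and footer lines. The Python sketch below does that conversion. It assumes an unencrypted PEM file, and the pem_to_single_line helper and the fake key are illustrative only.

```python
def pem_to_single_line(pem: str) -> str:
    """Drop the -----BEGIN/END----- lines of a PEM file and join the rest into one line."""
    lines = [line.strip() for line in pem.splitlines()]
    body = [line for line in lines if line and not line.startswith("-----")]
    return "".join(body)


# Not a real key, just text with the same layout as a PEM file
fake_pem = """-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkq
hkiG9w0BAQEFAASC
-----END PRIVATE KEY-----
"""
key_value = pem_to_single_line(fake_pem)
```

Treat the resulting string as a secret: anyone who has it can authenticate as your Snowflake user.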

You can verify in the Snowflake UI that the data from Lenses Box is sent to Snowflake successfully. Data is read from mytopic in Lenses and written to external in Snowflake. The Snowflake database and schema used are mydb and public, respectively.

Data successfully sent to Snowflake from Lenses
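
The snowflake.topic2table.map setting holds topic:table pairs separated by commas, so the same Sink can route several topics to different tables. The Python sketch below builds that string from a dictionary. The topic2table_map helper is hypothetical and its checks are deliberately simple.

```python
from typing import Mapping


def topic2table_map(mapping: Mapping[str, str]) -> str:
    """Build a topic:table,topic:table string from a topic-to-table mapping."""
    parts = []
    for topic, table in mapping.items():
        for name in (topic, table):
            # the separators themselves cannot appear inside a name
            if not name or ":" in name or "," in name:
                raise ValueError(f"invalid name: {name!r}")
        parts.append(f"{topic}:{table}")
    return ",".join(parts)


value = topic2table_map({"mytopic": "external"})
```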


Conclusions

Lenses provides a DataOps layer to configure, manage, debug and deploy flows. You can manage these flows via GitOps and leverage Lenses to bake in monitoring, alerting and data compliance. Check out our other blogs for more info.

