
Producing Protobuf data to Kafka

By Eleftherios Davros, March 1, 2022
In this article:
  • 01. Some context on Protocol buffers
  • 02. Why Protobuf for Kafka?
  • 03. What we need
  • 04. Lenses Kafka Docker Box
  • 05. Schema Registry
  • 06. Spin up Lenses Box
  • 07. Create Java application
  • 08. Generate Protobuf classes
  • 09. The code itself
  • 10. Sneak peek Schema Registry
  • 11. Preview Data
  • 12. Kafka Consumer

Until recently, teams were building a small handful of Kafka streaming applications. They were usually associated with Big Data workloads (analytics, data science, etc.), and data serialization would typically be in AVRO or JSON.

Now a wider set of engineering teams are building entire software products with microservices decoupled through Kafka. Many teams have adopted Google Protobuf as their serialization, partly due to its use in gRPC.

In the latest version of Lenses, we now fully support Kafka data observability and Kafka stream processing with Protobuf schemas managed in a Schema Registry. In this blog, we’ll show you how to create your first application with Protobuf and explore this data in Lenses.

Some context on Protocol buffers

Protocol Buffers (Protobuf) is a data format created at Google in 2001 and open-sourced in 2008. It is often combined with gRPC and found in microservice-based software architectures.

Several features of Protobuf make it a good fit for this type of direct service-to-service communication:

  • Protobuf lets multiple systems agree on the format of messages, with fields correctly typed in each programming language.
  • It allows for both backward- and forward-compatible schema evolution (i.e. old consumers can consume messages from new producers/servers and vice versa).
  • It has broad language support. Its protoc compiler can generate builder classes for most programming languages.
  • Its compact, binary format allows for faster serialization/deserialization (more than 5x faster than JSON).
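To illustrate that second point, adding a new field under a fresh field number is both backward and forward compatible - old readers skip the unknown field, and new readers see a default value for messages written without it. The message and fields below are hypothetical, not part of this tutorial's schema:

```protobuf
syntax = "proto3";

// v2 of a hypothetical schema: the "email" field (number 2) is new.
// Consumers built against v1 (which only had "name") still parse v2
// messages and simply ignore field 2; consumers built against v2 that
// read a v1 message see "email" as the empty-string default.
message User {
  string name = 1;
  string email = 2; // added in v2; never reuse or renumber existing fields
}
```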

Why Protobuf for Kafka?

So Protobuf makes sense with microservices and is gaining a wider appeal than AVRO. If Protobuf is so commonly used for request/response, what makes it suitable for Kafka, a system that facilitates loose coupling between various services?

  1. Loose coupling in Kafka increases the likelihood that developers are using different frameworks & languages. A schema format like Protobuf becomes more compelling, since it lets those services exchange data safely and efficiently.
  2. Confluent Platform 5.5 introduced a Schema Registry-aware Kafka serializer for Protobuf, and Protobuf support has been added within Schema Registry itself.
  3. And now, Lenses 5.0 is here. It provides first-class Protobuf support:

    • Browse messages using SQL
    • Explore & evolve schemas across any Kafka infrastructure
    • Apply data redaction/masking, and
    • Build stream processing applications with SQL syntax.

Let’s go over an example of interacting with Protobuf data. To see an end-to-end (local) flow, we will:

- Build a basic application that produces Protobuf data (in Java) 

- Send that Protobuf data to Kafka

- Explore the data in Lenses Box

- Consume the data through a Node.js application

What we need:

  • Docker installed
  • Lenses Kafka Docker Box 
  • Java/Maven installed
  • An IDE (we will be using VS Code in our example)
  • NPM/Node installed

Before we begin, let’s get to know our tools and how they interact with each other:

Lenses Kafka Docker Box

Lenses Box lets us connect applications to a local Apache Kafka running inside a Docker container.

Apache Kafka, Schema Registry, and an ecosystem of open source Kafka tools come pre-configured in a single Docker image. Perfect for us, since we don't need to spin up each of these services independently:

[Image: Lenses Apache Kafka Docker]


Schema Registry

Schema Registry lets us maintain a repository of the schemas our applications depend on; producer and consumer applications interact with it, giving us a reliable single source of truth.

Kafka Producers

  1. The producer checks for the schema in its cache prior to sending the data. 
  2. If the schema is not found, the producer first sends (registers) the schema in the Schema Registry and gets back the schema ID.
  3. It then produces the serialized data to Kafka along with the schema ID.
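The register-then-cache flow above can be sketched in a few lines of JavaScript. This is a toy, in-memory stand-in for the Schema Registry - the function name schemaIdFor and the Maps are illustrative, not the real client API:

```javascript
// Toy stand-in for the Schema Registry: subject -> schema ID.
const registry = new Map();
let nextId = 1;

// The producer's local schema cache.
const localCache = new Map();

function schemaIdFor(subject) {
  // Step 1: check the local cache before doing anything else.
  if (localCache.has(subject)) return localCache.get(subject);
  // Step 2: on a miss, register the schema and get back its ID.
  if (!registry.has(subject)) registry.set(subject, nextId++);
  const id = registry.get(subject);
  localCache.set(subject, id);
  // Step 3: the serialized record is then produced along with this ID.
  return id;
}

console.log(schemaIdFor("protos_topic_cards-value")); // first call registers
console.log(schemaIdFor("protos_topic_cards-value")); // second call hits the cache
```

After the first call for a given subject, every subsequent send reuses the cached ID, so the registry is only contacted once per schema.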

Kafka Consumers

  1. Similarly, when consuming data, the consumer first checks its local cache to find the schema using the schema ID.
  2. If not found, it sends the schema ID to the Schema Registry and then gets the schema back.
  3. It deserializes the data.
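What ties a Kafka message to its schema is the Confluent wire format: every serialized value is prefixed with a magic byte (0) followed by the 4-byte schema ID, which is exactly what the consumer reads in step 1. (For Protobuf the real format also carries message-index data after the ID; this sketch ignores that detail.)

```javascript
// Parse the 5-byte Confluent wire-format header from a message value.
function parseWireFormat(buffer) {
  if (buffer.readUInt8(0) !== 0) throw new Error("unknown magic byte");
  return {
    schemaId: buffer.readInt32BE(1), // big-endian schema ID in bytes 1-4
    payload: buffer.subarray(5),     // the serialized record itself
  };
}

// A fake message whose schema ID is 7:
const msg = Buffer.concat([Buffer.from([0, 0, 0, 0, 7]), Buffer.from("payload")]);
console.log(parseWireFormat(msg).schemaId); // 7
```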


[Architecture diagram]


Similarly, we will serialize our data based on the Protobuf schema, store the schema in the Schema Registry, retrieve the schema ID, and then produce the message to our Kafka topic. Afterward, we will consume those messages from our Node.js app, following the flow of the diagram.

Let's dive into this!

Spin up Lenses Box

Start by setting up a running Lenses Box.

First, sign up for a Lenses Kafka Docker box. Make sure you have Docker running, open a new terminal window, and start Lenses with the following command:


```
docker run -e ADV_HOST=127.0.0.1 \
-e EULA="https://licenses.lenses.io/d/?id=<YOUR LENSES BOX ID>" \
--rm -p 3030:3030 -p 9092:9092 -p 8181:8181 -p 8081:8081 lensesio/box:latest
```

Note: If you copied the command from here, make sure to also expose ports 8081 and 8181 as shown in the command above.

If you receive a warning such as "Operating system RAM available is less than the lowest recommended", increase your Docker RAM allocation for Lenses to work properly.

Access: http://127.0.0.1:3030 and log in with credentials admin/admin:


[Screenshot: Lenses login screen]

After logging in you should successfully land on http://127.0.0.1:3030/data and be able to view the main dashboard screen.
Let's park Lenses for the time being and we will return to it later.

Create Java application

Let's start by creating our project directory in our IDE. Its structure looks like this:

```
-src
   | 
   |__main
     |
     |__java
     |  |
     |  |__kafka
     |     |
     |     |__SendKafkaProto.java
     |
     |__protobuf
        |
        |__CreditCard.proto
-pom.xml
```


  • On the pom.xml we declare all of the dependencies that we will be using - the full file can be found here.
  • CreditCard.proto holds our schema which looks like this:


```
syntax = "proto3";

package io.lenses.examples.serde.protobuf;

option java_package = "com.example";
option java_outer_classname = "CardData";

message CreditCard {
  string name = 1;
  string country = 2;
  string currency = 3;
  string cardNumber = 4;
  bool blocked = 5;
  enum CardType {
    VISA = 0;
    MASTERCARD = 1;
    AMEX = 2;
  }
  CardType type = 6;
}
```
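For the build to compile this schema into Java classes, the pom.xml also needs a plugin that runs protoc. A typical configuration using the protobuf-maven-plugin is sketched below - the versions and output directory are illustrative, and the project's actual pom may differ:

```xml
<!-- Sketch only: the linked pom.xml for this project may be configured
     differently. The os-maven-plugin extension resolves the right protoc
     binary for the current platform. -->
<build>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.7.0</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <configuration>
        <!-- our .proto files live in src/main/protobuf, not the default src/main/proto -->
        <protoSourceRoot>src/main/protobuf</protoSourceRoot>
        <protocArtifact>com.google.protobuf:protoc:3.19.4:exe:${os.detected.classifier}</protocArtifact>
        <outputDirectory>src/main/java</outputDirectory>
        <clearOutputDirectory>false</clearOutputDirectory>
      </configuration>
      <executions>
        <execution>
          <goals><goal>compile</goal></goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```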


Generate Protobuf classes

Back in our IDE, let's generate the CardData.java file, which we will need later on to serialize our object to Protobuf. In a terminal window at the project root, type:


```
mvn generate-sources
```


This will generate our Protobuf Java class and our updated folder structure should now look like this:


```
-src
   | 
   |__main
     |
     |__java
     |  |
     |  |__kafka
     |  |  |
     |  |  |__SendKafkaProto.java
     |  |
     |  |__com
     |     |
     |     |__example
     |              |
     |              |__CardData.java
     |
     |__protobuf
        |
        |__CreditCard.proto
-target
-pom.xml
```


The code itself

Opening our SendKafkaProto.java file we will see:


```
package kafka;

import com.github.javafaker.Faker;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import com.example.CardData;
import java.util.Properties;

public class SendKafkaProto {
  public static void main(String[] args) {
    // Setup Producer Properties
    String bootstrapServers = "127.0.0.1:9092";
    var properties = new Properties();
    properties.setProperty("bootstrap.servers", bootstrapServers);
    properties.setProperty("schema.registry.url", "http://localhost:8081");
    properties.setProperty("key.serializer", StringSerializer.class.getName());
    properties.setProperty("value.serializer", KafkaProtobufSerializer.class.getName());

    KafkaProducer<String, CardData.CreditCard> producer = new KafkaProducer<>(properties);
    // Specify Topic Name
    var topic = "protos_topic_cards";

    // Loop to Produce Fake Data
    for (int i = 0; i < 15; i++) {
      // creating Random object
      Random rd = new Random();
      Faker faker = new Faker();
      String name = faker.name().fullName();
      String countryCode = faker.address().countryCode();
      String cardNumber = faker.business().creditCardNumber();
      Integer typeValue = rd.nextInt(3);
      String currencyCode = faker.country().currencyCode();

      // Serializing to Protobuf based on CreditCard.proto Schema
      var cardData = CardData.CreditCard.newBuilder()
          .setName(name)
          .setCountry(countryCode)
          .setCurrency(currencyCode)
          .setTypeValue(typeValue)
          .setBlocked(false)
          .setCardNumber(cardNumber)
          .build();

      var record = new ProducerRecord<String, CardData.CreditCard>(topic, "Credit Card", cardData);
      // Send to Producer
      producer.send(record);
    }
    producer.flush();
    producer.close();
    // Log success message
    System.out.println("Sent Data Successfully");
  }
}
```


In the above snippet, we start by naming our package and importing the libraries that we will need later on. We then specify the properties of the Kafka broker, schema registry URL, and our topic name: protos_topic_cards.

Then, we construct some card objects using random data and we serialize the objects to Protobuf. Finally, we add the data to a record and send our records to the Kafka topic. Cool!

To test things out, let's package our code by typing the following in a terminal window at the project location folder:


```
mvn package
```


After seeing "BUILD SUCCESS" in our terminal, let's execute it with:


```
java -cp target/kafka-send-proto-0.1.0.jar kafka.SendKafkaProto
```


Or, if you are using VS Code, you can right-click the file and select "Run Java" (if the option does not appear, make sure you have installed the Extension Pack for Java).

At this point, you should see the "Sent Data Successfully" message in your console, which indicates that we have sent data to Kafka as expected.

Congrats! 🚀

Note: If you receive an error of type:

```
Error connecting to node XXXXXXXXX:9092 (id: 101 rack: null)
java.net.UnknownHostException: XXXXXXXXX: nodename nor servname provided, or not known.
```


You need to map the hostname XXXXXXXXX to your localhost (127.0.0.1). Some instructions can be found on Stack Overflow and Crunchify.

Sneak peek Schema Registry

Back in Lenses - http://127.0.0.1:3030/data - let's make sure we have successfully registered our schema in the Schema Registry. From the left menu bar, select Schema Registry:

[Screenshot: Schema Registry]


Our protos_topic_cards-value schema is there! Clicking on it to see its details, we find it's almost identical to our initial schema:



Notice that the schema we initially registered gets overridden by the Protobuf Kafka serializer with a semantically equivalent version.

The Protobuf Kafka serializer still validates the record we publish against the existing schema (if one exists). It just overrides it automatically.

Preview Data

Next, it's time to have a look at the data itself.

Let's navigate back to http://127.0.0.1:3030/data and from the top left menu select the explore option.

Landing on the explore screen we can see on our topics list that our protos_topic_cards topic is there, and Lenses has already picked up that its value type is Protobuf.

[Screenshot: protos_topic_cards in the Explore screen]


Since the schema exists in the schema registry and the data in the topic matches it, Lenses can figure out the link between the two. Clicking on our topic we can see the key and value for each record in a human-readable format - a proper data catalog:


[Screenshot: topic cards data]


Without this, the data would appear as an unreadable byte array, since that is how it is stored in a Kafka topic.
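Beyond browsing, you can also query the topic from Lenses' SQL Studio. An illustrative query (field names follow our CreditCard schema) might look like:

```sql
SELECT name, country, currency, cardNumber, type
FROM protos_topic_cards
LIMIT 10;
```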

Kafka Consumer

Lastly, let’s consume the data from our topic. We will consume it in a local Node.js application, spin up an Express server, and preview the data in our browser.

We are presuming that we already have npm and Node.js installed on our local machine, but if that's not the case, download them here.

Let’s start by opening up a terminal window and initializing our project in a new directory by typing:

```
npm init -y
```


This will create a package.json file for us, looking like this:

```
{
  "name": "foo",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}
```


Let’s then install the dependencies that we will use, by typing on our terminal:


```
npm install @kafkajs/confluent-schema-registry express kafkajs
```


This will update our package.json file with the dependencies included:

```
"dependencies": {
    "@kafkajs/confluent-schema-registry": "^3.2.1",
    "express": "^4.17.3",
    "kafkajs": "^1.16.0"
  }
```


We will use Express to preview our data in the browser, kafkajs to connect to Kafka, and @kafkajs/confluent-schema-registry to connect to our schema registry and deserialize the messages.

The last thing we need to do in our package.json file is create our run script (the command we will use to run our application) by editing the scripts section:

```
"scripts": {
    "dev": "node index.js"
  },
```


You can find the package.json file here.

So now, by typing “npm run dev”, we actually run “node index.js” and execute the code in our index.js file - which we still need to create. Let’s create index.js (in the same directory); it will look like this:


```
const { Kafka } = require("kafkajs");
const { SchemaRegistry } = require("@kafkajs/confluent-schema-registry");
const express = require("express");
const registry = new SchemaRegistry({ host: "http://localhost:8081" });

// Create the client with the broker list
const kafka = new Kafka({
  clientId: "my-proto-app",
  brokers: ["127.0.0.1:9092"],
});

// create a new consumer from the kafka client, and set its group ID
// the group ID helps Kafka keep track of the messages that this client
// is yet to receive
const consumer = kafka.consumer({ groupId: "proto-topic-consumer" });

let messagesArray = [];

const consume = async () => {
  await consumer.connect();
  // Subscribe to our Topic
  // fromBeginning flag to consume messages from start
  // for more on this see: https://kafka.js.org/docs/consuming#frombeginning
  await consumer.subscribe({
    topic: "protos_topic_cards",
    fromBeginning: true,
  });
  // Run our consumer
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      // Push messages to messagesArray after decoded
      messagesArray.push(await registry.decode(message.value));

      // Log details on our console (optional)
      // Notice there is no need to convert to JSON here
      console.log({
        topic,
        key: message.key.toString(),
        value: await registry.decode(message.value),
        headers: message.headers,
      });
    },
  });
};

// Run our consumer
consume();

var app = express();

// Create the "/" route response
app.get("/", function (req, res) {
  res.setHeader("Content-type", "text/html");
  // In a JSON format to be able to display in the browser
  res.send(
    messagesArray
      .map((message) => {
        return `<h4>${JSON.stringify(message)}</h4>`;
      })
      .join(" ")
  );
});

// Listen on port 5000
var server = app.listen(5000, function () {
  var host = server.address().address;
  var port = server.address().port;

  // Log to make sure server started successfully
  console.log("Example app listening at http://%s:%s", host, port);
});

```


For starters, we import all the packages required for this to work. Then we provide the required Kafka information (broker URLs, topic, schema registry).

Then we subscribe to the protos_topic_cards topic, get the messages, push them to an array and log consumer details (topic, message value, headers) - just to make sure we are getting the data from Kafka.

Finally, we spin up the Express server, set up its ‘/’ route, and map over the messages to build our response so we can preview it in the browser. To do so, we need to stringify the messages as JSON. Our server listens on port 5000.

OK, let’s have a look at what we produced. Looking at http://localhost:5000 we can see the data in our browser:


[Screenshot: deserialized JSON in the browser]


And with this, we have successfully sent data, in a Protobuf format, from a Java application to Kafka, had a look at what's inside our topic using Lenses, and consumed that data in our Node.js application. Good stuff!

Useful links:

  • Github source code for Java App: https://github.com/eleftheriosd/Protobuf-To-Kafka 
  • Github source code for Node.js App: https://github.com/eleftheriosd/Node.js-Kafka-consumer 
  • https://help.lenses.io/sql/protobuf/
  • Lenses box download: https://lenses.io/apache-kafka-docker/