Until recently, teams were building only a small handful of Kafka streaming applications. These were usually associated with Big Data workloads (analytics, data science, etc.), and the data was typically serialized as AVRO or JSON.
Now a wider set of engineering teams are building entire software products with microservices decoupled through Kafka. Many teams have adopted Google Protobuf as their serialization format, partly due to its use in gRPC.
In the latest version of Lenses, we now fully support Kafka data observability and Kafka stream processing with Protobuf schemas managed in a Schema Registry. In this blog, we’ll show you how to create your first application with Protobuf and explore this data in Lenses.
Some context on Protocol buffers
Protocol Buffers is an open-source data format introduced at Google in 2001. It is often combined with gRPC and found in microservice-based software architectures.
Several features of Protobuf make it a good fit for this type of direct service-to-service communication:
- Protobuf allows several systems to agree on the format of messages and map them to native types in each programming language.
- It allows for both backward- and forward-compatible schema evolution (i.e. old consumers can consume messages from new producers/servers and vice versa).
- It has broad language support: its protoc compiler can generate builder classes for most programming languages (see the sketch after this list).
- Its compact binary format allows for faster serialization and deserialization (more than 5x faster than JSON).
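To make this concrete, here is a minimal sketch of what working with a protoc-generated class looks like, using the CreditCard message we will define and compile later in this post (the field values here are made up for illustration):
```
import com.example.CardData;
import com.google.protobuf.InvalidProtocolBufferException;

public class ProtoRoundTrip {

    public static void main(String[] args) throws InvalidProtocolBufferException {
        // Build a strongly-typed message with the protoc-generated builder
        CardData.CreditCard card = CardData.CreditCard.newBuilder()
                .setName("Jane Doe")
                .setCountry("US")
                .setCurrency("USD")
                .build();

        // Serialize to Protobuf's compact binary wire format
        byte[] bytes = card.toByteArray();

        // Deserialize back into the same typed object
        CardData.CreditCard decoded = CardData.CreditCard.parseFrom(bytes);
        System.out.println(decoded.getName());
    }
}
```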
Why Protobuf for Kafka?
So Protobuf makes sense with microservices and is gaining a wider appeal than AVRO. If Protobuf is so commonly used for request/response, what makes it suitable for Kafka, a system that facilitates loose coupling between various services?
Loose coupling in Kafka increases the likelihood that the services on either side of a topic are written with different frameworks and languages. A schema format like Protobuf becomes even more compelling there, since it lets those services exchange data safely and efficiently.
Confluent Platform 5.5 introduced a Schema Registry-aware Kafka serializer for Protobuf, and Protobuf support was added to Schema Registry itself.
And now, Lenses 5.0 is here, providing first-class Protobuf support.
Let’s go over an example of interacting with Protobuf data. To see an end-to-end (local) flow, we will:
- Build a basic application that produces Protobuf data (in Java)
- Send that Protobuf data to Kafka
- Explore the data in Lenses Box
- Consume the data through a Node.js application
What we need:
- Docker installed
- Java and Maven installed
- An IDE (we will be using VS Code in our example)
- npm and Node.js installed
Before we begin, let’s get to know our tools and how they interact with each other:
Lenses Kafka Docker Box
Lenses Box lets us connect applications to a local Apache Kafka instance running inside a Docker container.
Apache Kafka, Schema Registry, and an ecosystem of open-source Kafka tools come pre-configured in a single Docker image. Perfect for us, since we don’t need to spin up each of these services independently:

Schema Registry
Schema Registry allows us to maintain a repository of the schemas that our applications depend on. Producer and Consumer applications interact with it, making it a reliable single source of truth for message formats.
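To get a feel for the registry's API, here is a hypothetical one-off check (not part of the apps we build in this post) that lists the registered subjects, assuming Confluent's kafka-schema-registry-client library is on the classpath and the registry is reachable at http://localhost:8081:
```
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class ListSubjects {

    public static void main(String[] args) throws Exception {
        // The second argument is the client-side cache capacity for schemas
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // Each registered schema lives under a subject, e.g. "<topic>-value"
        System.out.println(client.getAllSubjects());
    }
}
```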
Kafka Producers
- The producer checks for the schema in its local cache before sending the data.
- If the schema is not found there, the producer first sends (registers) the schema with the Schema Registry and gets back a schema ID.
- It then produces the serialized data to Kafka along with the schema ID.
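A minimal sketch of the producer-side configuration that drives this flow, assuming the Confluent Protobuf serializer (the full producer is shown further down); `auto.register.schemas`, which defaults to true in the Confluent serializers, is what makes the serializer register the schema on first use:
```
import java.util.Properties;

import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

public class ProducerConfigSketch {

    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        // Where the serializer registers and looks up schemas
        props.setProperty("schema.registry.url", "http://localhost:8081");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", KafkaProtobufSerializer.class.getName());
        // Defaults to true: register the schema on the first send, then reuse the cached ID
        props.setProperty("auto.register.schemas", "true");
        return props;
    }
}
```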
Kafka Consumers
- Similarly, when consuming data, the consumer first checks its local cache for the schema, using the schema ID carried with the message.
- If it is not found, the consumer sends the schema ID to the Schema Registry and gets the schema back.
- It then deserializes the data.
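We will do our consuming from Node.js later on, but for reference here is a minimal Java sketch of the same flow, assuming the Confluent Protobuf deserializer and the protos_topic_cards topic we will produce to later (the group ID is made up). Without a configured target class, the deserializer hands back a generic DynamicMessage built from the schema it fetched:
```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.google.protobuf.DynamicMessage;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

public class ConsumeProtoSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        props.setProperty("group.id", "proto-sketch-consumer");
        props.setProperty("auto.offset.reset", "earliest");
        // The deserializer resolves the schema ID embedded in each record against the registry
        props.setProperty("schema.registry.url", "http://localhost:8081");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", KafkaProtobufDeserializer.class.getName());

        try (KafkaConsumer<String, DynamicMessage> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("protos_topic_cards"));
            // Poll once and print whatever is available; a real consumer would loop
            ConsumerRecords<String, DynamicMessage> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, DynamicMessage> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
```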

Similarly, we will serialize our data based on the Protobuf schema, store the schema in the Schema Registry, retrieve the Schema ID, and then produce the message to our Kafka Topic. Afterward, we will be consuming those messages from our Node.js app following the flow of the diagram.
Let's dive into this!
Spin up Lenses Box
Start by setting up a running Lenses Box.
First, sign up for a Lenses Kafka Docker box. Make sure you have Docker running, open a new terminal window, and start Lenses with the following command:
```
docker run -e ADV_HOST=127.0.0.1 \
-e EULA="https://licenses.lenses.io/d/?id="<YOUR LENSES BOX ID>" \
--rm -p 3030:3030 -p 9092:9092 -p 8181:8181 -p 8081:8081 lensesio/box:latest
```
Note: If you copied the command from here, make sure to also expose ports 8081 and 8181, as shown in the command above.
If you receive a warning of the type "Operating system RAM available is less than the lowest recommended", increase the RAM allocated to Docker for Lenses to work properly.
Access: http://127.0.0.1:3030 and log in with credentials admin/admin:

After logging in you should successfully land on http://127.0.0.1:3030/data and be able to view the main dashboard screen. Let's park Lenses for the time being and we will return to it later.
Create the Java application
Let's start by creating our project's folder structure in our IDE. It looks like this:
```
-src
|
|__main
|
|__java
| |
| |__kafka
| |
| |__SendKafkaProto.java
|
|__protobuf
|
|__CreditCard.proto
-pom.xml
```
In the pom.xml we declare all of the dependencies that we will be using - the full file can be found here.
CreditCard.proto holds our schema which looks like this:
```
syntax = "proto3";

package io.lenses.examples.serde.protobuf;

option java_package = "com.example";
option java_outer_classname = "CardData";

message CreditCard {
  string name = 1;
  string country = 2;
  string currency = 3;
  string cardNumber = 4;
  bool blocked = 5;

  enum CardType {
    VISA = 0;
    MASTERCARD = 1;
    AMEX = 2;
  }

  CardType type = 6;
}
```
Generate Protobuf classes
Back in our IDE, let's generate the CardData.java file, which we will need later on to serialize our objects to Protobuf. In a terminal window at the project location folder, type:
```
mvn generate-sources
```
This will generate our Protobuf Java class and our updated folder structure should now look like this:
```
-src
|
|__main
|
|__java
| |
| |__kafka
| | |
| | |__SendKafkaProto.java
| |
| |__com
| |
| |__example
| |
| |__CardData.java
|
|__protobuf
|
|__CreditCard.proto
-target
-pom.xml
```
The code itself
Opening our SendKafkaProto.java file we will see:
```
package kafka;

import com.github.javafaker.Faker;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import com.example.CardData;
import java.util.Properties;

public class SendKafkaProto {

    public static void main(String[] args) {
        // Setup Producer Properties
        String bootstrapServers = "127.0.0.1:9092";
        var properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("schema.registry.url", "http://localhost:8081");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaProtobufSerializer.class.getName());
        KafkaProducer<String, CardData.CreditCard> producer = new KafkaProducer<>(properties);

        // Specify Topic Name
        var topic = "protos_topic_cards";

        // Loop to Produce Fake Data
        for (int i = 0; i < 15; i++) {
            // creating Random object
            Random rd = new Random();
            Faker faker = new Faker();
            String name = faker.name().fullName();
            String countryCode = faker.address().countryCode();
            String cardNumber = faker.business().creditCardNumber();
            Integer typeValue = rd.nextInt(3);
            String currencyCode = faker.country().currencyCode();

            // Serializing to Protobuf based on CreditCard.proto Schema
            var cardData = CardData.CreditCard.newBuilder()
                    .setName(name)
                    .setCountry(countryCode)
                    .setCurrency(currencyCode)
                    .setTypeValue(typeValue)
                    .setBlocked(false)
                    .setCardNumber(cardNumber)
                    .build();
            var record = new ProducerRecord<String, CardData.CreditCard>(topic, "Credit Card", cardData);

            // Send to Producer
            producer.send(record);
        }

        producer.flush();
        producer.close();

        // Log success message
        System.out.println("Sent Data Successfully");
    }
}
```
In the above snippet, we start by naming our package and importing the libraries we will need later on. We then specify the Kafka broker properties, the Schema Registry URL, and our topic name: protos_topic_cards.
Then, we construct some card objects using random data and we serialize the objects to Protobuf. Finally, we add the data to a record and send our records to the Kafka topic. Cool!
To test things out, let's package our code by typing the following in a terminal window at the project location folder:
```
mvn package
```
After receiving the "BUILD SUCCESS" info on our terminal, let's execute it with:
```
java -cp target/kafka-send-proto-0.1.0.jar kafka.SendKafkaProto
```
Or, if you are using VS Code, you can right-click the file and select "Run Java" (if the option does not appear, make sure you have installed the Extension Pack for Java).
At this point, you should see the "Sent Data Successfully" message in your console, which indicates that we have sent data to Kafka as expected.
Congrats! 🚀
Note: If you receive an error of type:
```
Error connecting to node XXXXXXXXX:9092 (id: 101 rack: null)
java.net.UnknownHostException: XXXXXXXXX: nodename nor servname provided, or not known.
```
You need to map the XXXXXXXXX hostname to localhost (127.0.0.1) in your hosts file. Some instructions can be found here on Stack Overflow and Crunchify.
Sneak peek Schema Registry
Back in Lenses - http://127.0.0.1:3030/data - let's make sure we have successfully registered our schema with the Schema Registry. From the left menu bar, select Schema Registry:

Our protos_topic_cards-value schema is there! Clicking on it to see its details, we see that it is almost identical to our initial schema:

Notice that the schema we initially seeded is a slightly different version: the Protobuf Kafka serializer overrides it with a semantically equivalent one.
The Protobuf Kafka serializer still validates the record we publish against the existing schema (if one exists); it just overrides it automatically.
Preview Data
Next, time to return to Lenses and have a look at the data itself.
Let's navigate back to http://127.0.0.1:3030/data and from the top left menu select the explore option.
Landing on the Explore screen, we can see in our topics list that our protos_topic_cards topic is there, and Lenses has already picked up that its value type is Protobuf.

Since the schema exists in the schema registry and the data in the topic matches it, Lenses can figure out the link between the two. Clicking on our topic we can see the key and value for each record in a human-readable format - a proper data-catalog:

Without this, the data would show up as an unreadable byte array, since that is how it is stored in the Kafka topic.
Kafka Consumer
Lastly, let’s consume the data from our topic. We will consume it in a local Node.js application, spin up an Express server, and preview the data in our browser.
We are presuming that npm and Node.js are already installed on our local machine; if that's not the case, download them here.
Let’s start by opening up a terminal window and initializing our project in a new directory by typing:
```
npm init -y
```
This will create a package.json file for us, looking like this:
```
{
  "name": "foo",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}
```
Let’s then install the dependencies that we will use by typing in our terminal:
```
npm install @kafkajs/confluent-schema-registry express kafkajs
```
This will update our package.json file with the dependencies included:
```
"dependencies": {
"@kafkajs/confluent-schema-registry": "^3.2.1",
"express": "^4.17.3",
"kafkajs": "^1.16.0"
}
```We will use Express to preview our data in the browser, Kafkajs to connect to Kafka, and @kafkajs/confluent-schema-registry to connect to our schema registry and be able to deserialize the message.
The last thing we need to do with our package.json file is to create our run script (the command that we will use to run our application) by editing the scripts section:
```
"scripts": {
"dev": "node index.js"
},
```You can find the package.json file here.
Now, by typing “npm run dev”, we actually run “node index.js” and execute the code in our index.js file, which we still need to create. So let’s create index.js (in the same directory); it will look like this:
```
const { Kafka } = require("kafkajs");
const { SchemaRegistry } = require("@kafkajs/confluent-schema-registry");
const express = require("express");

const registry = new SchemaRegistry({ host: "http://localhost:8081" });

// Create the client with the broker list
const kafka = new Kafka({
  clientId: "my-proto-app",
  brokers: ["127.0.0.1:9092"],
});

// create a new consumer from the kafka client, and set its group ID
// the group ID helps Kafka keep track of the messages that this client
// is yet to receive
const consumer = kafka.consumer({ groupId: "proto-topic-consumer" });

let messagesArray = [];

const consume = async () => {
  await consumer.connect();

  // Subscribe to our Topic
  // fromBeginning flag to consume messages from start
  // for more on this see: https://kafka.js.org/docs/consuming#frombeginning
  await consumer.subscribe({
    topic: "protos_topic_cards",
    fromBeginning: true,
  });

  // Run our consumer
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      // Push messages to messagesArray after decoded
      messagesArray.push(await registry.decode(message.value));
      // Log details on our console (optional)
      // Notice there is no need to convert to JSON here
      console.log({
        topic,
        key: message.key.toString(),
        value: await registry.decode(message.value),
        headers: message.headers,
      });
    },
  });
};

// Run our consumer
consume();

var app = express();

// Create the "/" route response
app.get("/", function (req, res) {
  res.setHeader("Content-type", "text/html");
  // In a JSON format to be able to display in the browser
  res.send(
    messagesArray
      .map((message) => {
        return `<h4>${JSON.stringify(message)}</h4>`;
      })
      .join(" ")
  );
});

// Listen on port 5000
var server = app.listen(5000, function () {
  var host = server.address().address;
  var port = server.address().port;
  // Log to make sure server started successfully
  console.log("Example app listening at http://%s:%s", host, port);
});
```
For starters, we import all of the required packages. Then we provide the required Kafka information (broker URL, topic, Schema Registry).
Then we subscribe to the protos_topic_cards topic, get the messages, push them to an array, and log the consumer details (topic, key, value, headers), just to make sure we are getting the data from Kafka.
Finally, we spin up the Express server, set up its ‘/’ route, and map over the messages to build the response we preview in the browser. To do so, we stringify each message as JSON. Our server listens on port 5000.
OK, let’s have a look at what we produced. Opening http://localhost:5000, we can see the data in our browser:

And with this, we have successfully sent Protobuf data from a Java application to Kafka, had a look at what's inside our topic using Lenses, and consumed that data in our Node.js application. Good stuff!
Useful links:
Github source code for Java App: https://github.com/eleftheriosd/Protobuf-To-Kafka
Github source code for Node.js App: https://github.com/eleftheriosd/Node.js-Kafka-consumer
Lenses box download: https://lenses.io/apache-kafka-docker/