Some time ago, the concept of event streaming was not that widespread. The platforms available were far fewer than today, and they required more technical depth to set up and operate.
However, as time passed, those platforms matured, their communities grew, documentation improved, and the rest of the world started to wake up to the advantages these platforms bring to the real-time experiences businesses need.
And Apache Pulsar is a great example.
It has been some time now since I was first introduced to Pulsar, which in my opinion is one of the most promising event streaming platforms out there today. And I have to admit, back then it did not feel that easy.
Of course, when I was first introduced to it, I had a completely different opinion. I was comfortable with Apache Kafka, and the last thing I needed was to learn a new streaming platform.
As time passed, however, I not only became more familiar with the technology, but also noticed the huge leap it was making in user friendliness and in ease of setup and maintenance.
I really like both platforms, I believe each one has its perks, and there is no reason not to use them together. That is what I want to share with you in this article: we will learn how to pipe data from Apache Kafka to Apache Pulsar via the Lenses API. I will install Lenses.io and lensesPy (the Lenses Python module), which uses the Lenses SQL engine and security model via the Lenses API to consume data from a Kafka topic. Then we will produce the consumed data to a Pulsar topic.
The value here is that not only are we using the same API to consume from and produce to two different technologies, but the Lenses SQL engine also lets us consume data serialized in any fashion (AVRO, Protobuf, JSON, etc.), all while using a strong namespace-based security model (including data masking if we need it).
I am using Lenses.io as an intermediate station because I want to take advantage of the SQL syntax it provides for processing Apache Kafka data.
We will see how to use LensesPy to query data from an Apache Kafka topic and push it to Apache Pulsar.
First of all, we will need a Pulsar instance up and running, a Kafka cluster, and a Lenses instance.
A Pulsar Docker container will serve a standalone Pulsar server, while Lenses Box will serve 1 Kafka broker, 1 ZooKeeper node, 1 Schema Registry, 1 Kafka Connect worker node, and Lenses.io.
There are multiple ways to run Lenses, but for this article I recommend one of the following two.
1. The first is via Lenses Portal, a free workspace that Lenses.io provides for testing. All you need is to visit portal.lenses.io and sign up for a free account; once you have your account, log in and create a free workspace, which should take around 2-3 minutes to complete. The workspace provides 1 Kafka cluster + 1 running instance of Lenses.io, which is more than enough for what you need.
2. The second option is to use Lenses Box (mentioned above), which installs a Kafka cluster + Lenses.io locally via Docker.
For Lenses Box, issue the following command in a new terminal:
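The original command is not reproduced in this excerpt; a typical Lenses Box invocation looks like the sketch below. The EULA value is a placeholder you must replace with the URL from your developer license email (see the next paragraph), and you may need to expose additional ports depending on what you use:

docker run --rm -p 3030:3030 -p 9092:9092 \
  -e ADV_HOST=127.0.0.1 \
  -e EULA="<the EULA url from your developer license email>" \
  lensesio/box:latest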
If you do not have a developer license to run Lenses Box, please visit the Developer Lenses Box page and register, and a new developer license will be emailed to you.
To spin up Apache Pulsar, issue the following command in a new terminal:
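The standalone command from the Apache Pulsar Docker documentation works here; ports 6650 (broker) and 8080 (HTTP admin) are the defaults:

docker run -it -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:latest bin/pulsar standalone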
You should now have Lenses Box and a standalone instance of Apache Pulsar up and running.
Next, visit the LensesPy Git repo and clone the repository locally.
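Assuming the repository lives under the lensesio GitHub organization (the path below is my assumption; use whatever URL the repo page gives you):

git clone https://github.com/lensesio/lenses-python.git
cd lenses-python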
Dependencies for building the wheel package:
- Python 3
- Setuptools
- Wheel
Building LensesPy:
python3 setup.py sdist bdist_wheel
Installing LensesPy with Pulsar support:
pip3 install dist/lensesio-3.0.0-py3-none-any.whl[pulsar]
After installing the LensesPy module, we are ready to start sending some data to Pulsar.
First, create a new file named send-to-pulsar.py in the root of the repository and paste the following code:
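The original listing is not reproduced in this excerpt, so the sketch below is a reconstruction based on the walkthrough that follows. PulsarProduce and the overall flow come from the article; the import path, the login signature, the Pulsar initializer names (InitPulsarClient, InitPulsarProducer), the SQL method (ExecSQL), and the result shape are assumptions about the LensesPy API, so check the repository README for the exact names:

import json
from lensesio.lenses import main  # assumed import path for the LensesPy entry point

# Log in to Lenses with basic credentials (service accounts are recommended
# for production; plain credentials are fine for this demonstration)
lenses_lib = main(
    auth_type="basic",            # assumed parameter names
    url="http://127.0.0.1:3030",
    username="admin",
    password="admin",
)

# Initialize the Pulsar client and start a producer for the topic "my-topic"
lenses_lib.InitPulsarClient("pulsar://127.0.0.1:6650")  # assumed method name
lenses_lib.InitPulsarProducer("my-topic")               # assumed method name

def send_to_pulsar(payload):
    # Encode the payload in UTF-8 and hand it to the Pulsar producer
    payload = json.dumps(payload).encode("utf-8")
    lenses_lib.PulsarProduce(payload)  # method named in the walkthrough below

# Fetch 10 records from the backblaze_smart Kafka topic via the Lenses SQL engine
result = lenses_lib.ExecSQL("SELECT * FROM backblaze_smart LIMIT 10")  # assumed method name

for record in result["data"]:  # the result shape is also an assumption
    send_to_pulsar(record["value"])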
Let’s see what the above code does.
First, we import the lensesio and json modules and log in to Lenses. (Service accounts are what is recommended for production, but since this is a demonstration, we are using plain credentials instead.)
After logging in to Lenses, we initialize the Pulsar client and start a producer for the topic “my-topic”.
Next, we create a function that takes a payload as input. The function encodes the payload in UTF-8 and then calls lenses_lib.PulsarProduce(payload), which sends the payload to Pulsar.
Finally, we execute a query via the Lenses API that fetches 10 records from the backblaze_smart topic, and we send the fetched records to Apache Pulsar.
We are now ready to execute the script:
python3 send-to-pulsar.py
After we execute the above script, we will get the following output:
We have fetched our records and sent them to Apache Pulsar; it is now time to create a consumer and verify that our records were successfully delivered.
First create a new file called consume-from-pulsar.py and paste the following code:
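The original listing is not reproduced in this excerpt; the sketch below follows the description in the next few paragraphs. PulsarConsume is named in the article, while the credential-free entry point and the initializer names (InitPulsarClient, InitPulsarConsumer) and the subscription name are assumptions about the LensesPy API:

from lensesio.lenses import main  # assumed import path

# No Lenses authentication here: we only talk to Pulsar, so the module
# acts as a thin wrapper around the official Pulsar client
lenses_lib = main()  # assumed: the entry point works without credentials

lenses_lib.InitPulsarClient("pulsar://127.0.0.1:6650")         # assumed method name
lenses_lib.InitPulsarConsumer("my-topic", "my-subscription")   # assumed method name

while True:
    msg = lenses_lib.PulsarConsume()  # method named in the article
    print("Received message: '%s'" % msg)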
The above code will set up a Pulsar consumer client and then start consuming from the Pulsar topic “my-topic”.
Note: Since we are piping data from Kafka to Pulsar, we only need to authenticate with Lenses when fetching from Apache Kafka; that is why the above code does not include any authentication.
Without the authentication step, the above example is exactly the same as what you would run if you followed the Apache Pulsar Python consumer example, with the exception that the Lenses module functions as a simple wrapper around it.
Executing the script consume-from-pulsar.py:
python3 consume-from-pulsar.py
This will start consuming and printing the records that are being published to Pulsar by our first script:
(Optional Threads) Note: You can also use Pulsar consumer threads. The logic is the same, but the code needs a few changes. First, to enable the thread, we need to pass spawn_thread=True to the PulsarConsume method.
But since we are creating a thread, we do not need to write the while loop ourselves, as it is part of the thread. Also, the threaded PulsarConsume method does not use a return statement. Instead, it registers a queue in the active threads, which can then be used to access the data.
(Optional Threads Continued) To spawn a Pulsar consumer thread, issue the following in a Python terminal:
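A one-liner along these lines, reusing the lenses_lib object from the setup above (spawn_thread=True comes from the article; the positional topic argument is my assumption):

lenses_lib.PulsarConsume("my-topic", spawn_thread=True)  # runs the receive loop in a background thread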
(Optional Threads Continued) Information about the thread can be found by issuing:
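The article's exact helper is not shown in this excerpt; Python's standard threading module offers a reliable way to inspect the active threads, where the spawned consumer should appear:

import threading
print(threading.enumerate())  # the Pulsar consumer thread should be listed here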
(Optional Threads Continued) To consume data from the queue, issue:
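The registry layout is not shown in this excerpt, so the names below are hypothetical; the idea, per the description above, is that the thread registers a standard queue holding the received messages:

msg = lenses_lib.active_threads["pulsar_consumer"]["queue"].get()  # hypothetical names
print(msg)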
(Optional Threads Continued) Or by using the consume_pc_queue method:
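consume_pc_queue is named in the article, but its signature is not shown, so the bare call below is a guess:

msgs = lenses_lib.consume_pc_queue()  # method name from the article; arguments, if any, assumed
print(msgs)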
If you’re experimenting with Kafka or developing real-time applications, you can access the Lenses.io cloud workspace, a full Kafka + Lenses.io environment in the cloud, at portal.lenses.io.
Or share your thoughts on Pulsar with me over our community Slack channel.