By Christos Kotsis | Jun 02, 2020


Apache Pulsar walks into a data bar

Not long ago, event streaming was far from widespread. There were also fewer platforms available than today, and they required more technical depth to set up and operate.

However, as time passed, those platforms matured, their communities grew, the documentation improved, and the rest of the world started to wake up to the advantages these platforms bring in delivering the real-time experiences businesses need.

And Apache Pulsar is a great example.

It has been some time now since I was first introduced to Pulsar, which in my opinion is one of the most promising event streaming platforms out there today. And I have to admit, back then it did not feel that easy.

Of course, when I was first introduced to it, I had a completely different opinion. I was mostly comfortable with Apache Kafka and the last thing I needed was to learn a new streaming platform.

As time passed, however, I not only got more familiar with the technology, but I also noticed the huge leap it was making in becoming more user friendly and easier to set up and maintain.

I really like both platforms and I believe each one has its perks, so there is no reason not to use them both. That is what I want to share with you in this article. We will learn how to pipe data out of Apache Kafka via the Lenses API and push that data to Apache Pulsar. I will install Lenses.io and LensesPy (the Lenses Python module), which uses the Lenses SQL Engine and security model via the Lenses API to consume data off a Kafka topic. Then we will produce the consumed data into a Pulsar topic.

The value here is that not only are we using the same API to consume/produce off the two different technologies, but also the Lenses SQL Engine allows us to consume data serialized in any fashion (AVRO, Protobuf, JSON, etc.) all whilst using a great namespace-based security model (including data masking if we need it).


Data pipes with SQL

I am using Lenses.io as an intermediate station since I want to take advantage of the SQL syntax that it provides to process Apache Kafka data.

We will see how to use LensesPy to query data from an Apache Kafka topic and send it to Apache Pulsar.

First of all, we will need a Pulsar instance up and running, a Kafka cluster and a Lenses instance.

A Pulsar Docker container is going to serve a standalone Pulsar server, while Lenses Box will serve 1 Kafka broker, 1 Zookeeper, 1 Schema Registry, 1 Kafka Connect worker node and Lenses.io.

There are multiple ways to get Lenses, but for this article I recommend one of the following two.

1. The first one is via Lenses Portal, a free workspace that Lenses.io provides for testing. All you need to do is visit portal.lenses.io and sign up for a free account. Once you have your account, log in and create a free workspace, which should take around 2-3 minutes to complete. The workspace provides 1 Kafka cluster + 1 running instance of Lenses.io, which is more than enough for what you need.

2. The second option is to use Lenses Box (mentioned earlier), which will install a Kafka cluster + Lenses.io locally via Docker.

For Lenses Box, issue the following command in a new terminal:

docker run -it \
  --name="LensesBox" \
  --net=host \
  -e EULA="Licenses_URL" \
  lensesio/box:3.1.2

If you do not have a developer license to run Lenses Box, please visit Developer Lenses Box and register, and a new developer license will be mailed to you.

To spin up Apache Pulsar, issue the following command in a new terminal

docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.5.1 \
  bin/pulsar standalone

You should now have Lenses Box up and running and a standalone instance of Apache Pulsar.
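Before moving on, it can help to confirm that the services are actually listening. The following is a minimal sketch using only the Python standard library. The ports 6650 and 8080 come from the Pulsar command above and 9991 from the Lenses URL used later in this article; the `localhost` host names and the `port_open` helper are assumptions for illustration, so adjust them to your own setup.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Ports are taken from this article; the host names are assumptions.
    services = {
        "Pulsar broker": ("localhost", 6650),
        "Pulsar HTTP": ("localhost", 8080),
        "Lenses": ("localhost", 9991),
    }
    for name, (host, port) in services.items():
        status = "reachable" if port_open(host, port) else "not reachable"
        print(f"{name} at {host}:{port} is {status}")
```

A successful TCP connection only shows that something is listening on the port, not that the service has finished starting, so treat it as a quick sanity check.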

Next, visit the LensesPy Git repo and clone the repository locally.

mkdir -vp local && pushd local
git clone https://github.com/lensesio/lenses-python.git
pushd lenses-python

Dependencies for building the wheel package:

  • Python3

  • Setuptools

  • Wheel

Building LensesPy:

python3 setup.py sdist bdist_wheel

Installing LensesPy with Pulsar:

pip3 install "dist/lensesio-3.0.0-py3-none-any.whl[pulsar]"
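To check that the installation worked, you can test whether the modules can be found without importing them. This is a small sketch, not part of LensesPy: `missing_modules` is a hypothetical helper, `lensesio` is the package imported by the scripts in this article, and `pulsar` is the import name of the pulsar-client package, which I am assuming is what the `[pulsar]` extra installs.

```python
from importlib.util import find_spec


def missing_modules(names):
    """Return the subset of top-level module names that cannot be found."""
    return [name for name in names if find_spec(name) is None]


if __name__ == "__main__":
    # "pulsar" being installed by the [pulsar] extra is an assumption.
    missing = missing_modules(["lensesio", "pulsar"])
    if missing:
        print(f"Missing modules: {missing}")
    else:
        print("All modules found")
```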

After installing the LensesPy module, we are ready to start sending some data to Pulsar.

First, create a new file named send-to-pulsar.py in the root of the repository and paste the following code

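from lensesio.lenses import main
from json import dumps

# Log in to Lenses with basic auth (demo credentials)
lenses_lib = main(
    auth_type="basic",
    url="https://10.15.3.1:9991",
    username="admin",
    password="admin",
    verify_cert=False,
)

# Connect to the standalone Pulsar broker and start a producer for "my-topic"
lenses_lib.InitPulsarClient('pulsar://localhost:6650')
lenses_lib.StartPulsarProducer("my-topic")


def send_pulsar(payload):
    # Serialize the record to JSON, encode it as UTF-8 and send it to Pulsar
    print("Sending payload to Pulsar")
    payload = dumps(payload).encode('utf-8')
    lenses_lib.PulsarProduce(payload)


# Fetch 10 records from the Kafka topic via the Lenses SQL Engine
bucket = lenses_lib.ExecSQL("select smart_194_raw from backblaze_smart limit 10")
for r in bucket["data"]:
    send_pulsar(r)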

Let’s see what the above code does.

First we import the lensesio and json modules and log in to Lenses with the call below. (Service accounts are recommended for production, but since this is a demonstration, we are using credentials instead.)

lenses_lib = main(
    auth_type="basic",
    url="https://10.15.3.1:9991",
    username="admin",
    password="admin",
    verify_cert=False,
)

After logging in to Lenses, we initialize the Pulsar client and start a producer for the topic “my-topic”.

Next we define a function that takes a payload as input. The function serializes the payload to JSON, encodes it as UTF-8 and then calls lenses_lib.PulsarProduce(payload), which sends the payload to Pulsar.

Finally we execute a query via the Lenses API, which fetches 10 records from the backblaze_smart topic, and we send the fetched records to Apache Pulsar.
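The encode-and-send loop can be tried without a running Kafka or Pulsar. The sketch below mirrors the logic of the script using only the standard library: `encode_record`, `decode_record` and `pipe` are hypothetical helper names, the sample `bucket` is made up (only its "data" key is taken from the script), and a plain list stands in for the Pulsar producer.

```python
from json import dumps, loads


def encode_record(record):
    """Serialize a record (a dict) to UTF-8 encoded JSON bytes."""
    return dumps(record).encode("utf-8")


def decode_record(payload):
    """Reverse of encode_record: turn UTF-8 JSON bytes back into a dict."""
    return loads(payload.decode("utf-8"))


def pipe(records, produce):
    """Encode each record and hand it to produce(); return the number sent."""
    sent = 0
    for record in records:
        produce(encode_record(record))
        sent += 1
    return sent


if __name__ == "__main__":
    # Hypothetical stand-in for the query result returned by ExecSQL.
    bucket = {
        "data": [
            {"value": '{"smart_194_raw":33}'},
            {"value": '{"smart_194_raw":24}'},
        ]
    }
    outbox = []  # stand-in for a Pulsar producer
    count = pipe(bucket["data"], outbox.append)
    print(count, decode_record(outbox[0]))
```

Passing the send function in as an argument keeps the loop independent of the transport, so the same code could be pointed at a real producer method later.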

We are ready now to execute the script

python3 send-to-pulsar.py

After we execute the above script we will get the following output

Sending payload to Pulsar topic my-topic from Kafka topic backblaze_smart
...
Sending payload to Pulsar topic my-topic from Kafka topic backblaze_smart

We fetched our records and sent them to Apache Pulsar. It is now time to create a consumer and verify that our records were successfully sent to Pulsar.

First create a new file called consume-from-pulsar.py and paste the following code

from lensesio.lenses import main
from json import loads
lenses_lib = main(auth_type=None)
lenses_lib.InitPulsarClient('pulsar://localhost:6650')
lenses_lib.StartPulsarConsumer("my-topic", "lensesPy")
while True:
  msg = lenses_lib.PulsarConsume()
  msg = loads(msg.decode())  
  print(msg)

The above code will set up a Pulsar consumer client and then start consuming from the Pulsar topic “my-topic”

Note: Since we are piping data from Kafka to Pulsar, we only need to authenticate with Lenses when fetching from Apache Kafka. That is why the above code passes auth_type=None and provides no credentials.

Without the authentication step, the above example is essentially the same as what you would run if you followed the Apache Pulsar Python consumer example, except that the Lenses module acts as a simple wrapper.

Execute the script consume-from-pulsar.py:

python3 consume-from-pulsar.py

It will start consuming and printing the records that are being published to Pulsar from our first script:

...
{'key': '{"serial_number":"Z3016N6R"}', 'offset': 6712, 
'partition': 1, 'timestamp': 1589723977183, 'topic': 
'backblaze_smart', 'value': '{"smart_194_raw":33}'}
...
{'key': 
'{"serial_number":"Z302G5JD"}', 'offset': 6717, 'partition': 1, 
'timestamp': 1589723979478, 'topic': 'backblaze_smart', 'value': 
'{"smart_194_raw":24}'}
...
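Note that in the output above the key and value fields are themselves JSON strings, so reading the actual sensor value takes a second parsing step. Here is a minimal sketch of that step: `unpack_record` is a hypothetical helper, and it assumes that key and value always contain valid JSON (otherwise `loads` raises an error).

```python
from json import loads


def unpack_record(record):
    """Return a copy of record with its JSON-string key and value parsed."""
    unpacked = dict(record)
    for field in ("key", "value"):
        raw = unpacked.get(field)
        if isinstance(raw, str):
            unpacked[field] = loads(raw)
    return unpacked


if __name__ == "__main__":
    # Record copied from the sample output above.
    record = {
        "key": '{"serial_number":"Z3016N6R"}',
        "offset": 6712,
        "partition": 1,
        "timestamp": 1589723977183,
        "topic": "backblaze_smart",
        "value": '{"smart_194_raw":33}',
    }
    parsed = unpack_record(record)
    print(parsed["key"]["serial_number"], parsed["value"]["smart_194_raw"])
```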

(Optional Threads) Note: You can also use Pulsar consumer threads. The logic is the same, but the code needs a few changes. First, to enable the thread, we need to pass spawn_thread=True to the PulsarConsume method.

Since we are creating a thread, we do not need to write the while loop ourselves; it is part of the thread. Also, the threaded PulsarConsume method does not return the records. Instead, it registers a queue in the active threads, which can be used to access the data.

(Optional Threads Continued) To spawn a Pulsar consumer thread, issue the following in a Python terminal:
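If the thread-plus-queue idea is new to you, the general pattern looks roughly like the sketch below. This is not the LensesPy implementation; it is a standard-library illustration in which `start_consumer_thread` and the `receive` callable are hypothetical names, and a plain list stands in for the Pulsar consumer.

```python
import queue
import threading


def start_consumer_thread(receive, stop_event):
    """Start a daemon thread that puts every message from receive() on a queue.

    receive is any callable returning the next message, or None when no
    message is currently available.
    """
    messages = queue.Queue()

    def worker():
        while not stop_event.is_set():
            msg = receive()
            if msg is None:
                stop_event.wait(0.01)  # back off briefly when idle
                continue
            messages.put(msg)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread, messages


if __name__ == "__main__":
    # Hypothetical message source standing in for a Pulsar consumer.
    pending = [b"first", b"second"]
    stop = threading.Event()
    thread, messages = start_consumer_thread(
        lambda: pending.pop(0) if pending else None, stop
    )
    print(messages.get(timeout=1), messages.get(timeout=1))
    stop.set()
    thread.join(timeout=1)
```

The loop lives inside the worker, and the caller only ever touches the queue, which is the same division of labour described above.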

from lensesio.lenses import main
from json import loads
lenses_lib = main(auth_type=None)
lenses_lib.InitPulsarClient('pulsar://localhost:6650')
lenses_lib.StartPulsarConsumer("my-topic", "lensesPy")

lenses_lib.PulsarConsume(spawn_thread=True)

(Optional Threads Continued) Information about the thread can be found by issuing

lenses_lib.active_threads

{'pulsar_consumer': {'t': 1,
  1: {'consumerQue': <queue.Queue at 0x7fca57d94d90>,   
  'state': True,
  'state_info': 'Consuming',   
  'thread': <Thread(Thread-2, started 140506999768832)>,   
  'recordsFetched': 52097}}, 
  'pulsar_reader': {'t': 0}, 
  'thread_lock': <unlocked _thread.RLock object owner=0 count=0 at
 0x7fca7e60a630>}
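The dictionary above can be summarized with a few lines of Python. The sketch below is based only on the structure shown in that output: I am assuming that integer keys identify threads and that the 't' entry is a counter, and `consumer_summary` is a hypothetical helper, not part of LensesPy.

```python
def consumer_summary(active_threads, group="pulsar_consumer"):
    """Map each numeric thread id in a group to its state and record count."""
    summary = {}
    for thread_id, info in active_threads.get(group, {}).items():
        if not isinstance(thread_id, int):
            continue  # skip the 't' entry, assumed to be a counter
        summary[thread_id] = (info.get("state_info"), info.get("recordsFetched"))
    return summary


if __name__ == "__main__":
    # Sample shaped like the output above (queue and thread objects omitted).
    sample = {
        "pulsar_consumer": {
            "t": 1,
            1: {"state": True, "state_info": "Consuming", "recordsFetched": 52097},
        },
        "pulsar_reader": {"t": 0},
    }
    print(consumer_summary(sample))  # prints {1: ('Consuming', 52097)}
```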

(Optional Threads Continued) To consume data from the queue, issue

In [3]: lenses_lib.active_threads["pulsar_consumer"][1]["consumerQue"].get(block=False)
Out[3]: b'{"key": "{\\"serial_number\\":\\"Z302AL4H\\"}", "offset": 63954,
"partition": 0, "timestamp": 1590988524530, "topic": "backblaze_smart",
"value": "{\\"smart_194_raw\\":27}"}'

(Optional Threads Continued) Or by using the consume_pc_queue method

In [4]: lenses_lib.consume_pc_queue(1)
Out[4]:
(b'{"key": "{\\"serial_number\\":\\"PL1331LAGSAEHH\\"}", "offset": 63955,
"partition": 0, "timestamp": 1590988525482, "topic": "backblaze_smart",
"value": "{\\"smart_194_raw\\":27}"}',
 True)
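One thing to keep in mind when reading from the queue directly: on a standard `queue.Queue`, `get(block=False)` raises `queue.Empty` when nothing is waiting. A small sketch of draining whatever is currently available follows; `drain` is a hypothetical helper name, and the queue here is filled by hand rather than by a Pulsar consumer.

```python
import queue


def drain(q, limit=None):
    """Return all items currently on q without blocking, up to limit items."""
    items = []
    while limit is None or len(items) < limit:
        try:
            items.append(q.get(block=False))
        except queue.Empty:
            break  # nothing left to read right now
    return items


if __name__ == "__main__":
    demo = queue.Queue()
    for payload in (b"one", b"two", b"three"):
        demo.put(payload)
    print(drain(demo, limit=2))
    print(drain(demo))
```

With the consumer thread from this article, the queue to pass in would be the "consumerQue" entry shown above.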

If you’re experimenting with Kafka or developing real-time applications, you can access the Lenses.io cloud workspace - a full Kafka+Lenses.io environment in the cloud here.

Or otherwise share your thoughts with me on Pulsar over our community Slack channel.
