By Mihalis Tsoukalos | May 05, 2020


Simple & powerful: User Defined Functions for SQL stream processing

SQL is the de facto query language for exploring data.

With Lenses, developers can also build stream processing applications with SQL. This is not KSQL: it is our own engine, based on Kafka Streams and developed before KSQL. These applications can be deployed on Kubernetes or on Kafka Connect workers.

This solves the problem of managing complex code and deployment practices for sometimes really simple workloads in your favourite stream processing frameworks (Kafka Streams, Flink, Akka, Storm, ...).

The trouble with SQL is that you can sometimes be limited by the built-in functions, especially if you want to do something more sophisticated. For example, you might want to do some spatial mapping on real-time data and republish the results to another Kafka topic.

One really powerful feature of Lenses is the ability to create your own User Defined Functions (UDFs) or User Defined Aggregate Functions (UDAFs). You can then call your custom functions just as you would a standard SQL function: instead of SELECT max(myvalue)... you could write SELECT geoLocate(lat,long)...

The other benefit of UDFs is that you can start modularising your code and refactoring it into common functions shared across multiple applications. This addresses a common complaint about SQL.

Lenses provides a really simple API that can be used for building custom functions for our SQL engine.

This blog post is about implementing a user defined function in Java and integrating it into Lenses and Lenses SQL.

About UDF and UDAF

UDF stands for User Defined Function, whereas UDAF stands for User Defined Aggregate Function. The difference between the two is that the former processes a single row at a time (e.g. lower(name)), whereas the latter aggregates results across multiple rows (e.g. max(age)). Both UDFs and UDAFs are currently written in Java.
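The row-wise versus aggregate distinction can be sketched in plain Java. This is only an illustration: the class UdfVersusUdaf and its methods are hypothetical and do not use the Lenses API.

```java
import java.util.List;
import java.util.Locale;
import java.util.OptionalInt;
import java.util.stream.Collectors;

// Hypothetical, Lenses-independent illustration of row-wise vs aggregate functions.
class UdfVersusUdaf {

    // UDF-style: one input row produces one output value, like lower(name).
    static String lower(String name) {
        return name.toLowerCase(Locale.ROOT);
    }

    // UDAF-style: many input rows collapse into a single result, like max(age).
    static OptionalInt max(List<Integer> ages) {
        return ages.stream().mapToInt(Integer::intValue).max();
    }

    public static void main(String[] args) {
        List<String> names = List.of("Alice", "BOB");
        List<Integer> ages = List.of(31, 47, 25);

        // Row-wise: the output has as many elements as the input.
        System.out.println(names.stream().map(UdfVersusUdaf::lower).collect(Collectors.toList()));

        // Aggregate: the output is one value for the whole group.
        System.out.println(max(ages).getAsInt());
    }
}
```

Running it prints [alice, bob] followed by 47: two rows in and two rows out for the UDF-style call, three rows in and one value out for the UDAF-style call.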

Pre-requisites

In order to be able to follow the steps of this tutorial, you will need the following:

  • Lenses up and running pointing to your Apache Kafka cluster. You can use our free Lenses Box instance if necessary.

  • An Internet connection, needed to clone a GitHub repository.

The Scenario

This blog post will illustrate how to compile and install a Java function into Lenses.

The Implementation

We are going to use a Lenses Box in this tutorial. The Lenses Box Docker image will be executed as follows:

    docker run --name=lenses-dev -e ADV_HOST=127.0.0.1 \
      -e EULA="YOUR_PERSONAL_CODE" --rm -p 3030:3030 -p 9092:9092 \
      -p 2181:2181 -p 8081:8081 -p 9581:9581 -p 9582:9582 -p 9584:9584 \
      -p 9585:9585 lensesio/box

Note that the name of the Lenses Box container is set by the --name option (lenses-dev in this case), and that you should replace the YOUR_PERSONAL_CODE text with your personal code.

All the commands that follow are executed inside the lenses-dev container. We can open a bash(1) shell in it as follows:

docker exec -it lenses-dev bash

Defining the function

The contents of the celsius_to_fahrenheit.java file that defines the UDF will be as follows:

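package io.lenses.sql.udf;

// A UDF that takes a single argument.
public class celsius_to_fahrenheit implements UserDefinedFunction1 {

    @Override
    public Object evaluate(Object arg1) {
        // Parse the argument's string form, so numeric and string inputs both work.
        double celsius = Double.parseDouble(arg1.toString());
        // F = C * 1.8 + 32
        double fahrenheit = celsius * 1.8 + 32;
        // Round to the nearest whole degree.
        return Math.round(fahrenheit);
    }
}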
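To check the conversion logic without Lenses on the classpath, the arithmetic can be copied into a small standalone class. The sketch below is hypothetical: CelsiusCheck is not part of the example project, and convertOrNull is an assumed defensive variant showing one way to handle null or non-numeric input, which would make the original evaluate() throw.

```java
// Hypothetical standalone check; it does not use the Lenses UDF interface.
class CelsiusCheck {

    // Same arithmetic as evaluate(): parse the argument's string form, convert, round.
    static long convert(Object arg) {
        double celsius = Double.parseDouble(arg.toString());
        return Math.round(celsius * 1.8 + 32);
    }

    // Assumed defensive variant: returns null instead of throwing on bad input.
    static Long convertOrNull(Object arg) {
        if (arg == null) {
            return null;
        }
        try {
            return convert(arg);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(convert(0));           // 32
        System.out.println(convert("100"));       // 212
        System.out.println(convert(36.6));        // 98
        System.out.println(convert(-40));         // -40
        System.out.println(convertOrNull("n/a")); // null
    }
}
```

Whether a UDF should return null or fail on bad input depends on how you want the query to behave, so treat convertOrNull as one option rather than a recommendation.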

The GitHub project with all the required Java files is available at https://github.com/lensesio/lenses-udf-example. We will clone it in the next section.

Installing the Function

As a Lenses Box does not include all the Linux packages that we are going to need, we will have to install them manually. As Lenses Box uses Alpine Linux, you will need to execute the following commands:

apk add git
apk add openssh
apk add maven
apk add openjdk11

After that we will need to clone the GitHub project as follows:

cd /tmp
git clone https://github.com/lensesio/lenses-udf-example
cd lenses-udf-example/

The files of the GitHub project can be viewed with the help of the tree(1) command:

$ tree
.
├── README.md
├── pom.xml
└── src
    └── main
        └── java
            └── io
                └── lenses
                    └── sql
                        └── udf
                            ├── GeoHash.java
                            ├── celsius_to_fahrenheit.java
                            ├── count_double.java
                            ├── datetimefromepoch.java
                            ├── isoformatdate.java
                            ├── isoformatdatetime.java
                            └── isoformattime.java
   
7 directories, 9 files

As the project is using the Maven build system, we will need to compile it as follows:

    mvn clean package

The previous command will generate lots of output – the end of the output should be similar to the following:

[INFO] Building jar: /tmp/lenses-udf-example/target/lenses-udf-example-1.0.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  30.107 s
[INFO] Finished at: 2020-04-27T13:56:55Z
[INFO] ------------------------------------------------------------------------

After that we will install the generated jar file to Lenses as follows:

    cp ./target/lenses-udf-example-1.0.0.jar /plugins

To confirm that /plugins is the right place for your jar file, check the lenses.plugins.classpath.opts setting in the Lenses configuration file:

$ grep -n lenses.plugins.classpath.opts /run/lenses/lenses.conf
11:lenses.plugins.classpath.opts="/plugins"

No additional steps are required for installing the celsius_to_fahrenheit() function.
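As an optional sanity check, you can confirm that the copied jar really contains the compiled class. The sketch below uses only the JDK's java.util.jar API. The JarEntryCheck class is hypothetical, and the default paths are assumptions taken from the commands in this post. With the openjdk11 package installed earlier, it can be run directly as java JarEntryCheck.java.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper, not part of Lenses or the example repository.
class JarEntryCheck {

    // Returns true when the jar at jarPath contains the named entry.
    static boolean containsEntry(String jarPath, String entryName) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults assumed from the paths used in this post; override them via arguments.
        String jarPath = args.length > 0 ? args[0] : "/plugins/lenses-udf-example-1.0.0.jar";
        String entry = args.length > 1 ? args[1] : "io/lenses/sql/udf/celsius_to_fahrenheit.class";

        String verdict = containsEntry(jarPath, entry) ? " found in " : " missing from ";
        System.out.println(entry + verdict + jarPath);
    }
}
```

The entry name follows from the package declaration and class name shown earlier. If the jar file itself is missing, the program fails with an IOException.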

Using the Function

After copying the generated jar file into the /plugins directory, the celsius_to_fahrenheit() function is ready for use.

The query that will be executed on the temperatures Kafka topic is the following:

SELECT value, celsius_to_fahrenheit(value) AS fahrenheit
FROM temperatures

[Image: part of the results of the query SELECT value, celsius_to_fahrenheit(value) AS fahrenheit FROM temperatures]

Conclusions

User Defined Functions allow you to extend Lenses SQL with your own functionality whenever the built-in functions are not enough.

Start building SQL stream processing applications with the free Lenses Box.
