Lenses.io has joined forces with Celonis | Learn more >

Exploding arrays in Kafka with lateral joins

Make your life with Kafka easier by working with lateral joins instead of window functions.

Nicolò Martini
Dec 14, 2020
Exploding arrays in Kafka with lateral joins

In this article we are going to explore lateral joins.

"What is a lateral join?" you may ask. It's a new kind of join that allows to extract and work with the single elements found inside an array, as if the array was a normal table.

Lenses 4.1 comes with a lot of new features that make your life easier when working with arrays: we introduced 6 new functions to work with arrays, better support for array literals, and lateral joins.

All these features are available both for our Snapshot and Streaming Engine.

You can read more about this functionality in our docs, as well as trying it yourself following this self-contained tutorial.

What we would like to focus on in this post, however, is how this feature came to be; designing this functionality has been an exciting journey and we thought it would be useful to share the thought process and iterations behind the implementation.

  • We will initially explore the high-level requirements that motivated this feature;

  • Then we will explore what a possible solution could look like and the implications of each approach

  • Finally we will go through our design iterations, concluding with the final result and how it delivers all that we set out to achieve.

Let's get cracking.

Motivation

Assume that you have a topic called batched_readings that collects readings from some kind of sensor meter.

The upstream system stores "batches" of reading per meter. That means that each single record will contain multiple readings for its meter:

Our goal is to extract the single readings for each record, and build a new topic where each record contains only one reading:

How can we do that?

Exploring the solution space

Before starting to work on the feature, we did some research to see if and how this functionality was implemented in other SQL streaming systems or more traditional RDBMS systems.

We found two main different approaches to the problem:

The "Explode" approach

Some systems implement the functionality introducing special functions that can be used as a normal projections, right after the SELECT clause. The idea is that a special function will "explode" the array, making the select emit a record for each item in the array.

The syntax is quite straightforward and easy to use in this simple case, but we found it a bit more complex and less predictable in more complicated scenarios, like when multiple EXPLODEs are used or when multi-level arrays need to be worked with.

Another limitation of this approach is that it is hard to work with the exploded array elements outside the projection. For example, something like this is not possible in KSqlDB:

In general, we soon realized that treating EXPLODE as a normal function (albeit one that generates more than one output per each input) was not the right choice for us. We could do better.

The "Table Function" approach

Not satisfied with the solution above, we considered whether moving the EXPLODE to the source would solve the some of the highlighted problems.

After some research, we realized that this was an approach already embraced by other classical RDBMS systems like PostgreSQL, with its unnest table function.

This approach is based on two concepts:

  1. Table functions: functions that transforms normal values to tables

  2. Lateral joins: with lateral joins it is possible to express a relation between the value of a field on the left-hand side of a join, and its right-hand side. This contrasts with traditional joins, where you can only join tables that are completely independent of one another.

With these concepts to hand, our first design iteration defined a new EXPLODE table function and a new LATERAL keyword, to be able to join a source to the result of a table function:

The table function EXPLODE(readings as reading) should be read as follows:

Take the current value of the readings array and transform it to a table. Such table will have only one column, called reading, and each element of the original array will be a row in that table.

EXPLODE(readings as reading) is a table that depends on the value of the readings field for each single record. Every record will then produce a table.

Going back to our meter reading example, consider the first record, where readings is [100, 101, 102]. For that record, the expression EXPLODE(readings as reading) will produce the following table:

| reading |
-----------
| 100     |
| 101     |
| 102     |

Since we gave the name reading to the elements inside readings, we can use it in the SELECT as if it were a normal field.

But not only that, we can use it wherever we want, like in a WHERE or in a GROUP BY !

The table expression batched_reading LATERAL EXPLODE(readings as reading) can at this point be read as follows:

For each record in batched_reading, compute the table EXPLODE(readings as reading), and join that table with the original record.

That means that if we take again the record { "meter_id": 1, "readings": [100, 101, 102] } as an example, the above table expression will result in the following table:

| meter_id | readings       | reading |
|----------|----------------|---------|
|1         |[100, 101, 102] |100      |
|1         |[100, 101, 102] |101      |
|1         |[100, 101, 102] |102      |

Our final approach

After exploring the table function approach, we realized that the only real concrete example of a table function would be EXPLODE; we then understood that we could use directly the array expression as the argument of the LATERAL join, without the EXPLODE function.

With that simplification the query becomes:

This brings a small but effective improvement to the syntax, as well as making an important aspect of our approach to lateral joins more prominent: we can treat any array as a valid right-hand side of a lateral join, including any expression returning an array.

Filtering the results of a lateral join

With the approach to lateral joins we just described, we can use the exploded item not only in the projections of a SELECT, but also in all the other places of the query where you can use fields coming from a source. This includes WHERE and GROUP BY.

Using the same example used above, this query allows you to keep only the readings greater than 90:

Nested lateral joins

The result of a LATERAL is a table expression, so it can be used inside the left-hand-side of a LATERAL. This can be useful when you want to extract elements inside a nested array.

Let's take a slight modification of the batched_readings example:

Here nested_readings is an array of arrays. We can get the same results as before with the following query:

Working with multiple arrays

The EXPLODE function approach allowed to specify multiple arrays to be exploded at the same time. When multiple EXPLODEs are used, the arrays are traversed in parallel, and elements with the same index are returned together.

To achieve similar behavior with LATERAL join it was enough to introduce a new zip array function: zip takes two (or more) arrays, it traverses them in parallel, and it builds a single new array where the elements are objects containing values from the original arrays.

For example, the following expression:

will be evaluated to the following array:

Let's go back to our original example, and assume now we have a new array where the time of the reading was reported (the time is here a simple integer just to keep the code concise):

We can use zip to build an intermediate array that will then be passed to the LATERAL join:

Lateral joins for streaming data

Something that we strive to achieve with Lenses SQL is a consistent and seamless experience over all type of user data, be it static or streaming.

It will come as little surprise then that lateral joins are fully supported in our Streaming SQL as well as in our Snapshot one.

The syntax and concepts are the same the we have been explaining throughout this post, but now are used as part of an SQL Processor rather than an SQL Studio query.

To illustrate how this would work, let's go back to our sensor meter example, and let's assume that we have the same batched_readings topic as before, which will receive events as below:

What if we wanted to split our data to ensure that readings that are inside a normal range are sent downstream to be processed, but readings outside such range are sent to a separate topic (which may generate an alert or be counted in a dashboard)?

We can easily use an SQL Processor for this, and we will be guaranteed that, for every new batched reading, we will react as expected.

Here is the processor code (assuming the normal range is between 95 and 100):

Conclusion

And this concludes our voyage from requirement to delivery, through design and implementation.

In this post we explored the general idea behind lateral joins, why they are useful and how they are implemented in the industry; we then analyzed the advantages and disadvantages of each solution and we finally came to present what our final iteration of the feature looks like.

We concluded showing how the chosen approach delivers on the requirements, maintaining flexibility and expressivity.

Lateral joins, together with table functions, are powerful features already available in many RDBMS systems, but they are not very well known. With this post, we wanted to shed some light on them and to present them in the context of Lenses SQL.

Finally, we want to invite you to experiment with this new feature yourself. Try Lenses 4.1 out with our *battery-included*, free, Box and let us know what you think. We look forward to any feedback you all might have for us!

Ready to get started with Lenses?

Try now for free