
Exploding arrays in Kafka with lateral joins

By Nicolò Martini, December 14, 2020
In this article:
  • 01. Motivation
  • 02. Exploring the solution space
  • 03. Filtering the results of a lateral join
  • 04. Nested lateral joins
  • 05. Working with multiple arrays
  • 06. Lateral joins for streaming data
  • 07. Conclusion

In this article we are going to explore lateral joins.

"What is a lateral join?" you may ask. It's a kind of join that lets you extract and work with the individual elements of an array as if the array were a normal table.

Lenses 4.1 comes with a lot of new features that make your life easier when working with arrays: we introduced six new array functions, better support for array literals, and lateral joins.

All these features are available in both our Snapshot and Streaming engines.

You can read more about this functionality in our docs, or try it yourself by following this self-contained tutorial.

What we would like to focus on in this post, however, is how this feature came to be. Designing it was an exciting journey, and we thought it would be useful to share the thought process and iterations behind the implementation.

  • First, we will explore the high-level requirements that motivated this feature;
  • Then we will explore what a possible solution could look like, and the implications of each approach;
  • Finally, we will go through our design iterations, concluding with the final result and how it delivers everything we set out to achieve.

Let's get cracking.

Motivation

Assume that you have a topic called `batched_readings` that collects readings from some kind of sensor meter.

The upstream system stores "batches" of readings per meter. That means each record contains multiple readings for its meter:

```
{ "meter_id": 1, "readings": [100, 101, 102] },
{ "meter_id": 2, "readings": [81, 82, 81] },
{ "meter_id": 1, "readings": [95, 94, 93, 96] },
{ "meter_id": 2, "readings": [80, 82] }
```



Our goal is to extract the individual readings from each record and build a new topic where each record contains only one reading:

```
{ "meter_id": 1, "reading": 100 },
{ "meter_id": 1, "reading": 101 },
{ "meter_id": 1, "reading": 102 },
{ "meter_id": 2, "reading": 81 },
{ "meter_id": 2, "reading": 82 },
...
```



How can we do that?

Exploring the solution space

Before starting to work on the feature, we did some research to see whether and how this functionality was implemented in other streaming SQL systems or in more traditional RDBMSs.

We found two main approaches to the problem:

The "Explode" approach


Some systems implement this functionality by introducing special functions that can be used as normal projections, right after the `SELECT` clause. The idea is that a special function "explodes" the array, making the `SELECT` emit a record for each item in the array.

```
SELECT
  meter_id,
  EXPLODE(readings) AS reading
FROM 
  batched_readings
```
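To make the "explode" semantics concrete, here is a minimal Python sketch (not Lenses or ksqlDB code) of what such a projection does: each input record yields one output record per array element, with the other projected fields copied alongside. Records are modeled as plain dicts, and `select_explode` is a hypothetical helper name used only for illustration.

```python
from typing import Any, Dict, Iterable, Iterator, List

Record = Dict[str, Any]

def select_explode(records: Iterable[Record]) -> Iterator[Record]:
    """Model of `SELECT meter_id, EXPLODE(readings) AS reading`:
    emits one output record per element of each record's `readings` array."""
    for record in records:
        for value in record["readings"]:
            # The non-exploded projection (meter_id) is repeated on every output.
            yield {"meter_id": record["meter_id"], "reading": value}

batched_readings: List[Record] = [
    {"meter_id": 1, "readings": [100, 101, 102]},
    {"meter_id": 2, "readings": [81, 82, 81]},
]

rows = list(select_explode(batched_readings))
for row in rows:
    print(row)
```

Note that the explosion happens inside the projection step, which is exactly why the exploded value is awkward to reference anywhere else in the query.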


The syntax is quite straightforward and easy to use in this simple case, but we found it harder to reason about and less predictable in more complicated scenarios, such as when multiple `EXPLODE`s are used or when working with multi-level (nested) arrays.

Another limitation of this approach is that it is hard to work with the exploded array elements outside the projection. For example, something like this is not possible in ksqlDB:

```
SELECT
  meter_id,
  EXPLODE(readings) AS reading
FROM 
  batched_readings
WHERE EXPLODE(readings) > 100
```


In general, we soon realized that treating `EXPLODE` as a normal function (albeit one that generates more than one output per input) was not the right choice for us. We could do better.

The "Table Function" approach

Not satisfied with the solution above, we considered whether moving `EXPLODE` into the source, that is the `FROM` clause, would solve some of the problems highlighted above.

After some research, we realized that this approach had already been embraced by classical RDBMSs such as PostgreSQL, with its `unnest` table function.

This approach is based on two concepts:

  1. Table functions: functions that transform normal values into tables.
  2. Lateral joins: with lateral joins it is possible to express a relation between the value of a field on the left-hand side of a join, and its right-hand side. This contrasts with traditional joins, where you can only join tables that are completely independent of one another.
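The difference between the two kinds of join can be sketched in a few lines of Python. This is an illustrative model, not how Lenses implements joins: rows are dicts, a table is a list of rows, and `cross_join`, `lateral_join`, `explode_readings` and the `units` table are hypothetical names. In the cross join the right-hand table is fixed; in the lateral join it is recomputed from each left-hand row by a table function.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List

Row = Dict[str, Any]

def cross_join(left: Iterable[Row], right: List[Row]) -> Iterator[Row]:
    """Traditional join: the right-hand table is fixed, independent of the left."""
    for l_row in left:
        for r_row in right:
            yield {**l_row, **r_row}

def lateral_join(left: Iterable[Row], table_fn: Callable[[Row], List[Row]]) -> Iterator[Row]:
    """Lateral join: the right-hand table is computed from each left-hand row."""
    for l_row in left:
        for r_row in table_fn(l_row):
            yield {**l_row, **r_row}

def explode_readings(row: Row) -> List[Row]:
    """A table function: turns the row's `readings` array into a one-column table."""
    return [{"reading": value} for value in row["readings"]]

meters = [
    {"meter_id": 1, "readings": [100, 101]},
    {"meter_id": 2, "readings": [81]},
]
units = [{"unit": "kWh"}]  # hypothetical fixed table, identical for every meter

crossed = list(cross_join(meters, units))
lateral = list(lateral_join(meters, explode_readings))

# Each meter is joined only with the rows generated from its own readings.
for row in lateral:
    print(row["meter_id"], row["reading"])
```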

With these concepts in hand, our first design iteration defined a new `EXPLODE` table function and a new `LATERAL` keyword, used to join a source to the result of a table function:

```
SELECT
  meter_id,
  reading
FROM
  batched_readings
  LATERAL EXPLODE(readings as reading)
```


The table function `EXPLODE(readings as reading)` should be read as follows:

Take the current value of the `readings` array and transform it into a table. This table has only one column, called `reading`, and each element of the original array becomes a row in that table.

`EXPLODE(readings as reading)` is therefore a table that depends on the value of the `readings` field in each record: every record produces its own table.

Going back to our meter reading example, consider the first record, where `readings` is `[100, 101, 102]`. For that record, the expression `EXPLODE(readings as reading)` produces the following table:

| reading |
|---------|
| 100     |
| 101     |
| 102     |

Since we gave the name `reading` to the elements inside `readings`, we can use it in the `SELECT` as if it were a normal field.

And not only there: we can use it anywhere a field can be used, such as in a `WHERE` or a `GROUP BY`!

The table expression `batched_readings LATERAL EXPLODE(readings as reading)` can now be read as follows:

For each record in `batched_readings`, compute the table `EXPLODE(readings as reading)` and join that table with the original record.

Taking the record `{ "meter_id": 1, "readings": [100, 101, 102] }` as an example again, the table expression above results in the following table:

| meter_id | readings       | reading |
|----------|----------------|---------|
|1         |[100, 101, 102] |100      |
|1         |[100, 101, 102] |101      |
|1         |[100, 101, 102] |102      |

Our final approach

After exploring the table function approach, we realized that the only table function we really needed was `EXPLODE`. We then understood that we could pass the array expression directly as the argument of the `LATERAL` join, without the `EXPLODE` function.

With that simplification the query becomes:

```
SELECT
  meter_id,
  reading
FROM
  batched_readings
  LATERAL readings as reading
```


This is a small but effective improvement to the syntax, and it also makes an important aspect of our approach to lateral joins more prominent: any array, including any expression returning an array, is a valid right-hand side of a lateral join.
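As a minimal Python model (an illustration, not Lenses internals), the simplified syntax amounts to a `lateral` helper whose right-hand side is any per-row expression returning an array. The helper name and the use of a Python callable to stand in for an SQL array expression are assumptions made for this sketch. Applied to the records from the Motivation section, it yields the one-reading-per-record output we were after.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List

Row = Dict[str, Any]

def lateral(rows: Iterable[Row], array_expr: Callable[[Row], List[Any]], alias: str) -> Iterator[Row]:
    """Model of `rows LATERAL <array_expr> AS <alias>`.

    `array_expr` is evaluated once per row and must return a list; each element
    becomes a new row that keeps the original fields plus `alias`."""
    for row in rows:
        for element in array_expr(row):
            yield {**row, alias: element}

batched_readings: List[Row] = [
    {"meter_id": 1, "readings": [100, 101, 102]},
    {"meter_id": 2, "readings": [81, 82, 81]},
    {"meter_id": 1, "readings": [95, 94, 93, 96]},
    {"meter_id": 2, "readings": [80, 82]},
]

# SELECT meter_id, reading FROM batched_readings LATERAL readings AS reading
result = [
    {"meter_id": r["meter_id"], "reading": r["reading"]}
    for r in lateral(batched_readings, lambda row: row["readings"], "reading")
]

# Any array-valued expression works on the right-hand side,
# e.g. a Python slice keeping only the first reading of each batch.
first_only = list(lateral(batched_readings, lambda row: row["readings"][:1], "reading"))
```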

Filtering the results of a lateral join

With the approach to lateral joins we just described, we can use the exploded item not only in the projections of a `SELECT`, but also anywhere else in the query where fields coming from a source can be used. This includes `WHERE` and `GROUP BY`.

Using the same example as above, this query keeps only the readings greater than `90`:

```
SELECT
  meter_id,
  reading
FROM
  batched_readings
  LATERAL readings as reading
WHERE
  reading > 90
```
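A Python sketch of the same pipeline shows why filtering becomes straightforward once the array is exploded in the `FROM` clause: by the time the `WHERE` condition runs, `reading` is just another field of each row. The comprehension-based model below is illustrative only, with dicts standing in for records.

```python
batched_readings = [
    {"meter_id": 1, "readings": [100, 101, 102]},
    {"meter_id": 2, "readings": [81, 82, 81]},
    {"meter_id": 1, "readings": [95, 94, 93, 96]},
    {"meter_id": 2, "readings": [80, 82]},
]

# FROM batched_readings LATERAL readings AS reading:
# one row per array element, keeping the original fields.
exploded = (
    {**row, "reading": value}
    for row in batched_readings
    for value in row["readings"]
)

# WHERE reading > 90, then SELECT meter_id, reading.
# `reading` is an ordinary field here, so the filter is a plain condition.
filtered = [
    {"meter_id": r["meter_id"], "reading": r["reading"]}
    for r in exploded
    if r["reading"] > 90
]

print(filtered)
```

With this sample data every reading from meter 2 is at most 82, so only meter 1's readings survive the filter.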


Nested lateral joins

The result of a `LATERAL` is a table expression, so it can be used as the left-hand side of another `LATERAL`. This is useful when you want to extract the elements of a nested array.

Let's take a slight modification of the `batched_readings` example:

```
{ "meter_id": 1, "nested_readings": [[100, 101], [102]] },
{ "meter_id": 2, "nested_readings": [[81], [82, 81]] },
{ "meter_id": 1, "nested_readings": [[95, 94], [93, 96]] },
{ "meter_id": 2, "nested_readings": [[80, 82]] }
```



Here `nested_readings` is an array of arrays. We can get the same results as before with the following query:

```
SELECT STREAM
  meter_id,
  reading
FROM
  batched_readings_nested
  LATERAL nested_readings as readings
  LATERAL readings as reading
```
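The nested query can be modeled in Python as two chained explode steps, where the second step's input is the output of the first, just as the second `LATERAL` takes the first one's result as its left-hand side. This is an illustrative sketch with dict records, not Lenses code; `level1` and `level2` are hypothetical names.

```python
batched_readings_nested = [
    {"meter_id": 1, "nested_readings": [[100, 101], [102]]},
    {"meter_id": 2, "nested_readings": [[81], [82, 81]]},
    {"meter_id": 1, "nested_readings": [[95, 94], [93, 96]]},
    {"meter_id": 2, "nested_readings": [[80, 82]]},
]

# First LATERAL: nested_readings AS readings -> one row per inner array.
level1 = [
    {**row, "readings": inner}
    for row in batched_readings_nested
    for inner in row["nested_readings"]
]

# Second LATERAL: readings AS reading, applied to the result of the first one.
level2 = [
    {**row, "reading": value}
    for row in level1
    for value in row["readings"]
]

# SELECT meter_id, reading
result = [{"meter_id": r["meter_id"], "reading": r["reading"]} for r in level2]
print(result)
```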


Working with multiple arrays

The `EXPLODE` function approach allows multiple arrays to be exploded at the same time. When multiple `EXPLODE`s are used, the arrays are traversed in parallel, and elements with the same index are returned together.

To achieve similar behavior with a `LATERAL` join, it was enough to introduce a new `zip` array function. `zip` takes two (or more) arrays, each followed by a field name, traverses them in parallel, and builds a single new array whose elements are objects containing the values from the original arrays under those names.

For example, the following expression:

```
zip([1, 2, 3], 'a', ['x', 'y', 'z'], 'b')
```


evaluates to the following array:

```
[{"a": 1, "b": "x"}, {"a": 2, "b": "y"}, {"a": 3, "b": "z"}]
```
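As a rough model of `zip`, the Python function below pairs each array with the field name that follows it and walks the arrays in lockstep. `zip_arrays` is a hypothetical name, and one behavior is an assumption: the post does not say what happens when the arrays have different lengths, so this sketch simply truncates to the shortest array, as Python's built-in `zip` does.

```python
from typing import Any, Dict, List

def zip_arrays(*args: Any) -> List[Dict[str, Any]]:
    """Model of zip(array1, 'name1', array2, 'name2', ...).

    Arguments alternate between an array and the field name for its values.
    ASSUMPTION: arrays of different lengths are truncated to the shortest one
    (Python's zip behavior); the actual Lenses behavior may differ."""
    if len(args) < 4 or len(args) % 2 != 0:
        raise ValueError("expected at least two (array, name) pairs")
    arrays = args[0::2]  # the arrays, in argument order
    names = args[1::2]   # the field name following each array
    return [dict(zip(names, values)) for values in zip(*arrays)]

print(zip_arrays([1, 2, 3], "a", ["x", "y", "z"], "b"))
# [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}, {'a': 3, 'b': 'z'}]
```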



Let's go back to our original example and now assume we also have an array reporting the time of each reading (here the time is a simple integer, to keep the code concise):

```
{ "meter_id": 1, "readings": [100, 101, 102], "times": [1, 2, 3] },
{ "meter_id": 2, "readings": [81, 82, 81], "times": [1, 2] },
{ "meter_id": 1, "readings": [95, 94, 93, 96], "times": [4, 5, 6, 7] },
{ "meter_id": 2, "readings": [80, 82], "times": [4, 5] }
```


We can use `zip` to build an intermediate array that is then passed to the `LATERAL` join:

```
SELECT
  meter_id,
  reading.value,
  reading.time
FROM
  batched_readings
  LATERAL zip(readings, 'value', times, 'time') as reading
```
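Modeled in Python, the query builds the zipped array per record and then explodes it. This is a sketch, not Lenses code, and it carries an assumption: arrays of different lengths are truncated to the shortest, as Python's `zip` does. That matters for the second record, which has three readings but only two times; under this assumption its last reading is dropped, and the actual Lenses behavior may differ.

```python
batched_readings = [
    {"meter_id": 1, "readings": [100, 101, 102], "times": [1, 2, 3]},
    {"meter_id": 2, "readings": [81, 82, 81], "times": [1, 2]},
    {"meter_id": 1, "readings": [95, 94, 93, 96], "times": [4, 5, 6, 7]},
    {"meter_id": 2, "readings": [80, 82], "times": [4, 5]},
]

result = []
for row in batched_readings:
    # zip(readings, 'value', times, 'time'): the intermediate array of objects.
    # ASSUMPTION: truncated to the shorter array, as Python's zip does.
    zipped = [{"value": v, "time": t} for v, t in zip(row["readings"], row["times"])]
    # LATERAL zipped AS reading, then SELECT meter_id, reading.value, reading.time
    for reading in zipped:
        result.append({
            "meter_id": row["meter_id"],
            "value": reading["value"],
            "time": reading["time"],
        })

for r in result:
    print(r)
```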


Lateral joins for streaming data

Something we strive to achieve with Lenses SQL is a consistent and seamless experience across all types of user data, be it static or streaming.

It will come as little surprise, then, that lateral joins are fully supported in our Streaming SQL as well as in our Snapshot SQL.

The syntax and concepts are the same as those we have been explaining throughout this post, but they are now used as part of an SQL Processor rather than an SQL Studio query.

To illustrate how this works, let's go back to our sensor meter example and assume that we have the same `batched_readings` topic as before, receiving events like the following:

```
{ "meter_id": 1, "readings": [100, 101, 102] },
{ "meter_id": 2, "readings": [81, 82, 81] },
{ "meter_id": 1, "readings": [95, 94, 93, 96] },
{ "meter_id": 2, "readings": [80, 82] },
....
....
....
```


What if we wanted to split our data so that readings inside a normal range are sent downstream to be processed, while readings outside that range are sent to a separate topic (which may trigger an alert or feed a dashboard)?

We can easily use an SQL Processor for this, with the guarantee that it will react as expected to every new batched reading.

Here is the processor code (assuming the normal range is between 95 and 100, inclusive):

```
WITH singleReadings AS (
  SELECT STREAM
    reading
  FROM 
    batched_readings LATERAL readings as reading
);

INSERT INTO normalReadings
SELECT STREAM * 
FROM
  singleReadings
WHERE 
  reading <= 100 AND 
  reading >= 95;

INSERT INTO outliers
SELECT STREAM * 
FROM
  singleReadings
WHERE 
  reading < 95 OR 
  reading > 100;
```
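The processor's routing logic can be modeled in Python as a generator that explodes each incoming batch, followed by a split on the range condition. This is an illustrative sketch only: in a real SQL Processor the input is an unbounded Kafka topic and the two sinks are the `normalReadings` and `outliers` topics, whereas here a finite list of events and two Python lists stand in for them. `single_readings` and `route` are hypothetical names.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple

Record = Dict[str, Any]

def single_readings(stream: Iterable[Record]) -> Iterator[Record]:
    """Counterpart of the singleReadings CTE: explode every incoming batch."""
    for record in stream:
        for reading in record["readings"]:
            yield {"reading": reading}

def route(stream: Iterable[Record], low: int = 95, high: int = 100) -> Tuple[List[Record], List[Record]]:
    """Send readings in [low, high] to one sink and all others to another,
    mirroring the two INSERT INTO statements."""
    normal_readings: List[Record] = []
    outliers: List[Record] = []
    for row in single_readings(stream):
        if low <= row["reading"] <= high:
            normal_readings.append(row)
        else:
            outliers.append(row)
    return normal_readings, outliers

events = [
    {"meter_id": 1, "readings": [100, 101, 102]},
    {"meter_id": 2, "readings": [81, 82, 81]},
    {"meter_id": 1, "readings": [95, 94, 93, 96]},
    {"meter_id": 2, "readings": [80, 82]},
]

normal, out = route(events)
print("normal:", [r["reading"] for r in normal])
print("outliers:", [r["reading"] for r in out])
```

Because the two conditions are exact complements, every exploded reading lands in exactly one sink.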


Conclusion

And this concludes our voyage from requirement to delivery, through design and implementation.

In this post we explored the general idea behind lateral joins, why they are useful, and how similar functionality is implemented across the industry; we then analyzed the advantages and disadvantages of each approach, and finally presented the final iteration of the feature.

We concluded by showing how the chosen approach delivers on the requirements while maintaining flexibility and expressivity.

Lateral joins, together with table functions, are powerful features already available in many RDBMSs, but they are not very well known. With this post, we wanted to shed some light on them and present them in the context of Lenses SQL.

Finally, we invite you to experiment with this new feature yourself. Try out Lenses 4.1 with our free, *batteries-included* Box and let us know what you think.

We look forward to any feedback you all might have for us!
