In this article we are going to explore lateral joins.
"What is a lateral join?" you may ask. It's a new kind of join that allows you to extract and work with the single elements found inside an array, as if the array were a normal table.
Lenses 4.1 comes with a lot of new features that make your life easier when working with arrays: we introduced 6 new functions to work with arrays, better support for array literals, and lateral joins.
All these features are available for both our Snapshot and Streaming engines.
You can read more about this functionality in our docs, and try it yourself by following this self-contained tutorial.
What we would like to focus on in this post, however, is how this feature came to be; designing this functionality has been an exciting journey and we thought it would be useful to share the thought process and iterations behind the implementation.
First, we will explore the high-level requirements that motivated this feature.
Then we will look at what possible solutions could look like, and the implications of each approach.
Finally, we will go through our design iterations, concluding with the final result and how it delivers all that we set out to achieve.
Let's get cracking.
Motivation
Assume that you have a topic called `batched_readings` that collects readings from some kind of sensor meter.
The upstream system stores "batches" of readings per meter. That means that each single record will contain multiple readings for its meter:
```
{ "meter_id": 1, "readings": [100, 101, 102] },
{ "meter_id": 2, "readings": [81, 82, 81] },
{ "meter_id": 1, "readings": [95, 94, 93, 96] },
{ "meter_id": 2, "readings": [80, 82] }
```
Our goal is to extract the single readings from each record and build a new topic where each record contains only one reading:
```
{ "meter_id": 1, "reading": 100 },
{ "meter_id": 1, "reading": 101 },
{ "meter_id": 1, "reading": 102 },
{ "meter_id": 2, "reading": 81 },
{ "meter_id": 2, "reading": 82 },
...
```
How can we do that?
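Before looking at any SQL syntax, it may help to pin down the target transformation itself. The following is a minimal Python sketch of it, not Lenses code: the function name `flatten_readings` is our own, and the input is simply the four example records from above held in a list.

```python
# Input records, copied from the example above.
batched_readings = [
    {"meter_id": 1, "readings": [100, 101, 102]},
    {"meter_id": 2, "readings": [81, 82, 81]},
    {"meter_id": 1, "readings": [95, 94, 93, 96]},
    {"meter_id": 2, "readings": [80, 82]},
]


def flatten_readings(records):
    """Emit one {"meter_id", "reading"} record per element of each "readings" array."""
    return [
        {"meter_id": record["meter_id"], "reading": reading}
        for record in records
        for reading in record["readings"]
    ]


single_readings = flatten_readings(batched_readings)
for row in single_readings[:5]:
    print(row)
```

Whatever syntax we end up with, it should let users express this "one output record per array element" behaviour declaratively.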
Exploring the solution space
Before starting to work on the feature, we did some research to see if and how this functionality was implemented in other streaming SQL systems and in more traditional RDBMSs.
We found two main approaches to the problem:
The "Explode" approach
Some systems implement the functionality by introducing special functions that can be used as normal projections, right after the `SELECT` clause. The idea is that a special function will "explode" the array, making the select emit a record for each item in the array.
```
SELECT
meter_id,
EXPLODE(readings) AS reading
FROM
batched_readings
```
The syntax is quite straightforward and easy to use in this simple case, but we found it more complex and less predictable in more complicated scenarios, such as when multiple `EXPLODE`s are used or when multi-level arrays are involved.
Another limitation of this approach is that it is hard to work with the exploded array elements outside the projection. For example, something like this is not possible in ksqlDB:
```
SELECT
meter_id,
EXPLODE(readings) AS reading
FROM
batched_readings
WHERE EXPLODE(readings) > 100
```
In general, we soon realized that treating `EXPLODE` as a normal function (albeit one that generates more than one output for each input) was not the right choice for us. We could do better.
The "Table Function" approach
Not satisfied with the solution above, we considered whether moving the `EXPLODE` to the source would solve some of the problems highlighted above.
After some research, we realized that this was an approach already embraced by classical RDBMSs like PostgreSQL, with its `unnest` table function.
This approach is based on two concepts:
Table functions: functions that transform normal values into tables.
Lateral joins: with lateral joins it is possible to express a relation between the value of a field on the left-hand side of a join and its right-hand side. This contrasts with traditional joins, where you can only join tables that are completely independent of one another.
With these concepts to hand, our first design iteration defined a new `EXPLODE` table function and a new `LATERAL` keyword, making it possible to join a source to the result of a table function:
```
SELECT
meter_id,
reading
FROM
batched_readings
LATERAL EXPLODE(readings as reading)
```
The table function `EXPLODE(readings as reading)` should be read as follows:
Take the current value of the `readings` array and transform it to a table. Such a table will have only one column, called `reading`, and each element of the original array will be a row in that table.
`EXPLODE(readings as reading)` is a table that depends on the value of the `readings` field for each single record. Every record will then produce a table.
Going back to our meter reading example, consider the first record, where `readings` is `[100, 101, 102]`. For that record, the expression `EXPLODE(readings as reading)` will produce the following table:
| reading |
|---------|
| 100     |
| 101     |
| 102     |
Since we gave the name `reading` to the elements inside `readings`, we can use it in the `SELECT` as if it were a normal field.
But not only that, we can use it wherever we want, like in a `WHERE` or in a `GROUP BY`!
The table expression `batched_readings LATERAL EXPLODE(readings as reading)` can at this point be read as follows:
For each record in `batched_readings`, compute the table `EXPLODE(readings as reading)`, and join that table with the original record.
That means that if we take the record `{ "meter_id": 1, "readings": [100, 101, 102] }` as an example again, the above table expression will result in the following table:
| meter_id | readings        | reading |
|----------|-----------------|---------|
| 1        | [100, 101, 102] | 100     |
| 1        | [100, 101, 102] | 101     |
| 1        | [100, 101, 102] | 102     |
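The two concepts can be modelled in a few lines of Python. This is only a sketch of the semantics described above, not how Lenses implements them; the helper names `explode` and `lateral_join` are ours, and tables are represented as plain lists of dictionaries.

```python
def explode(values, alias):
    """Table function: turn an array into a one-column table (a list of rows)."""
    return [{alias: value} for value in values]


def lateral_join(records, table_for):
    """For each record, compute its dependent table and join every row with the record."""
    joined = []
    for record in records:
        # The right-hand side is recomputed per record: this is what makes the join "lateral".
        for row in table_for(record):
            joined.append({**record, **row})
    return joined


record = {"meter_id": 1, "readings": [100, 101, 102]}
rows = lateral_join([record], lambda r: explode(r["readings"], "reading"))
for row in rows:
    print(row)
```

Each printed row carries the original `meter_id` and `readings` fields plus the new `reading` field, mirroring the table above.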
Our final approach
After exploring the table function approach, we realized that the only real concrete example of a table function would be `EXPLODE`; we then understood that we could use the array expression directly as the argument of the `LATERAL` join, without the `EXPLODE` function.
With that simplification the query becomes:
```
SELECT
meter_id,
reading
FROM
batched_readings
LATERAL readings as reading
```
This brings a small but effective improvement to the syntax, as well as making an important aspect of our approach to lateral joins more prominent: we can treat any array as a valid right-hand side of a lateral join, including any expression returning an array.
Filtering the results of a lateral join
With the approach to lateral joins we just described, we can use the exploded item not only in the projections of a `SELECT`, but also in all the other places of the query where you can use fields coming from a source. This includes `WHERE` and `GROUP BY`.
Using the same example as above, this query allows you to keep only the readings greater than `90`:
```
SELECT
meter_id,
reading
FROM
batched_readings
LATERAL readings as reading
WHERE
reading > 90
```
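To make the evaluation order explicit, here is a small Python sketch of what we expect such a query to compute on the example data. It is an illustration of the semantics only: the filter is applied to each joined row, so a single input record can contribute some of its readings and not others.

```python
batched_readings = [
    {"meter_id": 1, "readings": [100, 101, 102]},
    {"meter_id": 2, "readings": [81, 82, 81]},
    {"meter_id": 1, "readings": [95, 94, 93, 96]},
    {"meter_id": 2, "readings": [80, 82]},
]

filtered = [
    {"meter_id": record["meter_id"], "reading": reading}
    for record in batched_readings      # FROM the source
    for reading in record["readings"]   # LATERAL readings as reading
    if reading > 90                     # WHERE reading > 90
]

for row in filtered:
    print(row)
```

On this data only readings from meter 1 survive, because every reading from meter 2 is below the threshold.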
Nested lateral joins
The result of a `LATERAL` is a table expression, so it can be used as the left-hand side of another `LATERAL`. This can be useful when you want to extract elements inside a nested array.
Let's take a slight modification of the `batched_readings` example:
```
{ "meter_id": 1, "nested_readings": [[100, 101], [102]] },
{ "meter_id": 2, "nested_readings": [[81], [82, 81]] },
{ "meter_id": 1, "nested_readings": [[95, 94], [93, 96]] },
{ "meter_id": 2, "nested_readings": [[80, 82]] }
```
Here `nested_readings` is an array of arrays. We can get the same results as before with the following query:
```
SELECT STREAM
meter_id,
reading
FROM
batched_readings_nested
LATERAL nested_readings as readings
LATERAL readings as reading
```
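One way to think about chained lateral joins is as nested loops, one loop level per `LATERAL`. The Python sketch below illustrates that reading on the nested example data; it models the semantics only and says nothing about how the engine executes the query.

```python
batched_readings_nested = [
    {"meter_id": 1, "nested_readings": [[100, 101], [102]]},
    {"meter_id": 2, "nested_readings": [[81], [82, 81]]},
    {"meter_id": 1, "nested_readings": [[95, 94], [93, 96]]},
    {"meter_id": 2, "nested_readings": [[80, 82]]},
]

single_readings = [
    {"meter_id": record["meter_id"], "reading": reading}
    for record in batched_readings_nested
    for readings in record["nested_readings"]  # LATERAL nested_readings as readings
    for reading in readings                    # LATERAL readings as reading
]

for row in single_readings[:5]:
    print(row)
```

The inner loop depends on the value bound by the outer one, just as the second `LATERAL` depends on the `readings` alias introduced by the first.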
Working with multiple arrays
The `EXPLODE` function approach allowed multiple arrays to be exploded at the same time. When multiple `EXPLODE`s are used, the arrays are traversed in parallel, and elements with the same index are returned together.
To achieve similar behavior with the `LATERAL` join it was enough to introduce a new array function: `zip`.
`zip` takes two (or more) arrays, traverses them in parallel, and builds a single new array whose elements are objects containing values from the original arrays.
For example, the following expression:
```
zip([1, 2, 3], 'a', ['x', 'y', 'z'], 'b')
```
will be evaluated to the following array:
```
[{"a": 1, "b": "x"}, {"a": 2, "b": "y"}, {"a": 3, "b": "z"}]
```
Let's go back to our original example, and assume now we have a new array where the time of the reading was reported (the time is here a simple integer just to keep the code concise):
```
{ "meter_id": 1, "readings": [100, 101, 102], "times": [1, 2, 3] },
{ "meter_id": 2, "readings": [81, 82, 81], "times": [1, 2] },
{ "meter_id": 1, "readings": [95, 94, 93, 96], "times": [4, 5, 6, 7] },
{ "meter_id": 2, "readings": [80, 82], "times": [4, 5] }
```
We can use `zip` to build an intermediate array that will then be passed to the `LATERAL` join:
```
SELECT
meter_id,
reading.value,
reading.time
FROM
batched_readings
LATERAL zip(readings, 'value', times, 'time') as reading
```
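The behaviour of `zip` followed by a lateral join can be sketched in Python as follows. The helper `zip_arrays` is a hypothetical stand-in that mimics the argument order shown above (array, field name, array, field name, ...). Note one assumption: Python's built-in `zip` stops at the shortest array, and we have not verified here how the Lenses function treats arrays of different lengths.

```python
def zip_arrays(*args):
    """Combine arrays into one array of objects.

    args alternates arrays and field names: array1, name1, array2, name2, ...
    Assumption: traversal stops at the shortest array (Python's zip semantics).
    """
    arrays = args[0::2]
    names = args[1::2]
    return [dict(zip(names, values)) for values in zip(*arrays)]


# The standalone expression from the text.
print(zip_arrays([1, 2, 3], "a", ["x", "y", "z"], "b"))

# The lateral join over the zipped array, for the first example record.
record = {"meter_id": 1, "readings": [100, 101, 102], "times": [1, 2, 3]}
rows = [
    {"meter_id": record["meter_id"], "value": reading["value"], "time": reading["time"]}
    for reading in zip_arrays(record["readings"], "value", record["times"], "time")
]
for row in rows:
    print(row)
```

Because the zipped array is just another array expression, no special support is needed in the join itself.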
Lateral joins for streaming data
Something that we strive to achieve with Lenses SQL is a consistent and seamless experience over all types of user data, be it static or streaming.
It will come as little surprise, then, that lateral joins are fully supported in our Streaming SQL as well as in our Snapshot one.
The syntax and concepts are the same ones that we have been explaining throughout this post, but now they are used as part of an SQL Processor rather than an SQL Studio query.
To illustrate how this would work, let's go back to our sensor meter example, and let's assume that we have the same `batched_readings` topic as before, which will receive events like the ones below:
```
{ "meter_id": 1, "readings": [100, 101, 102] },
{ "meter_id": 2, "readings": [81, 82, 81] },
{ "meter_id": 1, "readings": [95, 94, 93, 96] },
{ "meter_id": 2, "readings": [80, 82] },
....
....
....
```
What if we wanted to split our data to ensure that readings that are inside a normal range are sent downstream to be processed, but readings outside such range are sent to a separate topic (which may generate an alert or be counted in a dashboard)?
We can easily use an SQL Processor for this, and we will be guaranteed that, for every new batched reading, we will react as expected.
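The routing rule itself is simple enough to sketch outside SQL. The Python below only illustrates the intended semantics, assuming a normal range of 95 to 100 inclusive; the function name `route_readings` is hypothetical, and a real processor would handle events one at a time as they arrive rather than from an in-memory list.

```python
def route_readings(events, low=95, high=100):
    """Split every single reading into "normal" or "outlier" according to [low, high]."""
    normal, outliers = [], []
    for event in events:
        # One output row per array element, as a lateral join would produce.
        for reading in event["readings"]:
            row = {"reading": reading}
            if low <= reading <= high:
                normal.append(row)
            else:
                outliers.append(row)
    return normal, outliers


events = [
    {"meter_id": 1, "readings": [100, 101, 102]},
    {"meter_id": 2, "readings": [81, 82, 81]},
    {"meter_id": 1, "readings": [95, 94, 93, 96]},
    {"meter_id": 2, "readings": [80, 82]},
]

normal, outliers = route_readings(events)
print("normal:", normal)
print("outliers:", outliers)
```

Every reading ends up in exactly one of the two outputs, which is the property we want the two `INSERT` statements to have.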
Here is the processor code (assuming the normal range is between 95 and 100):
```
WITH singleReadings AS(
SELECT STREAM
reading
FROM
batched_readings LATERAL readings as reading
);
INSERT INTO normalReadings
SELECT STREAM *
FROM
singleReadings
WHERE
reading <= 100 AND
reading >= 95;
INSERT INTO outliers
SELECT STREAM *
FROM
singleReadings
WHERE
reading < 95 OR
reading > 100;
```
Conclusion
And this concludes our voyage from requirement to delivery, through design and implementation.
In this post we explored the general idea behind lateral joins, why they are useful, and how they are implemented in the industry; we then analyzed the advantages and disadvantages of each solution, and finally presented what our final iteration of the feature looks like.
We concluded by showing how the chosen approach delivers on the requirements while maintaining flexibility and expressivity.
Lateral joins, together with table functions, are powerful features already available in many RDBMS systems, but they are not very well known. With this post, we wanted to shed some light on them and to present them in the context of Lenses SQL.
Finally, we want to invite you to experiment with this new feature yourself. Try Lenses 4.1 out with our free, *batteries-included* Box and let us know what you think. We look forward to any feedback you might have for us!







