Lenses is a developer experience for building and operating streaming applications on open-source technologies. Deployed as a container or as a JVM application, Lenses.io works with any Kafka and Kubernetes environment, including managed and cloud services.
Streaming SQL allows anyone to build stream processing applications with SQL. Queries react to data as soon as it is available in Apache Kafka. They can reshape your data, aggregate it based on any field or time window, or enrich it with other streaming data. The results are pushed back to Kafka topics so that downstream applications and processes can consume them. Pipelines are deployed and scaled on your own Kubernetes environment or managed cloud solutions.
Streams (Stateless) and tables (Stateful) have different semantics and use cases, but are strongly related nonetheless.
This relationship is known as stream-table duality. Every stream can be interpreted as a table, and similarly a table can be interpreted as a stream.
Concept | Description |
Stream (Stateless) | Represents an unbounded sequence of independent events over a continuously changing dataset. The dataset is the totality of all data described by every event received so far. |
Table (Stateful) | For each key, a table holds the latest version (state) of its value. Upon receiving events for keys that already have an associated value, those values are overridden and the state changes. |
Use case | Description |
Filtering | Read payment events from the cc_payments topic and filter those with a value greater than $5,000 into a large_cc_payments topic for possible fraud. |
Aggregating | In a topic containing information about network traffic, average the bandwidth per network device in a rolling 5 minute window. |
Enriching | For a topic containing customer orders, join the events with customer information from a customers topic into a new topic. |
Reshaping | Unwrap certain field values from a topic containing energy usage information into another topic so that it can be sent to a time-series database. |
Re-keying | Change the event key to suit a downstream consumer or to align the topic for Kafka Streams joins/aggregations. |
Reformatting | Translate incoming JSON data to AVRO for better control over schema evolution, rogue messages and so on. |
Lenses.io supports reading and writing data from/to Kafka topics in different serialisation formats.
Format | Read | Write |
Int | yes | yes |
Bytes | yes | yes |
Long | yes | yes |
String | yes | yes |
Json | yes | yes |
Avro | Via schema registry | Via schema registry |
XML | yes | yes |
Csv | yes | not yet |
Google Protobuf | Via custom configuration | not yet |
Custom | Via custom configuration | no |
SET defaults.topic.autocreate=true;
INSERT INTO speeding_cars
SELECT STREAM
    car_speed AS speed
  , car_name
FROM car_data
WHERE car_speed > 100;
The above example auto-creates any necessary topics (in this case, the speeding_cars topic) and populates it with car_speed (renamed to speed) and car_name from the car_data topic, but only for events where car_speed is greater than 100.
Facet | Lenses.io reference |
Key | _key OR _key.<fieldname> |
Value | _value OR _value.<fieldname> OR <fieldname> OR * |
A projection represents the ability to project a given input value onto a target location (key or value) in an output record. Projections are the main building block of SELECT statements.
INSERT INTO target-topic
SELECT STREAM
CONCAT('a', 'b') AS result1
, (1 + field1) AS _key.a
, _key.field2 AS result3
, CASE
WHEN field3 = 'Robert' THEN 'It`s bobby'
WHEN field3 ='William' THEN 'It`s willy'
ELSE 'Unknown'
END AS who_is_it
FROM input-topic;
In the above example, result1, _key.a, result3 and who_is_it are all fields output to target-topic.
Type | Supported Operators |
Logical | AND, OR |
Arithmetic | +, -, *, /, % (mod) |
Ordering | >, >=, <, <= |
Equality | =, != |
String | LIKE, NOT LIKE |
CASE | IN |
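For example, several of these operators can be combined in a single WHERE clause. The sketch below assumes a hypothetical payments topic with amount, currency and card_holder fields:

INSERT INTO flagged_payments
SELECT STREAM
    card_holder
  , amount
FROM payments
-- arithmetic, ordering, logical and string operators combined
WHERE (amount > 1000 OR amount % 500 = 0)
  AND currency = 'USD'
  AND card_holder NOT LIKE 'TEST%';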
Select movie_number and date_released from the transformer_movies_xml topic and write them in AVRO format to the transformer_movies_avro topic. Make movie_number the _key, and only select movies where movie_number is less than 300.
INSERT INTO transformer_movies_avro
STORE KEY AS AVRO
VALUE AS AVRO
SELECT STREAM movie_number AS _key
, date_released
FROM transformer_movies_xml
WHERE movie_number > 0 AND movie_number < 300;
Calculate the rolling average KW for each customer_id over a 10 minute window, computed every 5 minutes. Populate the results into the electricity_events_avg stream.
INSERT INTO electricity_events_avg
SELECT STREAM customer_id
, AVG (KW) AS KW
FROM electricity_events
WINDOW BY HOP 10m, 5m
GROUP BY customer_id;
Populate the ship_speeds_with_names topic with the speed field from the fast_vessel_processor topic joined with the ship_names topic using the MMSI value. The ship_names MMSI value is stored as a STRING whereas the fast_vessel_processor one is stored as a LONG. The ship_names topic also doesn't have a key, so it needs to be rekeyed (ship_names_rekeyed) in order to build a state table (ship_names_state), since state tables cannot have NULL keys.
WITH ship_names_rekeyed AS
  (SELECT STREAM ship_names.mmsi AS _key
        , ship_names.owner
        , ship_names.name
   FROM ship_names);
WITH ship_names_state AS
  (SELECT TABLE *
   FROM ship_names_rekeyed);
INSERT INTO ship_speeds_with_names
SELECT STREAM fast_vessel_processor._key AS mmsikey
    , fast_vessel_processor.Speed
    , ship_names_state.owner
    , ship_names_state.name
FROM fast_vessel_processor
    INNER JOIN ship_names_state
    ON fast_vessel_processor._key.MMSI = CAST(ship_names_state._key AS LONG);
Reshape the car_speed_events stream by setting sensor_id and event_time as key fields, and nesting the mph value and a calculated kmph value under a speed object in the value.
SELECT STREAM sensor.id AS _key.sensor_id
, event_time AS _key.event_time
, car_id
, speedMph AS speed.mph
, speedMph * 1.60934 AS speed.kmph
FROM car_speed_events;
At times, it is useful to control the resulting Key and/or Value storage of the output topic. If the input is Json, the output for the streaming computation can be set to Avro.
The syntax is the following:
INSERT INTO <target topic>
STORE
KEY AS <format>
VALUE AS <format>
...
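For example, a processor reading JSON records and writing them out as AVRO could look like the sketch below (the topic names are hypothetical):

SET defaults.topic.autocreate=true;
-- the source topic holds JSON; the target key is written as STRING and the value as AVRO
INSERT INTO sensor_readings_avro
STORE KEY AS STRING
      VALUE AS AVRO
SELECT STREAM *
FROM sensor_readings;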
From \ To | INT | LONG | STRING | JSON | AVRO |
INT | = | yes | yes | no | yes |
LONG | no | = | yes | no | yes |
STRING | no | no | = | no | yes |
JSON | If the JSON storage contains only integers | If the JSON storage contains only integers or longs | yes | = | yes |
AVRO | If the AVRO storage contains only integers | If the AVRO storage contains only integers or longs | yes | yes | = |
Join Type | Description | Lexicon |
Inner | This join type will only emit records where a match has occurred. | JOIN |
Left | Selects all the records from the left side of the join regardless of a match. | LEFT JOIN |
Right | A mirror of a LEFT JOIN. It selects all the records from the right side of the join regardless of a match. | RIGHT JOIN |
Outer | Union of left and right joins. It selects all records from the left and right side of the join regardless of a match happening. | OUTER |
If no ON expression is provided, the join will be evaluated based on the equality of the _key facet.
Expression type | Examples |
Equality | customers.id = order.user_id; customers.id - 1 = order.user_id - 1; substr(customers.name, 5) = order.item |
Boolean | len(customers.name) > 10; substr(customer.name, 1) = 'J'; len(customer.name) > 10 OR customer_key > 1 |
Logical | customers._key = order.user_id AND len(customers.name) > 10; len(customers.name) > 10 AND substr(customer.name, 1) = 'J'; substr(customers.name, 5) = order.item AND len(customer.name) > 10 OR customer_key > 1 |
Left | Right | Allowed types | Window | Result |
Stream | Stream | All | Required | Stream |
Table | Table | All | No | Table |
Table | Stream | RIGHT JOIN | No | Stream |
Stream | Table | INNER, LEFT JOIN | No | Stream |
When two streams are joined, Lenses needs to know how far in the past and in the future to look for a matching record.
SELECT STREAM
    customers.name
  , orders.item
FROM customers
    LEFT JOIN orders
    ON customers.id = orders.customer_id
    WITHIN 5s;
The above example means that an event with an orders.item may be generated with a null value for customers.name if a matching customers.id event has not arrived within a +/- 5 second window.
Duration | Description | Example |
ms | time in milliseconds | 100ms |
s | time in seconds | 10s |
m | time in minutes | 10m |
h | time in hours | 10h |
Window type | Syntax | Description |
Hopping window | WINDOW BY HOP <duration_time>, <hop_interval> | Fixed-size, overlapping windows. The same event can fall into multiple windows. |
Tumbling window | WINDOW BY TUMBLE <duration_time> | Duration and hop interval are equal. An event can only appear in one window. |
Session window | WINDOW BY SESSION <inactivity_interval> | Defined by a period of activity separated by a specified gap of inactivity, at which point the current session closes. |
Grace period | WINDOW BY <windowing_type> GRACE BY <grace_time> | For a window to accept late-arriving records, a grace period can be defined. If a record falls within a window and arrives before the end of the grace period, it is processed and the aggregations or joins update. Otherwise the record is discarded. |
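As an illustration, the sketch below counts events per key in 1 minute tumbling windows and accepts records arriving up to 30 seconds late (the page_views topic and its page_id field are hypothetical):

INSERT INTO page_views_per_minute
SELECT STREAM
    page_id
  , COUNT(*) AS views
FROM page_views
-- tumbling 1 minute windows with a 30 second grace period for late records
WINDOW BY TUMBLE 1m GRACE BY 30s
GROUP BY page_id;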
Use `WITH` to break the code into reusable blocks:
WITH countriesStream AS (
SELECT STREAM *
FROM countries
);
WITH merchantsStream AS (
SELECT STREAM *
FROM merchants
);
WITH merchantsWithCountryInfoStream AS (
SELECT STREAM
m._key AS l_key
, CONCAT(surname, ', ', name) AS fullname
, address.country
, language
, platform
FROM merchantsStream AS m JOIN countriesStream AS c
ON m.address.country = c._key
WITHIN 1h
);
WITH merchantsCorrectKey AS (
SELECT STREAM
l_key AS _key
, fullname
, country
, language
, platform
FROM merchantsWithCountryInfoStream
);
INSERT INTO currentMerchants
SELECT STREAM *
FROM merchantsCorrectKey;
INSERT INTO merchantsPerPlatform
SELECT TABLE
COUNT(*) AS merchants
FROM merchantsCorrectKey
GROUP BY platform;
Function | Description |
AS_NON_NULLABLE (expr) | Returns the provided value with its type changed from the original type to its non-nullable version |
AS_NULLABLE (expr) | Returns the provided value with its type changed from the original type to its nullable version |
CAST (dt AS int) | Enables conversion of values from one data type to another |
COALESCE (value, prevValue) | Returns the first non-null expression in the expression list |
DUMP (expr) | Shows the internal representation of a value |
EXISTS (field) | Returns true if the given field is present, false otherwise |
ISNOTNULL (expr) | Returns true if the input is not null; false otherwise |
ISNULL (expr) | Returns true if the input is null; false otherwise |
SIZEOF (expr) | Returns the number of elements in an array |
TYPEOF () | Returns the object type of a complex expression. This can only be used when the format on the wire includes the details of the object's full name |
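As a quick sketch of how these compose inside a projection (the readings topic and its fields are hypothetical):

INSERT INTO readings_clean
SELECT STREAM
    COALESCE(temperature, 0) AS temperature   -- fall back to 0 when the field is null
  , EXISTS(humidity) AS has_humidity          -- true when the field is present
  , CAST(pressure AS int) AS pressure_int     -- convert to another data type
FROM readings;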
Function | Description |
ABBREVIATE (expr, lengthExpr) | Abbreviates the expression to the given length and appends ellipses |
BASE64 (expr) | Returns the input string encoded with the Base64 algorithm |
CAPITALIZE (expr) | Capitalizes the first letter of the expression |
CENTER (target, size, padExpr) | Centers a String in a larger String of size N |
CHOP (expr) | Returns the last character from an expression of type string |
CONCATENATE (expr1, expr2, expr3) | Returns the string representation of concatenating each expression in the list. Null fields are left out |
CONCAT (expr1, expr2, expr3) | Alias for CONCATENATE |
CONTAINS (sourceExpr, targetExpr) | Returns true if an expression contains the given substring |
DECODE64 (expr) | Decodes a Base64-encoded string |
DELETEWHITESPACE (expr) | Removes all whitespace from an expression of type string |
DIGITS (expr) | Retains only the digits from a string expression |
DROPLEFT (expr, lengthExpr) | Removes the left most ‘length’ characters from a string expression |
DROPRIGHT (expr, lengthExpr) | Removes the right most ‘length’ characters from a string expression |
ENDSWITH (sourceExpr, targetExpr) | Returns true if an expression ends with the given substring |
INDEXOF (expr, substringExpr) | Returns the index of a substring in an expression |
LENGTH (expr) | Returns the length of a string. Calculates length using characters as defined by UTF-16 |
LEN (expr) | Alias for LENGTH |
LOWERCASE (strExpr) | Returns the expression in lowercase |
LOWER (strExpr) | Alias for LOWERCASE |
LEFTPAD (strExpr, lengthExpr, padExpr) | Prepends the value of padExpr to the value of strExpr until the total length is lengthExpr |
LPAD (strExpr, lengthExpr, padExpr) | Alias for LEFTPAD |
REGEXP (strExpr, regexExpr) | Returns the matched groups otherwise null |
REGEX (strExpr, regexExpr) | Alias for REGEXP |
REPLACE (sourceExpr, targetExpr, replaceExpr) | Returns a new string in which all occurrences of a specified String in the current string are replaced with another specified String |
REVERSE (expr) | Reverses the order of the elements in the input |
RIGHTPAD (strExpr, lengthExpr, padExpr) | Appends the value of padExpr to the value of strExpr until the total length is lengthExpr |
RPAD (strExpr, lengthExpr, padExpr) | Alias for RIGHTPAD |
STARTSWITH (exprSource, exprTarget) | Returns true if an expression starts with the given substring |
STRIPACCENTS (expr) | Removes diacritics (approximately the same as accents) from an expression. The case will not be altered |
SUBSTRING (expr, startIndexExpr, endIndexExpr) | Returns a new string that is a substring of this string |
SUBSTR (expr, startIndexExpr) | Alias for SUBSTRING |
SWAPCASE (expr) | Swaps the case of a string expression |
TAKELEFT (expr, lengthExpr) | Returns the left most ‘length’ characters from a string expression |
TAKERIGHT (expr, lengthExpr) | Returns the right most ‘length’ characters from a string expression |
TRIM (expr) | Removes leading and trailing spaces from the input expression |
TRUNCATE (strExpr, nExpr) | Truncates a string so that it has at most N characters |
UNCAPITALIZE (expr) | Changes the first letter of each word in the expression to lowercase |
UPPERCASE (strExpr) | Returns the expression in uppercase |
UPPER (strExpr) | Alias for UPPERCASE |
UUID () | Returns a universally unique identifier |
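A small sketch combining a few of these in a projection (the users topic and its fields are hypothetical):

INSERT INTO users_formatted
SELECT STREAM
    CONCAT(TRIM(first_name), ' ', UPPERCASE(last_name)) AS full_name  -- trim and concatenate
  , LEN(first_name) AS first_name_length
FROM users;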
Name | Description |
ANONYMIZE (strExpr) | Obfuscates the entire string input |
MASK (strExpr) | Alias for ANONYMIZE |
EMAIL (emailExpr) | Anonymizes the value and obfuscates an email address |
FIRST1 (strExpr) | Anonymizes the value, keeping only the first character |
FIRST2 (strExpr) | Anonymizes the value, keeping only the first two characters |
FIRST3 (strExpr) | Anonymizes the value, keeping only the first three characters |
FIRST4 (strExpr) | Anonymizes the value, keeping only the first four characters |
LAST1 (strExpr) | Anonymizes the value, keeping only the last character |
LAST2 (strExpr) | Anonymizes the value, keeping only the last two characters |
LAST3 (strExpr) | Anonymizes the value, keeping only the last three characters |
LAST4 (strExpr) | Anonymizes the value, keeping only the last four characters |
INITIALS (strExpr) | Anonymizes the value, keeping only the initials of all the words in the input |
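For example, personally identifiable fields can be masked before they are published to a downstream topic. The sketch below assumes a hypothetical customers topic with email, card_number and full_name fields:

INSERT INTO customers_masked
SELECT STREAM
    EMAIL(email) AS email               -- anonymizes the email address
  , LAST4(card_number) AS card_number   -- keeps only the last four characters
  , INITIALS(full_name) AS initials
FROM customers;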
Name | Description |
% | The remainder operator (%) computes the remainder after dividing its first operand by its second i.e. numExpr % numExpr |
/ | Divides one number by another (an arithmetic operator) i.e. numExpr / numExpr |
- | Subtracts one number from another (an arithmetic operator) i.e. numExpr - numExpr |
* | Multiplies one number with another (an arithmetic operator) i.e. numExpr * numExpr |
+ | Adds one number to another (an arithmetic operator) i.e. numExpr + numExpr |
- (negative) | Returns the negative of the value of a numeric expression (a unary operator) i.e. -numExpr |
ABS (numExpr) | Returns the absolute value of an expression that evaluates to a number type |
ACOS (numExpr) | Returns the trigonometric arc cosine of an expression |
ASIN (numExpr) | Returns the trigonometric arc sine of an expression |
ATAN (numExpr) | Returns the trigonometric arc tangent of an expression |
CBRT (numExpr) | Returns the cube root of numExpr |
CEIL (numExpr) | Returns the smallest value not less than the argument |
COSH (numExpr) | Returns the hyperbolic cosine of an expression |
COS (numExpr) | Returns the trigonometric cosine of an expression |
DEGREES (numExpr) | Converts the input expression to degrees |
DISTANCE (x1, y1, x2, y2) | Calculates the distance between two points using the haversine method |
FLOOR (numExpr) | Returns the largest value not greater than the argument |
MAX (numExpr1, numExpr2, numExpr3) | Returns the maximum element from an arbitrary number of given elements |
MIN (numExpr1, numExpr2, numExpr3) | Returns the minimum element from an arbitrary number of given elements |
MOD (numExpr, numExpr) | Alias for % |
NEG (numExpr) | Returns the negative value of an expression; the expression has to evaluate to a number type |
POW (numExpr1, numExpr2) | Returns numExp1 raised to the numExp2 power |
RADIANS (numExpr) | Converts the input expression to radians |
RANDINT () | Returns a random integer |
ROUND (numExpr) | Returns the closest integer of an expression, with ties rounding towards positive infinity |
SIGN (numExpr) | Returns +1 if a value is positive or -1 if a value is negative |
SINH (numExpr) | Returns the hyperbolic sine of an expression |
SIN (numExpr) | Returns the trigonometric sine of an expression |
SQRT (numExpr) | Returns the square root of numExpr |
TANH (numExpr) | Returns the hyperbolic tangent of an expression |
TAN (numExpr) | Returns the trigonometric tangent of an expression |
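As a brief sketch, a few of these can be combined in a projection (the trip_events topic and its coordinate fields are hypothetical):

INSERT INTO trip_distances
SELECT STREAM
    ROUND(DISTANCE(start_lat, start_lon, end_lat, end_lon)) AS distance  -- haversine distance, rounded
  , ABS(altitude_change) AS abs_altitude_change
FROM trip_events;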
Name | Description |
DATE () | Provides the current ISO date value |
CONVERT_DATETIME (strExpr, fromPattern, toPattern) | Converts the string format of a date [and time] to another using the pattern provided |
CONVERTDATETIME (strExpr, fromPattern, toPattern) | Alias for CONVERT_DATETIME |
DATETIME () | Provides the current ISO date and time |
DATE_TO_STR (strExpr, pattern) | Converts a date time value to a string using the pattern provided |
DAY (expr) | Extracts the day component of an expression that is of type timestamp |
HOUR (expr) | Extracts the hour component of an expression that is of type timestamp |
MINUTE (dataExpr) | Extracts the minute component of an expression that is of type timestamp |
MONTH_TEXT (dataExpr) | Returns the month name |
MONTH (dataExpr) | Extracts the month component of an expression that is of type timestamp |
SECOND (dataExpr) | Extracts the second component of an expression that is of type timestamp |
TOMORROW () | Returns the current date time plus 1 day |
TO_DATETIME (strExpr, pattern) | Converts a string representation of a datetime into epoch value using the pattern provided |
TO_DATE (strExpr, pattern) | Converts a string representation of a date into epoch value using the pattern provided |
TO_TIMESTAMP (longExpr) | Converts a long (epoch) to a date and time type |
TO_TIMESTAMP (strExpr, pattern) | Converts a string using a pattern to a date and time type |
YEAR (expr) | Extracts the year component of an expression that is of type timestamp |
YESTERDAY () | Returns the current date time minus 1 day |
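For instance, string timestamps can be converted to epoch values or reformatted. The sketch below assumes a hypothetical orders topic with a created_at field formatted as 'yyyy-MM-dd HH:mm:ss':

INSERT INTO orders_enriched
SELECT STREAM
    TO_DATETIME(created_at, 'yyyy-MM-dd HH:mm:ss') AS created_epoch                     -- string to epoch
  , CONVERT_DATETIME(created_at, 'yyyy-MM-dd HH:mm:ss', 'yyyy-MM-dd') AS created_day    -- reformat the string
FROM orders;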
Name | Description | Streaming (stateless) | Streaming (stateful) |
AVG (numExpr) | Returns the average of the values in a group. It ignores null values. It can be used with numeric input only | ✔ | ✔ |
COLLECT (expr, maxN) | Returns an array in which each value in the input set is assigned to an element of the array | ✔ | ✔ |
COLLECT_UNIQUE (expr, maxN) | Returns an array of unique values in which each value in the input set is assigned to an element of the array | ✔ | no |
COUNT (*) AS total | Returns the number of records returned by a query or the records in a group as a result of a GROUP BY statement | ✔ | ✔ |
MAXK (numExpr, N) | Returns the N largest values of an numExpr | ✔ | no |
MAXK_UNIQUE (numExpr, N) | Returns the N largest unique values of an numExpr | ✔ | no |
MINK (numExpr, N) | Returns the N smallest values of an numExpr | ✔ | no |
MINK_UNIQUE (numExpr, N) | Returns the N smallest unique values of an numExpr | ✔ | no |
SUM (numExpr) | Returns the sum of all the values in the expression. It can be used with numeric input only. Null values are ignored | ✔ | ✔ |
Settings allow you to customize the behaviour of the application (including error handling). They must be declared at the beginning of the SQL statement; see the example after the table below.
Key | Type | Description |
defaults.topic.autocreate | BOOLEAN | Creates the target topics if they do not exist |
defaults.topic.partition | INT | Controls the target topic's partition count. Ignored if the topic already exists |
defaults.topic.replication | INT | Controls the target topic replication factor. Ignored if the topics are present already. |
defaults.topic.key.avro.record | STRING | Controls the output record Key schema name. |
defaults.topic.key.avro.namespace | STRING | Controls the output record Key schema namespace. |
defaults.topic.value.avro.record | STRING | Controls the output record Value schema name. |
defaults.topic.value.avro.namespace | STRING | Controls the output record Value schema namespace. |
topic.`market.risk`.<topic_configuration> | STRING | Applies the given Kafka topic setting for `market.risk`, e.g. SET topic.`market.risk`.cleanup.policy='compact,delete';. This is applied only if the topic does not exist a priori and needs to be created |
error.policy | STRING | Controls the behaviour of handling corrupted messages. Set it to `dlq` to route such messages to a dead letter queue topic |
dead.letter.queue | STRING | The topic to send those messages which cannot be processed. Requires `error.policy` to be set to `dlq` |
processing.guarantee | STRING | The processing guarantee that should be used. Possible values are AT_LEAST_ONCE (default) and EXACTLY_ONCE. Exactly-once processing requires a cluster of at least three brokers by default, which is the recommended setting for production. |
commit.interval.ms | LONG | The frequency with which to save the position of the processor. If processing.guarantee is set to EXACTLY_ONCE, the default value is 100, otherwise the default value is 30000. This setting directly impacts the behaviour of Tables, as it controls how often they emit events downstream: an event is emitted only every commit.interval.ms, so intermediate events received by the table in between are not immediately visible downstream. |
poll.ms | LONG | The amount of time in milliseconds to block waiting for input. |
cache.max.bytes.buffering | LONG | Maximum number of memory bytes to be used for buffering across all threads. It has to be at least 0. Default value is: 10 * 1024 * 1024. |
client.id | STRING | An ID prefix string used for the client IDs of the internal consumer, producer and restore-consumer, following the pattern '<client.id>-StreamThread-<threadSequenceNumber>-' plus the client role (consumer, producer or restore-consumer) |
num.standby.replicas | INT | The number of standby replicas for each task. Default value is 0. |
num.stream.threads | INT | The number of threads to execute stream processing. Default value is 1. |
max.task.idle.ms | LONG | Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams. |
buffered.records.per.partition | INT | Maximum number of records to buffer per partition. Default is 1000. |
connections.max.idle.ms | LONG | Close idle connections after the number of milliseconds specified by this config. |
receive.buffer.bytes | LONG | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used. |
reconnect.backoff.ms | LONG | The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. |
reconnect.backoff.max.ms | LONG | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000. |
retries | INT | Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. Default is 0 |
retry.backoff.ms | LONG | The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100. |
send.buffer.bytes | LONG | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. Default is 128 * 1024. |
state.cleanup.delay.ms | LONG | The amount of time in milliseconds to wait before deleting state when a partition has migrated. |
consumer.<config> | STRING | Sets the underlying Kafka consumer configuration |
producer.<config> | STRING | Sets the underlying Kafka producer configuration |
rocksdb.<config> | STRING | Sets the configuration for the RocksDB database when stateful processing is used |
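Tying a few of these together, a SET block precedes the rest of the statement, as in the sketch below (the topic names are hypothetical, and the exact quoting of string values may vary):

-- settings are declared first, then the streaming statement follows
SET defaults.topic.autocreate=true;
SET defaults.topic.partition=3;
SET error.policy='dlq';
SET dead.letter.queue='payments_dlq';
SET processing.guarantee='EXACTLY_ONCE';

INSERT INTO payments_eur
SELECT STREAM *
FROM payments
WHERE currency = 'EUR';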