New Lenses Multi-Kafka Developer Experience

Download

Streaming SQL Cheat Sheet for Apache Kafka

A low code way for engineers to learn stream processing.

SQL Cheatsheet

What is Lenses?

Lenses is developer experience for building and operating streaming applications on open-source technologies. Deployed as a container or as a JVM application, Lenses.io works with any Kafka & Kubernetes environment including managed and cloud services.

Lenses Streaming SQL

Streaming SQL allows anyone to build stream processing applications with SQL. Queries react to data as soon as it is available in Apache Kafka. They can reshape your data, aggregate it based on any field or time window, or enrich it with other streaming data. The results are pushed back to Kafka topics so that downstream applications and processes can consume them. Pipelines are deployed and scaled on your own Kubernetes environment or managed cloud solutions.

Try out Streaming SQL

Lenses Streaming SQL

Stateful and Stateless processing

Streams (Stateless) and tables (Stateful) have different semantics and use-cases, but are strongly related nonetheless.

This relationship is known as stream-table duality. Every stream can be interpreted as a table, and similarly a table can be interpreted as a stream.

Stream (Stateless)Represents an unbounded sequence of independent events over a continuously changing dataset. The dataset is the totality of all data described by every event received so far
SELECT STREAM *
FROM website_traffic
Table (Stateful)For each key, a table holds the latest version (state) received of its value. Upon receiving events for keys that already have an associated value, such values will be overridden and state changed
SELECT TABLE *
FROM customers

Streaming SQL example use cases

Use caseDescription
FilteringRead payment events in topic cc_payments and filter those with value greater than 5000$ into large_cc_payments topic for possible fraud.
AggregatingIn a topic containing information about network traffic, average the bandwidth per network device in a rolling 5 minute window.
EnrichingFor a topic containing customers orders, join the events with customer information in a customer's topic to a new topic.
ReshapingUnwrap certain field values for a topic containing energy usage information into another topic so that it can be sent to a time-series database.
Re-keyChange the event key to suit a downstream consumer or align the topic for Kafka Streams joins/aggregation
ReformattingTranslate incoming JSON data to AVRO for better control over schema evolution, rogue messages and so on.

Supported formats

Lenses.io supports reading and writing data from/to Kafka topics in different serialisation formats.

FormatReadWrite
Intyesyes
Bytesyesyes
Longyesyes
Stringyesyes
Jsonyesyes
AvroVia schema registryVia schema registry
XMLyesyes
Csvyesnot yet
Google ProtobufVia custom configurationnot yet
CustomVia custom configurationno

First basic example


SET defaults.topic.autocreate=true;

INSERT INTO speeding_cars
SELECT STREAM
	car_speed as speed
	 , car_name
WHERE car_speed > 100
FROM car_data;

The above example will auto-create any necessary topics (in this case, “speeding_cars” topic) and populate it with the value of car_speed renamed as speed and car_name from the car_data topic only for events where the car_speed is greater than 100.

Kafka Record/Message

FacetLenses.io referenceExample
Key_key

OR

_key.<fieldname>
INSERT INTO outputTopic 
SELECT STREAM _key as customerId
FROM customers
Value_value

OR

_value.<fieldname>

OR

<fieldname>

OR *
INSERT INTO outputTopic
SELECT STREAM _value.firstName 
FROM customers

Projections

A projection represents the ability to project a given input value onto a target location (key or value) in an output record. Projections are the main building block of SELECT statements.


INSERT INTO target-topic
SELECT STREAM
	CONCAT('a', 'b') AS result1
	, (1 + field1) AS _key.a
	, _key.field2 AS result3
	, CASE
		WHEN field3 = 'Robert' THEN 'It`s bobby'
		WHEN field3 ='William' THEN 'It`s willy'
		ELSE 'Unknown'
		END AS who_is_it
FROM input-topic;

In the above example, result1, _key.a, result3 and who_is_itare all fields outputted into target-topic.

Operators & Expressions

TypeSupported OperatorsExample
LogicalAND, OR
WHERE 1 + _value.field1 > 5 
AND value.field2 < 10
Arithmetic+,-,*,/,%(mod)
WHERE 1 + _value.field1 
> 5
Ordering>,>=,<,<=
WHERE _value.field1 
> _value.field2
Equality=, !=
WHERE _key !=
LENGTH(_value.field1)
StringLIKE, NOT LIKE
WHERE _value.field1 
NOT LIKE%foo%
CASEIN
CASE WHEN field3 = 'Robert' 
THEN 'Its bobby' 
WHEN field3 = 'William'
THEN 'Its willy'
ELSE 'Unknown'
END AS who_is_it

Examples

Select move_number and date_released from transformer_movies_xml topic and put in as AVRO format in transformer_movies_avro topic. Make movie_number as the _key and only select movies where movie_number is less than 300.


INSERT INTO transformer_movies_avro
	STORE KEY AS AVRO
	VALUE AS AVRO
	SELECT STREAM movie_number AS _key
	, date_released
	FROM transformer_movies_xml
	WHERE movie_number > 0 AND
	movie_number < 300
example1

Calculate the rolling average KW for each customer_id for every 10 minute period every 5 minutes. Populate intoelectricity_event_avg stream


INSERT INTO electricity_events_avg
	SELECT STREAM customer_id
	, AVG (KW) AS KW
	FROM electricity_events
	WINDOW BY HOP 10m, 5m
	GROUP BY customer_id
example2

Populate the ship_speeds_with_names topic with the speed field from fast_vessel_processor topic joined with the ship_names topic using the MMSI value. The ship_names MMSI value is stored as a STRING whereas the fast_vessel_processor is stored as a LONG. The ship_names also doesn't have a key so needs to be rekeyed (ship_names_rekeyed) in order to build a state table (ship_names_state) since states cannot have NULL keys.


WITH ship_names_rekeyed AS
	(SELECT STREAM ship_names.mmsi AS _key
	FROM ship_names);
	WITH ship_names_state AS
	(SELECT TABLE *
	FROM ship_names_rekeyed);
	INSERT INTO ship_speeds_with_names
	SELECT STREAM fast_vessel_processor._key AS mmsikey
	, fast_vessel_processor.Speed
	, ship_names.owner
	, ship_names.name
	FROM fast_vessel_processor
	INNER JOIN ship_names_state ON fast_vessel_processor._key.MMSI
	= CAST(ship_names_state._key AS LONG);
example3

Reshape the car_speed_events stream by setting sensor_id andevent_time as keys and nesting mph and a calculated kmph values for speed object in the value field.


SELECT STREAM sensor.id AS _key.sensor_id
	, event_time AS _key.event_time
	, car_id
	, speedMph AS speed.mph
	, speedMph * 1.60934 AS speed.kmph
	FROM car_speed_events;
example4

Changing Storage Format

At times, it is useful to control the resulting Key and/or Value storage of the output topic. If the input is Json, the output for the streaming computation can be set to Avro.

The syntax is the following:


INSERT INTO <target topic>
	STORE 
	KEY AS <format>
	VALUE AS <format>
...
From \ ToINTLONGSTRINGJSONAVRO
INT=yesyesnoyes
LONGno=yesnoyes
Stringnono=noyes
JsonIf the Json storage contains integer onlyIf the Json storage contains integer or long onlyyes=yes
AvroIf Avro storage contains integer onlyIf the Avro storage contains integer or long onlyyesyes=

JOINS

Join Types

Join TypeDescriptionLexiconExample
InnerThis join type will only emit records where a match has occurred.JOIN
INSERT INTO result 
SELECT STREAM customersTable.name
, ordersStream.item 
FROM ordersStream 
JOIN customersTable
ON customersTable.id =
ordersStream.customer_id;
LeftSelects all the records from the left side of the join regardless of a match.LEFT JOIN
INSERT INTO result 
SELECT STREAM customersTable.name
, ordersStream.item 
FROM ordersStream 
LEFT JOIN customersTable
ON customersTable.id =
ordersStream.customer_id;
RightA mirror of a LEFT JOIN. It selects all the records from the right side of the join regardless of a match.RIGHT JOIN
INSERT INTO result 
SELECT TABLE customersTable.name
, ordersStream.item 
FROM customersTable 
RIGHT JOIN ordersStream
ON customersTable.id =
ordersStream.customer_id;
OuterUnion of left and right joins. It selects all records from the left and right side of the join regardless of a match happening.OUTER
INSERT INTO result 
SELECT TABLE customersStream.name
, ordersStream.item 
FROM ordersStream 
OUTER JOIN customersStream
ON customersTable.id =
ordersStream.customer_id;

Join Match Expressions

if no ON expression is provided, the join will be evaluated based on the equality of the _key facet

Equalitycustomers.id = order.user_id

customers.id - 1 = order.user_id - 1

substr(customers.name, 5) = order.item
INSERT INTO RESULT 
SELECT TABLE customersStream.name
, ordersStream.item 
FROM ordersStream 
OUTER JOIN customersStream
ON substr(customersTable.name, 5) =
ordersStream.customerName;
Booleanlen(customers.name) > 10

substr(customer.name,1) = “J”

len(customer.name) > 10 OR customer_key > 1
INSERT INTO RESULT 
SELECT TABLE cars_table.cars_name
, car_speeds.speed 
FROM cars_table 
OUTER JOIN car_speeds
ON car_speeds.speed > 100
Logicalcustomers._key = order.user_id AND len(customers.name) > 10

len(customers.name) > 10 AND substr(customer.name,1) = “J”

substr(customers.name, 5) = order.item AND len(customer.name) > 10 OR customer_key > 1
INSERT INTO RESULT 
SELECT TABLE customersStream.name
, customersTable.country
, ordersStream.customerId
, FROM ordersStream 
OUTER JOIN customersStream
ON customersTable.country = "USA"
AND customersTable.id =
ordersStream.customerId

Join Compatibility

LeftRightAllowed typesWindowResult
StreamStreamAllRequiredStream
TableTableAllNoTable
TableStreamRIGHT JOINNoStream
StreamTableINNER, LEFT JOINNoStream

Stream-to-Stream Windowing with WITHIN

When two streams are joined Lenses needs to know how far away in the past and in the future to look for a matching record.


SELECT STREAM
	customers.name
	, orders.item
FROM
	customers LEFT JOIN orders WITHIN 5s
		ON customers.id = orders.customer_id
WITHIN 5s;

The above example means that an event with an orders.item may be generated with a null value for customers.name if a matching customers.id event has not been generated within a 5 second +/- window.

Time Windows

Supported time descriptors

DurationDescriptionExample
mstime in milliseconds100ms
stime in seconds10s
mtime in minutes10m
htime in hours10h

Windowing types

Hopping WindowWINDOW BY HOP <duration_time>,<hop_i nterval>

Fixed size and overlapping windows. The same event can overlap into multiple windows.
WINDOW BY HOP
Tumbling windowWINDOW BY TUMBLE <duration_time>

Duration and hop interval are equal. An event can only appear in one window.
WINDOW BY TUMBLE
Session windowWINDOW BY SESSION <inactivity_interval>

Defined by a period of activity separated by a specified gap of inactivity at which point current session closes.
WINDOW BY Session
Grace periodWINDOW BY <windowing type> GRACE BY <grace time>

For a window to accept late-arriving records a grace period can be defined. If a record falls within a window and it arrived before the end of the grace time then the record will be processed and the aggregations or joins will update. If not, the record is discarded.
WINDOW BY TUMBLE

WITH for better SQL structure

Use `WITH` to break the code in reusable blocks:


WITH countriesStream AS (
	SELECT STREAM *
	FROM countries
);

WITH merchantsStream AS (
	SELECT STREAM *
	FROM merchants
);

WITH merchantsWithCountryInfoStream AS (
	SELECT STREAM
		m._key AS l_key
		, CONCAT(surname, ', ', name) AS fullname
		, address.country
		, language
		, platform
	FROM merchantsStream AS m JOIN countriesStream AS c
			ON m.address.country = c._key
	WITHIN 1h
);

WITH merchantsCorrectKey AS (
	SELECT STREAM
		l_key AS _key
		, fullname
		, country
		, language
		, platform
	FROM merchantsWithCountryInfoStream 
);

INSERT INTO currentMerchants
SELECT STREAM *
FROM merchantsCorrectKey;

INSERT INTO merchantsPerPlatform
SELECT TABLE
	COUNT(*) AS merchants
FROM merchantsCorrectKey
GROUP BY platform;

Functions

FunctionDescription
AS_NON_NULLABLE (expr)Returns the provided value with its type changed from the original type to its non nullable version
AS_NULLABLE (expr)Returns the provided value with its type changed from the original type to its nullable version
CAST (dt AS int)Enables conversion of values from one data type to another
COALESCE (value, prevValue)Returns the first non-null expression in the expression list
DUMP (expr)Show the internal representation of a value
EXISTS (field)Returns true if the given field is present false otherwise
ISNOTNULL (expr)Returns true if the input is not null; false otherwise
ISNULL (expr)Returns true if the input is null; false otherwise
SIZEOF (expr)Returns the number of elements in an array
TYPEOF ()Returns the object type of a complex expression. This can only be used when the format on the wire includes the details of the objects full name

String functions

FunctionDescription
ABBREVIATE (expr, lengthExpr)Abbreviates the expression to the given length and appends ellipses
BASE64 (expr)Returns the input string using base64 algorithm
CAPITALIZE (expr)Capitalizes the first letter of the expression
CENTER (target, size, padExpr)Centers a String in a larger String of size N
CHOP (expr)Returns the last character from an expression of type string
CONCATENATE (expr1, expr2, expr3)Returns the string representation of concatenating each expression in the list. Null fields are left out
CONCAT (expr1, expr2, expr3)Alias for CONCATENATE
CONTAINS (sourceExpr, targetExpr)Returns true if an expression contains the given substring
DECODE64 (expr)Decodes a Base64 encrypted string
DELETEWHITESPACE (expr)Removes all whitespace from an expression of type string
DIGITS (expr)Retains only the digits from a string expression
DROPLEFT (expr, lengthExpr)Removes the left most ‘length’ characters from a string expression
DROPRIGHT (expr, lengthExpr)Removes the right most ‘length’ characters from a string expression
ENDSWITH (sourceExpr, targetExpr)Returns true if an expression ends with the given substring
INDEXOF (expr, substringExpr)Returns the index of a substring in an expression
LENGTH (expr)Returns the length of a string. Calculates length using characters as defined by UTF-16
LEN (expr)Alias for LENGTH
LOWERCASE (strExpr)Returns the expression in lowercase
LOWER (strExpr)Alias for LOWERCASE
LEFTPAD (strExpr, lengthExpr, padExpr)Prepends the value of padExpr to the value of strExpr until the total length is lengthExpr
LPAD (strExpr, lengthExpr, padExpr)Alias for LEFTPAD
REGEXP (strExpr, regexExpr)Returns the matched groups otherwise null
REGEX (strExpr, regexExpr)Alias for REGEXP
REPLACE (sourceExpr, targetExpr, replaceExpr)Returns a new string in which all occurrences of a specified String in the current string are replaced with another specified String
REVERSE (expr)Reverses the order of the elements in the input
RIGHTPAD (strExpr, lengthExpr, padExpr)Appends the value of padExpr to the value of strExpr until the total length is lengthExpr
RPAD (strExpr, lengthExpr, padExpr)Alias for RIGHTPAD
STARTSWITH (exprSource, exprTarget)Returns true if an expression starts with the given substring
STRIPACCENTS (expr)Removes diacritics (approximately the same as accents) from an expression. The case will not be altered
SUBSTRING (expr, startIndexExpr, endIndexExpr)Returns a new string that is a substring of this string
SUBSTR (expr, startIndexExpr)Alias for SUBSTRING
SWAPCASE (expr)Swaps the case of a string expression
TAKELEFT (expr, lengthExpr)Returns the left most ‘length’ characters from a string expression
TAKERIGHT (expr, lengthExpr)Returns the right most ‘length’ characters from a string expression
TRIM (expr)Removes leading and trailing spaces from the input expression
TRUNCATE (strExpr, nExpr)Truncates a string so that it has at most N characters
UNCAPITALIZE (expr)Changes the first letter of each word in the expression to lowercase
UPPER (strExpr)
UPPERCASE (strExpr)
Returns the expression in uppercase
UUID ()Returns an universally unique identifier

Obfuscation functions

NameDescription
ANONYMIZE (strExpr)Obfuscates the entire string input
MASK (strExpr)Alias for ANONYMIZE
EMAIL (emailExpr)Anonymize the value and obfuscates an email address
FIRST1 (strExpr)Anonymize the value and only keeps the first character
FIRST2 (strExpr)Anonymize the value and only keeps the first two characters
FIRST3 (strExpr)Anonymize the value and only keeps the first three characters
FIRST4 (strExpr)Anonymize the value and only keeps the first four characters
LAST1 (strExpr)Anonymize the value and only keeps the last character
LAST2 (strExpr)Anonymize the value and only keeps the last two characters
LAST3 (strExpr)Anonymize the value and only keeps the last three characters
LAST4 (strExpr)Anonymize the value and only keeps the last four characters
INITIALS (strExpr)Anonymize the value and only keeps the initials of all the words in the input

Numeric Functions

NameDescription
%The remainder operator (%) computes the remainder after dividing its first operand by its second i.e. numExpr % numExpr
/Divides one number by another (an arithmetic operator) i.e. numExpr / numExpr
-Subtracts one number from another (an arithmetic operator) i.e. numExpr - numExpr
*Multiplies one number with another (an arithmetic operator) i.e. numExpr * numExpr
+Adds one number to another (an arithmetic operator) i.e. numExpr + numExpr
- (negative)Returns the negative of the value of a numeric expression (a unary operator) i.e. -numExpr
ABS (numExpr)Returns the absolute value of an expression that evaluates to a number type
ACOS (numExpr)Returns the trigonometric arc cosine of an expression
ASIN (numExpr)Returns the trigonometric arc sine of an expression
ATAN (numExpr)Returns the trigonometric arc tangent of an expression
CBRT (numExpr)Returns the cube root of numExpr
CEIL (numExpr)Returns the absolute value of an expression
COSH (numExpr)Returns the hyperbolic cosine of an expression
COS (numExpr)Returns the trigonometric cosine of an expression
DEGREES (numExpr)Converts the input expression to degrees
DISTANCE (x1, y1, x2, y2)Calculates the distance between two points using the haversine method
FLOOR (numExpr)Returns the largest value not greater than the argument
MAX (numExpr1, numExpr2, numExpr3)Returns the maximum element from an arbitrary number of given elements
MIN (numExpr1, numExpr2, numExpr3)Returns the minimum element from an arbitrary number of given elements
MOD (numExpr, numExpr)Alias for %
NEG (numExpr)Returns the negative value of an expression it has to evaluate to a number type
POW (numExpr1, numExpr2)Returns numExp1 raised to the numExp2 power
RADIANS (numExpr)Converts the input expression to radians
RANDINT ()Returns a random integer
ROUND (numExpr)Returns the closest integer of an expression, with ties rounding towards positive infinity
SIGN (numExpr)Returns +1 if a value is positive or -1 if a value is negative
SINH (numExpr)Returns the hyperbolic sine of an expression
SIN (numExpr)Returns the trigonometric sine of an expression
SQRT (numExpr)Returns the square root of numExpr
TANH (numExpr)Returns the hyperbolic tangent of an expression
TAN (numExpr)Returns the trigonometric tangent of an expression

Date Functions

NameDescription
DATE ()Provides the current ISO date value
CONVERT_DATETIME (strExpr, fromPattern, toPattern)Converts the string format of a date [and time] to another using the pattern provided
CONVERTDATETIME (strExpr, fromPattern, toPattern)Alias for CONVERT_DATETIME
DATETIME ()Provides the current ISO date and time
DATE_TO_STR (strExpr, pattern)Converts a date time value to a string using the pattern provided
DAY (expr)Extracts the day component of an expression that is of type timestamp
HOUR <small)(expr)Extracts the hour component of an expression that is of type timestamp
MINUTE <small)(dataExpr)Extracts the minute component of an expression that is of type timestamp
MONTH_TEXT <small)(dataExpr)Returns the month name
MONTH<small)(dataExpr)Extracts the month component of an expression that is of type timestamp
SECOND<small)(dataExpr)Extracts the second component of an expression that is of type timestamp
TOMORROW ()Returns the current date time plus 1 day
TO_DATETIME (strExpr, pattern)Converts a string representation of a datetime into epoch value using the pattern provided
TO_DATE (strExpr, pattern)Converts a string representation of a date into epoch value using the pattern provided
TO_TIMESTAMP (longExpr)Converts a long (epoch) to a date and time type
TO_TIMESTAMP (strExpr, pattern)Converts a string using a pattern to a date and time type
YEAR (expr)Extracts the year component of an expression that is of type timestamp
YESTERDAY ()Returns the current date time minus 1 day

Utility Functions

NameDescriptionStreaming (stateless)Streaming (stateful)
AVG (numExpr)Returns the average of the values in a group. It ignores null value. It can be used with numeric input only
COLLECT (expr, maxN)Returns an array in which each value in the input set is assigned to an element of the array
COLLECT_UNIQUE (expr, maxN)Returns an array of unique values in which each value in the input set is assigned to an element of the arrayno
COUNT (*) AS totalReturns the number of records returned by a query or the records in a group as a result of a GROUP BY statement
MAXK (numExpr, N)Returns the N largest values of an numExprno
MAXK_UNIQUE (numExpr, N)Returns the N largest unique values of an numExprno
MINK (numExpr, N)Returns the N smallest values of an numExprno
MINK_UNIQUE (numExpr, N)Returns the N smallest unique values of an numExprno
SUM (numExpr)Returns the sum of all the values, in the expression. It can be used with numeric input only. Null values are ignored

Flags

Allows to customize the behaviour of the application (including error handling). Must be declared at the beginning of the SQL statement.

KeyTypeDescription
defaults.topic.autocreateBOOLEANCreates the target topics if they do not exist
defaults.topic.partitionINTControls the target topics partitions count.
Ignored if the topic exists
defaults.topic.replicationINTControls the target topic replication factor.
Ignored if the topics are present already.
defaults.topic.key.avro.recordSTRINGControls the output record Key schema name.
defaults.topic.key.avro.namespaceSTRINGControls the output record Key schema namespace.
defaults.topic.value.avro.recordSTRINGControls the output record Key schema name.
defaults.topic.value.avro.namespaceSTRINGControls the output record Key schema namespace.
topic.`market.risk`.<topic_con figuration>STRINGApplies the given Kafka topic setting for `market.risk`. SET topic.`market.risk`.cleanup.policy
=compact,delete’’; This is applied only if the topic does not exist a priori and it needs to be created
error.policySTRINGControls the behaviour of handling corrupted messages. Can be any of the following:
  • continue
  • fail
  • dlq (dead letter queue)
dead.letter.queueSTRINGThe topic to send those messages which cannot be processed. Requires `error.policy` to be set to `dlq`
processing.guaranteeSTRINGThe processing guarantee that should be used. Possible values are AT_LEAST_ONCE (default) and EXACTLY_ONCE. Exactly-once processing requires a cluster of at least three brokers by default, which is the recommended setting for production.
commit.interval.msLONGThe frequency with which to save the position of the processor. If processing.guarantee is set to EXACTLY_ONCE, the default value is 100, otherwise the default value is 30000. This setting directly impacts the behaviour of Tables, as it controls how often they will emit events downstream. An event will be emitted only every commit.interval.ms, so every intermediate event that is received by the table will not be visible downstream directly.
poll.msLONGThe amount of time in milliseconds to block waiting for input.
cache.max.bytes.bufferingLONGMaximum number of memory bytes to be used for buffering across all threads. It has to be at least 0. Default value is: 10 * 1024 * 1024.
client.idSTRINGAn ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern ‘<client.d>-StreamThread--<consumer
num.standby.replicasINTThe number of standby replicas for each task. Default value is 0.
num.stream.threadsINTThe number of threads to execute stream processing. Default value is 1.
max.task.idle.msLONGMaximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams.
buffered.records.per.partitionINTMaximum number of records to buffer per partition. Default is 1000.
buffered.records.per.partitionINTMaximum number of records to buffer per partition. Default is 1000.
connections.max.idle.msLONGClose idle connections after the number of milliseconds specified by this config.
receive.buffer.bytesLONGThe size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
reconnect.backoff.msLONGThe base amount of time to wait before
attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.
reconnect.backoff.max.msLONGThe maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum.
After calculating the backoff increase, 20%
random jitter is added to avoid connection
storms. Default is 1000.
retriesINTSetting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. Default is 0
retry.backoff.msLONGThe amount of time to wait before attempting to
retry a failed request to a given topic partition.
This avoids repeatedly sending requests
in a tight loop under some failure scenarios.
Default is 100.
send.buffer.bytesLONGThe size of the TCP send buffer (SO_SNDBUF)
to use when sending data. If the value is -1,
the OS default will be used.
Default is 128 * 1024.
state.cleanup.delay.msLONGThe amount of time in milliseconds to wait
before deleting state when a partition has migrated.
consumer.<config>STRINGSets the underlying Kafka consumer
configuration
producer.<config>STRINGSets the underlying Kafka producer
configuration
rocksdb.<config>STRINGSets the configuration for the RocksDB
database when stateful processing is used