Angelos Petheriotis, Senior Data Engineer at Centrica (Hive Home/British Gas), shares parts of their data journey, building real-time IoT data pipelines with Stream-Reactor, Kafka and Kubernetes.
Driving billions of messages per day through multiple processing pipelines requires a significant number of processing and persisting jobs. We designed our pipelines to be real-time, durable, stable and continuous. To achieve this goal, we made our services and our infrastructure as decoupled as possible.
Centrica launched Hive in 2013 with customers telling us they wanted comfort and convenience, and a thermostat they could control that was easy and simple to use. Today, with £500m investment into Centrica Hive Limited, from parent company Centrica plc, the Hive ecosystem of connected products has grown its range and geographical market, to give people new ways to control their homes, their comfort, and their energy usage. Hive is possibly one of the largest and most mature IoT deployments in the UK utilizing Kafka.
In this post, we will present the way we evolved our systems architecture over the past few years, and how we are implementing the “single responsibility” principle, resulting in a design that has proven to work and to scale quite easily.
Let’s walk through a simple example of how we implement pipelines. Take the following requirement: we need a real-time pipeline that receives 10 billion temperature readings a day (streamed and evenly distributed), groups them by a specific time window and stores the results in a data store. In our case we are using Elasticsearch and Cassandra.
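To put that requirement in perspective, the short calculation below converts the daily volume into an average per-second rate. It is a back-of-the-envelope sketch: the 10 billion figure comes from the requirement above, while the 1-minute window is only an assumed example, since the actual window size is not stated here.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400 seconds


def average_rate_per_second(messages_per_day: int) -> float:
    """Average ingest rate, assuming an evenly distributed stream."""
    return messages_per_day / SECONDS_PER_DAY


def messages_per_window(messages_per_day: int, window_seconds: int) -> float:
    """Average number of messages falling into one time window."""
    return average_rate_per_second(messages_per_day) * window_seconds


rate = average_rate_per_second(10_000_000_000)
per_minute = messages_per_window(10_000_000_000, 60)  # 60 s window is an assumption

print(f"{rate:,.0f} reads per second")             # roughly 115,741
print(f"{per_minute:,.0f} reads per 1-minute window")
```

Even with a perfectly even distribution, the pipeline has to sustain well over one hundred thousand reads every second, which is why the rest of the design focuses on isolation and replayability.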
The way we approached the above requirement was to decouple the data-pipeline by implementing two jobs, as shown in the following figure.
We split the pipeline into 2 main units: the aggregator job and the persisting job. The aggregator has one and only one responsibility: to read from the input Kafka topic, process the messages and finally emit them to a new Kafka topic. The persisting job then takes over: whenever a message is received from the topic temperatures.aggregated, it persists it to Elasticsearch.
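The split can be illustrated with a minimal, in-memory sketch. Plain Python lists stand in for the Kafka topics and a dict stands in for Elasticsearch; the message fields (device, ts, temp), the 60-second tumbling window and the averaging are assumptions made for illustration, not the production logic.

```python
from collections import defaultdict
from statistics import mean

WINDOW_SECONDS = 60  # assumed window size


def aggregate(readings, window_seconds=WINDOW_SECONDS):
    """Aggregator job: group raw readings into tumbling windows per device."""
    buckets = defaultdict(list)
    for reading in readings:
        # Align the timestamp to the start of its window.
        window_start = reading["ts"] - reading["ts"] % window_seconds
        buckets[(reading["device"], window_start)].append(reading["temp"])
    return [
        {"device": device, "window_start": start,
         "avg_temp": mean(temps), "count": len(temps)}
        for (device, start), temps in sorted(buckets.items())
    ]


def persist(aggregates, store):
    """Persisting job: write each aggregated message under a deterministic key."""
    for agg in aggregates:
        store[f'{agg["device"]}:{agg["window_start"]}'] = agg
    return store


# Stand-in for the input topic (thermostat.data).
thermostat_data = [
    {"device": "t1", "ts": 0, "temp": 20.0},
    {"device": "t1", "ts": 30, "temp": 22.0},
    {"device": "t1", "ts": 61, "temp": 23.0},
    {"device": "t2", "ts": 10, "temp": 18.0},
]

# Stand-in for the intermediate topic (temperatures.aggregated).
temperatures_aggregated = aggregate(thermostat_data)

# Stand-in for the Elasticsearch index.
elasticsearch_stub = persist(temperatures_aggregated, {})
print(elasticsearch_stub["t1:0"])
```

Neither function knows about the other: the only contract between them is the shape of the messages on the intermediate topic.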
The above approach might seem like overkill at first, but it provides a lot of benefits (and also some drawbacks). Having two units means that the health of one unit won’t directly affect the other. If the processing job fails due to an out-of-memory (OOM) error, the persisting job will still be healthy.
One major benefit we’ve seen is the replay capability this approach offers. For example, if at some point we need to persist the messages from temperatures.aggregated to Cassandra, it’s just a matter of wiring a new pipeline and starting to consume the Kafka topic. If we had one job for both processing and persisting, we would have to reprocess every record from the thermostat.data topic, which comes with a great computational and time cost.
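The replay idea can be sketched with a toy append-only log that keeps an independent offset per consumer group. This is only a model of the behaviour, not Kafka itself: the class, the group names and the sample records are hypothetical. In a real cluster, a new sink only sees the older records if the topic's retention still covers them and its consumer starts from the earliest offset (for example via the auto.offset.reset consumer setting).

```python
class TopicLog:
    """A toy append-only log with an independent offset per consumer group."""

    def __init__(self, name):
        self.name = name
        self.records = []
        self.offsets = {}  # consumer group -> next offset to read

    def append(self, record):
        self.records.append(record)

    def poll(self, group):
        """Return the records this group has not seen yet and advance its offset."""
        start = self.offsets.get(group, 0)  # a new group starts from the beginning
        batch = self.records[start:]
        self.offsets[group] = len(self.records)
        return batch


aggregated = TopicLog("temperatures.aggregated")
for window_start, avg_temp in [(0, 21.0), (60, 23.0)]:
    aggregated.append({"window_start": window_start, "avg_temp": avg_temp})

# The existing Elasticsearch pipeline has already consumed the first two records.
elasticsearch_rows = aggregated.poll("elasticsearch-sink")

aggregated.append({"window_start": 120, "avg_temp": 22.5})

# A Cassandra pipeline is wired in later and replays the topic from the start,
# without re-running the aggregation over thermostat.data.
cassandra_rows = aggregated.poll("cassandra-sink")

# The Elasticsearch pipeline simply continues from where it left off.
elasticsearch_rows += aggregated.poll("elasticsearch-sink")

print(len(cassandra_rows), len(elasticsearch_rows))
```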
For data processing, we initially set up a Spark cluster with 300+ GB RAM, and then implemented numerous projects, one for every step of the data pipeline. Of course infrastructure is one part of the equation, but over time this setup also resulted in multiple GitHub repositories, each requiring maintenance as well as the full SDLC: multiple CI pipelines, compiling, unit testing and building JARs, as well as multiple CI jobs for deploying, executing and managing these jobs.
Another issue was utilization: our Spark cluster was either partially idle or too busy. Yet another was job isolation: a misbehaving job could affect the entire cluster.
So the key issues identified with our initial approach were:
Too many GitHub projects, CI pipelines and high maintenance
Poor resource utilization
No Job isolation
At that point, we took a step back and started evaluating Kafka Connect. The end result was to replace 90% of our Spark persisting jobs with Kafka Connect jobs! Today, persisting data from a topic to a data store is a matter of a few lines of configuration, and then Kubernetes takes over and deploys a Kafka Connect cluster.
In order to achieve the config-based plumbing approach, we used the Apache 2.0 licensed stream-reactor Kafka connectors with the Kafka Connect Query Language (KCQL, part of Lenses SQL). It offers a SQL-like syntax that instructs the connector which input topic(s) to use, which fields to extract from each message and what transformations to apply to them.
You can find an example configuration of a job that reads from a topic and persists data to Elasticsearch below:
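As a hedged illustration of how small such a job definition is, the sketch below builds the JSON body that Kafka Connect's REST API accepts on POST /connectors. The keys connector.class, tasks.max and topics are standard Kafka Connect settings. Everything else is an assumption made for this example: the connector name, the index name, the placeholder class name and the KCQL property key (connect.elastic.kcql), which should be checked against the documentation of the connector version in use.

```python
import json


def build_sink_request(name, connector_class, topic, index, tasks_max=3,
                       kcql_key="connect.elastic.kcql"):
    """Build the JSON body for Kafka Connect's POST /connectors endpoint."""
    config = {
        # Standard Kafka Connect settings.
        "connector.class": connector_class,
        "tasks.max": str(tasks_max),
        "topics": topic,
        # Connector-specific routing; the key name is an assumption.
        kcql_key: f"INSERT INTO {index} SELECT * FROM {topic}",
    }
    return {"name": name, "config": config}


request = build_sink_request(
    name="temperatures-elastic-sink",        # hypothetical connector name
    connector_class="ElasticSinkConnector",  # placeholder: use the fully qualified class from the docs
    topic="temperatures.aggregated",
    index="temperatures",                    # hypothetical index name
)
print(json.dumps(request, indent=2))
```

The point of the sketch is the size of the definition: one routing statement and a parallelism setting, with no application code to compile or deploy.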
By using Docker, Kubernetes and Helm charts, we progressed from a monolithic Spark Streaming approach that required managing lots of code to simple config-based tasks for persisting data to data stores.
The following image illustrates the benefits of the second iteration:
Let’s look a bit more closely at some of those points.
Job Isolation
With Kubernetes, we ran 10-15 Kafka Connect clusters (each one with 3 Connect workers and a default parallelization of 3 tasks), allocating 3 GB RAM to each Connect cluster.
Resource Utilization
In Kubernetes, each pod is pre-allocated a specific amount of resources: “Use what you need”. Adopting Kubernetes was a key decision for our architecture that helped, among other things, to bring the cost of infrastructure down.
Engineering Effort
With Stream-Reactor Kafka Connectors we now need to manage only 2 to 3 parameters.
Stream-Reactor is a large collection of open-source Kafka Connectors. Their main benefits are the simplicity of configuring them and their advanced capabilities. To configure a connector you effectively need to configure:
the level of parallelization
the routing
any advanced configs
For example, with Elasticsearch we had to deal with numerous topics and different business requests. The Kafka Connectors made it easy to select which data is stored and how, via a simple query:
Multiple routing settings can also be configured using the ; separator. An additional benefit is that a message can be routed to multiple target Elasticsearch indexes by a single connector that polls every message just once.
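The routing idea can be sketched with a toy parser that handles only the basic INSERT INTO ... SELECT ... FROM ... form and splits statements on the ; separator. It is not the connector's own KCQL parser, and the index names and message fields are hypothetical; it only shows how one polled message can fan out to several targets with different field selections.

```python
import re

# Matches only the basic form: INSERT INTO <target> SELECT <fields> FROM <topic>
ROUTE_PATTERN = re.compile(
    r"^\s*INSERT\s+INTO\s+(?P<target>\S+)\s+SELECT\s+(?P<fields>.+?)"
    r"\s+FROM\s+(?P<topic>\S+)\s*$",
    re.IGNORECASE,
)


def parse_routes(kcql):
    """Split a multi-statement routing string on ';' and parse each statement."""
    routes = []
    for statement in kcql.split(";"):
        if not statement.strip():
            continue
        match = ROUTE_PATTERN.match(statement)
        if match is None:
            raise ValueError(f"Unsupported statement: {statement!r}")
        fields = [field.strip() for field in match["fields"].split(",")]
        routes.append({"target": match["target"], "fields": fields,
                       "topic": match["topic"]})
    return routes


def route(message, topic, routes):
    """Return {target index: projected document} for one polled message."""
    documents = {}
    for r in routes:
        if r["topic"] != topic:
            continue
        if r["fields"] == ["*"]:
            documents[r["target"]] = dict(message)
        else:
            documents[r["target"]] = {k: message[k] for k in r["fields"] if k in message}
    return documents


kcql = (
    "INSERT INTO temperatures_all SELECT * FROM temperatures.aggregated; "
    "INSERT INTO temperatures_slim SELECT device, avg_temp FROM temperatures.aggregated"
)
message = {"device": "t1", "window_start": 0, "avg_temp": 21.0}
routed = route(message, "temperatures.aggregated", parse_routes(kcql))
print(routed)
```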
Dealing with errors
A few error conditions that could occasionally occur were easy to handle. The stream-reactor connectors provide configurable error policies. The RETRY policy, for example, can retry a failed INSERT, which is useful if an Elasticsearch instance is restarted for maintenance. The NOOP policy can be used on a topic where we could expect a malformed message, so that the connector skips it and continues. The THROW error policy sets the connector to throw a stack trace and exit, indicating an issue, which gives us confidence that every single message has been sinked correctly (think of exactly-once semantics).
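The semantics of the three policies can be modelled in a few lines. This is a simplified, in-process sketch: the function, the exception type and the retry parameters are hypothetical, and the real connectors implement retries through Kafka Connect's own redelivery mechanism rather than a local loop.

```python
import time


class SinkError(Exception):
    """Raised by the (stubbed) sink when a write fails."""


def write_with_policy(write, message, policy, max_retries=3, retry_interval_s=0.0):
    """Apply a RETRY, NOOP or THROW policy around a single write.

    Returns True if the message was written and False if it was skipped.
    """
    if policy not in {"RETRY", "NOOP", "THROW"}:
        raise ValueError(f"Unknown error policy: {policy}")
    attempts = max_retries + 1 if policy == "RETRY" else 1
    for attempt in range(1, attempts + 1):
        try:
            write(message)
            return True
        except SinkError:
            if policy == "NOOP":
                return False  # swallow the error and move on to the next message
            if policy == "THROW" or attempt == attempts:
                raise  # surface the failure so the task stops
            time.sleep(retry_interval_s)  # RETRY: wait, then try again


class FlakySink:
    """A stub sink that fails a fixed number of times before accepting writes."""

    def __init__(self, failures):
        self.failures = failures
        self.written = []

    def write(self, message):
        if self.failures > 0:
            self.failures -= 1
            raise SinkError("sink unavailable")
        self.written.append(message)


restarting = FlakySink(failures=2)  # e.g. a sink restarted for maintenance
retried = write_with_policy(restarting.write, {"id": 1}, "RETRY")

rejecting = FlakySink(failures=1)   # e.g. a malformed message
skipped = write_with_policy(rejecting.write, {"id": 2}, "NOOP")

print(retried, skipped, restarting.written, rejecting.written)
```

With THROW, the same failing write raises immediately, which is the behaviour to choose when silently dropping a message is not acceptable.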
Managing Billions of events in Elasticsearch
Writing billions of events into Elasticsearch every day creates a challenge around managing persistence. To overcome this, we utilized a feature of the Elasticsearch connector’s KCQL:
.. FROM topicA WITHINDEXSUFFIX={YYYY-MM-dd}
This results in each day’s data being persisted into a different index in Elasticsearch. It allows us to control the total number of open indices and, for example, to close and retire any index that is older than 7 days.
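The retention logic that a daily suffix enables can be sketched as follows. The index prefix and the dates are hypothetical, and the functions only compute names: actually closing or deleting an index would be done through the Elasticsearch API or a housekeeping tool, which is not shown here.

```python
from datetime import date, timedelta


def index_for(prefix, day):
    """Name of the daily index for a given date, e.g. prefix + 2018-03-20."""
    return f"{prefix}{day:%Y-%m-%d}"


def indices_to_retire(index_names, prefix, today, max_age_days=7):
    """Return the daily indices that are older than max_age_days."""
    expired = []
    for name in index_names:
        if not name.startswith(prefix):
            continue
        try:
            day = date.fromisoformat(name[len(prefix):])
        except ValueError:
            continue  # not a daily index, leave it alone
        if (today - day).days > max_age_days:
            expired.append(name)
    return sorted(expired)


today = date(2018, 3, 20)  # arbitrary example date
existing = [index_for("temperatures-", today - timedelta(days=n)) for n in range(10)]
existing.append("temperatures-archive")  # ignored: no date suffix

expired = indices_to_retire(existing, "temperatures-", today)
print(expired)
```

Because each day lives in its own index, retiring old data becomes a cheap index-level operation instead of a delete-by-query over billions of documents.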
Analytics, and in particular aggregation jobs, are a significant part of what we do. We started working with KStreams, a library that provides low latency and easy-to-use event-time support. Initially we had some challenges around managing state. Once these were overcome (and given that we had a Kafka-centric approach), it made sense for our use cases. KStreams was also a very good fit for our Kubernetes cluster, as state-less applications were easily ported into the rest of the infrastructure.
So how does our Kubernetes cluster look now? We are deploying multiple KStreams applications and multiple Kafka Connect clusters utilizing stream-reactor to build data pipelines. Here again we follow the single responsibility principle discussed earlier.
We are using multiple Kafka Connect clusters and multiple KStreams clusters. For every step of a data pipeline, a set of state-less microservices forms a cluster.
The lesson learned here is that it’s always better to employ, for example, 10 or 15 Kafka Connect clusters, each with 3 workers (for both fault tolerance and parallelization), rather than a single large Kafka Connect cluster running 15 connectors.
Having isolation in your design means that, for example, a Cassandra connector is not going to affect an Elasticsearch connector. If part of the system misbehaves, it will not affect all the pipelines. Running multiple connectors on the same Connect cluster also occasionally results in rebalances of tasks and workers, affecting the entire Connect cluster.
This design also makes monitoring straightforward for Kafka Connect. We enable the JMX metrics that Kafka Connect provides and simply forward them to our monitoring infrastructure.
Bringing Kubernetes into our architecture helped us automate and streamline not only our infrastructure, but also our processes around data-pipeline automation (with minimal coding effort), allowing us to tackle the more challenging aspects of machine learning and complex event processing requirements.
Stream-Reactor is a collection of Apache 2.0 Kafka Connectors that is both feature-rich and battle-tested. Elasticsearch is an excellent fit for IoT projects and, via the Kibana integration and its end-points, makes it possible to query IoT data at scale. Approaching data pipelines via infrastructure-as-code and building isolated components following the single-responsibility principle has been an approach that delivers significant benefits.
As everything is better, faster and simpler when you visualize it, we use kafka-connect-ui to visualize our Kafka Connect clusters. From the Connect UI, we can easily see the status of each cluster and of each task in that cluster. Furthermore, being able to see in the browser the exceptions that a task might have encountered is always faster than trying to grep the logs :)
What’s next
SQL over streaming data seems these days to be simplifying both aggregations and analytics, especially when it comes with cool tooling. Being based on state-less application logic also makes it a good fit for Kubernetes, an approach that has proven to work and scale.