By Mihalis Tsoukalos | Apr 21, 2020


Visualize Kafka data in your web apps with D3.js and SQL

This tutorial will walk you through visualizing live data in a Kafka topic using the D3.js JavaScript library.

This technique can be used to democratize more of the data in Kafka across your organization, including for engineering and operations teams. It also simplifies integrating data from Kafka into your business applications.

Rather than querying Kafka directly, we will query the data through Lenses.io, which provides a governance portal for Apache Kafka.

Querying data in Kafka through Lenses has a few benefits:

  • Use of SQL: Lenses.io allows us to use SQL to query, filter and aggregate the data before visualizing it. This will reduce workload on the client side.

  • Deserialized payloads: Lenses deserializes the payload data in Kafka whatever the format (Avro, Protobuf, XML, or even proprietary formats via a Serde).

  • Secure data access via namespaces and security tokens: Data access is protected by role-based access controls based on namespaces, and by service account tokens. This saves you from having to configure and manage ACLs in Kafka.

Finally, if you want to visualize JSON data stored in a file, have a look at the "Using D3 to visualize data" blog post.

Pre-requisites

In order to be able to follow the steps of this tutorial, you will need the following:

  • Lenses up and running. You can use our free Lenses Box instance if necessary.

  • An Internet connection that will help you get D3.js and create the sample project.

The Scenario

We are going to read live data from a Kafka topic using the Lenses API and a service account, and then visualize it using D3.js. The Kafka topic that will be used is called nyc_yellow_taxi_trip_data.

The Implementation

This section presents the steps needed to implement the described scenario, starting with Lenses.

About Lenses

The nyc_yellow_taxi_trip_data Kafka topic contains records in the following format (represented as JSON records):

{
	"value": {
		"VendorID": 2,
		"tpep_pickup_datetime": "2016-01-01 00:00:00",
		"tpep_dropoff_datetime": "2016-01-01 00:00:00",
		"passenger_count": 1,
		"trip_distance": 10.54,
		"pickup_longitude": "-73.98455047607422",
		"pickup_latitude": "40.6795654296875",
		"RateCodeID": 1,
		"store_and_fwd_flag": "N",
		"dropoff_longitude": "-73.95027160644531",
		"dropoff_latitude": "40.78892517089844",
		"payment_type": 1,
		"fare_amount": 33,
		"extra": 0.5,
		"mta_tax": 0.5,
		"improvement_surcharge": 0.3,
		"tip_amount": 0,
		"tolls_amount": 0,
		"total_amount": 34.3
	},
	"metadata": {
		"offset": 0,
		"partition": 0,
		"timestamp": 1587447010296,
		"__keysize": 0,
		"__valsize": 147
	}
}

The query that is going to be executed in the JavaScript code is the following:

SELECT count(*), passenger_count, VendorID, sum(total_amount)
FROM nyc_yellow_taxi_trip_data
WHERE passenger_count != 0 AND passenger_count <= 6
GROUP BY passenger_count, VendorID

Each record returned by the previous query contains 4 fields as defined in the SELECT statement.
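
To make the shape of the result more concrete, the following sketch reproduces the same filtering, grouping and aggregation on a small in-memory array. It is only an illustration: Lenses executes the real query on the server side, and the helper name aggregateTrips as well as the sample trips are made up for this example.

```javascript
// Hypothetical helper: mimics the SQL query on an in-memory array of trips.
function aggregateTrips(trips) {
  const groups = new Map();
  for (const trip of trips) {
    // WHERE passenger_count != 0 AND passenger_count <= 6
    if (trip.passenger_count === 0 || trip.passenger_count > 6) continue;
    // GROUP BY passenger_count, VendorID
    const key = `${trip.passenger_count}|${trip.VendorID}`;
    if (!groups.has(key)) {
      groups.set(key, {
        count: 0,
        passenger_count: trip.passenger_count,
        VendorID: trip.VendorID,
        sum: 0,
      });
    }
    const group = groups.get(key);
    group.count += 1;               // count(*)
    group.sum += trip.total_amount; // sum(total_amount)
  }
  return Array.from(groups.values());
}

// Made-up sample data, using only the fields the query needs
const sampleTrips = [
  { VendorID: 2, passenger_count: 1, total_amount: 34.3 },
  { VendorID: 2, passenger_count: 1, total_amount: 10.0 },
  { VendorID: 1, passenger_count: 5, total_amount: 20.5 },
  { VendorID: 1, passenger_count: 0, total_amount: 7.0 }, // filtered out
];
console.log(aggregateTrips(sampleTrips));
```

With this sample input, the helper returns one record per (passenger_count, VendorID) pair, and the trip with zero passengers is ignored.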

Creating a Lenses Service Account

You will need to perform two steps in order to create a service account in Lenses:

  • Create a new Group that matches the requirements of that service account.

  • Create the service account and keep its token.

Note that the name of the service account should be unique in a Lenses installation.

You can find more information about creating a Lenses Service Account in this blog post. The name of the service account used in this tutorial will be service.

How to create the JavaScript project

In order to create the JavaScript project, you will need to execute some commands.

First, you should execute git clone https://github.com/wbkd/webpack-starter.git inside the root directory of your project.

Then, you will need to make changes to the following files:

  • package.json

  • ./src/index.html

  • ./src/scripts/index.js

After that you will need to execute one of the following two commands depending on the JavaScript package manager that you are using:

  • npm install if you are using the npm JavaScript package manager.

  • yarn if you are using the yarn JavaScript package manager.

If you prefer to use our GitHub repository instead, clone it with the following command:

git clone git@github.com:mactsouk/d3-service.git

You might need to make changes to ./src/scripts/index.js to match your Lenses installation or define your own SQL query.

The last thing you should do is execute npm run start to begin your application. If you are using yarn, execute yarn start instead.

How to get data from Lenses

The following JavaScript code is used for getting the data from Lenses.

  // We open the Websocket
  webSocketRequest.onopen = () => {
    // Here, we send the message with the query and the credentials
    webSocketRequest.send(JSON.stringify(firstMessage));
    // Here, the onmessage() method is executed each time we
    // receive a message from the Websocket connection.
    webSocketRequest.onmessage = (streamEvent) => {
      // streamEvent.data is a JSON string, so we parse it once.
      // The parsed message has a type property, which can be
      // RECORD, END or ERROR.
      const message = JSON.parse(streamEvent.data);
      if (message.type === 'RECORD') {
        websocketData.push(message.data.value);
        websocketSubject.next(websocketData);
      } else if (message.type === 'ERROR') {
        // Print the error message in the console
        console.error(message);
      } else if (message.type === 'END') {
        websocketSubject.complete();
      }
    };
  };

In order to get data from Lenses, you will need to create a WebSocket connection. As we are using a service account, there is no need to log in to Lenses using the Lenses REST API to get an authentication token.
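
As a rough sketch of what this means in practice, the first message sent over the WebSocket can be built from the service account name, its token and the SQL query. The helper name buildFirstMessage and the placeholder token are hypothetical; the field names and the values of stats and live are simply copied from the index.js file of this project.

```javascript
// Hypothetical helper: builds the first WebSocket message for a service account.
function buildFirstMessage(serviceAccountName, serviceAccountToken, sql) {
  return {
    // Credentials in the "name:token" form used in this tutorial
    token: `${serviceAccountName}:${serviceAccountToken}`,
    // stats and live use the same values as the project's index.js
    stats: 2,
    sql: sql,
    live: false,
  };
}

// 'my-secret-token' is a placeholder, not a real token
const exampleMessage = buildFirstMessage(
  'service',
  'my-secret-token',
  'SELECT count(*) FROM nyc_yellow_taxi_trip_data'
);
// This string is what would be passed to webSocketRequest.send()
console.log(JSON.stringify(exampleMessage));
```

Keeping this logic in a small function makes it easier to read the service account name and the token from configuration instead of hard-coding them.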

The Format of the Data

In this section you will learn more about the format of the JSON records read from Lenses using JavaScript. The records returned by the SQL query have the following format:

{
    count: 13,
    passenger_count: 5,
    VendorID: 1,
    sum: 14.405384615384614
}

Once you know the format of the data, you can easily choose the fields that you want to include in the visualization.

The returned data will be processed in the JavaScript script using the following code:

websocketData.forEach(function(d) {
      d.sum = d.sum / d.count;
    });

This means that, after processing, the sum field no longer holds the total amount but the average amount of money paid per trip in each group.
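
If you would rather not overwrite the values received from Lenses, the same calculation can be written without mutating the original records. The following is a minimal sketch, assuming records with the count and sum fields shown above; the function name toAveragePerTrip is hypothetical and not part of the project.

```javascript
// Hypothetical helper: returns new records whose sum field holds the
// average amount per trip, leaving the input array untouched.
function toAveragePerTrip(records) {
  return records
    // Skip groups without trips to avoid dividing by zero
    .filter((record) => record.count > 0)
    .map((record) => ({ ...record, sum: record.sum / record.count }));
}

// Made-up sample record
const rawRecords = [{ count: 4, passenger_count: 1, VendorID: 2, sum: 100 }];
const averagedRecords = toAveragePerTrip(rawRecords);
console.log(averagedRecords[0].sum); // 25
console.log(rawRecords[0].sum);      // 100, the input is unchanged
```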

Visualizing Data

This section will show the core JavaScript code used for visualizing the data. As mentioned before, we are going to use the D3.js library.

The JavaScript code that draws the dots and generates the tooltips is the following:

    svg.selectAll(".dot")
      .data(websocketData)
    .enter().append("circle")
      .attr("class", "dot")
      .attr("r", function(d) { return 2 * d.passenger_count;})
      .attr("cx", xMap)
      .attr("cy", yMap)
      .style("fill", function(d) { return color(cValue(d));})
      .style("opacity", 1)
      .on("mouseover", function(d) {
          tooltip.transition()
               .duration(200)
               .style("opacity", .9);
          tooltip.html("VendorID: " + d["VendorID"] + "<br/> (" + xValue(d)
	        + ", " + Math.floor(yValue(d)) + ")")
               .style("left", (d3.event.pageX + 5) + "px")
               .style("top", (d3.event.pageY - 28) + "px");
      });

The radius of each circle depends on the number of passengers – the more passengers the taxi had, the bigger the circle.
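
The radius rule is easy to isolate in a small function. The sketch below shows the rule used in this tutorial next to a possible alternative that makes the area of the circle, rather than its radius, proportional to the number of passengers. Both function names and the base radius of 4 pixels are assumptions made for illustration only.

```javascript
// The rule used in this tutorial: the radius grows linearly with passengers.
function linearRadius(passengerCount) {
  return 2 * passengerCount;
}

// Hypothetical alternative: the area of the circle is proportional to the
// number of passengers, so the radius grows with the square root.
function areaProportionalRadius(passengerCount, baseRadius = 4) {
  return baseRadius * Math.sqrt(passengerCount);
}

console.log(linearRadius(3));           // 6
console.log(areaProportionalRadius(4)); // 8
```

Either function could be passed to .attr("r", ...) as function(d) { return linearRadius(d.passenger_count); }.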

The Final Output

In this section we are going to see the generated visualization. Note that each time you load the HTML file, you will get a slightly different output as we are working with dynamic data.

D3.js Scatter Plot

Presenting the Project Files

This section will present the main files of the project.

The package.json file

The contents of package.json are the following:

{
  "name": "d3-live",
  "version": "0.0.1",
  "repository": "https://github.com/mactsouk/d3-service.git",
  "description": "A D3 visualization of live data using the Lenses API and a Service Account",
  "scripts": {
    "build": "cross-env NODE_ENV=production webpack --config webpack/webpack.config.prod.js --colors",
    "start": "webpack-dev-server --open --config webpack/webpack.config.dev.js"
  },
  "author": "Lenses.io",
  "license": "MIT",
  "devDependencies": {
    "@babel/core": "^7.9.0",
    "@babel/plugin-proposal-class-properties": "^7.8.3",
    "@babel/plugin-syntax-dynamic-import": "^7.8.3",
    "@babel/preset-env": "^7.9.0",
    "babel-loader": "^8.1.0",
    "clean-webpack-plugin": "^3.0.0",
    "copy-webpack-plugin": "^5.1.1",
    "cross-env": "^7.0.2",
    "eslint": "^6.8.0",
    "eslint-loader": "^3.0.3",
    "file-loader": "^5.1.0",
    "html-webpack-plugin": "^4.0.0-beta.11",
    "mini-css-extract-plugin": "^0.9.0",
    "webpack": "^4.42.1",
    "webpack-cli": "^3.3.11",
    "webpack-dev-server": "^3.10.3",
    "webpack-merge": "^4.2.2"
  },
  "dependencies": {
    "@babel/polyfill": "^7.8.7",
    "axios": "^0.19.2",
    "core-js": "^3.6.4",
    "rxjs": "^6.5.5",
    "ws": "^7.2.3"
  }
}

The dependencies block is where you define the JavaScript packages that need to be downloaded for the project to execute. Note that D3.js itself is not listed there, as it is loaded by the script tag in ./src/index.html.

The ./src/index.html file

The contents of ./src/index.html are the following:

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <title>Visualizing Live Data from Lenses D3.js</title>
<style>
body {
  font: 11px sans-serif;
}
.axis path,
.axis line {
  fill: none;
  stroke: #000;
  shape-rendering: crispEdges;
}
.dot {
  stroke: #000;
}
.tooltip {
  position: absolute;
  width: 200px;
  height: 28px;
  pointer-events: none;
}
h2 {
  font-size: 20px;
  font-weight: bold;
  line-height: 1;
  text-align: center;
  margin: 0px 0 0;
  padding-top: 0px;
  padding-bottom: 0px;
}
</style>
</head>
<body>
  <h2>Visualizing live data from Lenses via Websocket Endpoint using D3.js!</h2>
  <h2>Using a Lenses Service Account.</h2>
  <div id="my_dataviz"></div>
<script src="https://d3js.org/d3.v3.js"></script>
</body>
</html>

The embedded CSS code is required for the correct rendering of the HTML output.

The ./src/scripts/index.js file

The ./src/scripts/index.js file is the most important file of the project:

// The browser's native WebSocket object is used below, so neither the
// ws package nor axios needs to be imported in this file.
import { Subject } from 'rxjs';
import { finalize } from 'rxjs/operators';
// Replace these values with your own service account name and token
const username = 'service';
const token = '7a89e29e-453e-48cc-a881-94d687bfb1cb';
const topicDataUrl = 'ws://localhost:3030/api/ws/v2/sql/execute';
const webSocketRequest = new WebSocket(topicDataUrl);
let websocketData = [];
const websocketSubject = new Subject();
// Builds the first message and registers the WebSocket handlers
async function requestToWSEndpoint() {
  // Construct the WebSocket token
  const reqToken = username.concat(':', token);
  websocketData = [];
  // This is an object with the authentication token + query
  // This is the first message for the Websocket connection
  const firstMessage = {
    token: reqToken,
    stats: 2,
    sql: "SELECT count(*), passenger_count, VendorID, sum(total_amount) FROM nyc_yellow_taxi_trip_data WHERE passenger_count != 0 AND passenger_count <= 6 GROUP BY passenger_count, VendorID",
    live: false
  };
  // We open the Websocket
  webSocketRequest.onopen = () => {
    // Here, we send the message with the query and the credentials
    webSocketRequest.send(JSON.stringify(firstMessage));
    // Here, the onmessage() method is executed each
    // time we receive a message
    // from the Websocket connection.
    webSocketRequest.onmessage = (streamEvent) => {
      // streamEvent.data is a JSON string, so we parse it once.
      // The parsed message has a type property, which can be
      // RECORD, END or ERROR.
      const message = JSON.parse(streamEvent.data);
      if (message.type === 'RECORD') {
        websocketData.push(message.data.value);
        websocketSubject.next(websocketData);
      } else if (message.type === 'ERROR') {
        // Print the error message in the console
        console.error(message);
      } else if (message.type === 'END') {
        websocketSubject.complete();
      }
    };
  };
};
(async function () {
  await requestToWSEndpoint();
  // finalize() runs its callback once the stream has completed.
  websocketSubject.pipe(finalize(() => {
    var margin = { top: 50, right: 40, bottom: 50, left: 60 };
    // Drawing area: a 1000x1000 canvas minus the margins
    var width = 1000 - margin.left - margin.right;
    var height = 1000 - margin.top - margin.bottom;
    console.log(websocketData, 'DATA');
    websocketData.forEach(function(d) {
      d.sum = d.sum / d.count;
    });
    var xValue = function(d) { return d.count;}, // data -> value
    xScale = d3.scale.linear().range([0, width]), // value -> display
    xMap = function(d) { return xScale(xValue(d));}, // data -> display
    xAxis = d3.svg.axis().scale(xScale).orient("bottom");
    // setup y
    var yValue = function(d) { return d.sum;}, // data -> value
    yScale = d3.scale.linear().range([height, 0]), // value -> display
    yMap = function(d) { return yScale(yValue(d));}, // data -> display
    yAxis = d3.svg.axis().scale(yScale).orient("left");
    // setup fill color
    var cValue = function(d) { return d.VendorID;},
    color = d3.scale.category10();
  
    // add the graph canvas to the body of the webpage
    var svg = d3.select("body").append("svg")
      .attr("width", width + margin.left + margin.right)
      .attr("height", height + margin.top + margin.bottom)
      .append("g")
      .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
    // add the tooltip area to the webpage
    var tooltip = d3.select("body").append("div")
      .attr("class", "tooltip")
      .style("opacity", 0);
    // don't want dots overlapping axis, so add in buffer to data domain
    xScale.domain([d3.min(websocketData, xValue)-1, d3.max(websocketData, xValue)+1]);
    yScale.domain([d3.min(websocketData, yValue)-1, d3.max(websocketData, yValue)+1]);
    // x-axis
    svg.append("g")
      .attr("class", "x axis")
      .attr("transform", "translate(0," + height + ")")
      .call(xAxis)
    .append("text")
      .attr("class", "label")
      .attr("x", width)
      .attr("y", -6)
      .style("text-anchor", "end")
      .text("Count");
    // y-axis
    svg.append("g")
      .attr("class", "y axis")
      .call(yAxis)
    .append("text")
      .attr("class", "label")
      .attr("transform", "rotate(-90)")
      .attr("y", 6)
      .attr("dy", ".71em")
      .style("text-anchor", "end")
      .text("Amount per trip");
    // draw dots
    svg.selectAll(".dot")
      .data(websocketData)
    .enter().append("circle")
      .attr("class", "dot")
      .attr("r", function(d) { return 2 * d.passenger_count;})
      .attr("cx", xMap)
      .attr("cy", yMap)
      .style("fill", function(d) { return color(cValue(d));})
      .style("opacity", 1)
      .on("mouseover", function(d) {
          tooltip.transition()
               .duration(200)
               .style("opacity", .9);
          tooltip.html("VendorID: " + d["VendorID"] + "<br/> (" + xValue(d)
	        + ", " + Math.floor(yValue(d)) + ")")
               .style("left", (d3.event.pageX + 5) + "px")
               .style("top", (d3.event.pageY - 28) + "px");
      });
  })).subscribe();
}());

This file contains all the JavaScript code. Its flow goes like this:

  • Create the WebSocket connection to the URL stored in topicDataUrl and build the firstMessage object, which holds the service account credentials and the SQL query.

  • The message is sent using the webSocketRequest.send(JSON.stringify(firstMessage)) call.

  • The onmessage() method is executed each time we receive new data from the WebSocket connection - this happens because we are expecting to get multiple JSON records back.

  • The streamEvent object has a data attribute that holds a JSON string. Once parsed, it has a type property, which can be RECORD, END or ERROR.

  • When we are dealing with a new RECORD, we add it to the existing records.

  • When there is an ERROR, we print it in the console.

  • When we are dealing with END, we complete websocketSubject, which signals that all data has been read.

  • Once the subject has completed, the callback passed to finalize() in websocketSubject.pipe() is executed and uses the data stored in the websocketData variable.

  • The websocketData variable is used by D3.js to visualize the desired data.
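
The handling of the three message types can also be sketched as a pure function that is independent of the WebSocket and of RxJS, which makes it easy to test in isolation. The function name applyMessage and the shape of the state object are hypothetical. Only the type values and the data.value path of RECORD messages come from the code above, and storing the whole parsed message on ERROR is an assumption, as the exact layout of error messages is not shown in this tutorial.

```javascript
// Hypothetical pure helper: applies one raw WebSocket message to the state.
function applyMessage(state, rawMessage) {
  const message = JSON.parse(rawMessage);
  switch (message.type) {
    case 'RECORD':
      // Add the new record to the existing ones
      return { ...state, records: [...state.records, message.data.value] };
    case 'ERROR':
      // Assumption: keep the whole parsed message for later inspection
      return { ...state, error: message };
    case 'END':
      // All data has been read
      return { ...state, done: true };
    default:
      return state;
  }
}

// Made-up messages in the order a client might receive them
const rawMessages = [
  JSON.stringify({ type: 'RECORD', data: { value: { count: 13, VendorID: 1 } } }),
  JSON.stringify({ type: 'RECORD', data: { value: { count: 7, VendorID: 2 } } }),
  JSON.stringify({ type: 'END' }),
];
const finalState = rawMessages.reduce(applyMessage, { records: [], error: null, done: false });
console.log(finalState.records.length, finalState.done); // 2 true
```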

The .eslintrc file

The contents of .eslintrc are the following:

{
  "env": {
    "browser": true,
    "node": true
  },
  "parserOptions": {
    "ecmaVersion": 8,
    "sourceType": "module"
  },
  "rules": {
    "semi": 2
  }
}

Next steps

Now that you know how to visualize live data from Kafka topics through Lenses and D3.js, you should begin visualizing data from your own Kafka topics.
