Distributed tracing with Kafka message headers

Implement distributed tracing with OpenTracing and view trace IDs in Apache Kafka message headers.

By Guillaume Aymé
Dec 11, 2020

Apache Kafka 0.11 introduced message headers.

Pretty handy. As Kafka sits at the heart of more and more business services, and those services become more distributed and complex, headers make these environments much easier to manage.

Kafka headers work much like HTTP headers: they add metadata to your message on top of the key and payload that every Kafka message carries. They’re useful for annotating, auditing, monitoring and routing events as they flow through Kafka.

This helps support a number of different use cases including:

  • Tracing data lineage

  • Adding business metadata for governance

  • Monitoring & observability
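Under the hood, a Kafka header is a key/value pair with a String key and a raw byte[] value. A record's headers keep their insertion order and may repeat a key. The Kafka client's Headers interface offers lastHeader(key) and headers(key) for reading them. The JDK-only sketch below makes those semantics concrete without a broker. HeaderModel and its methods are hypothetical names, not the Kafka API. Decoding values as UTF-8 is our own assumption, since to Kafka a header value is just bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDK-only model of Kafka header semantics (not the Kafka API):
// an ordered list of (String key, byte[] value) pairs in which keys may repeat.
public class HeaderModel {

    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    private final List<Header> headers = new ArrayList<>();

    // Appends a header; an existing header with the same key is kept, not replaced
    public HeaderModel add(String key, String value) {
        headers.add(new Header(key, value.getBytes(StandardCharsets.UTF_8)));
        return this;
    }

    // Most recently added value for the key (like lastHeader), or null if absent
    public String lastValue(String key) {
        for (int i = headers.size() - 1; i >= 0; i--) {
            Header h = headers.get(i);
            if (h.key.equals(key)) {
                return new String(h.value, StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    // Every value stored under the key, in insertion order
    public List<String> allValues(String key) {
        List<String> values = new ArrayList<>();
        for (Header h : headers) {
            if (h.key.equals(key)) {
                values.add(new String(h.value, StandardCharsets.UTF_8));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Business and technical metadata travel alongside the key and payload
        HeaderModel headers = new HeaderModel()
                .add("source", "payments-api")
                .add("hop", "gateway")
                .add("hop", "fraud-check");
        System.out.println(headers.lastValue("hop"));     // fraud-check
        System.out.println(headers.allValues("hop"));     // [gateway, fraud-check]
        System.out.println(headers.lastValue("missing")); // null
    }
}
```

Because values are appended rather than overwritten, each service a message passes through can add its own entry. That fits the lineage use case.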

In Lenses 4.1, we’ve introduced the ability to query headers with SQL alongside your Kafka message key and value. The HEADERASSTRING() function reads a named header as a string, so querying a header together with the payload looks like this:

SELECT currency, amount, merchantId, HEADERASSTRING("traceId") AS traceId
FROM cc_payments
LIMIT 100
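To show what that query returns, here's an in-memory Java sketch of the same projection. Payment, headerAsString and select are hypothetical helpers. We assume HEADERASSTRING decodes the header bytes as UTF-8 and yields null for a record without that header. For brevity, headers are simplified to a Map, even though real Kafka headers can repeat a key.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory equivalent of:
//   SELECT currency, amount, merchantId, HEADERASSTRING("traceId") AS traceId
//   FROM cc_payments LIMIT 100
public class HeaderQuerySketch {

    // Simplified payment event. Real Kafka headers may repeat a key; a Map cannot.
    public static final class Payment {
        final String currency;
        final String amount;
        final String merchantId;
        final Map<String, byte[]> headers;

        public Payment(String currency, String amount, String merchantId, Map<String, byte[]> headers) {
            this.currency = currency;
            this.amount = amount;
            this.merchantId = merchantId;
            this.headers = headers;
        }
    }

    // Assumed HEADERASSTRING behaviour: UTF-8 decode, null if the header is absent
    static String headerAsString(Map<String, byte[]> headers, String key) {
        byte[] raw = headers.get(key);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    // Projects three payload fields plus the traceId header, at most `limit` rows
    static List<List<String>> select(List<Payment> payments, int limit) {
        List<List<String>> rows = new ArrayList<>();
        for (Payment p : payments) {
            if (rows.size() >= limit) {
                break;
            }
            rows.add(Arrays.asList(p.currency, p.amount, p.merchantId,
                    headerAsString(p.headers, "traceId")));
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, byte[]> traced = new HashMap<>();
        traced.put("traceId", "abc123".getBytes(StandardCharsets.UTF_8));
        List<Payment> payments = Arrays.asList(
                new Payment("EUR", "12.50", "m-1", traced),
                new Payment("USD", "7.00", "m-2", new HashMap<>()));
        for (List<String> row : select(payments, 100)) {
            System.out.println(row);
        }
        // [EUR, 12.50, m-1, abc123]
        // [USD, 7.00, m-2, null]
    }
}
```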

Having access to the metadata in your Kafka headers can drastically reduce the time it takes to investigate issues such as an IT incident or a compliance audit, because an operator has more business and technical context as they explore Kafka events.

[Screenshot: Query Kafka headers with Lenses.io]

Kafka Headers for Observability

One of the biggest use cases is observability. 

It’s important to be able to view transactions and data flows as they traverse different applications and APIs connected to Kafka.

APM solutions such as New Relic and Dynatrace use Kafka headers to carry trace identifiers in Kafka messages, enabling distributed tracing.

So can Jaeger, through the OpenTracing Kafka client, which lets you trace spans across Kafka producers and consumers.
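The OpenTracing Kafka client works by injecting the current span context into a record's headers on send and extracting it again on receive. With Jaeger, that context travels in an uber-trace-id header of the form traceId:spanId:parentSpanId:flags. Below is a library-free sketch of that round trip. SpanContext, inject, extract and childOf are hypothetical stand-ins for what a real tracer does. Headers are simplified to a Map.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical, library-free sketch of trace propagation through message headers.
// In practice a tracer plus the OpenTracing Kafka client does this for you.
public class HeaderPropagation {

    static final String TRACE_HEADER = "uber-trace-id";

    // Minimal span context: every span in a trace shares the same traceId
    public static final class SpanContext {
        public final long traceId;
        public final long spanId;
        public final long parentId;

        public SpanContext(long traceId, long spanId, long parentId) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.parentId = parentId;
        }
    }

    // Producer side: write traceId:spanId:parentId:flags (hex fields, flags 1 = sampled)
    static void inject(SpanContext ctx, Map<String, byte[]> headers) {
        String value = Long.toHexString(ctx.traceId) + ":" + Long.toHexString(ctx.spanId)
                + ":" + Long.toHexString(ctx.parentId) + ":1";
        headers.put(TRACE_HEADER, value.getBytes(StandardCharsets.UTF_8));
    }

    // Consumer side: parse the header back into a context, or null if it is missing
    static SpanContext extract(Map<String, byte[]> headers) {
        byte[] raw = headers.get(TRACE_HEADER);
        if (raw == null) {
            return null;
        }
        String[] parts = new String(raw, StandardCharsets.UTF_8).split(":");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Malformed " + TRACE_HEADER + " header");
        }
        return new SpanContext(Long.parseUnsignedLong(parts[0], 16),
                Long.parseUnsignedLong(parts[1], 16),
                Long.parseUnsignedLong(parts[2], 16));
    }

    // Starts a new span in the same trace, parented by the given span
    static SpanContext childOf(SpanContext parent) {
        return new SpanContext(parent.traceId, ThreadLocalRandom.current().nextLong(), parent.spanId);
    }

    public static void main(String[] args) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        // Producer: a "send" span whose context travels with the record
        SpanContext sendSpan = new SpanContext(random.nextLong(), random.nextLong(), 0L);
        Map<String, byte[]> headers = new HashMap<>();
        inject(sendSpan, headers);

        // Consumer: continue the same trace from the record's headers
        SpanContext receiveSpan = childOf(extract(headers));
        System.out.println("header: " + new String(headers.get(TRACE_HEADER), StandardCharsets.UTF_8));
        System.out.println("same trace: " + (receiveSpan.traceId == sendSpan.traceId)); // same trace: true
    }
}
```

The trace ID never changes as the message crosses Kafka. That shared ID is what lets you jump between a Kafka record and its trace in Jaeger.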

Querying those headers with Lenses is then particularly handy during an investigation.

For example, you might identify a corrupted message in Kafka (corrupted in the business sense as much as the technical one) and need to view its trace in Jaeger. Or, vice versa, you might spot an error trace in Jaeger and need to find the corresponding business event in Kafka.

The same applies to governance, where you need to identify the lineage and provenance of an event.

Here’s a quick example of how to use the OpenTracing Kafka client to instrument your Kafka messages by injecting a trace ID into the message headers. The walkthrough involves:

  • Launching a Lenses Box “all-in-one” Kafka Docker environment

  • Running the Jaeger “all-in-one” Docker image

  • Building a basic Kafka producer

  • Identifying a trace in Lenses and looking it up in Jaeger

Deploy Lenses

Lenses Box contains a single-broker Kafka cluster together with Lenses.io, Kafka Connect, Schema Registry and other services.

1. Get your Docker command and license key from lenses.io/box. Alternatively, request a trial to use Lenses against your own Kafka cluster.

2. Run the Docker container:

docker run -e ADV_HOST=127.0.0.1 -e EULA="<<LICENSE KEY>>" --rm -p 3030:3030 -p 9092:9092 lensesio/box

Ensure you have at least 4GB of RAM available.

We’ll be connecting from an external application, so adjust ADV_HOST accordingly if you’re not running the container on localhost. Also ensure ports 3030 (HTTP for the Lenses UI) and 9092 (for Kafka clients) are available.
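To check those ports before launching the container, a quick JDK-only approach is to try binding each one yourself. If the bind fails, something else already holds the port. isPortFree is our own helper. A bind can also fail for other reasons, such as permissions, and this sketch treats those cases as "not free" too. Run it before docker run. Once the container is up, both ports will correctly report as in use.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Checks that the host ports Lenses Box needs are not already taken.
public class PortCheck {

    // True if we can bind the port ourselves, i.e. nothing else is listening on it.
    // Any bind failure (port in use, insufficient permissions) counts as "not free".
    static boolean isPortFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 3030: Lenses UI (HTTP), 9092: Kafka broker
        for (int port : new int[] {3030, 9092}) {
            System.out.println(port + (isPortFree(port) ? " is free" : " is already in use"));
        }
    }
}
```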

3. Access the Lenses UI at http://<host>:3030. The environment can take a few minutes to become fully available.
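The environment can take a few minutes to come up, so a script may want to wait for the UI before continuing. Below is a minimal polling sketch using the JDK's HttpClient (Java 11+). waitForHttp is our own helper, and we assume any HTTP response below 500 means the UI is serving. The sketch polls over HTTP rather than just opening a TCP connection, because Docker's port forwarding can accept connections on the host before the service inside the container is listening.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Polls the Lenses UI over HTTP until it answers or a timeout elapses (Java 11+).
public class WaitForLenses {

    // True once the URL answers with a status below 500; false if the timeout elapses first
    static boolean waitForHttp(String url, Duration timeout, Duration interval) throws InterruptedException {
        HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(2)).build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
                if (response.statusCode() < 500) {
                    return true;
                }
            } catch (IOException e) {
                // Connection refused, reset or timed out: not ready yet
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String url = args.length > 0 ? args[0] : "http://127.0.0.1:3030";
        boolean ready = waitForHttp(url, Duration.ofMinutes(5), Duration.ofSeconds(5));
        System.out.println(url + (ready ? " is responding" : " did not respond in time"));
    }
}
```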

Deploy Jaeger 

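Jaeger will collect and visualize our traces. The all-in-one image runs the Jaeger agent, collector, query service and UI in a single container with in-memory storage, which is fine for testing.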

1. Execute the following docker run command:

docker run -d --name jaeger \
  -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
  -p 5775:5775/udp \
  -p 6831:6831/udp \
  -p 6832:6832/udp \
  -p 5778:5778 \
  -p 16686:16686 \
  -p 14268:14268 \
  -p 14250:14250 \
  -p 9411:9411 \
  jaegertracing/all-in-one:latest

2. Access the Jaeger UI at http://<host>:16686

[Screenshot: Jaeger UI]

Build a basic Kafka producer

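We’re going to develop a very basic Kafka producer that sends a single string message to a “traces” topic. Wrapping it in the OpenTracing TracingKafkaProducer is what injects the trace context into the message headers.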

package com.lemastergui.lensesio.kafkaheaders;

import com.google.common.collect.ImmutableMap;
import io.jaegertracing.Configuration;
import io.jaegertracing.internal.JaegerTracer;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.contrib.kafka.TracingKafkaProducer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class LensesioJaeger {
    private final Tracer tracer;

    private LensesioJaeger(Tracer tracer) {
        this.tracer = tracer;
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            throw new IllegalArgumentException("Expecting two arguments: Kafka broker (e.g. mybroker.com:9092) & message string");
        }
        String broker = args[0];
        String message = args[1];
        // Closing the tracer flushes pending spans to Jaeger before the JVM exits
        try (JaegerTracer tracer = initTracer("lenses.io_test")) {
            new LensesioJaeger(tracer).sendMessage(broker, message);
        }
    }

    // Sample every trace ("const" sampler, param 1) and log spans as they are reported.
    // Other reporter settings (e.g. JAEGER_AGENT_HOST) come from the environment;
    // by default spans are sent to a Jaeger agent on localhost.
    public static JaegerTracer initTracer(String service) {
        Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
        Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
        Configuration config = new Configuration(service).withSampler(samplerConfig).withReporter(reporterConfig);
        return config.getTracer();
    }

    // Producer settings only: consumer properties such as group.id and
    // enable.auto.commit do not apply to a producer, so they are left out.
    private static KafkaProducer<String, String> createProducer(String broker) {
        final Properties kafkaProperties = new Properties();
        kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(kafkaProperties);
    }

    public void sendMessage(String broker, String message) {
        // TracingKafkaProducer wraps the plain producer: on each send it starts a child
        // span of the active span and injects that span's context into the record headers.
        // try-with-resources closes the wrapper, which closes the underlying producer.
        try (TracingKafkaProducer<String, String> producer =
                     new TracingKafkaProducer<>(createProducer(broker), tracer)) {
            Span span = tracer.buildSpan("sendMessage").start();
            try (Scope scope = tracer.scopeManager().activate(span)) {
                ProducerRecord<String, String> pr = new ProducerRecord<>("traces", null, "111", message);
                // Block until the broker acknowledges the write
                RecordMetadata metadata = producer.send(pr).get();
                span.log(ImmutableMap.of("event", "sendMessage", "value", message));
                System.out.println("Sent to " + metadata.topic() + "-" + metadata.partition() + " at offset " + metadata.offset());
            } finally {
                span.finish();
            }
        } catch (Exception e) {
            System.out.println("An error occurred while sending message to Kafka. " + e.getMessage());
            e.printStackTrace();
        }
    }
}

If you’re using Maven, you’ll find the project on GitHub.

Run the application with two arguments: the broker’s hostname:port and the message to publish to the traces topic.

Identify the trace

1. From Lenses, open the Explore page and find the traces topic in the data catalog. Kafka created the topic automatically when the producer first wrote to it.

2. If you’ve generated very few events, Lenses won’t have enough data to infer the serialisation format, so you may need to explicitly set the Key and Value serialisation to String:String.
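String:String is the right setting here because the producer used Kafka's StringSerializer for both key and value. With its default encoding, StringSerializer writes a string's UTF-8 bytes with no schema attached. Below is a JDK-only sketch of that encoding. serialize and deserialize are our own helpers that mimic the default behaviour; they are not the Kafka classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// JDK-only sketch of what a String:String key/value format means on the wire.
public class StringFormat {

    // Like Kafka's StringSerializer with its default encoding: UTF-8 bytes (null stays null)
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // Like StringDeserializer with its default encoding: decode the bytes as UTF-8
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = serialize("111");                       // the key our producer used
        System.out.println(Arrays.toString(key));            // [49, 49, 49]
        System.out.println(deserialize(serialize("hello"))); // hello
    }
}
```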

[Screenshot: Setting the Kafka key/value serialisation in Lenses]

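3. From the Lenses SQL Studio, run the following statement. It returns each message together with its trace ID, which is the part of the uber-trace-id header before the first colon: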

SELECT _value as message, TAKELEFT(HEADERASSTRING("uber-trace-id"),INDEXOF(HEADERASSTRING("uber-trace-id"),":")) as trace_id FROM traces LIMIT 100;
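For reference, here's the same extraction in Java. Jaeger's uber-trace-id header has the form {trace-id}:{span-id}:{parent-span-id}:{flags}. The fields are hex, and the lowest flag bit means "sampled", so the trace ID is everything before the first colon. UberTraceId and its methods are our own illustrative helpers, and the sample header value is made up.

```java
// Parses a Jaeger uber-trace-id header value: {trace-id}:{span-id}:{parent-span-id}:{flags}
public class UberTraceId {

    // Everything before the first colon, like TAKELEFT(value, INDEXOF(value, ":")) in the query
    static String traceId(String headerValue) {
        if (headerValue == null) {
            return null;
        }
        int colon = headerValue.indexOf(':');
        return colon < 0 ? null : headerValue.substring(0, colon);
    }

    // The flags field is hex; its lowest bit (0x1) marks the trace as sampled
    static boolean isSampled(String headerValue) {
        String[] parts = headerValue.split(":");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not an uber-trace-id value: " + headerValue);
        }
        return (Integer.parseInt(parts[3], 16) & 1) == 1;
    }

    public static void main(String[] args) {
        // Illustrative value, not taken from a real trace
        String header = "5b8aa5a2d2c872e8:7d3c1e0f2a9b4c6d:0:1";
        System.out.println(traceId(header));   // 5b8aa5a2d2c872e8
        System.out.println(isSampled(header)); // true
    }
}
```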

[Screenshot: Querying Kafka headers with SQL in Lenses]

4. From the Jaeger UI, check that you can see traces for the lenses.io_test service.

[Screenshot: Distributed trace in Jaeger]

5. Take one of the trace IDs returned by the Lenses SQL query and use it to look up the trace in Jaeger.
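If you do this often, you could generate a direct link instead of pasting the ID into the search box. This sketch assumes the Jaeger UI serves individual traces at /trace/{traceId} on port 16686, as in the setup above. traceUrl is a hypothetical helper, and the sample ID is made up.

```java
import java.util.Locale;

// Builds a link to a trace in the Jaeger UI from a trace ID found via Lenses.
public class JaegerTraceLink {

    // Assumes the Jaeger UI's /trace/{traceId} route; trace IDs are 64- or 128-bit hex
    static String traceUrl(String host, int port, String traceId) {
        if (traceId == null || !traceId.matches("[0-9a-fA-F]{1,32}")) {
            throw new IllegalArgumentException("Not a hex trace ID: " + traceId);
        }
        return "http://" + host + ":" + port + "/trace/" + traceId.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        String traceId = args.length > 0 ? args[0] : "5b8aa5a2d2c872e8";
        System.out.println(traceUrl("localhost", 16686, traceId));
        // http://localhost:16686/trace/5b8aa5a2d2c872e8
    }
}
```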

[Screenshot: Finding the Jaeger trace from a Kafka header in Lenses.io]

Next Steps

Explore the full range of SQL use cases for Kafka, or start a free trial against your own Kafka cluster.
