
Distributed tracing with Kafka message headers

By Guillaume Aymé, December 11, 2020
In this article:
  • 01. Kafka Headers for Observability
  • 02. Deploy Lenses
  • 03. Deploy Jaeger
  • 04. Build a basic Kafka producer
  • 05. Identify the trace
  • 06. Next Steps

Apache Kafka 0.11 introduced headers to messages in Kafka.

Pretty handy. As Kafka sits at the heart of business services, and those services become more distributed and complex, headers make managing these environments much easier.

Kafka Headers act in much the same way as headers for HTTP. They add metadata to your message on top of the payload and key that you get with a Kafka message. They’re useful for annotating, auditing, monitoring and routing of events as they flow through Kafka.

This helps support a number of different use cases including:

  • Tracing data lineage
  • Adding business metadata to governance
  • Monitoring & observability
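Setting a header from a plain Kafka client is a one-liner on the record before it is sent. Here is a minimal sketch (the topic name, key, payload and header values are illustrative, not taken from the walkthrough below):

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderExample {

    // Attach a traceId header to a record, alongside the usual key and payload
    static ProducerRecord<String, String> recordWithTraceHeader(
            String topic, String key, String value, String traceId) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        record.headers().add("traceId", traceId.getBytes(StandardCharsets.UTF_8));
        return record;
    }

    public static void main(String[] args) {
        ProducerRecord<String, String> record =
                recordWithTraceHeader("payments", "key-1", "{\"amount\": 42}", "abc123");
        // A consumer (or Lenses) can read the header back from the record
        System.out.println(new String(
                record.headers().lastHeader("traceId").value(), StandardCharsets.UTF_8));
    }
}
```

Note that header values are raw byte arrays, so the application decides how to serialize them; here we simply UTF-8 encode a string.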

In Lenses 4.1, we’ve introduced querying headers alongside your Kafka message and key with SQL. Querying header and payload looks like this:


```
SELECT currency, amount, merchantId, HEADERASSTRING("traceId") as traceId
FROM cc_payments
LIMIT 100
```


Having access to metadata in your Kafka headers can drastically reduce the time to investigate issues such as an IT incident or a compliance audit. An operator will have more business and technical context as they explore their Kafka events.

[Screenshot: querying Kafka headers with Lenses.io]

Kafka Headers for Observability

One of the biggest use cases is observability. 

It’s important to be able to view transactions and data flows as they traverse different applications and APIs connected to Kafka.

APM solutions such as New Relic and Dynatrace have taken advantage of Kafka headers to include trace IDs in Kafka messages, thus enabling distributed tracing.

Jaeger too. The OpenTracing Kafka Client allows you to trace spans across Kafka clients.

Using Lenses to query headers can then be particularly handy during an investigation.

For example, you might identify a corrupted message (in the business sense as much as the technical) in Kafka and need to view the trace in Jaeger. Or vice versa: you spot an error trace in Jaeger and need to find the corresponding business event in Kafka.

The same applies to governance, where you need to identify the lineage and provenance of an event.

Here’s a quick example of how to use the OpenTracing Kafka client to instrument your Kafka messages by injecting a trace ID into Kafka message headers. The walkthrough involves:

  • Launching the Lenses Box “all-in-one” Kafka docker environment
  • Running the Jaeger “all-in-one” docker image
  • Building a basic Kafka producer
  • Identifying a trace in Lenses

Deploy Lenses

A Lenses Box contains a single-broker Kafka environment with Lenses.io, Kafka Connect, a Schema Registry and other services.

1. Get your docker command and license key from lenses.io/box. Alternatively, request a trial to use Lenses against your own Kafka.

2. Run the docker command:

```
docker run -e ADV_HOST=127.0.0.1 -e EULA="<<LICENSE KEY>>" --rm \
  -p 3030:3030 -p 9092:9092 lensesio/box
```


Ensure you have at least 4GB RAM.

We’ll be connecting to an external application, so adjust ADV_HOST accordingly if you’re not running on localhost. Also ensure ports 3030 (HTTP for the Lenses UI) and 9092 (for the Kafka client) are available.

3. Access the Lenses UI from http://<host>:3030. The environment can take a few minutes to become fully available.

Deploy Jaeger 


The Jaeger all-in-one image bundles the collector, query service and UI, allowing us to collect and visualize traces.

1. Execute the following docker run command:

```
docker run -d --name jaeger \
  -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
  -p 5775:5775/udp -p 6831:6831/udp -p 6832:6832/udp \
  -p 5778:5778 -p 16686:16686 -p 14268:14268 \
  -p 14250:14250 -p 9411:9411 \
  jaegertracing/all-in-one:latest
```


2. Access the Jaeger UI from http://<host>:16686.

[Screenshot: the Jaeger UI]

Build a basic Kafka producer

We’re going to develop a very basic Kafka client producing a single string message event to a “traces” topic.

```
package com.lemastergui.lensesio.kafkaheaders;

import com.google.common.collect.ImmutableMap;
import io.jaegertracing.Configuration;
import io.jaegertracing.internal.JaegerTracer;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.contrib.kafka.TracingKafkaProducer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class LensesioJaeger {

    private final Tracer tracer;

    private LensesioJaeger(Tracer tracer) {
        this.tracer = tracer;
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            throw new IllegalArgumentException(
                    "Expecting two arguments: Kafka broker (eg: mybroker.com:9092) and message string");
        }

        String broker = args[0];
        String message = args[1];
        try (JaegerTracer tracer = initTracer("lenses.io_test")) {
            new LensesioJaeger(tracer).sendMessage(broker, message);
        }
    }

    public static JaegerTracer initTracer(String service) {
        // Sample every span (const sampler with param 1) and log reported spans
        Configuration.SamplerConfiguration samplerConfig =
                Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
        Configuration.ReporterConfiguration reporterConfig =
                Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
        Configuration config = new Configuration(service)
                .withSampler(samplerConfig)
                .withReporter(reporterConfig);
        return config.getTracer();
    }

    private static KafkaProducer<String, String> createProducer(String broker) {
        final Properties kafkaProperties = new Properties();
        kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        // Note: enable.auto.commit and group.id are consumer settings, not producer configs
        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(kafkaProperties);
    }

    public void sendMessage(String broker, String message) {
        try (KafkaProducer<String, String> kafkaProducer = createProducer(broker)) {
            // The tracing wrapper injects the active span context into the message headers
            TracingKafkaProducer<String, String> producer =
                    new TracingKafkaProducer<>(kafkaProducer, tracer);
            Span span = tracer.buildSpan("sendMessage").start();
            try (Scope scope = tracer.scopeManager().activate(span)) {
                ProducerRecord<String, String> pr =
                        new ProducerRecord<>("traces", null, "111", message);
                final Future<RecordMetadata> future = producer.send(pr);
                producer.flush();
                span.log(ImmutableMap.of("event", "sendMessage", "value", message));
            } finally {
                span.finish();
            }
        } catch (Exception e) {
            System.out.println("An error occurred while sending message to Kafka: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

```


If you’re using Maven, you'll find the project on GitHub.

Run the application with two arguments: the broker hostname:port, and the message to be published to the traces topic.

Identify the trace

1. From Lenses, access the Explore page and find the traces topic from the data catalog. The topic was automatically created by the producer. 

2. If you’ve generated very few events, Lenses won’t have enough data to infer the serialization, so you may need to hard-set the serialization type for the Key and Value to String:String.

[Screenshot: setting the topic serialization in Lenses]

3. From the Lenses SQL Studio, run the statement:

```
SELECT _value as message,
       TAKELEFT(HEADERASSTRING("uber-trace-id"),
                INDEXOF(HEADERASSTRING("uber-trace-id"), ":")) as trace_id
FROM traces
LIMIT 100;
```


[Screenshot: querying the header with SQL in Lenses]

4. From the Jaeger UI, ensure you can see traces for the service lenses.io_test.

[Screenshot: a distributed trace in Jaeger]

5. Take one of the trace IDs returned in the Lenses SQL response and use it to look up the trace in Jaeger.

[Screenshot: finding the Jaeger trace from Lenses.io]

Next Steps

Explore the full range of SQL use cases for Kafka, or start a free trial against your own Kafka cluster.
