io.opentracing.contrib:opentracing-kafka-spring

OpenTracing Instrumentation for Spring Kafka

License: Apache 2.0
GroupId: io.opentracing.contrib
ArtifactId: opentracing-kafka-spring
Last Version: 0.1.15
Type: jar
Description: OpenTracing Instrumentation for Spring Kafka

How to add to project

Maven

<!-- https://jarcasting.com/artifacts/io.opentracing.contrib/opentracing-kafka-spring/ -->
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-spring</artifactId>
    <version>0.1.15</version>
</dependency>

Gradle (Groovy DSL)

// https://jarcasting.com/artifacts/io.opentracing.contrib/opentracing-kafka-spring/
implementation 'io.opentracing.contrib:opentracing-kafka-spring:0.1.15'

Gradle (Kotlin DSL)

// https://jarcasting.com/artifacts/io.opentracing.contrib/opentracing-kafka-spring/
implementation("io.opentracing.contrib:opentracing-kafka-spring:0.1.15")

Buildr

'io.opentracing.contrib:opentracing-kafka-spring:jar:0.1.15'

Ivy

<dependency org="io.opentracing.contrib" name="opentracing-kafka-spring" rev="0.1.15">
  <artifact name="opentracing-kafka-spring" type="jar" />
</dependency>

Grape

@Grapes(
@Grab(group='io.opentracing.contrib', module='opentracing-kafka-spring', version='0.1.15')
)

sbt

libraryDependencies += "io.opentracing.contrib" % "opentracing-kafka-spring" % "0.1.15"

Leiningen

[io.opentracing.contrib/opentracing-kafka-spring "0.1.15"]

Dependencies

compile (3)

Group / Artifact Type Version
io.opentracing.contrib : opentracing-kafka-client jar 0.1.15
io.opentracing : opentracing-api jar 0.33.0
io.opentracing : opentracing-util jar 0.33.0

provided (2)

Group / Artifact Type Version
org.springframework.kafka : spring-kafka jar 2.6.1
org.springframework : spring-aspects jar 5.2.7.RELEASE

test (8)

Group / Artifact Type Version
org.awaitility : awaitility jar 4.0.2
com.fasterxml.jackson.core : jackson-databind jar 2.10.0
io.opentracing : opentracing-mock jar 0.33.0
org.springframework.kafka : spring-kafka-test jar 2.6.1
org.apache.kafka : kafka_2.13 jar 2.6.0
org.apache.kafka : kafka_2.13 jar 2.6.0
org.apache.kafka : kafka-clients jar 2.6.0
junit : junit jar 4.13.1

Project Modules

There are no modules declared in this project.

OpenTracing Apache Kafka Client Instrumentation

OpenTracing instrumentation for Apache Kafka Client.
Two solutions are provided:

  1. Based on decorated Producer and Consumer
  2. Based on Interceptors
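
The difference between the two approaches can be illustrated without Kafka at all. The following is only a conceptual sketch: Sender, PlainSender and TracingSender are hypothetical stand-in types invented for this illustration and are not part of the library. A decorator wraps an existing client object, while an interceptor is registered with the client and called by the client itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class TracingApproaches {

    /** Hypothetical stand-in for a Kafka producer. */
    interface Sender {
        void send(String message);
    }

    /** A plain sender that records what it sent and supports registered interceptors. */
    static class PlainSender implements Sender {
        final List<String> sent = new ArrayList<>();
        final List<UnaryOperator<String>> interceptors = new ArrayList<>();

        @Override
        public void send(String message) {
            // Approach 2: the client itself calls every registered interceptor before sending.
            for (UnaryOperator<String> interceptor : interceptors) {
                message = interceptor.apply(message);
            }
            sent.add(message);
        }
    }

    /** Approach 1: a decorator wraps an existing Sender and adds behavior around send(). */
    static class TracingSender implements Sender {
        private final Sender delegate;
        final List<String> spans = new ArrayList<>();

        TracingSender(Sender delegate) {
            this.delegate = delegate;
        }

        @Override
        public void send(String message) {
            spans.add("send");      // stand-in for starting and finishing a span
            delegate.send(message); // the wrapped sender does the real work
        }
    }

    public static void main(String[] args) {
        // Decorator: the calling code must use the wrapping object.
        PlainSender plain = new PlainSender();
        TracingSender traced = new TracingSender(plain);
        traced.send("hello");
        System.out.println(traced.spans + " " + plain.sent); // prints "[send] [hello]"

        // Interceptor: the calling code keeps using the plain object; only its setup changes.
        PlainSender intercepted = new PlainSender();
        intercepted.interceptors.add(m -> m + " [traced]");
        intercepted.send("hello");
        System.out.println(intercepted.sent); // prints "[hello [traced]]"
    }
}
```

In practice this means the decorator approach requires replacing the producer or consumer reference in your code, whereas the interceptor approach only changes the client properties.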

Requirements

  • Java 8
  • Kafka 2.2.0

Installation

Kafka Client

pom.xml

<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>VERSION</version>
</dependency>

Kafka Streams

pom.xml

<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-streams</artifactId>
    <version>VERSION</version>
</dependency>

Spring Kafka

pom.xml

<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-spring</artifactId>
    <version>VERSION</version>
</dependency>

Usage

// Instantiate tracer
Tracer tracer = ...

// Optionally register tracer with GlobalTracer
GlobalTracer.register(tracer);

Kafka Client

Decorators based solution

// Instantiate KafkaProducer
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

// Decorate KafkaProducer with TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, 
        tracer);

// Send
tracingProducer.send(...);

// Instantiate KafkaConsumer
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

// Decorate KafkaConsumer with TracingKafkaConsumer
TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, 
        tracer);

// Subscribe
tracingConsumer.subscribe(Collections.singletonList("messages"));

// Get records (poll(long) is deprecated since Kafka 2.0; requires java.time.Duration)
ConsumerRecords<Integer, String> records = tracingConsumer.poll(Duration.ofMillis(1000));

// To retrieve SpanContext from polled record (Consumer side)
ConsumerRecord<Integer, String> record = ...
SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

Custom Span Names for Decorators based solution

The decorator-based solution supports custom span names: pass a BiFunction as an additional argument to the TracingKafkaConsumer or TracingKafkaProducer constructor. You can use one of the provided BiFunctions or write your own.

// Create BiFunction for the KafkaProducer that operates on
// (String operationName, ProducerRecord producerRecord) and
// returns a String to be used as the name
BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
    (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";

// Instantiate KafkaProducer
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

// Decorate KafkaProducer with TracingKafkaProducer, passing in the producerSpanNameProvider BiFunction
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, 
        tracer,
        producerSpanNameProvider);
// Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.


// Create BiFunction for the KafkaConsumer that operates on
// (String operationName, ConsumerRecord consumerRecord) and
// returns a String to be used as the name
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
    (operationName, consumerRecord) -> operationName.toUpperCase();

// Instantiate KafkaConsumer
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

// Decorate KafkaConsumer with TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction
TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
        tracer,
        consumerSpanNameProvider);
// Spans created by the tracingConsumer will now have the upper-cased operation name as the span name.
// "receive" -> "RECEIVE"

Interceptors based solution

// Register tracer with GlobalTracer:
GlobalTracer.register(tracer);

// Add TracingProducerInterceptor to sender properties:
senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
          TracingProducerInterceptor.class.getName());

// Instantiate KafkaProducer
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

// Send
producer.send(...);

// Add TracingConsumerInterceptor to consumer properties:
consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
          TracingConsumerInterceptor.class.getName());

// Instantiate KafkaConsumer
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

// Subscribe
consumer.subscribe(Collections.singletonList("messages"));

// Get records (poll(long) is deprecated since Kafka 2.0; requires java.time.Duration)
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));

// To retrieve SpanContext from polled record (Consumer side)
ConsumerRecord<Integer, String> record = ...
SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

Kafka Streams

// Instantiate TracingKafkaClientSupplier
KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);

// Provide supplier to KafkaStreams
KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
streams.start();

Spring Kafka

// Declare Tracer bean
@Bean
public Tracer tracer() {
  return ...
}


// Decorate ConsumerFactory with TracingConsumerFactory
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
  return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(consumerProps()), tracer());
}

// Decorate ProducerFactory with TracingProducerFactory
@Bean
public ProducerFactory<Integer, String> producerFactory() {
  return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(producerProps()), tracer());
}

// Use decorated ProducerFactory in KafkaTemplate 
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

// Use an aspect to decorate @KafkaListeners
@Bean
public TracingKafkaAspect tracingKafkaAspect() {
  return new TracingKafkaAspect(tracer());
}

Custom Span Names for Spring Kafka

The Spring Kafka factory implementations support custom span names: pass a BiFunction as an additional argument to the TracingConsumerFactory or TracingProducerFactory constructor. You can use one of the provided BiFunctions or write your own.

// Create BiFunction for the TracingProducerFactory that operates on
// (String operationName, ProducerRecord producerRecord) and
// returns a String to be used as the name
BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
    (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";

// Decorate ProducerFactory with TracingProducerFactory, passing in the producerSpanNameProvider BiFunction
@Bean
public ProducerFactory<Integer, String> producerFactory() {
  return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(producerProps()), tracer(),
      producerSpanNameProvider);
}
// Spans created by producers from this factory will now have "CUSTOM_PRODUCER_NAME" as the span name.


// Create BiFunction for the TracingConsumerFactory that operates on
// (String operationName, ConsumerRecord consumerRecord) and
// returns a String to be used as the name
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
    (operationName, consumerRecord) -> operationName.toUpperCase();

// Decorate ConsumerFactory with TracingConsumerFactory, passing in the consumerSpanNameProvider BiFunction
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
  return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(consumerProps()), tracer(),
      consumerSpanNameProvider);
}
// Spans created by consumers from this factory will now have the upper-cased operation name as the span name.
// "receive" -> "RECEIVE"

Pre-made Span Name Providers

The following BiFunctions are already included in the ClientSpanNameProvider class, with CONSUMER_OPERATION_NAME and PRODUCER_OPERATION_NAME being the default should no spanNameProvider be provided:

  • CONSUMER_OPERATION_NAME and PRODUCER_OPERATION_NAME : Returns the operationName as the span name ("receive" for Consumer, "send" for Producer).
  • CONSUMER_PREFIXED_OPERATION_NAME(String prefix) and PRODUCER_PREFIXED_OPERATION_NAME(String prefix) : Returns a String concatenation of prefix and operationName.
  • CONSUMER_TOPIC and PRODUCER_TOPIC : Returns the Kafka topic name that the record was pushed to/pulled from (record.topic()).
  • PREFIXED_CONSUMER_TOPIC(String prefix) and PREFIXED_PRODUCER_TOPIC(String prefix) : Returns a String concatenation of prefix and the Kafka topic name (record.topic()).
  • CONSUMER_OPERATION_NAME_TOPIC and PRODUCER_OPERATION_NAME_TOPIC : Returns "operationName - record.topic()".
  • CONSUMER_PREFIXED_OPERATION_NAME_TOPIC(String prefix) and PRODUCER_PREFIXED_OPERATION_NAME_TOPIC(String prefix) : Returns a String concatenation of prefix and "operationName - record.topic()".
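
To make the naming schemes above concrete, here is a minimal sketch of equivalent BiFunctions written against the JDK only. It is not the ClientSpanNameProvider source: StubRecord is a hypothetical stand-in for ProducerRecord/ConsumerRecord, and the constant and method names are invented for this illustration. Only the output formats are taken from the list above.

```java
import java.util.function.BiFunction;

public class SpanNameSketch {

    /** Hypothetical stand-in for a Kafka record; only topic() matters here. */
    static final class StubRecord {
        private final String topic;

        StubRecord(String topic) {
            this.topic = topic;
        }

        String topic() {
            return topic;
        }
    }

    // Operation name only, e.g. "send".
    static final BiFunction<String, StubRecord, String> OPERATION_NAME =
        (operationName, record) -> operationName;

    // Topic only, e.g. "messages".
    static final BiFunction<String, StubRecord, String> TOPIC =
        (operationName, record) -> record.topic();

    // "operationName - record.topic()", e.g. "send - messages".
    static final BiFunction<String, StubRecord, String> OPERATION_NAME_TOPIC =
        (operationName, record) -> operationName + " - " + record.topic();

    // Any provider can be prefixed by wrapping it in a string concatenation.
    static BiFunction<String, StubRecord, String> prefixed(
            String prefix, BiFunction<String, StubRecord, String> provider) {
        return (operationName, record) -> prefix + provider.apply(operationName, record);
    }

    public static void main(String[] args) {
        StubRecord record = new StubRecord("messages");
        System.out.println(OPERATION_NAME.apply("send", record));       // prints "send"
        System.out.println(TOPIC.apply("send", record));                // prints "messages"
        System.out.println(OPERATION_NAME_TOPIC.apply("send", record)); // prints "send - messages"
        System.out.println(
            prefixed("app.", OPERATION_NAME_TOPIC).apply("receive", record)); // prints "app.receive - messages"
    }
}
```

A custom provider passed to the tracing constructors has the same shape: it receives the operation name and the record and returns the span name.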

License

Apache 2.0 License.

io.opentracing.contrib

3rd-Party OpenTracing API Contributions

3rd-party contributions that use OpenTracing. **The repositories in this org are *not* affiliated with the CNCF.**

Versions

Version
0.1.15
0.1.14
0.1.13
0.1.12
0.1.11
0.1.10
0.1.9
0.1.8
0.1.7
0.1.6
0.1.4
0.1.3
0.1.2
0.1.1
0.1.0
0.0.20
0.0.19
0.0.18
0.0.17
0.0.16
0.0.15
0.0.14
0.0.13
0.0.12
0.0.11
0.0.10
0.0.9
0.0.8
0.0.7
0.0.6