Kafka Connect HTTP Plugin

Kafka Connect connectors enabling Kafka integration with external systems via HTTP

License

Apache 2.0 License

Categories

ORM Data

GroupId

com.github.castorm

ArtifactId

kafka-connect-http-plugin

Last Version

0.5.0

Type

jar

Description

Kafka Connect HTTP Plugin
Kafka Connect connectors enabling Kafka integration with external systems via HTTP

Project Organization

CastorM

Source Code Management

https://github.com/castorm/kafka-connect-http-plugin

Download kafka-connect-http-plugin

How to add to project

Maven

<!-- https://jarcasting.com/artifacts/com.github.castorm/kafka-connect-http-plugin/ -->
<dependency>
    <groupId>com.github.castorm</groupId>
    <artifactId>kafka-connect-http-plugin</artifactId>
    <version>0.5.0</version>
</dependency>

Gradle (Groovy DSL)

// https://jarcasting.com/artifacts/com.github.castorm/kafka-connect-http-plugin/
implementation 'com.github.castorm:kafka-connect-http-plugin:0.5.0'

Gradle (Kotlin DSL)

// https://jarcasting.com/artifacts/com.github.castorm/kafka-connect-http-plugin/
implementation("com.github.castorm:kafka-connect-http-plugin:0.5.0")

Buildr

'com.github.castorm:kafka-connect-http-plugin:jar:0.5.0'

Ivy

<dependency org="com.github.castorm" name="kafka-connect-http-plugin" rev="0.5.0">
  <artifact name="kafka-connect-http-plugin" type="jar" />
</dependency>

Grape

@Grapes(
    @Grab(group='com.github.castorm', module='kafka-connect-http-plugin', version='0.5.0')
)

SBT

libraryDependencies += "com.github.castorm" % "kafka-connect-http-plugin" % "0.5.0"

Leiningen

[com.github.castorm/kafka-connect-http-plugin "0.5.0"]

Dependencies

compile (6)

Group / Artifact Type Version
org.slf4j : slf4j-api jar 1.7.30
com.squareup.okhttp3 : okhttp jar 4.6.0
com.squareup.okhttp3 : logging-interceptor jar 4.6.0
com.fasterxml.jackson.core : jackson-databind jar 2.11.0
org.freemarker : freemarker jar 2.3.30
com.joestelmach : natty jar 0.13

provided (3)

Group / Artifact Type Version
org.projectlombok : lombok jar 1.18.12
org.apache.kafka : connect-api jar 2.5.0
ch.qos.logback : logback-classic jar 1.2.3

test (5)

Group / Artifact Type Version
org.junit.jupiter : junit-jupiter-engine jar 5.6.2
org.junit.jupiter : junit-jupiter-api jar 5.6.2
org.mockito : mockito-junit-jupiter jar 3.3.3
org.assertj : assertj-core jar 3.16.0
com.google.guava : guava jar 29.0-jre

Project Modules

There are no modules declared in this project.

Kafka Connect HTTP Connector


Kafka Connect connector that enables Change Data Capture from JSON/HTTP APIs into Kafka.

This connector is for you if

  • You want to (live) replicate a dataset exposed through JSON/HTTP API
  • You want to do so efficiently
  • You want to capture only changes, not full snapshots
  • You want to do so via configuration, with no custom coding
  • You want to be able to extend the connector if it comes to that

Examples

See the examples in the project repository.

Getting Started

If your Kafka Connect deployment is automated and packaged with Maven, you can unpack the artifact into the Kafka Connect plugins folder with the maven-dependency-plugin:

<plugin>
    <artifactId>maven-dependency-plugin</artifactId>
    <executions>
        <execution>
            <id>copy-kafka-connect-plugins</id>
            <phase>prepare-package</phase>
            <goals>
                <goal>unpack</goal>
            </goals>
            <configuration>
                <outputDirectory>${project.build.directory}/docker-build/plugins</outputDirectory>
                <artifactItems>
                    <artifactItem>
                        <groupId>com.github.castorm</groupId>
                        <artifactId>kafka-connect-http</artifactId>
                        <version>0.7.6</version>
                        <type>tar.gz</type>
                        <classifier>plugin</classifier>
                    </artifactItem>
                </artifactItems>
            </configuration>
        </execution>
    </executions>
</plugin>

Otherwise, you'll have to do it manually by downloading the package from the Releases Page.

More details on how to Install Connectors.

Source Connector

com.github.castorm.kafka.connect.http.HttpSourceConnector
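As a quick reference, a minimal configuration sketch for this source connector is shown below; the endpoint URL, topic name and polling interval are hypothetical placeholders to adapt to your API.

name=http-source-example
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
# Hypothetical endpoint; any property not set here falls back to its documented default
http.request.url=https://example.com/api/items
http.timer.interval.millis=30000
kafka.topic=http-items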

Extension points

The connector can be easily extended by implementing your own version of any of the components below.

These are better understood by looking at the source task implementation:

public List<SourceRecord> poll() throws InterruptedException {

    throttler.throttle(offset.getTimestamp().orElseGet(Instant::now));

    HttpRequest request = requestFactory.createRequest(offset);

    HttpResponse response = requestExecutor.execute(request);

    List<SourceRecord> records = responseParser.parse(response);

    return recordSorter.sort(records).stream()
            .filter(recordFilterFactory.create(offset))
            .collect(toList());
}

public void commitRecord(SourceRecord record) {
    offset = Offset.of(record.sourceOffset(), record.timestamp());
}

Timer: Throttling HttpRequests

Controls the rate at which HTTP requests are executed by informing the task how long to wait until the next execution is due.

http.timer

public interface Timer extends Configurable {

    Long getRemainingMillis();

    default void reset(Instant lastZero) {
        // Do nothing
    }
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.timer.AdaptableIntervalTimer
  • Available implementations:
    • com.github.castorm.kafka.connect.timer.FixedIntervalTimer
    • com.github.castorm.kafka.connect.timer.AdaptableIntervalTimer

Throttling HttpRequests with FixedIntervalTimer

Throttles rate of requests based on a fixed interval.

http.timer.interval.millis

Interval in between requests

  • Type: Long
  • Default: 60000

Throttling HttpRequests with AdaptableIntervalTimer

Throttles the rate of requests based on a fixed interval. However, it has two modes of operation, with two different intervals:

  • Up to date: no new records, or the records returned were created since the last poll
  • Catching up: the last poll returned new records, but they were created long ago (more than one interval ago)
http.timer.interval.millis

Interval in between requests when up-to-date

  • Type: Long
  • Default: 60000
http.timer.catchup.interval.millis

Interval in between requests when catching up

  • Type: Long
  • Default: 30000
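As an illustration, a configuration sketch for the adaptable timer (the interval values are arbitrary examples):

# Poll every 60s when up to date, every 10s while catching up (example values)
http.timer=com.github.castorm.kafka.connect.timer.AdaptableIntervalTimer
http.timer.interval.millis=60000
http.timer.catchup.interval.millis=10000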

HttpRequestFactory: Creating a HttpRequest

The first thing our connector will need to do is create an HttpRequest.

http.request.factory

public interface HttpRequestFactory extends Configurable {

    HttpRequest createRequest(Offset offset);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactory
  • Available implementations:
    • com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactory

http.offset.initial

Initial offset, comma separated list of pairs.

  • Example: property1=value1, property2=value2
  • Type: String
  • Default: ""

Creating a HttpRequest with TemplateHttpRequestFactory

This HttpRequestFactory is based on template resolution.

http.request.method

Http method to use in the request.

  • Type: String
  • Default: GET
http.request.url

Http url to use in the request.

  • Required
  • Type: String
http.request.headers

Http headers to use in the request, comma separated list of : separated pairs.

  • Example: Name: Value, Name2: Value2
  • Type: String
  • Default: ""
http.request.params

Http query parameters to use in the request, & separated list of = separated pairs.

  • Example: name=value & name2=value2
  • Type: String
  • Default: ""
http.request.body

Http body to use in the request.

  • Type: String
  • Default: ""
http.request.template.factory
public interface TemplateFactory {

    Template create(String template);
}

public interface Template {

    String apply(Offset offset);
}

Class responsible for creating the templates that will be used on every request.

Creating a HttpRequest with FreeMarkerTemplateFactory

FreeMarker templates will have the following data model available:

  • offset
    • key
    • timestamp
    • ... (custom offset properties)

Accessing any of the above within a template can be done like this:

http.request.params=after=${offset.timestamp}

For a complete understanding of the features provided by FreeMarker, please refer to its User Manual.
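Putting it together, a request configuration sketch against a hypothetical endpoint could look like this (the URL, header and query parameter names are assumptions about the API, not connector defaults):

# Hypothetical endpoint; ${offset.timestamp} is resolved from the offset on every request
http.request.url=https://example.com/api/items
http.request.headers=Accept: application/json
http.request.params=updatedAfter=${offset.timestamp}&limit=100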


HttpClient: Executing a HttpRequest

Once our HttpRequest is ready, we have to execute it to get some results out of it. That's the purpose of the HttpClient.

http.client

public interface HttpClient extends Configurable {

    HttpResponse execute(HttpRequest request) throws IOException;
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.client.okhttp.OkHttpClient
  • Available implementations:
    • com.github.castorm.kafka.connect.http.client.okhttp.OkHttpClient

Executing a HttpRequest with OkHttpClient

Uses an OkHttp client.

http.client.connection.timeout.millis

Timeout for opening a connection

  • Type: Long
  • Default: 2000
http.client.read.timeout.millis

Timeout for reading a response

  • Type: Long
  • Default: 2000
http.client.connection.ttl.millis

Time to live for the connection

  • Type: Long
  • Default: 300000
http.client.max-idle

Maximum number of idle connections in the connection pool

  • Type: Integer
  • Default: 1
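For slow upstream APIs, a configuration sketch with more generous timeouts (the values are illustrative):

# Illustrative timeout values
http.client.connection.timeout.millis=5000
http.client.read.timeout.millis=10000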

HttpAuthenticator: Authenticating a HttpRequest

When executing the request, authentication might be required. The HttpAuthenticator is responsible for resolving the authorization header to be included in the request.

http.auth

public interface HttpAuthenticator extends Configurable {

    Optional<String> getAuthorizationHeader();
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.auth.ConfigurableHttpAuthenticator
  • Available implementations:
    • com.github.castorm.kafka.connect.http.auth.ConfigurableHttpAuthenticator
    • com.github.castorm.kafka.connect.http.auth.NoneHttpAuthenticator
    • com.github.castorm.kafka.connect.http.auth.BasicHttpAuthenticator

Authenticating a HttpRequest with ConfigurableHttpAuthenticator

Allows selecting the authentication type via a configuration property.

http.auth.type

Type of authentication

  • Type: String
  • Default: None
  • Available options:
    • None
    • Basic

Authenticating a HttpRequest with BasicHttpAuthenticator

Implements HTTP Basic authentication with the user and password configured below.

http.auth.user
  • Type: String
  • Default: ``
http.auth.password
  • Type: String
  • Default: ``
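A configuration sketch enabling Basic authentication (the credentials are placeholders; consider externalizing secrets, e.g. via Kafka Connect config providers):

# Placeholder credentials, for illustration only
http.auth.type=Basic
http.auth.user=my-user
http.auth.password=my-secret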

HttpResponseParser: Parsing a HttpResponse

Once our HttpRequest has been executed, we'll have to deal with the resulting HttpResponse and translate it into the list of SourceRecords expected by Kafka Connect.

http.response.parser

public interface HttpResponseParser extends Configurable {

    List<SourceRecord> parse(HttpResponse response);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.response.PolicyHttpResponseParser
  • Available implementations:
    • com.github.castorm.kafka.connect.http.response.PolicyHttpResponseParser
    • com.github.castorm.kafka.connect.http.response.KvHttpResponseParser

Parsing a HttpResponse with PolicyHttpResponseParser

Vets the HTTP response, deciding whether it should be processed, skipped or failed. This decision is delegated to a HttpResponsePolicy. When the decision is to process the response, the processing itself is delegated to a secondary HttpResponseParser.

HttpResponsePolicy: Vetting a HttpResponse
http.response.policy
public interface HttpResponsePolicy extends Configurable {

    HttpResponseOutcome resolve(HttpResponse response);

    enum HttpResponseOutcome {
        PROCESS, SKIP, FAIL
    }
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.response.StatusCodeHttpResponsePolicy
  • Available implementations:
    • com.github.castorm.kafka.connect.http.response.StatusCodeHttpResponsePolicy
http.response.policy.parser
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.response.KvHttpResponseParser
  • Available implementations:
    • com.github.castorm.kafka.connect.http.response.KvHttpResponseParser
Vetting a HttpResponse with StatusCodeHttpResponsePolicy

Vets responses based on the HTTP status code of the response and the configuration below.

http.response.policy.codes.process

Comma separated list of code ranges that will result in the parser processing the response

  • Example: 200..205, 207..210
  • Type: String
  • Default: 200..299
http.response.policy.codes.skip

Comma separated list of code ranges that will result in the parser skipping the response

  • Example: 300..305, 307..310
  • Type: String
  • Default: 300..399
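For instance, a configuration sketch that only processes success codes up to 204 and skips all redirects (the ranges are illustrative):

# Illustrative status-code ranges
http.response.policy.codes.process=200..204
http.response.policy.codes.skip=300..399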

Parsing a HttpResponse with KvHttpResponseParser

Parses the HTTP response into a key-value SourceRecord. This process is decomposed into two steps:

  • Parsing the HttpResponse into a KvRecord
  • Mapping the KvRecord into a SourceRecord
http.response.record.parser
public interface KvRecordHttpResponseParser extends Configurable {

    List<KvRecord> parse(HttpResponse response);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.response.jackson.JacksonKvRecordHttpResponseParser
  • Available implementations:
    • com.github.castorm.kafka.connect.http.response.jackson.JacksonKvRecordHttpResponseParser
http.response.record.mapper
public interface KvSourceRecordMapper extends Configurable {

    SourceRecord map(KvRecord record);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.record.SchemedKvSourceRecordMapper
  • Available implementations:
    • com.github.castorm.kafka.connect.http.record.SchemedKvSourceRecordMapper Maps key to a Struct schema with a single property key, and value to a Struct schema with a single property value
    • com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper Maps both key and value to a String schema
Parsing a HttpResponse with JacksonKvRecordHttpResponseParser

Uses Jackson to look for the records in the response.

http.response.list.pointer

JsonPointer to the property in the response body containing an array of records

  • Example: /items
  • Type: String
  • Default: /
http.response.record.pointer

JsonPointer to the individual record to be used as the kafka record body. Useful when the object we are interested in is nested within a larger structure.

  • Type: String
  • Default: /
http.response.record.key.pointer

Comma separated list of JsonPointers to the properties that, combined, uniquely identify the individual record, to be used as the kafka record key. This is especially important on partitioned topics.

  • Example: /id
  • Type: String
  • Default: ""
http.response.record.timestamp.pointer

JsonPointer to the timestamp of the individual record, to be used as the kafka record timestamp. This is especially important to track progress, enable latency calculations, improve throttling, and provide feedback to TemplateHttpRequestFactory.

  • Type: String
  • Default: ""
http.response.record.timestamp.parser

Class responsible for converting the timestamp property captured above into a java.time.Instant.

  • Type: String
  • Default: com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisOrDelegateTimestampParser
  • Available implementations:
    • com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampParser Implementation that captures the timestamp as an epoch millis long
    • com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisOrDelegateTimestampParser Implementation that tries to capture as epoch millis or delegates to another parser in case of failure
    • com.github.castorm.kafka.connect.http.response.timestamp.DateTimeFormatterTimestampParser Implementation based on a DateTimeFormatter
    • com.github.castorm.kafka.connect.http.response.timestamp.NattyTimestampParser Implementation based on Natty parser
http.response.record.timestamp.parser.pattern

When using DateTimeFormatterTimestampParser, a custom pattern can be specified

  • Type: String
  • Default: yyyy-MM-dd'T'HH:mm:ss[.SSS]X
http.response.record.timestamp.parser.zone

Timezone of the timestamp. Accepts ZoneId valid identifiers

  • Type: String
  • Default: UTC
http.response.record.offset.pointer

Comma separated list of key=/value pairs, where the key is the name of the property in the offset and the value is the JsonPointer to the value to be used as offset for future requests. This is the mechanism that enables sharing state between HttpRequests; HttpRequestFactory implementations receive this Offset.

One of the roles of the offset, even if not required for preparing the next request, is helping in de-duplication of already seen records by providing a sense of progress, assuming consistent ordering (e.g. even if the response returns some repeated results in between requests because they have the same timestamp, anything prior to the last seen offset will be ignored). See OffsetRecordFilterFactory.

  • Example: id=/itemId
  • Type: String
  • Default: ""

Mapping a KvRecord into SourceRecord with SchemedKvSourceRecordMapper

Once we have our KvRecord, we have to translate it into what Kafka Connect is expecting: SourceRecords.

Embeds the record properties into a common simple envelope to enable schema evolution. This envelope simply contains a key and a value property with customizable field names.

This is also where we tell Kafka Connect what topic and partition we want to send our record to.

kafka.topic

Name of the topic where the record will be sent to

  • Required
  • Type: String
  • Default: ""
http.record.schema.key.property.name

Name of the key property in the key-value envelope

  • Type: String
  • Default: key
http.record.schema.value.property.name

Name of the value property in the key-value envelope

  • Type: String
  • Default: value
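A configuration sketch renaming the envelope fields (the field names are arbitrary examples):

# Arbitrary example field names for the key-value envelope
kafka.topic=http-items
http.record.schema.key.property.name=id
http.record.schema.value.property.name=payload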

SourceRecordSorter: Sorting SourceRecords

Some HTTP resources, not designed for CDC, return snapshots with the most recent records first. In these cases de-duplication is especially important, as subsequent requests are likely to produce similar results. The de-duplication mechanisms offered by this connector are order-dependent, as they are usually based on timestamps.

To enable de-duplication in cases like this, we can instruct the connector to assume a specific order direction: ASC, DESC, or IMPLICIT, where IMPLICIT figures it out based on the records' timestamps.

http.record.sorter

public interface SourceRecordSorter extends Configurable {

    List<SourceRecord> sort(List<SourceRecord> records);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorter
  • Available implementations:
    • com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorter

http.response.list.order.direction

Order direction of the results in the response list.

  • Type: Enum { ASC, DESC, IMPLICIT }
  • Default: IMPLICIT
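For example, for an endpoint that returns the newest records first, a configuration sketch:

# Assume the (hypothetical) endpoint returns newest records first
http.response.list.order.direction=DESC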

SourceRecordFilterFactory: Filtering out SourceRecord

There are cases when we'll be interested in filtering out certain records. One of these would be de-duplication.

http.record.filter.factory

public interface SourceRecordFilterFactory extends Configurable {

    Predicate<SourceRecord> create(Offset offset);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.record.OffsetRecordFilterFactory
  • Available implementations:
    • com.github.castorm.kafka.connect.http.record.OffsetRecordFilterFactory
    • com.github.castorm.kafka.connect.http.record.OffsetTimestampRecordFilterFactory
    • com.github.castorm.kafka.connect.http.record.PassthroughRecordFilterFactory
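For instance, a configuration sketch that disables filtering entirely by selecting the pass-through implementation:

# Disable record filtering (no de-duplication)
http.record.filter.factory=com.github.castorm.kafka.connect.http.record.PassthroughRecordFilterFactory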

Filtering out SourceRecord with OffsetTimestampRecordFilterFactory

De-duplicates based on Offset's timestamp, filtering out records with earlier or the same timestamp. Useful when timestamp is used to filter the HTTP resource, but the filter does not have full timestamp precision. Assumptions:

  • Records are ordered by timestamp
  • No two records can contain the same timestamp (to whatever precision the HTTP resource uses)

If the latter assumption cannot be satisfied, check OffsetRecordFilterFactory to try and prevent data loss.

Filtering out SourceRecord with OffsetRecordFilterFactory

De-duplicates based on the Offset's timestamp, key and any other custom property present in the Offset, filtering out records with earlier timestamps and, within the same timestamp, those up to the last seen Offset properties. Useful when the timestamp alone is not unique but together with some other Offset property it is. Assumptions:

  • Records are ordered by timestamp
  • There is an Offset property that uniquely identifies records (e.g. key)
  • There won't be new items preceding already seen ones

Development

Building

mvn package

Running the tests

mvn test

Releasing

Contributing

Contributions are welcome via pull requests, pending definition of code of conduct.

Versioning

We use SemVer for versioning.

Authors

  • Cástor Rodríguez - Only contributor so far - castorm

License

This project is licensed under the Apache 2.0 License - see the LICENSE.txt file for details

Built With

Acknowledgments

Versions

Version
0.5.0
0.4.0
0.3.5
0.3.4
0.2.6