Infinispan-Kafka

Kafka Connector for Infinispan

Categories: Infinispan Data Caching
GroupId: org.infinispan.kafka
ArtifactId: infinispan-kafka
Last Version: 0.7
Type: jar
Description: Infinispan-Kafka, Kafka Connector for Infinispan
Project Organization: JBoss, a division of Red Hat

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/org.infinispan.kafka/infinispan-kafka/ -->
<dependency>
    <groupId>org.infinispan.kafka</groupId>
    <artifactId>infinispan-kafka</artifactId>
    <version>0.7</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/org.infinispan.kafka/infinispan-kafka/
implementation 'org.infinispan.kafka:infinispan-kafka:0.7'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/org.infinispan.kafka/infinispan-kafka/
implementation ("org.infinispan.kafka:infinispan-kafka:0.7")

Buildr:

'org.infinispan.kafka:infinispan-kafka:jar:0.7'

Ivy:

<dependency org="org.infinispan.kafka" name="infinispan-kafka" rev="0.7">
  <artifact name="infinispan-kafka" type="jar" />
</dependency>

Grape:

@Grapes(
    @Grab(group='org.infinispan.kafka', module='infinispan-kafka', version='0.7')
)

SBT:

libraryDependencies += "org.infinispan.kafka" % "infinispan-kafka" % "0.7"

Leiningen:

[org.infinispan.kafka/infinispan-kafka "0.7"]

Dependencies

compile (6)

Group / Artifact | Type | Version
org.infinispan : infinispan-client-hotrod | jar | 10.1.5.Final
org.infinispan : infinispan-query-dsl | jar | 10.1.5.Final
org.infinispan : infinispan-query | jar | 10.1.5.Final
org.infinispan : infinispan-remote-query-client | jar | 10.1.5.Final
org.infinispan.protostream : protostream | jar | 4.3.2.Final
com.fasterxml.jackson.core : jackson-databind | jar | 2.10.2

provided (1)

Group / Artifact | Type | Version
org.apache.kafka : connect-api | jar | 2.4.1

test (6)

Group / Artifact | Type | Version
junit : junit | jar | 4.13
org.infinispan : infinispan-server-runtime | jar | 10.1.5.Final
org.infinispan : infinispan-core | test-jar | 10.1.5.Final
org.apache.logging.log4j : log4j-api | jar | 2.13.0
org.apache.logging.log4j : log4j-core | jar | 2.13.0
org.apache.logging.log4j : log4j-jcl | jar | 2.13.0

Project Modules

There are no modules declared in this project.

Kafka Connector for Infinispan

Introduction

This is a Kafka Connector for connecting to an Infinispan instance (domain or standalone). For more information about Kafka Connect, see the Kafka Connect section of the Apache Kafka documentation.

Running

mvn clean package

Then, in the core folder, put the packaged jars on the classpath:

export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"

You can now run the connector from the top-level folder:

$KAFKA_HOME/bin/connect-standalone $KAFKA_HOME/config/connect-standalone.properties config/InfinispanSinkConnector.properties
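
To make the classpath step above easier to follow, here is a minimal Java sketch of what the find/grep/tr pipeline computes: every regular file under target/ whose name ends in .jar and whose path contains "-package", joined with ':' (including the trailing ':' that tr leaves behind). The class name PackageJarClasspath is hypothetical and not part of the project, and the sorting is an addition for reproducible output; find itself guarantees no order.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of infinispan-kafka. It mirrors:
//   find target/ -type f -name '*.jar' | grep '\-package' | tr '\n' ':'
final class PackageJarClasspath {

    static String build(Path targetDir) throws IOException {
        try (Stream<Path> files = Files.walk(targetDir)) {
            return files
                    .filter(Files::isRegularFile)          // find -type f
                    .map(Path::toString)
                    .filter(p -> p.endsWith(".jar"))       // -name '*.jar'
                    .filter(p -> p.contains("-package"))   // grep '\-package' (matches the whole path)
                    .sorted()                              // added for stable output
                    .map(p -> p + ":")                     // tr '\n' ':' keeps a trailing ':'
                    .collect(Collectors.joining());
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults to ./target, as in the shell command.
        Path target = Paths.get(args.length > 0 ? args[0] : "target");
        System.out.println(build(target));
    }
}
```

The shell one-liner remains the simplest way to set CLASSPATH; the sketch only spells out which files it selects.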

Status

  • Currently, only the Sink Connector (from Kafka to Infinispan) has been developed, and it still needs work.

Sink Connector Properties

Name | Description | Type | Default | Importance
infinispan.connection.hosts | Comma-separated list of Infinispan hosts | string | localhost | high
infinispan.connection.hotrod.port | Infinispan Hot Rod port | int | 11222 | high
infinispan.connection.cache.name | Name of the Infinispan cache to use | String | default | medium
infinispan.use.proto | If true, the Remote Cache Manager will be configured to use protostream schemas | boolean | false | medium
infinispan.proto.marshaller.class | If infinispan.use.proto is true, this option has to contain an annotated protostream class to be used | Class | String.class | medium
infinispan.cache.force.return.values | By default, previously existing values for Map operations are not returned; if set to true, the values are returned | boolean | false | low
infinispan.use.lifespan | If true, the Remote Cache Manager will be configured to use the lifespan associated with cache entries | boolean | false | low
infinispan.use.maxidle | If true, the Remote Cache Manager will be configured to use the max idle value associated with cache entries | boolean | false | low
infinispan.cache.lifespan.entry | If infinispan.use.lifespan is true, this option has to contain the lifespan associated with the entries to be stored (in seconds) | long | false | low
infinispan.cache.maxidle.entry | If infinispan.use.maxidle is true, this option has to contain the max idle time associated with the entries to be stored (in seconds) | long | false | low
infinispan.hotrod.protocol.version | The Infinispan Hot Rod client protocol version to use | String | DEFAULT_PROTOCOL_VERSION | low
infinispan.hotrod.socket_timeout | The Infinispan Hot Rod client timeout for socket reads/writes | int | 5000 | low
infinispan.hotrod.connect_timeout | The Infinispan Hot Rod client timeout for connections | int | 5000 | low
infinispan.hotrod.max_retries | The Infinispan Hot Rod client maximum number of operation retries | int | 5 | low
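
Several of these options depend on one another (for example, infinispan.cache.lifespan.entry only matters when infinispan.use.lifespan is true). The following minimal sketch, using only java.util.Properties, shows one way to apply the documented defaults and check those dependencies before starting the connector. The class SinkConfigSketch and its methods are hypothetical and not part of the connector; pairing every host with the single Hot Rod port and rejecting a missing dependent option are assumptions about reasonable behavior, not a description of what the connector does internally.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical helper, not part of infinispan-kafka.
final class SinkConfigSketch {

    // Returns a copy of the user properties with some of the documented defaults applied.
    static Properties withDefaults(Properties user) {
        Properties resolved = new Properties();
        resolved.setProperty("infinispan.connection.hosts", "localhost");
        resolved.setProperty("infinispan.connection.hotrod.port", "11222");
        resolved.setProperty("infinispan.connection.cache.name", "default");
        resolved.setProperty("infinispan.use.proto", "false");
        resolved.setProperty("infinispan.use.lifespan", "false");
        resolved.setProperty("infinispan.use.maxidle", "false");
        resolved.putAll(user); // user-supplied values override the defaults
        return resolved;
    }

    // Splits the comma-separated host list and pairs each host with the Hot Rod port
    // (assumption: all hosts share the one configured port).
    static List<String> servers(Properties cfg) {
        String port = cfg.getProperty("infinispan.connection.hotrod.port");
        return Arrays.stream(cfg.getProperty("infinispan.connection.hosts").split(","))
                .map(String::trim)
                .filter(host -> !host.isEmpty())
                .map(host -> host + ":" + port)
                .collect(Collectors.toList());
    }

    // Fails fast when a flag is true but the option it depends on is missing.
    static void validate(Properties cfg) {
        requireIf(cfg, "infinispan.use.proto", "infinispan.proto.marshaller.class");
        requireIf(cfg, "infinispan.use.lifespan", "infinispan.cache.lifespan.entry");
        requireIf(cfg, "infinispan.use.maxidle", "infinispan.cache.maxidle.entry");
    }

    private static void requireIf(Properties cfg, String flag, String dependent) {
        if (Boolean.parseBoolean(cfg.getProperty(flag)) && cfg.getProperty(dependent) == null) {
            throw new IllegalArgumentException(dependent + " must be set when " + flag + " is true");
        }
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("infinispan.connection.hosts", "node1, node2");
        user.setProperty("infinispan.use.lifespan", "true");
        user.setProperty("infinispan.cache.lifespan.entry", "60");

        Properties cfg = withDefaults(user);
        validate(cfg);
        System.out.println(servers(cfg)); // prints [node1:11222, node2:11222]
    }
}
```

A check like this can run against the same .properties file that is passed to connect-standalone, so a missing dependent option is caught before the connector starts.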

Configuration example

Suppose you want to store objects of type Author in your Infinispan cache. Here is the Protostream-annotated Author class:

package org.infinispan.kafka;

import java.io.Serializable;

import org.infinispan.protostream.annotations.ProtoDoc;
import org.infinispan.protostream.annotations.ProtoField;

@ProtoDoc("@Indexed")
public class Author implements Serializable {

   private String name;

   @ProtoField(number = 1, required = true)
   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }

   @Override
   public String toString() {
      return "Author [name=" + name + "]";
   }
}

You can then define the following configuration for your Infinispan Sink Connector:

name=InfinispanSinkConnector
topics=mytopic
tasks.max=1
connector.class=org.infinispan.kafka.InfinispanSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

infinispan.connection.hosts=127.0.0.1
infinispan.connection.hotrod.port=11222
infinispan.connection.cache.name=default
infinispan.cache.force.return.values=false
infinispan.use.proto=true
infinispan.proto.marshaller.class=org.infinispan.kafka.Author

At this point you will be able to run your connector. You'll need a running Infinispan server and a running Kafka server. In your Infinispan Kafka working directory run:

mvn clean package
export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
$KAFKA_HOME/bin/connect-standalone $KAFKA_HOME/config/connect-standalone.properties config/InfinispanSinkConnector.properties

At this point you can try to send some messages to Kafka:

package org.infinispan.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SimpleProducer {

    public static void main(String[] args) throws JsonProcessingException {

        // Connect to the local Kafka broker and send keys and values as plain strings.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        Author author = new Author();
        author.setName("oscerd");

        // Serialize the Author to JSON so it can travel as a string value.
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(author);

        // The topic must match the "topics" value in the connector configuration (mytopic).
        // try-with-resources closes the producer, which also flushes the pending record.
        try (KafkaProducer<String, String> prod = new KafkaProducer<>(props)) {
            prod.send(new ProducerRecord<>("mytopic", "key1", json));
        }
    }
}

In your Infinispan default cache you should now have a key/value pair with key1 as key and an Author object as value.

Releasing

Make sure MAVEN_HOME/conf/settings.xml contains credentials for the release repository. Add the following section in <servers>:

<server>
   <id>jboss-snapshots-repository</id>
   <username>RELEASE_USER</username>
   <password>RELEASE_PASS</password>
</server>
<server>
   <id>jboss-releases-repository</id>
   <username>RELEASE_USER</username>
   <password>RELEASE_PASS</password>
</server>

  • Run mvn release:prepare release:perform -B

Infinispan

Infinispan is a distributed in-memory key/value data store with optional schema, available under the Apache License 2.0.

Versions

Version
0.7
0.6
0.5
0.4
0.3
0.2
0.1