Apache Flink, Cassandra keyspace cluster

The extension for flink cassandra connector that lets you specify default cassandra keyspace.

License

License

Categories

Categories

Cassandra Data Databases KeY Data Formats Formal Verification
GroupId

GroupId

com.github.szczurmys
ArtifactId

ArtifactId

flink-cassandra-keyspace-cluster
Last Version

Last Version

1.1.0
Release Date

Release Date

Type

Type

jar
Description

Description

Apache Flink, Cassandra keyspace cluster
The extension for flink cassandra connector that lets you specify default cassandra keyspace.
Project URL

Project URL

https://github.com/szczurmys/flink-cassandra-keyspace-cluster
Source Code Management

Source Code Management

http://github.com/szczurmys/flink-cassandra-keyspace-cluster/tree/master

Download flink-cassandra-keyspace-cluster

How to add to project

<!-- https://jarcasting.com/artifacts/com.github.szczurmys/flink-cassandra-keyspace-cluster/ -->
<dependency>
    <groupId>com.github.szczurmys</groupId>
    <artifactId>flink-cassandra-keyspace-cluster</artifactId>
    <version>1.1.0</version>
</dependency>
// https://jarcasting.com/artifacts/com.github.szczurmys/flink-cassandra-keyspace-cluster/
implementation 'com.github.szczurmys:flink-cassandra-keyspace-cluster:1.1.0'
// https://jarcasting.com/artifacts/com.github.szczurmys/flink-cassandra-keyspace-cluster/
implementation ("com.github.szczurmys:flink-cassandra-keyspace-cluster:1.1.0")
'com.github.szczurmys:flink-cassandra-keyspace-cluster:jar:1.1.0'
<dependency org="com.github.szczurmys" name="flink-cassandra-keyspace-cluster" rev="1.1.0">
  <artifact name="flink-cassandra-keyspace-cluster" type="jar" />
</dependency>
@Grapes(
@Grab(group='com.github.szczurmys', module='flink-cassandra-keyspace-cluster', version='1.1.0')
)
libraryDependencies += "com.github.szczurmys" % "flink-cassandra-keyspace-cluster" % "1.1.0"
[com.github.szczurmys/flink-cassandra-keyspace-cluster "1.1.0"]

Dependencies

provided (1)

Group / Artifact Type Version
org.apache.flink : flink-connector-cassandra_2.11 jar 1.5.0

test (3)

Group / Artifact Type Version
junit : junit jar 4.12
org.assertj : assertj-core jar 3.10.0
org.mockito : mockito-all jar 1.10.19

Project Modules

There are no modules declared in this project.

flink-cassandra-keyspace-cluster

Apache License, Version 2.0, January 2004 Maven Central

An extension for flink cassandra connector that lets you specify default cassandra keyspace.


For flink version >= 1.6 you should not use it, because it does not work.
They added defaultKeyspace parameter for connector builder:

        CassandraSink.addSink(dataSource)
                //...
                .setDefaultKeyspace("Your default keyspace")
                //...
                .build();

The main adventages (for flink version < 1.6) of the KeyspaceClusterBuilder is that it allows you to use POJO (using mappers http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html) without defining constant keyspace, and you can get keyspace from properties.


Example:

public class Main {
    public void main(String[] args) {
        String keyspace = "keyspace_flink";
        if(args.length > 0) {
            keyspace = args[0];
        }

        final TypeCodec<LocalDateTime> localDateTimeCodec = null; //Own codec

        //...
        CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(
            Pojo.class, 
            new KeyspaceClusterBuilder(keyspace) {
                @Override
                protected Cluster.Builder filledBuilder(Cluster.Builder builder) {
                    return builder.addContactPoint("localhost");
                }
                @Override
                protected void configureCluster(Cluster cluster) {
                    cluster
                        .getConfiguration()
                        .getCodecRegistry()
                        .register(localDateTimeCodec);
                }
            }
        );
        //...
    }
}

@Table(name = "test")
public class Pojo implements Serializable {
    //...
}

In standard solution you have to define constant keyspace in annotation:

public class Main {
    public void main(String[] args) {
        //...

        final TypeCodec<LocalDateTime> localDateTimeCodec = null; //Own codec

        CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(
            Pojo.class, 
            new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                    Cluster cluster = builder.addContactPoint("localhost").build();
                    cluster
                        .getConfiguration()
                        .getCodecRegistry()
                        .register(localDateTimeCodec);
                    return cluster;
                }
            }
        );
        //...
    }
}

@Table(keyspace = "keyspace_flink", name = "test")
public class Pojo implements Serializable {
    //...
}

Versions

Version
1.1.0
1.0.0