flink-elasticsearch2-connector

ElasticSearchOutputFormat for Flink DataSet, Through the ElasticSearchOutputFormat, we can save Flink DataSet to elasticsearch.

License

License

Categories

Categories

H2 Data Databases Search Business Logic Libraries Elasticsearch
GroupId

GroupId

com.iteblog
ArtifactId

ArtifactId

flink-elasticsearch2-connector
Last Version

Last Version

1.0.2
Release Date

Release Date

Type

Type

jar
Description

Description

flink-elasticsearch2-connector
ElasticSearchOutputFormat for Flink DataSet, Through the ElasticSearchOutputFormat, we can save Flink DataSet to elasticsearch.
Project URL

Project URL

https://www.iteblog.com
Source Code Management

Source Code Management

https://github.com/397090770/flink-elasticsearch2-connector

Download flink-elasticsearch2-connector

How to add to project

<!-- https://jarcasting.com/artifacts/com.iteblog/flink-elasticsearch2-connector/ -->
<dependency>
    <groupId>com.iteblog</groupId>
    <artifactId>flink-elasticsearch2-connector</artifactId>
    <version>1.0.2</version>
</dependency>
// https://jarcasting.com/artifacts/com.iteblog/flink-elasticsearch2-connector/
implementation 'com.iteblog:flink-elasticsearch2-connector:1.0.2'
// https://jarcasting.com/artifacts/com.iteblog/flink-elasticsearch2-connector/
implementation ("com.iteblog:flink-elasticsearch2-connector:1.0.2")
'com.iteblog:flink-elasticsearch2-connector:jar:1.0.2'
<dependency org="com.iteblog" name="flink-elasticsearch2-connector" rev="1.0.2">
  <artifact name="flink-elasticsearch2-connector" type="jar" />
</dependency>
@Grapes(
@Grab(group='com.iteblog', module='flink-elasticsearch2-connector', version='1.0.2')
)
libraryDependencies += "com.iteblog" % "flink-elasticsearch2-connector" % "1.0.2"
[com.iteblog/flink-elasticsearch2-connector "1.0.2"]

Dependencies

compile (2)

Group / Artifact Type Version
org.elasticsearch : elasticsearch jar 2.3.4
org.apache.flink : flink-core jar 1.1.2

test (1)

Group / Artifact Type Version
junit : junit jar 4.4

Project Modules

There are no modules declared in this project.

flink-elasticsearch2-connector

Flink DataSet ElasticSearchOutputFormat create by https://www.iteblog.com based on org.apache.flink#flink-connector-elasticsearch2_2.10#1.1.2, We can use it in Scala or Java. Through the ElasticSearchOutputFormat, we can save Flink DataSet to elasticsearch.

Usage

Environment

Elasticsearch: 2.x.x

Flink: 1.x.x

Scala: 2.10.x

pom.xml

<dependency>
       <groupId>com.iteblog</groupId>
       <artifactId>flink-elasticsearch2-connector</artifactId>
       <version>1.0.1</version>
</dependency>

Using in Scala

import scala.collection.JavaConversions._
val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")
val hosts = "www.iteblog.com"

val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toList

val data : DataSet[String] = ....
data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {
      def createIndexRequest(element: String): IndexRequest = {
        Requests.indexRequest.index("iteblog").`type`("info").source(element)
      }

      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
        indexer.add(createIndexRequest(element))
      }
}))

Using in Java

Map<String, String> config = new HashMap<>();
config.put("bulk.flush.max.actions", "1000");
config.put("cluster.name", "elasticsearch");

String hosts = "www.iteblog.com";

List<InetSocketAddress> list = Lists.newArrayList();
for (String host : hosts.split(",")) {
    list.add(new InetSocketAddress(InetAddress.getByName(host), 9300));
}

DataSet<String> data  = ....;

data.output(new ElasticSearchOutputFormat<>(config, list, new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }

    private IndexRequest createIndexRequest(String element) {
        return Requests.indexRequest().index("iteblog").type("info").source(element);
    }
}));

Versions

Version
1.0.2
1.0.1