flink-elasticsearch-source-connector


Categories: Search Business Logic Libraries Elasticsearch
GroupId: com.mnubo
ArtifactId: flink-elasticsearch-source-connector_2.11
Last Version: 1.0.1-flink1
Type: jar
Description: flink-elasticsearch-source-connector
Project URL: https://github.com/mnubo/flink-elasticsearch-source-connector
Project Organization: com.mnubo
Source Code Management: http://github.com/mnubo/flink-elasticsearch-source-connector

Download flink-elasticsearch-source-connector_2.11

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.mnubo/flink-elasticsearch-source-connector_2.11/ -->
<dependency>
    <groupId>com.mnubo</groupId>
    <artifactId>flink-elasticsearch-source-connector_2.11</artifactId>
    <version>1.0.1-flink1</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.mnubo/flink-elasticsearch-source-connector_2.11/
implementation 'com.mnubo:flink-elasticsearch-source-connector_2.11:1.0.1-flink1'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.mnubo/flink-elasticsearch-source-connector_2.11/
implementation ("com.mnubo:flink-elasticsearch-source-connector_2.11:1.0.1-flink1")

Buildr:

'com.mnubo:flink-elasticsearch-source-connector_2.11:jar:1.0.1-flink1'

Ivy:

<dependency org="com.mnubo" name="flink-elasticsearch-source-connector_2.11" rev="1.0.1-flink1">
  <artifact name="flink-elasticsearch-source-connector_2.11" type="jar" />
</dependency>

Grape:

@Grapes(
@Grab(group='com.mnubo', module='flink-elasticsearch-source-connector_2.11', version='1.0.1-flink1')
)

sbt:

libraryDependencies += "com.mnubo" % "flink-elasticsearch-source-connector_2.11" % "1.0.1-flink1"

Leiningen:

[com.mnubo/flink-elasticsearch-source-connector_2.11 "1.0.1-flink1"]

Dependencies

compile (2)

Group / Artifact                        Type   Version
org.scala-lang : scala-library          jar    2.11.7
org.apache.flink : flink-scala_2.11     jar    1.0.3

test (3)

Group / Artifact                        Type   Version
org.apache.flink : flink-clients_2.11   jar    1.0.3
org.elasticsearch : elasticsearch       jar    1.5.2
org.scalatest : scalatest_2.11          jar    2.2.6

Project Modules

There are no modules declared in this project.

Apache Flink source connector for Elasticsearch

Pipes the result of an Elasticsearch query into a Flink data set. Supports Scala and Java tuples, case classes, POJOs, and a variable-length result set called DataRow.

Usage:

build.sbt

libraryDependencies += "com.mnubo" %% "flink-elasticsearch-source-connector" % "1.0.0-flink1"

then:

import com.mnubo.flink.streaming.connectors.DataRow
import com.mnubo.flink.streaming.connectors.elasticsearch.ElasticsearchDataset
import org.apache.flink.api.scala._

val esIndexName = "my_es_index"

val esNodeHostNames = Set("es_node_1", "es_node_2", "es_node_3")

val esHttpPort = 9200

val esQuery = """{"fields": ["some_string","some_boolean","some_long","some_date","sub_doc.sub_doc_id"]}"""

val dataSet = ElasticsearchDataset.fromElasticsearchQuery[DataRow](
  ExecutionEnvironment.getExecutionEnvironment,
  esIndexName,
  esQuery,
  esNodeHostNames,
  esHttpPort
)

dataSet
  .groupBy("sub_doc.sub_doc_id")
  .sum(2)
  .print

The Elasticsearch query must contain a "fields" field.

Aggregations are not supported.

Tested with Elasticsearch 1.5.2, 1.7.5, and 2.3.3.
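
The "fields" requirement can be sketched as a small helper that builds the query body from a list of field names, so that the field is never forgotten. EsQueryBuilder and fieldsQuery are hypothetical names used for illustration and are not part of the connector. The sketch only produces the minimal {"fields": [...]} form shown in the usage example.

```scala
// Hypothetical helper, not part of the connector: builds a query body
// that always carries the required "fields" entry.
object EsQueryBuilder {

  // Escape backslashes and double quotes so the name is a valid JSON string.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  /** Returns a body of the form {"fields": ["a","b"]}. */
  def fieldsQuery(fieldNames: Seq[String]): String = {
    require(fieldNames.nonEmpty, "at least one field name is required")
    fieldNames.map(quote).mkString("""{"fields": [""", ",", "]}")
  }
}
```

The resulting string could then be passed as the esQuery argument in the usage example, for instance EsQueryBuilder.fieldsQuery(Seq("some_long", "sub_doc.sub_doc_id")).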

com.mnubo

mnubo SmartObjects™ GitHub

mnubo's SmartObjects Cloud provides Big Data Analytics for the Internet of Things.

Versions

Version
1.0.1-flink1
1.0.0-flink1