Spark Streaming Datasource Receiver

Datasource-Receiver is a library that lets the user read data into Apache Spark Streaming from multiple datasources through Spark SQL.

License: Apache License, Version 2.0
Categories: Data
GroupId: com.stratio.receiver
ArtifactId: spark-datasource
Last Version: 0.1.0
Type: jar
Project URL: https://github.com/Stratio/Datasource-Receiver
Project Organization: Stratio
Source Code Management: https://github.com/Stratio/Datasource-Receiver.git

How to add to project

Maven:

<dependency>
    <groupId>com.stratio.receiver</groupId>
    <artifactId>spark-datasource</artifactId>
    <version>0.1.0</version>
</dependency>

Gradle (Groovy):

implementation 'com.stratio.receiver:spark-datasource:0.1.0'

Gradle (Kotlin):

implementation("com.stratio.receiver:spark-datasource:0.1.0")

Buildr:

'com.stratio.receiver:spark-datasource:jar:0.1.0'

Ivy:

<dependency org="com.stratio.receiver" name="spark-datasource" rev="0.1.0">
  <artifact name="spark-datasource" type="jar" />
</dependency>

Grape:

@Grapes(
@Grab(group='com.stratio.receiver', module='spark-datasource', version='0.1.0')
)

SBT:

libraryDependencies += "com.stratio.receiver" % "spark-datasource" % "0.1.0"

Leiningen:

[com.stratio.receiver/spark-datasource "0.1.0"]

Dependencies

compile (1)

Group / Artifact                  Type  Version
joda-time : joda-time             jar   2.8.2

test (3)

Group / Artifact                  Type  Version
org.scalatest : scalatest_2.10    jar   2.2.5
org.scalacheck : scalacheck_2.10  jar   1.11.3
junit : junit                     jar   4.12

Project Modules

  • spark-datasource_1.6
  • spark-datasource_1.5

Datasource-Receiver

Datasource-Receiver is a library that reads data from datasources with SparkSQL and feeds that data into Spark Streaming.

Requirements

This library requires Spark 1.5+ and Scala 2.10+.

Using the library

There are two ways of using the Datasource-Receiver library.

The first is to add the following dependency to your pom.xml:

<dependency>
  <groupId>com.stratio.receiver</groupId>
  <artifactId>datasource</artifactId>
  <version>LATEST</version>
</dependency>

The other is to clone the repository and build the project:

git clone https://github.com/Stratio/Datasource-Receiver.git
cd Datasource-Receiver
mvn clean install

The basic idea is to receive data in Spark Streaming through Spark SQL:

SparkSQL -> SparkStreaming

Some use cases are monitoring tables, migrating data and generating streaming data from batch data. Using SparkSQL has the advantage that the library is polyglot and can read data from multiple datasources, such as JDBC, Cassandra, ElasticSearch and MongoDB. The user must add the jar of the connector library and can then use that connector in the initial query. For example:

  • Reading data from MongoDB with the Stratio Library:
InputSentences(
   query = "select * from tableName",
   initialStatements = Seq(
     "CREATE TEMPORARY TABLE tableName USING com.stratio.datasource.mongodb OPTIONS (host 'localhost', database 'sparta', collection 'streams')"
   )
)
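Because every connector is registered through the same kind of statement, the text of an initial statement can be generated from a provider name and a list of options. The following is a minimal sketch in plain Scala; InitialStatements and createTemporaryTable are hypothetical names that are not part of Datasource-Receiver, and the JDBC url and table name are illustrative values only. It does no escaping, so option values must not contain single quotes.

```scala
// Hypothetical helper (not part of Datasource-Receiver): renders the kind of
// CREATE TEMPORARY TABLE statement that is passed in initialStatements.
object InitialStatements {

  // Renders each option as: key 'value', joined with commas.
  def createTemporaryTable(table: String,
                           provider: String,
                           options: Seq[(String, String)]): String = {
    val rendered = options.map { case (key, value) => s"$key '$value'" }.mkString(", ")
    s"CREATE TEMPORARY TABLE $table USING $provider OPTIONS ($rendered)"
  }

  // Same statement as in the MongoDB example above.
  val mongoTable: String = createTemporaryTable(
    "tableName",
    "com.stratio.datasource.mongodb",
    Seq("host" -> "localhost", "database" -> "sparta", "collection" -> "streams"))

  // Spark's built-in JDBC source; url and dbtable values are made up.
  val jdbcTable: String = createTemporaryTable(
    "tableName",
    "org.apache.spark.sql.jdbc",
    Seq("url" -> "jdbc:postgresql://localhost/test", "dbtable" -> "events"))
}
```

Either string could then be placed in the initialStatements sequence, provided the matching connector jar is available to Spark.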

[Figure: SparkSQL Datasources]

Distributed Approach with SparkSQL

The distributed approach is based on SparkSQL: on each streaming window the receiver executes one query with SparkSQL and generates one DataFrame, and the partitions of this DataFrame correspond to the DStream partitions.

[Figure: Datasource Receiver distributed with SparkSQL]

The receiver makes it possible to monitor tables in the datasources: the user specifies one offset field, and on each window the offset is updated to generate the new incremental query.
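To illustrate how an offset field can turn a fixed query into an incremental one, here is a minimal sketch in plain Scala. IncrementalQuery and next are hypothetical names, and the generated SQL is an assumption about what such a query could look like, not the text the library actually produces. The sketch assumes a numeric offset and a base query without its own WHERE or ORDER BY clause.

```scala
// Hypothetical sketch (not the library's implementation): builds the query
// for the next window from a base query, an offset field and the last
// offset value seen so far.
object IncrementalQuery {

  def next(baseQuery: String, offsetField: String, lastOffset: Option[Long]): String = {
    // First window: no offset yet, so no filter is added.
    // Later windows: only rows beyond the last offset are selected.
    val filter = lastOffset
      .map(value => s" WHERE $offsetField > $value")
      .getOrElse("")
    // Ordering by the offset field makes the last row carry the new offset.
    s"$baseQuery$filter ORDER BY $offsetField"
  }
}
```

For example, IncrementalQuery.next("select * from tableName", "followersCount", Some(1500L)) yields a query that only returns rows whose followersCount is greater than 1500.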

Incremental Queries

The receiver provides an object for specifying the offset options, limiting the results and defining stop conditions. For example:

InputSentences(
      query = "select * from tableName",
      offsetConditions = OffsetConditions(OffsetField(name = "followersCount")),
      stopConditions = StopConditions(stopWhenEmpty = true, finishContextWhenEmpty = true),
      initialStatements = Seq("CREATE TEMPORARY TABLE tableName USING com.stratio.datasource.mongodb OPTIONS (host 'localhost', database 'sparta', collection 'streamtrigger')")
)
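The interaction between the offset, a result limit and stopWhenEmpty can be pictured with a small simulation that needs neither Spark nor a database. WindowSimulation, run and maxWindows are hypothetical names; the in-memory sequence stands in for the monitored table, each Long plays the role of an offset value, and the loop is only an approximation of the behaviour described above.

```scala
// Hypothetical simulation (not library code): each iteration plays the role
// of one streaming window that queries rows beyond the last offset.
object WindowSimulation {

  def run(table: Seq[Long],
          limit: Int,
          stopWhenEmpty: Boolean,
          maxWindows: Int): Seq[Seq[Long]] = {

    @scala.annotation.tailrec
    def loop(lastOffset: Option[Long],
             window: Int,
             acc: Vector[Seq[Long]]): Vector[Seq[Long]] = {
      if (window >= maxWindows) acc
      else {
        // Incremental query: rows beyond the offset, ordered, limited.
        val batch = table.filter(v => lastOffset.forall(v > _)).sorted.take(limit)
        if (batch.isEmpty && stopWhenEmpty) acc // stop condition reached
        else {
          // The offset only advances when the window returned rows.
          val newOffset = if (batch.isEmpty) lastOffset else Some(batch.last)
          loop(newOffset, window + 1, acc :+ batch)
        }
      }
    }

    loop(None, 0, Vector.empty)
  }
}
```

With table Seq(5L, 1L, 3L, 2L, 4L), a limit of 2 and stopWhenEmpty = true, the simulation produces three windows and stops at the first empty one. With stopWhenEmpty = false it keeps emitting empty windows until maxWindows is reached.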

Build

There are two modules, each packaged against a different version of Spark:

  • To package it with Spark-1.5:

    mvn clean package -pl com.stratio.receiver:spark-datasource_1.5

  • To package it with Spark-1.6 (default):

    mvn clean package -pl com.stratio.receiver:spark-datasource_1.6

Scala API

  • Implicitly use SparkSQLContext

    val receiverStream = DatasourceUtils.createStream(sparkStreamingContext, inputSentences, datasourceParams)
    
  • Using a generic Context

    val receiverStream = DatasourceUtils.createStream(sparkStreamingContext, inputSentences, datasourceParams, sqlContext)
    

Java API

  • Implicitly use SparkSQLContext

    JavaReceiverInputDStream receiverStream = DatasourceUtils.createJavaStream(javaSparkStreamingContext, inputSentences, datasourceParams);
    
  • Using a generic Context

    JavaReceiverInputDStream receiverStream = DatasourceUtils.createJavaStream(javaSparkStreamingContext, inputSentences, datasourceParams, sqlContext);
    

Spark Parameters Options

Parameter         Description                                  Optional
stopGracefully    Stop the StreamingContext gracefully         Yes (default: true)
stopSparkContext  Stop the SparkContext when streaming stops   Yes (default: false)
storageLevel      Storage level for the received data          Yes (default: MEMORY_ONLY)
rememberDuration  Remember duration for Spark DStreams         Yes (default: 60s)
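Since every parameter is optional, a caller only needs to supply the values that differ from the defaults. The sketch below shows one way to merge user values over those defaults in plain Scala. It assumes the parameters are passed as a Map[String, String], which the text above does not confirm; ReceiverParams and resolve are hypothetical names and not part of the library.

```scala
// Hypothetical helper (not part of Datasource-Receiver). Assumes the
// parameters travel as string key/value pairs.
object ReceiverParams {

  // Default values as listed in the parameter table.
  val defaults: Map[String, String] = Map(
    "stopGracefully"   -> "true",
    "stopSparkContext" -> "false",
    "storageLevel"     -> "MEMORY_ONLY",
    "rememberDuration" -> "60s")

  // Rejects unknown keys, then lets user values override the defaults.
  def resolve(userParams: Map[String, String]): Map[String, String] = {
    val unknown = userParams.keySet -- defaults.keySet
    require(unknown.isEmpty, "Unknown parameters: " + unknown.mkString(", "))
    defaults ++ userParams
  }
}
```

Calling ReceiverParams.resolve(Map("storageLevel" -> "MEMORY_AND_DISK")) keeps the other three defaults and overrides only the storage level.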

License

Licensed to STRATIO (C) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The STRATIO (C) licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
