spark-sql-kafka-0-8

License: Apache License, Version 2.0
GroupId: com.github.harbby
ArtifactId: spark-sql-kafka-0-8
Last Version: 1.0.1
Type: jar
Description: spark-sql-kafka-0-8
Project URL: https://github.com/harbby/spark-sql-kafka-0-8
Source Code Management: https://github.com/harbby/spark-sql-kafka-0-8

Download spark-sql-kafka-0-8

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.github.harbby/spark-sql-kafka-0-8/ -->
<dependency>
    <groupId>com.github.harbby</groupId>
    <artifactId>spark-sql-kafka-0-8</artifactId>
    <version>1.0.1</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.github.harbby/spark-sql-kafka-0-8/
implementation 'com.github.harbby:spark-sql-kafka-0-8:1.0.1'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.github.harbby/spark-sql-kafka-0-8/
implementation("com.github.harbby:spark-sql-kafka-0-8:1.0.1")

Buildr:

'com.github.harbby:spark-sql-kafka-0-8:jar:1.0.1'

Ivy:

<dependency org="com.github.harbby" name="spark-sql-kafka-0-8" rev="1.0.1">
  <artifact name="spark-sql-kafka-0-8" type="jar" />
</dependency>

Grape:

@Grapes(
  @Grab(group='com.github.harbby', module='spark-sql-kafka-0-8', version='1.0.1')
)

SBT:

libraryDependencies += "com.github.harbby" % "spark-sql-kafka-0-8" % "1.0.1"

Leiningen:

[com.github.harbby/spark-sql-kafka-0-8 "1.0.1"]

Dependencies

compile (1)

Group / Artifact Type Version
org.apache.spark : spark-streaming-kafka-0-8_2.11 jar 2.4.2

test (1)

Group / Artifact Type Version
junit : junit jar 4.12

Project Modules

There are no modules declared in this project.

spark-sql-kafka-0-8

A Kafka source for Spark Structured Streaming.

Supports Kafka 0.8.2.1+ and Kafka 0.9.

License

Apache License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)

Maven

<dependency>
  <groupId>com.github.harbby</groupId>
  <artifactId>spark-sql-kafka-0-8</artifactId>
  <version>1.0.0</version>
</dependency>

Limitations

  • requires Spark 2.3+
  • the query must use a continuous trigger: writeStream.trigger(Trigger.Continuous(...))

Use

  • create
import org.apache.spark.sql.DataFrame

val sparkSession = ...

val kafka: DataFrame = sparkSession.readStream
    .format("kafka08")
    .option("topics", "topic1,topic2")
    .option("bootstrap.servers", "broker1:9092,broker2:9092")
    .option("group.id", "test1")
    .option("auto.offset.reset", "largest")  // largest or smallest
    .option("zookeeper.connect", "zk1:2181,zk2:2181")
    .option("auto.commit.enable", "true")
    .option("auto.commit.interval.ms", "5000")
    .load()
  • schema
kafka.printSchema()

root
 |-- _key: binary (nullable = true)
 |-- _message: binary (nullable = true)
 |-- _topic: string (nullable = false)
 |-- _partition: integer (nullable = false)
 |-- _offset: long (nullable = false)
  • sink
    dataFrame.writeStream
         .trigger(Trigger.Continuous(Duration(90, TimeUnit.SECONDS))) // a continuous trigger is required
         ...
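
For reference, below is a minimal end-to-end sketch that ties the steps above together: it reads from the kafka08 source, casts the binary _key and _message columns to strings, and writes to the console sink with the required continuous trigger. The broker, ZooKeeper, and topic names are placeholders, and the console sink and the 90-second interval are illustrative assumptions, not part of this project's documentation.

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("kafka08-example")
  .master("local[*]") // placeholder; use your cluster's master
  .getOrCreate()

// Read from the kafka08 source (addresses and topic are placeholders).
val kafka = spark.readStream
  .format("kafka08")
  .option("topics", "topic1")
  .option("bootstrap.servers", "broker1:9092")
  .option("zookeeper.connect", "zk1:2181")
  .option("group.id", "example")
  .load()

// _key and _message arrive as binary; cast them to strings before use.
val messages = kafka.selectExpr(
  "CAST(_key AS STRING) AS key",
  "CAST(_message AS STRING) AS value",
  "_topic", "_partition", "_offset")

// Continuous processing mode is mandatory for this source.
val query = messages.writeStream
  .format("console")
  .trigger(Trigger.Continuous(Duration(90, TimeUnit.SECONDS)))
  .start()

query.awaitTermination()

Note that Spark's continuous processing mode only supports map-like operations (select, selectExpr, filter and the like), so any aggregation would have to happen downstream.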

Versions

Version
1.0.1
1.0.0
1.0.0-alpha3
1.0.0-alpha2
1.0.0-alpha1