RocksDB State Store for Structured Streaming

RocksDB based State Storage for Structured Streaming Applications

License: Apache License, Version 2.0
GroupId: com.qubole.spark
ArtifactId: spark-rocksdb-state-store_2.11
Last Version: 1.0.0
Release Date: (not given)
Type: jar
Description: RocksDB State Store for Structured Streaming. RocksDB based State Storage for Structured Streaming Applications
Project URL: http://github.com/qubole/spark-state-store
Project Organization: Qubole
Source Code Management: http://github.com/qubole/spark-state-store


How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.qubole.spark/spark-rocksdb-state-store_2.11/ -->
<dependency>
    <groupId>com.qubole.spark</groupId>
    <artifactId>spark-rocksdb-state-store_2.11</artifactId>
    <version>1.0.0</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.qubole.spark/spark-rocksdb-state-store_2.11/
implementation 'com.qubole.spark:spark-rocksdb-state-store_2.11:1.0.0'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.qubole.spark/spark-rocksdb-state-store_2.11/
implementation("com.qubole.spark:spark-rocksdb-state-store_2.11:1.0.0")

Buildr:

'com.qubole.spark:spark-rocksdb-state-store_2.11:jar:1.0.0'

Ivy:

<dependency org="com.qubole.spark" name="spark-rocksdb-state-store_2.11" rev="1.0.0">
  <artifact name="spark-rocksdb-state-store_2.11" type="jar" />
</dependency>

Grape:

@Grapes(
    @Grab(group='com.qubole.spark', module='spark-rocksdb-state-store_2.11', version='1.0.0')
)

SBT:

libraryDependencies += "com.qubole.spark" % "spark-rocksdb-state-store_2.11" % "1.0.0"

Leiningen:

[com.qubole.spark/spark-rocksdb-state-store_2.11 "1.0.0"]

Dependencies

compile (2)

Group : Artifact | Type | Version
org.scala-lang : scala-library | jar | 2.11.12
org.apache.spark : spark-tags_2.11 | jar | 2.4.0

provided (1)

Group : Artifact | Type | Version
org.apache.spark : spark-sql_2.11 | jar | 2.4.0

test (5)

Group : Artifact | Type | Version
org.scalatest : scalatest_2.11 | jar | 3.0.3
org.apache.spark : spark-core_2.11 | test-jar | 2.4.0
org.apache.spark : spark-sql_2.11 | test-jar | 2.4.0
org.scalacheck : scalacheck_2.11 | jar | 1.13.5
junit : junit | jar | 4.13

Project Modules

There are no modules declared in this project.


RocksDB State Store for Structured Streaming

SPARK-13809 introduced a framework for state management for computing streaming aggregates. The default implementation is an in-memory hash map that is backed up to an HDFS-compliant file system at the end of every micro-batch.

The current implementation suffers from performance and latency issues. It stores state in executor JVM memory, so the size of the state store is limited by the size of the executor memory. Executor JVM memory is also shared between state storage and other task operations, so the size of the state storage affects the performance of task execution.

Moreover, GC pauses, executor failures, and OOM issues become common as the state storage grows, which increases the overall latency of a micro-batch.

RocksDB is a storage engine with a key/value interface, based on LevelDB. New writes are inserted into the memtable; when the memtable fills up, its data is flushed to local storage. RocksDB supports both point lookups and range scans, provides different types of ACID guarantees, and is optimized for flash storage. RocksDB-based state storage for Structured Streaming provides major performance improvements for stateful stream processing.
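The write path described above (writes land in a memtable, a full memtable is flushed to local storage, and reads consult both) can be illustrated with a toy model. The sketch below is only an illustration under simplifying assumptions: `ToyKeyValueStore`, `memtableLimit`, and the in-memory "flushed runs" are hypothetical names, the flushed runs stand in for files on local disk, and none of this is RocksDB's API or on-disk format.

```scala
import scala.collection.immutable.TreeMap

// Toy illustration only: NOT RocksDB's API, file format, or compaction logic.
final class ToyKeyValueStore(memtableLimit: Int) {
  require(memtableLimit > 0, "memtableLimit must be positive")

  // Sorted in-memory buffer that receives every new write.
  private var memtable = TreeMap.empty[String, String]

  // Sorted runs standing in for files flushed to local storage, newest first.
  private var flushedRuns: List[TreeMap[String, String]] = Nil

  // Insert into the memtable and flush once it reaches the size limit.
  def put(key: String, value: String): Unit = {
    memtable = memtable + (key -> value)
    if (memtable.size >= memtableLimit) flush()
  }

  // Move the full memtable to the front of the flushed runs and start a new one.
  private def flush(): Unit = {
    flushedRuns = memtable :: flushedRuns
    memtable = TreeMap.empty[String, String]
  }

  // Point lookup: check the memtable first, then the runs from newest to oldest,
  // so the most recent value for a key wins.
  def get(key: String): Option[String] =
    memtable.get(key).orElse(
      flushedRuns.collectFirst { case run if run.contains(key) => run(key) }
    )

  // Range scan over [from, until): merge the runs oldest to newest, then the
  // memtable, so newer values overwrite older ones, and slice the sorted result.
  def scan(from: String, until: String): Seq[(String, String)] = {
    val merged = (flushedRuns.reverse :+ memtable)
      .foldLeft(TreeMap.empty[String, String])((acc, run) => acc ++ run)
    merged.range(from, until).toSeq
  }

  def flushedRunCount: Int = flushedRuns.size
}
```

With `memtableLimit = 2`, two `put` calls trigger one flush, and a later `put` for an existing key shadows the flushed value on both `get` and `scan`.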

Discussion on the PR raised can be found here

Downloading and Using the Connector

The connector is available from the Maven Central repository. It can be added with the --packages option or the spark.jars.packages configuration property. Use the following connector artifact:

com.qubole.spark:spark-rocksdb-state-store_2.11:1.0.0
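As a rough sketch of how these settings might be assembled, the snippet below collects the package coordinate (in the colon-separated groupId:artifactId:version form that --packages and spark.jars.packages take) together with Spark's `spark.sql.streaming.stateStore.providerClass` property. The object name `RocksDbStateStoreSettings` is hypothetical, and the provider class name is an assumption that is not stated on this page; check it against the project README before relying on it.

```scala
// Hypothetical helper: gathers the settings as plain strings, no Spark dependency needed.
object RocksDbStateStoreSettings {
  // Artifact from this page, written as groupId:artifactId:version.
  val packageCoordinate: String =
    "com.qubole.spark:spark-rocksdb-state-store_2.11:1.0.0"

  // ASSUMPTION: this class name does not appear on this page.
  // Verify it against the project's own documentation.
  val assumedProviderClass: String =
    "org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider"

  // spark.jars.packages pulls the jar; the providerClass property selects
  // which state store implementation Structured Streaming uses.
  val settings: Seq[(String, String)] = Seq(
    "spark.jars.packages" -> packageCoordinate,
    "spark.sql.streaming.stateStore.providerClass" -> assumedProviderClass
  )

  // Render the settings as spark-submit style "--conf key=value" arguments.
  def submitArgs: Seq[String] =
    settings.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }
}
```

The same key/value pairs could instead be passed one by one to `SparkSession.builder().config(key, value)` when the session is created in application code.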

Benchmark

Used following repo for the benchmark

Setup

  • Used Qubole's distribution of Apache Spark 2.4.0 for my tests.
  • Master Instance Type = i3.xlarge
  • Driver Memory = 2g
  • num-executors = 1
  • max-executors = 1
  • spark.sql.shuffle.partitions = 8
  • Run time = 30 mins
  • Source = Rate Source
  • executor Memory = 7g
  • spark.executor.memoryOverhead=3g
  • Processing Time = 30 sec

Executor Instance type = i3.xlarge, cores per executor = 4, ratePerSec = 20k

State Storage Type | Mode | Total Trigger Execution Time | Records Processed | Total State Rows | Comments
memory | Append | ~7 mins | 8.6 million | 2 million | Application failed before 30 mins
RocksDB | Append | ~30 minutes | 34.6 million | 7 million |

Executor Instance type = C5d.2xlarge, cores per executor = 8, ratePerSec = 30k

State Storage Type | Mode | Total Trigger Execution Time | Records Processed | Total State Rows | Comments
memory | Append | 8 mins | 12.6 million | 3.1 million | Application was stuck because of GC
RocksDB | Complete | ~30 minutes | 47.34 million | 12.5 million |
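One way to read the two tables above is to compare the records processed with what the rate source would have produced over the full run. The sketch below does that arithmetic; it assumes the source emits exactly ratePerSec rows every second for the whole 30-minute run, which the benchmark does not state explicitly, and the names `RateSourceThroughput`, `expectedRows`, and `processedFraction` are hypothetical.

```scala
// Hypothetical helper for back-of-the-envelope throughput checks.
object RateSourceThroughput {
  // Rows the rate source would emit if it ran at ratePerSec for the whole run.
  // ASSUMPTION: constant emission rate for the full duration.
  def expectedRows(ratePerSec: Long, runMinutes: Long): Long =
    ratePerSec * runMinutes * 60L

  // Fraction of the expected rows that the table reports as processed.
  def processedFraction(processedRows: Double, ratePerSec: Long, runMinutes: Long): Double =
    processedRows / expectedRows(ratePerSec, runMinutes).toDouble
}
```

Under that assumption, the 20k run corresponds to 36 million expected rows and the 30k run to 54 million, so the RocksDB rows in the tables cover roughly 96% and 88% of the expected input respectively.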

Executor info when memory-based state storage is used: [screenshot, 2019-08-02 10:58:21 AM]

Longevity run results

Executor Instance type = C5d.2xlarge, cores per executor = 8, ratePerSec = 20k

State Storage Type | Mode | Total Trigger Execution Time | Records Processed | Total State Rows | Number of Micro-batches | Comments
RocksDB | Append | ~1.5 hrs | 104.3 million | 10.5 million | 114 |

Streaming Metrics: [screenshot, 2019-08-07 8:08:32 PM]

Executor info: [screenshot, 2019-08-07 8:18:10 PM]

com.qubole.spark

Qubole

A cloud based service that makes big data easy for analysts and data engineers.

Versions

Version
1.0.0