hyperdrive

Hyperdrive is a configurable and scalable ingestion platform that allows data movement and transformation from the fast to the batch layer in a Lambda Architecture on top of Apache Spark.

GroupId: za.co.absa.hyperdrive
ArtifactId: hyperdrive
Last Version: 4.1.0
Type: pom
Project URL: https://github.com/AbsaOSS/hyperdrive
Project Organization: ABSA Group Limited
Source Code Management: https://github.com/AbsaOSS/hyperdrive/tree/master

Download hyperdrive

Filename | Size
hyperdrive-4.1.0.pom | 8 KB

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/za.co.absa.hyperdrive/hyperdrive/ -->
<dependency>
    <groupId>za.co.absa.hyperdrive</groupId>
    <artifactId>hyperdrive</artifactId>
    <version>4.1.0</version>
    <type>pom</type>
</dependency>

Gradle (Groovy):

// https://jarcasting.com/artifacts/za.co.absa.hyperdrive/hyperdrive/
implementation 'za.co.absa.hyperdrive:hyperdrive:4.1.0'

Gradle (Kotlin):

// https://jarcasting.com/artifacts/za.co.absa.hyperdrive/hyperdrive/
implementation("za.co.absa.hyperdrive:hyperdrive:4.1.0")

Buildr:

'za.co.absa.hyperdrive:hyperdrive:pom:4.1.0'

Ivy:

<dependency org="za.co.absa.hyperdrive" name="hyperdrive" rev="4.1.0">
  <artifact name="hyperdrive" type="pom" />
</dependency>

Grape:

@Grapes(
@Grab(group='za.co.absa.hyperdrive', module='hyperdrive', version='4.1.0')
)

SBT:

libraryDependencies += "za.co.absa.hyperdrive" % "hyperdrive" % "4.1.0"

Leiningen:

[za.co.absa.hyperdrive/hyperdrive "4.1.0"]

Dependencies

There are no dependencies for this project. It is a standalone project that does not depend on any other jars.

Project Modules

  • hyperdrive-release
  • driver
  • parent-conf
  • component-archetype
  • component-scanner
  • api
  • ingestor-default
  • shared

Copyright 2018 ABSA Group Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Hyperdrive - An extensible streaming ingestion pipeline on top of Apache Spark

What is Hyperdrive?

Hyperdrive is a configurable and scalable ingestion platform that allows data movement and transformation from streaming sources with exactly-once fault-tolerance semantics by using Apache Spark Structured Streaming.

In Hyperdrive, each ingestion is defined by three components: reader, transformer and writer. This separation allows adapting to different streaming sources and sinks, while reusing transformations common across multiple ingestion pipelines.

Motivation

Similar to batch processing, data ingestion pipelines are needed to process streaming data sources. While solutions for data pipelines exist, exactly-once fault-tolerance in streaming processing is an intricate problem and cannot be solved with the same strategies that exist for batch processing.

This is the gap Hyperdrive aims to fill, by leveraging the exactly-once guarantee of Spark's Structured Streaming and by providing a flexible data pipeline.

Architecture

The data ingestion pipeline of Hyperdrive consists of three components: readers, transformers and writers.

  • Readers define how to connect to sources, e.g. how to connect to Kafka in a secure cluster by providing security directives, which topic and brokers to connect to.
  • Transformers define transformations to be applied to the decoded DataFrame, e.g. dropping columns.
  • Writers define where DataFrames should be sent after the transformations, e.g. into HDFS as Parquet files.

Built-in components

  • KafkaStreamReader - reads from a Kafka topic.
  • ParquetStreamReader - reads Parquet files from a source directory.
  • ConfluentAvroDecodingTransformer - decodes the payload as Confluent Avro (through ABRiS), retrieving the schema from the specified Schema Registry. This transformer is capable of seamlessly handling whatever schemas the payload messages are using.
  • ConfluentAvroEncodingTransformer - encodes the payload as Confluent Avro (through ABRiS), updating the schema to the specified Schema Registry. This transformer is capable of seamlessly handling whatever schema the dataframe is using.
  • ColumnSelectorStreamTransformer - selects the configured columns from the decoded DataFrame (all columns when * is used).
  • AddDateVersionTransformer - adds columns for ingestion date and an auto-incremented version number, to be used for partitioning.
  • ParquetStreamWriter - writes the DataFrame as Parquet, in append mode.
  • KafkaStreamWriter - writes to a Kafka topic.

Custom components

Custom components can be implemented using the Component Archetype, following the API defined in the package za.co.absa.hyperdrive.ingestor.api.

  • A custom component has to be a class that extends one of the abstract classes StreamReader, StreamTransformer or StreamWriter.
  • The class needs a companion object that implements the corresponding trait StreamReaderFactory, StreamTransformerFactory or StreamWriterFactory.
  • The implemented components have to be packaged into a jar file, which can then be added to the classpath of the driver. To use a component, it has to be configured as described under Usage.

After that, the new component can be invoked from the driver.

Usage

Hyperdrive has to be executed with Spark. Due to Spark-Kafka integration issues, it will only work with Spark 2.3 and higher.

How to run

git clone git@github.com:AbsaOSS/hyperdrive.git
cd hyperdrive
mvn clean package

Given a configuration file has already been created, hyperdrive can be executed as follows:

spark-submit --class za.co.absa.hyperdrive.driver.drivers.PropertiesIngestionDriver driver/target/driver*.jar config.properties

Alternatively, configuration properties can be passed as command-line arguments:

spark-submit --class za.co.absa.hyperdrive.driver.drivers.CommandLineIngestionDriver driver/target/driver*.jar \
  component.ingestor=spark \
  component.reader=za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader \
  # more properties ...

Configuration

The configuration file may be created from the template located at driver/src/resources/Ingestion.properties.template.

CommandLineIngestionDriverDockerTest may be consulted for a working pipeline configuration.

General settings

Pipeline settings
Property Name | Required | Description
component.ingestor | Yes | Defines the ingestion pipeline. Only spark is currently supported.
component.reader | Yes | Fully qualified name of reader component, e.g. za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
component.transformer.id.{order} | No | An arbitrary but unique string, referenced in this documentation as {transformer-id}
component.transformer.class.{transformer-id} | No | Fully qualified name of transformer component, e.g. za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
component.writer | Yes | Fully qualified name of writer component, e.g. za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter

Multiple transformers can be configured in the pipeline, including multiple instances of the same transformer. For each transformer instance, component.transformer.id.{order} and component.transformer.class.{transformer-id} have to be specified, where {order} and {transformer-id} need to be unique. {order} must be an integer and may be negative. {transformer-id} is only used within the configuration to identify which configuration options belong to a certain transformer instance.
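As a hedged illustration of these rules, the fragment below wires a Kafka reader, two transformers and a Parquet writer. The class names are the ones quoted in this README; the transformer ids selector and renamer and the order values 1 and 2 are arbitrary choices made for this sketch. It assumes that transformers are applied in ascending {order}, which is not stated explicitly here.

```properties
# Pipeline wiring (sketch; ids and order values are illustrative)
component.ingestor=spark
component.reader=za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader

# Each transformer instance needs a unique {order} and a unique {transformer-id}
component.transformer.id.1=selector
component.transformer.class.selector=za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
component.transformer.id.2=renamer
component.transformer.class.renamer=za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer

component.writer=za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter
```

Options for each transformer instance are then set under transformer.selector.* and transformer.renamer.* respectively.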

Spark settings
Property Name | Required | Description
ingestor.spark.app.name | Yes | User-defined name of the Spark application. See Spark property spark.app.name
ingestor.spark.await.termination.timeout | No | Timeout in milliseconds. Stops query when timeout is reached. This option is only valid with termination method awaitTermination

Settings for built-in components

KafkaStreamReader
Property Name | Required | Description
reader.kafka.topic | Yes | The name of the Kafka topic to ingest data from. Equivalent to Spark property subscribe
reader.kafka.brokers | Yes | List of Kafka broker URLs. Equivalent to Spark property kafka.bootstrap.servers

Any additional Kafka properties can be added with the prefix "reader.option.". For example, the property kafka.security.protocol can be added as reader.option.kafka.security.protocol.

See e.g. the Structured Streaming + Kafka Integration Guide for optional kafka properties.
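A minimal sketch of a reader configuration follows. The topic name and broker addresses are placeholders invented for this example; SASL_SSL is one of the standard values of Kafka's security.protocol setting and is used only to show the reader.option. prefix.

```properties
# Kafka source (placeholder topic and brokers)
reader.kafka.topic=my-input-topic
reader.kafka.brokers=broker1:9092,broker2:9092

# Extra Kafka property, passed with the reader.option. prefix
reader.option.kafka.security.protocol=SASL_SSL
```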

ParquetStreamReader

The parquet stream reader infers the schema from parquet files that already exist in the source directory. If no file exists, the reader will fail.

Property Name | Required | Description
reader.parquet.source.directory | Yes | Source path for the parquet files. Equivalent to Spark property path for the DataStreamReader

Any additional properties can be added with the prefix "reader.parquet.options.". See the Spark Structured Streaming Documentation.
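For illustration, a Parquet reader could be configured along these lines. The directory is a made-up path; maxFilesPerTrigger is a regular option of Spark's file stream source and is shown here only as an example of the options prefix.

```properties
# Parquet source (placeholder path); at least one file must already exist there
reader.parquet.source.directory=/data/landing/events

# Extra DataStreamReader option, passed with the reader.parquet.options. prefix
reader.parquet.options.maxFilesPerTrigger=10
```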

ConfluentAvroStreamDecodingTransformer

The ConfluentAvroStreamDecodingTransformer is built on ABRiS. More details about the configuration properties can be found there. Caution: The ConfluentAvroStreamDecodingTransformer requires the property reader.kafka.topic to be set.

Property Name | Required | Description
transformer.{transformer-id}.schema.registry.url | Yes | URL of Schema Registry, e.g. http://localhost:8081. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_REGISTRY_URL
transformer.{transformer-id}.value.schema.id | Yes | The schema id. Use latest or explicitly provide a number. Equivalent to ABRiS property SchemaManager.PARAM_VALUE_SCHEMA_ID
transformer.{transformer-id}.value.schema.naming.strategy | Yes | Subject name strategy of Schema Registry. Possible values are topic.name, record.name or topic.record.name. Equivalent to ABRiS property SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY
transformer.{transformer-id}.value.schema.record.name | Yes for naming strategies record.name and topic.record.name | Name of the record. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY
transformer.{transformer-id}.value.schema.record.namespace | Yes for naming strategies record.name and topic.record.name | Namespace of the record. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY
transformer.{transformer-id}.consume.keys | No | If set to true, keys will be consumed and added as columns to the dataframe. Key columns will be prefixed with key__
transformer.{transformer-id}.key.schema.id | Yes if consume.keys is true | The schema id for the key.
transformer.{transformer-id}.key.schema.naming.strategy | Yes if consume.keys is true | Subject name strategy for key
transformer.{transformer-id}.key.schema.record.name | Yes for key naming strategies record.name and topic.record.name | Name of the record.
transformer.{transformer-id}.key.schema.record.namespace | Yes for key naming strategies record.name and topic.record.name | Namespace of the record.
transformer.{transformer-id}.keep.columns | No | Comma-separated list of columns to keep (e.g. offset, partition)

For detailed information on the subject name strategy, please take a look at the Schema Registry Documentation.
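The following sketch shows how these properties might fit together for a decoder that also consumes keys. The transformer id [avro.decoder] and the topic name are illustrative. Using latest for the key schema id is an assumption: the table only documents latest for the value schema id. With the topic.name strategy, no record name or namespace is needed.

```properties
# Required by the decoding transformer
reader.kafka.topic=my-input-topic

# Value schema, looked up by topic name
transformer.[avro.decoder].schema.registry.url=http://localhost:8081
transformer.[avro.decoder].value.schema.id=latest
transformer.[avro.decoder].value.schema.naming.strategy=topic.name

# Keys are added as columns prefixed with key__
transformer.[avro.decoder].consume.keys=true
# Assumption: latest is accepted for the key schema id as well
transformer.[avro.decoder].key.schema.id=latest
transformer.[avro.decoder].key.schema.naming.strategy=topic.name

# Keep selected Kafka metadata columns
transformer.[avro.decoder].keep.columns=offset,partition
```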

ConfluentAvroStreamEncodingTransformer

The ConfluentAvroStreamEncodingTransformer is built on ABRiS. More details about the configuration properties can be found there. Caution: The ConfluentAvroStreamEncodingTransformer requires the property writer.kafka.topic to be set.

Property Name | Required | Description
transformer.{transformer-id}.schema.registry.url | Yes | URL of Schema Registry, e.g. http://localhost:8081. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_REGISTRY_URL
transformer.{transformer-id}.value.schema.naming.strategy | Yes | Subject name strategy of Schema Registry. Possible values are topic.name, record.name or topic.record.name. Equivalent to ABRiS property SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY
transformer.{transformer-id}.value.schema.record.name | Yes for naming strategies record.name and topic.record.name | Name of the record. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY
transformer.{transformer-id}.value.schema.record.namespace | Yes for naming strategies record.name and topic.record.name | Namespace of the record. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY
transformer.{transformer-id}.produce.keys | No | If set to true, keys will be produced according to the properties key.column.prefix and key.column.names of the Hyperdrive Context
transformer.{transformer-id}.key.schema.naming.strategy | Yes if produce.keys is true | Subject name strategy for key
transformer.{transformer-id}.key.schema.record.name | Yes for key naming strategies record.name and topic.record.name | Name of the record.
transformer.{transformer-id}.key.schema.record.namespace | Yes for key naming strategies record.name and topic.record.name | Namespace of the record.
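As a rough example, an encoder using the topic.record.name strategy needs the record name and namespace in addition to the registry URL. The transformer id [avro.encoder], the topic, and the record name and namespace below are placeholders chosen for this sketch.

```properties
# Required by the encoding transformer
writer.kafka.topic=my-output-topic

transformer.[avro.encoder].schema.registry.url=http://localhost:8081
transformer.[avro.encoder].value.schema.naming.strategy=topic.record.name
# Needed because the strategy is topic.record.name
transformer.[avro.encoder].value.schema.record.name=Event
transformer.[avro.encoder].value.schema.record.namespace=com.example
```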

ColumnSelectorStreamTransformer
Property Name | Required | Description
transformer.{transformer-id}.columns.to.select | Yes | Comma-separated list of columns to select. * can be used to select all columns. Only existing columns using column names may be selected (i.e. expressions cannot be constructed)

AddDateVersionTransformer

The AddDateVersionTransformer adds the columns hyperdrive_date and hyperdrive_version. hyperdrive_date is the ingestion date (or a user-defined date), while hyperdrive_version is a number automatically incremented with every ingestion, starting at 1. For the auto-increment to work, hyperdrive_date and hyperdrive_version need to be defined as partition columns. Caution: This transformer requires a writer which defines writer.parquet.destination.directory.

Property Name | Required | Description
transformer.{transformer-id}.report.date | No | User-defined date for hyperdrive_date in format yyyy-MM-dd. Default date is the date of the ingestion
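A small sketch of the combination described above: the transformer is paired with a Parquet writer that partitions by the two generated columns. The transformer id [add.date.version], the date and the destination path are illustrative values.

```properties
# Optional fixed date; without it the ingestion date is used
transformer.[add.date.version].report.date=2020-01-31

# The transformer relies on the writer's destination directory
writer.parquet.destination.directory=/data/raw/events
# Both columns must be partition columns for the auto-increment to work
writer.parquet.partition.columns=hyperdrive_date,hyperdrive_version
```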

ColumnRenamingStreamTransformer

ColumnRenamingStreamTransformer allows renaming of columns specified in the configuration.

To add the transformer to the pipeline use this class name:

component.transformer.class.{transformer-id} = za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer

Property Name | Required | Description
transformer.{transformer-id}.columns.rename.from | Yes | A comma-separated list of columns to rename. For example, column1, column2.
transformer.{transformer-id}.columns.rename.to | Yes | A comma-separated list of new column names. For example, column1_new, column2_new.
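Put together, a renaming step might look as follows. The id renamer and the order value 1 are arbitrary. The sketch assumes the two lists are matched by position, so that column1 becomes column1_new and column2 becomes column2_new.

```properties
component.transformer.id.1=renamer
component.transformer.class.renamer=za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer

# Assumption: entries are paired by position
transformer.renamer.columns.rename.from=column1,column2
transformer.renamer.columns.rename.to=column1_new,column2_new
```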

ColumnCopyStreamTransformer

ColumnCopyStreamTransformer allows copying of columns specified in the configuration. Dots in column names are interpreted as nested structs, unless they are surrounded by backticks (the same convention as in Spark).

Note that usage of the star-operator * within column names is not supported and may lead to unexpected behaviour.

To add the transformer to the pipeline use this class name:

component.transformer.class.{transformer-id} = za.co.absa.hyperdrive.ingestor.implementation.transformer.column.copy.ColumnCopyStreamTransformer

Property Name | Required | Description
transformer.{transformer-id}.columns.copy.from | Yes | A comma-separated list of columns to copy from. For example, column1.fieldA, column2.fieldA.
transformer.{transformer-id}.columns.copy.to | Yes | A comma-separated list of new column names. For example, newColumn.col1_fieldA, newColumn.col2_fieldA.

Example

Given a dataframe with the following schema

 |-- column1
 |    |-- fieldA
 |    |-- fieldB
 |-- column2
 |    |-- fieldA
 |-- column3

Then, the following column parameters

  • transformer.{transformer-id}.columns.copy.from=column1.fieldA, column2.fieldA
  • transformer.{transformer-id}.columns.copy.to=newColumn.col1_fieldA, newColumn.col2_fieldA

will produce the following schema

 |-- column1
 |    |-- fieldA
 |    |-- fieldB
 |-- column2
 |    |-- fieldA
 |-- column3
 |-- newColumn
 |    |-- col1_fieldA
 |    |-- col2_fieldA

See Pipeline settings for details about {transformer-id}.

ParquetStreamWriter
Property Name | Required | Description
writer.parquet.destination.directory | Yes | Destination path of the sink. Equivalent to Spark property path for the DataStreamWriter
writer.parquet.partition.columns | No | Comma-separated list of columns to partition by.
writer.parquet.metadata.check | No | Set this option to true if the consistency of the metadata log should be checked prior to the query. For very large tables, the check may be very expensive
writer.common.trigger.type | No | See Common writer properties
writer.common.trigger.processing.time | No | See Common writer properties

Any additional properties for the DataStreamWriter can be added with the prefix writer.parquet.options, e.g. writer.parquet.options.key=value
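By way of example, a Parquet sink could be set up as below. Paths and the partition column are placeholders; compression is a standard option of Spark's Parquet writer and is shown only to demonstrate the options prefix.

```properties
# Parquet sink (placeholder paths and column)
writer.parquet.destination.directory=/data/raw/events
writer.parquet.partition.columns=country
writer.parquet.metadata.check=true

# Extra DataStreamWriter option, passed with the writer.parquet.options prefix
writer.parquet.options.compression=snappy

# Required for every writer; must be unique per workflow
writer.common.checkpoint.location=/checkpoints/events
```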

KafkaStreamWriter
Property Name | Required | Description
writer.kafka.topic | Yes | The name of the Kafka topic to write data to. Equivalent to Spark property topic
writer.kafka.brokers | Yes | List of Kafka broker URLs. Equivalent to Spark property kafka.bootstrap.servers
writer.common.trigger.type | No | See Common writer properties
writer.common.trigger.processing.time | No | See Common writer properties

Common writer properties

Property Name | Required | Description
writer.common.checkpoint.location | Yes | Used for Spark property checkpointLocation. The checkpoint location has to be unique among different workflows.
writer.common.trigger.type | No | Either Once for one-time execution or ProcessingTime for micro-batch execution. Default: Once.
writer.common.trigger.processing.time | No | Interval in ms for micro-batch execution (using ProcessingTime). Default: 0ms, i.e. execution as fast as possible.

Behavior of Triggers

Trigger (writer.common.trigger.type) | Timeout (ingestor.spark.termination.timeout) | Runtime | Details
Once | No timeout | Limited | Consumes all data that is available at the beginning of the micro-batch. The query processes exactly one micro-batch and then stops, even if more data is available at the end of the micro-batch.
ProcessingTime | With timeout | Limited | Consumes data in micro-batches and only stops when the timeout is reached or the query is killed.
ProcessingTime | No timeout | Long-running | Consumes data in micro-batches and only stops when the query is killed.

  • Note 1: The first micro-batch of the query will contain all available messages to consume and can therefore be quite large, even if the trigger ProcessingTime is configured, and regardless of what micro-batch interval is configured. To limit the size of a micro-batch, the property reader.option.maxOffsetsPerTrigger should be used. See also http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
  • Note 2: It's possible to define a timeout for trigger Once. If the timeout is reached before the micro-batch is processed, it won't be completed and no data will be committed. Such behavior is hard to predict and is therefore not recommended.
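To make the long-running case concrete, here is a hedged sketch of a micro-batch configuration that also caps the batch size as suggested in Note 1. The checkpoint path, the interval of 60000 ms and the limit of 100000 offsets are arbitrary example values.

```properties
# Required for every writer; must be unique per workflow
writer.common.checkpoint.location=/checkpoints/events

# Run continuously in micro-batches, one every 60 seconds
writer.common.trigger.type=ProcessingTime
writer.common.trigger.processing.time=60000

# Cap the number of Kafka offsets consumed per micro-batch (see Note 1)
reader.option.maxOffsetsPerTrigger=100000
```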

See the Spark Documentation for more information about triggers.

Hyperdrive Context

HyperdriveContext is an object intended to be used by the components to share data. It is a key-value store, where the key is a string and the value can be of any type. The following context variables are currently used by the default implementation.

Name | Type | Description
key.column.prefix | String | If ConfluentAvroDecodingTransformer is configured to consume keys, it prefixes the key columns with key__ so that they can be distinguished in the dataframe. If key__ happens to be a prefix of a value column, a random alphanumeric string is used instead.
key.column.names | Seq[String] | If ConfluentAvroDecodingTransformer is configured to consume keys, it contains the original column names (without prefix) in the key schema.

Other

Hyperdrive uses Apache Commons Configuration 2. This allows one property to reference another, for example:

transformer.[avro.decoder].schema.registry.url=http://localhost:8081
writer.kafka.schema.registry.url=${transformer.[avro.decoder].schema.registry.url}

Workflow Manager

Hyperdrive ingestions may be triggered using the Workflow Manager, which is developed in a separate repository: https://github.com/AbsaOSS/hyperdrive-trigger

A key feature of the Workflow Manager is its triggers, which define when an ingestion should be executed and how it should be requested. The Workflow Manager supports cron-based triggers as well as triggers that listen to a notification topic.

Versions

Version
4.1.0
4.0.0
3.3.0
3.2.2
3.1.0
3.0.0
2.0.0
1.0.0