com.groupon.dse:schema-inferer

Schema Inferer is a library, based on Spark Streaming, that can be used to infer the schema of a given Kafka data stream and continuously monitor the stream for schema changes.

Categories: Infer, Application Testing & Monitoring, Code Analysis
GroupId: com.groupon.dse
ArtifactId: schema-inferer
Last Version: 1.0
Type: jar
Project URL: https://github.com/groupon/schema-inferer
Source Code Management: https://github.com/groupon/schema-inferer

Download schema-inferer

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.groupon.dse/schema-inferer/ -->
<dependency>
    <groupId>com.groupon.dse</groupId>
    <artifactId>schema-inferer</artifactId>
    <version>1.0</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.groupon.dse/schema-inferer/
implementation 'com.groupon.dse:schema-inferer:1.0'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.groupon.dse/schema-inferer/
implementation("com.groupon.dse:schema-inferer:1.0")

Buildr:

'com.groupon.dse:schema-inferer:jar:1.0'

Ivy:

<dependency org="com.groupon.dse" name="schema-inferer" rev="1.0">
  <artifact name="schema-inferer" type="jar" />
</dependency>

Groovy Grape:

@Grapes(
  @Grab(group='com.groupon.dse', module='schema-inferer', version='1.0')
)

SBT:

libraryDependencies += "com.groupon.dse" % "schema-inferer" % "1.0"

Leiningen:

[com.groupon.dse/schema-inferer "1.0"]

Dependencies

compile (8)

| Group | Artifact | Type | Version |
|---|---|---|---|
| org.scala-lang | scala-library | jar | 2.10.4 |
| com.groupon.dse | baryon | jar | 1.0 |
| org.apache.kafka | kafka_2.10 | jar | 0.8.1.1 |
| com.101tec | zkclient | jar | 0.7 |
| org.apache.zookeeper | zookeeper | jar | 3.4.6 |
| org.json4s | json4s-core_2.10 | jar | 3.2.10 |
| org.json4s | json4s-jackson_2.10 | jar | 3.2.10 |
| org.slf4j | slf4j-api | jar | 1.7.10 |

provided (7)

| Group | Artifact | Type | Version |
|---|---|---|---|
| org.apache.spark | spark-core_2.10 | jar | 1.5.2 |
| org.apache.spark | spark-streaming_2.10 | jar | 1.5.2 |
| org.apache.spark | spark-catalyst_2.10 | jar | 1.5.2 |
| org.apache.spark | spark-sql_2.10 | jar | 1.5.2 |
| org.apache.spark | spark-hive_2.10 | jar | 1.5.2 |
| org.apache.spark | spark-streaming-kafka_2.10 | jar | 1.5.2 |
| log4j | log4j | jar | 1.2.17 |

test (1)

| Group | Artifact | Type | Version |
|---|---|---|---|
| org.scalatest | scalatest_2.10 | jar | 2.2.4 |

Project Modules

There are no modules declared in this project.

Schema Inferer

###Overview

Schema Inferer is a library, based on Spark Streaming, that can be used to infer the schema of a given Kafka data stream and continuously monitor the stream for schema changes. A schema is essentially the union of all fields observed in a data stream over a period of time. The inferred schemas (the initial one and any later modifications) are stored in a SchemaStore, which can be backed by systems such as Zookeeper. Depending on the SchemaStore implementation, it can also be accessed externally to obtain the current schema for a given data stream, or the schema at a certain point in time. Schema Inferer also exposes an interface that lets users define an action to perform when a schema change is observed, such as creating/updating a Hive table or emitting the change to a changelog.

Schema Inferer is built using Baryon, a library that consumes data from Kafka and provides an interface to operate on the consumed data.

###Common use-cases

  • Understand the structure of a Kafka data stream based on the inferred schema.
  • Track schema changes and understand how a schema has evolved over time. Every time the schema for a stream changes, the SchemaStore captures and stores the observed schema.
  • Monitor a schema and take action as soon as an invalid or unexpected change in schema is observed.
  • Publish schema changes to other systems for further analysis. Schema changes can be stored in HDFS for long term analysis or even sent to systems like Kafka, from which other real time applications can consume the change logs and perform relevant actions.
  • If the Kafka data stream is persisted to long-term storage like HDFS, Schema Inferer can be used to automatically create Hive EXTERNAL tables for that data (see the sketch after this list).
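
As a rough illustration of that last use-case, the DDL issued for such an EXTERNAL table might resemble the following sketch. The table name, columns, SerDe class, and HDFS location are all hypothetical placeholders, not values produced by the library:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Hypothetical sketch: register a Hive EXTERNAL table over JSON data that
// has already been persisted to HDFS, so Hive queries pick up the inferred
// structure. Table name, columns, SerDe, and location are assumptions.
object ExternalTableSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("external-table-sketch"))
    val hiveContext = new HiveContext(sc)

    hiveContext.sql(
      """CREATE EXTERNAL TABLE IF NOT EXISTS user_events (
        |  body struct<request:string, hostname:string>,
        |  `timestamp` string
        |)
        |ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
        |LOCATION 'hdfs:///data/topics/user_events'""".stripMargin)
  }
}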

###Components

  • SchemaStore: The SchemaStore trait is responsible for storing the latest schema inferred from a data stream. Schema Inferer has implementations for two stores - Zookeeper and In-Memory.
    • ZookeeperSchemaStore - The ZookeeperSchemaStore creates a new znode every time a new schema is inferred. The Zookeeper location can be adjusted and will be of the format /$ZK_ROOT/$TOPIC_NAME/$TOPIC_NAME_$TIMESTAMP. Every topic also has a current node which contains the latest schema for that topic's data stream. The ZookeeperSchemaStore thus contains the entire history of schemas for a data stream. On application restart, a Schema Inferer using the ZookeeperSchemaStore fetches the previously inferred schema and continues monitoring the data stream using the stored schema as the baseline.
    • InMemorySchemaStore - The InMemorySchemaStore saves the inferred schemas in an in-memory cache. Unlike the ZookeeperSchemaStore, the InMemorySchemaStore does not provide external APIs to fetch the schemas inferred over a period of time. The other disadvantage is that after each application restart, the baseline schema has to be inferred from scratch, since there is no previous schema to start from. Note: the ZookeeperSchemaStore internally uses the InMemorySchemaStore as its cache.
  • Publisher: The Publisher trait is responsible for taking an action when a schema is inferred for the first time or when a subsequent schema change occurs. Schema Inferer currently ships with a HiveSchemaPublisher and a Log4jSchemaPublisher. A simplified sketch of both traits follows this list.
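
To make the split of responsibilities concrete, here is a minimal sketch of what the two traits could look like. These definitions are illustrative assumptions, not the library's actual signatures:

// Illustrative sketch only -- the real trait definitions in schema-inferer
// may differ. `Schema` stands in for the library's custom schema type.
case class Schema(topic: String, fields: Map[String, String], timestamp: Long)

// Stores the latest inferred schema per topic (e.g. Zookeeper or in-memory).
trait SchemaStore {
  def put(topic: String, schema: Schema): Unit
  def get(topic: String): Option[Schema]
}

// Takes an action when a schema is first inferred or later changes
// (e.g. create/update a Hive table, or log the change).
trait Publisher {
  def publish(topic: String, schema: Schema): Unit
}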

###Design

The Schema Inferer plugin operates on RDDs generated by the Baryon receiver. The schema for a data stream is generated by iteratively inferring the schema of these RDDs. The RDDs of type WrappedMessage obtained from Baryon are first converted to RDDs of type TopicAndEvent by an Extractor, and then to a SchemaRDD of type String. Spark SQL APIs are then used to extract the schema of the RDD. The fetched schema is converted to a custom type called Schema and passed on to the corresponding SchemaStore and Publisher implementations. The Schema object generated during each run is compared with the cached Schema object to detect changes. In the event of a change, the newly generated Schema object is merged with the cached Schema object. The merged Schema is then saved in the appropriate SchemaStore and processed by the provided Publisher implementation.
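
The Spark SQL step at the heart of this pipeline can be reproduced in isolation. Below is a minimal sketch, assuming Spark 1.5 and an RDD of JSON strings; the sample events are made up:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Minimal sketch of the schema-inference step: feed an RDD of JSON strings
// to Spark SQL and read back the inferred schema. Sample events are made up.
object InferenceSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("inference-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    val events = sc.parallelize(Seq(
      """{"body": {"request": "GET /", "hostname": "web-01"}, "timestamp": "1449868800"}""",
      """{"body": {"request": "GET /health", "page": {"id": 42}}, "timestamp": "1449868801"}"""
    ))

    // Spark SQL scans the JSON records and infers a StructType covering them.
    val df = sqlContext.read.json(events)
    df.printSchema() // fields from both events appear unioned in one schema

    sc.stop()
  }
}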

###Resolving schema merge conflicts

If the cached and newly generated Schema objects have a field at the same nested level but with different types, a merge conflict occurs. To resolve merge conflicts we have defined a precedence order, so that the type with greater precedence is chosen over the one with lower precedence. Precedence order:

"struct", "array", "string", "double", "float", "long", "integer", "short", "byte", "boolean"

Here struct has the highest precedence while boolean has the lowest.
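
A minimal sketch of this resolution rule in Scala (the object and method names are illustrative, not the library's API):

// Illustrative sketch of precedence-based conflict resolution; names and
// representation are assumptions, not the library's actual API.
object TypePrecedence {
  // Earlier entries outrank later ones, per the precedence order above.
  private val precedence = Seq("struct", "array", "string", "double",
    "float", "long", "integer", "short", "byte", "boolean")

  def resolve(cachedType: String, observedType: String): String =
    if (precedence.indexOf(cachedType) <= precedence.indexOf(observedType)) cachedType
    else observedType
}

// Example: a field cached as integer but observed as string resolves to
// string, as in the merge example below.
// TypePrecedence.resolve("integer", "string") == "string"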

###Example of schema merge

  • Cached schema:
root
 |-- body: struct <
 |    |-- request: string
 |    |-- hostname: string
 |    |-- page: struct <
 |    |    |-- parentEventId: string
 |    |    |-- country: string
 |    |    |-- division: string
 |    |    |-- id: integer
 |    |    |-- app: string
 |    |-- >
 |-- timestamp: string
  • Schema inferred during a run:
root
 |-- body: struct <
 |    |-- request: string
 |    |-- hostname: string
 |    |-- page: struct <
 |    |    |-- id: string
 |    |    |-- app: string
 |    |    |-- platform: string
 |    |-- >
 |    |-- layer: string
 |-- timestamp: string
  • Difference between cached and newly generated schema:
root
  |-- body: struct <
  |    |-- page: struct <
  |    |    |-- id: string -- integer
  |    |    |-- platform: string
  |    |-- >
  |    |-- layer: string
  • New Merged schema:
root
 |-- body: struct <
 |    |-- request: string
 |    |-- hostname: string
 |    |-- page: struct <
 |    |    |-- parentEventId: string
 |    |    |-- country: string
 |    |    |-- division: string
 |    |    |-- id: string
 |    |    |-- app: string
 |    |    |-- platform: string
 |    |-- >
 |    |-- layer: string
 |-- timestamp: string

###Configurations

The following configurations need to be used along with the Baryon configs. They can be hard-coded in your driver or provided via a config file.

| Config name | Default | Description |
|---|---|---|
| topic.warmup.policy | TIME | Policy that decides how long to wait before the initial publish/store of a schema. Valid values: [TIME] |
| topic.warmup.time.sec | 60 | Time in seconds to wait before publishing/storing a schema for the first time |
| spark.num.receivers | 3 | Number of Spark Streaming receivers |
| store.type | HASH | The type of schema store to use: ZOOKEEPER or HASH (in-memory) |
| store.zk.connect | NONE | (Required if store.type is ZOOKEEPER) The Zookeeper connect string |
| store.zk.root | NONE | (Required if store.type is ZOOKEEPER) The Zookeeper root path |
| publisher.type | NONE | (Required) The type of the publisher: HIVE/KAFKA/DEFAULT/FILE/LOG |
| publisher.hive.db | default | The Hive database to connect to |
| publisher.hive.serde.jar | NONE | (Required if publisher.type is HIVE) The Hive SerDe jar |
| publisher.hive.serde.class | NONE | (Required if publisher.type is HIVE) The Hive SerDe class |
| publisher.hive.blacklist | NONE | The topics not to publish to Hive |
| rdd.sampling.ratio | 0.25 | The sampling ratio to use when inferring the schema |
| consumer.type | baryon | The consumer to use: direct/baryon |
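
For example, a driver that hard-codes a Zookeeper-backed store and a Hive publisher could set these keys as follows. This is a sketch: the connect string, root path, and SerDe values are placeholders, and how the Properties object is handed to the library depends on its API:

import java.util.Properties

// Sketch of hard-coding the configs above in a driver. Values are
// placeholders; hand the Properties to the library per its API.
object InfererConfigSketch {
  val conf = new Properties()
  conf.setProperty("topic.warmup.time.sec", "120")
  conf.setProperty("spark.num.receivers", "3")
  conf.setProperty("store.type", "ZOOKEEPER")
  conf.setProperty("store.zk.connect", "zkhost1:2181,zkhost2:2181")
  conf.setProperty("store.zk.root", "/schema-inferer")
  conf.setProperty("publisher.type", "HIVE")
  conf.setProperty("publisher.hive.db", "default")
  conf.setProperty("publisher.hive.serde.jar", "/path/to/json-serde.jar")
  conf.setProperty("publisher.hive.serde.class", "org.openx.data.jsonserde.JsonSerDe")
  conf.setProperty("rdd.sampling.ratio", "0.25")
}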

###Maven

<dependency>
  <groupId>com.groupon.dse</groupId>
  <artifactId>schema-inferer</artifactId>
  <version>1.0</version>
</dependency>

###Sample Usage

Refer to a sample inferer to understand the usage.

###Limitations

  • Schema Inferer only supports Kafka data streams consisting of JSON-formatted events.
  • The HiveSchemaPublisher currently performs a DROP TABLE followed by a CREATE TABLE during a schema change. As a result, existing Hive partitions have to be re-created.