Schema Inferer
### Overview
Schema Inferer is a library, based on Spark Streaming, that infers the schema of a Kafka data stream and continuously monitors the stream for schema changes. A schema is the union of all fields observed in a data stream over a period of time. The inferred schemas (the initial one and any later modifications) are stored in a `SchemaStore`, which can be backed by systems such as Zookeeper. Depending on the `SchemaStore` implementation, it can also be accessed externally to obtain the current schema for a given data stream, or the schema at a certain point in time. Schema Inferer also exposes an interface that lets users define an action to be performed when a schema change is observed, such as creating/updating a Hive table or emitting a schema change event to a changelog.

Schema Inferer is built using Baryon, a library that consumes data from Kafka and provides an interface to operate on the consumed data.
### Common use-cases
- Understand the structure of a Kafka data stream based on the inferred schema.
- Track schema changes and understand how a schema has evolved over time. Every time the schema for a stream changes, the `SchemaStore` captures and stores the observed schema.
- Monitor a schema and take action as soon as an invalid or unexpected change in schema is observed.
- Publish schema changes to other systems for further analysis. Schema changes can be stored in HDFS for long-term analysis or even sent to systems like Kafka, from which other real-time applications can consume the change logs and perform relevant actions.
- If the Kafka data stream is persisted to long-term storage like HDFS, Schema Inferer can be used to automatically create Hive EXTERNAL tables for that data.
### Components
- `SchemaStore`: The `SchemaStore` trait is responsible for storing the latest schema inferred from a data stream. Schema Inferer provides two store implementations: Zookeeper and in-memory.
  - `ZookeeperSchemaStore`: Creates a new znode every time a new schema is inferred. The Zookeeper location is configurable and has the format `/$ZK_ROOT/$TOPIC_NAME/$TOPIC_NAME_$TIMESTAMP`. Every topic also has a `current` node that contains the latest schema for that topic's data stream. The `ZookeeperSchemaStore` therefore contains the entire history of schemas for a data stream. On application restart, a Schema Inferer using the `ZookeeperSchemaStore` fetches the previously inferred schema and continues monitoring the data stream with the stored schema as the baseline.
  - `InMemorySchemaStore`: Saves the inferred schemas in an in-memory cache. Unlike the `ZookeeperSchemaStore`, the `InMemorySchemaStore` does not provide external APIs to fetch the schemas inferred over a period of time. The other disadvantage is that on every application restart the baseline schema has to be re-inferred, since there is no previous schema to start from. Note: the `ZookeeperSchemaStore` internally uses the `InMemorySchemaStore` as its cache.
- `Publisher`: The `Publisher` trait is responsible for taking an action when a schema is inferred for the first time or when a subsequent schema change occurs. Schema Inferer currently provides `HiveSchemaPublisher` and `Log4jSchemaPublisher` implementations.
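For illustration, a custom publisher could look roughly like the sketch below. The actual `Publisher` trait signature is not shown in this document, so the method name and parameters here are assumptions rather than the library's real API:

```scala
// Hypothetical sketch only: the real Publisher trait in schema-inferer may
// expose a different method name and signature.
trait Publisher {
  def publish(topic: String, schema: String): Unit
}

// Example action: log every newly inferred or changed schema. A real
// implementation could instead send the change to Kafka, HDFS, Hive, etc.
class ChangelogPublisher extends Publisher {
  override def publish(topic: String, schema: String): Unit = {
    println(s"Schema change observed for topic '$topic': $schema")
  }
}
```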
### Design
The Schema Inferer plugin operates on RDDs generated by the Baryon receiver. The schema for a data stream is generated by iteratively inferring the schema of these RDDs. The RDDs of type `WrappedMessage`, obtained from Baryon, are first converted to RDDs of type `TopicAndEvent` by an `Extractor` and then converted to a `SchemaRDD` of type `String`. Spark SQL APIs are then used to extract the schema for the RDD. The fetched schema is converted to a custom type called `Schema` and passed on to the corresponding `SchemaStore` and `Publisher` implementations. The `Schema` object generated during each run is compared with the cached `Schema` object to detect changes. In the event of a change, the newly generated `Schema` object is merged with the cached `Schema` object. The merged `Schema` is then saved in the appropriate `SchemaStore` and processed using the provided `Publisher` implementation.
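The core inference step relies on Spark SQL's ability to derive a schema from JSON records. The snippet below is a standalone sketch of that mechanism using the Spark 1.x `SQLContext.jsonRDD` API; the sample JSON events are illustrative, while the real plugin feeds in the `RDD[String]` produced by the `Extractor`:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InferSchemaSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("schema-inference-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // Stand-in for the RDD[String] of JSON events produced by the Extractor
    // from Baryon's WrappedMessage RDDs.
    val jsonEvents = sc.parallelize(Seq(
      """{"body": {"request": "GET", "page": {"id": 1, "app": "web"}}, "timestamp": "1450000000"}""",
      """{"body": {"request": "POST", "page": {"id": 2}}, "timestamp": "1450000001"}"""
    ))

    // Spark SQL samples the records and infers a StructType. The second
    // argument is the sampling ratio (cf. the rdd.sampling.ratio config).
    val inferred = sqlContext.jsonRDD(jsonEvents, 1.0)
    inferred.schema.printTreeString() // prints a tree similar to the examples below
  }
}
```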
### Resolving schema merge conflicts
If the cached and newly generated `Schema` objects have a field at the same nested level but with different types, a merge conflict occurs. To resolve merge conflicts, a precedence order is defined so that the type with higher precedence is chosen over the one with lower precedence.

Precedence order: `struct`, `array`, `string`, `double`, `float`, `long`, `integer`, `short`, `byte`, `boolean`

Here `struct` has the highest precedence while `boolean` has the lowest.
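As an illustration of how such a precedence rule resolves a conflict, the sketch below picks the higher-precedence type between a cached and a newly inferred field type. This is only a sketch of the rule described above, not the library's actual merge code:

```scala
object TypePrecedence {
  // Lower index = higher precedence, matching the order listed above.
  private val order = Seq(
    "struct", "array", "string", "double", "float",
    "long", "integer", "short", "byte", "boolean")

  /** Resolve a conflict by keeping the type with the higher precedence. */
  def resolve(cachedType: String, newType: String): String =
    if (order.indexOf(cachedType) <= order.indexOf(newType)) cachedType else newType
}

// In the merge example below, "id" is an integer in the cached schema and a
// string in the new run; string wins because it has higher precedence:
// TypePrecedence.resolve("integer", "string") == "string"
```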
### Example of schema merge
- Cached schema:

  ```
  root
  |-- body: struct <
  | |-- request: string
  | |-- hostname: string
  | |-- page: struct <
  | | |-- parentEventId: string
  | | |-- country: string
  | | |-- division: string
  | | |-- id: integer
  | | |-- app: string
  | |-- >
  |-- timestamp: string
  ```

- Schema inferred during a run:

  ```
  root
  |-- body: struct <
  | |-- request: string
  | |-- hostname: string
  | |-- page: struct <
  | | |-- id: string
  | | |-- app: string
  | | |-- platform: string
  | |-- >
  | |-- layer: string
  |-- timestamp: string
  ```

- Difference between the cached and newly generated schema:

  ```
  root
  |-- body: struct <
  | |-- page: struct <
  | | |-- id: string -- integer
  | | |-- platform: string
  | |-- >
  | |-- layer: string
  ```

- New merged schema:

  ```
  root
  |-- body: struct <
  | |-- request: string
  | |-- hostname: string
  | |-- page: struct <
  | | |-- parentEventId: string
  | | |-- country: string
  | | |-- division: string
  | | |-- id: string
  | | |-- app: string
  | | |-- platform: string
  | |-- >
  | |-- layer: string
  |-- timestamp: string
  ```
### Configurations
The following configurations need to be used along with the Baryon configs. They can be hard-coded in your driver or provided via a config file.

| Config name | Default | Description |
|---|---|---|
| topic.warmup.policy | TIME | Policy that decides how long to wait before the initial publish/store of a schema. Valid values: [TIME] |
| topic.warmup.time.sec | 60 | Time in seconds to wait before publishing/storing a schema for the first time |
| spark.num.receivers | 3 | Number of Spark Streaming receivers |
| store.type | HASH | The type of schema store to use: ZOOKEEPER or HASH (in-memory) |
| store.zk.connect | NONE | (Required if store.type is ZOOKEEPER) The Zookeeper connect string |
| store.zk.root | NONE | (Required if store.type is ZOOKEEPER) The Zookeeper root path |
| publisher.type | NONE | (Required) The type of the publisher: HIVE/KAFKA/DEFAULT/FILE/LOG |
| publisher.hive.db | Default | The Hive database to connect to |
| publisher.hive.serde.jar | NONE | (Required if publisher.type is HIVE) The Hive serde jar |
| publisher.hive.serde.class | NONE | (Required if publisher.type is HIVE) The Hive serde class |
| publisher.hive.blacklist | NONE | Topics that should not be published to Hive |
| rdd.sampling.ratio | 0.25 | The sampling ratio to use when inferring the schema |
| consumer.type | baryon | The consumer to use: direct/baryon |
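As an example of hard-coding the configs in a driver, the snippet below sets up a Zookeeper-backed store with a Hive publisher. The connect string, paths, and serde class are purely illustrative, and how the properties are handed to the plugin depends on your Baryon/driver setup:

```scala
import java.util.Properties

val props = new Properties()
// Schema store backed by Zookeeper
props.setProperty("store.type", "ZOOKEEPER")
props.setProperty("store.zk.connect", "zkhost1:2181,zkhost2:2181") // illustrative
props.setProperty("store.zk.root", "/schema-inferer")              // illustrative
// Publish inferred schemas to Hive
props.setProperty("publisher.type", "HIVE")
props.setProperty("publisher.hive.db", "default")
props.setProperty("publisher.hive.serde.jar", "/path/to/json-serde.jar")              // illustrative
props.setProperty("publisher.hive.serde.class", "org.openx.data.jsonserde.JsonSerDe") // illustrative
// Tuning
props.setProperty("topic.warmup.time.sec", "60")
props.setProperty("spark.num.receivers", "3")
props.setProperty("rdd.sampling.ratio", "0.25")
```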
### Maven
```xml
<dependency>
    <groupId>com.groupon.dse</groupId>
    <artifactId>schema-inferer</artifactId>
    <version>1.0</version>
</dependency>
```
### Sample Usage
Refer to a sample inferer to understand the usage.
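A minimal driver skeleton might look like the following. The Spark Streaming boilerplate is standard, but the Baryon/Schema Inferer wiring is left as comments because the exact classes and entry points are not covered in this README:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SampleInfererDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("schema-inferer-sample")
    val ssc = new StreamingContext(conf, Seconds(30))

    // 1. Build Baryon receivers for the Kafka topics of interest
    //    (spark.num.receivers controls how many are created).
    // 2. Hand the resulting stream of WrappedMessage RDDs to the
    //    Schema Inferer plugin along with the configs listed above.

    ssc.start()
    ssc.awaitTermination()
  }
}
```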
### Limitations
- Schema Inferer only supports Kafka data streams consisting of JSON-formatted events.
- The `HiveSchemaPublisher` currently performs a `DROP TABLE` followed by a `CREATE TABLE` during a schema change. As a result, existing Hive partitions have to be re-created.