data-quality-monitoring

License:
Categories: Data Monitoring, Application Testing & Monitoring
GroupId: com.github.piotr-kalanski
ArtifactId: data-quality-monitoring_2.11
Last Version: 0.3.8
Release Date:
Type: jar
Description: data-quality-monitoring
Project URL: https://github.com/piotr-kalanski/data-quality-monitoring
Project Organization: com.github.piotr-kalanski
Source Code Management: https://github.com/piotr-kalanski/data-quality-monitoring

Download data-quality-monitoring_2.11

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.github.piotr-kalanski/data-quality-monitoring_2.11/ -->
<dependency>
    <groupId>com.github.piotr-kalanski</groupId>
    <artifactId>data-quality-monitoring_2.11</artifactId>
    <version>0.3.8</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.github.piotr-kalanski/data-quality-monitoring_2.11/
implementation 'com.github.piotr-kalanski:data-quality-monitoring_2.11:0.3.8'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.github.piotr-kalanski/data-quality-monitoring_2.11/
implementation("com.github.piotr-kalanski:data-quality-monitoring_2.11:0.3.8")

Buildr:

'com.github.piotr-kalanski:data-quality-monitoring_2.11:jar:0.3.8'

Ivy:

<dependency org="com.github.piotr-kalanski" name="data-quality-monitoring_2.11" rev="0.3.8">
  <artifact name="data-quality-monitoring_2.11" type="jar" />
</dependency>

Groovy Grape:

@Grapes(
  @Grab(group='com.github.piotr-kalanski', module='data-quality-monitoring_2.11', version='0.3.8')
)

SBT:

libraryDependencies += "com.github.piotr-kalanski" % "data-quality-monitoring_2.11" % "0.3.8"

Leiningen:

[com.github.piotr-kalanski/data-quality-monitoring_2.11 "0.3.8"]

Dependencies

compile (5)

Group / Artifact Type Version
org.scala-lang : scala-library jar 2.11.8
org.apache.spark : spark-sql_2.11 jar 2.1.1
com.github.piotr-kalanski : es-client_2.11 jar 0.2.1
com.typesafe : config jar 1.3.0
com.github.piotr-kalanski : class2sql_2.11 jar 0.1.6

test (3)

Group / Artifact Type Version
org.scalatest : scalatest_2.11 jar 2.2.6
junit : junit jar 4.10
com.h2database : h2 jar 1.4.195

Project Modules

There are no modules declared in this project.

data-quality-monitoring

Data Quality Monitoring Tool for Big Data implemented using Spark

Goals

  • Validate data using provided business rules
  • Log result
  • Send alerts

Getting started

Include dependency:

"com.github.piotr-kalanski" % "data-quality-monitoring_2.11" % "0.3.2"

or

<dependency>
    <groupId>com.github.piotr-kalanski</groupId>
    <artifactId>data-quality-monitoring_2.11</artifactId>
    <version>0.3.8</version>
</dependency>

Data quality monitoring process

The data quality monitoring process consists of the following steps:

  • Load configuration with business rules
  • Run data validation
  • Log validation results
  • Send alerts
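
For orientation, these four steps compose into a single DataQualityMonitor.run call. The sketch below condenses the full example from the end of this README; the H2 in-memory connection string is only a placeholder (any JDBC database supported by DatabaseValidationResultLogger works):

import java.util.Properties
import com.datawizards.dqm.DataQualityMonitor
import com.datawizards.dqm.configuration.loader.FileMultipleTablesConfigurationLoader
import com.datawizards.dqm.logger.DatabaseValidationResultLogger
import com.datawizards.dqm.alert.SlackAlertSender

// Step 1: load configuration with business rules from a file
val configurationLoader = new FileMultipleTablesConfigurationLoader("configuration.conf")

// Step 3: log validation results to an RDBMS (see "Log validation results")
val logger = new DatabaseValidationResultLogger(
  driverClassName = "org.h2.Driver",
  dbUrl = "jdbc:h2:mem:dqm", // placeholder connection string
  connectionProperties = new Properties(),
  invalidRecordsTableName = "INVALID_RECORDS",
  tableStatisticsTableName = "TABLE_STATISTICS",
  columnStatisticsTableName = "COLUMN_STATISTICS",
  groupsStatisticsTableName = "GROUP_STATISTICS",
  invalidGroupsTableName = "INVALID_GROUPS"
)

// Step 4: send alerts to Slack (see "Send alerts")
val alertSender = new SlackAlertSender("webhook url", "Slack channel", "Slack user name")

// Step 2 (validation) plus logging and alerting happen inside this call
DataQualityMonitor.run(new java.util.Date(), configurationLoader, logger, alertSender)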

Load configuration

Configuration can be loaded from:

  • file
  • directory
  • RDBMS

Additionally, there are plans to support:

  • DynamoDB

Example configuration

tablesConfiguration = [
  {
    location = {type = Hive, table = clients}, // location of first table that should be validated
    rules = { // validation rules 
      rowRules = [ // validation rules working on single row level
        {
          field = client_id, // name of field that should be validated
          rules = [
            {type = NotNull}, // this field shouldn't be null
            {type = min, value = 0} // minimum value for this field is 0
          ]
        },
        {
          field = client_name,
          rules = [
            {type = NotNull} // this field shouldn't be null
          ]
        }
      ]
    }
  },
  {
    location = {type = Hive, table = companies}, // location of second table that should be validated
    rules = {
      rowRules = [
        {
          field = company_id, // name of field that should be validated
          rules = [
            {type = NotNull}, // this field shouldn't be null
            {type = max, value = 100} // maximum value for this field is 100
          ]
        },
        {
          field = company_name, // name of field that should be validated
          rules = [
            {type = NotNull} // this field shouldn't be null
          ]
        }
      ]
    }
  }
]

Load configuration from file

Use class: FileSingleTableConfigurationLoader or FileMultipleTablesConfigurationLoader.

Example:

import com.datawizards.dqm.configuration.loader.FileMultipleTablesConfigurationLoader
val configurationLoader = new FileMultipleTablesConfigurationLoader("configuration.conf")
configurationLoader.loadConfiguration()

Load configuration from directory

Use class: DirectoryConfigurationLoader.

One file should contain configuration for one table (TableConfiguration).
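
Example, by analogy with the file-based loader above (the constructor argument is assumed here to be the directory to scan):

import com.datawizards.dqm.configuration.loader.DirectoryConfigurationLoader

// Assumption: pass the directory containing one configuration file per table
val configurationLoader = new DirectoryConfigurationLoader("conf/tables")
val configuration = configurationLoader.loadConfiguration()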

Load configuration from database

Use class: DatabaseConfigurationLoader.

One table row should contain configuration for one table (TableConfiguration).
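
A hedged sketch only - the constructor parameters below (JDBC driver, URL, properties and the configuration table name) are assumptions modelled on DatabaseValidationResultLogger shown later, not a verified signature:

import java.util.Properties
import com.datawizards.dqm.configuration.loader.DatabaseConfigurationLoader

// Assumed, hypothetical parameters - check the class for the actual signature
val configurationLoader = new DatabaseConfigurationLoader(
  driverClassName = "org.h2.Driver",
  dbUrl = "jdbc:h2:mem:dqm",
  connectionProperties = new Properties(),
  configurationTableName = "TABLES_CONFIGURATION" // hypothetical table name
)
val configuration = configurationLoader.loadConfiguration()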

Validation rules

Currently supported categories of data validation rules:

  • field rules - validate the value of a single field, e.g. not null, min value, max value
  • group rules - validate the result of a group by expression, e.g. expected groups (countries, types)
  • table trend rules - validate trends over time, e.g. comparing the current day's row count with the previous day's

Field rules

Field rules should be defined in section rules.rowRules:

tablesConfiguration = [
  {
    location = [...],
    rules = {
      rowRules = [
        {
          field = Field name,
          rules = [...]
        }
      ]
    }
  }
]

Supported field validation rules:

  • not null

    {type = NotNull}

  • dictionary

    {type = dict, values=[1,2,3]}

  • regex

    {type = regex, value = """\s.*"""}

  • min value

    {type = min, value = 0}

  • max value

    {type = max, value = 100}

Group rules

Group rules should be defined in section groups.rules:

tablesConfiguration = [
  {
    location = [...],
    rules = [...],
    groups = [
      {
        name = Group name,
        field = Group by field name,
        rules = [
          {
            type = NotEmptyGroups,
            expectedGroups = [c1,c2,c3,c4]
          }
        ]
      }
    ]
  }
]

Supported group validation rules:

  • not empty groups

    {type = NotEmptyGroups, expectedGroups = [c1,c2,c3,c4]}

Table trend rules

Table trend rules should be defined in section rules.tableTrendRules:

tablesConfiguration = [
  {
    location = [...],
    rules = {
      rowRules = [...],
      tableTrendRules = [
        {type = CurrentVsPreviousDayRowCountIncrease, tresholdPercentage = 20}
      ]
    }
  }
]

Supported table trends validation rules:

  • current vs previous day row count

    {type = CurrentVsPreviousDayRowCountIncrease, tresholdPercentage = 20}

Log validation results

Validation results can be logged into:

  • Elasticsearch using class ElasticsearchValidationResultLogger

    val logger = new ElasticsearchValidationResultLogger(
        esUrl = "http://localhost:9200", // Elasticsearch URL
        invalidRecordsIndexName = "invalid_records", // Index name where to store invalid records
        tableStatisticsIndexName = "table_statistics", // Index name where to store table statistics
        columnStatisticsIndexName = "column_statistics", // Index name where to store column statistics
        groupsStatisticsIndexName = "group_statistics", // Index name where to store group statistics
        invalidGroupsIndexName = "invalid_groups" // Index name where to store group statistics
    )
  • RDBMS using class DatabaseValidationResultLogger

    val logger = new DatabaseValidationResultLogger(
      driverClassName = "org.h2.Driver", // JDBC driver class name
      dbUrl = connectionString, // DB connection string
      connectionProperties = new Properties(), // JDBC connection properties, especially user and password
      invalidRecordsTableName = "INVALID_RECORDS", // name of table where to insert invalid records
      tableStatisticsTableName = "TABLE_STATISTICS", // name of table where to insert table statistics records
      columnStatisticsTableName = "COLUMN_STATISTICS", // name of table where to insert column statistics records
      groupsStatisticsTableName = "GROUP_STATISTICS", // name of table where to insert group by statistics records
      invalidGroupsTableName = "INVALID_GROUPS" // name of table where to insert invalid groups
    )

Send alerts

Alerts can be sent to:

  • Slack using class SlackAlertSender
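
    Example, matching the constructor used in the full example at the end of this README:

    import com.datawizards.dqm.alert.SlackAlertSender

    // incoming webhook URL, target channel, and user name to post as
    val alertSender = new SlackAlertSender("webhook url", "Slack channel", "Slack user name")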

Additionally, there are plans to support:

  • email

Full example

Example

import com.datawizards.dqm.configuration.loader.FileMultipleTablesConfigurationLoader
import com.datawizards.dqm.logger.ElasticsearchValidationResultLogger
import com.datawizards.dqm.alert.SlackAlertSender
import com.datawizards.dqm.DataQualityMonitor

val configurationLoader = new FileMultipleTablesConfigurationLoader("configuration.conf")
val esUrl = "http://localhost:9200"
val invalidRecordsIndexName = "invalid_records"
val tableStatisticsIndexName = "table_statistics"
val columnStatisticsIndexName = "column_statistics"
val groupsStatisticsIndexName = "group_statistics"
val invalidGroupsIndexName = "invalid_groups"
val logger = new ElasticsearchValidationResultLogger(esUrl, invalidRecordsIndexName, tableStatisticsIndexName, columnStatisticsIndexName, groupsStatisticsIndexName, invalidGroupsIndexName)
val alertSender = new SlackAlertSender("webhook url", "Slack channel", "Slack user name")
val processingDate = new java.util.Date()
DataQualityMonitor.run(processingDate, configurationLoader, logger, alertSender)

configuration.conf:

tablesConfiguration = [
  {
    location = {type = Hive, table = clients},
    rules = {
      rowRules = [
        {
          field = client_id,
          rules = [
            {type = NotNull}
          ]
        }
      ]
    }
  }
]

Versions

Version
0.3.8
0.3.7
0.3.6
0.3.5
0.3.4
0.3.3
0.3.2
0.3.1
0.3.0
0.2.3
0.2.2
0.2.1
0.2.0
0.1.0