Lumio ETL
This library provides utilities for building lumio-flow based ETLs.
It is important to note that this library was very much a work in progress in both spirit and body. The general concept is here: it is an aggregate of general-purpose implementations of lumio-flow actors with the intended goal of enabling the implementation of simple yet flexible ETL programs. However, much was still being worked on, or left to be worked on later, and so there are many shortcomings to keep in mind when reading what is here.
- The "best" level of abstraction to design actors is not decided: sometimes it is good to have small atomic operations (eg. reading a file on a filesystem or from AWS), sometimes it is good to have high-level operations encompassing a diversity of actions. So far, ETL programs implemented with this project have been a mix of both what I would consider successes as well as design failures. My belief was that this API needed a higher-level abstraction API for
lumio-flowDAG building in combination with well-designed actors ; sometimes the issue is not the scope of an actor itself as much as how fragmented its manipulation can be. Maybe some kind of "actor recipes" could also be a part of the solution. - The implementations of actors can be minimalist at times, reflecting the level of sophistication that was needed in the project for which it was developed, this is generally not really an issue, new features can be added.
- As it stands, there are also a handful of basic implementations for things that aren't exactly ETL-related (eg. most things in
transformer.text, ortransformer.jsoupfor instance), these could end up being scrapped at a later date.
Implementations found in this package shouldn't be tied to any specific Lumio project.
Note: This library is considered as "in beta" and as such significant API changes may occur without prior warning.
I. Installation
Add the following in your pom.xml:
<dependency>
<groupId>com.lumiomedical</groupId>
<artifactId>lumio-etl</artifactId>
<version>0.5</version>
</dependency>
II. Notes on Structure and Design
There are four facilities in this library:
- The
ETLclass is a higher level abstraction forlumio-flowDAGs meant to be a space for declaring DAG structure, host configuration parameters, as well as specify whichFlowCompilerto use. All ETL pipelines used throughoutlumio-coreuse this class as a base. - Anything in the
dataframepackage pertains to the manipulation oftech.tablesawdataframes. It provides notably helper functions, and theTableProcessorfeature, which was meant to be used in conjunction with a higher abstraction forlumio-flow(essentially, a mean to specify a specific dataframe refinement stage). It is possible that the final implementation for that idea would have ended up rejecting theTableProcessorcontract in favour oflumio-flowactor contracts (it would have certainly been preferable at least). - The
extractor,generator,loaderandtransformerpackages providelumio-flowactor implementations - The
vaultpackage provides a handful ofVaultModuleimplementations for handling custom configuration. It should be noted that as it stands, these were also very much work in progress: the general idea was to start with satisfying the need for conf-based ETL pipeline definition (at least their entry-points, in order to allow swapping between loading files from an AWS instance or a filesystem for instance) and tackle "prettiness" later. As it stands it is fairly verbose for pipelines with many data sources, I believe just building a more opinionated abstraction on top of it could go a long way.
TODO
III. Usage
Note that two sample "toy" programs are also provided: sample-nlp here and sample-crawl there. None of them leverage lumio-vault configuration features, but their structure could be simplified and made more resilient to changes with a bit of lumio-vault sprinkled in.
We'll also write down a basic example of ETL pipeline leveraging some features found in this library, we won't touch on the ETL classes, these are covered in the sample project.
Most of the syntax is actually from lumio-flow, it could be a good idea to start by having a look at it there.
Let us start by imagining we have a tiny CSV dataset like this:
key,value,metadata
0,234,interesting
1,139,not_interesting
3,982,interesting
Here is what a pipeline for manipulating this could look like:
var flow = Flow
.from(new FileStreamer(), "path/to/my.csv") //We open an inpustream from the CSV file
.pipe(new TablesawCSVParser()) //We interpret it as CSV and transform it into a tablesaw dataframe
.pipe(Tablesaw::print) // We print the dataframe to stdout
;
Flow.runAsPipeline(flow);
Running the above should display the following, granted a logger configured for printing INFO level information:
[main] INFO com.lumiomedical.etl - Initializing stream from filesystem at data/my.csv
[main] INFO com.lumiomedical.etl - Extracting CSV data into dataframe...
[main] INFO com.lumiomedical.etl - Extracted 3 lines into dataframe.
index | key | value | metadata |
-----------------------------------------------
0 | 0 | 234 | interesting |
1 | 1 | 139 | not_interesting |
2 | 3 | 982 | interesting |
(row_count=3)
Note that it added an index column, we can remove it by specifying a TableProperties object with setAddRowIndex(false). Let's also add a filter, and a persistence operation:
var tableProperties = new TableProperties().setAddRowIndex(false);
var flow = Flow
.from(new FileStreamer(), "path/to/my.csv")
.pipe(new TablesawCSVParser(tableProperties))
.pipe(Criterion.whereIsEqualTo("metadata", "interesting")) //We use a helper query feature, note that there are many other ways to do that, notably using the tablesaw API
.sink(new TablesawCSVWrite("path/to/my-filtered.csv")) //We dump the dataframe as CSV into another file
;
Flow.runAsPipeline(flow);
Upon running, the above should produce a CSV file like this one:
key,value,metadata
0,234,interesting
3,982,interesting
Will wrap-up this very simple example by replacing the source by one loading the file from AWS:
var tableProperties = new TableProperties().setAddRowIndex(false);
var flow = Flow
.from(new AmazonS3Streamer(s3, "my-bucket", "my.csv")) // Given a properly configured AmazonS3 instance
.pipe(new TablesawCSVParser(tableProperties))
.pipe(Criterion.whereIsEqualTo("metadata", "interesting"))
.sink(new TablesawCSVWrite("path/to/my-filtered.csv")) // We still write the output to the filesystem
;
Flow.runAsPipeline(flow);
As the reader can guess, the general idea is to define the execution plan (general structure and type transitions) separately from the choice of implementation used for performing the transformations. For instance, here, we would likely make the Extractor and Loader swappable, while retaining the interpretation as a CSV and subsequent filtering. Some situations may call for entire little pipelines with remote extracting, unzipping, streaming, etc. The goal was to make it possible to focus on the core logic and retain control over how the pipeline interacts with the outside world.
TODO
IV. Dev Installation
This project will require you to have the following:
- Java 11+
- Git (versioning)
- Maven (dependency resolving, publishing and packaging)