spark-hofs

Scala API for Apache Spark SQL high-order functions

License:
GroupId: za.co.absa
ArtifactId: spark-hofs_2.12
Last Version: 0.4.0
Release Date:
Type: jar
Description: spark-hofs - Scala API for Apache Spark SQL high-order functions
Project URL: https://github.com/AbsaOSS/spark-hofs
Project Organization: ABSA Group Limited
Source Code Management: https://github.com/AbsaOSS/spark-hofs/tree/master

Download spark-hofs_2.12

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/za.co.absa/spark-hofs_2.12/ -->
<dependency>
    <groupId>za.co.absa</groupId>
    <artifactId>spark-hofs_2.12</artifactId>
    <version>0.4.0</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/za.co.absa/spark-hofs_2.12/
implementation 'za.co.absa:spark-hofs_2.12:0.4.0'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/za.co.absa/spark-hofs_2.12/
implementation("za.co.absa:spark-hofs_2.12:0.4.0")

Buildr:

'za.co.absa:spark-hofs_2.12:jar:0.4.0'

Ivy:

<dependency org="za.co.absa" name="spark-hofs_2.12" rev="0.4.0">
  <artifact name="spark-hofs_2.12" type="jar" />
</dependency>

Groovy Grape:

@Grapes(
  @Grab(group='za.co.absa', module='spark-hofs_2.12', version='0.4.0')
)

sbt:

libraryDependencies += "za.co.absa" % "spark-hofs_2.12" % "0.4.0"

Leiningen:

[za.co.absa/spark-hofs_2.12 "0.4.0"]

Dependencies

provided (4)

Group / Artifact Type Version
org.scala-lang : scala-library jar 2.12.10
org.apache.spark : spark-core_2.12 jar 2.4.4
org.apache.spark : spark-sql_2.12 jar 2.4.4
org.apache.spark : spark-catalyst_2.12 jar 2.4.4

test (1)

Group / Artifact Type Version
org.scalatest : scalatest_2.12 jar 3.0.3

Project Modules

There are no modules declared in this project.

spark-hofs

Apache Spark 2.4.0 introduced high-order functions as part of SQL expressions. These new functions are accessible only via the textual representation of Spark SQL.

This library also makes the high-order functions accessible from the DataFrame/Dataset Scala API, so they can be used with type safety.
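As a quick illustration of the difference, the built-in route embeds the lambda in a SQL string, while this library lets you write it as ordinary Scala. A minimal sketch, assuming a DataFrame df with an array column named input and import spark.implicits._ in scope for the 'input column syntax:

// Built-in Spark 2.4: the lambda lives inside a SQL string and is only checked at runtime
import org.apache.spark.sql.functions.expr
df.withColumn("output", expr("transform(input, x -> x + 1)"))

// With spark-hofs: the lambda is plain Scala and is checked at compile time
import za.co.absa.spark.hofs._
df.withColumn("output", transform('input, x => x + 1))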

Usage

Reference the library

Scala 2.11

Maven Central

groupId: za.co.absa
artifactId: spark-hofs_2.11
version: 0.4.0

Scala 2.12

Maven Central

groupId: za.co.absa
artifactId: spark-hofs_2.12
version: 0.4.0

Please use the table below to determine which version of spark-hofs to use for your Spark version.

spark-hofs version Scala version Spark version
0.1.0 2.11 2.4.0
0.2.0 2.11 2.4.1
0.3.x 2.11 2.4.2
0.4.x 2.11, 2.12 2.4.3+
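For sbt, these coordinates translate to a dependency declaration along the lines of the sketch below (the %% operator selects the artifact matching your project's Scala version):

libraryDependencies += "za.co.absa" %% "spark-hofs" % "0.4.0"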

Import the Scala API of the high-order functions into your scope.

import za.co.absa.spark.hofs._
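The examples in the following sections operate on small DataFrames of integer arrays. One way to build such a DataFrame in a Spark shell could be the following sketch (assuming a running SparkSession named spark; the column name input matches the examples below):

import spark.implicits._

// Single-row DataFrame with one array column, as used in the transform examples
val df = Seq(Seq(1, 4, 5, 7)).toDF("input")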

Functions

Transform

The transform function is the equivalent of the map function from functional programming. It takes a column of arrays as the first argument and projects every element of each array using the function passed as the second argument.

scala> df.withColumn("output", transform('input, x => x + 1)).show
+------------+------------+
|       input|      output|
+------------+------------+
|[1, 4, 5, 7]|[2, 5, 6, 8]|
+------------+------------+

If the projection logic needs the position of an element within the array, the transform function can pass a zero-based index to the projection function as its second argument.

scala> df.withColumn("output", transform('input, (x, i) => x + i)).show
+------------+-------------+
|       input|       output|
+------------+-------------+
|[1, 4, 5, 7]|[1, 5, 7, 10]|
+------------+-------------+

By default, the lambda variable representing the element will be seen as elm and the lambda variable representing the index as idx in Spark execution plans.

scala> df.withColumn("output", transform('input, (x, i) => x + i)).explain(true)
== Parsed Logical Plan ==
'Project [input#8, transform('input, lambdafunction(('elm + 'idx), 'elm, 'idx, false)) AS output#45]
+- Project [value#6 AS input#8]
   +- LocalRelation [value#6]

== Analyzed Logical Plan ==
input: array<int>, output: array<int>
Project [input#8, transform(input#8, lambdafunction((lambda elm#51 + lambda idx#52), lambda elm#51, lambda idx#52, false)) AS output#45]
+- Project [value#6 AS input#8]
   +- LocalRelation [value#6]

...

Names of the lambda variables can be changed by passing extra arguments to the transform function.

scala> df.withColumn("output", transform('input, (x, i) => x + i, "myelm", "myidx")).explain(true)
== Parsed Logical Plan ==
'Project [input#8, transform('input, lambdafunction(('myelm + 'myidx), 'myelm, 'myidx, false)) AS output#53]
+- Project [value#6 AS input#8]
   +- LocalRelation [value#6]

== Analyzed Logical Plan ==
input: array<int>, output: array<int>
Project [input#8, transform(input#8, lambdafunction((lambda myelm#59 + lambda myidx#60), lambda myelm#59, lambda myidx#60, false)) AS output#53]
+- Project [value#6 AS input#8]
   +- LocalRelation [value#6]
   
...   

Filter

The filter function takes a column of arrays as the first argument and eliminates all elements that do not satisfy the predicate that is passed as the second argument.

scala> df.withColumn("output", filter('input, x => x % 2 === 1)).show
+------------------+---------+
|             input|   output|
+------------------+---------+
|[1, 2, 4, 5, 7, 8]|[1, 5, 7]|
+------------------+---------+

The lambda variable within the predicate will be seen as elm in Spark execution plans. This name can be changed by passing the third argument to the filter function.
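Following the same pattern as transform, the override could look like the sketch below (the custom name "myelm" is illustrative only):

// The predicate's lambda variable will appear as "myelm" instead of "elm" in the execution plan
df.withColumn("output", filter('input, x => x % 2 === 1, "myelm"))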

Aggregate

The aggregate function is the equivalent of the foldLeft function from functional programming. It takes a column of arrays and a column of zero (initial) values as its first two arguments. The next argument is a binary function that merges the zero value and all elements of the input array into a single value. The first argument of the merging function is the accumulated value and the second is the element of the current iteration.

scala> df.withColumn("output", aggregate('input, 'zero, (acc, x)  => acc + x)).show
+------------------+----+------+
|             input|zero|output|
+------------------+----+------+
|[1, 2, 4, 5, 7, 8]| 100|   127|
+------------------+----+------+

If the reduced value should be transformed before the result is returned, a function performing this transformation can be passed as the fourth argument.

scala> df.withColumn("output", aggregate('input, 'zero, (acc, x)  => acc + x, y => concat(y, y))).show
+------------------+----+------+
|             input|zero|output|
+------------------+----+------+
|[1, 2, 4, 5, 7, 8]| 100|127127|
+------------------+----+------+

The lambda variable representing the accumulator will be seen as acc and the lambda variable representing the element as elm in Spark execution plans. The names can be changed by passing extra arguments to the aggregate function.
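Assuming the name arguments follow the merging (and optional finishing) function, analogously to transform, a sketch could look like this (the names "myacc" and "myelm" are illustrative; check the overload you are calling):

// The accumulator and element lambda variables appear as "myacc" and "myelm" in the plan
df.withColumn("output", aggregate('input, 'zero, (acc, x) => acc + x, "myacc", "myelm"))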

Zip With

The zip_with function takes two columns of arrays as its first two arguments and performs an element-wise merge into a single column of arrays. The third argument is a function that takes one element from each array at the same position and specifies the merge logic. If one array is shorter, null elements are appended to it so that it matches the length of the longer array.

scala> df.withColumn("output", zip_with('input1, 'input2, (x, y) => x + y)).show
+---------------+-------------+---------------+
|         input1|       input2|         output|
+---------------+-------------+---------------+
|[1, 2, 4, 5, 7]|[2, 4, 8, 12]|[3, 6, 12, 17,]|
+---------------+-------------+---------------+

The lambda variables representing the input elements of the merging function will be seen as left and right in Spark execution plans. The names can be changed by passing extra arguments to the zip_with function.
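Following the same pattern, a sketch of the override could look like this (the names "myleft" and "myright" are illustrative only):

// The merging function's lambda variables appear as "myleft" and "myright" in the plan
df.withColumn("output", zip_with('input1, 'input2, (x, y) => x + y, "myleft", "myright"))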
