RxKotlin-JDBC
Fluent, concise, and easy-to-use extension functions targeting JDBC in the Kotlin language with RxJava 2.0.
This library is inspired by Dave Moten's RxJava-JDBC but seeks to be much more lightweight by leveraging Kotlin functions. This works with threadpool DataSource
implementations such as HikariCP, but can also be used with vanilla JDBC Connection
s.
Extension functions like select()
, insert()
, and execute()
will target both DataSource
and JDBC Connection
types. There is also support for safe blocking Sequence
operations.
Binaries
Maven
<dependency>
<groupId>org.nield</groupId>
<artifactId>rxkotlin-jdbc</artifactId>
<version>0.4.1</version>
</dependency>
Gradle
repositories {
mavenCentral()
}
dependencies {
compile 'org.nield:rxkotlin-jdbc:0.4.1'
}
Managing JDBC with Observables and Flowables
Using DataSources
When you use a DataSource
, a Connection
will automatically be pulled from the pool upon subscription and given back when onComplete
is called.
val config = HikariConfig()
config.jdbcUrl = "jdbc:sqlite::memory:"
config.minimumIdle = 3
config.maximumPoolSize = 10
val ds = HikariDataSource(config)
//initialize
with(ds) {
execute("CREATE TABLE USER (ID INTEGER PRIMARY KEY, USERNAME VARCHAR(30) NOT NULL, PASSWORD VARCHAR(30) NOT NULL)")
execute("INSERT INTO USER (USERNAME,PASSWORD) VALUES (?,?)", "thomasnield", "password123")
execute("INSERT INTO USER (USERNAME,PASSWORD) VALUES (?,?)", "bobmarshal","batman43")
}
// Retrieve all users
ds.select("SELECT * FROM USER")
.toObservable { it.getInt("ID") to it.getString("USERNAME") }
.subscribe(::println)
// Retrieve user with specific ID
ds.select("SELECT * FROM USER WHERE ID = :id")
.parameter("id", 2)
.toSingle { it.getInt("ID") to it.getString("USERNAME") }
.subscribeBy(::println)
// Execute insert which return generated keys, and re-select the inserted record with that key
ds.insert("INSERT INTO USER (USERNAME, PASSWORD) VALUES (:username,:password)")
.parameter("username","josephmarlon")
.parameter("password","coffeesnob43")
.toFlowable { it.getInt(1) }
.flatMapSingle {
conn.select("SELECT * FROM USER WHERE ID = :id")
.parameter("id", it)
.toSingle { "${it.getInt("ID")} ${it.getString("USERNAME")} ${it.getString("PASSWORD")}" }
}
.subscribe(::println)
// Run deletion
conn.execute("DELETE FROM USER WHERE ID = :id")
.parameter("id",2)
.toSingle()
.subscribeBy(::println)
Using Connections
You can also use a standard Connection
with these extension functions, and closing will not happen automatically so you can micromanage the life of that connection. It is also helpful to execute transactions against an individual connection.
val connection = DriverManager.getConnection("jdbc:sqlite::memory:")
connection.select("SELECT * FROM USER")
.toObservable { it.getInt("ID") to it.getString("USERNAME") }
.subscribe(::println)
connection.close()
Blocking Sequences
It can be convenient to work with JDBC fluently in a blocking manner, so you don't always have to return everything as an Observable
or Flowable
. While you should strive to not break the monad, sometimes it is easier to not have items returned in reactive fashion. This is especially the case when you have reactively built T
objects, but you want to retrieve metadata U
and V
objects when T
is constructed.
You could try to use RxJava's native blocking operators, but these can have undesired side effects of interruption exceptions especially when the blocking operation is happening under a parent reactive operation that can be disposed. Therefore, these Sequence
parts of the API do not rely on RxJava at all and are implemented at the Iterable
level. These may be candidate to be moved to a separate library (and be used as a dependency in this one) before rxkotlin-jdbc reaches a 1.0 release. This transition is not planned to be very breaking, other than perhaps a few import changes.
For instance, we can call toSequence()
to iterate a ResultSet
as a Sequence
.
data class User(val id: Int, val userName: String, val password: String)
conn.select("SELECT * FROM USER").toSequence {
User(it.getInt("ID"), it.getString("USERNAME"), it.getString("PASSWORD"))
}.forEach(::println)
The toSequence()
actually returns a ResultSetSequence
, which is important because if you use Sequence operators like take()
, the iteration will need to manually be terminated early (completion events are not communicated like in RxJava). The returned ResultSetSequence
should provide a handle to do this.
val conn = connectionFactory()
conn.select("SELECT * FROM USER").toSequence { it.getInt("ID") }.apply {
take(1).forEach(::println)
close() // must manually close for operators that terminate iteration early
}
Typically, if you simply want to return one item from a ResultSet
, just call blockingFirst()
or blockingFirstOrNull()
which will handle the close()
operation for you.
val conn = connectionFactory()
val id = conn.select("SELECT * FROM USER WHERE ID = 1").blockingFirst { it.getInt("ID") }
println(id)
Allowing Choice of Sequence, Observable, or Flowable
This library supports emitting T
items built off a ResultSet
in the form of:
Sequence<T>
Observable<T>
Flowable<T>
Single<T>
Maybe<T>
If you are building an API, it may be handy to allow the user of the API to choose the means in which to receive the results.
The toPipeline()
function will allow mapping the ResultSet
to T
items, but defer to the API user how to receive the results.
data class User(val userName: String, val password: String)
fun getUsers() = conn.select("SELECT * FROM USER")
.toPipeline {
User(it.getString("USERNAME"), it.getString("PASSWORD"))
}
fun main(args: Array<String>) {
getUsers().toFlowable().subscribe { println("Receiving $it via Flowable") }
getUsers().toObservable().subscribe { println("Receiving $it via Observable") }
getUsers().toSequence().forEach { println("Receiving $it via Sequence") }
}
Batching
Batch operations are now supported in RxKotlin-JDBC. This allows you to write a large amount of data to a database quickly using JDBC's batching interface.
Simply have an Observable<T>
, Flowable<T>
, Iterable<T>
, or Sequence<T>
of desired elements to INSERT, UPDATE, or some other action against your database, and pass it to the batchExecute()
function on Connection
or DataSource
.
You will also need to specify the SQL template, the mapper to turn each T
element into SQL parameters, and the batch size.
You can have the result come back as an Observable<T>
, Flowable<T>
, Sequence<T>
, or a Compeletable
.
val conn = connectionFactory()
class User(val username: String, val password: String)
val insertElements = Flowable.just(
User("josephmarlon", "coffeesnob43"),
User("samuelfoley","shiner67"),
User("emilyearly","rabbit99"),
User("johnlawrey", "shiner23"),
User("tomstorm","coors44"),
User("danpaxy", "texas22"),
User("heathermorgan","squirrel22")
)
conn.batchExecute(
sqlTemplate = "INSERT INTO USER (USERNAME, PASSWORD) VALUES (:username,:password)",
elements = insertElements,
batchSize = 3,
parameterMapper = {
parameter("username", it.username)
parameter("password", it.password)
}
).toFlowable().count().subscribeBy { println("Inserted $it records") }
Building Where Conditions Fluently
An experimental feature in RxKotlin-JDBC is an API-friendly builder for WHERE conditions, that may or may not use certain parameters to build WHERE conditions.
For instance, here is a function that will query a table with three possible parameters:
fun userOf(id: Int? = null, userName: String? = null, password: String? = null) =
conn.select("SELECT * FROM USER")
.whereOptional("ID", id)
.whereOptional("USERNAME", userName)
.whereOptional("PASSWORD", password)
.toObservable(::User)
We can then query for any combination of these three parameters (including none of them).
userOf().subscribe { println(it) } // prints all users
userOf(userName = "thomasnield", password = "password123")
.subscribe { println(it) } // prints user with ID 1
userOf(id = 2).subscribe { println(it) } // prints user with ID 2
Instead of providing a simple field name, we can also provide an entire templated expression for more complex condtions.
conn.select("SELECT * FROM USER")
.whereOptional("ID > ?", 1)