htypes-akka-stream


License

License

MIT
Categories

Categories

Akka Container Microservices Reactive libraries
GroupId

GroupId

com.svenvandam
ArtifactId

ArtifactId

htypes-akka-stream_2.12
Last Version

Last Version

0.5
Release Date

Release Date

Type

Type

jar
Description

Description

htypes-akka-stream
htypes-akka-stream
Project URL

Project URL

https://github.com/SvenvDam/HTypes
Project Organization

Project Organization

com.svenvandam
Source Code Management

Source Code Management

https://github.com/SvenvDam/HTypes.git

Download htypes-akka-stream_2.12

How to add to project

<!-- https://jarcasting.com/artifacts/com.svenvandam/htypes-akka-stream_2.12/ -->
<dependency>
    <groupId>com.svenvandam</groupId>
    <artifactId>htypes-akka-stream_2.12</artifactId>
    <version>0.5</version>
</dependency>
// https://jarcasting.com/artifacts/com.svenvandam/htypes-akka-stream_2.12/
implementation 'com.svenvandam:htypes-akka-stream_2.12:0.5'
// https://jarcasting.com/artifacts/com.svenvandam/htypes-akka-stream_2.12/
implementation ("com.svenvandam:htypes-akka-stream_2.12:0.5")
'com.svenvandam:htypes-akka-stream_2.12:jar:0.5'
<dependency org="com.svenvandam" name="htypes-akka-stream_2.12" rev="0.5">
  <artifact name="htypes-akka-stream_2.12" type="jar" />
</dependency>
@Grapes(
@Grab(group='com.svenvandam', module='htypes-akka-stream_2.12', version='0.5')
)
libraryDependencies += "com.svenvandam" % "htypes-akka-stream_2.12" % "0.5"
[com.svenvandam/htypes-akka-stream_2.12 "0.5"]

Dependencies

compile (2)

Group / Artifact Type Version
org.scala-lang : scala-library jar 2.12.9
com.svenvandam : htypes-core_2.12 jar 0.5

provided (2)

Group / Artifact Type Version
org.apache.hbase : hbase-client jar 2.2.0
com.typesafe.akka : akka-stream_2.12 jar 2.5.26

test (9)

Group / Artifact Type Version
com.typesafe.scala-logging : scala-logging_2.12 jar 3.9.2
org.scalatest : scalatest_2.12 jar 3.0.8
org.apache.hadoop : hadoop-common jar 2.8.5
org.apache.hbase : hbase-server jar 2.2.0
org.apache.hadoop : hadoop-hdfs jar 2.8.5
org.apache.hbase : hbase-hadoop-compat jar 2.2.0
org.apache.hbase : hbase-hadoop2-compat jar 2.2.0
org.apache.hbase : hbase-testing-util jar 2.2.0
com.typesafe.akka : akka-stream-testkit_2.12 jar 2.5.26

Project Modules

There are no modules declared in this project.

Build Status

HTypes

A type-safe, asynchronous, composable Scala extension to the HBase Client API

HTypes is a simple Scala extension to the Apache HBase API with no dependencies. It assumes you have have the HBase client API available as a dependency in your project. It adds the following improvements to the Java API:

  • Asynchronous execution of queries.
  • Automatic conversion between objects and an encoded form.
  • Automatic conversion between common primitive types and byte arrays (and the option to add more yourself).

Installation

libraryDependencies += "com.svenvandam" %% "htypes-core" % "0.5"

Example

Basic usage

import com.svenvandam.htypes.Implicits._
import com.svenvandam.htypes.model._
import com.svenvandam.htypes.hbase._
import com.svenvandam.htypes.hbase.query.{PutUtils, GetUtils}
import org.apache.hadoop.hbase.client.{Connection, Put, Scan}
import org.apache.hadoop.hbase._


// an example type we'll be encoding and decoding

case class User(id: String, name: String, age: Int)

// typeclass instances for User to en/decode it to an HBase compatible format

implicit val userCodec = new RowCodec[User] {
  private val ageColumn = Column("profile", "age")
  private val nameColumn = Column("profile", "name")

  def getColumns = Set(
    ageColumn,
    nameColumn
  )
  
  def encode(user: User): Row = Row(
    user.id,
    Map(
      ageColumn -> CellValue(user.age, None),
      nameColumn -> CellValue(user.name, None)
    )
  )
  
  def decode(row: Row): Option[User] = for {
    id                      <- row.key.as[String]
    CellValue(ageBytes, _)  <- row.values.get(ageColumn)
    age                     <- ageBytes.as[Int]
    CellValue(nameBytes, _) <- row.values.get(nameColumn)
    name                    <- nameBytes.as[String]
  } yield User(id, name, age)
}

// you'll have to do this yourself

val conn: Connection = getConnection()

val table = conn.getTable(TableName.valueOf("MyTable"))

// a user gets automatically encoded to an HBase compatible form and stored in a Put query

val put = PutUtils.createFrom(User("id123", "Alice", 30))

table.put(put)

// Automatically scan all columns associated with User

val scan = new Scan().addColumnsFrom[User]

// Automatically convert scan result to a series of time-versioned User's

val scanResult = table.getScanner(scan).as[User]

for {
  usersInScan       <- scanResult
  userValues        <- usersInScan
  (user, timestamp) <- userValues
} println(s"Found user $user at timestamp $timestamp")

Wrapping side-effects

// HTypes lets you execute queries in an effect wrapper
// You have to define a backend to execute your query in (Future, Task, IO, etc)
// An EffectBackend instance for Future is provided by HTypes

import com.svenvandam.htypes.effect.FutureEffectBackend
import scala.concurrent.ExecutionContext.Implicits.global

implicit val asyncBackend = FutureEffectBackend()

val f: Future[Unit] = table.putEffect(put)

f.foreach(_ => println("Put result!"))

Composing encoders and decoders

import com.svenvandam.htypes.Implicits._
import com.svenvandam.htypes.model._
import com.svenvandam.htypes.hbase._

case class UserWithSession(id: String, name: String, age: Int, SessionId: Long)

case class User(id: String, name: String, age: Int)

val userCodec = new RowCodec[User] {
  private val ageColumn = Column("profile", "age")
  private val nameColumn = Column("profile", "name")

  def getColumns = Set(
    ageColumn,
    nameColumn
  )
  
  def encode(user: User): Row = Row(
    user.id,
    Map(
      ageColumn -> CellValue(user.age, None),
      nameColumn -> CellValue(user.name, None)
    )
  )
  
  def decode(row: Row): Option[User] = for {
    id                      <- row.key.as[String]
    CellValue(ageBytes, _)  <- row.values.get(ageColumn)
    age                     <- ageBytes.as[Int]
    CellValue(nameBytes, _) <- row.values.get(nameColumn)
    name                    <- nameBytes.as[String]
  } yield User(id, name, age)
}


// mapping an existing RowDecoder
def getNewSessionId(): Long = ???

val userSessionDecoder: RowDecoder[UserWithSession] = 
  userCodec.map { case user =>
    UserWithSession(user.id, user.name, user.age, getNewSessionId())
  }

// contramap an existing RowDecoder
val userSessionEncoder: RowEncoder[UserWithSession] = 
  userCodec.contramap[UserWithSession] { case userWithSession =>
    User(userWithSession.id, userWithSession.name, userWithSession.age)
  } 

// combining decoders

case class UserInfo(id: String, lastBoughtItem: Int)

case class UserWithInfo(id: String, name: String, age: Int, lastBoughtItem: Int)

val productColumn = Column("history", "last_bought_item")

def decode(row: Row): Option[UserInfo] = for {
  id                          <- row.key.as[String]
  CellValue(productBytes, _)  <- row.values.get(productColumn)
  product                     <- productBytes.as[Int]
} yield UserInfo(id, product)

val userInfoDecoder = RowDecoder(decode, Set(productColumn))

val userWithInfoDecoder: RowDecoder[UserWithInfo] = userCodec.combine(
  userInfoDecoder, 
  (user: User, info: UserInfo) => UserWithInfo(user.id, user.name, user.age, info.lastBoughtItem)
)

Versions

Version
0.5