Showing posts with label Scala. Show all posts

Sunday, January 17, 2021

Tagless Final in Scala for Beginners

This post explains what tagless-final is in Scala. If you have bumped into the strange-looking F[_] notation and wondered what it is, and where and how it is used, this post tries to answer those questions too. There's nothing new here for a seasoned functional programmer, but it should give a head start to anyone beginning the journey. Let's walk through it step by step.

What's a type constructor?

Simply put, something that constructs types can be considered a type constructor. For example, List is a type constructor because it constructs a new type based on the type argument passed to it.
In type-theory terms, List on its own is a type constructor (of kind * -> *), whereas List[Int], the result of applying it to Int, is a proper type (of kind *).

val list: List.type = List
val intList: List[Int] = list[Int]()

In the above code snippet, list refers to the List companion object (hence its type List.type), and list[Int]() calls its apply method with Int as the type argument, i.e. Int is the type argument given to List to construct the type List[Int].

What are higher-kinded types?

A higher-kinded type can be considered as something that abstracts over the type constructor. As a continuation of the above example of List, abstraction over the type constructor can be done with the help of using the F[_] notation, for example:

trait Test[F[_]]
val listTest   = new Test[List] {}
val optionTest = new Test[Option] {}
val vectorTest = new Test[Vector] {}

In the above code snippet, trait Test abstracts over the type constructor with the help of the F[_] notation, so it is possible to pass the List type constructor to it. For that matter, it is also possible to pass other type constructors, for example Option, Vector, etc. In this code snippet, Test is a higher-kinded type.
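
To make the abstraction a little more concrete, here is a minimal sketch in which the trait actually uses F[_] in a method. The names ContainerDemo, Container, first and the two instances are hypothetical and only serve as an illustration:

```scala
object ContainerDemo {
  // Hypothetical trait: abstracts over any type constructor F[_]
  trait Container[F[_]] {
    // Returns the first element held by the container, if any
    def first[A](fa: F[A]): Option[A]
  }

  // Instance for the List type constructor
  val listContainer: Container[List] = new Container[List] {
    def first[A](fa: List[A]): Option[A] = fa.headOption
  }

  // Instance for the Option type constructor
  val optionContainer: Container[Option] = new Container[Option] {
    def first[A](fa: Option[A]): Option[A] = fa
  }
}
```

With these definitions, ContainerDemo.listContainer.first(List(1, 2, 3)) evaluates to Some(1), and the same trait serves Option without any change.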

What's an Effect in functional programming?

Do not confuse an Effect with a side-effect! A side-effect can be one kind of Effect, but it is not the only kind. An Effect can be considered as something that can happen within a Wrapper. For example, consider the Wrapper to be an Either. Now, anything that happens inside an Either can be considered an Effect, as shown below:

def work[A, B](value: A)(fn: A => B): Either[Throwable, B] =
  try Right(fn(value))
  catch {
    case ex: Exception => Left(ex)
  }

work("1")(_.toInt) // Right(1)
work("T")(_.toInt) // Left(NumberFormatException)

The method work is considered an Effectful method because, instead of returning just the value B, it returns Either[Throwable, B], so the caller knows from the return type what to expect. More generally, a method can be considered Effectful if it returns F[B] instead of B; in the above example, F is Either (with the error type fixed to Throwable).
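
The same idea applies to other wrappers. As a minimal sketch (the names OptionEffectDemo and safeDivide are hypothetical), Option can model the Effect of a value possibly being absent:

```scala
object OptionEffectDemo {
  // Returns Option[Int] instead of Int: the Effect here is possible absence of a result
  def safeDivide(dividend: Int, divisor: Int): Option[Int] =
    if (divisor == 0) None else Some(dividend / divisor)
}
```

Here safeDivide(10, 2) gives Some(5), while safeDivide(1, 0) gives None instead of throwing an ArithmeticException.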

What's a type class?

A type class can be considered as a class that defines a common behavior that can be associated with some types (data types). A common behavior could be anything, for example, an Adder type class that defines behaviors for the addition of data types like Int, Double, Float, etc.

trait Adder[A] {
  def add(value1: A, value2: A): A
}

Additionally, a type class should also be lawful, in the sense that it should follow certain algebraic laws. In the case of the Adder type class above, it should follow the law of associativity, which is tantamount to it being a Semigroup.
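
As a rough illustration of what "lawful" means, the sketch below checks associativity for sample values. The helper isAssociative and the deliberately unlawful intSubtractor are hypothetical names introduced only for this example, and checking a single triple of values is of course not a proof:

```scala
object AdderLaws {
  trait Adder[A] {
    def add(value1: A, value2: A): A
  }

  // Associativity: add(add(a, b), c) must equal add(a, add(b, c))
  def isAssociative[A](a: A, b: A, c: A)(adder: Adder[A]): Boolean =
    adder.add(adder.add(a, b), c) == adder.add(a, adder.add(b, c))

  // Integer addition satisfies the law
  val intAdder: Adder[Int] = new Adder[Int] {
    def add(value1: Int, value2: Int): Int = value1 + value2
  }

  // Subtraction has the right shape but is NOT associative, so it is not a lawful Adder
  val intSubtractor: Adder[Int] = new Adder[Int] {
    def add(value1: Int, value2: Int): Int = value1 - value2
  }
}
```

For the values 1, 2 and 3 the check passes for intAdder and fails for intSubtractor, because (1 - 2) - 3 is -4 while 1 - (2 - 3) is 2.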

What's ad-hoc polymorphism with respect to a type class?

The behavior of adding two numeric values specific to a data type can be achieved by method overloading in a typical OOP setting, for example:

case class Age(v: Int)

def add(value1: Int, value2: Int): Int = value1 + value2
def add(value1: Double, value2: Double): Double = value1 + value2
def add(value1: Age, value2: Age): Age = Age(value1.v + value2.v)

But this would result in some code duplication, and a more generic way to write this would be to use the method definition seen in the Adder type class example above:

def add(value1: A, value2: A): A

But how would add know how to add two values? The values could be Int, or they could very well be of the Age data type. One way to do it is to use type classes, with instances tied to specific data types, and to inject the data-type-specific instance into the method using Scala's implicits, which gives ad-hoc polymorphism capabilities.

case class Age(v: Int)

trait Adder[A] {
  def add(value1: A, value2: A): A
}

object Adder {
  def apply[A: Adder]: Adder[A]              = implicitly[Adder[A]]
  def add[A: Adder](value1: A, value2: A): A = Adder[A].add(value1, value2)
}

object AdderInstances {
  implicit val intAdder: Adder[Int] = (value1, value2) => value1 + value2
  implicit val ageAdder: Adder[Age] = (value1, value2) => Age(Adder[Int].add(value1.v, value2.v))
}

import AdderInstances._
Adder.add(25, 25) // 50
Adder.add(Age(25), Age(25)) // Age(50)
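
The payoff of the type class approach shows when writing further generic code on top of it. The following self-contained sketch adds a hypothetical sumAll helper (not part of the example above) that is written once and works for every type with an Adder instance; here the instances live in the Adder companion object so that no import is needed:

```scala
object AdderSumDemo {
  case class Age(v: Int)

  trait Adder[A] {
    def add(value1: A, value2: A): A
  }

  object Adder {
    def apply[A: Adder]: Adder[A] = implicitly[Adder[A]]

    implicit val intAdder: Adder[Int] = new Adder[Int] {
      def add(value1: Int, value2: Int): Int = value1 + value2
    }
    implicit val ageAdder: Adder[Age] = new Adder[Age] {
      def add(value1: Age, value2: Age): Age = Age(value1.v + value2.v)
    }
  }

  // Hypothetical helper: a single generic definition instead of one overload per type.
  // Returns None for an empty list because Adder defines no "zero" element.
  def sumAll[A: Adder](values: List[A]): Option[A] =
    values.reduceOption[A]((x, y) => Adder[A].add(x, y))
}
```

With this, sumAll(List(1, 2, 3)) yields Some(6) and sumAll(List(Age(20), Age(5))) yields Some(Age(25)), with no overloads involved.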

What's tagless-final?

Let's look at a dumbed-down representation of a Stock Market with capabilities to buy and sell financial instruments.

import cats.Monad
import cats.effect.IO
import cats.implicits._
import scala.language.higherKinds

sealed trait OrderType
case object Buy  extends OrderType
case object Sell extends OrderType

sealed trait FinancialInstrument {
  val id: String
  val quantity: Int
  val price: Float
  val orderType: OrderType
}

case class FutureDerivativeInstrument(id: String, quantity: Int, price: Float, orderType: OrderType) extends FinancialInstrument
case class OptionDerivativeInstrument(id: String, quantity: Int, price: Float, orderType: OrderType) extends FinancialInstrument
case class EquityCashInstrument(id: String, quantity: Int, price: Float, orderType: OrderType)       extends FinancialInstrument

trait StockMarket[F[_]] {
  def buyInstrument(instrument: FinancialInstrument): F[FinancialInstrument]
  def sellInstrument(instrument: FinancialInstrument): F[FinancialInstrument]
}

object StockMarketInstances {
  implicit val instrument: StockMarket[IO] = new StockMarket[IO] {
    override def buyInstrument(instrument: FinancialInstrument): IO[FinancialInstrument]  = IO(instrument)
    override def sellInstrument(instrument: FinancialInstrument): IO[FinancialInstrument] = IO(instrument)
  }
}

object StockMarket {
  def apply[F[_]: StockMarket]: StockMarket[F] = implicitly[StockMarket[F]]

  def executeBuyOrder[F[_]: StockMarket](instrument: FinancialInstrument): F[FinancialInstrument]  = StockMarket[F].buyInstrument(instrument)
  def executeSellOrder[F[_]: StockMarket](instrument: FinancialInstrument): F[FinancialInstrument] = StockMarket[F].sellInstrument(instrument)
}

def placeOrder[F[_]: StockMarket: Monad](orders: Vector[FinancialInstrument]): F[Vector[FinancialInstrument]] = {
  orders.pure[F].flatMap { instruments =>
    instruments.traverse { instrument =>
      instrument.orderType match {
        case Buy  => StockMarket[F].buyInstrument(instrument)
        case Sell => StockMarket[F].sellInstrument(instrument)
      }
    } 
  }
}

import StockMarketInstances._
placeOrder[IO](Vector(OptionDerivativeInstrument("SA-121", 50, 100, Buy), FutureDerivativeInstrument("DS-991", 50, 100, Sell))).unsafeRunSync()

Here, StockMarket is a tagless-final type class that describes the capabilities of a generic type F[_]. The core concept of tagless-final is declaring dependencies, for which Scala's implicits are used.

Tagless-final pattern enables:

  • Capability to abstract over the higher-kinded type, providing the means to use any of the available Effect types in place of F, like Cats Effect IO or Monix Task. That is, it enables Effect type indirection.
  • Ability to reason about the implementation of the polymorphic function (in the example above, the placeOrder function) by looking at the implicits required in order to use it. From the placeOrder signature, it can be seen that the function needs an instance of StockMarket and of Monad to implement its functionality. That is, it enables Effect parametric reasoning.
  • The inclination to use the principle of least power by declaring only those type classes as implicit parameters that are needed for the placeOrder function implementation.

As an end note, tagless-final is useful only when the disciplined approach noted above is followed; the benefits do not come automatically just by using it.
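
To illustrate Effect type indirection without any library, here is a simplified sketch using only the standard library. The Order type, the single buy operation, the Id alias and both interpreters are assumptions made for this illustration and are deliberately smaller than the StockMarket example above:

```scala
import scala.util.Try

object InterpreterDemo {
  final case class Order(id: String, quantity: Int)

  // Identity "effect": F[A] is just A, which is handy for pure unit tests
  type Id[A] = A

  // Simplified, hypothetical version of the StockMarket algebra
  trait StockMarket[F[_]] {
    def buy(order: Order): F[Order]
  }

  object StockMarket {
    // Interpreter 1: no effect at all
    implicit val idMarket: StockMarket[Id] = new StockMarket[Id] {
      def buy(order: Order): Id[Order] = order
    }

    // Interpreter 2: Try as the effect, a rejected order is captured as a Failure
    implicit val tryMarket: StockMarket[Try] = new StockMarket[Try] {
      def buy(order: Order): Try[Order] =
        Try {
          require(order.quantity > 0, "quantity must be positive")
          order
        }
    }
  }

  // Written once against the algebra; the caller picks F
  def executeBuy[F[_]](order: Order)(implicit market: StockMarket[F]): F[Order] =
    market.buy(order)
}
```

Calling executeBuy[Id](order) returns the order directly, whereas executeBuy[Try](order) returns it wrapped in a Success (or a Failure for a non-positive quantity). The function executeBuy itself never changes.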

Saturday, January 16, 2021

Background processing with Scala Cats Effect



Running any tasks with Scala Future in the background can be done using the following:

Snippet 1

  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.{Await, Future}

  val future1 = Future { Thread.sleep(4000); println("Work 1 Completed") }
  val future2 = Future { Thread.sleep(1000); println("Work 2 Completed") }

  val future3 =
    for {
      _ <- future1
      _ <- future2
    } yield ()

  Await.result(future3, Duration.Inf)

If one has been working with Scala Future, it is obvious that as soon as the lines defining future1 and future2 are executed, the body of each Future starts executing immediately, i.e. the body of a Future is eagerly evaluated by submitting it for execution on a different thread using the implicit ExecutionContext that is in scope via the import. Then, with the help of the for-comprehension, which is nothing but syntactic sugar for a combination of flatMap and map, the Futures are composed together to yield a Unit. Finally, awaiting future3 gets the result once both future1 and future2 have completed successfully (Await is used here just for demonstration purposes).

But if the above code snippet is changed a bit to the following:

Snippet 2

  val future =
    for {
      _ <- Future { Thread.sleep(4000); println("Work 1 Completed") }
      _ <- Future { Thread.sleep(1000); println("Work 2 Completed") }
    } yield ()

  Await.result(future, Duration.Inf)

both Futures will be executed sequentially, because the second Future is not created (and therefore not executed) until the first Future has completed, i.e. the above for-comprehension is the same as the following:

Snippet 3

  Future {
    Thread.sleep(4000); println("Work 1 Completed")
  }.flatMap { _ =>
    Future {
      Thread.sleep(1000); println("Work 2 Completed")
    }.map(_ => ())
  }

Similar results can be achieved using Cats Effect IO with the added benefit of referential transparency (note that the Scala Future is not referentially transparent).
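
The claim about referential transparency can be checked with a small sketch. Replacing a value with the expression that defines it should not change a program's behavior, yet with Future it does. The object and method names below are hypothetical:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object FutureRtDemo {
  // Reuses one already-running Future: the side effect happens once
  def shared(): Int = {
    val counter   = new AtomicInteger(0)
    val increment = Future(counter.incrementAndGet())
    val both      = for { _ <- increment; _ <- increment } yield ()
    Await.result(both, 5.seconds)
    counter.get()
  }

  // Inlines the same expression twice: the side effect happens twice
  def inlined(): Int = {
    val counter = new AtomicInteger(0)
    val both =
      for {
        _ <- Future(counter.incrementAndGet())
        _ <- Future(counter.incrementAndGet())
      } yield ()
    Await.result(both, 5.seconds)
    counter.get()
  }
}
```

shared() returns 1 while inlined() returns 2, even though the second method only substitutes the definition of increment for its name. With IO, both variants would describe the same program.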

The Cats Effect version similar to Snippet 2 will look something like the following:

Snippet 4

import cats.effect.{ExitCode, IO, IOApp}
import scala.concurrent.duration._

object Test extends IOApp {

  def work[A](work: A, time: FiniteDuration): IO[Unit] =
    IO.sleep(time) *> IO(work)
      .flatMap(completedWork => IO(println(s"Done work: $completedWork")))

  val program: IO[Unit] =
    for {
      _ <- work("work 1", 4.second)
      _ <- work("work 2", 1.second)
    } yield ()

  override def run(args: List[String]): IO[ExitCode] =
    program *> IO.never
}

In the above case, the second IO (inside for-comprehension) is not evaluated until the first IO is completed, i.e the IOs are run sequentially and the order in which the print line statements will be executed is "Done work: work 1" and then "Done work: work 2".

The Cats Effect version similar to Snippet 1 will look something like the following:

Snippet 5

val program1 = work("work 1", 4.second).start
val program2 = work("work 2", 1.second).start

val program: IO[Unit] =
  for {
    _ <- program1
    _ <- program2
    _ <- IO(println("for-comprehension done!"))
  } yield ()

wherein the order in which the print line statements will be executed is "for-comprehension done!" then "Done work: work 2" and then "Done work: work 1". Here, the start method uses ContextShift instead of Scala's ExecutionContext directly.
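
If the results of the background work are needed later, the Fiber returned by start can be joined. The following is a minimal sketch assuming Cats Effect 2.x (the version that has ContextShift); the object name, the String results and the shortened durations are illustrative only:

```scala
import cats.effect.{ContextShift, IO, Timer}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object JoinFibersDemo {
  // Outside of IOApp these instances have to be provided explicitly (Cats Effect 2.x)
  implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO]               = IO.timer(ExecutionContext.global)

  // Sleeps without blocking a thread, then produces the name of the finished work
  def work(name: String, time: FiniteDuration): IO[String] =
    IO.sleep(time).flatMap(_ => IO(name))

  val program: IO[(String, String)] =
    for {
      fiber1  <- work("work 1", 400.millis).start // starts running in the background
      fiber2  <- work("work 2", 100.millis).start // starts running in the background
      result1 <- fiber1.join                      // semantically waits for work 1
      result2 <- fiber2.join                      // work 2 has most likely finished already
    } yield (result1, result2)
}
```

Both pieces of work run concurrently, so the whole program takes roughly as long as the slower of the two rather than the sum of both.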

start returns a Fiber which can be canceled, and the following code snippet will cancel the Fiber returned by program1 as soon as the program2 is evaluated. 

Snippet 6

  val program1 = work("work 1", 4.second).start
  val program2 = work("work 2", 1.second).start

  val program: IO[Unit] =
    for {
      fiber1 <- program1
      _      <- program2
      _      <- fiber1.cancel
      _      <- IO(println("for-comprehension done!"))
    } yield ()

and in this case, the following will be the probable output on the console: "for-comprehension done!" and then "Done work: work 2", while "Done work: work 1" won't be printed. "Probable" because, by the time program1 would get a chance to complete, the fiber1.cancel line will most likely have been executed, which cancels the execution of program1.








Friday, May 4, 2018

Stream a file to AWS S3 using Akka Streams (via Alpakka) in Play Framework

In this blog post we’ll see how a file can be streamed from a client (e.g. a browser) to Amazon S3 (AWS S3) using Alpakka’s AWS S3 connector. Alpakka provides various Akka Stream connectors, integration patterns and data transformations for integration use cases.
The example in this blog post uses Play Framework to provide a user interface to submit a file from a web page directly to AWS S3 without creating any temporary files (on the storage space) during the process. The file will be streamed to AWS S3 using S3’s multipart upload API.

(To understand this blog post, basic knowledge of Play Framework and Akka Streams is required. Also, check out What can Reactive Streams offer EE4J by James Roper, in particular its Servlet IO section, to fully understand the extent to which the example in this blog post can be helpful.)

Let’s begin by looking at the artifacts used for achieving the task at hand:
  1. Scala 2.11.11
  2. Play Framework 2.6.10
  3. Alpakka S3 0.18

Now moving on to the fun part, let’s see what the code base will look like. We’ll first create a class for interacting with AWS S3 using the Alpakka S3 connector; let’s name the class AwsS3Client.
@Singleton
class AwsS3Client @Inject()(system: ActorSystem, materializer: Materializer) {

  private val awsCredentials = new BasicAWSCredentials("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
  private val awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials)
  private val regionProvider =
    new AwsRegionProvider {
      def getRegion: String = "us-west-2"
    }
  private val settings = new S3Settings(MemoryBufferType, None, awsCredentialsProvider, regionProvider, false, None, ListBucketVersion2)
  private val s3Client = new S3Client(settings)(system, materializer)

  def s3Sink(bucketName: String, bucketKey: String): Sink[ByteString, Future[MultipartUploadResult]] =
    s3Client.multipartUpload(bucketName, bucketKey)
}
From the first line it can be seen that the class is marked as a Singleton; this is because we do not want multiple instances of this class to be created. From the next line it can be seen that ActorSystem and Materializer are injected, which are required for configuring Alpakka’s AWS S3 client. The next few lines configure an instance of Alpakka’s AWS S3 client, which will be used for interfacing with your AWS S3 bucket. Finally, in the last section of the class there’s a behavior which returns an Akka Streams Sink of type Sink[ByteString, Future[MultipartUploadResult]]; this Sink does the job of sending the file stream to the AWS S3 bucket using the AWS multipart upload API.
In order to make this class workable replace AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY with your AWS S3 access key and secret key respectively. And replace us-west-2 with your respective AWS region.
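Rather than hard-coding the keys in the source, they could also be looked up from the environment. The following is only a sketch under that assumption; the AwsKeysFromEnv object, the AwsKeys case class and the load method are hypothetical names that are not part of the example code base:

```scala
object AwsKeysFromEnv {
  final case class AwsKeys(accessKeyId: String, secretAccessKey: String)

  // Looks both keys up in the given environment and reports the first missing one
  def load(env: Map[String, String] = sys.env): Either[String, AwsKeys] =
    (env.get("AWS_ACCESS_KEY_ID"), env.get("AWS_SECRET_ACCESS_KEY")) match {
      case (Some(id), Some(secret)) => Right(AwsKeys(id, secret))
      case (None, _)                => Left("AWS_ACCESS_KEY_ID is not set")
      case (_, None)                => Left("AWS_SECRET_ACCESS_KEY is not set")
    }
}
```

The resulting values could then be passed to BasicAWSCredentials in place of the two string literals.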
Next, let’s look at how the s3Sink behavior of this class can be used to connect our Play Framework controller with the AWS S3 multipart upload API. But before doing that, and slightly digressing from the example [bear with me, it’s going to build up the example further :)], if you followed my previous blog post — Streaming data from PostgreSQL using Akka Streams and Slick in Play Framework [containing the Customer Management example] — you might have seen how a CustomerController was used to build a functionality wherein a Play Framework route was available to stream the customer data directly from PostgreSQL into a downloadable CSV file (without the need to buffer the data as a file on storage space). This blog post builds an example on top of the Customer Management example highlighted in the previous blog post. So, we’re going to use the same CustomerController but modify it a bit by adding a new Play Framework Action for accepting the file from the web page.
For simplicity, let’s name the controller Action upload; this Action is used for accepting a file from a web page via one of the routes. Let’s first look at the controller code base and then we’ll discuss the route.
@Singleton
class CustomerController @Inject()(cc: ControllerComponents, awsS3Client: AwsS3Client)
                                  (implicit ec: ExecutionContext) extends AbstractController(cc) {

  def upload: Action[MultipartFormData[MultipartUploadResult]] =
    Action(parse.multipartFormData(handleFilePartAwsUploadResult)) { request =>
      val maybeUploadResult =
        request.body.file("customers").map {
          case FilePart(key, filename, contentType, multipartUploadResult) =>
            multipartUploadResult
        }

      maybeUploadResult.fold(
        InternalServerError("Something went wrong!")
      )(uploadResult =>
        Ok(s"File ${uploadResult.key} uploaded to bucket ${uploadResult.bucket}")
      )
    }

  private def handleFilePartAwsUploadResult: Multipart.FilePartHandler[MultipartUploadResult] = {
    case FileInfo(partName, filename, contentType) =>
      val accumulator = Accumulator(awsS3Client.s3Sink("test-ocr", filename))

      accumulator map { multipartUploadResult =>
        FilePart(partName, filename, contentType, multipartUploadResult)
      }
  }
}

Dissecting the controller code base, it can be seen that the controller is a singleton and the AwsS3Client class that was created earlier is injected in the controller along with the Play ControllerComponents and ExecutionContext.
Let’s look at the private behavior of the CustomerController first, i.e handleFilePartAwsUploadResult. It can be seen that the return type of this behavior is
Multipart.FilePartHandler[MultipartUploadResult]
which is nothing but a Scala type defined inside Play’s Multipart object:
type FilePartHandler[A] = FileInfo => Accumulator[ByteString, FilePart[A]]
It should be noted here that the example uses multipart/form-data encoding for the file upload, so the default multipartFormData parser is used by providing a FilePartHandler of type FilePartHandler[MultipartUploadResult]. The type parameter of FilePartHandler is MultipartUploadResult because the Alpakka AWS S3 Sink, to which the file is finally sent, is of type Sink[ByteString, Future[MultipartUploadResult]].
Looking at what this private behavior does: it accepts a case class of type FileInfo, creates an Accumulator from s3Sink, and finally maps the result of the Accumulator to a result of type FilePart.
NOTE: Accumulator is essentially a lightweight wrapper around an Akka Sink that gets materialized to a Future. It provides convenient methods for working directly with Future as well as transforming the inputs.
Moving ahead and understanding the upload Action, it looks like any other normal Play Framework Action with the only difference that the request body is being parsed to MultipartFormData and then handled via our custom FilePartHandler, i.e. handleFilePartAwsUploadResult, which was discussed earlier.
To connect everything together, we need an endpoint to facilitate this file upload and a view to be able to submit a file. Let’s add a new route to Play’s routes file:
POST /upload controllers.CustomerController.upload
and a view to enable file upload from the user interface:
@import helper._

@()(implicit request: RequestHeader)

@main("Customer Management Portal") {
  <h1><b>Upload Customers to AWS S3</b></h1>
  @helper.form(CSRF(routes.CustomerController.upload()), 'enctype -> "multipart/form-data") {
    <input type="file" name="customers">
    <br>
    <input type="submit">
  }
}
Note the CSRF helper, which is required for the form because the CSRF filter is enabled by default in Play Framework.
The entire code base is available at the following repository playakkastreams.
Hope this helps, shout out your queries in the comment section :)
This article was first published on the Knoldus blog.

Tuesday, May 1, 2018

Using Microsoft SQL Server with Scala Slick

This blog post shows simple CRUD operations on Microsoft SQL Server using Scala Slick version 3.2.3. You might be thinking, what’s really great about that? Duh! But until Scala Slick 3.2.x was released, using commercial databases required an additional closed-source package known as Slick Extensions, which provided Slick drivers for the following databases:
  1. Oracle
  2. IBM DB2
  3. Microsoft SQL Server


Library dependency used for Slick Extensions
libraryDependencies += "com.typesafe.slick" %% "slick-extensions" % "3.0.0"
But with the newer versions of Slick, i.e. 3.2.x, these drivers are available within the Slick core package as an open source release, which can also be seen from the change log.
If you find yourself struggling with a setup to make Microsoft SQL Server work with Scala Slick in your project, maybe because of the lack of resources available on the web, then read up further :)

TL;DR

SQL Server database configurations
sqlserver = {
 driver = "slick.jdbc.SQLServerProfile$"
 db {
  host = ${?SQLSERVER_HOST}
  port = ${?SQLSERVER_PORT}
  databaseName = ${?SQLSERVER_DB_NAME}

  url = "jdbc:sqlserver://"${sqlserver.db.host}":"${sqlserver.db.port}";databaseName="${sqlserver.db.databaseName}
  user = ${?SQLSERVER_USERNAME}
  password = ${?SQLSERVER_PASSWORD}
 }
}
Database instance
val dbConfig: DatabaseConfig[JdbcProfile] = DatabaseConfig.forConfig("sqlserver")
val db: JdbcProfile#Backend#Database = dbConfig.db

SBT project setup

For the example used in this blog post, the following dependencies and versions of the respective artifacts are used:
  1. Scala 2.11.11
  2. SBT 0.13.17
  3. Slick 3.2.3
  4. Slick HikariCP 3.2.3
  5. Mssql JDBC 6.2.1.jre8
which inside our build.sbt file will look like the following set of instructions:
name := "mssql-example"

version := "1.0"

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
 "com.typesafe.slick" %% "slick" % "3.2.3",
 "com.typesafe.slick" %% "slick-hikaricp" % "3.2.3",
 "com.microsoft.sqlserver" % "mssql-jdbc" % "6.2.1.jre8"
)
and the instructions of build.properties file will be
sbt.version = 0.13.17
The settings required to configure Microsoft SQL Server should go inside the application.conf file, where the details of our database are specified:

sqlserver = {
 driver = "slick.jdbc.SQLServerProfile$"
 db {
  host = ${?SQLSERVER_HOST}
  port = ${?SQLSERVER_PORT}
  databaseName = ${?SQLSERVER_DB_NAME}

  url = "jdbc:sqlserver://"${sqlserver.db.host}":"${sqlserver.db.port}";databaseName="${sqlserver.db.databaseName}
  user = ${?SQLSERVER_USERNAME}
  password = ${?SQLSERVER_PASSWORD}
 }
}
where it can be seen that SQLSERVER_HOST, SQLSERVER_PORT, SQLSERVER_DB_NAME, SQLSERVER_USERNAME and SQLSERVER_PASSWORD are to be provided as environment variables.
Now moving on to our FRM (Functional Relational Mapping) and repository setup, the following import will be used for the MS SQL Server Slick driver’s API:
import slick.jdbc.SQLServerProfile.api._
And thereafter the FRM will look the same as the other FRMs described in the official Slick documentation. For the example in this blog post, let’s use the following table structure:
CREATE TABLE user_profiles (
 id         INT IDENTITY (1, 1) PRIMARY KEY,
 first_name VARCHAR(100) NOT NULL,
 last_name  VARCHAR(100) NOT NULL
)
whose functional relational mapping will look like this:
class UserProfiles(tag: Tag) extends Table[UserProfile](tag, "user_profiles") {

 def id: Rep[Int] = column[Int]("id", O.PrimaryKey, O.AutoInc)

 def firstName: Rep[String] = column[String]("first_name")

 def lastName: Rep[String] = column[String]("last_name")

 def * : ProvenShape[UserProfile] = (id, firstName, lastName) <>(UserProfile.tupled, UserProfile.unapply) // scalastyle:ignore

}
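The mapping above refers to a UserProfile case class that is not shown in this post. A plausible definition, inferred from the table columns and therefore an assumption, could look like the following sketch (the wrapping object and the fromTuple value are hypothetical and only there for illustration):

```scala
object UserProfileModel {
  // Assumed shape: one field per column of the user_profiles table
  final case class UserProfile(id: Int, firstName: String, lastName: String)

  // The <> operator in the mapping relies on converting between
  // (Int, String, String) tuples and the case class, as done here
  val fromTuple: ((Int, String, String)) => UserProfile = (UserProfile.apply _).tupled
}
```

Since the id column is declared with O.AutoInc, Slick leaves it out of insert statements, so a placeholder id such as 0 can be passed when creating a new UserProfile.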
Moving on to the CRUD operations, they are fairly straightforward as per the integrated query model provided by Slick, which can be seen from the following UserProfileRepository class:
class UserProfileRepository {

 val userProfileQuery: TableQuery[UserProfiles] = TableQuery[UserProfiles]

 def insert(user: UserProfile): Future[Int] =
  db.run(userProfileQuery += user)

 def get(id: Int): Future[Option[UserProfile]] =
  db.run(
   userProfileQuery
    .filter(_.id === id)
    .take(1)
    .result
    .headOption)

 def update(id: Int, firstName: String): Future[Int] =
  db.run(
   userProfileQuery
    .filter(_.id === id)
    .map(_.firstName)
    .update(firstName))

 def delete(id: Int): Future[Int] =
  db.run(userProfileQuery.filter(_.id === id).delete)
}
Lastly, in order to get the database instance using the configurations provided in application.conf file, the following code snippet can be used
val dbConfig: DatabaseConfig[JdbcProfile] = DatabaseConfig.forConfig("sqlserver")
val db: JdbcProfile#Backend#Database = dbConfig.db
Working codebase of this example is available at the following repository: scala-slick-mssql.
Also, if you’re interested in knowing how data can be directly streamed from PostgreSQL to a client using Akka Stream and Scala Slick then you might find the following article useful: Streaming data from PostgreSQL using Akka Streams and Slick in Play Framework
This blog post has been inspired by an endeavor to make Microsoft SQL Server work with Slick and by an answer on Stack Overflow, which is the reference for the configurations.
Hope this helps :)
This article was first published on the Knoldus blog.

Streaming data from PostgreSQL using Akka Streams and Slick in Play Framework

In this blog post I’ll try to explain how you can stream data directly from a PostgreSQL database using Scala Slick (Scala’s database access/query library) and Akka Streams (an implementation of the Reactive Streams specification on top of the Akka toolkit) in Play Framework. The implementation is pretty straightforward: data is read from one of the tables in your SQL database as a stream and is then sent/streamed to a REST endpoint configured to download this data.



For better understanding let’s take an example of an application or service which is used for administering a huge customer base of an organisation/company. The person involved in administering the customer base wants to get the entire data-set of customers for let’s say auditing purpose. Based on requirements it would sometimes make sense to stream this data directly into a downloadable file which is what we are going to do in this blog post.
(For this blog post you should have a basic knowledge of using Play Framework and Slick library)

The example uses the following dependencies:
  1. Play Framework 2.6.10 ("com.typesafe.play" % "sbt-plugin" % "2.6.10")
  2. Play-Slick 3.0.1 ("com.typesafe.play" %% "play-slick" % "3.0.1")
  3. Akka Streams 2.5.8 ("com.typesafe.akka" %% "akka-stream" % "2.5.8")
  4. PostgreSQL 42.1.4 ("org.postgresql" % "postgresql" % "42.1.4")
Let’s start by assuming we have a customer table in our PostgreSQL database which has the following structure

CREATE TABLE customers (
  id        BIGSERIAL PRIMARY KEY,
  firstname VARCHAR(255) NOT NULL,
  lastname  VARCHAR(255),
  email     VARCHAR(255)
);
Slick’s functional relational mapping corresponding to this table structure should look like this
case class Customer(id: Long,
                    firstName: String,
                    lastName: String,
                    email: String)

trait CustomerTable extends HasDatabaseConfigProvider[slick.jdbc.JdbcProfile] {
  
  import profile.api._

  val customerQuery: TableQuery[CustomerMapping] = TableQuery[CustomerMapping]

  private[models] class CustomerMapping(tag: Tag) extends Table[Customer](tag, "customers") {

    def id: Rep[Long] = column[Long]("id", O.PrimaryKey, O.AutoInc)

    def firstName: Rep[String] = column[String]("firstname")

    def lastName: Rep[String] = column[String]("lastname")

    def email: Rep[String] = column[String]("email")

    def * : ProvenShape[Customer] = (id, firstName, lastName, email) <>(Customer.tupled, Customer.unapply)

  }

}
Now let’s use the customerQuery to get data from the customers table in the form of a DatabasePublisher of type Customer, i.e. DatabasePublisher[Customer]. This is Slick’s implementation of the Reactive Streams Publisher, where a Publisher is a provider of a potentially unbounded sequence of elements that publishes them according to the demand received from its Subscriber. We will define this inside CustomerRepository.

def customers: DatabasePublisher[Customer] =
  db.stream(
    customerQuery
      .result
      .withStatementParameters(
         rsType = ResultSetType.ForwardOnly,
         rsConcurrency = ResultSetConcurrency.ReadOnly,
         fetchSize = 10000)
      .transactionally)
Certain things are to be noted when using PostgreSQL for streaming data/records, which are also noted in Slick’s official documentation:
  1. The use of transactionally, which forces the code to run on a single Connection with auto commit set to false [setAutoCommit(false)]; by default Slick runs in auto commit mode.
  2. The use of fetchSize, so that the JDBC driver does not fetch all rows into memory (i.e. on the client side) at once but instead fetches the specified number of rows at a time.
  3. ResultSetType.ForwardOnly sets the type to allow results to be read sequentially so that the cursor will only move forward.
  4. ResultSetConcurrency.ReadOnly makes sure that the ResultSet may not be updated.
Only if all of the above is done will streaming work properly for PostgreSQL; otherwise the actions inside the stream behavior will fetch the entire dataset.
So, the database repository code base is now sorted out. Let’s focus on the controller and how it’ll stream this data to a downloadable file.
We can create a new Play controller for the purpose of managing all APIs related to the customers and this controller has access to the CustomerRepository we created earlier in which the customers method is defined and implemented.
We’ll use Play’s simple Result to stream the data to our client, i.e to the person administering the customers on /customers API (added to Play routes) by providing the customer stream to HttpEntity.Streamed case class like this
Result(
      header = ResponseHeader(OK, Map(CONTENT_DISPOSITION → s"attachment; filename=customers.csv")),
      body = HttpEntity.Streamed(csvSource, None, None))
The entire controller method would look something like this
def customers: Action[AnyContent] = Action { implicit request =>
  val customerDatabasePublisher = customerRepository.customers
  val customerSource = Source.fromPublisher(customerDatabasePublisher)

  val headerCSVSource = Source.single(ByteString(""""First Name","Last Name","Email"""" + "\n"))
  val customerCSVSource =
    customerSource.map(data => ByteString(s""""${data.firstName}","${data.lastName}","${data.email}"""" + "\n"))
  
  val csvSource = Source.combine(headerCSVSource, customerCSVSource)(Concat[ByteString])

  Result(
        header = ResponseHeader(OK, Map(CONTENT_DISPOSITION → s"attachment; filename=customers.csv")),
        body = HttpEntity.Streamed(csvSource, None, None))
}
Note that the DatabasePublisher[Customer] is converted to Source of Customer using the Source.fromPublisher helper method which is used to create a Source from Publisher.
The rest of the manipulations are done on the Source to convert the data into a readable CSV file format.
Also, note the use of Source.combine method which is used to combine sources with fan-in strategy which in our case is Concat.
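The row-building code above places each field value directly between double quotes, so a name that itself contains a double quote would produce a malformed row. Below is a minimal sketch of an escaping helper, assuming RFC 4180-style quoting where an embedded quote is doubled. The names CsvFormat, escape and row are hypothetical and not part of the original code base.

```scala
object CsvFormat {
  // Wrap a field in double quotes and double any embedded double quotes.
  def escape(field: String): String =
    "\"" + field.replace("\"", "\"\"") + "\""

  // Join the escaped fields with commas and terminate the row with a newline.
  def row(fields: String*): String =
    fields.map(escape).mkString(",") + "\n"
}
```

With such a helper, the mapping step could be written as customerSource.map(c => ByteString(CsvFormat.row(c.firstName, c.lastName, c.email))), and the header as ByteString(CsvFormat.row("First Name", "Last Name", "Email")).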
Hope this helps :)
The entire code base is available in the playakkastreams repository.
This article was first published on the Knoldus blog.

How to setup and use Zookeeper in Scala using Apache Curator

In order to use Zookeeper to manage your project's configuration across a cluster, we will first set up a Zookeeper ensemble on our local machine (this setup is meant for testing on a single machine) by following these steps:


1) Download a stable zookeeper release
2) Unpack it in three places and rename the directories to:
/home/user/Desktop/zookeeper1,
/home/user/Desktop/zookeeper2, and
/home/user/Desktop/zookeeper3

3) In order to use Zookeeper we need to set up a configuration file for each server.
Create a new file zoo.cfg at
/home/user/Desktop/zookeeper1/conf/zoo.cfg
and add the following details:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/user/Desktop/zookeeperData1
clientPort=2181
server.1= localhost:2888:3888
server.2= localhost:2889:3889
server.3= localhost:2890:3890

Similarly,
/home/user/Desktop/zookeeper2/conf/zoo.cfg, as:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/user/Desktop/zookeeperData2
clientPort=2182
server.1= localhost:2888:3888
server.2= localhost:2889:3889
server.3= localhost:2890:3890

And,
/home/user/Desktop/zookeeper3/conf/zoo.cfg, as:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/user/Desktop/zookeeperData3
clientPort=2183
server.1= localhost:2888:3888
server.2= localhost:2889:3889
server.3= localhost:2890:3890
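
The three files differ only in dataDir and clientPort, so their content can be generated instead of edited by hand. Below is a small sketch, assuming the paths and ports used above. The names ZooCfg and render are hypothetical, and the code only builds the text of each file without writing anything to disk.

```scala
object ZooCfg {
  // Peer and leader-election ports, identical in all three config files above.
  private val servers = Seq(
    "server.1=localhost:2888:3888",
    "server.2=localhost:2889:3889",
    "server.3=localhost:2890:3890"
  )

  // Render zoo.cfg for server `id` (1, 2 or 3); only dataDir and clientPort vary.
  def render(id: Int): String = {
    val perServer = Seq(
      "tickTime=2000",
      "initLimit=10",
      "syncLimit=5",
      s"dataDir=/home/user/Desktop/zookeeperData$id",
      s"clientPort=${2180 + id}"
    )
    (perServer ++ servers).mkString("\n") + "\n"
  }
}
```

Calling ZooCfg.render(2), for example, yields the content shown above for /home/user/Desktop/zookeeper2/conf/zoo.cfg, apart from the space after the equals sign in the server lines.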

4) Now we have to define each server's id by creating a myid file in each data directory:
/home/user/Desktop/zookeeperData1/myid
which should have: 1
/home/user/Desktop/zookeeperData2/myid
which should have: 2
/home/user/Desktop/zookeeperData3/myid
which should have: 3
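The myid files can also be created from Scala. Below is a minimal sketch using java.nio, assuming the data directory is passed in by the caller. The names MyId and write are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object MyId {
  // Create the data directory if needed and write the server id into its myid file.
  def write(dataDir: Path, id: Int): Path = {
    Files.createDirectories(dataDir)
    Files.write(dataDir.resolve("myid"), id.toString.getBytes(StandardCharsets.UTF_8))
  }
}
```

For the first server this would be called as MyId.write(java.nio.file.Paths.get("/home/user/Desktop/zookeeperData1"), 1).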
5) Next, we will start each server of the Zookeeper ensemble in its own terminal (3 terminals in total):
cd /home/user/Desktop/zookeeper1
bin/zkServer.sh start
cd /home/user/Desktop/zookeeper2
bin/zkServer.sh start
cd /home/user/Desktop/zookeeper3
bin/zkServer.sh start
6) Now we will add some data to one of the ZNodes of the Zookeeper ensemble with the following steps:
a) bin/zkCli.sh
b) create /test_node "Some data"
7) Then we will set up a watcher on the Zookeeper node to read the data stored on the Zookeeper server, using Apache Curator as the library to interact with it.
Add the following dependencies to your build.sbt file:
libraryDependencies ++= Seq(
"org.apache.curator" % "curator-framework" % "2.6.0",
"org.apache.curator" % "curator-recipes" % "2.6.0"
)

and use this to interact with the zookeeper server:
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.cache.{NodeCache, NodeCacheListener}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.slf4j.LoggerFactory

// An object (not a class) so that main is a valid JVM entry point
object ZookeeperClient {

  private val logger = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
    // Base sleep time of 1000 ms between retries, at most 3 retries
    val retryPolicy = new ExponentialBackoffRetry(1000, 3)
    val curatorZookeeperClient =
      CuratorFrameworkFactory.newClient("localhost:2181,localhost:2182,localhost:2183", retryPolicy)
    curatorZookeeperClient.start()
    curatorZookeeperClient.getZookeeperClient.blockUntilConnectedOrTimedOut()

    val znodePath = "/test_node"
    val originalData = new String(curatorZookeeperClient.getData.forPath(znodePath)) // This should be "Some data"
    logger.info("Data read from ZNode: " + originalData)

    /* Zookeeper NodeCache service to get properties from ZNode */
    val nodeCache = new NodeCache(curatorZookeeperClient, znodePath)
    nodeCache.getListenable.addListener(new NodeCacheListener {
      override def nodeChanged(): Unit = {
        try {
          val dataFromZNode = nodeCache.getCurrentData
          val newData = new String(dataFromZNode.getData) // This should be some new data after it is changed in the Zookeeper ensemble
          logger.info("New data read from ZNode: " + newData)
        } catch {
          case ex: Exception => logger.error("Exception while fetching properties from zookeeper ZNode", ex)
        }
      }
    })

    // Start the cache only after the listener has been registered
    nodeCache.start()
  }
}


This article was first published on the Knoldus blog.