Showing posts with label Parallel Programming. Show all posts

Friday, May 4, 2018

Stream a file to AWS S3 using Akka Streams (via Alpakka) in Play Framework

In this blog post we’ll see how a file can be streamed from a client (e.g. a browser) to Amazon S3 (AWS S3) using Alpakka’s AWS S3 connector. Alpakka provides various Akka Streams connectors, integration patterns and data transformations for integration use cases.
The example in this blog post uses Play Framework to provide a user interface for submitting a file from a web page directly to AWS S3, without creating any temporary files on disk during the process. The file is streamed to AWS S3 using S3’s multipart upload API.

(To follow this blog post, basic knowledge of Play Framework and Akka Streams is required. Also check out What can Reactive Streams offer EE4J by James Roper, in particular its Servlet IO section, to understand how far the example in this blog post can be helpful.)
Let’s begin by looking at the artifacts used for the task at hand:
  1. Scala 2.11.11
  2. Play Framework 2.6.10
  3. Alpakka S3 0.18
Now moving on to the fun part, let’s see what the code base will look like. We’ll first create a class for interacting with AWS S3 using the Alpakka S3 connector. Let’s name the class AwsS3Client.
@Singleton
class AwsS3Client @Inject()(system: ActorSystem, materializer: Materializer) {

  private val awsCredentials = new BasicAWSCredentials("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
  private val awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials)
  private val regionProvider =
    new AwsRegionProvider {
      def getRegion: String = "us-west-2"
    }
  private val settings = new S3Settings(MemoryBufferType, None, awsCredentialsProvider, regionProvider, false, None, ListBucketVersion2)
  private val s3Client = new S3Client(settings)(system, materializer)

  def s3Sink(bucketName: String, bucketKey: String): Sink[ByteString, Future[MultipartUploadResult]] =
    s3Client.multipartUpload(bucketName, bucketKey)
}
The class is marked as a Singleton because we do not want multiple instances of it to be created. An ActorSystem and a Materializer are injected, since both are required for configuring Alpakka’s AWS S3 client. The next few lines configure an instance of Alpakka’s AWS S3 client, which is used for interfacing with your AWS S3 bucket. Finally, the s3Sink method returns an Akka Streams Sink of type Sink[ByteString, Future[MultipartUploadResult]]. This Sink does the job of sending the file stream to the AWS S3 bucket using the AWS multipart upload API.
To make this class work, replace AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY with your AWS access key and secret key respectively, and replace us-west-2 with your AWS region.
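Hard-coding the keys is fine for a quick experiment, but you may prefer to read them from the environment. The following is only a minimal sketch of that idea using plain Scala: the AwsSettings case class and its fromEnv method are hypothetical helpers that are not part of the example repository, and the AWS_REGION variable name is an assumption. The values it returns could then be passed to BasicAWSCredentials and the region provider.

```scala
// Hypothetical helper, not part of the original code base: it collects the
// three values that AwsS3Client currently hard-codes.
final case class AwsSettings(accessKeyId: String, secretAccessKey: String, region: String)

object AwsSettings {
  // Variable names are assumptions; adjust them to your deployment.
  private val names = Seq("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION")

  // `env` defaults to the real process environment; a plain Map can be passed in tests.
  def fromEnv(env: Map[String, String] = sys.env): Either[String, AwsSettings] = {
    // Treats blank values the same as missing ones.
    def read(name: String): Option[String] = env.get(name).map(_.trim).filter(_.nonEmpty)

    (read(names(0)), read(names(1)), read(names(2))) match {
      case (Some(key), Some(secret), Some(region)) =>
        Right(AwsSettings(key, secret, region))
      case _ =>
        val missing = names.filter(name => read(name).isEmpty)
        Left(s"Missing environment variables: ${missing.mkString(", ")}")
    }
  }
}
```

Returning an Either keeps the failure explicit, so the application can refuse to start with a readable message instead of sending empty credentials to AWS.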
Next, let’s look at how the s3Sink method of this class can be used to connect our Play Framework controller with the AWS S3 multipart upload API. But before doing that, a slight digression [bear with me, it’s going to build up the example further :)]. If you followed my previous blog post, Streaming data from PostgreSQL using Akka Streams and Slick in Play Framework [containing the Customer Management example], you might have seen how a CustomerController exposed a Play Framework route that streams customer data directly from PostgreSQL into a downloadable CSV file (without needing to buffer the data as a file on disk). This blog post builds on top of that Customer Management example. So, we’re going to use the same CustomerController and add a new Play Framework Action to it for accepting a file from the web page.
For simplicity, let’s name the controller Action upload. This Action accepts a file from a web page via one of the routes. Let’s first look at the controller code and then discuss the route.
@Singleton
class CustomerController @Inject()(cc: ControllerComponents, awsS3Client: AwsS3Client)
                                  (implicit ec: ExecutionContext) extends AbstractController(cc) {

  def upload: Action[MultipartFormData[MultipartUploadResult]] =
    Action(parse.multipartFormData(handleFilePartAwsUploadResult)) { request =>
      val maybeUploadResult =
        request.body.file("customers").map {
          case FilePart(key, filename, contentType, multipartUploadResult) =>
            multipartUploadResult
        }
 
      maybeUploadResult.fold(
        InternalServerError("Something went wrong!")
      )(uploadResult =>
        Ok(s"File ${uploadResult.key} uploaded to bucket ${uploadResult.bucket}")
      )
    }

   private def handleFilePartAwsUploadResult: Multipart.FilePartHandler[MultipartUploadResult] = {
     case FileInfo(partName, filename, contentType) =>
       val accumulator = Accumulator(awsS3Client.s3Sink("test-ocr", filename))

       accumulator map { multipartUploadResult =>
         FilePart(partName, filename, contentType, multipartUploadResult)
       }
   }
}

Dissecting the controller code, it can be seen that the controller is a singleton and that the AwsS3Client class created earlier is injected into it along with Play’s ControllerComponents and an ExecutionContext.
Let’s look at the private method of the CustomerController first, i.e. handleFilePartAwsUploadResult. Its return type is
Multipart.FilePartHandler[MultipartUploadResult]
which is nothing but a Scala type alias defined inside Play’s Multipart object:
type FilePartHandler[A] = FileInfo => Accumulator[ByteString, FilePart[A]]
It should be noted here that the example uses multipart/form-data encoding for the file upload, so Play’s multipartFormData parser is used with a custom FilePartHandler of type FilePartHandler[MultipartUploadResult]. The type parameter is MultipartUploadResult because the file is finally sent to the Alpakka AWS S3 Sink, which is of type Sink[ByteString, Future[MultipartUploadResult]].
As for what this private method does: it accepts a FileInfo case class, creates an Accumulator from s3Sink and then maps the result of the Accumulator to a FilePart.
NOTE: Accumulator is essentially a lightweight wrapper around an Akka Streams Sink that gets materialized to a Future. It provides convenient methods for working directly with the Future as well as for transforming the inputs.
Moving on to the upload Action, it looks like any other Play Framework Action. The only difference is that the request body is parsed as MultipartFormData and each file part is handled by our custom FilePartHandler, i.e. handleFilePartAwsUploadResult, which was discussed earlier.
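Inside the upload Action, the optional upload result is turned into a response with Option.fold: the first argument covers the case where the form has no "customers" part, the second covers a successful upload. Here is a small standalone sketch of that pattern. UploadResult and Response are hypothetical stand-ins for Alpakka’s MultipartUploadResult and Play’s Result, so that the snippet runs without any dependencies.

```scala
// Hypothetical stand-ins for Alpakka's MultipartUploadResult and Play's Result.
final case class UploadResult(bucket: String, key: String)
final case class Response(status: Int, body: String)

object UploadResponse {
  def respond(maybeUpload: Option[UploadResult]): Response =
    maybeUpload.fold(
      // Used when the Option is empty, i.e. no file part was found in the form.
      Response(500, "Something went wrong!")
    )(upload =>
      // Used when the Option holds the result of the upload.
      Response(200, s"File ${upload.key} uploaded to bucket ${upload.bucket}")
    )
}
```

Calling UploadResponse.respond(None) gives the error response, while UploadResponse.respond(Some(UploadResult("test-ocr", "customers.csv"))) gives the success response.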
To connect everything together, we need an endpoint that accepts this file upload and a view from which a file can be submitted. Let’s add a new route to Play’s routes file:
POST /upload controllers.CustomerController.upload
and a view to enable file upload from the user interface:
@import helper._

@()(implicit request: RequestHeader)

@main("Customer Management Portal") {
  <h1><b>Upload Customers to AWS S3</b></h1>
  @helper.form(CSRF(routes.CustomerController.upload()), 'enctype -> "multipart/form-data") {
    <input type="file" name="customers">
    <br>
    <input type="submit">
  }
}
Note the CSRF helper: it is required for the form because CSRF protection is enabled by default in Play Framework.
The entire code base is available at the following repository playakkastreams.
Hope this helps, shout out your queries in the comment section :)
This article was first published on the Knoldus blog.

Tuesday, May 1, 2018

Using Microsoft SQL Server with Scala Slick

This blog post shows simple CRUD operations on Microsoft SQL Server using Scala Slick version 3.2.3. You might be thinking, what’s really great about it? Duh! But until Scala Slick 3.2.x was released, using commercial databases required an additional closed source package known as Slick Extensions, which provided Slick drivers for the following databases:
  1. Oracle
  2. IBM DB2
  3. Microsoft SQL Server


Library dependency used for Slick Extensions:
libraryDependencies += "com.typesafe.slick" %% "slick-extensions" % "3.0.0"
But with the newer versions of Slick, i.e. 3.2.x, these drivers are available within the Slick core package as an open source release, which can also be seen in the change log.
If you find yourself struggling with a setup to make Microsoft SQL Server work with Scala Slick in your project, maybe because of the lack of resources available on the web, then read up further :)

TL;DR

SQL Server database configurations
sqlserver = {
 driver = "slick.jdbc.SQLServerProfile$"
 db {
 host = ${?SQLSERVER_HOST}
 port = ${?SQLSERVER_PORT}
 databaseName = ${?SQLSERVER_DB_NAME}

 url = "jdbc:sqlserver://"${sqlserver.db.host}":"${sqlserver.db.port}";databaseName="${sqlserver.db.databaseName}
 user = ${?SQLSERVER_USERNAME}
 password = ${?SQLSERVER_PASSWORD}
 }
}
Database instance
val dbConfig: DatabaseConfig[JdbcProfile] = DatabaseConfig.forConfig("sqlserver")
val db: JdbcProfile#Backend#Database = dbConfig.db

SBT project setup

The example in this blog post uses the following dependencies and versions of the respective artifacts:
  1. Scala 2.11.11
  2. SBT 0.13.17
  3. Slick 3.2.3
  4. Slick HikariCP 3.2.3
  5. Mssql JDBC 6.2.1.jre8
which, inside our build.sbt file, look like this:
name := "mssql-example"

version := "1.0"

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
 "com.typesafe.slick" %% "slick" % "3.2.3",
 "com.typesafe.slick" %% "slick-hikaricp" % "3.2.3",
 "com.microsoft.sqlserver" % "mssql-jdbc" % "6.2.1.jre8"
)
and the contents of the build.properties file will be:
sbt.version = 0.13.17
The settings required to configure Microsoft SQL Server go inside the application.conf file, where we specify the details of our database:

sqlserver = {
 driver = "slick.jdbc.SQLServerProfile$"
 db {
  host = ${?SQLSERVER_HOST}
  port = ${?SQLSERVER_PORT}
  databaseName = ${?SQLSERVER_DB_NAME}

  url = "jdbc:sqlserver://"${sqlserver.db.host}":"${sqlserver.db.port}";databaseName="${sqlserver.db.databaseName}
  user = ${?SQLSERVER_USERNAME}
  password = ${?SQLSERVER_PASSWORD}
 }
}
where it can be seen that SQLSERVER_HOST, SQLSERVER_PORT, SQLSERVER_DB_NAME, SQLSERVER_USERNAME and SQLSERVER_PASSWORD are to be provided as environment variables.
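To make the url setting a bit more concrete, here is a minimal sketch in plain Scala of the string that the concatenation above resolves to. The SqlServerUrl object and its methods are hypothetical and only illustrate the format; Slick itself reads the value from the configuration. Also note that ${?VAR} is an optional substitution in HOCON, so if one of the variables is not set the corresponding key stays undefined and the url line, which refers to it without the question mark, should fail to resolve when the configuration is loaded.

```scala
import scala.util.Try

// Hypothetical helper, only meant to illustrate the shape of the JDBC URL.
object SqlServerUrl {
  // Builds the same string that the `url` setting concatenates in application.conf.
  def jdbcUrl(host: String, port: Int, databaseName: String): String =
    s"jdbc:sqlserver://$host:$port;databaseName=$databaseName"

  // Reads the variables named in the post; returns None if any of them is
  // missing or if the port is not a number.
  def fromEnv(env: Map[String, String] = sys.env): Option[String] =
    for {
      host <- env.get("SQLSERVER_HOST")
      port <- env.get("SQLSERVER_PORT").flatMap(p => Try(p.toInt).toOption)
      db   <- env.get("SQLSERVER_DB_NAME")
    } yield jdbcUrl(host, port, db)
}
```

For example, SqlServerUrl.jdbcUrl("localhost", 1433, "example") gives jdbc:sqlserver://localhost:1433;databaseName=example.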
Now moving on to our FRM (Functional Relational Mapping) and repository setup, the following import is used for the MS SQL Server Slick driver’s API:
import slick.jdbc.SQLServerProfile.api._
Thereafter the FRM looks the same as the other FRMs described in the official Slick documentation. For the example in this blog post let’s use the following table structure:
CREATE TABLE user_profiles (
 id         INT IDENTITY (1, 1) PRIMARY KEY,
 first_name VARCHAR(100) NOT NULL,
 last_name  VARCHAR(100) NOT NULL
)
whose functional relational mapping will look like this:
class UserProfiles(tag: Tag) extends Table[UserProfile](tag, "user_profiles") {

 def id: Rep[Int] = column[Int]("id", O.PrimaryKey, O.AutoInc)

 def firstName: Rep[String] = column[String]("first_name")

 def lastName: Rep[String] = column[String]("last_name")

 def * : ProvenShape[UserProfile] = (id, firstName, lastName) <>(UserProfile.tupled, UserProfile.unapply) // scalastyle:ignore

}
Moving on to the CRUD operations, they are fairly straightforward with the integrated query model provided by Slick, as can be seen in the following UserProfileRepository class:
class UserProfileRepository {

 val userProfileQuery: TableQuery[UserProfiles] = TableQuery[UserProfiles]

 def insert(user: UserProfile): Future[Int] =
  db.run(userProfileQuery += user)

 def get(id: Int): Future[Option[UserProfile]] =
  db.run(
   userProfileQuery
    .filter(_.id === id)
    .take(1)
    .result
    .headOption)

 def update(id: Int, firstName: String): Future[Int] =
  db.run(
   userProfileQuery
    .filter(_.id === id)
    .map(_.firstName)
    .update(firstName))

 def delete(id: Int): Future[Int] =
  db.run(userProfileQuery.filter(_.id === id).delete)

}
Lastly, to get the database instance using the configuration provided in the application.conf file, the following code snippet can be used:
val dbConfig: DatabaseConfig[JdbcProfile] = DatabaseConfig.forConfig("sqlserver")
val db: JdbcProfile#Backend#Database = dbConfig.db
A working code base for this example is available in the following repository: scala-slick-mssql.
Also, if you’re interested in knowing how data can be streamed directly from PostgreSQL to a client using Akka Streams and Scala Slick, you might find the following article useful: Streaming data from PostgreSQL using Akka Streams and Slick in Play Framework
This blog post was inspired by an endeavor to make Microsoft SQL Server work with Slick and by an answer on Stack Overflow, which is the reference for the configuration.
Hope this helps :)
This article was first published on the Knoldus blog.

Streaming data from PostgreSQL using Akka Streams and Slick in Play Framework

In this blog post I’ll try to explain how you can stream data directly from a PostgreSQL database using Scala Slick (Scala’s database access/query library) and Akka Streams (an implementation of the Reactive Streams specification on top of the Akka toolkit) in Play Framework. The implementation is pretty straightforward: data is read from one of the tables in your SQL database as a stream and is then streamed to a REST endpoint configured to download this data.



For better understanding, let’s take the example of an application or service used for administering the huge customer base of an organisation/company. The person administering the customer base wants to get the entire data set of customers, say for auditing purposes. Depending on the requirements, it sometimes makes sense to stream this data directly into a downloadable file, which is what we are going to do in this blog post.
(For this blog post you should have a basic knowledge of Play Framework and the Slick library.)

The example uses the following dependencies:
  1. Play Framework 2.6.10 ("com.typesafe.play" % "sbt-plugin" % "2.6.10")
  2. Play-Slick 3.0.1 ("com.typesafe.play" %% "play-slick" % "3.0.1")
  3. Akka Streams 2.5.8 ("com.typesafe.akka" %% "akka-stream" % "2.5.8")
  4. PostgreSQL 42.1.4 ("org.postgresql" % "postgresql" % "42.1.4")
Let’s start by assuming we have a customers table in our PostgreSQL database with the following structure:

CREATE TABLE customers (
  id        BIGSERIAL PRIMARY KEY,
  firstname VARCHAR(255) NOT NULL,
  lastname  VARCHAR(255),
  email     VARCHAR(255)
);
Slick’s functional relational mapping corresponding to this table structure looks like this:
case class Customer(id: Long,
                    firstName: String,
                    lastName: String,
                    email: String)

trait CustomerTable extends HasDatabaseConfigProvider[slick.jdbc.JdbcProfile] {
  
  import profile.api._

  val customerQuery: TableQuery[CustomerMapping] = TableQuery[CustomerMapping]

  private[models] class CustomerMapping(tag: Tag) extends Table[Customer](tag, "customers") {

    def id: Rep[Long] = column[Long]("id", O.PrimaryKey, O.AutoInc)

    def firstName: Rep[String] = column[String]("firstname")

    def lastName: Rep[String] = column[String]("lastname")

    def email: Rep[String] = column[String]("email")

    def * : ProvenShape[Customer] = (id, firstName, lastName, email) <>(Customer.tupled, Customer.unapply)

  }

}
Now let’s use customerQuery to get the data from the customers table as a DatabasePublisher of Customer, i.e. DatabasePublisher[Customer]. DatabasePublisher is Slick’s implementation of the Reactive Streams Publisher, where a Publisher is a provider of a potentially unbounded sequence of elements that publishes them according to the demand received from its Subscriber. We will define this inside CustomerRepository.

def customers: DatabasePublisher[Customer] =
  db.stream(
    customerQuery
      .result
      .withStatementParameters(
         rsType = ResultSetType.ForwardOnly,
         rsConcurrency = ResultSetConcurrency.ReadOnly,
         fetchSize = 10000)
      .transactionally)
Certain things need to be noted when using PostgreSQL for streaming data/records, which are also noted in Slick’s official documentation:
  1. The use of transactionally, which forces the code to run on a single Connection with auto-commit set to false [setAutoCommit(false)]. By default Slick runs in auto-commit mode.
  2. The use of fetchSize, so that the JDBC driver does not fetch all rows into memory (i.e. on the client side) at once but instead fetches the specified number of rows at a time.
  3. ResultSetType.ForwardOnly sets the result set type so that results are read sequentially and the cursor only moves forward.
  4. ResultSetConcurrency.ReadOnly makes sure that the ResultSet cannot be updated.
Streaming works properly for PostgreSQL only if all of the above is done. Otherwise the actions passed to db.stream will fetch the entire data set at once.
So, the database repository code base is now sorted out. Let’s focus on the controller and how it’ll stream this data to a downloadable file.
We can create a new Play controller for managing all APIs related to customers. This controller has access to the CustomerRepository we created earlier, in which the customers method is defined.
We’ll use Play’s simple Result to stream the data to our client, i.e. to the person administering the customers, on the /customers API (added to Play’s routes) by providing the customer stream to the HttpEntity.Streamed case class like this:
Result(
      header = ResponseHeader(OK, Map(CONTENT_DISPOSITION → s"attachment; filename=customers.csv")),
      body = HttpEntity.Streamed(csvSource, None, None))
The entire controller method would look something like this
def customers: Action[AnyContent] = Action { implicit request =>
  val customerDatabasePublisher = customerRepository.customers
  val customerSource = Source.fromPublisher(customerDatabasePublisher)

  val headerCSVSource = Source.single(ByteString(""""First Name","Last Name","Email"""" + "\n"))
  val customerCSVSource =
    customerSource.map(data => ByteString(s""""${data.firstName}","${data.lastName}","${data.email}"""" + "\n"))
  
  val csvSource = Source.combine(headerCSVSource, customerCSVSource)(Concat[ByteString])

  Result(
        header = ResponseHeader(OK, Map(CONTENT_DISPOSITION → s"attachment; filename=customers.csv")),
        body = HttpEntity.Streamed(csvSource, None, None))
}
Note that the DatabasePublisher[Customer] is converted to a Source of Customer using the Source.fromPublisher helper method, which creates a Source from a Publisher.
The rest of the manipulations are done on the Source to convert the data into a readable CSV format.
Also note the use of the Source.combine method, which combines sources using a fan-in strategy, in our case Concat.
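One caveat about the CSV conversion: the controller interpolates the raw values between double quotes, so a value that itself contains a double quote would produce a malformed row, and the lastname and email columns are nullable in the table definition. Below is a minimal sketch of a quoting helper in plain Scala that follows the usual CSV convention of doubling embedded quotes (as in RFC 4180). The Csv object and its methods are hypothetical and not part of the example repository.

```scala
// Hypothetical helper, not part of the original controller.
object Csv {
  // Wraps a value in double quotes and doubles any embedded quotes.
  // A null value (possible for nullable columns) becomes an empty field.
  def field(value: String): String = {
    val safe = Option(value).getOrElse("")
    "\"" + safe.replace("\"", "\"\"") + "\""
  }

  // Joins the quoted fields with commas and terminates the row with a newline.
  def line(fields: Seq[String]): String =
    fields.map(field).mkString(",") + "\n"
}
```

With such a helper, the mapping step could become something like customerSource.map(data => ByteString(Csv.line(Seq(data.firstName, data.lastName, data.email)))), and the rest of the controller would stay the same.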
Hope this helps :)
The entire code base is available in the following repository playakkastreams.
This article was first published on the Knoldus blog.

Parallelism in Clojure

An axiom of microprocessor development (Moore’s law) states that the number of transistors on integrated circuits doubles approximately every two years, which is analogous to processing power doubling during the same period relative to cost or size. This has led to the need for parallelism in programming languages, and Clojure is one such language which provides a variety of functions for your multi-threaded code. In this article we’ll discuss futures, atoms and refs, which are all an important part of parallel programming.


Futures
Futures are supported out of the box in Clojure. They can be used to start a memory-intensive operation in a new thread, which typically runs in a thread pool. Dereferencing a future either yields the result immediately or blocks until the future is done.
Let’s start with an example of a memory-intensive operation. Here, we write ranges of numerical values to separate files, with the write operation for each file done in a separate future.
user=> (def a
  #_=>      (doall
  #_=>         (map
  #_=>            (fn [n] (future (spit (str (gensym "test") ".txt")
  #_=>                                        (apply str (doall (map inc (range n)))))))
  #_=>            [10000000 1000000 10000000])))
#'user/a

In the above code doall forces the entire lazy sequence to be realized and held in memory at once. Since map goes through a vector of length 3, 3 futures are started, and each future starts a new spit operation on a test[unique number].txt file to write the range of numbers given by the corresponding element of the vector “[10000000 1000000 10000000]”. The code returns a sequence of futures, which is stored in “a”.
Now let’s check when the futures complete, using the following code:


user=> (map future-done? a)
(false true false)
user=> (map future-done? a)
(true true false)

future-done? returns true if the given future is done. On my system the future operations with a range of “10000000” take more time than the one with a range of “1000000”.
So, the first “(map future-done? a)” returns (false true false), meaning that the second future is done but the first and third futures are still processing.
After a while, “(map future-done? a)” returns (true true false), meaning that the first and second futures are done.
The order of completion may vary on your system, but eventually all the futures will be done. This means that you can offload heavy operations to different threads and continue with other tasks.


Atoms
In the simplest terms, atoms are uncoordinated and synchronous. We use atoms when a single identity needs to be updated synchronously. Synchronous access means that an update is applied in the calling thread and has completed before execution continues. Atoms ensure that in a multi-threaded environment the value of an atom is either updated entirely or not at all.
We’ll start with the most basic example:
user=> (def a (atom {:firstname "" :lastname ""}))
#'user/a
user=> @a
{:firstname "", :lastname ""}

Here we’ve defined an atom holding a map. To read the value stored in an atom we use deref or its short form “@”. Dereferencing returns the value stored in “a”; in this case we get the map defined during initialization.
To change the value of an atom we can use either swap! or reset!:


user=> (swap! a #(assoc % :firstname %2 :lastname %3) "Sidharth" "Khattri")
{:firstname "Sidharth", :lastname "Khattri"}

swap! takes the atom as the first argument and the function to apply to the value stored in the atom as the second argument; any further arguments are passed on to that function. In the above case we’ve defined an anonymous function that associates new values with the previously defined map and returns the new map. Internally swap! uses compare-and-set! to guard against race conditions. If there is a race condition, i.e. if two threads try to swap the atom simultaneously and the value of the atom has changed since a thread began its swap, the losing thread retries the swap until the value it read is still the current value at the time of the write.
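For readers coming from the Scala posts on this blog, the retry loop described above can be sketched on the JVM with java.util.concurrent.atomic.AtomicReference. This is only a rough illustration of the compare-and-set idea in Scala, not Clojure’s actual implementation, and SwapSketch and its swap method are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical illustration of a swap!-style update built on compare-and-set.
object SwapSketch {
  // Reads the current value, computes the new one and writes it back only if
  // no other thread changed the reference in between; otherwise it retries.
  @annotation.tailrec
  def swap[A](ref: AtomicReference[A])(f: A => A): A = {
    val current = ref.get()
    val updated = f(current)
    if (ref.compareAndSet(current, updated)) updated
    else swap(ref)(f)
  }
}
```

Because f may run more than once when threads collide, it should be free of side effects, which is the same advice that applies to the function passed to swap!.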

user=> (reset! a 1)
1

reset! sets the value of atom to a new value without regard for the previously stored value in the atom.
Refs
Unlike atoms, refs are coordinated, and like atoms they are accessed synchronously. They are used when multiple identities have to be changed together, and they use the Software Transactional Memory (STM) system for in-memory transactions. Any changes to the values of refs have to be made inside a transactional block, i.e. sync or dosync.
Refs are defined in the same way as atoms:
user=> (def task-to-be-done (ref #{1,2,3,4,5}))
#'user/task-to-be-done
user=> (def task-done (ref #{}))
#'user/task-done

Here, we have defined two refs that have to be updated together, i.e. task-to-be-done and task-done, each containing a hash set.


user=> (defn update-values [value]
  #_=>   (dosync
  #_=>     (alter task-to-be-done disj value)
  #_=>     (alter task-done conj value)))
#'user/update-values

Then we define a function, update-values, to alter the values of both refs together inside a dosync transactional block. “alter” is used to change the in-transaction value of a ref. “disj” returns a new set that does not contain the key(s) passed as the argument. Similarly, “conj” returns a new set that contains the key(s) passed as the argument.

user=> (update-values 1)
#{1}
user=> @task-to-be-done
#{2 3 4 5}
user=> @task-done
#{1}

On passing a value to the “update-values” function, the values of both refs are updated together. To view the values stored in refs we use the same deref or “@” as with atoms. Dereferencing task-to-be-done and task-done returns the values stored in the refs.
Thus refs are used for managing multiple data structures in transactional blocks, atomically.
This article was first published on the Knoldus blog.