Showing posts with label Akka Streams. Show all posts

Friday, May 4, 2018

Stream a file to AWS S3 using Akka Streams (via Alpakka) in Play Framework

In this blog post we’ll see how a file can be streamed from a client (e.g. a browser) to Amazon S3 (AWS S3) using Alpakka’s AWS S3 connector. Alpakka provides various Akka Streams connectors, integration patterns and data transformations for integration use cases.
The example in this blog post uses Play Framework to provide a user interface for submitting a file from a web page directly to AWS S3, without creating any temporary files on disk in the process. The file is streamed to AWS S3 using S3’s multipart upload API.

(This blog post assumes a basic knowledge of Play Framework and Akka Streams. Also, check out “What can Reactive Streams offer EE4J” by James Roper, in particular its Servlet IO section, to fully understand how useful the approach in this blog post can be.)
Let’s begin by looking at the artifacts used for the task at hand:
  1. Scala 2.11.11
  2. Play Framework 2.6.10
  3. Alpakka S3 0.18
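For reference, here is a minimal sketch of how these artifacts might be declared in an sbt build. The Alpakka coordinates (com.lightbend.akka / akka-stream-alpakka-s3) are our assumption of the published artifact name for Alpakka 0.18, so double-check them against the Alpakka documentation for your version.

```scala
// project/plugins.sbt: the Play sbt plugin
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.6.10")

// build.sbt
lazy val root = (project in file(".")).enablePlugins(PlayScala)

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  guice, // Play 2.6's default dependency injection (used by @Inject below)
  "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.18" // assumed coordinates, see above
)
```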
Now, on to the fun part: let’s see what the code looks like. We’ll first create a class for interacting with AWS S3 using the Alpakka S3 connector; let’s call it AwsS3Client.
@Singleton
class AwsS3Client @Inject()(system: ActorSystem, materializer: Materializer) {

  private val awsCredentials = new BasicAWSCredentials("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
  private val awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials)
  private val regionProvider =
    new AwsRegionProvider {
      def getRegion: String = "us-west-2"
    }
  private val settings = new S3Settings(MemoryBufferType, None, awsCredentialsProvider, regionProvider, false, None, ListBucketVersion2)
  private val s3Client = new S3Client(settings)(system, materializer)

  def s3Sink(bucketName: String, bucketKey: String): Sink[ByteString, Future[MultipartUploadResult]] =
    s3Client.multipartUpload(bucketName, bucketKey)
}
The class is marked as @Singleton because we do not want multiple instances of it (and of the underlying S3 client) to be created. ActorSystem and Materializer are injected because Alpakka’s AWS S3 client needs them. The next few lines configure an instance of Alpakka’s AWS S3 client, which is used to talk to your AWS S3 bucket. Finally, the s3Sink method returns an Akka Streams Sink of type Sink[ByteString, Future[MultipartUploadResult]]; this Sink sends the file stream to the AWS S3 bucket using the AWS multipart upload API.
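To build some intuition for why no temporary file is needed: S3’s multipart upload API receives an object as a sequence of parts, where every part except the last must be at least 5 MiB, and a single upload may have at most 10,000 parts. As far as we understand, Alpakka’s multipartUpload regroups the incoming ByteString elements into chunks of a configurable size (5 MiB by default), uploads each chunk as one part, and, with the MemoryBufferType setting above, keeps the chunk being uploaded in memory. The stdlib-only model below (a hypothetical MultipartChunking object, not Alpakka code) shows the regrouping step and the arithmetic behind the part limit.

```scala
object MultipartChunking {
  // S3 limits: every part except the last must be >= 5 MiB; at most 10,000 parts per upload
  val MinPartSize: Int = 5 * 1024 * 1024
  val MaxParts: Int = 10000

  /** Regroups arbitrarily sized incoming chunks into parts of exactly `partSize` bytes
    * (only the last part may be smaller). The iterator is lazy, so only the part being
    * assembled is held in memory, never the whole file. */
  def toParts(chunks: Iterator[Array[Byte]], partSize: Int = MinPartSize): Iterator[Array[Byte]] = {
    require(partSize > 0, "partSize must be positive")
    chunks.flatMap(_.iterator).grouped(partSize).map(_.toArray)
  }

  /** Number of parts needed for `totalBytes`, rounding up. */
  def partCount(totalBytes: Long, partSize: Int = MinPartSize): Long =
    (totalBytes + partSize - 1) / partSize

  /** Largest object that fits into MaxParts parts of `partSize` bytes. */
  def maxObjectSize(partSize: Int = MinPartSize): Long = MaxParts.toLong * partSize
}
```

With 5 MiB parts, the 10,000-part limit caps a single upload at 10,000 × 5 MiB = 52,428,800,000 bytes (about 48.8 GiB); larger files would need a larger chunk size.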
To make this class work, replace AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY with your AWS access key and secret key respectively, and replace us-west-2 with your AWS region.
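Hard-coded keys are easy to leak through source control. A minimal sketch, assuming you prefer to read them from the conventional AWS environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, plus AWS_REGION for the region (also an assumption). AwsConfigFromEnv and AwsConfig are hypothetical names; the resulting values would be passed to BasicAWSCredentials and returned from getRegion in AwsS3Client. (The AWS SDK for Java also ships com.amazonaws.auth.DefaultAWSCredentialsProviderChain, which looks up credentials from the environment and other standard locations.)

```scala
object AwsConfigFromEnv {
  final case class AwsConfig(accessKeyId: String, secretAccessKey: String, region: String)

  private val Required = Seq("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")

  /** Builds the config from an environment map (pass sys.env in the application).
    * Falls back to `defaultRegion` when AWS_REGION is not set. */
  def load(env: Map[String, String], defaultRegion: String = "us-west-2"): Either[String, AwsConfig] = {
    val missing = Required.filterNot(env.contains)
    if (missing.nonEmpty) Left("Missing environment variables: " + missing.mkString(", "))
    else Right(AwsConfig(env("AWS_ACCESS_KEY_ID"), env("AWS_SECRET_ACCESS_KEY"),
                         env.getOrElse("AWS_REGION", defaultRegion)))
  }
}
```

Taking the environment as a parameter keeps the function easy to test; in the application, AwsConfigFromEnv.load(sys.env) can be called once at startup so that missing credentials fail fast instead of surfacing on the first upload.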
Next, let’s look at how the s3Sink method of this class can be used to connect our Play Framework controller with the AWS S3 multipart upload API. But first, a slight digression (bear with me, it’s going to build up the example further :)). If you followed my previous blog post, Streaming data from PostgreSQL using Akka Streams and Slick in Play Framework (containing the Customer Management example), you might have seen how a CustomerController was used to expose a Play Framework route that streams customer data directly from PostgreSQL into a downloadable CSV file, without buffering the data as a file on disk. This blog post builds on top of that Customer Management example. So, we’re going to use the same CustomerController, modified a bit by adding a new Play Framework Action that accepts the file from the web page.
For simplicity, let’s name the controller Action upload; this Action accepts a file submitted from a web page through a route. Let’s first look at the controller code and then we’ll discuss the route.
import javax.inject.{Inject, Singleton}

import scala.concurrent.ExecutionContext

import akka.stream.alpakka.s3.scaladsl.MultipartUploadResult
import play.api.libs.streams.Accumulator
import play.api.mvc._
import play.api.mvc.MultipartFormData.FilePart
import play.core.parsers.Multipart
import play.core.parsers.Multipart.FileInfo

@Singleton
class CustomerController @Inject()(cc: ControllerComponents, awsS3Client: AwsS3Client)
                                  (implicit ec: ExecutionContext) extends AbstractController(cc) {

  def upload: Action[MultipartFormData[MultipartUploadResult]] =
    Action(parse.multipartFormData(handleFilePartAwsUploadResult)) { request =>
      // "customers" is the name attribute of the file input in the upload form
      val maybeUploadResult = request.body.file("customers").map(_.ref)

      maybeUploadResult.fold(
        InternalServerError("Something went wrong!")
      )(uploadResult =>
        Ok(s"File ${uploadResult.key} uploaded to bucket ${uploadResult.bucket}")
      )
    }

  // Streams each file part straight into the S3 multipart upload Sink (bucket "test-ocr")
  private def handleFilePartAwsUploadResult: Multipart.FilePartHandler[MultipartUploadResult] = {
    case FileInfo(partName, filename, contentType) =>
      val accumulator = Accumulator(awsS3Client.s3Sink("test-ocr", filename))

      accumulator map { multipartUploadResult =>
        FilePart(partName, filename, contentType, multipartUploadResult)
      }
  }
}

Dissecting the controller code, we can see that the controller is a singleton and that the AwsS3Client class created earlier is injected into it, along with Play’s ControllerComponents and an ExecutionContext.
Let’s look at the private method of the CustomerController first, i.e. handleFilePartAwsUploadResult. Its return type is
Multipart.FilePartHandler[MultipartUploadResult]
which is simply a type alias defined in Play’s Multipart object (play.core.parsers.Multipart):
type FilePartHandler[A] = FileInfo => Accumulator[ByteString, FilePart[A]]
Note that the example uses multipart/form-data encoding for the file upload, so Play’s default multipartFormData body parser is used, configured with a custom FilePartHandler of type FilePartHandler[MultipartUploadResult]. The handler’s type parameter is MultipartUploadResult because the Alpakka AWS S3 Sink, to which the file is finally sent, has type Sink[ByteString, Future[MultipartUploadResult]].
As for what this private method does: it accepts a FileInfo (a case class describing the uploaded part), creates an Accumulator from s3Sink and finally maps the Accumulator’s result to a FilePart.
NOTE: Accumulator is essentially a lightweight wrapper around Akka Sink that gets materialized to a Future. It provides convenient methods for working directly with Future as well as transforming the inputs.
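To make the shape of a FilePartHandler concrete without pulling in Play or Akka, here is a stdlib-only toy model. FileInfo and FilePart mirror the fields used above, but ToyAccumulator, ToyFilePartHandler and countingSink are hypothetical stand-ins: Play’s real Accumulator wraps an Akka Streams Sink, not an Iterator, and countingSink merely plays the role of awsS3Client.s3Sink. The handler follows the same structure as handleFilePartAwsUploadResult: build an accumulator for the part, then map its eventual result into a FilePart.

```scala
import scala.concurrent.{ExecutionContext, Future}

object FilePartHandlerModel {
  // Simplified stand-ins with the same fields as Play 2.6's FileInfo and FilePart
  final case class FileInfo(partName: String, filename: String, contentType: Option[String])
  final case class FilePart[A](key: String, filename: String, contentType: Option[String], ref: A)

  // Toy accumulator: consumes the part's body chunks and eventually yields a result
  final case class ToyAccumulator[E, A](run: Iterator[E] => Future[A]) {
    // Transforms the eventual result, in the spirit of Accumulator.map
    def map[B](f: A => B)(implicit ec: ExecutionContext): ToyAccumulator[E, B] =
      ToyAccumulator(in => run(in).map(f))
  }

  // Same shape as Play's: type FilePartHandler[A] = FileInfo => Accumulator[ByteString, FilePart[A]]
  type ToyFilePartHandler[A] = FileInfo => ToyAccumulator[Array[Byte], FilePart[A]]

  // Plays the role of awsS3Client.s3Sink: "uploads" the bytes by counting them
  val countingSink: ToyAccumulator[Array[Byte], Long] =
    ToyAccumulator(chunks => Future.successful(chunks.map(_.length.toLong).sum))

  // Mirrors handleFilePartAwsUploadResult: accumulator first, then map the result into a FilePart
  def handler(implicit ec: ExecutionContext): ToyFilePartHandler[Long] = {
    case FileInfo(partName, filename, contentType) =>
      countingSink.map(byteCount => FilePart(partName, filename, contentType, byteCount))
  }
}
```

Running this handler for a part named "customers" over two chunks of 3 and 2 bytes yields FilePart("customers", ..., 5L), just as the real handler yields a FilePart whose ref is the MultipartUploadResult.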
Moving on to the upload Action: it looks like any other Play Framework Action, the only difference being that the request body is parsed as MultipartFormData and each file part is handled by our custom FilePartHandler, i.e. handleFilePartAwsUploadResult, discussed above.
To connect everything together, we need an endpoint for the file upload and a view to submit the file from. Let’s add a new route to Play’s routes file:
POST /upload controllers.CustomerController.upload
and a view to enable file upload from the user interface:
@import helper._

@()(implicit request: RequestHeader)

@main("Customer Management Portal") {
  <h1><b>Upload Customers to AWS S3</b></h1>
  @helper.form(CSRF(routes.CustomerController.upload()), 'enctype -> "multipart/form-data") {
    <input type="file" name="customers">
    <br>
    <input type="submit">
  }
}
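Note the CSRF helper wrapping the form’s target route: CSRF protection is enabled by default in Play Framework, and helper.CSRF adds the CSRF token to the form’s action URL so that the upload request passes the CSRF check.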
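For the curious, this is roughly what the browser sends when the form is submitted: a multipart/form-data body (RFC 7578) in which the file part’s name comes from the input’s name attribute, which is why the controller looks the part up with request.body.file("customers"). The sketch below builds such a body by hand; MultipartBodySketch, the boundary and the file contents are illustrative assumptions, and a real request would also need a Content-Type: multipart/form-data; boundary=... header and, because of the CSRF filter, a valid CSRF token.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object MultipartBodySketch {
  private val CRLF = "\r\n"

  /** Builds a multipart/form-data body containing a single file field. */
  def fileFieldBody(boundary: String,
                    fieldName: String,
                    filename: String,
                    contentType: String,
                    content: Array[Byte]): Array[Byte] = {
    val head =
      "--" + boundary + CRLF +
        "Content-Disposition: form-data; name=\"" + fieldName + "\"; filename=\"" + filename + "\"" + CRLF +
        "Content-Type: " + contentType + CRLF +
        CRLF // blank line separates the part headers from the file bytes
    val tail = CRLF + "--" + boundary + "--" + CRLF // closing delimiter of the body
    head.getBytes(UTF_8) ++ content ++ tail.getBytes(UTF_8)
  }
}
```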
The entire code base is available at the following repository playakkastreams.
Hope this helps! Shout out your queries in the comment section :)
This article was first published on the Knoldus blog.

Tuesday, May 1, 2018

Streaming data from PostgreSQL using Akka Streams and Slick in Play Framework

In this blog post I’ll try to explain how you can stream data directly from a PostgreSQL database using Slick (Scala’s database access/query library) and Akka Streams (an implementation of the Reactive Streams specification on top of the Akka toolkit) in Play Framework. The implementation is pretty straightforward: data is read as a stream from one of the tables in your SQL database and then streamed to a REST endpoint configured for downloading this data.



To make this concrete, let’s take the example of an application or service used to administer the huge customer base of an organisation/company. The person administering the customer base wants to get the entire data set of customers, say for auditing purposes. Depending on the requirements, it sometimes makes sense to stream this data directly into a downloadable file, which is what we are going to do in this blog post.
(This blog post assumes a basic knowledge of Play Framework and the Slick library.)

The example uses the following dependencies:
  1. Play Framework 2.6.10 ("com.typesafe.play" % "sbt-plugin" % "2.6.10")
  2. Play-Slick 3.0.1 ("com.typesafe.play" %% "play-slick" % "3.0.1")
  3. Akka Streams 2.5.8 ("com.typesafe.akka" %% "akka-stream" % "2.5.8")
  4. PostgreSQL JDBC driver 42.1.4 ("org.postgresql" % "postgresql" % "42.1.4")
Let’s start by assuming we have a customers table in our PostgreSQL database with the following structure:

CREATE TABLE customers (
  id        BIGSERIAL PRIMARY KEY,
  firstname VARCHAR(255) NOT NULL,
  lastname  VARCHAR(255),
  email     VARCHAR(255)
);
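Slick’s functional relational mapping corresponding to this table structure should look like this (note that lastname and email are nullable in the table; if they can actually hold NULL, map them as Option[String], since Slick cannot read a NULL into a plain String column):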
package models

import play.api.db.slick.HasDatabaseConfigProvider
import slick.lifted.ProvenShape

case class Customer(id: Long,
                    firstName: String,
                    lastName: String,
                    email: String)

trait CustomerTable extends HasDatabaseConfigProvider[slick.jdbc.JdbcProfile] {

  import profile.api._

  val customerQuery: TableQuery[CustomerMapping] = TableQuery[CustomerMapping]

  private[models] class CustomerMapping(tag: Tag) extends Table[Customer](tag, "customers") {

    def id: Rep[Long] = column[Long]("id", O.PrimaryKey, O.AutoInc)

    def firstName: Rep[String] = column[String]("firstname")

    def lastName: Rep[String] = column[String]("lastname")

    def email: Rep[String] = column[String]("email")

    // Maps a row tuple to the Customer case class and back
    def * : ProvenShape[Customer] = (id, firstName, lastName, email) <> (Customer.tupled, Customer.unapply)

  }

}
Now let’s use customerQuery to get the data from the customers table as a DatabasePublisher[Customer]. DatabasePublisher is Slick’s implementation of the Reactive Streams Publisher, i.e. a provider of a (potentially) unbounded sequence of elements that publishes them according to the demand received from its Subscriber. We will define this inside CustomerRepository.

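import slick.basic.DatabasePublisher
import slick.jdbc.{ResultSetConcurrency, ResultSetType}

// Inside CustomerRepository, where customerQuery, db and profile.api._ are in scope
def customers: DatabasePublisher[Customer] =
  db.stream(
    customerQuery
      .result
      .withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 10000)
      .transactionally)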
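Slick’s DatabasePublisher is a Reactive Streams Publisher: it emits rows only as its Subscriber (here, the Akka Streams Source created in the controller with Source.fromPublisher) signals demand via request(n). The JDK’s java.util.concurrent.Flow interfaces (Java 9+) mirror the Reactive Streams ones, so the demand contract can be illustrated without extra libraries. The hypothetical IteratorPublisher below is a single-threaded sketch of that contract, not Slick’s implementation, and it does not cover every rule of the specification.

```scala
import java.util.concurrent.Flow

object DemandDrivenPublisher {
  /** Emits the elements of a fresh iterator per subscriber, never more than requested.
    * Single-threaded sketch: not thread-safe and not a complete Reactive Streams implementation. */
  final class IteratorPublisher[A](newIterator: () => Iterator[A]) extends Flow.Publisher[A] {
    override def subscribe(subscriber: Flow.Subscriber[_ >: A]): Unit = {
      val it = newIterator()
      subscriber.onSubscribe(new Flow.Subscription {
        private var demand = 0L      // requested but not yet delivered
        private var emitting = false // prevents re-entrant emission when onNext calls request
        private var finished = false // completed, failed or cancelled

        override def request(n: Long): Unit =
          if (!finished) {
            if (n <= 0) {
              finished = true
              subscriber.onError(new IllegalArgumentException(s"request($n): n must be positive"))
            } else {
              // Saturate at Long.MaxValue, which the specification treats as "unbounded"
              demand = if (Long.MaxValue - demand < n) Long.MaxValue else demand + n
              if (!emitting) {
                emitting = true
                // Pull from the source only while there is outstanding demand
                while (demand > 0 && !finished && it.hasNext) {
                  demand -= 1
                  subscriber.onNext(it.next())
                }
                if (!finished && !it.hasNext) {
                  finished = true
                  subscriber.onComplete()
                }
                emitting = false
              }
            }
          }

        override def cancel(): Unit = finished = true
      })
    }
  }
}
```

A subscriber that calls request(2) in onSubscribe and again after every second element receives 1, 2, 3, 4, 5 from (1 to 5).iterator followed by onComplete, while a subscriber that only ever requests 3 elements from an infinite iterator causes exactly 3 elements to be pulled. This is the same back-pressure that keeps db.stream from reading further rows until Akka Streams asks for them.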
A few things to note when streaming records from PostgreSQL, which Slick’s official documentation also points out:
  1. transactionally forces the action to run on a single Connection with auto-commit disabled (setAutoCommit(false)); by default, Slick runs in auto-commit mode.
  2. fetchSize makes the JDBC driver fetch the specified number of rows at a time instead of loading all rows into memory (i.e. on the client side) at once.
  3. ResultSetType.ForwardOnly allows the results to be read only sequentially, so the cursor moves forward only.
  4. ResultSetConcurrency.ReadOnly makes sure that the ResultSet cannot be updated.
Only with these settings in place will streaming work properly with PostgreSQL: the PostgreSQL JDBC driver fetches rows through a cursor, fetchSize rows at a time, only when auto-commit is off, the ResultSet is forward-only and a fetch size is set. Otherwise the driver fetches the entire result set into memory at once, even though the action is run with db.stream.
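A toy model of what fetchSize buys you: with a cursor, the driver asks the server for at most fetchSize rows per round trip, and the next batch is requested only once the previous one has been consumed. FakeCursor and FetchSizeModel are hypothetical in-memory stand-ins for the PostgreSQL server-side cursor and the JDBC driver; they model the behaviour, not the driver’s code.

```scala
object FetchSizeModel {
  /** In-memory stand-in for a server-side cursor; every fetch counts as one round trip. */
  final class FakeCursor[A](rows: Vector[A]) {
    var roundTrips = 0
    var largestBatch = 0

    def fetch(offset: Int, fetchSize: Int): Vector[A] = {
      roundTrips += 1
      val batch = rows.slice(offset, offset + fetchSize)
      largestBatch = math.max(largestBatch, batch.size)
      batch
    }
  }

  /** Streams all rows lazily: the next batch is fetched only after the current one is used up,
    * so at most `fetchSize` rows are held in memory at a time. */
  def stream[A](cursor: FakeCursor[A], fetchSize: Int): Iterator[A] = {
    require(fetchSize > 0, "fetchSize must be positive")
    Iterator.iterate(0)(_ + fetchSize)
      .map(offset => cursor.fetch(offset, fetchSize))
      .takeWhile(_.nonEmpty)
      .flatMap(batch => batch.iterator)
  }
}
```

Consuming only the first three of 25 rows with fetchSize = 10 costs a single round trip, and consuming all 25 never holds more than 10 rows at once; without a cursor, the driver effectively performs one fetch of every row up front.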
So, the database repository code base is now sorted out. Let’s focus on the controller and how it’ll stream this data to a downloadable file.
We can create a new Play controller for managing all customer-related APIs; this controller has access to the CustomerRepository created earlier, in which the customers method is defined.
We’ll use Play’s plain Result to stream the data to our client (i.e. the person administering the customers) on the /customers API (added to Play’s routes), by passing the CSV source to the HttpEntity.Streamed case class like this:
Result(
  header = ResponseHeader(OK, Map(CONTENT_DISPOSITION -> "attachment; filename=customers.csv")),
  // HttpEntity.Streamed(data, contentLength (unknown up front), contentType)
  body = HttpEntity.Streamed(csvSource, None, Some("text/csv")))
The entire controller method would look something like this:
import akka.stream.scaladsl.{Concat, Source}
import akka.util.ByteString
import play.api.http.HttpEntity

def customers: Action[AnyContent] = Action { implicit request =>
  val customerDatabasePublisher = customerRepository.customers
  // Wrap Slick's Reactive Streams Publisher in an Akka Streams Source
  val customerSource = Source.fromPublisher(customerDatabasePublisher)

  // CSV header line, followed by one line per customer
  val headerCSVSource = Source.single(ByteString(""""First Name","Last Name","Email"""" + "\n"))
  val customerCSVSource =
    customerSource.map(data => ByteString(s""""${data.firstName}","${data.lastName}","${data.email}"""" + "\n"))

  // Emit the header first, then all customer lines
  val csvSource = Source.combine(headerCSVSource, customerCSVSource)(Concat[ByteString])

  Result(
    header = ResponseHeader(OK, Map(CONTENT_DISPOSITION -> "attachment; filename=customers.csv")),
    body = HttpEntity.Streamed(csvSource, None, Some("text/csv")))
}
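Note that the DatabasePublisher[Customer] is converted to a Source of Customer using the Source.fromPublisher helper method, which creates a Source from a Reactive Streams Publisher.
The rest of the transformations are applied to that Source to turn each customer into a line of CSV.
Also note the use of the Source.combine method, which combines several sources using a fan-in strategy; in our case the strategy is Concat, which emits all elements of the first source (the header line) before those of the second (the customer lines). For two sources, headerCSVSource.concat(customerCSVSource) is an equivalent, shorter form.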
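The row formatting in the controller wraps every field in double quotes but does not escape double quotes occurring inside a value, so a last name such as O"Neil would produce a malformed CSV line. A minimal sketch of the RFC 4180 rule (wrap the field in quotes and double any embedded quote) as a hypothetical CsvFormatting helper; note that RFC 4180 specifies CRLF line endings, while this helper keeps the "\n" used in the controller.

```scala
object CsvFormatting {
  /** Quotes a field per RFC 4180: wrap it in double quotes and double any embedded quote. */
  def csvField(value: String): String = "\"" + value.replace("\"", "\"\"") + "\""

  /** Joins the quoted fields with commas and ends the record with "\n", as the controller does. */
  def csvLine(fields: Seq[String]): String = fields.map(csvField).mkString(",") + "\n"
}
```

In the controller, the per-customer mapping could then become customerSource.map(c => ByteString(CsvFormatting.csvLine(Seq(c.firstName, c.lastName, c.email)))), and the header line CsvFormatting.csvLine(Seq("First Name", "Last Name", "Email")).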
Hope this helps :)
The entire code base is available in the following repository playakkastreams.
This article was first published on the Knoldus blog.