Functional messaging in Scala with pass4s
The pass4s provides a functional abstraction for messaging in Scala. It provides implementations for AWS SQS/SNS and ActiveMQ.
In this post I want to show you the basic usage of the library to implement the messaging on top of SNS/SQS. All the examples in this post can be found in https://github.com/majk-p/pass4s-playground.
Localstack
Before we see the Scala code, let’s setup localstack, set up a SNS (Simple Notification Service) topic, SQS (Simple Queue Service) queue and a subscription between them. This post assumes you have a basic understanding of SNS/SQS setup. If you need a refresher please refer to this part of AWS docs: https://docs.aws.amazon.com/sns/latest/dg/subscribe-sqs-queue-to-sns-topic.html
I’ll try to make this example a straightforward one so you should be able to get this intuitively. Let’s get started.
docker-compose
For localstack we’ll use docker-compose. Our setup consists of two containers, both based on localstack/localstack
image. The first on is the localstack service itself, configured for sns
and sqs
.
The second one is the client. It starts with a delay and attempts to create the topic, queue and a subscription.
As a refresher, the topic is where you send the outgoing messages. The queue is where you read the messages from, and the subscription is what glues the two together on the AWS side.
The full example looks as shown below:
version: "3.8"
services:
localstack:
container_name: localstack_main
image: localstack/localstack
hostname: localhost.localstack.cloud
ports:
- "127.0.0.1:4566:4566" # LocalStack Gateway
- "127.0.0.1:4510-4559:4510-4559" # external services port range
environment:
- DOCKER_HOST=unix:///var/run/docker.sock
- SERVICES=sqs,sns
- EAGER_SERVICE_LOADING=1
- SKIP_SSL_CERT_DOWNLOAD=1
- HOSTNAME_EXTERNAL=localhost.localstack.cloud
volumes:
- "/tmp/localstack:/var/lib/localstack"
- "/var/run/docker.sock:/var/run/docker.sock"
setup-resources:
image: localstack/localstack
environment:
- AWS_ACCESS_KEY_ID=test
- AWS_SECRET_ACCESS_KEY=AWSSECRET
- AWS_DEFAULT_REGION=eu-west-2
entrypoint: /bin/sh -c
command: >
"
sleep 15
alias aws='aws --endpoint-url http://localstack:4566'
# Executing SNS
aws sns create-topic --name local_sns
# Executing SQS
aws sqs create-queue --queue-name local_queue
# Subscribing to SNS to SQS
aws sns subscribe --attributes 'RawMessageDelivery=true' --topic-arn arn:aws:sns:eu-west-2:000000000000:local_sns --protocol sqs --notification-endpoint arn:aws:sqs:eu-west-2:000000000000:local_queue
aws sqs get-queue-url --queue-name local_queue
"
depends_on:
- localstack
Please notice the --attributes 'RawMessageDelivery=true'
in aws sns subscribe
line. The pass4s library assumes we are operating on raw messages.
Implementation
Now that we have the infrastructure in place, let’s implement the producer and consumer. Make sure to run the docker-compose up
at this point and notice the ARNs and URLs of generated resources:
localstack_main | 2023-02-25T15:36:31.073 INFO --- [ asgi_gw_0] localstack.request.aws : AWS sns.CreateTopic => 200
setup-resources_1 | {
setup-resources_1 | "TopicArn": "arn:aws:sns:eu-west-2:000000000000:local_sns"
setup-resources_1 | }
localstack_main | 2023-02-25T15:36:31.555 DEBUG --- [ asgi_gw_0] l.services.sqs.provider : creating queue key=local_queue attributes=None tags=None
localstack_main | 2023-02-25T15:36:31.556 INFO --- [ asgi_gw_0] localstack.request.aws : AWS sqs.CreateQueue => 200
setup-resources_1 | {
setup-resources_1 | "QueueUrl": "http://localstack:4566/000000000000/local_queue"
setup-resources_1 | }
localstack_main | 2023-02-25T15:36:32.066 INFO --- [ asgi_gw_0] localstack.request.aws : AWS sns.Subscribe => 200
setup-resources_1 | {
setup-resources_1 | "SubscriptionArn": "arn:aws:sns:eu-west-2:000000000000:local_sns:2e4211f2-43cb-4b24-91cd-0da0bda8bea6"
setup-resources_1 | }
localstack_main | 2023-02-25T15:36:32.578 INFO --- [ asgi_gw_1] localstack.request.aws : AWS sqs.GetQueueUrl => 200
setup-resources_1 | {
setup-resources_1 | "QueueUrl": "http://localstack:4566/000000000000/local_queue"
setup-resources_1 | }
Consumer
Here’s how you can implement a fully working consumer:
//> using scala "2.13"
//> using lib "com.ocadotechnology::pass4s-kernel:0.2.2"
//> using lib "com.ocadotechnology::pass4s-core:0.2.2"
//> using lib "com.ocadotechnology::pass4s-high:0.2.2"
//> using lib "com.ocadotechnology::pass4s-connector-sqs:0.2.2"
//> using lib "org.typelevel::log4cats-noop:2.5.0"
package net.michalp.pass4splayground
import cats.effect.ExitCode
import cats.effect.IO
import cats.effect.IOApp
import cats.implicits._
import com.ocadotechnology.pass4s.connectors.sqs.SqsConnector
import com.ocadotechnology.pass4s.connectors.sqs.SqsEndpoint
import com.ocadotechnology.pass4s.connectors.sqs.SqsSource
import com.ocadotechnology.pass4s.connectors.sqs.SqsUrl
import com.ocadotechnology.pass4s.core.Source
import com.ocadotechnology.pass4s.high.Broker
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.noop.NoOpLogger
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import java.net.URI
object BaseConsumer extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
implicit val ioLogger: Logger[IO] = NoOpLogger[IO]
val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET")
val localstackURI = new URI("http://localhost:4566")
val sqsSource = SqsEndpoint(SqsUrl("http://localhost:4566/000000000000/local_queue"))
val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)
val sqsConnector =
SqsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)
sqsConnector.use { connector =>
val broker = Broker.fromConnector(connector)
IO.println(s"Processor listening for messages on $sqsSource") *>
broker
.consumer(sqsSource)
.consume(message => IO.println(s"Received message: $message"))
.background
.void
.use(_ => IO.never)
}
}
}
You can find the code in https://github.com/majk-p/pass4s-playground/blob/master/basic/BaseConsumer.scala, if you want to test it locally, you can run it using scala-cli
scala-cli run https://raw.githubusercontent.com/majk-p/pass4s-playground/master/basic/BaseConsumer.scala
Now let’s discuss the most important parts of the code.
We start the implementation by initializing the configuration. Since we are using localstack, we need to provide the credentials and URL override. If you want to use it in production, the SqsConnector
provides multiple constructors, some of which don’t require the endpoint override.
val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET")
val localstackURI = new URI("http://localhost:4566")
val sqsSource = SqsEndpoint(SqsUrl("http://localhost:4566/000000000000/local_queue"))
val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)
val sqsConnector =
SqsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)
The sqsConnector
is a Resource[IO,SqsConnector.SqsConnector[IO]]
, so we’ll have to acquire it.
sqsConnector.use { connector =>
val broker = Broker.fromConnector(connector)
IO.println(s"Processor listening for messages on $sqsSource") *>
broker
.consumer(sqsSource)
.consume(message => IO.println(s"Received message: $message"))
.background
.void
.use(_ => IO.never)
}
After acquiring the resource, we create a broker instance. Think of the Broker
as a kind of router for all incoming and outgoing messages. This is how it’s defined in the library.
trait Broker[F[_], +P] {
def consumer[R >: P](source: Source[R]): Consumer[F, Payload]
def sender[R >: P]: Sender[F, Message[R]]
}
For our simple example we only have a single consumer but this abstraction can handle more complexity if required.
The most interesting part is the binding:
broker
.consumer(sqsSource)
.consume(message => IO.println(s"Received message: $message"))
.background
.void
.use(_ => IO.never)
We use the broker to obtain a consumer of sqsSource
. Then using consume
method we provide an effectful logic for handling the messages (here we just print the message). If required you can use consumeCommit
that can apply a database transaction in ConnectionIO
assuming you can provide a transactor ConnectionIO ~> IO
.
The last three calls are there just to obtain a resource with .background
for handling the messages, ignore the resource value with .void
and obtain the resource forever using .use(_ => IO.never)
.
Let’s start the implemented consumer using scala-cli run BaseConsumer.scala
and move on.
Producer
Now that we can consume messages, let’s have a second program that is going to produce the messages. Here’s the code
//> using scala "2.13"
//> using lib "com.ocadotechnology::pass4s-kernel:0.2.2"
//> using lib "com.ocadotechnology::pass4s-core:0.2.2"
//> using lib "com.ocadotechnology::pass4s-high:0.2.2"
//> using lib "com.ocadotechnology::pass4s-connector-sns:0.2.2"
//> using lib "org.typelevel::log4cats-noop:2.5.0"
package net.michalp.pass4splayground
import cats.effect.ExitCode
import cats.effect.IO
import cats.effect.IOApp
import cats.implicits._
import com.ocadotechnology.pass4s.connectors.sns.SnsArn
import com.ocadotechnology.pass4s.connectors.sns.SnsConnector
import com.ocadotechnology.pass4s.connectors.sns.SnsDestination
import com.ocadotechnology.pass4s.core.Message
import com.ocadotechnology.pass4s.core.Source
import com.ocadotechnology.pass4s.high.Broker
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.noop.NoOpLogger
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import java.net.URI
object Producer extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
implicit val ioLogger: Logger[IO] = NoOpLogger[IO]
val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET");
val snsDestination = SnsDestination(SnsArn("arn:aws:sns:eu-west-2:000000000000:local_sns"))
val localstackURI = new URI("http://localhost:4566")
val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)
val snsConnector =
SnsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)
snsConnector.use { connector =>
val broker = Broker.fromConnector(connector)
val message = Message(Message.Payload("hello world!", Map()), snsDestination)
IO.println(s"Sending message $message to $snsDestination") *>
broker.sender.sendOne(message) *>
IO.println("Sent, exiting!").as(ExitCode.Success)
}
}
}
As before we start with configuring
val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET");
val snsDestination = SnsDestination(SnsArn("arn:aws:sns:eu-west-2:000000000000:local_sns"))
val localstackURI = new URI("http://localhost:4566")
val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)
val snsConnector =
SnsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)
Please notice the difference - instead of SqsEndpoint
we now create SnsDestination
and start the SnsConnector
. The connector is a resource, same as before.
snsConnector.use { connector =>
val broker = Broker.fromConnector(connector)
val message = Message(Message.Payload("hello world!", Map()), snsDestination)
IO.println(s"Sending message $message to $snsDestination") *>
broker.sender.sendOne(message) *>
IO.println("Sent, exiting!").as(ExitCode.Success)
}
In a similar way we did before, we start off by creating a Broker
. Thanks to the broker
we have access to the broker.sender
instance. For this simple example we’ll just send a single message using def sendOne(msg: A): F[Unit]
but you can easily change that to use def send: fs2.Pipe[F, A, Unit]
in case you want to produce multiple messages using fs2 Stream
.
Outcome
Let’s run the code!
$ scala-cli run Producer.scala
Compiling project (Scala 2.13.10, JVM)
Compiled project (Scala 2.13.10, JVM)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sending message Message(Payload(hello world!,Map()),SnsDestination(SnsArn(arn:aws:sns:eu-west-2:000000000000:local_sns))) to SnsDestination(SnsArn(arn:aws:sns:eu-west-2:000000000000:local_sns))
Sent, exiting!
☝️ you can try launching that couple of times
When you launch the consumer, it should be able to produce similar output:
$ scala-cli run https://raw.githubusercontent.com/majk-p/pass4s-playground/master/basic/BaseConsumer.scala
Compiling project (Scala 2.13.10, JVM)
Compiled project (Scala 2.13.10, JVM)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Processor listening for messages on SqsEndpoint(SqsUrl(http://localhost:4566/000000000000/local_queue),Settings(30 seconds,true,3,10,1))
Received message: Payload(hello world!,Map())
Received message: Payload(hello world!,Map())
Received message: Payload(hello world!,Map())
Summing up
Today we have learned the basic usage of pass4s for AWS SNS/SQS. The library is much more powerful than that, in future posts I’ll show you how to serialize/deserialize messages with circe and proxy larger messages over S3.
Please refer to the library documentation https://ocadotechnology.github.io/pass4s/ and the demo example in the repository https://github.com/ocadotechnology/pass4s/blob/main/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala where you can see how to use the library with JMS.