When creating software that makes heavy use of cloud infrastructure, it is wise to test the integration in the end-to-end fashion. Doing so in cloud environment is one option, but it may not be viable for all cases. It can be pricy and you cannot do it in an local environment.

A popular solution to this problem is to run Localstack and test your code against it. In this post we’ll explore how to manage your Localstack resources for E2E tests using Testcontainers, Cats-effect Resource and Weaver.

All code examples for this post are available in this repository: https://github.com/majk-p/weaver-localstack

Something to test 🛠️

Exploring our testing possibilities requires having something to test, so let’s start with a simple app. We’ll be testing a simple application, that reads events from a Kinesis stream and writes them to a S3 bucket.

Disclaimer: In this post we are not exploring the best way to utilize AWS. The example is only intended to use some AWS features and have something to test.

Our not very sophisticated application can be implemented as a stream of repeated calls to Kinesis and S3:

object Main extends IOApp.Simple {

  def run: IO[Unit] = {
    val kinesisClient: KinesisAsyncClientOp[IO] = ???
    val s3Client: S3AsyncClientOp[IO]           = ???
    val kinesisStream                           = "my-stream"
    val s3Bucket                                = "my-bucket"

    val messageProcessor = MessageProcessor.instance(
      kinesisClient,
      s3Client,
      kinesisStream,
      s3Bucket
    )

    fs2.Stream
      .repeatEval(messageProcessor.process)
      .compile
      .drain
  }
}

Where the MessageProcessor is the main component we want to test. Let’s have a brief look at the implementation:

trait MessageProcessor[F[_]] {
  def process: F[Unit]
}

object MessageProcessor {
  def instance(
      kinesisClient: KinesisAsyncClientOp[IO],
      s3Client: S3AsyncClientOp[IO],
      kinesisStreamName: String,
      s3BucketName: String
  ): MessageProcessor[IO] =
    new MessageProcessor[IO] {

      override def process: IO[Unit] =
        for {
          messages <- kinesisClient.readMessages(kinesisStreamName)
          _        <- messages.parTraverse(s3Client.putMessage(s3BucketName))
        } yield ()
    }
}

The implementation is straightforward: read all messages from the Kinesis stream and store each one on S3. While AWS experts may have more elegant ways of doing this, bear with me, let’s keep it as simple as possible.

The details of AWS interaction are not important at this point. If you are curious, check out the implementation on Github.

Test environment 🧰

Our application is ready. Now let’s prepare the testing environment. The sequence diagram below highlights the key components of our test environment.

weaver localstack test sequence diagram

The parts in orange represent the environment shared across test scenarios. Let’s familiarize with our toolbox to see how we can set up the test.

Weaver

We’ll build our test on top of Weaver. It’s a test framework built for integration testing. Since Localstack services need to be provision for the test and shut down afterwards, we’ll use Resource based test described in weaver docs. The example from the documentation does a great job explaining how it works:

object HttpSuite extends IOSuite {

  // Sharing a single http client across all tests
  override type Res = Client[IO]
  override def sharedResource: Resource[IO, Res] = BlazeClientBuilder[IO].resource

  // The test receives the shared client as an argument
  test("Good requests lead to good results") { httpClient =>
    for {
      statusCode <- httpClient.get("https://httpbin.org/get"){
        response => IO.pure(response.status.code)
      }
    } yield expect(statusCode == 200)
  }

}

When the test is executed, the Client gets created. Then each test defined using test gets to re-use it. Once all tests are done (regardless of their result), the Resource is closed. We’ll rely on this mechanism to manage the docker container with Localstack, but for that we need Testcontainers.

Testcontainers

Think of Testcontainers as a lightweight wrapper over a docker container, aware of the service being launched. You can use it for all kinds of services like databases, message queues, caches etc. It provides bindings for all kinds of languages. For our example we’ll use testcontainers-scala, namely the testcontainers-scala-localstack-v2 module - it provides the API to interact with Localstack V2.

Starting a Localstack instance that runs S3 and Kinesis is as simple as this snippet:

val localStackTag = "2.3.2"
val container =
  LocalStackV2Container(
    tag = localStackTag,
    services = Seq(Service.S3, Service.KINESIS)
  ).configure(
    _.setDockerImageName(s"localstack/localstack:$localStackTag")
  )
container.start()

This configures and starts the docker container. Since LocalStackV2Container implements AutoCloseable we can turn it into resource with Resource.fromAutoCloseable(container) 🎉

Notice: docker has to be installed the user executing the test needs to have permissions to run containers without sudo

Localstack

If you are not familiar with localstack, think of it as of an AWS cloud running on your development environment. Should you wish to set up localstack in docker-compose instead for other purposes, we did this in my previous post about pass4s basics, so I encourage you to read that one!

Writing the test ✏️

Because we’re using weaver for our tests, we need to fit it’s structure. Here’s the template we need to fill in:

object LocalstackSuite extends IOSuite {

  override type Res = ??? // What are type of resource do we share?

  // How do we build the resource?
  override def sharedResource: Resource[IO, Res] = ???

  // The body of our tests. We'll do a simple test of our application logic
  test("read message from kinesis and write it to s3 bucket"){
    ???
  }
    
}

Recalling the diagram above the Res and sharedResource represent the orange part, while the test body is the part in blue. Let’s start with implementing the environment first, then we’ll move to the test logic.

object LocalstackSuite extends IOSuite {

  type StreamName = String
  type BucketName = String

  type Res = (
    KinesisAsyncClientOp[IO],
    S3AsyncClientOp[IO],
    StreamName,
    BucketName
  )

  val streamNameBase = "my-stream"
  val bucketNameBase = "my-bucket"

  def sharedResource: Resource[IO, Res] =
    for {
      container     <- localstack.utils.runLocalstack(Seq(Service.KINESIS, Service.S3))
      kinesisClient <- localstack.utils.kinesisClient(container)
      s3Client      <- localstack.utils.s3Client(container)
      streamName    <- localstack.utils.kinesisStreamResource(kinesisClient)(streamNameBase)
      bucketName    <- localstack.utils.s3BucketResource(s3Client)(bucketNameBase)
    } yield (
      kinesisClient,
      s3Client,
      streamName,
      bucketName
    )

  test("read message from kinesis and write it to s3 bucket"){
    ???
  }

}

Our test resource will be a tuple of four things: Kinesis and S3 clients (as defined in fs2-aws) and names of Kinesis stream and S3 bucket. This is all we need for our test execution.

The setup of the resource looks nice from the user perspective, since the whole complexity has been hidden in utils.scala. It incorporates a set of calls to AWS API for creating clients and AWS resources, as you would normally do from your Scala codebase. To give you a preview, lets see some of it’s internals:

package dev.michal.pawlik.localstack

// Selected subset of utils.scala
object utils {

  def runLocalstack(
      services: Seq[LocalStackV2Container.Service]
  ): Resource[IO, LocalStackV2Container] =
    containerResource(createContainer(services))

  private def createContainer(
      services: Seq[LocalStackV2Container.Service]
  ): IO[LocalStackV2Container] =
    IO {
      val localStackTag = "2.3.2"
      LocalStackV2Container(tag = localStackTag, services = services)
        .configure(
          _.setDockerImageName(s"localstack/localstack:$localStackTag")
        )
    }

  private def containerResource[T <: SingleContainer[?]](
      container: IO[T]
  ): Resource[IO, T] =
    Resource.fromAutoCloseable(container.flatTap(c => IO(c.start())))

}

This is the entire magic behind managing Localstack instance as your test resource. If you read Testcontainers section carefully you’ll notice this is just a refactored version of the snippet above.

Test resources

To have a better understanding of managing resources on the AWS itself, let’s see how Kinesis client and stream are managed.

// Selected subset of utils.scala
object utils {

  def kinesisClient(
      container: LocalStackV2Container
  ): Resource[IO, KinesisAsyncClientOp[IO]] =
    KinesisInterpreter
      .apply[IO]
      .KinesisAsyncClientOpResource(
        KinesisAsyncClient
          .builder()
          .endpointOverride(container.endpointOverride(Service.KINESIS))
          .region(container.region)
          .credentialsProvider(container.staticCredentialsProvider)
      )

  def kinesisStreamResource(
      kinesisClient: KinesisAsyncClientOp[IO]
  )(
      streamNameBase: String,
      additionalParameters: Endo[CreateStreamRequest.Builder] = identity
  ): Resource[IO, String] =
    Resource.make(
      for {
        name <- IO(Random.alphanumeric.take(8).mkString).map(randomSuffix =>
          s"$streamNameBase-$randomSuffix"
        )
        _ <- kinesisClient.createStream(
          additionalParameters(
            CreateStreamRequest.builder().streamName(name).shardCount(1)
          ).build()
        )
        _ <- kinesisClient.waiter.flatMap { waiter =>
          val describeStreamRequest = DescribeStreamRequest.builder().streamName(name).build()
          IO.fromFuture(IO(waiter.waitUntilStreamExists(describeStreamRequest).asScala))
        }
      } yield name
    )(name =>
      kinesisClient
        .deleteStream(DeleteStreamRequest.builder().streamName(name).build())
        .void
    )
}

Managing other resources is just a matter of creating cats.effect.Resource. In this case we benefit from fs2-aws library design that provides us with constructors like KinesisAsyncClientOpResource that already return resources.

For the stream resource we had to do a little bit of work. In this one we take advantage of Resource.make method that takes an IO that produces a resource, and an effectful function that shuts down the resource once it’s released. The IO provided as the first argument is just a call to create a stream using kinesisClient and then waiting for it to become present, just to make sure we don’t end up in a race condition in our test logic.

Releasing the resource boils down to a call to Localstack to delete the stream.

The logic for S3 is very similar so I won’t bore you with the details, but encourage you to study utils.scala by yourself. Drop me a message or an issue in repo if you find it unclear!

Test body

The shared resource is now ready, the last thing we need to do is fill in the test body itself. As you recall from the description above, our application reads messages from Kinesis and places them on an S3 bucket. Let’s write a simple test then. We’ll send a test message to the stream, read contents of a bucket and expect to find an object containing our message.

The test resource we have prepared provides us with everything we need for the test:

type Res = (
  KinesisAsyncClientOp[IO],
  S3AsyncClientOp[IO],
  StreamName,
  BucketName
)

We have both clients, stream name and bucket name. For our convenience, weaver provides usingRes extension on test method to make it simple to pattern match against complex resources as ours. Let’s see how we can implement our test then:

test("read message from kinesis and write it to s3 bucket").usingRes {
  case (kinesisClient, s3Client, streamName, bucketName) =>
    val messageProcessor = MessageProcessor.instance(
      kinesisClient,
      s3Client,
      streamName,
      bucketName
    )

    // prepare test message
    val messageContent = "test message content"

    for {
      // place the message on kinesis stream
      _ <- kinesisClient.putRecord(
        PutRecordRequest
          .builder()
          .streamName(streamName)
          .data(SdkBytes.fromUtf8String(messageContent))
          .partitionKey("some-partition")
          .build()
      )

      // execute message processor, the business logic we are testing
      _ <- messageProcessor.process

      // inspect the contents of s3
      listObjectsResponse <- s3Client.listObjects(
        ListObjectsRequest.builder().bucket(bucketName).build()
      )
      objects <- listObjectsResponse.contents().asScala.toList.traverse {
        obj =>
          s3Client.getObject(
            GetObjectRequest.builder().bucket(bucketName).key(obj.key()).build(),
            AsyncResponseTransformer.toBytes[GetObjectResponse]
          )
      }
      s3Objects = objects.map(_.asUtf8String())

      // expect that we'll only find our test message on S3
    } yield expect.all(s3Objects == List(messageContent))
}

Although it looks like a bit of code, it’s mostly this way because of chunky parts of AWS builders. The attached comments divide the code into sections of sending message, running the logic, reading from S3 and making the expectation.

Run the test 🚀

For the completeness let’s run the test to see that it works:

sbt:Weaver Localstack> test
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
[info] dev.pawlik.michal.LocalstackSuite
[info] + read message from kinesis and write it to s3 bucket 1s
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 37 s, completed 29 gru 2023, 15:46:07

Summary 🗒️

Managing integration testing resources with Testcontainers and making it work with your testing framework doesn’t have to be as complex as it might sound. I think it’s an investment worth making once, as it makes it super easy to run tests locally. It also makes the life easier when onboarding new team members, as they don’t have to learn any extra steps outside of the tools they already are familiar with.