JSON and XML messaging with pass4s
In the previous post we learned how to implement an SNS/SQS-based message flow using pass4s. Those examples only showed how to exchange basic string messages. In this one I’ll show you how to exchange more complex types and serialize them in transport. The examples shown in this post are also available at https://github.com/majk-p/pass4s-playground/ so feel free to clone the repository and play around.
Pass4s is flexible in terms of message encoding: it provides JSON and XML serialization mechanisms out of the box.
JSON
The JSON serialization is implemented with Circe and can be included in your project by adding a dependency on "com.ocadotechnology" %% "pass4s-circe" % version. Let’s create a simple domain model that we’d like to use for our message.
import io.circe.Codec
import io.circe.generic.semiauto._
final case class DomainMessage(description: String, value: Long)
object DomainMessage {
  implicit val codec: Codec[DomainMessage] = deriveCodec
}
Any data type is fine as long as you can provide an io.circe.Codec for it. To be precise, the consumer only needs a Decoder, while the producer only needs an Encoder.
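To make the Encoder/Decoder split concrete, here is a minimal sketch that round-trips DomainMessage through circe directly, without pass4s. It assumes circe-generic and circe-parser are on the classpath; the CodecSketch wrapper object and its field names are only there for illustration.

```scala
import io.circe.Codec
import io.circe.generic.semiauto._
import io.circe.parser.decode
import io.circe.syntax._

// CodecSketch is a hypothetical wrapper used only for this illustration.
object CodecSketch {
  final case class DomainMessage(description: String, value: Long)

  object DomainMessage {
    // A Codec bundles both an Encoder and a Decoder.
    implicit val codec: Codec[DomainMessage] = deriveCodec
  }

  val message: DomainMessage = DomainMessage("hello world!", 10)

  // Producer side: only the Encoder half is used here.
  // Yields {"description":"hello world!","value":10}
  val json: String = message.asJson.noSpaces

  // Consumer side: only the Decoder half is used here.
  val decoded: Either[io.circe.Error, DomainMessage] = decode[DomainMessage](json)
}
```

If a service only ever consumes the message, deriving just a Decoder with deriveDecoder should be enough, and likewise deriveEncoder for a service that only produces it.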
Consumer
Now that we have a data type, we can adapt our existing String-based consumer to accept our model. First, a short refresher on what our current consumer looks like:
sqsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)
  IO.println(s"Processor listening for messages on $sqsSource") *>
    broker
      .consumer(sqsSource)
      .consume(message => IO.println(s"Received message: $message"))
      .background
      .void
      .use(_ => IO.never)
}
Let’s use MessageProcessor and the circe syntax to enrich our message processing with JSON capabilities:
import com.ocadotechnology.pass4s.circe.syntax._
import com.ocadotechnology.pass4s.extra.MessageProcessor
/*...*/
sqsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)
  val processor =
    MessageProcessor
      .init[IO]
      .effectful
      .bindBroker(broker)
      .enrich(_.asJsonConsumer[DomainMessage])
  IO.println(s"Processor listening for messages on $sqsSource") *>
    processor
      .handle(sqsSource) { message =>
        IO.println(s"Received message: $message")
      }
      .use(_ => IO.never)
}
The biggest difference here is the introduction of MessageProcessor. Think of it as a way to apply enrichments to the consumer. Its usage resembles the builder pattern. We first initialize the builder over a certain effect, in this case IO[_]. After that you can either call effectful, like in the snippet, or transacted if you wish to perform a transaction when handling a message. The next step is to bind it to a broker using bindBroker so that the processor has access to an existing consumer.
The bindBroker call yields a MessageHandler instance. It provides the def enrich[B](f: Consumer[F, A] => Consumer[F, B]) method, which allows you to add custom behavior to the consumer. At this point you can add things like logging, tracing or, like in this case, deserialization. The call _.asJsonConsumer[DomainMessage] transforms a Consumer[F, Message.Payload] into a Consumer[F, DomainMessage].
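The idea behind enrich can be sketched with a deliberately simplified model that uses only the standard library. SimpleConsumer, SimpleHandler and Payload below are hypothetical stand-ins invented for this illustration; they are not the pass4s types and ignore effects entirely. The point is only that each enrich step wraps the previous consumer and changes the type the final handler receives.

```scala
// EnrichSketch is a hypothetical, effect-free model; it is NOT the pass4s API.
object EnrichSketch {
  // A consumer pushes every incoming message to the handler it is given.
  final case class SimpleConsumer[A](consume: (A => Unit) => Unit) {
    // Transform each message before it reaches the handler.
    def map[B](f: A => B): SimpleConsumer[B] =
      SimpleConsumer(handler => consume(a => handler(f(a))))
  }

  final case class SimpleHandler[A](consumer: SimpleConsumer[A]) {
    // Replace the wrapped consumer with an enriched one, possibly changing its type.
    def enrich[B](f: SimpleConsumer[A] => SimpleConsumer[B]): SimpleHandler[B] =
      SimpleHandler(f(consumer))

    // Bind the processing logic to the (already enriched) consumer.
    def handle(process: A => Unit): Unit = consumer.consume(process)
  }

  final case class Payload(text: String)

  def run(): List[Int] = {
    // An in-memory source that emits two payloads.
    val source = SimpleConsumer[Payload](handler =>
      List(Payload("1"), Payload("22")).foreach(handler)
    )
    val seen = List.newBuilder[Int]
    SimpleHandler(source)
      .enrich(_.map(_.text))   // Payload => String
      .enrich(_.map(_.length)) // String  => Int, standing in for a real decoder
      .handle(n => seen += n)
    seen.result()
  }
}
```

In this model the handler passed to handle only ever sees the final type, which mirrors how the message handler in the post receives a DomainMessage rather than a raw payload.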
The MessageHandler also provides the handle method:
def handle[R >: P](source: Source[R])(process: A => T[Unit])
Here we provide the logic for handling messages and bind it to a certain source. This lets us define a common transformation for the consumer first, and then implement the logic specific to the data source. The method returns Resource[F, Unit], so we no longer have to call .background.void. This is how the source and the logic are bound together:
processor
  .handle(sqsSource) { message =>
    IO.println(s"Received message: $message")
  }
It’s as simple as that: you can now process incoming JSON messages and model complex data.
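As a reminder of what .background contributed in the String-based consumer, and why a Resource is a convenient return type for handle, here is a small cats-effect sketch that is independent of pass4s. It assumes cats-effect 3 on the classpath; BackgroundSketch, listener, started and stopped are hypothetical names made up for this example. The listener runs on a background fiber for as long as the resource is in use and is cancelled when use completes.

```scala
import cats.effect.{Deferred, IO, Ref, Resource}
import cats.effect.unsafe.implicits.global

// BackgroundSketch is a hypothetical illustration; it does not use pass4s.
object BackgroundSketch {
  // A never-ending "listener" that signals when it has started
  // and records when it gets cancelled.
  def listener(started: Deferred[IO, Unit], stopped: Ref[IO, Boolean]): Resource[IO, Unit] =
    (started.complete(()) *> IO.never[Unit])
      .onCancel(stopped.set(true))
      .background    // runs on a fiber that lives as long as the resource
      .map(_ => ())  // discard the outcome handle, like .void in the post

  val program: IO[Boolean] =
    for {
      started    <- Deferred[IO, Unit]
      stopped    <- Ref.of[IO, Boolean](false)
      // Use the resource only until the listener reports it is running.
      _          <- listener(started, stopped).use(_ => started.get)
      // Leaving `use` released the resource, which cancelled the listener.
      wasStopped <- stopped.get
    } yield wasStopped

  def run(): Boolean = program.unsafeRunSync()
}
```

In the post the resource is used with IO.never instead, so the consumer keeps running until the application itself is stopped.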
Producer
Now we know how to consume JSON messages, but how do we produce them? Again, here is a quick reminder of how it looked for non-serialized messages:
snsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)
  val message = Message(Message.Payload("hello world!", Map()), snsDestination)
  IO.println(s"Sending message $message to $snsDestination") *>
    broker.sender.sendOne(message) *>
    IO.println("Sent, exiting!").as(ExitCode.Success)
}
Now let’s try producing a DomainMessage that can be processed by our customized consumer:
import com.ocadotechnology.pass4s.circe.syntax._
snsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)
  val domainMessageSender = broker.sender.asJsonSender[DomainMessage](snsDestination)
  val domainMessage = DomainMessage("hello world!", 10)
  IO.println(s"Sending message: $domainMessage to $snsDestination") *>
    domainMessageSender.sendOne(domainMessage) *>
    IO.println("Sent, exiting!").as(ExitCode.Success)
}
Yes, it’s that simple. The only thing you need to do is add an import and apply the serialization using the .asJsonSender[DomainMessage] syntax.
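Conceptually, turning a sender of raw payloads into a sender of domain objects means encoding each value before handing it over. The sketch below models that with the standard library only. SimpleSender, Order and encode are hypothetical names for this illustration, the encoder is a trivial stand-in for JSON, and this is an assumption about the general idea rather than a description of how pass4s implements asJsonSender.

```scala
import scala.collection.mutable.ListBuffer

// SenderSketch is a hypothetical, effect-free model; it is NOT the pass4s API.
object SenderSketch {
  trait SimpleSender[A] { self =>
    def sendOne(a: A): Unit

    // Build a sender of B by converting every B to A before sending.
    def contramap[B](f: B => A): SimpleSender[B] =
      new SimpleSender[B] {
        def sendOne(b: B): Unit = self.sendOne(f(b))
      }
  }

  final case class Order(id: Int, item: String)

  // Stand-in for a real JSON encoder.
  def encode(order: Order): String = s"${order.id}:${order.item}"

  def run(): List[String] = {
    val sent = ListBuffer.empty[String]
    // The "raw" sender just records whatever text it is asked to send.
    val rawSender = new SimpleSender[String] {
      def sendOne(a: String): Unit = sent += a
    }
    val orderSender: SimpleSender[Order] = rawSender.contramap[Order](encode)
    orderSender.sendOne(Order(1, "tea"))
    sent.toList
  }
}
```

Note the direction: a consumer is adapted by transforming what comes out of it, while a sender is adapted by transforming what goes into it.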
Check out https://github.com/majk-p/pass4s-playground/tree/master/json for the complete code of the above examples.
XML
The XML serialization is implemented with Phobos. You can add it by including the following dependency: "com.ocadotechnology" %% "pass4s-phobos" % version.
First let’s model the data:
import ru.tinkoff.phobos.decoding._
import ru.tinkoff.phobos.encoding._
import ru.tinkoff.phobos.syntax._
import ru.tinkoff.phobos.derivation.semiauto._
final case class XmlMessage(description: String, value: Long, rows: List[String])
object XmlMessage {
  implicit val xmlEncoder: XmlEncoder[XmlMessage] = deriveXmlEncoder("xmlMessage")
  implicit val xmlDecoder: XmlDecoder[XmlMessage] = deriveXmlDecoder("xmlMessage")
}
The important thing to notice here is that we had to provide an XmlEncoder for the producer and an XmlDecoder for the consumer.
Consumer
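Applying the deserialization on the consumer is almost identical to the JSON case: just import the right syntax and enrich the consumer using MessageProcessor. Here the processor additionally extracts the raw text of each payload and prints it before decoding: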
import com.ocadotechnology.pass4s.phobos.syntax._
/**/
val processor =
  MessageProcessor
    .init[IO]
    .effectful
    .bindBroker(broker)
    .enrich(_.map(_.text))
    .enrich(
      _.mapM(rawText => IO.println(s"Raw message text: $rawText").as(rawText))
    )
    .enrich(_.asXmlConsumer[XmlMessage])
Producer
The producer is similar: just import the syntax and apply it to the sender:
import com.ocadotechnology.pass4s.phobos.syntax._
snsConnector.use { connector =>
  val broker = Broker.fromConnector(connector)
  val domainMessageSender = broker.sender.asXmlSender[XmlMessage](snsDestination)
  val domainMessage = XmlMessage("hello world!", 10, List("lorem", "ipsum", "dolor"))
  IO.println(s"Sending message: $domainMessage to $snsDestination") *>
    domainMessageSender.sendOne(domainMessage) *>
    IO.println("Sent, exiting!").as(ExitCode.Success)
}
Again, the full code example can be found at https://github.com/majk-p/pass4s-playground/tree/master/xml
Summary
Today we learned how to use pass4s with rich data structures, serializing them to both JSON and XML. Along the way we used MessageProcessor to simplify building rich message consumers.
In the next post I’ll show you how to proxy the messages through S3 to overcome message size limitations.