Akka Streams, break tuple item apart?
Using the superPool from akka-http, I have a stream that passes down a tuple. I would like to pipe it into the Alpakka Google Pub/Sub connector. At the end of the HTTP processing, I encode everything for the Pub/Sub connector and end up with
(PublishRequest, Long) // long is a timestamp
but the interface of the connector is
Flow[PublishRequest, Seq[String], NotUsed]
A first approach is to drop one part of the tuple:
.map{ case(publishRequest, timestamp) => publishRequest }
.via(publishFlow)
Is there an elegant way to create this pipeline while keeping the Long information?
EDIT: added my not-so-elegant solution in the answers. More answers welcome.
Solution 1:[1]
I don't see anything inelegant about your solution using GraphDSL.create(), which I think has the advantage of visualizing the stream structure via the diagrammatic ~> clauses. I do see a problem in your code, though. For example, I don't think publisher should be defined by add-ing a flow to the builder.
Below is a skeletal version (briefly tested) of what I believe publishAndRecombine should look like:
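import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.alpakka.googlecloud.pubsub.PublishRequest
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] = ???

val publishAndRecombine = Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  // Fan each incoming tuple out to two branches, then zip the results back together.
  val bcast = b.add(Broadcast[(PublishRequest, Long)](2))
  val zipper = b.add(Zip[Seq[String], Long]())
  // Branch 0: keep only the request and publish it.
  val publisher = Flow[(PublishRequest, Long)]
    .map { case (pr, _) => pr }
    .via(publishFlow)
  // Branch 1: keep only the timestamp.
  val timestamp = Flow[(PublishRequest, Long)]
    .map { case (_, ts) => ts }
  bcast.out(0) ~> publisher ~> zipper.in0
  bcast.out(1) ~> timestamp ~> zipper.in1
  FlowShape(bcast.in, zipper.out)
})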
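To try the graph above without Google Cloud credentials, here is a minimal runnable sketch. It assumes Akka 2.6 on the classpath (so that the implicit ActorSystem provides a materializer); the PublishRequest case class and the stub publishFlow are hypothetical stand-ins for the Alpakka types, not the real connector.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

import scala.concurrent.Await
import scala.concurrent.duration._

object RecombineDemo {
  // Hypothetical stand-in for the Alpakka PublishRequest type.
  final case class PublishRequest(payload: String)

  // Hypothetical stub: emits exactly one Seq of fake message ids per request.
  val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
    Flow[PublishRequest].map(pr => Seq(s"id-${pr.payload}"))

  val publishAndRecombine: Flow[(PublishRequest, Long), (Seq[String], Long), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val bcast  = b.add(Broadcast[(PublishRequest, Long)](2))
      val zipper = b.add(Zip[Seq[String], Long]())
      // Branch 0 publishes the request, branch 1 carries the timestamp.
      bcast.out(0) ~> Flow[(PublishRequest, Long)].map(_._1).via(publishFlow) ~> zipper.in0
      bcast.out(1) ~> Flow[(PublishRequest, Long)].map(_._2) ~> zipper.in1
      FlowShape(bcast.in, zipper.out)
    })

  // Runs two tuples through the flow and returns the recombined results.
  def run(): Seq[(Seq[String], Long)] = {
    implicit val system: ActorSystem = ActorSystem("recombine-demo")
    try {
      val done = Source(List(PublishRequest("a") -> 1L, PublishRequest("b") -> 2L))
        .via(publishAndRecombine)
        .runWith(Sink.seq)
      Await.result(done, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Note that Zip pairs elements purely by position, so this approach relies on publishFlow emitting exactly one element per request, in the same order as its input.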
Solution 2:[2]
There is now a much nicer solution for this, which will be released in Akka 2.6.19 (see https://github.com/akka/akka/pull/31123).
To use the unsafeViaData method added in that PR, you first have to represent (PublishRequest, Long) using FlowWithContext/SourceWithContext. FlowWithContext/SourceWithContext is an abstraction that was specifically designed to solve this problem (see https://doc.akka.io/docs/akka/current/stream/stream-context.html). The problem is that you have a stream whose data part is what you typically want to operate on (in your case the PublishRequest) and a context (aka metadata) part that you typically just pass along unmodified (in your case the Long).
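To illustrate the data/context split described above, here is a minimal sketch using SourceWithContext with an ordinary map, which only touches the data part. It assumes Akka 2.6 on the classpath; the PublishRequest case class is a hypothetical stand-in for the Alpakka type, and the payload-length mapping is only there to show that the Long travels along untouched.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source, SourceWithContext}

import scala.concurrent.Await
import scala.concurrent.duration._

object ContextDemo {
  // Hypothetical stand-in for the Alpakka PublishRequest type.
  final case class PublishRequest(payload: String)

  def run(): Seq[(Int, Long)] = {
    implicit val system: ActorSystem = ActorSystem("context-demo")
    try {
      val tuples = Source(List(PublishRequest("a") -> 1L, PublishRequest("bcd") -> 2L))
      val done = SourceWithContext
        .fromTuples(tuples)     // data = PublishRequest, context = Long
        .map(_.payload.length)  // operates on the data part only
        .asSource               // back to a plain Source of (data, context) tuples
        .runWith(Sink.seq)
      Await.result(done, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Operators such as map keep the context attached automatically; the difficulty only arises when an existing plain Flow, like the Pub/Sub publishFlow, has to be applied to the data part.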
So in the end you would have something like this:
val myFlow: FlowWithContext[PublishRequest, Long, PublishRequest, Long, NotUsed] =
  FlowWithContext.fromTuples(originalFlowAsTuple) // Original flow that has `(PublishRequest, Long)` as an output
myFlow.unsafeViaData(publishFlow)
In contrast to the GraphDSL-based answer, this solution not only involves much less boilerplate, since it is part of Akka, but it also retains the materialized value rather than losing it and always ending up with NotUsed.
For people wondering why the method unsafeViaData has unsafe in its name: it is because the Flow that you pass into this method must not add, drop, or reorder any of the elements in the stream (doing so would mean that the context no longer properly corresponds to the data part of the stream). Ideally we would use Scala's type system to catch such errors at compile time, but doing so would require a lot of changes to akka-stream, especially if the changes need to remain backwards compatible (which, when dealing with Akka, they do). More details are in the PR mentioned earlier.
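The alignment problem can be pictured with plain Scala collections. This is only an analogy, under the assumption that contexts are re-attached to the data by position; it is not a description of how Akka implements the operator. All names here are hypothetical.

```scala
object ContextAlignment {
  // (data, context) pairs, standing in for stream elements.
  val elements: List[(String, Long)] = List("a" -> 1L, "bb" -> 2L, "c" -> 3L)
  val data: List[String]  = elements.map(_._1)
  val contexts: List[Long] = elements.map(_._2)

  // A one-to-one transformation keeps every element aligned with its context.
  val oneToOne: List[(String, Long)] = data.map(_.toUpperCase).zip(contexts)

  // A transformation that drops "bb" shifts every later element onto the wrong context.
  val dropping: List[(String, Long)] = data.filter(_.length == 1).zip(contexts)

  def main(args: Array[String]): Unit = {
    println(oneToOne) // List((A,1), (BB,2), (C,3))
    println(dropping) // List((a,1), (c,2)): "c" originally had context 3
  }
}
```

The compiler accepts both versions, which is why the burden of keeping the flow one-to-one and order-preserving falls on the caller.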
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | |