Long story short: today I was wondering about the question below. And yes, if you’re wondering, I’ve bought into ZIO (and I like it very much so far ;))
The Question
How, using ZIO, can I create a Stream to which I can add some values at an arbitrary point in the future?
The docs for Stream don’t make it obvious, but without too much digging
one quickly runs into the answer: Stream.fromQueue(queue)
, and just call queue.offer(...)
whenever you feel like appending something to the stream.
However, the next question quickly came:
How to properly terminate a ZIO Stream that is fed from a queue.
If you’ve ever worked with a channel
in go, it’s about closing a channel while properly processing whatever it may still contain.
Here, the first obvious thing that comes to mind when looking at the Queue doc is to call shutdown()
. However this immediately terminates the queue and causes whatever it contained to be lost.
The Gist
Thanks to the quickest reply I’ve ever gotten on a community chat, I’ve discovered that the answer is darn easy as well,
and comes under the guise of flattenTake
:
for {
q <- Queue.unbounded[Take[Nothing, String]]
stream = Stream.fromQueueWithShutdown(q).flattenTake
_ <- q.offer(Take.single("Something")) *> q.offer(Take.end)
} yield ()
As an extra, Stream.fromQueueWithShutdown
is used, so that once the stream has been terminated by the Take.end
, the queue is automatically shutdown.
Heck, that solution even fits into a tweet!