Scala: A (slight) equivalent for Go channels with ZIO

2020/08/17

Long story short: today I was wondering about the question below. And yes, if you’re wondering, I’ve bought into ZIO (and I like it very much so far ;))

The Question

How, using ZIO, can I create a Stream to which I can add some values at an arbitrary point in the future?

The docs for Stream don’t make it obvious, but without too much digging one quickly runs into the answer: Stream.fromQueue(queue), and just call queue.offer(...) whenever you feel like appending something to the stream.

However, the next question quickly came:

How to properly terminate a ZIO Stream that is fed from a queue.

If you’ve ever worked with a channel in go, it’s about closing a channel while properly processing whatever it may still contain.

Here, the first obvious thing that comes to mind when looking at the Queue doc is to call shutdown(). However this immediately terminates the queue and causes whatever it contained to be lost.

The Gist

Thanks to the quickest reply I’ve ever gotten on a community chat, I’ve discovered that the answer is darn easy as well, and comes under the guise of flattenTake:

for {
    q <- Queue.unbounded[Take[Nothing, String]]
    stream = Stream.fromQueueWithShutdown(q).flattenTake
    _ <- q.offer(Take.single("Something")) *> q.offer(Take.end)
} yield ()

As an extra, Stream.fromQueueWithShutdown is used, so that once the stream has been terminated by the Take.end, the queue is automatically shutdown.

Heck, that solution even fits into a tweet!