This release features fs2 integration as well as fully reworked streaming support, which makes streaming endpoints easier to test.
FS2 Support
#1042 adds fs2 support to Finch (thanks @sergeykolbasov). As of 0.27, it's possible to receive and send JSON (or pretty much any other) streams using fs2's Stream type. The new API entry point, bodyStream, constructs a streaming endpoint with the requested stream type: Enumerator (iteratee) or Stream (fs2).
scala> import fs2.Stream, io.finch._, io.finch.catsEffect._, io.finch.fs2._
scala> val bin = binaryBodyStream[Stream] // raw byte-array stream
bin: Endpoint[IO, fs2.Stream[IO, Array[Byte]]] = binaryBodyStream
scala> val str = stringBodyStream[Stream] // stream of strings
str: Endpoint[IO, fs2.Stream[IO, String]] = stringBodyStream
Similarly, accepting a stream of JSON objects is possible via the bodyStream or jsonBodyStream endpoints.
scala> import fs2.Stream, io.finch._, io.finch.catsEffect._, io.finch.fs2._
scala> import io.finch.circe._, io.circe.generic.auto._
scala> case class Foo(s: String)
defined class Foo
scala> val foos = bodyStream[Stream, Foo, Application.Json]
foos: Endpoint[IO, fs2.Stream[IO, Foo]] = bodyStream
scala> val foos = jsonBodyStream[Stream, Foo]
foos: Endpoint[IO, fs2.Stream[IO, Foo]] = bodyStream
Serving streaming bodies is no different from serving fully buffered payloads: just return a Stream (fs2) or an Enumerator (iteratee) from an endpoint.
scala> import fs2.Stream, cats.effect.IO, io.finch._, io.finch.catsEffect._, io.finch.fs2._
scala> import io.finch.circe._, io.circe.generic.auto._
scala> case class Foo(s: String)
defined class Foo
scala> val foos = Endpoint[IO].const(Stream[IO, Foo](Foo("bar")))
foos: Endpoint[IO, fs2.Stream[IO, Foo]] = io.finch.Endpoint$$anon$34@5428e82e
scala> foos.toServiceAs[Application.Json]
res1: Service[Request, Response] = io.finch.ToService$$anon$4$$anon$2
Testing Streaming Endpoints
#1056 enables users to construct Input instances with streaming (not fully buffered) bodies. At this point, both fs2 and iteratee streams are supported.
import io.finch._, io.finch.iteratee._, io.iteratee.Enumerator, cats.effect.IO
val i = Input
.post("/")
.withBody[Text.Plain](Enumerator.enumerate[IO, String]("foo", "bar", "baz"))
Creating an Input out of a JSON stream is also possible:
import io.finch._, io.finch.iteratee._, io.iteratee.Enumerator, cats.effect.IO
import io.finch.circe._, io.circe.generic.auto._
case class Foo(s: String)
val i = Input
.post("/")
.withBody[Application.Json](Enumerator.enumerate[IO, Foo](Foo("foo"), Foo("bar")))
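A streaming Input like the ones above can then be fed to an endpoint under test. The following is a minimal sketch rather than an excerpt from the Finch test suite. It assumes that stringBodyStream accepts Enumerator the same way it accepts Stream in the fs2 examples, and that awaitValueUnsafe() is available on the endpoint result once io.finch.catsEffect._ is imported. The names lines, input and received are illustrative only.

```scala
import io.finch._, io.finch.catsEffect._, io.finch.iteratee._, io.iteratee.Enumerator, cats.effect.IO

// Endpoint under test: reads the request body as a stream of strings
// (assumption: stringBodyStream works with Enumerator as it does with fs2's Stream).
val lines: Endpoint[IO, Enumerator[IO, String]] = stringBodyStream[Enumerator]

// Streaming (not fully buffered) input, built the same way as in the examples above.
val input = Input
  .post("/")
  .withBody[Text.Plain](Enumerator.enumerate[IO, String]("foo", "bar", "baz"))

// Run the endpoint against the input. The result is None if the endpoint did not match;
// otherwise the received stream is drained into a Vector for inspection.
val received: Option[Vector[String]] =
  lines(input).awaitValueUnsafe().map(_.toVector.unsafeRunSync())
```

How the received elements are chunked depends on how the stream encoder frames the body, so a test should probably assert on the concatenated content rather than on exact element boundaries.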
Switching between iteratee and fs2 is as easy as replacing the io.finch.iteratee._ import with io.finch.fs2._.
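As a sketch of that swap, here is the plain-text Input from the iteratee example rebuilt on fs2. It assumes that withBody accepts an fs2 Stream once io.finch.fs2._ is in scope, which is what the import swap suggests. The stream type changes from Enumerator to Stream along with the import.

```scala
// Import order mirrors the fs2 examples earlier in these notes: fs2.Stream comes before io.finch._
import fs2.Stream, cats.effect.IO, io.finch._, io.finch.fs2._

// Same request as the iteratee version, with the body supplied as an fs2 Stream.
val i = Input
  .post("/")
  .withBody[Text.Plain](Stream[IO, String]("foo", "bar", "baz"))
```

The rest of a test can stay as it was, aside from endpoints that name the stream type explicitly (for example stringBodyStream[Stream] instead of stringBodyStream[Enumerator]).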