Friday 6th January 2012

# Reasoning about Stream Processing with Effects

It is a truth, universally acknowledged, that any programming technique must be in want of a reasoning principle.

Stream processing in Haskell is very much in the air at the moment, what with Iteratees (as embodied in the Enumerator library), Conduits and probably some more that I don't know about.

Patty Johann, Neil Ghani, Bart Jacobs and I recently had the paper "Fibrational Induction Meets Effects" accepted to FoSSaCS 2012. It turns out to be very relevant to discussing and reasoning about the kinds of stream processing problems that these Haskell libraries seek to solve.

In the paper we build upon the work of Andrzej Filinski and Kristian Støvring on induction principles for data structures that interleave pure data with effects, where the effects are defined in terms of some monad. Filinski and Støvring gave an induction principle for such interleaved effectful data types in a small programming language with effects and a fixed notion of data type and predicate. In our FoSSaCS paper, we have lifted this to a more general categorical setting, and also generalised the notion of predicate that can be considered (so you can have proof-relevant predicates, or Kripke predicates, for instance).

Our paper is quite technical, since we need the structure of fibrations and so on to properly attain the right level of generality. Nevertheless, the core principle is relatively simple and revolves around a recursion scheme for pure data interleaved with monadic effects (which generalises the one given by Filinski and Støvring). This turns out to have direct application to reasoning about the processing of streaming data.

## Effectful Streams

Effectful streams generate potentially infinite amounts of data by executing effects as they do so. For example, an effectful stream may produce data by reading it from a network socket. An effectful stream is nothing more than an interleaving of some monad "`m`" with news of whether the stream has stopped or has yielded an element. This can be expressed by the following Haskell declarations of a pair of mutually recursive types:

```haskell
newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }

data StreamStep m a
  = StreamEmit a (Stream m a)
  | StreamStop
```

Constructing streams with no (new) effects is accomplished by just using the `return` of our chosen monad, and the appropriate constructor:

```haskell
nil :: Monad m => Stream m a
nil = Stream $ return StreamStop

cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = Stream $ return (StreamEmit a s)
```

Given a value `stream` of type `Stream m a` we can kick it with `forceStream` to give us a monadic action that will yield either `StreamEmit a moreStream` or `StreamStop`. The stream gets to execute some effect in the monad `m` before telling us whether the stream has ended or not. It can use this monadic effect to do things like read from files, consult random number sources or use it as a side channel to report errors.

The stream `append` function shows how a stream can be interrogated by a recursive function:

```haskell
append :: Monad m => Stream m a -> Stream m a -> Stream m a
append s1 s2 = Stream $ do
  streamStep <- forceStream s1
  case streamStep of
    StreamEmit a s1' -> forceStream (cons a (append s1' s2))
    StreamStop       -> forceStream s2
```
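To see the effects in action, here is a small sketch of my own (not from the paper) that runs streams in the `Maybe` monad, where the only effect is failure. The helpers `fromList`, `toList` and `failing` are hypothetical names introduced just for this illustration, and `toList` only terminates on finite streams.

```haskell
newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }

data StreamStep m a
  = StreamEmit a (Stream m a)
  | StreamStop

nil :: Monad m => Stream m a
nil = Stream $ return StreamStop

cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = Stream $ return (StreamEmit a s)

append :: Monad m => Stream m a -> Stream m a -> Stream m a
append s1 s2 = Stream $ do
  streamStep <- forceStream s1
  case streamStep of
    StreamEmit a s1' -> forceStream (cons a (append s1' s2))
    StreamStop       -> forceStream s2

-- Hypothetical helper: an effect-free stream built from a list.
fromList :: Monad m => [a] -> Stream m a
fromList = foldr cons nil

-- Hypothetical helper: run every effect in order and collect the
-- emitted elements. Only terminates on finite streams.
toList :: Monad m => Stream m a -> m [a]
toList s = do
  streamStep <- forceStream s
  case streamStep of
    StreamEmit a s' -> fmap (a :) (toList s')
    StreamStop      -> return []

-- Hypothetical stream whose very first effect is a failure.
failing :: Stream Maybe a
failing = Stream Nothing

-- Appending two effect-free streams behaves like list append.
pureExample :: Maybe [Int]
pureExample = toList (append (fromList [1, 2]) (fromList [3, 4]))

-- The failure in the second stream is only reached after the first
-- stream has been exhausted, but it still aborts the whole traversal.
failingExample :: Maybe [Int]
failingExample = toList (append (fromList [1, 2]) failing)
```

Here `pureExample` evaluates to `Just [1,2,3,4]` and `failingExample` to `Nothing`, since `toList` has to run every effect of the stream before it can return.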

Streams look a lot like normal Haskell lists, except that rather than the ambient Haskell effect of "possibly a non-terminating black hole", we have the effects described by the monad `m` (plus the non-optional "possibly a non-terminating black hole" effect). For instance, we can define a stream that emits characters read from a `Handle` until it hits the end of file:

```haskell
import System.IO (Handle, hClose, hGetChar, hIsEOF)

ofHandle :: Handle -> Stream IO Char
ofHandle handle = loop
  where
    loop = Stream $ do
      isEOF <- hIsEOF handle
      if isEOF
        then do hClose handle
                return StreamStop
        else do c <- hGetChar handle
                return (StreamEmit c loop)
```

`Stream`s constitute the supply-side of data processing. We can use a similar pattern to treat the consumption side. `Reader`s are defined as follows, and look very similar to Iteratees:

```haskell
newtype Reader a m b = Reader { forceReader :: m (ReaderStep a m b) }

data ReaderStep a m b
  = ReaderRead (Maybe a -> Reader a m b)
  | ReaderEmit b
```

Similar to `Stream`s, a value of type `Reader a m b` can be prodded with `forceReader`. It will then tell us (after some monadic effect) whether it wants to read some data (`ReaderRead`) or if it has finished reading and wants to output some data (`ReaderEmit`). The `Maybe` type constructor in `ReaderRead` is intended to indicate whether or not end-of-stream has been reached.

The obvious thing to do now is to connect up a `Stream` and a `Reader` to feed the data from one into the other. There are actually (at least) two different ways of doing this, depending on whether we execute the effects of the stream before the reader, or the other way round. Executing the stream before the reader gives the following definition:

```haskell
(|>|) :: Monad m => Stream m a -> Reader a m b -> m b
s |>| r = do
  streamStep <- forceStream s
  case streamStep of
    StreamEmit a s' -> do
      readerStep <- forceReader r
      case readerStep of
        ReaderRead k -> s' |>| k (Just a)
        ReaderEmit b -> return b
    StreamStop ->
      runOnNothing r
```

If the stream stops early by returning `StreamStop`, then we use the following function `runOnNothing` to feed `Nothing` to a `Reader` until it yields a value:

```haskell
runOnNothing :: Monad m => Reader a m b -> m b
runOnNothing r = do
  readerStep <- forceReader r
  case readerStep of
    ReaderRead k -> runOnNothing (k Nothing)
    ReaderEmit b -> return b
```
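The order in which effects happen can be made visible with a logging monad. The following is a sketch under my own assumptions: the constructors of `ReaderStep` are reconstructed from the description above, the monad is the pair monad `(,) [String]` from `base` (a poor man's writer), and `Log`, `say`, `loggedStream` and `sumReader` are hypothetical names introduced only for this example.

```haskell
newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }
data StreamStep m a = StreamEmit a (Stream m a) | StreamStop

newtype Reader a m b = Reader { forceReader :: m (ReaderStep a m b) }
data ReaderStep a m b = ReaderRead (Maybe a -> Reader a m b) | ReaderEmit b

(|>|) :: Monad m => Stream m a -> Reader a m b -> m b
s |>| r = do
  streamStep <- forceStream s
  case streamStep of
    StreamEmit a s' -> do
      readerStep <- forceReader r
      case readerStep of
        ReaderRead k -> s' |>| k (Just a)
        ReaderEmit b -> return b
    StreamStop -> runOnNothing r

runOnNothing :: Monad m => Reader a m b -> m b
runOnNothing r = do
  readerStep <- forceReader r
  case readerStep of
    ReaderRead k -> runOnNothing (k Nothing)
    ReaderEmit b -> return b

-- Assumption: the pair monad stands in for a logging effect.
type Log = (,) [String]

say :: String -> Log ()
say msg = ([msg], ())

-- Hypothetical stream that logs each emission and the final stop.
loggedStream :: [Int] -> Stream Log Int
loggedStream [] = Stream $ do
  say "stream: stop"
  return StreamStop
loggedStream (x : xs) = Stream $ do
  say ("stream: emit " ++ show x)
  return (StreamEmit x (loggedStream xs))

-- Hypothetical reader that logs each read request and sums its input
-- until it is told that the stream has ended.
sumReader :: Int -> Reader Int Log Int
sumReader acc = Reader $ do
    say "reader: read"
    return (ReaderRead step)
  where
    step (Just x) = sumReader (acc + x)
    step Nothing  = Reader (return (ReaderEmit acc))

example :: ([String], Int)
example = loggedStream [1, 2] |>| sumReader 0
```

The log component of `example` alternates between stream and reader messages, always starting with the stream, which is exactly the "stream before reader" ordering built into `(|>|)`.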

## Generic Interleaved Data and A Recursion Scheme

I will now show that the `Stream` and `Reader` types may be generalised to a common pattern. By exploiting this common pattern, we obtain a powerful recursion principle for data interleaved with effects. This recursion principle also comes equipped with a reasoning principle, which allows us to prove things about functions that recurse over data interleaved with effects.

The common shape of the `Stream` and `Reader` types is captured by the following pair of Haskell type declarations. All the parts specific to `Stream`s or `Reader`s have been abstracted out into the argument ```f :: * -> *``` (which we will assume is an instance of the `Functor` type class).

```haskell
data Step m f = Step (f (D m f))

newtype D m f = D { force :: m (Step m f) }
```

A value of type `D m f` is therefore an interleaving of the effects described by `m` with the pure data described by `f`. The function `force` plays the part of the `forceStream` and `forceReader` functions defined above.

Effectful data can be constructed and deconstructed by the following functions:

```haskell
construct :: Monad m => f (D m f) -> D m f
construct x = D $ return (Step x)

deconstruct :: Monad m => D m f -> m (f (D m f))
deconstruct d = do Step x <- force d
                   return x
```

When using `deconstruct` there may be some effects to execute before we get access to the underlying pure data described by `f`.

To recover the `Stream` type, we simply define the appropriate `f` to describe the two things that `Stream`s are allowed to do: emit values and cease to be.

```haskell
data StreamStep a x
  = StreamEmit a x
  | StreamStop
  deriving Functor

type Stream m a = D m (StreamStep a)
```

In the new `StreamStep` type, the type parameter `x` indicates the hole where the next step of the recursion is placed.

The `nil` and `cons` constructors can then be defined in terms of `construct` and the appropriate part of the `StreamStep` type:

```haskell
nil :: Monad m => Stream m a
nil = construct StreamStop

cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = construct (StreamEmit a s)
```

The benefit of re-expressing the `Stream` type in this way is that we can now define a powerful recursion scheme on values of type `D m f`, including `Stream m a`, and use this to define functions like `append` and `(|>|)`. The interesting part is that this recursion scheme comes equipped with a reasoning principle, which allows us to prove things about our functions. To properly define the recursion scheme we first need to know what an Eilenberg-Moore algebra for a monad is.

### Eilenberg-Moore Algebras

For any monad `m`, an `m`-Eilenberg-Moore algebra consists of a pair of a type `a` and a function `h :: m a -> a`, satisfying some laws that state that `h` interacts nicely with the structure of the monad. I think of the existence of an Eilenberg-Moore algebra structure on `a` as roughly stating that the type `a` is effectful in the sense that it can have additional effects "prepended" on to it.

The existence of an Eilenberg-Moore algebra structure for a type can be expressed as a Haskell type class, `EMAlgebra`:

```haskell
class (Functor m, Monad m) => EMAlgebra m a where
  algebra :: m a -> a
```

Of course, the type class cannot enforce the laws, so we shall just promise that they hold. The type class mechanism also ties us to giving at most one Eilenberg-Moore structure for each pair of a monad `m` and a type `a`, where in fact there may be many, but this is a small price to pay for the convenience that type classes provide.
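For reference, the two laws say that `h . return = id` and `h . fmap h = h . join`. They can at least be spot-checked on sample values. The sketch below passes the algebra as an explicit function rather than through the type class, and the names `unitLaw`, `multLaw`, `sumAlg`, `lengthAlg` and `freeAlg` are hypothetical, introduced only for this illustration.

```haskell
import Control.Monad (join)

-- Unit law: running the algebra on a pure computation gives back the value.
unitLaw :: (Monad m, Eq a) => (m a -> a) -> a -> Bool
unitLaw h a = h (return a) == a

-- Multiplication law: for nested effects, applying the algebra to the
-- inner layer first agrees with flattening the layers using join first.
multLaw :: (Monad m, Eq a) => (m a -> a) -> m (m a) -> Bool
multLaw h x = h (fmap h x) == h (join x)

-- Summation is an Eilenberg-Moore algebra for the list monad.
sumAlg :: [Int] -> Int
sumAlg = sum

-- Length has the right type but is not an algebra: it breaks the unit law.
lengthAlg :: [Int] -> Int
lengthAlg = length

-- The free algebra on Maybe Int, given by join.
freeAlg :: Maybe (Maybe Int) -> Maybe Int
freeAlg = join

checks :: [Bool]
checks =
  [ unitLaw sumAlg 5
  , multLaw sumAlg [[1, 2], [3], []]
  , not (unitLaw lengthAlg 5)
  , unitLaw freeAlg (Just 3)
  , multLaw freeAlg (Just (Just (Just 3)))
  ]
```

Every entry of `checks` is `True`. Passing on a few samples is of course no proof, but the `lengthAlg` case shows how a function of the right type can still fail to be an algebra.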

Every type of the form `m a` for some monad `m` is obviously effectful, so we can give it an Eilenberg-Moore algebra structure using the function `join :: m (m a) -> m a` defined in the `Control.Monad` module:

```haskell
instance (Functor m, Monad m) => EMAlgebra m (m a) where
  algebra = join
```

This is the free Eilenberg-Moore algebra for the monad `m` and the type `a`.

The property of having an Eilenberg-Moore algebra is also preserved by the construction of function types. If `b` has an Eilenberg-Moore structure with respect to `m`, then so does `a -> b` for any `a`:

2012-01-07 13:15 Unfortunately, this instance and the previous one overlap, so GHC needs `-XIncoherentInstances` to handle it. This seems to work for the examples here, but probably a better solution is needed in general.
```haskell
instance (EMAlgebra m b) => EMAlgebra m (a -> b) where
  algebra x a = algebra $ do f <- x; return (f a)
```

Finally, every one of our interleaved data and effects types carries a default Eilenberg-Moore structure, achieved by prepending the new effect before the first effect of the data:

```haskell
instance (Functor m, Monad m) => EMAlgebra m (D m f) where
  algebra x = D $ x >>= force
```

Note that we now have two algebra structures on `D m f`: the `m`-Eilenberg-Moore structure defined here, and the `f`-algebra structure defined by the `construct` function above.

2012-01-07 13:15 Fixed a typo here, thanks to ehird in the comments below for spotting it.

### A Recursion Scheme

A basic recursion scheme is the catamorphism, which allows us to eliminate an element of a recursive type `Rec f` using an algebra ```f a -> a```, where `Rec f` is defined as follows:

``data Rec f = In (f (Rec f))``

The catamorphism function, or to use its common name `fold`, has the following type:

``fold :: Functor f => (f a -> a) -> Rec f -> a``

A function of type `f a -> a` is the structure part of an `f`-algebra. `f`-algebras are similar to Eilenberg-Moore algebras, except that they do not need to satisfy any laws (because we do not assume that `f` is a monad).

The `fold` function itself satisfies the following law (which can also be taken to be the definition):

``fold h (In x) = h (fmap (fold h) x)``

and is the unique function of type `Rec f -> a` to do so. This uniqueness property can be used to reason about functions built from `fold`.

To define functions that eliminate our interleaved effects and data types `D m f`, we could just use the fact that they are equivalent to the form `m (Rec (f :.: m))`, where `- :.: -` indicates composition of functors, and use `fold` on the type `Rec (f :.: m)`. However, in doing this we are forced to consider the pure and effectful parts of our data simultaneously.

A better approach, which is derivable from the basic `fold` operator, is to eliminate values of type `D m f` using `f`-and-`m`-algebras. That is, we assume that the result type `a` has an `f`-algebra structure `f a -> a` and an Eilenberg-Moore structure `m a -> a`. In this way, we can separate the pure and effectful parts of our recursion. By making use of the `EMAlgebra` type class defined above, we do not need to explicitly mention the effectful part at all.

Our new `effectfulFold` combinator has the following type, and I have given the direct Haskell implementation. It is an interesting exercise to see how it can be defined in terms of the `fold` combinator on ```Rec (f :.: m)```.

2012-01-07 13:15 Thanks to spacespacecomma on reddit for pointing out that the original definition here didn't type check. I was missing the applications of `force`.
```haskell
effectfulFold :: (Functor f, EMAlgebra m a) =>
                 (f a -> a)
              -> D m f
              -> a
effectfulFold h = algebra . fmap loop . force
  where
    loop (Step x) =
      h $ fmap algebra $ fmap (fmap loop) $ fmap force $ x
```

As we prove in the paper, generalising Filinski and Støvring's proof, functions defined using `effectfulFold` satisfy the following two properties. First, they preserve `f`-algebras, taking the `f`-algebra `construct` to the supplied `f`-algebra `h`:

``effectfulFold h (construct x) = h (fmap (effectfulFold h) x)``

Moreover, they preserve `m`-Eilenberg-Moore algebras, taking the default Eilenberg-Moore structure on `D m f` to the implicitly provided Eilenberg-Moore structure on `a`:

``effectfulFold h (algebra x) = algebra (fmap (effectfulFold h) x)``

Analogously to `fold h`, `effectfulFold h` is the unique function satisfying these two properties. In the paper, we use uniqueness to derive an induction principle for data interleaved with monadic effects, but here we can use uniqueness directly to reason about functions defined using `effectfulFold`.
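As a concrete illustration, here is a self-contained sketch that folds a stream into a monadic list, using the free Eilenberg-Moore structure on `m [a]`. The fold is written with two local helpers `go` and `step`, which is intended to be equivalent to the definition above. The pair monad `(,) [String]` stands in for a logging effect, and the names `Log`, `toList` and `readings` are hypothetical, introduced only for this example.

```haskell
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, DeriveFunctor #-}

import Control.Monad (join)

newtype D m f = D { force :: m (Step m f) }
data Step m f = Step (f (D m f))

class Monad m => EMAlgebra m a where
  algebra :: m a -> a

-- The free Eilenberg-Moore algebra on a type of the form m a.
instance Monad m => EMAlgebra m (m a) where
  algebra = join

effectfulFold :: (Functor f, EMAlgebra m a) => (f a -> a) -> D m f -> a
effectfulFold h = go
  where
    go = algebra . fmap step . force   -- absorb one layer of effects
    step (Step x) = h (fmap go x)      -- handle one layer of pure data

construct :: Monad m => f (D m f) -> D m f
construct x = D (return (Step x))

data StreamStep a x = StreamEmit a x | StreamStop
  deriving Functor

type Stream m a = D m (StreamStep a)

-- Only the pure part needs handling; the effects are sequenced by algebra.
toList :: Monad m => Stream m a -> m [a]
toList = effectfulFold h
  where
    h (StreamEmit a rest) = fmap (a :) rest
    h StreamStop          = return []

-- Assumption: the pair monad from base acts as a simple logging effect.
type Log = (,) [String]

-- A two-element stream that logs a message before each emission.
readings :: Stream Log Int
readings = emit 1 (emit 2 (construct StreamStop))
  where
    emit n rest = D (["read " ++ show n], Step (StreamEmit n rest))

example :: ([String], [Int])
example = toList readings
```

The result `example` is `(["read 1","read 2"],[1,2])`: the `f`-algebra `h` never mentions logging, yet the log entries come out in stream order because `algebra` (here `join`) threads them through.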

### Using the Recursion Scheme

The `append` function on streams that I defined by hand above can be re-expressed using the `effectfulFold` combinator:

```haskell
append :: (Functor m, Monad m) => Stream m a -> Stream m a -> Stream m a
append s1 s2 = effectfulFold h s1
  where h (StreamEmit a s) = cons a s
        h StreamStop       = s2
```

By its definition in terms of `effectfulFold` we know that `append` preserves the Eilenberg-Moore structure on its first argument. This means that the following equation holds for `x :: m (Stream m a)`

``append (algebra x) s2 = algebra (fmap (\s -> append s s2) x)``

Note that `append` does not preserve the Eilenberg-Moore structure on its second argument. The Eilenberg-Moore structure for streams prepends effects on to the stream, but these effects will not be executed by `append` until after all the effects in the first stream have been executed.

Also by its definition in terms of `effectfulFold`, we know that `append` satisfies the following equations with respect to the "pure" part of streams:

```haskell
append (construct StreamStop)        s2 = s2
append (construct (StreamEmit a s1)) s2 = construct (StreamEmit a (append s1 s2))
```

We can now use the uniqueness of functions defined by `effectfulFold` to show that `append` is associative. We have two functions of type `Stream m a -> Stream m a` that we wish to prove equivalent:

``\s1 -> append s1 (append s2 s3)``

and

``\s1 -> append (append s1 s2) s3``

where the first one is defined directly in terms of `effectfulFold`. If we can show that the second function obeys the same properties as the first, i.e. that it preserves the `m`-Eilenberg-Moore algebra and the `f`-algebra used in the definition of `append`, then by uniqueness we will have shown that they are the same function.

It is easy to check that the second function preserves the Eilenberg-Moore structure on `Stream m a`, since it is just the composition of two functions that do so. So the meat of the proof is in showing that it preserves the `f`-algebra structure. This splits into two cases, one for `StreamStop` and one for `StreamEmit`. For the former, we must show that

``append (append (construct StreamStop) s2) s3 = append s2 s3``

but this follows directly from the properties we already know about `append`. The second case is a little harder. We must show that

```haskell
  append (append (construct (StreamEmit a s1)) s2) s3
=
  construct (StreamEmit a (append (append s1 s2) s3))
```

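This follows by applying, twice, our knowledge about how `append` operates on input of the form `construct (StreamEmit a s1)`: the inner application rewrites the left-hand side to `append (construct (StreamEmit a (append s1 s2))) s3`, and the outer application then yields the right-hand side.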

Thus we have used the uniqueness property of functions defined using `effectfulFold` to show that `append` is associative.
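The proof covers all streams and all monads, but it is still reassuring to watch it hold on a concrete case, including the order of effects. The following sketch is only a sanity check, not a substitute for the proof. It assumes the pair monad `(,) [String]` as a logging effect, and the names `Log`, `logged`, `toList`, `s1`, `s2`, `s3`, `leftNested` and `rightNested` are hypothetical.

```haskell
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, DeriveFunctor #-}

newtype D m f = D { force :: m (Step m f) }
data Step m f = Step (f (D m f))

class Monad m => EMAlgebra m a where
  algebra :: m a -> a

-- Prepend the new effect before the first effect of the data.
instance Monad m => EMAlgebra m (D m f) where
  algebra x = D (x >>= force)

effectfulFold :: (Functor f, EMAlgebra m a) => (f a -> a) -> D m f -> a
effectfulFold h = go
  where
    go = algebra . fmap step . force
    step (Step x) = h (fmap go x)

data StreamStep a x = StreamEmit a x | StreamStop
  deriving Functor

type Stream m a = D m (StreamStep a)

construct :: Monad m => f (D m f) -> D m f
construct x = D (return (Step x))

nil :: Monad m => Stream m a
nil = construct StreamStop

cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = construct (StreamEmit a s)

append :: Monad m => Stream m a -> Stream m a -> Stream m a
append s1' s2' = effectfulFold h s1'
  where h (StreamEmit a s) = cons a s
        h StreamStop       = s2'

-- Observe a finite stream by running all of its effects in order.
toList :: Monad m => Stream m a -> m [a]
toList s = do
  step <- force s
  case step of
    Step (StreamEmit a s') -> fmap (a :) (toList s')
    Step StreamStop        -> return []

type Log = (,) [String]

-- A stream that logs a tagged message before emitting each element.
logged :: String -> [Int] -> Stream Log Int
logged name = foldr emit nil
  where emit x rest = D ([name ++ show x], Step (StreamEmit x rest))

s1, s2, s3 :: Stream Log Int
s1 = logged "a" [1, 2]
s2 = logged "b" [3]
s3 = logged "c" [4, 5]

leftNested, rightNested :: ([String], [Int])
leftNested  = toList (append (append s1 s2) s3)
rightNested = toList (append s1 (append s2 s3))
```

Both `leftNested` and `rightNested` come out as `(["a1","a2","b3","c4","c5"],[1,2,3,4,5])`, so on this example the two bracketings agree on the emitted elements and on the order of the logged effects.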

## Back to Readers

Just as the `Stream` type can be defined in terms of the generic ```D m f``` type, the `Reader` type can be defined in the same way:

```haskell
data ReaderStep a b x
  = ReaderRead (Maybe a -> x)
  | ReaderEmit b
  deriving Functor

type Reader a m b = D m (ReaderStep a b)
```

The `runOnNothing` function can be defined in terms of `effectfulFold`, by recursing on the `Reader` argument:

```haskell
runOnNothing :: (Functor m, Monad m) => Reader a m b -> m b
runOnNothing = effectfulFold f
  where f (ReaderRead k) = k Nothing
        f (ReaderEmit b) = return b
```

Likewise, the `(|>|)` function that connects a stream with a reader can be defined in terms of `effectfulFold`, by recursion on the stream argument, and using the Eilenberg-Moore structure on the type ```Reader a m b -> m b```:

```haskell
(|>|) :: (Monad m, Functor m) => Stream m a -> Reader a m b -> m b
(|>|) = effectfulFold f
  where f (StreamEmit a h) r = do
          readerStep <- deconstruct r
          case readerStep of
            ReaderRead k -> h (k (Just a))
            ReaderEmit b -> return b
        f StreamStop r = runOnNothing r
```

The `(|>|)` function executes the effects of the stream before the effects of the reader, letting the stream dictate how events proceed. An alternative is to execute the effects of the reader first, by defining the function in terms of recursion on the reader argument, again using `effectfulFold`:

```haskell
(|>>|) :: (Monad m, Functor m) => Stream m a -> Reader a m b -> m b
s |>>| r = effectfulFold f r s
  where f (ReaderRead k) s = do
          streamStep <- deconstruct s
          case streamStep of
            StreamEmit a s' -> k (Just a) s'
            StreamStop      -> k Nothing nil
        f (ReaderEmit b) s = return b
```

There are obviously many other possibilities for connecting `Stream`s to `Reader`s. If the `Stream` finishes early, we could return the uncompleted `Reader` rather than passing it on to `runOnNothing`. Likewise, if the `Reader` emits a value before all the `Stream` has been read, we could return the leftover stream instead of just dropping it on the floor. In any case, all these possibilities are definable in terms of `effectfulFold`.

`Reader`s are an instance of the `Monad` type class as the following Haskell declaration witnesses:

2012-01-07 13:15 Thanks again to spacespacecomma on reddit for pointing out that this doesn't directly work. You need to wrap the `Reader` type in a `newtype` to get it to work. I'll leave it as-is to give the general idea.
```haskell
instance Monad m => Monad (Reader a m) where
  return b = construct (ReaderEmit b)
  c >>= k  = effectfulFold f c
    where f (ReaderRead k') = construct (ReaderRead k')
          f (ReaderEmit b)  = k b
```

Using the uniqueness property of `effectfulFold` it is possible to prove that this instance really does satisfy the monad laws. In fact, it is possible to show that `Reader a m` is the sum of the monad `m` with the free monad on the functor `f x = Maybe a -> x`. This is an instance of a fact originally observed by Hyland, Plotkin and Power (Theorem 4).

## Processors

If `Stream`s produce data and `Reader`s consume data, then what goes in between? `Processor`s! A `Processor` is an instance of the same interleaved data and effects pattern as `Stream`s and `Reader`s:

```haskell
data ProcessorStep a b x
  = ProcessorRead (Maybe a -> x)
  | ProcessorEmit b x
  | ProcessorStop
  deriving Functor

type Processor a m b = D m (ProcessorStep a b)
```

A `Processor`, once kicked using `force`, can either ask for more input (`ProcessorRead`), can emit some output (`ProcessorEmit`) with a promise to carry on, or can signal end of stream (`ProcessorStop`). `Processor`s are intended to fulfil the same role as `Enumeratees` or `Conduit`s as intermediaries that manipulate data in some way. They are also very similar to the StreamProcessor representation defined by Ghani, Hancock and Pattinson.

With a few helper functions:

```haskell
processorRead :: Monad m => (Maybe a -> Processor a m b) -> Processor a m b
processorRead k = construct (ProcessorRead k)

processorStop :: Monad m => Processor a m b
processorStop = construct ProcessorStop

processorEmit :: Monad m => b -> Processor a m b -> Processor a m b
processorEmit b proc = construct (ProcessorEmit b proc)
```

we can define a processor that filters:

```haskell
filter :: Monad m => (a -> Maybe b) -> Processor a m b
filter h = processorRead getInput
  where
    getInput Nothing  = processorStop
    getInput (Just a) =
      case h a of
        Nothing -> filter h
        Just a  -> processorEmit a (filter h)
```

It is also possible to define several combinators that compose `Stream`s with `Processor`s, `Processor`s with `Reader`s and `Processor`s with `Processor`s in a similar manner to the Conduits library. We have the same choices as with the composition of `Stream`s and `Reader`s in terms of the ordering of effects. The key point is that they may all be defined in terms of `effectfulFold`, and reasoned about using the universal property. For instance, we can prove that composition of `Processor`s is associative, which opens the door to justified optimisation principles for chains of stream processors.

### The Co-Inductive View

In the above I have taken an inductive view on `Stream`s, `Reader`s and so on. In Haskell, each recursive type is simultaneously an inductive and co-inductive type. We can use this change-of-viewpoint to give another way of defining interleaved data and effects in terms of unfolds. The following function `unfold` takes a seed state of type `s` and an evolution function of type `s -> m (f s)` and generates a value of type `D m f`.

2012-01-07 13:15 Was missing an additional `Monad m` constraint here.
```haskell
unfold :: (Monad m, Functor f) => s -> (s -> m (f s)) -> D m f
unfold s step = loop s
  where loop s = D $ do
          f <- step s
          return (Step (fmap loop f))
```

The `ofHandle` stream defined above can be re-expressed in terms of `unfold` as follows:

```haskell
ofHandle :: Handle -> Stream IO Char
ofHandle handle = unfold () step
  where
    step () = do
      isEOF <- hIsEOF handle
      if isEOF
        then do hClose handle
                return StreamStop
        else do c <- hGetChar handle
                return (StreamEmit c ())
```
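For an example that can be run without a file handle, here is a sketch of an unfold with a non-trivial seed. It assumes the pair monad `(,) [String]` as a logging effect, and the names `Log`, `countdown` and `toList` are hypothetical, introduced only for this illustration.

```haskell
{-# LANGUAGE DeriveFunctor #-}

newtype D m f = D { force :: m (Step m f) }
data Step m f = Step (f (D m f))

data StreamStep a x = StreamEmit a x | StreamStop
  deriving Functor

type Stream m a = D m (StreamStep a)

unfold :: (Monad m, Functor f) => s -> (s -> m (f s)) -> D m f
unfold s0 step = loop s0
  where
    loop s = D $ do
      f <- step s
      return (Step (fmap loop f))

type Log = (,) [String]

-- The seed is the current counter. Each step logs a tick, emits the
-- counter and continues from its predecessor, stopping at zero or below.
countdown :: Int -> Stream Log Int
countdown n = unfold n step
  where
    step k
      | k <= 0    = return StreamStop
      | otherwise = (["tick " ++ show k], StreamEmit k (k - 1))

-- Observe a finite stream by running all of its effects in order.
toList :: Monad m => Stream m a -> m [a]
toList s = do
  step <- force s
  case step of
    Step (StreamEmit a s') -> fmap (a :) (toList s')
    Step StreamStop        -> return []

example :: ([String], [Int])
example = toList (countdown 3)
```

Running it gives `example == (["tick 3","tick 2","tick 1"],[3,2,1])`. The evolution function never mentions the stream being built, only the seed, which is what makes the unfold view attractive for fusion.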

In fact, we could change the representation of our interleaved data and effect type to be directly expressed in terms of `unfolds`:

```haskell
data D m f where
  D :: s -> (s -> m (f s)) -> D m f
```

This representation makes it harder to define the `effectfulFold` function and its associated reasoning principle. On the other hand, it does allow for fusion of chained sequences of `Stream`s, `Processor`s and `Reader`s in a similar manner to the Stream Fusion paper. Hopefully, this may allow for sequences of stream processing functions to be fused together and open up more possibilities for optimisation. But this remains to be seen.