It is a truth, universally acknowledged, that any programming technique must be in want of a reasoning principle.

Stream processing in Haskell is very much in the air at the moment, what with Iteratees (as embodied in the Enumerator library), Conduits and probably some more that I don't know about.

Patty Johann, Neil Ghani, Bart Jacobs and I recently had our paper "Fibrational Induction Meets Effects" accepted to FoSSaCS 2012. It turns out to be very relevant to discussing and reasoning about the kinds of stream processing problems that these Haskell libraries seek to solve.

In the paper we build upon the work of Andrzej Filinski and Kristian Støvring on induction principles for data structures that interleave pure data with effects, where the effects are described by some monad. Filinski and Støvring gave an induction principle for such interleaved effectful data types in a small programming language with effects and a fixed notion of data type and predicate. In our FoSSaCS paper, we generalise this to a categorical setting, and also generalise the notion of predicate that can be considered (so you can have proof-relevant predicates, or Kripke predicates, for instance).

Our paper is quite technical, since we need the structure of fibrations and so on to attain the right level of generality. Nevertheless, the core principle is relatively simple and revolves around a recursion scheme for pure data interleaved with monadic effects (which generalises the one given by Filinski and Støvring). This turns out to apply directly to reasoning about the processing of streaming data.

Effectful streams generate potentially infinite amounts of data by
executing effects as they do so. For example, an effectful stream may
produce data by reading it from a network socket. An effectful stream
is nothing more than an interleaving of some monad `m` with news of
whether the stream has stopped or has yielded an element. This can be
expressed by the following Haskell declarations of a pair of mutually
recursive types:

```
newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }

data StreamStep m a
  = StreamEmit a (Stream m a)
  | StreamStop
```

Constructing streams with no (new) effects is accomplished by just
using the `return` of our chosen monad, and the appropriate
constructor:

```
nil :: Monad m => Stream m a
nil = Stream $ return StreamStop
cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = Stream $ return (StreamEmit a s)
```

Given a value `stream` of type `Stream m a`, we can kick it with
`forceStream` to give us a monadic action that will yield either
`StreamEmit a moreStream` or `StreamStop`. The stream gets to execute
some effect in the monad `m` before telling us whether the stream has
ended or not. It can use this monadic effect to do things like read
from files, consult random number sources or use it as a side channel
to report errors.
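To make the kicking concrete, here is a minimal sketch that consumes a `Stream` by forcing it repeatedly and collecting its elements. It restates the `Stream` declarations above and, as an assumption for illustration, uses the pair type `([String], _)` from `base` as a simple logging monad so that every effect is visible. The names `toList`, `logMsg` and `countdown` are hypothetical.

```haskell
-- Sketch: the monad ((,) [String]) from base acts as a logging monad;
-- (>>=) concatenates the logs of the two effects in order.
newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }

data StreamStep m a
  = StreamEmit a (Stream m a)
  | StreamStop

-- Hypothetical helper: an effect that just records a message.
logMsg :: String -> ([String], ())
logMsg msg = ([msg], ())

-- Hypothetical example stream: logs before each step, emitting
-- n, n-1, ..., 1 and then stopping.
countdown :: Int -> Stream ((,) [String]) Int
countdown n = Stream $ do
  logMsg ("step " ++ show n)
  if n <= 0
    then return StreamStop
    else return (StreamEmit n (countdown (n - 1)))

-- Kick the stream with forceStream until it stops, running each effect
-- in order and collecting the emitted elements.
toList :: Monad m => Stream m a -> m [a]
toList s = do
  step <- forceStream s
  case step of
    StreamEmit a s' -> fmap (a :) (toList s')
    StreamStop      -> return []
```

Evaluating `toList (countdown 2)` gives `(["step 2","step 1","step 0"],[2,1])`: the log records every effect, including the final one that reports `StreamStop`.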

The stream `append` function shows how a stream can be interrogated by
a recursive function:

```
append :: Monad m => Stream m a -> Stream m a -> Stream m a
append s1 s2 = Stream $ do
  streamStep <- forceStream s1
  case streamStep of
    StreamEmit a s1' -> forceStream (cons a (append s1' s2))
    StreamStop       -> forceStream s2
```
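As a quick check on the order of effects, the following sketch appends two streams that each log a message on their first step, again using `([String], _)` from `base` as a logging monad (an assumption for illustration). The declarations above are restated so the block stands alone; `logMsg`, `logThen`, `toList`, `s1` and `s2` are hypothetical. The log shows that the second stream's effect only runs once the first stream has stopped.

```haskell
newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }

data StreamStep m a
  = StreamEmit a (Stream m a)
  | StreamStop

nil :: Monad m => Stream m a
nil = Stream $ return StreamStop

cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = Stream $ return (StreamEmit a s)

append :: Monad m => Stream m a -> Stream m a -> Stream m a
append s1 s2 = Stream $ do
  streamStep <- forceStream s1
  case streamStep of
    StreamEmit a s1' -> forceStream (cons a (append s1' s2))
    StreamStop       -> forceStream s2

-- Hypothetical helpers: a logging effect, and a stream that runs that
-- effect before behaving like the given stream.
logMsg :: String -> ([String], ())
logMsg msg = ([msg], ())

logThen :: String -> Stream ((,) [String]) a -> Stream ((,) [String]) a
logThen msg s = Stream (logMsg msg >> forceStream s)

-- Run all effects in order, collecting the elements.
toList :: Monad m => Stream m a -> m [a]
toList s = do
  step <- forceStream s
  case step of
    StreamEmit a s' -> fmap (a :) (toList s')
    StreamStop      -> return []

s1, s2 :: Stream ((,) [String]) Int
s1 = logThen "s1" (cons 1 (cons 2 nil))
s2 = logThen "s2" (cons 3 nil)
```

Here `toList (append s1 s2)` evaluates to `(["s1","s2"],[1,2,3])`, and swapping the arguments swaps the log order as well as the elements.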

Streams look a lot like normal Haskell lists, except that rather than
the ambient Haskell effect of "possibly a non-terminating black hole",
we have the effects described by the monad `m` (plus the non-optional
"possibly a non-terminating black hole" effect). For instance, we can
define a stream that emits characters read from a `Handle` until it
hits the end of file:

```
import System.IO (Handle, hClose, hGetChar, hIsEOF)

ofHandle :: Handle -> Stream IO Char
ofHandle handle = loop
  where
    loop = Stream $ do
      isEOF <- hIsEOF handle
      if isEOF
        then do
          hClose handle
          return StreamStop
        else do
          c <- hGetChar handle
          return (StreamEmit c loop)
```

`Stream`s constitute the supply side of data processing. We can use a
similar pattern to treat the consumption side. `Reader`s are defined
as follows, and look very similar to Iteratees:

```
newtype Reader a m b = Reader { forceReader :: m (ReaderStep a m b) }

data ReaderStep a m b
  = ReaderRead (Maybe a -> Reader a m b)
  | ReaderEmit b
```

Similar to `Stream`s, a value of type `Reader a m b` can be prodded
with `forceReader`. It will then tell us (after some monadic effect)
whether it wants to read some data (`ReaderRead`) or if it has
finished reading and wants to output some data (`ReaderEmit`). The
`Maybe` type constructor in `ReaderRead` is intended to indicate
whether or not end-of-stream has been reached.
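For instance, a reader that sums at most `n` inputs, giving up early at end-of-stream, might be sketched as follows. The `Reader` declarations are restated from above; `sumReader` and the list-feeding driver `feedList` are hypothetical, and `Identity` from `base` is used as an effect-free monad.

```haskell
import Data.Functor.Identity (Identity (..))

newtype Reader a m b = Reader { forceReader :: m (ReaderStep a m b) }

data ReaderStep a m b
  = ReaderRead (Maybe a -> Reader a m b)
  | ReaderEmit b

-- Hypothetical example: sum at most n inputs, emitting early on Nothing.
sumReader :: Monad m => Int -> Reader Int m Int
sumReader n = go n 0
  where
    go 0 acc = Reader $ return (ReaderEmit acc)
    go k acc = Reader $ return $ ReaderRead $ \input ->
      case input of
        Nothing -> Reader $ return (ReaderEmit acc)
        Just x  -> go (k - 1) (acc + x)

-- Hypothetical driver: feed the list elements one by one, then Nothing
-- for every later request, until the reader emits a result.
feedList :: Monad m => [a] -> Reader a m b -> m b
feedList xs r = do
  step <- forceReader r
  case step of
    ReaderEmit b -> return b
    ReaderRead k -> case xs of
      (x : rest) -> feedList rest (k (Just x))
      []         -> feedList [] (k Nothing)
```

With these definitions, `runIdentity (feedList [1,2,3,4] (sumReader 3))` is `6`, while `runIdentity (feedList [5] (sumReader 3))` is `5` because the reader sees `Nothing` on its second request.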

The obvious thing to do now is to connect up a `Stream` and a `Reader`
to feed the data from one into the other. There are actually (at
least) two different ways of doing this, depending on whether we
execute the effects of the stream before the reader, or the other way
round. Executing the stream before the reader gives the following
definition:

```
(|>|) :: Monad m => Stream m a -> Reader a m b -> m b
s |>| r = do
  streamStep <- forceStream s
  case streamStep of
    StreamEmit a s' -> do
      readerStep <- forceReader r
      case readerStep of
        ReaderRead k -> s' |>| k (Just a)
        ReaderEmit b -> return b
    StreamStop ->
      runOnNothing r
```

If the stream stops early by returning `StreamStop`, then we use the
following function `runOnNothing` to feed `Nothing` to a `Reader`
until it yields a value:

```
runOnNothing :: Monad m => Reader a m b -> m b
runOnNothing r = do
  readerStep <- forceReader r
  case readerStep of
    ReaderRead k -> runOnNothing (k Nothing)
    ReaderEmit b -> return b
```
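The effect ordering of `(|>|)` can be observed with a logging monad. In this sketch (which restates the declarations above so it stands alone, and assumes `([String], _)` from `base` as the monad), the hypothetical stream `loggedStream` logs before each element and the hypothetical reader `sumTwo` logs each time it is forced, reading two inputs and treating end-of-stream as `0`.

```haskell
newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }
data StreamStep m a = StreamEmit a (Stream m a) | StreamStop

newtype Reader a m b = Reader { forceReader :: m (ReaderStep a m b) }
data ReaderStep a m b = ReaderRead (Maybe a -> Reader a m b) | ReaderEmit b

(|>|) :: Monad m => Stream m a -> Reader a m b -> m b
s |>| r = do
  streamStep <- forceStream s
  case streamStep of
    StreamEmit a s' -> do
      readerStep <- forceReader r
      case readerStep of
        ReaderRead k -> s' |>| k (Just a)
        ReaderEmit b -> return b
    StreamStop -> runOnNothing r

runOnNothing :: Monad m => Reader a m b -> m b
runOnNothing r = do
  readerStep <- forceReader r
  case readerStep of
    ReaderRead k -> runOnNothing (k Nothing)
    ReaderEmit b -> return b

-- Hypothetical logging effect in the ((,) [String]) monad from base.
logMsg :: String -> ([String], ())
logMsg msg = ([msg], ())

-- Hypothetical stream: logs before emitting each list element.
loggedStream :: [Int] -> Stream ((,) [String]) Int
loggedStream []         = Stream (logMsg "stream: stop" >> return StreamStop)
loggedStream (x : rest) =
  Stream (logMsg ("stream: " ++ show x) >> return (StreamEmit x (loggedStream rest)))

-- Hypothetical reader: logs each time it is forced and sums two inputs.
sumTwo :: Reader Int ((,) [String]) Int
sumTwo = go (2 :: Int) 0
  where
    go 0 acc = Reader (logMsg "reader: done" >> return (ReaderEmit acc))
    go k acc = Reader (logMsg "reader: ask" >> return (ReaderRead (\i -> go (k - 1) (acc + maybe 0 id i))))
```

Running `loggedStream [1,2,3] |>| sumTwo` logs `stream: 3` even though the reader has already collected its two inputs: `(|>|)` always runs the stream's next effect before asking the reader whether it wants more.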

I will now show that the `Stream` and `Reader` types may be
generalised to a common pattern. By exploiting this common pattern, we
obtain a powerful recursion principle for data interleaved with
effects. This recursion principle also comes equipped with a reasoning
principle, which allows us to prove things about functions that
recurse over data interleaved with effects.

The common shape of the `Stream` and `Reader` types is captured by the
following pair of Haskell type declarations. All the parts specific to
`Stream`s or `Reader`s have been abstracted out into the argument
`f :: * -> *` (which we will assume is an instance of the `Functor`
type class).

```
data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }
```

A value of type `D m f` is therefore an interleaving of the effects
described by `m` with the pure data described by `f`. The function
`force` plays the part of the `forceStream` and `forceReader`
functions defined above.

Effectful data can be constructed and deconstructed by the following functions:

```
construct :: Monad m => f (D m f) -> D m f
construct x = D $ return (Step x)

deconstruct :: Monad m => D m f -> m (f (D m f))
deconstruct d = do
  Step x <- force d
  return x
```

When using `deconstruct` there may be some effects to execute before
we get access to the underlying pure data described by `f`.

To recover the `Stream` type, we simply define the appropriate `f` to
describe the two things that `Stream`s are allowed to do: emit values
and cease to be.

```
data StreamStep a x
  = StreamEmit a x
  | StreamStop
  deriving Functor

type Stream m a = D m (StreamStep a)
```

In the new `StreamStep` type, the type parameter `x` indicates the
hole where the next step of the recursion is placed.

The `nil` and `cons` constructors can then be defined in terms of
`construct` and the appropriate part of the `StreamStep` type:

```
nil :: Monad m => Stream m a
nil = construct StreamStop
cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = construct (StreamEmit a s)
```

The benefit of re-expressing the `Stream` type in this way is that we
can now define a powerful recursion scheme on values of type `D m f`,
including `Stream m a`, and use this to define functions like `append`
and `(|>|)`. The interesting part is that this recursion scheme comes
equipped with a reasoning principle, which allows us to prove things
about our functions. To properly define the recursion scheme we first
need to know what an Eilenberg-Moore algebra for a monad is.

For any monad `m`, an `m`-Eilenberg-Moore algebra consists of a pair
of a type `a` and a function `h :: m a -> a`, satisfying two laws that
state that `h` interacts nicely with the structure of the monad:
`h (return x) = x` and `h (join y) = h (fmap h y)`. I think of the
existence of an Eilenberg-Moore algebra structure on `a` as roughly
stating that the type `a` is effectful in the sense that it can have
additional effects "prepended" on to it.

The existence of an Eilenberg-Moore algebra structure for a type can
be expressed as a Haskell type class, `EMAlgebra`:

```
-- A two-parameter class: needs the MultiParamTypeClasses extension.
class (Functor m, Monad m) => EMAlgebra m a where
  algebra :: m a -> a
```

Of course, we cannot state the laws in the type class itself, so we
shall just commit to promising that they hold. The type class
mechanism also ties us to giving at most one Eilenberg-Moore structure
for each pair of a monad `m` and a type `a`, where in fact there may
be many, but this is a small price to pay for the convenience that
type classes provide.

Every type of the form `m a` for some monad `m` is obviously
effectful, so we can give it an Eilenberg-Moore algebra structure
using the function `join :: m (m a) -> m a` defined in the
`Control.Monad` module:

```
-- The head mentions m twice, so this needs FlexibleInstances.
instance (Functor m, Monad m) => EMAlgebra m (m a) where
  algebra = join
```

This is the free Eilenberg-Moore algebra for the monad `m` and the
type `a`.

The property of having an Eilenberg-Moore algebra is also preserved by
the construction of function types. If `b` has an Eilenberg-Moore
structure with respect to `m`, then so does `a -> b` for any `a`:

Update (2012-01-07 13:15): Unfortunately, the instance below and the
previous one overlap, so GHC needs `-XIncoherentInstances` to handle
it. This seems to work for the examples here, but a better solution is
probably needed in general.
```
instance (EMAlgebra m b) => EMAlgebra m (a -> b) where
  algebra x a = algebra $ do f <- x; return (f a)
```

Finally, every one of our interleaved data and effects types carries a default Eilenberg-Moore structure, achieved by prepending the new effect before the first effect of the data:

```
instance (Functor m, Monad m) => EMAlgebra m (D m f) where
  algebra x = D $ x >>= force
```
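A small sketch of what "prepending" means for the `join` instance and the `D m f` instance, again assuming the `([String], _)` logging monad from `base`. It restates the generic definitions; `logMsg`, `toList` and `prefixed` are hypothetical names. Only these two instances are included, so no overlapping-instance extensions are needed here.

```haskell
{-# LANGUAGE DeriveFunctor, FlexibleInstances, MultiParamTypeClasses #-}
import Control.Monad (join)

data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }

construct :: Monad m => f (D m f) -> D m f
construct x = D (return (Step x))

class (Functor m, Monad m) => EMAlgebra m a where
  algebra :: m a -> a

-- Free algebra: the outer effect runs before the inner one.
instance (Functor m, Monad m) => EMAlgebra m (m a) where
  algebra = join

-- Default structure on D m f: run the new effect, then the data's first effect.
instance (Functor m, Monad m) => EMAlgebra m (D m f) where
  algebra x = D (x >>= force)

data StreamStep a x = StreamEmit a x | StreamStop deriving Functor
type Stream m a = D m (StreamStep a)

nil :: Monad m => Stream m a
nil = construct StreamStop

cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = construct (StreamEmit a s)

logMsg :: String -> ([String], ())
logMsg msg = ([msg], ())

-- Run a stream to completion, collecting its elements.
toList :: Monad m => Stream m a -> m [a]
toList d = do
  step <- force d
  case step of
    Step (StreamEmit a rest) -> fmap (a :) (toList rest)
    Step StreamStop          -> return []

-- A pure two-element stream with a logging effect prepended by algebra.
prefixed :: Stream ((,) [String]) Int
prefixed = algebra (logMsg "pre" >> return (cons 1 (cons 2 nil)))
```

With these definitions `toList prefixed` is `(["pre"],[1,2])`, and prepending once more with `algebra (logMsg "outer" >> return prefixed)` puts `"outer"` in front of `"pre"` in the log.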

Note that we now have two algebra structures on `D m f`: the
`m`-Eilenberg-Moore structure defined here, and the `f`-algebra
structure defined by the `construct` function above.

Update (2012-01-07 13:15): Fixed a typo here; thanks to ehird in the
comments for spotting it.

A basic recursion scheme is the *catamorphism*, which allows us to
eliminate an element of a recursive type `Rec f` using an algebra
`f a -> a`, where `Rec f` is defined as follows:

`data Rec f = In (f (Rec f))`

The catamorphism function, or to use its common name `fold`, has the
following type:

`fold :: Functor f => (f a -> a) -> Rec f -> a`

A function of type `f a -> a` is the structure part of an
`f`-algebra. `f`-algebras are similar to Eilenberg-Moore algebras,
except that they do not need to satisfy any laws (because we do not
assume that `f` is a monad).

The `fold` function itself satisfies the following law (which can also
be taken to be the definition):

`fold h (In x) = h (fmap (fold h) x)`

and is the *unique* function of type `Rec f -> a` to do so. This
uniqueness property can be used to reason about functions built from
`fold`.

To define functions that eliminate our interleaved effects and data
types `D m f`, we could just use the fact that they are equivalent to
the form `m (Rec (f :.: m))`, where `- :.: -` indicates composition of
functors, and use `fold` on the type `Rec (f :.: m)`. However, in
doing this we are forced to consider the pure and effectful parts of
our data simultaneously.

A better approach, which is derivable from the basic `fold` operator,
is to eliminate values of type `D m f` using `f`-and-`m`-algebras.
That is, we assume that the result type `a` has an `f`-algebra
structure `f a -> a` *and* an Eilenberg-Moore structure `m a -> a`. In
this way, we can separate the pure and effectful parts of our
recursion. By making use of the `EMAlgebra` type class defined above,
we do not need to explicitly mention the effectful part at all.

Our new `effectfulFold` combinator has the following type, and I have
given the direct Haskell implementation. It is an interesting exercise
to see how it can be defined in terms of the `fold` combinator on
`Rec (f :.: m)`.

Update (2012-01-07 13:15): Thanks to spacespacecomma on reddit for
pointing out that the original definition here didn't type check. I
was missing the applications of `force`.

```
effectfulFold :: (Functor f, EMAlgebra m a) =>
                 (f a -> a)
              -> D m f
              -> a
effectfulFold h = algebra . fmap loop . force
  where
    loop (Step x) =
      h $ fmap algebra $ fmap (fmap loop) $ fmap force $ x
```
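As a small usage sketch of `effectfulFold`, the following sums a stream of `Int`s into the monad `m` itself, so the Eilenberg-Moore structure used is the free one, `join`. The `f`-algebra `h` only says what to do with the pure data; `algebra` threads the effects. The declarations are restated from the text; `streamSum`, `logMsg` and `loggedStream` are hypothetical, and `([String], _)` from `base` is assumed as a logging monad.

```haskell
{-# LANGUAGE DeriveFunctor, FlexibleInstances, MultiParamTypeClasses #-}
import Control.Monad (join)

data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }

class (Functor m, Monad m) => EMAlgebra m a where
  algebra :: m a -> a

instance (Functor m, Monad m) => EMAlgebra m (m a) where
  algebra = join

effectfulFold :: (Functor f, EMAlgebra m a) => (f a -> a) -> D m f -> a
effectfulFold h = algebra . fmap loop . force
  where
    loop (Step x) = h $ fmap algebra $ fmap (fmap loop) $ fmap force $ x

data StreamStep a x = StreamEmit a x | StreamStop deriving Functor
type Stream m a = D m (StreamStep a)

-- The f-algebra handles only the pure structure of the stream;
-- effects are combined by the EMAlgebra instance for m Int (join).
streamSum :: (Functor m, Monad m) => Stream m Int -> m Int
streamSum = effectfulFold h
  where
    h StreamStop          = return 0
    h (StreamEmit n rest) = fmap (n +) rest

logMsg :: String -> ([String], ())
logMsg msg = ([msg], ())

-- Hypothetical stream: logs before each step.
loggedStream :: [Int] -> Stream ((,) [String]) Int
loggedStream []         = D (logMsg "stop" >> return (Step StreamStop))
loggedStream (x : rest) =
  D (logMsg ("emit " ++ show x) >> return (Step (StreamEmit x (loggedStream rest))))
```

Here `streamSum (loggedStream [1,2,3])` is `(["emit 1","emit 2","emit 3","stop"],6)`: every effect of the stream runs, in order, and `h` never mentions the monad.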

As we prove in the paper, generalising Filinski and Støvring's proof,
functions defined using `effectfulFold` satisfy the following two
properties. First, they preserve `f`-algebras, taking the `f`-algebra
`construct` to the supplied `f`-algebra `h`:

`effectfulFold h (construct x) = h (fmap (effectfulFold h) x)`

Moreover, they preserve `m`-Eilenberg-Moore algebras, taking the
default Eilenberg-Moore structure on `D m f` to the implicitly
provided Eilenberg-Moore structure on `a`:

`effectfulFold h (algebra x) = algebra (fmap (effectfulFold h) x)`

Analogously to `fold h`, `effectfulFold h` is the *unique* function
satisfying these two properties. In the paper, we use uniqueness to
derive an induction principle for data interleaved with monadic
effects, but here we can use uniqueness directly to reason about
functions defined using `effectfulFold`.

The `append` function on streams that I defined by hand above can be
re-expressed using the `effectfulFold` combinator:

```
append :: (Functor m, Monad m) => Stream m a -> Stream m a -> Stream m a
append s1 s2 = effectfulFold h s1
  where h (StreamEmit a s) = cons a s
        h StreamStop       = s2
```

By its definition in terms of `effectfulFold`, we know that `append`
preserves the Eilenberg-Moore structure on its first argument. This
means that the following equation holds for any `x :: m (Stream m a)`:

`append (algebra x) s2 = algebra (fmap (\s -> append s s2) x)`

Note that `append` does not preserve the Eilenberg-Moore structure on
its second argument. The Eilenberg-Moore structure for streams
prepends effects on to the stream, but these effects will not be
executed by `append` until *after* all the effects in the first stream
have been executed.

Also by its definition in terms of `effectfulFold`, we know that
`append` satisfies the following equations with respect to the "pure"
part of streams:

```
append (construct StreamStop) s2 = s2
append (construct (StreamEmit a s1)) s2 = construct (StreamEmit a (append s1 s2))
```

We can now use the uniqueness of functions defined by `effectfulFold`
to show that `append` is associative. We have two functions of type
`Stream m a -> Stream m a` that we wish to prove equivalent:

`\s1 -> append s1 (append s2 s3)`

and

`\s1 -> append (append s1 s2) s3`

where the first one is defined directly in terms of `effectfulFold`
(it is `append` with `append s2 s3` as its second argument). If we can
show that the second function obeys the same properties as the first,
i.e. that it preserves the `m`-Eilenberg-Moore algebra and the
`f`-algebra used in the definition of `append`, then by uniqueness we
will have shown that they are the same function.

It is easy to check that the second function preserves the
Eilenberg-Moore structure on `Stream m a`, since it is just the
composition of two functions that do so. So the meat of the proof is
in showing that it preserves the `f`-algebra structure. This splits
into two cases, one for `StreamStop` and one for `StreamEmit`. For the
former, we must show that

`append (append (construct StreamStop) s2) s3 = append s2 s3`

but this follows directly from the properties we already know about
`append`. The second case is a little harder. We must show that

```
append (append (construct (StreamEmit a s1)) s2) s3
=
construct (StreamEmit a (append (append s1 s2) s3))
```

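This follows by two applications of our knowledge about how `append`
operates on input of the form `construct (StreamEmit a s1)`: the inner
`append` rewrites the left-hand side to
`append (construct (StreamEmit a (append s1 s2))) s3`, and the outer
one then rewrites this to
`construct (StreamEmit a (append (append s1 s2) s3))`.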

Thus we have used the uniqueness property of functions defined using
`effectfulFold` to show that `append` is associative.
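Testing is no substitute for the uniqueness argument, but a sanity check of associativity on concrete effectful streams is easy to write. This sketch restates the generic definitions with only the `D m f` Eilenberg-Moore instance (so no overlapping instances arise), builds the hypothetical streams `s1`, `s2` and `s3` by prepending logging effects with `algebra`, and compares both bracketings, effects included. The `([String], _)` logging monad from `base` is an assumption for illustration.

```haskell
{-# LANGUAGE DeriveFunctor, FlexibleInstances, MultiParamTypeClasses #-}

data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }

construct :: Monad m => f (D m f) -> D m f
construct x = D (return (Step x))

class (Functor m, Monad m) => EMAlgebra m a where
  algebra :: m a -> a

instance (Functor m, Monad m) => EMAlgebra m (D m f) where
  algebra x = D (x >>= force)

effectfulFold :: (Functor f, EMAlgebra m a) => (f a -> a) -> D m f -> a
effectfulFold h = algebra . fmap loop . force
  where
    loop (Step x) = h $ fmap algebra $ fmap (fmap loop) $ fmap force $ x

data StreamStep a x = StreamEmit a x | StreamStop deriving Functor
type Stream m a = D m (StreamStep a)

nil :: Monad m => Stream m a
nil = construct StreamStop

cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = construct (StreamEmit a s)

append :: (Functor m, Monad m) => Stream m a -> Stream m a -> Stream m a
append s1 s2 = effectfulFold h s1
  where
    h (StreamEmit a s) = cons a s
    h StreamStop       = s2

-- Run all effects, collecting elements (direct recursion, for testing).
toList :: Monad m => Stream m a -> m [a]
toList d = do
  step <- force d
  case step of
    Step (StreamEmit a rest) -> fmap (a :) (toList rest)
    Step StreamStop          -> return []

logMsg :: String -> ([String], ())
logMsg msg = ([msg], ())

-- Prepend a logging effect using the Eilenberg-Moore structure on D m f.
logThen :: String -> Stream ((,) [String]) a -> Stream ((,) [String]) a
logThen msg s = algebra (logMsg msg >> return s)

s1, s2, s3 :: Stream ((,) [String]) Int
s1 = logThen "s1" (cons 1 (cons 2 nil))
s2 = logThen "s2" (cons 3 nil)
s3 = logThen "s3" (cons 4 nil)
```

Both `toList (append (append s1 s2) s3)` and `toList (append s1 (append s2 s3))` give `(["s1","s2","s3"],[1,2,3,4])`.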

Just as the `Stream` type can be defined in terms of the generic
`D m f` type, the `Reader` type can be defined in the same way:

```
data ReaderStep a b x
  = ReaderRead (Maybe a -> x)
  | ReaderEmit b
  deriving Functor

type Reader a m b = D m (ReaderStep a b)
```

The `runOnNothing` function can be defined in terms of
`effectfulFold`, by recursing on the `Reader` argument:

```
runOnNothing :: (Functor m, Monad m) => Reader a m b -> m b
runOnNothing = effectfulFold f
  where f (ReaderRead k) = k Nothing
        f (ReaderEmit b) = return b
```

Likewise, the `(|>|)` function that connects a stream with a reader
can be defined in terms of `effectfulFold`, by recursion on the stream
argument, and using the Eilenberg-Moore structure on the type
`Reader a m b -> m b`:

```
(|>|) :: (Monad m, Functor m) => Stream m a -> Reader a m b -> m b
(|>|) = effectfulFold f
  where f (StreamEmit a h) r = do
          readerStep <- deconstruct r
          case readerStep of
            ReaderRead k -> h (k (Just a))
            ReaderEmit b -> return b
        f StreamStop r = runOnNothing r
```

The `(|>|)` function executes the effects of the stream before the
effects of the reader, letting the stream dictate how events
proceed. An alternative is to execute the effects of the reader first,
by defining the function in terms of recursion on the reader argument,
again using `effectfulFold`:

```
(|>>|) :: (Monad m, Functor m) => Stream m a -> Reader a m b -> m b
s |>>| r = effectfulFold f r s
  where f (ReaderRead k) s = do
          streamStep <- deconstruct s
          case streamStep of
            StreamEmit a s' -> k (Just a) s'
            StreamStop      -> k Nothing nil
        f (ReaderEmit b) s = return b
```
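The difference between `(|>|)` and `(|>>|)` shows up in the order of logged effects. The following sketch restates the generic definitions and assumes `([String], _)` from `base` as a logging monad; `loggedStream` and `sumTwo` are hypothetical. It relies on GHC's per-instance `{-# INCOHERENT #-}` pragmas as a stand-in for the `-XIncoherentInstances` flag mentioned in the update above, so treat it as a sketch rather than a recommended way to structure the instances.

```haskell
{-# LANGUAGE DeriveFunctor, FlexibleInstances, MultiParamTypeClasses #-}
import Control.Monad (join)

data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }

construct :: Monad m => f (D m f) -> D m f
construct x = D (return (Step x))

deconstruct :: Monad m => D m f -> m (f (D m f))
deconstruct d = fmap (\(Step x) -> x) (force d)

class (Functor m, Monad m) => EMAlgebra m a where
  algebra :: m a -> a

-- These two instances overlap (when m is a function monad).
instance {-# INCOHERENT #-} (Functor m, Monad m) => EMAlgebra m (m a) where
  algebra = join

instance {-# INCOHERENT #-} EMAlgebra m b => EMAlgebra m (a -> b) where
  algebra x a = algebra (fmap ($ a) x)

effectfulFold :: (Functor f, EMAlgebra m a) => (f a -> a) -> D m f -> a
effectfulFold h = algebra . fmap loop . force
  where
    loop (Step x) = h $ fmap algebra $ fmap (fmap loop) $ fmap force $ x

data StreamStep a x = StreamEmit a x | StreamStop deriving Functor
type Stream m a = D m (StreamStep a)

data ReaderStep a b x = ReaderRead (Maybe a -> x) | ReaderEmit b deriving Functor
type Reader a m b = D m (ReaderStep a b)

nil :: Monad m => Stream m a
nil = construct StreamStop

runOnNothing :: (Functor m, Monad m) => Reader a m b -> m b
runOnNothing = effectfulFold f
  where
    f (ReaderRead k) = k Nothing
    f (ReaderEmit b) = return b

-- Stream effects first: recursion on the stream.
(|>|) :: (Functor m, Monad m) => Stream m a -> Reader a m b -> m b
(|>|) = effectfulFold f
  where
    f (StreamEmit a h) r = do
      readerStep <- deconstruct r
      case readerStep of
        ReaderRead k -> h (k (Just a))
        ReaderEmit b -> return b
    f StreamStop r = runOnNothing r

-- Reader effects first: recursion on the reader.
(|>>|) :: (Functor m, Monad m) => Stream m a -> Reader a m b -> m b
s |>>| r = effectfulFold f r s
  where
    f (ReaderRead k) str = do
      streamStep <- deconstruct str
      case streamStep of
        StreamEmit a rest -> k (Just a) rest
        StreamStop        -> k Nothing nil
    f (ReaderEmit b) _ = return b

logMsg :: String -> ([String], ())
logMsg msg = ([msg], ())

-- Hypothetical stream: logs before emitting each list element.
loggedStream :: [Int] -> Stream ((,) [String]) Int
loggedStream []         = D (logMsg "stream: stop" >> return (Step StreamStop))
loggedStream (x : rest) =
  D (logMsg ("stream: " ++ show x) >> return (Step (StreamEmit x (loggedStream rest))))

-- Hypothetical reader: logs each time it is forced, sums two inputs.
sumTwo :: Reader Int ((,) [String]) Int
sumTwo = go (2 :: Int) 0
  where
    go 0 acc = D (logMsg "reader: done" >> return (Step (ReaderEmit acc)))
    go k acc = D (logMsg "reader: ask" >> return (Step (ReaderRead (\i -> go (k - 1) (acc + maybe 0 id i)))))
```

Both connections compute `3` from `loggedStream [1,2,3]`, but `(|>|)` logs `stream: 3` (the stream's effect runs before the reader reports it is done), whereas `(|>>|)` starts with `reader: ask` and never runs the third element's effect.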

There are obviously many other possibilities for connecting `Stream`s
to `Reader`s. If the `Stream` finishes early, we could return the
uncompleted `Reader` rather than passing it on to `runOnNothing`.
Likewise, if the `Reader` emits a value before all of the `Stream` has
been read, we could return the leftover stream instead of just
dropping it on the floor. In any case, all these possibilities are
definable in terms of `effectfulFold`.

`Reader`s are an instance of the `Monad` type class, as the following
Haskell declaration witnesses:

Update (2012-01-07 13:15): Thanks again to spacespacecomma on reddit
for pointing out that this doesn't directly work. You need to wrap the
`Reader` type in a `newtype` to get it to work. I'll leave it as-is to
give the general idea.

```
instance Monad m => Monad (Reader a m) where
  return b = construct (ReaderEmit b)
  c >>= k = effectfulFold f c
    where f (ReaderRead k') = construct (ReaderRead k')
          f (ReaderEmit b)  = k b
```
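Following the suggestion in the update above, here is one way the `newtype` wrapping might look. This is a sketch under assumptions: the wrapper `Reader` with field `unReader`, and the helpers `readOne`, `feed` and `pairUp`, are hypothetical. Modern GHC also requires `Functor` and `Applicative` instances, which are obtained from the `Monad` instance in the standard way with `liftM` and `ap`.

```haskell
{-# LANGUAGE DeriveFunctor, FlexibleInstances, MultiParamTypeClasses #-}
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity (..))

data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }

construct :: Monad m => f (D m f) -> D m f
construct x = D (return (Step x))

class (Functor m, Monad m) => EMAlgebra m a where
  algebra :: m a -> a

instance (Functor m, Monad m) => EMAlgebra m (D m f) where
  algebra x = D (x >>= force)

effectfulFold :: (Functor f, EMAlgebra m a) => (f a -> a) -> D m f -> a
effectfulFold h = algebra . fmap loop . force
  where
    loop (Step x) = h $ fmap algebra $ fmap (fmap loop) $ fmap force $ x

data ReaderStep a b x = ReaderRead (Maybe a -> x) | ReaderEmit b deriving Functor

-- Hypothetical wrapper: unlike a type synonym, a newtype can be
-- partially applied in an instance head.
newtype Reader a m b = Reader { unReader :: D m (ReaderStep a b) }

instance Monad m => Functor (Reader a m) where
  fmap = liftM

instance Monad m => Applicative (Reader a m) where
  pure b = Reader (construct (ReaderEmit b))
  (<*>) = ap

instance Monad m => Monad (Reader a m) where
  Reader c >>= k = Reader (effectfulFold f c)
    where
      -- Keep read requests as they are; replace each result b by k b.
      f (ReaderRead k') = construct (ReaderRead k')
      f (ReaderEmit b)  = unReader (k b)

-- Hypothetical: read a single input (Nothing at end of stream).
readOne :: Monad m => Reader a m (Maybe a)
readOne = Reader (construct (ReaderRead (\i -> construct (ReaderEmit i))))

-- Hypothetical driver: feed list elements, then Nothing forever.
feed :: Monad m => [a] -> Reader a m b -> m b
feed xs (Reader d) = do
  step <- force d
  case step of
    Step (ReaderEmit b) -> return b
    Step (ReaderRead k) -> case xs of
      (x : rest) -> feed rest (Reader (k (Just x)))
      []         -> feed [] (Reader (k Nothing))

pairUp :: Monad m => Reader a m (Maybe a, Maybe a)
pairUp = do
  x <- readOne
  y <- readOne
  return (x, y)
```

For example, `runIdentity (feed [1,2,3] pairUp)` is `(Just 1,Just 2)`, and with only one input available the second read sees `Nothing`.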

Using the uniqueness property of `effectfulFold` it is possible to
prove that this instance really does satisfy the monad laws. In fact,
it is possible to show that `Reader a m` is the sum of the monad `m`
with the free monad on the functor `f x = Maybe a -> x`. This is an
instance of a fact originally observed by Hyland, Plotkin and Power
(Theorem 4).

If `Stream`s produce data and `Reader`s consume data, then what goes
in between? `Processor`s! A `Processor` is an instance of the same
interleaved data and effects pattern as `Stream`s and `Reader`s:

```
data ProcessorStep a b x
  = ProcessorRead (Maybe a -> x)
  | ProcessorEmit b x
  | ProcessorStop
  deriving Functor

type Processor a m b = D m (ProcessorStep a b)
```

A `Processor`, once kicked using `force`, can either ask for more
input (`ProcessorRead`), emit some output (`ProcessorEmit`) with a
promise to carry on, or signal end of stream (`ProcessorStop`).
`Processor`s are intended to fulfil the same role as `Enumeratee`s or
`Conduit`s, as intermediaries that manipulate data in some way. They
are also very similar to the StreamProcessor representation defined by
Ghani, Hancock and Pattinson.

With a few helper functions:

```
processorRead :: Monad m => (Maybe a -> Processor a m b) -> Processor a m b
processorRead k = construct (ProcessorRead k)
processorStop :: Monad m => Processor a m b
processorStop = construct ProcessorStop
processorEmit :: Monad m => b -> Processor a m b -> Processor a m b
processorEmit b proc = construct (ProcessorEmit b proc)
```

we can define a processor that filters:

```
-- filter clashes with the Prelude function of the same name
import Prelude hiding (filter)

filter :: Monad m => (a -> Maybe b) -> Processor a m b
filter h = processorRead getInput
  where
    getInput Nothing  = processorStop
    getInput (Just a) =
      case h a of
        Nothing -> filter h
        Just b  -> processorEmit b (filter h)
```
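To see `filter` run, the following sketch restates the `Processor` declarations and helpers from the text and adds a hypothetical list-driven runner, `runProcessor`, that feeds inputs (then `Nothing`) and collects the outputs. `Identity` from `base` is assumed as an effect-free monad.

```haskell
{-# LANGUAGE DeriveFunctor #-}
import Prelude hiding (filter)
import Data.Functor.Identity (Identity (..))

data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }

construct :: Monad m => f (D m f) -> D m f
construct x = D (return (Step x))

data ProcessorStep a b x
  = ProcessorRead (Maybe a -> x)
  | ProcessorEmit b x
  | ProcessorStop
  deriving Functor

type Processor a m b = D m (ProcessorStep a b)

processorRead :: Monad m => (Maybe a -> Processor a m b) -> Processor a m b
processorRead k = construct (ProcessorRead k)

processorStop :: Monad m => Processor a m b
processorStop = construct ProcessorStop

processorEmit :: Monad m => b -> Processor a m b -> Processor a m b
processorEmit b next = construct (ProcessorEmit b next)

filter :: Monad m => (a -> Maybe b) -> Processor a m b
filter h = processorRead getInput
  where
    getInput Nothing  = processorStop
    getInput (Just a) =
      case h a of
        Nothing -> filter h
        Just b  -> processorEmit b (filter h)

-- Hypothetical driver: feed list elements (then Nothing) and collect outputs.
runProcessor :: Monad m => [a] -> Processor a m b -> m [b]
runProcessor xs p = do
  step <- force p
  case step of
    Step ProcessorStop          -> return []
    Step (ProcessorEmit b rest) -> fmap (b :) (runProcessor xs rest)
    Step (ProcessorRead k)      -> case xs of
      (x : more) -> runProcessor more (k (Just x))
      []         -> runProcessor [] (k Nothing)
```

For example, keeping the even numbers from `[1..6]` and multiplying them by ten yields `[20,40,60]`; the processor stops as soon as it is fed `Nothing`.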

It is also possible to define several combinators that compose
`Stream`s with `Processor`s, `Processor`s with `Reader`s, and
`Processor`s with `Processor`s, in a similar manner to the Conduits
library. We have the same choices as with the composition of `Stream`s
and `Reader`s in terms of the ordering of effects. The key point is
that they may all be defined in terms of `effectfulFold`, and reasoned
about using the universal property. For instance, we can prove that
composition of `Processor`s is associative, which opens the door to
justified optimisation principles for chains of stream processors.

In the above I have taken an inductive view on `Stream`s, `Reader`s
and so on. In Haskell, each recursive type is simultaneously an
inductive and a co-inductive type. We can use this change of viewpoint
to give another way of defining interleaved data and effects in terms
of unfolds. The following function `unfold` takes a seed state of type
`s` and an evolution function of type `s -> m (f s)` and generates a
value of type `D m f`.

Update (2012-01-07 13:15): Was missing an additional `Monad m`
constraint here.

```
unfold :: (Monad m, Functor f) => s -> (s -> m (f s)) -> D m f
unfold s step = loop s
  where
    loop s = D $ do
      f <- step s
      return (Step (fmap loop f))
```

The `ofHandle` stream defined above can be re-expressed in terms of
`unfold` as follows:

```
ofHandle :: Handle -> Stream IO Char
ofHandle handle = unfold () step
  where
    step () = do
      isEOF <- hIsEOF handle
      if isEOF
        then do
          hClose handle
          return StreamStop
        else do
          c <- hGetChar handle
          return (StreamEmit c ())
```
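A self-contained sketch that puts `unfold` to work on a pure seed-driven stream. The hypothetical `countdown` uses an `Int` seed, `toList` is a hypothetical consumer, and `Identity` from `base` serves as an effect-free monad. The `ofHandle` definition from the text is included, with its `System.IO` imports, to show that it type-checks against the same `unfold`.

```haskell
{-# LANGUAGE DeriveFunctor #-}
import Data.Functor.Identity (Identity (..))
import System.IO (Handle, hClose, hGetChar, hIsEOF)

data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }

data StreamStep a x = StreamEmit a x | StreamStop deriving Functor
type Stream m a = D m (StreamStep a)

unfold :: (Monad m, Functor f) => s -> (s -> m (f s)) -> D m f
unfold s0 step = loop s0
  where
    loop s = D $ do
      layer <- step s
      return (Step (fmap loop layer))

ofHandle :: Handle -> Stream IO Char
ofHandle handle = unfold () step
  where
    step () = do
      isEOF <- hIsEOF handle
      if isEOF
        then do
          hClose handle
          return StreamStop
        else do
          c <- hGetChar handle
          return (StreamEmit c ())

-- Hypothetical pure example: the seed counts down to zero.
countdown :: Monad m => Int -> Stream m Int
countdown n = unfold n step
  where
    step 0 = return StreamStop
    step k = return (StreamEmit k (k - 1))

-- Run all effects, collecting the emitted elements.
toList :: Monad m => Stream m a -> m [a]
toList d = do
  step <- force d
  case step of
    Step (StreamEmit a rest) -> fmap (a :) (toList rest)
    Step StreamStop          -> return []
```

Here `runIdentity (toList (countdown 3))` is `[3,2,1]`; the seed is threaded by `unfold`, so `step` never mentions `D` or `Step`.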

In fact, we could change the representation of our interleaved data
and effects type to be expressed directly in terms of unfolds:

```
-- The seed type s is existential: needs the GADTs extension.
data D m f where
  D :: s -> (s -> m (f s)) -> D m f
```
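With this representation, consuming functions still work once we can expose a single layer: run one step from the current seed and repackage every successor seed with the same step function. The name `observe` and the `countdown`/`toList` helpers in this sketch are hypothetical, and `Identity` from `base` is assumed as an effect-free monad.

```haskell
{-# LANGUAGE DeriveFunctor, GADTs #-}
import Data.Functor.Identity (Identity (..))

data D m f where
  D :: s -> (s -> m (f s)) -> D m f

-- Expose one layer: run the step on the seed, then wrap each
-- successor seed back up with the same step function.
observe :: (Functor m, Functor f) => D m f -> m (f (D m f))
observe (D s step) = fmap (fmap (\s' -> D s' step)) (step s)

data StreamStep a x = StreamEmit a x | StreamStop deriving Functor
type Stream m a = D m (StreamStep a)

toList :: Monad m => Stream m a -> m [a]
toList d = do
  layer <- observe d
  case layer of
    StreamEmit a rest -> fmap (a :) (toList rest)
    StreamStop        -> return []

-- Hypothetical pure example with an Int seed.
countdown :: Monad m => Int -> Stream m Int
countdown n = D n step
  where
    step 0 = return StreamStop
    step k = return (StreamEmit k (k - 1))
```

As before, `runIdentity (toList (countdown 3))` is `[3,2,1]`. Note that `observe` is essentially the `deconstruct` of the earlier representation, which hints at why the two views can be related.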

This representation makes it harder to define the `effectfulFold`
function and its associated reasoning principle. On the other hand, it
does allow for fusion of chained sequences of `Stream`s, `Processor`s
and `Reader`s in a similar manner to the Stream Fusion paper.
Hopefully, this may allow sequences of stream processing functions to
be fused together and open up more possibilities for optimisation. But
this remains to be seen.