Reasoning about Stream Processing with Effects
It is a truth, universally acknowledged, that any programming technique must be in want of a reasoning principle.
Stream processing in Haskell is very much in the air at the moment, what with Iteratees (as embodied in the Enumerator library), Conduits and probably some more that I don't know about.
Patty Johann, Neil Ghani, Bart Jacobs and I have recently had the paper "Fibrational Induction Meets Effects" accepted to FoSSaCS 2012. It turns out to be very relevant to discussing and reasoning about the kinds of stream processing problems that these Haskell libraries seek to solve.
In the paper we build upon the work of Andrzej Filinski and Kristian Støvring on induction principles for data structures that interleave pure data with effects, where the effects are defined in terms of some monad. Filinski and Støvring gave an induction principle for such interleaved effectful data types in a small programming language with effects and a fixed notion of data type and predicate. In our FoSSaCS paper, we have generalised this to a categorical setting, and also generalised the notion of predicate that can be considered (so you can have proof-relevant predicates, or Kripke predicates, for instance).
Our paper is quite technical, since we need the structure of fibrations and so on to attain the right level of generality. Nevertheless, the core principle is relatively simple and revolves around a recursion scheme for pure data interleaved with monadic effects (which generalises the one given by Filinski and Støvring). This turns out to have direct application to reasoning about the processing of streaming data.
Effectful Streams
Effectful streams generate potentially infinite amounts of data by executing effects as they do so. For example, an effectful stream may produce data by reading it from a network socket. An effectful stream is nothing more than an interleaving of some monad "m" with news of whether the stream has stopped or has yielded an element. This can be expressed by the following Haskell declarations of a pair of mutually recursive types:
newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }

data StreamStep m a
  = StreamEmit a (Stream m a)
  | StreamStop
Constructing streams with no (new) effects is accomplished by just using the return of our chosen monad, and the appropriate constructor:
nil :: Monad m => Stream m a
nil = Stream $ return StreamStop
cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = Stream $ return (StreamEmit a s)
Given a value stream of type Stream m a we can kick it with forceStream to give us a monadic action that will yield either StreamEmit a moreStream or StreamStop. The stream gets to execute some effect in the monad m before telling us whether the stream has ended or not. It can use this monadic effect to do things like read from files, consult random number sources or use it as a side channel to report errors.
The stream append function shows how a stream can be interrogated by a recursive function:
append :: Monad m => Stream m a -> Stream m a -> Stream m a
append s1 s2 = Stream $ do
  streamStep <- forceStream s1
  case streamStep of
    StreamEmit a s1' -> forceStream (cons a (append s1' s2))
    StreamStop       -> forceStream s2
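To watch append at work, here is a small self-contained sketch. The helpers fromList, toList and noisy are my own illustrative names, not part of any library, and I use the pair monad ((,) [String]) from base as a stand-in for a logging effect, so that the order in which effects run shows up in the result.

```haskell
import Data.Functor.Identity (Identity (..))

-- Types and functions repeated from the text so the sketch stands alone.
newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }

data StreamStep m a
  = StreamEmit a (Stream m a)
  | StreamStop

nil :: Monad m => Stream m a
nil = Stream $ return StreamStop

cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = Stream $ return (StreamEmit a s)

append :: Monad m => Stream m a -> Stream m a -> Stream m a
append s1 s2 = Stream $ do
  streamStep <- forceStream s1
  case streamStep of
    StreamEmit a s1' -> forceStream (cons a (append s1' s2))
    StreamStop       -> forceStream s2

-- Hypothetical helper: an effect-free stream built from a list.
fromList :: Monad m => [a] -> Stream m a
fromList = foldr cons nil

-- Hypothetical helper: run every effect and collect the emitted elements.
toList :: Monad m => Stream m a -> m [a]
toList s = do
  streamStep <- forceStream s
  case streamStep of
    StreamEmit a s' -> fmap (a :) (toList s')
    StreamStop      -> return []

-- Hypothetical stream that writes a log entry before every step.
noisy :: String -> [a] -> Stream ((,) [String]) a
noisy name = go
  where
    go []       = Stream ([name ++ ": stop"], StreamStop)
    go (x : xs) = Stream ([name ++ ": emit"], StreamEmit x (go xs))

-- With no effects at all, append behaves like list append.
pureExample :: String
pureExample = runIdentity (toList (append (fromList "ab") (fromList "c")))

-- With logging, the effects of the second stream run after those of the first.
loggedExample :: ([String], [Int])
loggedExample = toList (append (noisy "s1" [1, 2]) (noisy "s2" [3]))
```

Here loggedExample pairs the elements [1,2,3] with a log in which every "s1" entry precedes every "s2" entry.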
Streams look a lot like normal Haskell lists, except that rather than the ambient Haskell effect of "possibly a non-terminating black hole", we have the effects described by the monad m (plus the non-optional "possibly a non-terminating black hole" effect). For instance, we can define a stream that emits characters read from a Handle until it hits the end of file:
import System.IO (Handle, hClose, hGetChar, hIsEOF)

ofHandle :: Handle -> Stream IO Char
ofHandle handle = loop
  where
    loop = Stream $ do
      isEOF <- hIsEOF handle
      if isEOF
        then do
          hClose handle
          return StreamStop
        else do
          c <- hGetChar handle
          return (StreamEmit c loop)
Stream Readers
Streams constitute the supply-side of data processing. We can use a similar pattern to treat the consumption side. Readers are defined as follows, and look very similar to Iteratees:
newtype Reader a m b = Reader { forceReader :: m (ReaderStep a m b) }

data ReaderStep a m b
  = ReaderRead (Maybe a -> Reader a m b)
  | ReaderEmit b
Similar to Streams, a value of type Reader a m b can be prodded with forceReader. It will then tell us (after some monadic effect) whether it wants to read some data (ReaderRead) or if it has finished reading and wants to output some data (ReaderEmit). The Maybe type constructor in ReaderRead is intended to indicate whether or not end-of-stream has been reached.
The obvious thing to do now is to connect up a Stream and a Reader to feed the data from one into the other. There are actually (at least) two different ways of doing this, depending on whether we execute the effects of the stream before the reader, or the other way round. Executing the stream before the reader gives the following definition:
(|>|) :: Monad m => Stream m a -> Reader a m b -> m b
s |>| r = do
  streamStep <- forceStream s
  case streamStep of
    StreamEmit a s' -> do
      readerStep <- forceReader r
      case readerStep of
        ReaderRead k -> s' |>| k (Just a)
        ReaderEmit b -> return b
    StreamStop -> runOnNothing r
If the stream stops early by returning StreamStop, then we use the following function runOnNothing to feed Nothing to a Reader until it yields a value:
runOnNothing :: Monad m => Reader a m b -> m b
runOnNothing r = do
  readerStep <- forceReader r
  case readerStep of
    ReaderRead k -> runOnNothing (k Nothing)
    ReaderEmit b -> return b
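As a quick illustration of how a stream and a reader interact, the following sketch feeds a two-element stream into a reader that sums its input. The names countTo and sumReader are hypothetical, and the pair monad ((,) [String]) from base again stands in for a logging effect so that the interleaving of the two sides is visible.

```haskell
-- Types and functions repeated from the text so the sketch stands alone.
newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }
data StreamStep m a = StreamEmit a (Stream m a) | StreamStop

newtype Reader a m b = Reader { forceReader :: m (ReaderStep a m b) }
data ReaderStep a m b = ReaderRead (Maybe a -> Reader a m b) | ReaderEmit b

(|>|) :: Monad m => Stream m a -> Reader a m b -> m b
s |>| r = do
  streamStep <- forceStream s
  case streamStep of
    StreamEmit a s' -> do
      readerStep <- forceReader r
      case readerStep of
        ReaderRead k -> s' |>| k (Just a)
        ReaderEmit b -> return b
    StreamStop -> runOnNothing r

runOnNothing :: Monad m => Reader a m b -> m b
runOnNothing r = do
  readerStep <- forceReader r
  case readerStep of
    ReaderRead k -> runOnNothing (k Nothing)
    ReaderEmit b -> return b

-- Hypothetical stream: emits 1..n, logging every step.
countTo :: Int -> Stream ((,) [String]) Int
countTo n = go 1
  where
    go i
      | i > n     = Stream (["stream: stop"], StreamStop)
      | otherwise = Stream (["stream: emit " ++ show i], StreamEmit i (go (i + 1)))

-- Hypothetical reader: sums its input until end-of-stream, logging every request.
sumReader :: Int -> Reader Int ((,) [String]) Int
sumReader total = Reader (["reader: read"], ReaderRead next)
  where
    next Nothing  = Reader (["reader: done"], ReaderEmit total)
    next (Just n) = sumReader (total + n)

-- The log shows each stream effect running just before the matching reader effect.
example :: ([String], Int)
example = countTo 2 |>| sumReader 0
```

The result is the sum 3, and in the log every "stream" entry appears immediately before the "reader" entry that consumes it.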
Generic Interleaved Data and A Recursion Scheme
I will now show that the Stream and Reader types may be generalised to a common pattern. By exploiting this common pattern, we obtain a powerful recursion principle for data interleaved with effects. This recursion principle also comes equipped with a reasoning principle, which allows us to prove things about functions that recurse over data interleaved with effects.
The common shape of the Stream and Reader types is captured by the following pair of Haskell type declarations. All the parts specific to Streams or Readers have been abstracted out into the argument f :: * -> * (which we will assume is an instance of the Functor type class).
data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }
A value of type D m f is therefore an interleaving of the effects described by m with the pure data described by f. The function force plays the part of the forceStream and forceReader functions defined above.
Effectful data can be constructed and deconstructed by the following functions:
construct :: Monad m => f (D m f) -> D m f
construct x = D $ return (Step x)
deconstruct :: Monad m => D m f -> m (f (D m f))
deconstruct d = do
  Step x <- force d
  return x
When using deconstruct there may be some effects to execute before we get access to the underlying pure data described by f.
To recover the Stream type, we simply define the appropriate f to describe the two things that Streams are allowed to do: emit values and cease to be.
data StreamStep a x
  = StreamEmit a x
  | StreamStop
  deriving Functor

type Stream m a = D m (StreamStep a)
In the new StreamStep type, the type parameter x indicates the hole where the next step of the recursion is placed.
The nil and cons constructors can then be defined in terms of construct and the appropriate part of the StreamStep type:
nil :: Monad m => Stream m a
nil = construct StreamStop
cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = construct (StreamEmit a s)
The benefit of re-expressing the Stream type in this way is that we can now define a powerful recursion scheme on values of type D m f, including Stream m a, and use this to define functions like append and (|>|). The interesting part is that this recursion scheme comes equipped with a reasoning principle, which allows us to prove things about our functions. To properly define the recursion scheme we first need to know what an Eilenberg-Moore algebra for a monad is.
Eilenberg-Moore Algebras
For any monad m, an m-Eilenberg-Moore algebra consists of a pair of a type a and a function h :: m a -> a, satisfying some laws that state that h interacts nicely with the structure of the monad. I think of the existence of an Eilenberg-Moore algebra structure on a as roughly stating that the type a is effectful in the sense that it can have additional effects "prepended" on to it.
The existence of an Eilenberg-Moore algebra structure for a type can be expressed as a Haskell type class, EMAlgebra:
-- Needs the MultiParamTypeClasses extension (and FlexibleInstances for the instances below).
class (Functor m, Monad m) => EMAlgebra m a where
  algebra :: m a -> a
Of course we cannot state the laws, so we shall just commit to promising that they hold. The type class mechanism also ties us to giving at most one Eilenberg-Moore structure for each pair of a monad m and a type a, where in fact there may be many, but this is a small price to pay for the convenience that type classes provide.
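Although the class cannot enforce them, the two standard Eilenberg-Moore laws are h . return = id and h . fmap h = h . join, and they can at least be spot-checked at sample values. The sketch below does this for the pair monad ((,) [String]) from base; unitLaw, multLaw, flush and duplicate are names I have made up for the illustration.

```haskell
import Control.Monad (join)

-- Unit law, h . return = id, checked at one sample value.
unitLaw :: (Monad m, Eq a) => (m a -> a) -> a -> Bool
unitLaw h a = h (return a) == a

-- Multiplication law, h . fmap h = h . join, checked at one sample value.
multLaw :: (Monad m, Eq a) => (m a -> a) -> m (m a) -> Bool
multLaw h x = h (fmap h x) == h (join x)

-- A lawful algebra for the pair monad on the carrier [String]:
-- the logged effect is prepended on to the value.
flush :: ([String], [String]) -> [String]
flush (w, a) = w ++ a

-- An unlawful candidate: prepending the effect twice satisfies the
-- unit law but breaks the multiplication law.
duplicate :: ([String], [String]) -> [String]
duplicate (w, a) = w ++ w ++ a

-- A sample nested computation to test with.
sample :: ([String], ([String], [String]))
sample = (["a"], (["b"], ["c"]))

checks :: [Bool]
checks =
  [ unitLaw flush ["x"]
  , multLaw flush sample
  , unitLaw duplicate ["x"]
  , not (multLaw duplicate sample)
  ]
```

Every entry of checks is True. Passing at a few samples is of course not a proof, but it is a cheap sanity check before promising that an instance is lawful.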
Every type of the form m a for some monad m is obviously effectful, so we can give it an Eilenberg-Moore algebra structure using the function join :: m (m a) -> m a defined in the Control.Monad module:
instance (Functor m, Monad m) => EMAlgebra m (m a) where
  algebra = join
This is the free Eilenberg-Moore algebra for the monad m and the type a.
The property of having an Eilenberg-Moore algebra is also preserved by the construction of function types. If b has an Eilenberg-Moore structure with respect to m, then so does a -> b for any a:
instance (EMAlgebra m b) => EMAlgebra m (a -> b) where
  algebra x a = algebra $ do f <- x; return (f a)
Note: this instance overlaps with the previous one when the monad m is left polymorphic, so GHC needs -XIncoherentInstances to handle it. This seems to work for the examples here, but probably a better solution is needed in general.
Finally, every one of our interleaved data and effects types carries a default Eilenberg-Moore structure, achieved by prepending the new effect before the first effect of the data:
instance (Functor m, Monad m) => EMAlgebra m (D m f) where
  algebra x = D $ x >>= force
Note that we now have two algebra structures on D m f: the m-Eilenberg-Moore structure defined here, and the f-algebra structure defined by the construct function above.
A Recursion Scheme
A basic recursion scheme is the catamorphism, which allows us to eliminate an element of a recursive type Rec f using an algebra f a -> a, where Rec f is defined as follows:
data Rec f = In (f (Rec f))
The catamorphism function, or to use its common name fold, has the following type:
fold :: Functor f => (f a -> a) -> Rec f -> a
A function of type f a -> a is the structure part of an f-algebra. f-algebras are similar to Eilenberg-Moore algebras, except that they do not need to satisfy any laws (because we do not assume that f is a monad).
The fold function itself satisfies the following law (which can also be taken to be the definition):
fold h (In x) = h (fmap (fold h) x)
and is the unique function of type Rec f -> a to do so. This uniqueness property can be used to reason about functions built from fold.
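For readers who have not met fold in this form, here is a minimal sketch that instantiates f with the shape of one layer of a list. ListF, embed and sumAlg are illustrative names of my own choosing.

```haskell
data Rec f = In (f (Rec f))

fold :: Functor f => (f a -> a) -> Rec f -> a
fold h (In x) = h (fmap (fold h) x)

-- Hypothetical base functor: one layer of a list with elements of type a,
-- where x marks the position of the rest of the list.
data ListF a x = NilF | ConsF a x

instance Functor (ListF a) where
  fmap _ NilF        = NilF
  fmap g (ConsF a x) = ConsF a (g x)

-- Hypothetical helper: embed an ordinary list into Rec (ListF a).
embed :: [a] -> Rec (ListF a)
embed = foldr (\a rest -> In (ConsF a rest)) (In NilF)

-- An f-algebra that sums one layer, given the sum of the rest.
sumAlg :: ListF Int Int -> Int
sumAlg NilF           = 0
sumAlg (ConsF a rest) = a + rest

example :: Int
example = fold sumAlg (embed [1, 2, 3])
```

Here example evaluates to 6. The algebra only ever sees a single layer, and fold takes care of the recursion.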
To define functions that eliminate our interleaved effects and data types D m f, we could just use the fact that they are equivalent to the form m (Rec (f :.: m)), where - :.: - indicates composition of functors, and use fold on the type Rec (f :.: m). However, in doing this we are forced to consider the pure and effectful parts of our data simultaneously.
A better approach, which is derivable from the basic fold operator, is to eliminate values of type D m f using f-and-m-algebras. That is, we assume that the result type a has an f-algebra structure f a -> a and an Eilenberg-Moore structure m a -> a. In this way, we can separate the pure and effectful parts of our recursion. By making use of the EMAlgebra type class defined above, we do not need to explicitly mention the effectful part at all.
Our new effectfulFold combinator has the following type, and I have given the direct Haskell implementation. It is an interesting exercise to see how it can be defined in terms of the fold combinator on Rec (f :.: m).
effectfulFold :: (Functor f, EMAlgebra m a) =>
                 (f a -> a)
              -> D m f
              -> a
effectfulFold h = algebra . fmap loop . force
  where
    loop (Step x) =
      h $ fmap algebra $ fmap (fmap loop) $ fmap force $ x
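To get a feel for effectfulFold, here is a small self-contained sketch that uses it to collect the elements of a stream into a list. Only the free instance of EMAlgebra is declared, so no instance overlap arises. The names toList and noisy are hypothetical, and the pair monad ((,) [String]) from base stands in for a logging effect.

```haskell
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances #-}
import Control.Monad (join)

class (Functor m, Monad m) => EMAlgebra m a where
  algebra :: m a -> a

-- The free Eilenberg-Moore algebra, as in the text.
instance (Functor m, Monad m) => EMAlgebra m (m a) where
  algebra = join

data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }

effectfulFold :: (Functor f, EMAlgebra m a) => (f a -> a) -> D m f -> a
effectfulFold h = algebra . fmap loop . force
  where
    loop (Step x) = h $ fmap algebra $ fmap (fmap loop) $ fmap force $ x

data StreamStep a x = StreamEmit a x | StreamStop

instance Functor (StreamStep a) where
  fmap g (StreamEmit a x) = StreamEmit a (g x)
  fmap _ StreamStop       = StreamStop

type Stream m a = D m (StreamStep a)

-- Hypothetical helper: only the pure part is written by hand; the effects
-- are threaded through by the Eilenberg-Moore structure on m [a].
toList :: (Functor m, Monad m) => Stream m a -> m [a]
toList = effectfulFold h
  where
    h (StreamEmit a rest) = fmap (a :) rest
    h StreamStop          = return []

-- Hypothetical stream that writes a log entry before every step.
noisy :: [a] -> Stream ((,) [String]) a
noisy []       = D (["stop"], Step StreamStop)
noisy (x : xs) = D (["emit"], Step (StreamEmit x (noisy xs)))

example :: ([String], String)
example = toList (noisy "hi")
```

Here example is (["emit","emit","stop"], "hi"): the algebra h never mentions the log, yet the effects are sequenced in stream order.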
As we prove in the paper, generalising Filinski and Støvring's proof, functions defined using effectfulFold satisfy the following two properties. First, they preserve f-algebras, taking the f-algebra construct to the supplied f-algebra h:
effectfulFold h (construct x) = h (fmap (effectfulFold h) x)
Moreover, they preserve m-Eilenberg-Moore algebras, taking the default Eilenberg-Moore structure on D m f to the implicitly provided Eilenberg-Moore structure on a:
effectfulFold h (algebra x) = algebra (fmap (effectfulFold h) x)
Analogously to fold h, effectfulFold h is the unique function satisfying these two properties. In the paper, we use uniqueness to derive an induction principle for data interleaved with monadic effects, but here we can use uniqueness directly to reason about functions defined using effectfulFold.
Using the Recursion Scheme
The append function on streams that I defined by hand above can be re-expressed using the effectfulFold combinator:
append :: (Functor m, Monad m) => Stream m a -> Stream m a -> Stream m a
append s1 s2 = effectfulFold h s1
  where
    h (StreamEmit a s) = cons a s
    h StreamStop       = s2
By its definition in terms of effectfulFold we know that append preserves the Eilenberg-Moore structure on its first argument. This means that the following equation holds for x :: m (Stream m a):
append (algebra x) s2 = algebra (fmap (\s -> append s s2) x)
Note that append does not preserve the Eilenberg-Moore structure on its second argument. The Eilenberg-Moore structure for streams prepends effects on to the stream, but these effects will not be executed by append until after all the effects in the first stream have been executed.
Also by its definition in terms of effectfulFold, we know that append satisfies the following equations with respect to the "pure" part of streams:
append (construct StreamStop) s2 = s2
append (construct (StreamEmit a s1)) s2 = construct (StreamEmit a (append s1 s2))
We can now use the uniqueness of functions defined by effectfulFold to see that append is associative. We have two functions of type Stream m a -> Stream m a that we wish to prove equivalent:
\s1 -> append s1 (append s2 s3)
and
\s1 -> append (append s1 s2) s3
where the first one is defined directly in terms of effectfulFold. If we can show that the second function obeys the same properties as the first, i.e. that it preserves the m-Eilenberg-Moore algebra and the f-algebra used in the definition of append, then by uniqueness we will have shown that they are the same function.
It is easy to check that the second function preserves the Eilenberg-Moore structure on Stream m a, since it is just the composition of two functions that do so. So the meat of the proof is in showing that it preserves the f-algebra structure. This splits into two cases, one for StreamStop and one for StreamEmit. For the former, we must show that
append (append (construct StreamStop) s2) s3 = append s2 s3
but this follows directly from the properties we already know about append. The second case is a little harder. We must show that
append (append (construct (StreamEmit a s1)) s2) s3
=
construct (StreamEmit a (append (append s1 s2) s3))
This follows by repeated application of our knowledge about how append operates on input of the form construct (StreamEmit a s1).
Thus we have used the uniqueness property of functions defined using effectfulFold to show that append is associative.
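The StreamEmit case is spelled out as a comment in the sketch below, which also checks associativity on one concrete example, including the order of effects. This is only a spot check alongside the proof, not a replacement for it. The names noisy and run are hypothetical, and the pair monad ((,) [String]) from base plays the role of a logging effect.

```haskell
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances #-}

class (Functor m, Monad m) => EMAlgebra m a where
  algebra :: m a -> a

data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }

instance (Functor m, Monad m) => EMAlgebra m (D m f) where
  algebra x = D $ x >>= force

construct :: Monad m => f (D m f) -> D m f
construct x = D $ return (Step x)

effectfulFold :: (Functor f, EMAlgebra m a) => (f a -> a) -> D m f -> a
effectfulFold h = algebra . fmap loop . force
  where
    loop (Step x) = h $ fmap algebra $ fmap (fmap loop) $ fmap force $ x

data StreamStep a x = StreamEmit a x | StreamStop

instance Functor (StreamStep a) where
  fmap g (StreamEmit a x) = StreamEmit a (g x)
  fmap _ StreamStop       = StreamStop

type Stream m a = D m (StreamStep a)

append :: (Functor m, Monad m) => Stream m a -> Stream m a -> Stream m a
append s1 s2 = effectfulFold h s1
  where
    h (StreamEmit a s) = construct (StreamEmit a s)
    h StreamStop       = s2

-- The StreamEmit case, using the second equation for append twice:
--     append (append (construct (StreamEmit a s1)) s2) s3
--   = append (construct (StreamEmit a (append s1 s2))) s3
--   = construct (StreamEmit a (append (append s1 s2) s3))

-- Hypothetical stream that writes a log entry before every step.
noisy :: String -> [a] -> Stream ((,) [String]) a
noisy name = go
  where
    go []       = D ([name ++ ": stop"], Step StreamStop)
    go (x : xs) = D ([name ++ ": emit"], Step (StreamEmit x (go xs)))

-- Hypothetical helper: run a logging stream, collecting log and elements.
run :: Stream ((,) [String]) a -> ([String], [a])
run s = case force s of
  (w, Step StreamStop)        -> (w, [])
  (w, Step (StreamEmit a s')) -> let (w', as) = run s' in (w ++ w', a : as)

leftNested, rightNested :: ([String], [Int])
leftNested  = run (append (append (noisy "s1" [1]) (noisy "s2" [2])) (noisy "s3" [3]))
rightNested = run (append (noisy "s1" [1]) (append (noisy "s2" [2]) (noisy "s3" [3])))
```

Both nestings yield the elements [1,2,3] with the same log, in which the effects of s1 come first, then those of s2, then those of s3.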
Back to Readers
Just as the Stream type can be defined in terms of the generic D m f type, the Reader type can be defined in the same way:
data ReaderStep a b x
  = ReaderRead (Maybe a -> x)
  | ReaderEmit b
  deriving Functor

type Reader a m b = D m (ReaderStep a b)
The runOnNothing function can be defined in terms of effectfulFold, by recursing on the Reader argument:
runOnNothing :: (Functor m, Monad m) => Reader a m b -> m b
runOnNothing = effectfulFold f
  where
    f (ReaderRead k) = k Nothing
    f (ReaderEmit b) = return b
Likewise, the (|>|) function that connects a stream with a reader can be defined in terms of effectfulFold, by recursion on the stream argument, and using the Eilenberg-Moore structure on the type Reader a m b -> m b:
(|>|) :: (Monad m, Functor m) => Stream m a -> Reader a m b -> m b
(|>|) = effectfulFold f
  where
    f (StreamEmit a h) r = do
      readerStep <- deconstruct r
      case readerStep of
        ReaderRead k -> h (k (Just a))
        ReaderEmit b -> return b
    f StreamStop r = runOnNothing r
The (|>|) function executes the effects of the stream before the effects of the reader, letting the stream dictate how events proceed. An alternative is to execute the effects of the reader first, by defining the function in terms of recursion on the reader argument, again using effectfulFold:
(|>>|) :: (Monad m, Functor m) => Stream m a -> Reader a m b -> m b
s |>>| r = effectfulFold f r s
  where
    f (ReaderRead k) s = do
      streamStep <- deconstruct s
      case streamStep of
        StreamEmit a s' -> k (Just a) s'
        StreamStop      -> k Nothing nil
    f (ReaderEmit b) s = return b
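The difference between the two operators is easiest to see with a reader that emits its result without reading anything. The sketch below uses the first, hand-written representation of streams and readers so that it needs no type class machinery. The reader-first operator here is my own hand-unrolled rendering of what the effectfulFold definition computes, and Log, oneElement and ready are hypothetical names.

```haskell
newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }
data StreamStep m a = StreamEmit a (Stream m a) | StreamStop

newtype Reader a m b = Reader { forceReader :: m (ReaderStep a m b) }
data ReaderStep a m b = ReaderRead (Maybe a -> Reader a m b) | ReaderEmit b

nil :: Monad m => Stream m a
nil = Stream $ return StreamStop

runOnNothing :: Monad m => Reader a m b -> m b
runOnNothing r = do
  readerStep <- forceReader r
  case readerStep of
    ReaderRead k -> runOnNothing (k Nothing)
    ReaderEmit b -> return b

-- Stream first, as in the hand-written definition earlier in the post.
(|>|) :: Monad m => Stream m a -> Reader a m b -> m b
s |>| r = do
  streamStep <- forceStream s
  case streamStep of
    StreamEmit a s' -> do
      readerStep <- forceReader r
      case readerStep of
        ReaderRead k -> s' |>| k (Just a)
        ReaderEmit b -> return b
    StreamStop -> runOnNothing r

-- Reader first: a hand-unrolled sketch, assumed equivalent to the
-- effectfulFold definition in the text.
(|>>|) :: Monad m => Stream m a -> Reader a m b -> m b
s |>>| r = do
  readerStep <- forceReader r
  case readerStep of
    ReaderEmit b -> return b
    ReaderRead k -> do
      streamStep <- forceStream s
      case streamStep of
        StreamEmit a s' -> s' |>>| k (Just a)
        StreamStop      -> nil |>>| k Nothing

-- The pair monad from base, used as a logging effect.
type Log = (,) [String]

-- Hypothetical stream with a single element.
oneElement :: Stream Log Int
oneElement =
  Stream (["stream: emit 1"], StreamEmit 1 (Stream (["stream: stop"], StreamStop)))

-- Hypothetical reader that emits straight away without reading.
ready :: Reader Int Log String
ready = Reader (["reader: emit"], ReaderEmit "done")

streamFirst, readerFirst :: ([String], String)
streamFirst = oneElement |>| ready
readerFirst = oneElement |>>| ready
```

With the stream-first operator the log is ["stream: emit 1","reader: emit"]: the stream has already run an effect and produced an element that is then discarded. With the reader-first operator the log is just ["reader: emit"], and the stream is never touched.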
There are obviously many other possibilities for connecting Streams to Readers. If the Stream finishes early, we could return the uncompleted Reader rather than passing it on to runOnNothing. Likewise, if the Reader emits a value before all of the Stream has been read, we could return the leftover stream instead of just dropping it on the floor. In any case, all these possibilities are definable in terms of effectfulFold.
Readers are an instance of the Monad type class as the following Haskell declaration witnesses:
instance Monad m => Monad (Reader a m) where
  return b = construct (ReaderEmit b)
  c >>= k = effectfulFold f c
    where
      f (ReaderRead g) = construct (ReaderRead g)
      f (ReaderEmit b) = k b
Note: GHC rejects this declaration as written, because Reader is a type synonym; one has to wrap the Reader type in a newtype to get it to work. I'll leave it as-is to give the general idea.
Using the uniqueness property of effectfulFold it is possible to prove that this instance really does satisfy the monad laws. In fact, it is possible to show that Reader a m is the sum of the monad m with the free monad on the functor f x = Maybe a -> x. This is an instance of a fact originally observed by Hyland, Plotkin and Power (Theorem 4).
Processors
If Streams produce data and Readers consume data, then what goes in between? Processors! A Processor is an instance of the same interleaved data and effects pattern as Streams and Readers:
data ProcessorStep a b x
  = ProcessorRead (Maybe a -> x)
  | ProcessorEmit b x
  | ProcessorStop
  deriving Functor

type Processor a m b = D m (ProcessorStep a b)
A Processor, once kicked using force, can either ask for more input (ProcessorRead), can emit some output (ProcessorEmit) with a promise to carry on, or can signal end of stream (ProcessorStop). Processors are intended to fulfil the same role as Enumeratees or Conduits as intermediaries that manipulate data in some way. They are also very similar to the StreamProcessor representation defined by Ghani, Hancock and Pattinson.
With a few helper functions:
processorRead :: Monad m => (Maybe a -> Processor a m b) -> Processor a m b
processorRead k = construct (ProcessorRead k)
processorStop :: Monad m => Processor a m b
processorStop = construct ProcessorStop
processorEmit :: Monad m => b -> Processor a m b -> Processor a m b
processorEmit b proc = construct (ProcessorEmit b proc)
we can define a processor that filters:
-- Note: this name clashes with Prelude's filter, so hide or qualify one of them.
filter :: Monad m => (a -> Maybe b) -> Processor a m b
filter h = processorRead getInput
  where
    getInput Nothing  = processorStop
    getInput (Just a) =
      case h a of
        Nothing -> filter h
        Just b  -> processorEmit b (filter h)
It is also possible to define several combinators that compose Streams with Processors, Processors with Readers and Processors with Processors in a similar manner to the Conduits library. We have the same choices as with the composition of Streams and Readers in terms of the ordering of effects. The key point is that they may all be defined in terms of effectfulFold, and reasoned about using the universal property. For instance, we can prove that composition of Processors is associative, which opens the door to justified optimisation principles for chains of stream processors.
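As one possible example of such a combinator, the sketch below connects a Stream to a Processor, running the effects of the processor first. It is written by direct recursion rather than with effectfulFold to keep it short, and it is only one of the possible choices of effect ordering. The names pipe, filterP (the filtering processor, renamed to avoid the Prelude clash), fromList and toList are hypothetical.

```haskell
import Data.Functor.Identity (Identity (..))

data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }

construct :: Monad m => f (D m f) -> D m f
construct x = D $ return (Step x)

deconstruct :: Monad m => D m f -> m (f (D m f))
deconstruct d = force d >>= \(Step x) -> return x

data StreamStep a x = StreamEmit a x | StreamStop
type Stream m a = D m (StreamStep a)

data ProcessorStep a b x
  = ProcessorRead (Maybe a -> x)
  | ProcessorEmit b x
  | ProcessorStop
type Processor a m b = D m (ProcessorStep a b)

-- The filtering processor from the text, under a different name.
filterP :: Monad m => (a -> Maybe b) -> Processor a m b
filterP h = construct (ProcessorRead getInput)
  where
    getInput Nothing  = construct ProcessorStop
    getInput (Just a) = case h a of
      Nothing -> filterP h
      Just b  -> construct (ProcessorEmit b (filterP h))

-- Hypothetical combinator: the processor is asked first, and the stream is
-- only forced when the processor wants to read.
pipe :: Monad m => Stream m a -> Processor a m b -> Stream m b
pipe s p = D $ do
  processorStep <- deconstruct p
  case processorStep of
    ProcessorStop      -> return (Step StreamStop)
    ProcessorEmit b p' -> return (Step (StreamEmit b (pipe s p')))
    ProcessorRead k    -> do
      streamStep <- deconstruct s
      case streamStep of
        StreamEmit a s' -> force (pipe s' (k (Just a)))
        StreamStop      -> force (pipe (construct StreamStop) (k Nothing))

-- Hypothetical helpers for building and running effect-free streams.
fromList :: Monad m => [a] -> Stream m a
fromList = foldr (\a s -> construct (StreamEmit a s)) (construct StreamStop)

toList :: Monad m => Stream m a -> m [a]
toList s = do
  streamStep <- deconstruct s
  case streamStep of
    StreamEmit a s' -> fmap (a :) (toList s')
    StreamStop      -> return []

evens :: [Int]
evens = runIdentity (toList (pipe (fromList [1 .. 6]) (filterP keepEven)))
  where
    keepEven n = if even n then Just n else Nothing
```

Here evens is [2,4,6]. Note that pipe relies on the processor eventually stopping once it has been told that the input has ended.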
The Co-Inductive View
In the above I have taken an inductive view on Streams, Readers and so on. In Haskell, each recursive type is simultaneously an inductive and co-inductive type. We can use this change-of-viewpoint to give another way of defining interleaved data and effects in terms of unfolds. The following function unfold takes a seed state of type s and an evolution function of type s -> m (f s) and generates a value of type D m f.
unfold :: (Monad m, Functor f) => s -> (s -> m (f s)) -> D m f
unfold s step = loop s
  where
    loop s' = D $ do
      f <- step s'
      return (Step (fmap loop f))
Note: only the Functor structure of m is actually used, so the Monad m constraint here could be weakened.
The ofHandle stream defined above can be re-expressed in terms of unfold as follows:
ofHandle :: Handle -> Stream IO Char
ofHandle handle = unfold () step
  where
    step () = do
      isEOF <- hIsEOF handle
      if isEOF
        then do
          hClose handle
          return StreamStop
        else do
          c <- hGetChar handle
          return (StreamEmit c ())
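For an example with a more interesting seed than (), here is a sketch of a countdown stream whose state is the next number to emit. The names countdown and toList are hypothetical, the pair monad ((,) [String]) from base stands in for a logging effect, and the seed is assumed to be non-negative (a negative seed would never reach 0).

```haskell
data Step m f = Step (f (D m f))
newtype D m f = D { force :: m (Step m f) }

data StreamStep a x = StreamEmit a x | StreamStop

instance Functor (StreamStep a) where
  fmap g (StreamEmit a x) = StreamEmit a (g x)
  fmap _ StreamStop       = StreamStop

type Stream m a = D m (StreamStep a)

unfold :: (Monad m, Functor f) => s -> (s -> m (f s)) -> D m f
unfold s step = loop s
  where
    loop s' = D $ do
      f <- step s'
      return (Step (fmap loop f))

-- Hypothetical stream: the seed is the next number to emit, and each
-- step logs what it does before returning the new seed.
countdown :: Int -> Stream ((,) [String]) Int
countdown n = unfold n step
  where
    step 0 = (["stop"], StreamStop)
    step i = (["emit " ++ show i], StreamEmit i (i - 1))

-- Hypothetical helper: run every effect and collect the emitted elements.
toList :: Monad m => Stream m a -> m [a]
toList s = force s >>= \(Step streamStep) ->
  case streamStep of
    StreamEmit a s' -> fmap (a :) (toList s')
    StreamStop      -> return []

example :: ([String], [Int])
example = toList (countdown 3)
```

Here example is (["emit 3","emit 2","emit 1","stop"], [3,2,1]). The evolution function is not recursive; all of the recursion lives in unfold.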
In fact, we could change the representation of our interleaved data and effect type to be directly expressed in terms of unfolds:
data D m f where
  D :: s -> (s -> m (f s)) -> D m f
This representation makes it harder to define the effectfulFold function and its associated reasoning principle. On the other hand, it does allow for fusion of chained sequences of Streams, Processors and Readers in a similar manner to the Stream Fusion paper. Hopefully, this may allow for sequences of stream processing functions to be fused together and open up more possibilities for optimisation. But this remains to be seen.
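To give a rough idea of why the unfold representation is friendly to fusion, here is a sketch of a map operation on streams in that representation. It is not recursive: it only post-composes the step function, so a chain of maps builds no intermediate stream. This is an illustration of the idea under my own assumptions, not a worked-out fusion framework, and mapStep, mapS, toList and countdown are hypothetical names.

```haskell
{-# LANGUAGE GADTs #-}
import Data.Functor.Identity (Identity (..))

data StreamStep a x = StreamEmit a x | StreamStop

-- The unfold representation: a hidden seed together with a step function.
data D m f where
  D :: s -> (s -> m (f s)) -> D m f

type Stream m a = D m (StreamStep a)

-- Apply a function to the element of a single step, leaving the seed alone.
mapStep :: (a -> b) -> StreamStep a x -> StreamStep b x
mapStep g (StreamEmit a x) = StreamEmit (g a) x
mapStep _ StreamStop       = StreamStop

-- Non-recursive map: the seed is untouched, only the step function changes.
mapS :: Functor m => (a -> b) -> Stream m a -> Stream m b
mapS g (D seed step) = D seed (fmap (mapStep g) . step)

-- The only loop is in the consumer.
toList :: Monad m => Stream m a -> m [a]
toList (D seed step) = go seed
  where
    go s = do
      streamStep <- step s
      case streamStep of
        StreamEmit a s' -> fmap (a :) (go s')
        StreamStop      -> return []

-- Hypothetical effect-free stream counting down from a non-negative seed.
countdown :: Int -> Stream Identity Int
countdown n = D n step
  where
    step 0 = Identity StreamStop
    step i = Identity (StreamEmit i (i - 1))

example :: [Int]
example = runIdentity (toList (mapS (* 2) (mapS (+ 1) (countdown 3))))
```

Here example is [8,6,4]. Whether a compiler actually inlines the composed step functions into a single loop is a separate question, which is part of what remains to be seen.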