Reasoning about Stream Processing with Effects

Friday, 6 January 2012 | Index | Tags: | [RSS Icon] RSS Feed

It is a truth, universally acknowledged, that any programming technique must be in want of a reasoning principle.

Stream processing in Haskell is very much in the air at the moment, what with Iteratees (as embodied in the Enumerator library), Conduits and probably some more that I don’t know about.

Patty Johann, Neil Ghani, Bart Jacobs and I have recently had the paper “Fibrational Induction Meets Effects” accepted to FoSSaCS 2012 that turns out to be very relevant to discussing and reasoning about the kinds of stream processing problems that these Haskell libraries seek to resolve.

In the paper we build upon the work of Andrzej Filinski and Kristian Støvring on induction principles for data structures that interleave pure data with effects, where the effects are defined in terms of some monad. Filinski and Støvring gave a induction principle for such interleaved effectful data types in a small programming language with effects and fixed notion of data type and predicate. In our FoSSaCS paper, we have generalised this to a more general categorical setting, and also generalised the notion of predicate that can be considered (so you can have proof-relevant predicates, or Kripke predicates, for instance).

Our paper is quite technical, since we need the structure of fibrations and so on to properly attain the right level of generality. Nevertheless, the core principle is a relatively simple and revolves around a recursion scheme for pure data interleaved with monadic effects (which generalises the one given by Filinski and Støvring). This turns out to have direct application to reasoning about the processing of streaming data.

Effectful Streams

Effectful streams generate potentially infinite amounts of data by executing effects as they do so. For example, an effectful stream may produce data by reading it from a network socket. An effectful stream is nothing more than an interleaving of some monad “m” with news of whether the stream has stopped or has yielded an element. This can be expressed by the following Haskell declarations of a pair of mutually recursive types:

newtype Stream m a = Stream { forceStream :: m (StreamStep m a) }

data StreamStep m a
    = StreamEmit a (Stream m a)
    | StreamStop

Constructing streams with no (new) effects is accomplished by just using the return of our chosen monad, and the appropriate constructor:

nil :: Monad m => Stream m a
nil = Stream $ return StreamStop

cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = Stream $ return (StreamEmit a s)

Given a value stream of type Stream m a we can kick it with forceStream to give us a monadic action that will yield either StreamEmit a moreStream or StreamStop. The stream gets to execute some effect in the monad m before telling us whether the stream has ended or not. It can use this monadic effect to do things like read from files, consult random number sources or use it as a side channel to report errors.

The stream append function shows how a stream can be interrogated by a recursive function:

append :: Monad m => Stream m a -> Stream m a -> Stream m a
append s1 s2 = Stream $ do
  streamStep <- forceStream s1
  case streamStep of
    StreamEmit a s1' -> forceStream (cons a (append s1' s2))
    StreamStop       -> forceStream s2

Streams look a lot like normal Haskell lists, except that rather than the ambient Haskell effect of “possibly a non-terminating black hole”, we have the effects described by the monad m (plus the non-optional “possibly a non-terminating black hole” effect). For instance, we can define a stream that emits characters read from a Handle until it hits the end of file:

ofHandle :: Handle -> Stream IO Char
ofHandle handle = loop
    loop = Stream $ do
      isEOF <- hIsEOF handle
      if isEOF then do hClose handle
                       return StreamStop
               else do c <- hGetChar handle
                       return (StreamEmit c loop)

Stream Readers

Streams constitute the supply-side of data processing. We can use a similar pattern to treat the consumption side. Readers are defined as follows, and look very similar to Iteratees:

newtype Reader a m b = Reader { forceReader :: m (ReaderStep a m b) }

data ReaderStep a m b
    = ReaderRead (Maybe a -> Reader a m b)
    | ReaderEmit b

Similar to Streams, a value of type Reader a m b can be prodded with forceReader. It will then tell us (after some monadic effect) whether it wants to read some data (ReaderRead) or if it has finished reading and wants to output some data (ReaderEmit). The Maybe type constructor in ReaderRead is intended to indicate whether or not end-of-stream has been reached.

The obvious thing to do now is to connect up a Stream and a Reader to feed the data from one into the other. There are actually (at least) two different ways of doing this, depending on whether we execute the effects of the stream before the reader, or the other way round. Executing the stream before the reader gives the following definition:

(|>|) :: Monad m => Stream m a -> Reader a m b -> m b
s |>| r = do
    streamStep <- forceStream s
    case streamStep of
      StreamEmit a s' -> do
        readerStep <- forceReader r
        case readerStep of
          ReaderRead k -> s' |>| k (Just a)
          ReaderEmit b -> return b
      StreamStop -> do
        runOnNothing r

If the stream stops early by returning StreamStop, then we use the following function runOnNothing to feed Nothing to a Reader until it yields a value:

runOnNothing :: Monad m => Reader a m b -> m b
runOnNothing r = do
    readerStep <- forceReader r
    case readerStep of
      ReaderRead k -> runOnNothing (k Nothing)
      ReaderEmit b -> return b

Generic Interleaved Data and A Recursion Scheme

I will now show that the Stream and Reader types may be generalised to a common pattern. By exploiting this common pattern, we obtain a powerful recursion principle for data interleaved with effects. This recursion principle also comes equipped with a reasoning principle, which allows us to prove things about functions that recurse over data interleaved with effects.

The common shape of the Stream and Reader types is captured by the following pair of Haskell type declarations. All the parts specific to Streams or Readers have been abstracted out into the argument f :: * -> * (which we will assume is an instance of the Functor type class).

data Step m f = Step (f (D m f))

newtype D m f = D { force :: m (Step m f) }

A value of type D m f is therefore an interleaving of the effects described by m with the pure data described by f. The function force plays the part of the forceStream and forceReader functions defined above.

Effectful data can be constructed and deconstructed by the following functions:

construct :: Monad m => f (D m f) -> D m f
construct x = D $ return (Step x)

deconstruct :: Monad m => D m f -> m (f (D m f))
deconstruct d = do Step x <- force d
                   return x

When using deconstruct there may be some effects to execute before we get access to the underlying pure data described by f.

To recover the Stream type, we simply define the appropriate f to describe the two things that Streams are allowed to do: emit values and cease to be.

data StreamStep a x
    = StreamEmit a x
    | StreamStop
    deriving Functor

type Stream m a = D m (StreamStep a)

In the new StreamStep type, the type parameter x indicates the hole where the next step of the recursion is placed.

The nil and cons constructors can then be defined in terms of construct and the appropriate part of the StreamStep type:

nil :: Monad m => Stream m a
nil = construct StreamStop

cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = construct (StreamEmit a s)

The benefit of re-expressing the Stream type in this way is that we can now define a powerful recursion scheme on values of type D m f, including Stream m a, and use this to define functions like append and (|>|). The interesting part is that this recursion scheme comes equipped with a reasoning principle, which allows us to prove things about our functions. To properly define the recursion scheme we first need to know what an Eilenberg-Moore algebra for a monad is.

Eilenberg-Moore Algebras

For any monad m, an m-Eilenberg-Moore algebra consists of a pair of a type a and a function h :: m a -> a, satisfying some laws that state that h interacts nicely with the structure of the monad. I think of the existence of an Eilenberg-Moore algebra structure on a as roughly stating that the type a is effectful in the sense that it can have additional effects “prepended” on to it.

The existence of an Eilenberg-Moore algebra structure for a type can be expressed as a Haskell type class, EMAlgebra:

class (Functor m, Monad m) => EMAlgebra m a where
    algebra :: m a -> a

Of course we cannot state the laws, so we shall just commit to promising that they hold. The type class mechanism also ties us to giving at most one Eilenberg-Moore structure for each pair of a monad m and a type a, where in fact there may be many, but this is a small price to pay for the convenience that type classes provide.

Every type of the form m a for some monad m is obviously effectful, so we can give it an Eilenberg-Moore algebra structure using the function join :: m (m a) -> m a defined in the Control.Monad module:

instance (Functor m, Monad m) => EMAlgebra m (m a) where
    algebra = join

This is the free Eilenberg-Moore algebra for the monad m and the type a.

The property of having an Eilenberg-Moore algebra is also preserved by the construction of function types. If b has an Eilenberg-Moore structure with respect to m, then so does a -> b for any a:
2012-01-07 13:15 Unfortunately, this instance and the previous one overlap, so GHC needs -XIncoherentInstances to handle it. This seems to work for the examples here, but probably a better solution is needed in general.
instance (EMAlgebra m b) => EMAlgebra m (a -> b) where
    algebra x a = algebra $ do f <- x; return (f a)

Finally, every one of our interleaved data and effects types carries a default Eilenberg-Moore structure, achieved by prepending the new effect before the first effect of the data:

instance (Functor m, Monad m) => EMAlgebra m (D m f) where
    algebra x = D $ x >>= force
Note that we now have two algebra structures on D m f: the m-Eilenberg-Moore structure defined here, and the f-algebra structure defined by the construct function above.
2012-01-07 13:15 Fixed a typo here, thanks to ehird in the comments below for spotting it.

A Recursion Scheme

A basic recursion scheme is the catamorphism, which allows us to eliminate an element of a recursive type Rec f using an algebra f a -> a, where Rec f is defined as follows:

data Rec f = In (f (Rec f))

The catamorphism function, or to use its common name fold, has the following type:

fold :: Functor f => (f a -> a) -> Rec f -> a

A function of type f a -> a is the (structure part) of an f-algebra. f-algebras are similar to Eilenberg-Moore algebras, except that they do not need to satisfy any laws (because we do not assume that f is a monad).

The fold function itself satisfies the following law (which can also be taken to be the definition):

fold h (In x) = h (fmap (fold h) x)

and is the unique function of type Rec f -> a to do so. This uniqueness property can be used to reason about functions built from fold.

To define functions that eliminate our interleaved effects and data types D m f, we could just use the fact that they are equivalent to the form m (Rec (f :.: m)), where - :.: - indicates composition of functors, and use fold on the type Rec (f :.: m). However, in doing this we are forced to consider the pure and effectful parts of our data simultaneously.

A better approach, which is derivable from the basic fold operator, is to eliminate values of type D m f using f-and-m-algebras. That is, we assume that the result type a has a an f-algebra structure f a -> a and an Eilenberg-Moore structure m a -> a. In this way, we can separate the pure and effectful parts of our recursion. By making use of the EMAlgebra type class defined above, we do not need to explicitly mention the effectful part at all.

Our new effectfulFold combinator has the following type, and I have given the direct Haskell implementation. It is an interesting exercise to see now it can be defined in terms of the fold combinator on Rec (f :.: m).
2012-01-07 13:15 Thanks to spacespacecomma on reddit for pointing out that the original definition here didn’t type check. I was missing the applications of force.
effectfulFold :: (Functor f, EMAlgebra m a) =>
                 (f a -> a)
              -> D m f
              -> a
effectfulFold h = algebra . fmap loop . force
    loop (Step x) = 
      h $ fmap algebra $ fmap (fmap loop) $ fmap force $ x

As we prove in the paper, generalising Filinski and Støvring’s proof, functions defined using effectfulFold satisfy the following two properties. First, they preserve f-algebras, taking the f-algebra construct to the supplied f-algebra h:

effectfulFold h (construct x) = h (fmap (effectfulFold h) x)

Moreover, they preserve m-Eilenberg-Moore algebras, taking the default Eilenberg-Moore structure on D m f to the implicitly provided Eilenberg-Moore structure on a:

effectfulFold h (algebra x) = algebra (fmap (effectfulFold h) x)

Analogously to fold h, effectfulFold h is the unique function satisfying these two properties. In the paper, we use uniqueness to derive an induction principle for data interleaved with monadic effects, but here we can use uniqueness directly to reason about functions defined using effectfulFold.

Using the Recursion Scheme

The append function on streams that I defined by hand above can be re-expressed using the effectfulFold combinator:

append :: (Functor m, Monad m) => Stream m a -> Stream m a -> Stream m a
append s1 s2 = effectfulFold h s1
  where h (StreamEmit a s) = cons a s
        h StreamStop       = s2

By its definition in terms of effectfulFold we know that append preserves the Eilenberg-Moore structure on its first argument. This means that the following equation holds for x :: m (Stream m a)

append (algebra x) s2 = algebra (fmap (\s -> append s s2) x)

Note that append does not preserve the Eilenberg-Moore structure on its second argument. The Eilenberg-Moore structure for streams prepends effects on to the stream, but these effects will not be executed by append until after all the effects in the first stream have been executed.

Also by its definition in terms of effectfulFold, we know that append satisfies the following equations with respect to the “pure” part of streams:

append (construct StreamStop)        s2 = s2
append (construct (StreamEmit a s1)) s2 = construct (StreamEmit a (append s1 s2))

We can now use the uniqueness of functions defined by effectfulFold to see that append is associative. To show this, I’ll use the uniqueness of functions defined by effectfulFold. We have two functions of type Stream m a -> Stream m a that we wish to prove equivalent:

\s1 -> append s1 (append s2 s3)


\s1 -> append (append s1 s2) s3

where the first one is defined directly in terms of effectfulFold. If we can show that the second function obeys the same properties as the first, i.e. that it preserves the m-Eilenberg-Moore algebra and the f-algebra used in the definition of append, then by uniqueness we will have shown that they are the same function.

It is easy to check that the second function preserves the Eilenberg-Moore structure on Stream m a, since it is just the composition of two functions that do so. So the meat of the proof is in showing that it preserves the f-algebra structure. This splits into two cases, one for StreamStop and one for StreamEmit. For the former, we must show that

append (append (construct StreamStop) s2) s3 = append s2 s3

but this follows directly from the properties we already know about append. The second case is a little harder. We must show that

  append (append (construct (StreamEmit a s1)) s2) s3
  construct (StreamEmit a (append (append s1 s2) s3))

This follows by repeated application of our knowledge about how append operates on input of the form construct (StreamEmit a s1).

Thus we have used the uniqueness property of functions defined using effectfulFold to show that append is associative.

Back to Readers

Just as the Stream type can be defined in terms of the generic D m f type, the Reader type can be defined in the same way:

data ReaderStep a b x
    = ReaderRead (Maybe a -> x)
    | ReaderEmit b
    deriving Functor

type Reader a m b = D m (ReaderStep a b)

The runOnNothing function can be defined in terms of effectfulFold, by recursing on the Reader argument:

runOnNothing :: (Functor m, Monad m) => Reader a m b -> m b
runOnNothing = effectfulFold f
  where f (ReaderRead k) = k Nothing
        f (ReaderEmit b) = return b

Likewise, the (|>|) function that connects a stream with a reader can be defined in terms of effectfulFold, by recursion on the stream argument, and using the Eilenberg-Moore structure on the type Reader a m b -> m b:

(|>|) :: (Monad m, Functor m) => Stream m a -> Reader a m b -> m b
(|>|) = effectfulFold f
  where f (StreamEmit a h) r = do
          readerStep <- deconstruct r
          case readerStep of
            ReaderRead k -> h (k (Just a))
            ReaderEmit b -> return b
        f StreamStop r = runOnNothing r

The (|>|) function executes the effects of the stream before the effects of the reader, letting the stream dictate how events proceed. An alternative is to execute the effects of the reader first, by defining the function in terms of recursion on the reader argument, again using effectfulFold:

(|>>|) :: (Monad m, Functor m) => Stream m a -> Reader a m b -> m b
s |>>| r = effectfulFold f r s
  where f (ReaderRead k) s = do
            streamStep <- deconstruct s
            case streamStep of
              StreamEmit a s' -> k (Just a) s'
              StreamStop      -> k Nothing nil
        f (ReaderEmit b) s = return b

There are obviously many other possibilities for connecting Streams to Readers. If the Stream finishes early, we could return the uncompleted Reader rather than passing it on to runOnNothing. Likewise, if the Reader emits a value before all the Stream has been read, we could return the leftover stream instead of just dropping it on the floor. In any case, all these possibilities are definable in terms of effectfulFold.

Readers are an instance of the Monad type class as the following Haskell declaration witnesses:
2012-01-07 13:15 Thanks again to spacespacecomma on reddit for pointing out that this doesn’t directly work. You need to wrap the Reader type in a newtype to get it to work. I’ll leave it as-is to give the general idea.
instance Monad m => Monad (Reader a m) where
    return b = construct (ReaderEmit b)
    c >>= k  = effectfulFold f c
        where f (ReaderRead k) = construct (ReaderRead k)
              f (ReaderEmit b) = k b

Using the uniqueness property of effectfulFold it is possible to prove that this instance really does satisfy the monads laws. In fact, it is possible to show that Reader a m is the sum of the monad m with the free monad on the functor f x = Maybe a -> x. This is an instance of a fact originally observed by Hyland, Plotkin and Power (Theorem 4).


If Streams produce data and Readers consume data, then what goes in between? Processors! A Processor is an instance of the same interleaved data and effects pattern as Streams and Readers:

data ProcessorStep a b x
    = ProcessorRead (Maybe a -> x)
    | ProcessorEmit b x
    | ProcessorStop

type Processor a m b = D m (ProcessorStep a b)

A Processor, once kicked using force, can either ask for more input (ProcessorRead), can emit some output (ProcessorEmit) with a promise to carry on, or can signal end of stream (ProcessorStop). Processors are intended to fulfil the same role as Enumeratees or Conduits as intermediaries that manipulate data in some way. They are also very similar to the StreamProcessor representation defined by Ghani, Hancock and Pattinson.

With a few helper functions:

processorRead :: Monad m => (Maybe a -> Processor a m b) -> Processor a m b
processorRead k = construct (ProcessorRead k)

processorStop :: Monad m => Processor a m b
processorStop = construct ProcessorStop

processorEmit :: Monad m => b -> Processor a m b -> Processor a m b
processorEmit b proc = construct (ProcessorEmit b proc)

we can define a processor that filters:

filter :: Monad m => (a -> Maybe b) -> Processor a m b
filter h = processorRead getInput
    getInput Nothing  = processorStop
    getInput (Just a) =
      case h a of
        Nothing -> filter h
        Just a  -> processorEmit a (filter h)

It is also possible to define several combinators that compose Streams with Processors, Processors with Readers and Processors with Processors in a similar manner to the Conduits library. We have same choices as with the composition of Streams and Readers in terms of the ordering of effects. The key point is that they may all be defined in terms of effectfulFold, and reasoned about using the universal property. For instance, we can prove that composition of Processors is associative, which opens the door to justified optimisation principles for chains of stream processors.

The Co-Inductive View

In the above I have taken an inductive view on Streams, Readers and so on. In Haskell, each recursive type is simultaneously an inductive and co-inductive type. We can use this change-of-viewpoint to give another way of defining interleaved data and effects in terms of unfolds. The following function unfold takes a seed state of type s and an evolution function of type s -> m (f s) and generates a value of type D m f.
2012-01-07 13:15 Was missing an additional Monad m constraint here.
unfold :: (Monad m, Functor f) => s -> (s -> m (f s)) -> D m f
unfold s step = loop s
  where loop s = D $ do
    f <- step s
    return (Step (fmap loop f))

The ofHandle stream defined above can be re-expressed in terms of unfold as follows:

ofHandle :: Handle -> Stream IO Char
ofHandle handle = unfold () step
    step () = do
      isEOF <- hIsEOF handle
      if isEOF then do hClose handle
                       return StreamStop
               else do c <- hGetChar handle
                       return (StreamEmit c ())

In fact, we could change the representation of our interleaved data and effect type to be directly expressed in terms of unfolds:

data D m f where
    D :: s -> (s -> m (f s)) -> D m f

This representation makes it harder to define the effectfulFold function and its associated reasoning principle. On the other hand, it does allow for fusion of chained sequences of Streams, Processors and Readers in a similar manner to the Stream Fusion (alternative ACM paywall link) paper. Hopefully, this may allow for sequences of stream processing functions to be fused together and open up more possibilities for optimisation. But this remains to be seen.