A few days ago I decided to try to write a version of Rx in Haskell, and that's what I'm going to tell you about in this post. In the following, I expect you to understand some Haskell and to know the basic concepts of Reactive Extensions.
Briefly, Rx is a framework for reactive programming. The main concept there if Observable, which is a source of events that you can subscribe to. The thing that makes Rx interesting is the concept of Combinators. For instance, in Rx for Javascript, you can create Observables for click events of HTML buttons. In the following example (Stolen from Joni Freemans's learn-rx slide, thanks) these clicks are first mapped in to integers +1 and -1 using the
Select combinator, then merged into a single Observable usingMerge, then converted into a sum using Scan. Finally, a side-effect is added using Subscribe: a label is updated so that it will display a counter that can be increased and decreased by pressing the + and - buttons.$(function() {
  function always(x) { return function(_) { return x }}
  var incr = $('#incr').toObservable('click').Select(always(1))
  var decr = $('#decr').toObservable('click').Select(always(-1))
  incr.Merge(decr)
    .Scan(0, function(total, x) { return total + x })
    .Subscribe(updateCount)
  function updateCount(total) {
    $('#count').html(total)
  }
})Rx with typeclasses
So, with my background in OOP, I began by declaring a typeclass forObservable. Like this:class Observable a observable where
  subscribe :: observable -> Observer a -> IO Disposable
type Observer a = (a -> IO ())
type Disposable = IO ()
type Subscribe a = (Observer a -> IO Disposable)Observable typeclass it should be easy to make your own Observables just by declaring an instance for them. So, I wrote an instance for List:instance Observable a ([a]) where
  subscribe list observer = do
    mapM observer list
    return (return ())Rx> subscribe ["a", "b"] putStrLn
a
bObserver (here the putStrLn function) to an array. Promising start. Note that I needed to turn on some GHC flags to allow stuff like multiparameter typeclasses and making instances for arrays. So, in the beginning of my module, I had this:{-# LANGUAGE MultiParamTypeClasses,FlexibleInstances,TypeSynonymInstances #-}
module Rx where
import Control.Monadselect :: Observable a a' => (a -> b) -> a' -> Subscribe b 
select func observable = (\ observerB -> subscribe observable (convert observerB))
  where convert observerB = observerB . funcObservable a into an Observable b using a given mapping function. Should work, right? However,*Combinators> subscribe (select show [1, 2, 3]) putStrLn
<interactive>:1:11:
    No instance for (Observable a [t])
      arising from a use of `select' at <interactive>:1:11-31
    Possible fix: add an instance declaration for (Observable a [t])
    In the first argument of `subscribe', namely
        `(select show [1, 2, 3])'
    In the expression: subscribe (select show [1, 2, 3]) putStrLn
    In the definition of `it':
        it = subscribe (select show [1, 2, 3]) putStrLn- [1, 2, 3]has an instance for- Observable Int
- (select show [1, 2, 3])should be- Observable String, because show maps- Intto- StringStill, it does not compile. It took quite a while to figure out what's wrong. See:
*Combinators> let selectSpecific = select :: (Int -> String) -> [Int] -> Subscribe String
*Combinators> subscribe (selectSpecific show [1, 2, 3]) putStrLn
1
2
3Enter simplistic approach
It seems that my example compiles and runs if I just add some type annotations. For me this seems like the type inference system in Haskell (GHC) is lacking. It's of course more probable that I just don't get it :) Anyways, having to annotate the code using this "API" is unacceptable, so I decided to try to do this without the typeclass:type Observer a = (a -> IO ())
type Disposable = IO ()
type Subscribe a = (Observer a -> IO Disposable)
observableList :: [a] -> Subscribe a
observableList list observer = do
    mapM observer list 
    return (return ())select :: (a -> b) -> (Subscribe a) -> Observer b -> IO Disposable
select convert subscribe observer = subscribe (observer . convert)
filter :: (a -> Bool) -> (Subscribe a) -> Observer a -> IO Disposable
filter predicate subscribe observer = subscribe filteredObserver
  where filteredObserver a = if (predicate a) then (observer a) else return ()observableList to be able to subscribe to a list. But the upside is that it works and does not require type annotations in the client code:*Combinators> select show (Combinators.filter even $ observableList [1, 2]) putStrLn
2Monads Gonads
Now that's convenient. But, as Joni has convinced me,Observables are actually Monads and Functors too, so I wanted to make an instance for each. Likeinstance Functor Observable where
  fmap = selectObserver (which is just a subscribe function really) an instance of Monad, Functor or anything. So, I had to convert it into a data:data Observable a = Observable {subscribe :: Subscribe a}
type Observer a = (a -> IO ())
type Subscribe a = (Observer a -> IO Disposable)
type Disposable = IO ()Observable and writing the selectMany combinator, I'm finally able to proudly declare:instance Functor Observable where
  fmap = select
instance Monad Observable where
  return a = observableList [a]
  (>>=) = selectManyMonads and Functors, but I'm going to find out. To get back to the original one-liner for printing list items into the console, here's how it's done with the "final" solution:*Rx> subscribe (observableList ["a", "b"]) putStrLn
1
2
3select and filter combinators:*Rx> subscribe (select show (Rx.filter even $ observableList [1, 2])) putStrLn
2 
What you gain by making the Observable a Monad (and Functor) is that you get a lot of combinators for free (check out Control.Monad). One very useful is 'sequence':
VastaaPoistasequence :: Monad m => [m a] -> m [a]
*Main> subscribe (select show (sequence [observableList ['a', 'b'], observableList ['c', 'd']])) putStrLn
"ac"
"ad"
"bc"
"bd"
Not a best example but basically you can use it to create a new Observable from a list of Observables. Then there's mapM, Kleisli composition and so on.
Functor and Monad implementations get a bit more interesting if you'll add OnError and OnCompleted as in Rx. Then it will become clear how Functor is a structure preserving mapping between Observables, and how Observables can be combined monadically so that errors short-circuit the computation.