Combining inputs in conduit

The code the post on using my simplistic state machine together with conduit will happily wait forever for input, and the only way to terminate it is pressing Ctrl-C. In this series I’m writing a simple adder machine, but my actual use case for these ideas are protocols for communication (see the first post), and waiting forever isn’t such a good thing; I want my machine to time out if input doesn’t arrive in a timely fashion.

I can think of a few ways to achieve this:

  1. Use a watchdog thread that is signalled from the main thread, e.g. by a Conduit that sends a signal as each Char passes through it.
  2. As each Char is read kick off a “timeout thread” which throws an exception back to the main thread, unless the main thread kills it before the timeout expires.
  3. Run a thread that creates ticks that then can be combined with the Chars read and fed into the state machine itself.

Since this is all about state machines I’ve opted for option 3. Furthermore, since I’ve recently finished reading the excellent Parallel and Concurrent Programming in Haskell I decided to attempt writing the conduit code myself instead of using something like stm-conduit.

The idea is to write two functions:

  1. one Source to combine two Sources, and
  2. one Sink that writes its input into a TMVar.

The latter of the two is the easiest one. Given a TMVar it just awaits input, stores it in the TMVar and then calls itself:

sinkTMVar :: MonadIO m => TMVar a -> Sink a m ()
sinkTMVar tmv = forever $ do
    v <- await
    case v of
        Nothing -> return ()
        Just v' -> liftIO (atomically $ putTMVar tmv v')

The other one is only slightly more involved:

whyTMVar :: MonadIO m => Source (ResourceT IO) a -> Source (ResourceT IO) a -> Source m a
whyTMVar src1 src2 = do
    t1 <- liftIO newEmptyTMVarIO 
    t2 <- liftIO newEmptyTMVarIO
    void $ liftIO $ async $ fstProc t1
    void $ liftIO $ async $ sndProc t2
    forever $ liftIO (atomically $ takeTMVar t1 `orElse` takeTMVar t2) >>= C.yield
        fstProc t = runResourceT $ src1 $$ sinkTMVar t
        sndProc t = runResourceT $ src2 $$ sinkTMVar t

Rather short and sweet I think. However, there are a few things that I’m not completely sure of yet.

forkIO vs. async vs. resourceForkIO
There is a choice between at least three functions when it comes to creating the threads and I haven’t looked into which one is better to use. AFAIU there may be issues around exception handling and with resources. For now I’ve gone with async for no particular reason at all.
Using TMVar
In this example the input arrives rather slowly, which means having room for a single piece at a time is enough. If the use case changes and input arrives quicker then this decision has to be revisited. I’d probably choose to use stm-conduit in that case since it uses TMChan internally.
Combining only two Sources
Again, this use case doesn’t call for more than two Sources, at least at this point. If the need arises for using more Sources I’ll switch to stm-conduit since it already defines a function to combine a list of Sources.

The next step will be to modify the conduit process and the state machine.

Leave a comment