Combining inputs in conduit
- Magnus Therning
The code in the post on using my simplistic state machine together with conduit will happily wait forever for input, and the only way to terminate it is by pressing Ctrl-C. In this series I’m writing a simple adder machine, but my actual use case for these ideas is communication protocols (see the first post), and for those, waiting forever isn’t such a good thing; I want my machine to time out if input doesn’t arrive in a timely fashion.
I can think of a few ways to achieve this:
- Use a watchdog thread that is signalled from the main thread, e.g. by a Conduit that sends a signal as each Char passes through it.
- As each Char is read, kick off a “timeout thread” which throws an exception back to the main thread, unless the main thread kills it before the timeout expires.
- Run a thread that creates ticks, which can then be combined with the Chars read and fed into the state machine itself.
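To make option 3 a little more concrete, here is a minimal sketch kept outside conduit so that it only needs base and stm. It assumes a hypothetical Event type that wraps both ticks and characters, so the state machine sees a single input type, and a hypothetical startTicker that puts a Tick into a TMVar at a fixed interval. None of these names come from the post.

```haskell
import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Concurrent.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, takeTMVar)
import Control.Monad (forever)

-- Hypothetical event type for the state machine: either a clock tick
-- or a character read from the input.
data Event = Tick | Input Char
  deriving (Eq, Show)

-- Hypothetical ticker: forks a thread that puts a Tick into the TMVar
-- every `micros` microseconds. putTMVar blocks while an earlier Tick is
-- still unconsumed, so at most one Tick waits in the TMVar.
startTicker :: Int -> TMVar Event -> IO ThreadId
startTicker micros tv = forkIO $ forever $ do
  threadDelay micros
  atomically $ putTMVar tv Tick

-- Hypothetical consumer side: block until the next event is available.
nextEvent :: TMVar Event -> IO Event
nextEvent tv = atomically $ takeTMVar tv

-- Convenience constructor for a fresh, empty event slot.
newEventVar :: IO (TMVar Event)
newEventVar = newEmptyTMVarIO
```

A state machine consuming Event could then treat a run of Ticks with no Input in between as a timeout; how many ticks count as too many is a design choice this sketch leaves open.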
Since this is all about state machines, I’ve opted for option 3. Furthermore, since I’ve recently finished reading the excellent Parallel and Concurrent Programming in Haskell, I decided to try writing the conduit code myself instead of using something like stm-conduit.
The idea is to write two functions:
- a Source that combines two Sources, and
- a Sink that writes its input into a TMVar.
The latter of the two is the easier one. Given a TMVar, it awaits input, stores each value in the TMVar and then calls itself, stopping once upstream has no more input:
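import Control.Concurrent.STM (TMVar, atomically, putTMVar)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Conduit (Sink, await)

sinkTMVar :: MonadIO m => TMVar a -> Sink a m ()
sinkTMVar tmv = do
    v <- await
    case v of
        Nothing -> return ()
        Just v' -> do
            liftIO (atomically $ putTMVar tmv v')
            sinkTMVar tmv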
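To see how the single-slot TMVar behaves, here is a conduit-free sketch (base and stm only) of the same loop. The hypothetical drainToTMVar walks a list instead of awaiting upstream values, so the empty list plays the role of await returning Nothing, and putTMVar blocks until the consumer has taken the previous value. drainToTMVar and takeN are illustrative names, not part of conduit.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, takeTMVar)
import Control.Monad (replicateM)

-- Hypothetical list-based analogue of sinkTMVar: an empty list stands in
-- for `await` returning Nothing, so the loop simply stops.
drainToTMVar :: TMVar a -> [a] -> IO ()
drainToTMVar _ [] = return ()
drainToTMVar tmv (x:xs) = do
  -- Blocks while the TMVar is still full, i.e. until the consumer has
  -- taken the previous value.
  atomically $ putTMVar tmv x
  drainToTMVar tmv xs

-- Consumer side: take exactly n values, in the order they were put.
takeN :: Int -> TMVar a -> IO [a]
takeN n tmv = replicateM n (atomically $ takeTMVar tmv)

-- Fresh empty slot, as used by both sides.
newSlot :: IO (TMVar a)
newSlot = newEmptyTMVarIO
```

Because the producer can never get more than one value ahead of the consumer, the TMVar gives back-pressure for free, which is also why it only suits slowly arriving input.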
The other one is only slightly more involved:
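import Control.Concurrent.Async (async)
import Control.Concurrent.STM (atomically, newEmptyTMVarIO, orElse, takeTMVar)
import Control.Monad (forever, void)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Resource (ResourceT, runResourceT)
import Data.Conduit (Source, ($$))
import qualified Data.Conduit as C

whyTMVar :: MonadIO m => Source (ResourceT IO) a -> Source (ResourceT IO) a -> Source m a
whyTMVar src1 src2 = do
    t1 <- liftIO newEmptyTMVarIO
    t2 <- liftIO newEmptyTMVarIO
    void $ liftIO $ async $ fstProc t1
    void $ liftIO $ async $ sndProc t2
    forever $ liftIO (atomically $ takeTMVar t1 `orElse` takeTMVar t2) >>= C.yield
    where
        fstProc t = runResourceT $ src1 $$ sinkTMVar t
        sndProc t = runResourceT $ src2 $$ sinkTMVar t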
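The heart of whyTMVar is the STM choice takeTMVar t1 `orElse` takeTMVar t2: if t1 is empty the first transaction retries and orElse falls through to t2, and if both are empty the whole transaction blocks until either one is filled. Here is a conduit-free sketch of that merge, assuming forkIO in place of async and plain lists in place of the two Sources; feed and mergeTwo are hypothetical names.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (TMVar, atomically, newEmptyTMVarIO, orElse, putTMVar, takeTMVar)
import Control.Monad (replicateM)

-- Feed a list into a TMVar one element at a time
-- (stand-in for `src $$ sinkTMVar t`).
feed :: TMVar a -> [a] -> IO ()
feed tv = mapM_ (atomically . putTMVar tv)

-- Hypothetical merge: run both producers in their own threads and take
-- n values from whichever TMVar is full. Unlike whyTMVar, which loops
-- forever, this stops after n values so that it terminates.
mergeTwo :: Int -> [a] -> [a] -> IO [a]
mergeTwo n xs ys = do
  t1 <- newEmptyTMVarIO
  t2 <- newEmptyTMVarIO
  _ <- forkIO (feed t1 xs)
  _ <- forkIO (feed t2 ys)
  -- takeTMVar retries when its TMVar is empty; orElse then tries t2,
  -- and if both are empty the transaction blocks until one is filled.
  replicateM n (atomically $ takeTMVar t1 `orElse` takeTMVar t2)
```

The interleaving between the two inputs is nondeterministic, but each input keeps its own order. Note also that orElse is left-biased: when both TMVars are full, t1 wins.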
Rather short and sweet, I think. However, there are a few things I’m not completely sure of yet.
- forkIO vs. async vs. resourceForkIO: There is a choice between at least three functions for creating the threads, and I haven’t yet looked into which one is best to use. AFAIU there may be issues around exception handling and resource management. For now I’ve gone with async for no particular reason at all.
- Using TMVar: In this example the input arrives rather slowly, which means having room for a single piece at a time is enough. If the use case changes and input arrives more quickly, then this decision has to be revisited. In that case I’d probably choose stm-conduit, since it uses TMChan internally.
- Combining only two Sources: Again, this use case doesn’t call for more than two Sources, at least at this point. If the need arises for more Sources, I’ll switch to stm-conduit, since it already defines a function to combine a list of Sources.
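stm-conduit already covers the many-sources case, but for the STM side alone the generalisation is small: orElse chains over any non-empty list of TMVars. Here is a sketch under that assumption, with hypothetical takeAny and mergeAll (these are not stm-conduit's functions) and lists again standing in for Sources:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (STM, TMVar, atomically, newEmptyTMVarIO, orElse, putTMVar, takeTMVar)
import Control.Monad (forM, replicateM)

-- Hypothetical: take a value from the first non-empty TMVar in the list,
-- blocking if all are empty. Earlier TMVars win when several are full,
-- so this is biased, not fair. The list must be non-empty (foldr1).
takeAny :: [TMVar a] -> STM a
takeAny = foldr1 orElse . map takeTMVar

-- Hypothetical: merge any number of list "sources", each fed from its
-- own thread into its own TMVar, taking n values in total.
mergeAll :: Int -> [[a]] -> IO [a]
mergeAll n srcs = do
  vars <- forM srcs $ \src -> do
    tv <- newEmptyTMVarIO
    _ <- forkIO (mapM_ (atomically . putTMVar tv) src)
    return tv
  replicateM n (atomically $ takeAny vars)
```

The left bias matters more as the list grows: a busy input near the front can starve the ones behind it, which is one reason to prefer a ready-made, channel-based combinator once more than a couple of inputs are involved.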
The next step will be to modify the conduit process and the state machine.