Posts tagged "conduit":
Combining Amazonka and Conduit
Combining amazonka and conduit turned out to be easier than I had expected.
Here's an SNS sink I put together today
snsSink :: (MonadAWS m, MonadIO m) => T.Text -> C.ConduitT Value C.Void m () snsSink topic = do C.await >>= \case Nothing -> pure () Just msg -> do _ <- C.lift $ publishSNS topic (TL.toStrict $ TL.decodeUtf8 $ encode msg) snsSink topic
Putting it to use can be done with something like
foo = do ... awsEnv <- newEnv Discover runAWSCond awsEnv $ <source producing Value> .| snsSink topicArn where runAWSCond awsEnv = runResourceT . runAWS awsEnv . within Frankfurt . C.runConduit
Architecture of a service
Early this summer it was finally time to put this one service I've been working on into our sandbox environment. It's been running without hickups so last week I turned it on for production as well. In this post I thought I'd document the how and why of the service in the hope that someone will find it useful.
The service functions as an interface to external SMS-sending services, offering a single place to change if we find that we are unhappy with the service we're using.1 This service replaces an older one, written in Ruby and no one really dares touch it. Hopefully the Haskell version will prove to be a joy to work with over time.
Overview of the architecture
The service is split into two parts, one web server using scotty, and streaming data processing using conduit. Persistent storage is provided by a PostgreSQL database. The general idea is that events are picked up from the database, acted upon, which in turn results in other events which written to the database. Those are then picked up and round and round we go. The web service accepts requests, turns them into events and writes the to the database.
Hopefully this crude diagram clarifies it somewhat.
There are a few things that might need some explanation
In the past we've wanted to have the option to use multiple external SMS services at the same time. One is randomly chosen as the request comes in. There's also a possibility to configure the frequency for each external service.
Picker implements the random picking and I've written about that earlier in Choosing a conduit randomly.
Success and fail are dummy senders. They don't actually send anything, and the former succeeds at it while the latter fails. I found them useful for manual testing.
Successfully sending off a request to an external SMS service, getting status 200 back, doesn't actually mean that the SMS has been sent, or even that it ever will be. Due to the nature of SMS messaging there are no guarantees of timeliness at all. Since we are interested in finding out whether an SMS actually is sent a delayed action is scheduled, which will fetch the status of a sent SMS after a certain time (currently 2 minutes). If an SMS hasn't been sent after that time it might as well never be – it's too slow for our end-users.
This is what report-fetcher and fetcher-func do.
- The queue sink and queue src are actually
sourceTQueue
andsinkTQueue
. Splitting the stream like that makes it trivial to push in events by usingwriteTQueue
. - I use
sequenceConduits
in order to send a single event to multiple =Conduit=s and then combine all their results back into a single stream. The ease with which this can be done in conduit is one of the main reasons why I choose to use it.2
Effects and tests
I started out writing everything based on a type like ReaderT <my cfg type> IO
and using liftIO
for effects that needed lifting. This worked nicely while I
was setting up the basic structure of the service, but as soon as I hooked in
the database I really wanted to do some testing also of the effectful code.
After reading Introduction to Tagless Final and The ReaderT Design Patter, playing a bit with both approaches, and writing Tagless final and Scotty and The ReaderT design pattern or tagless final?, I finally chose to go down the route of tagless final. There's no strong reason for that decision, maybe it was just because I read about it first and found it very easy to move in that direction in small steps.
There's a split between property tests and unit tests:
- Data types, their monad instances (like JSON (de-)serialisation), pure functions and a few effects are tested using properties. I'm using QuickCheck for that. I've since looked a little closer at hedgehog and if I were to do a major overhaul of the property tests I might be tempted to rewrite them using that library instead.
- Most of the =Conduit=s are tested using HUnit.
Configuration
The service will be run in a container and we try to follow the 12-factor app rules, where the third one says that configuration should be stored in the environment. All previous Haskell projects I've worked on have been command line tools were configuration is done (mostly) using command line argument. For that I usually use optparse-applicative, but it's not applicable in this setting.
After a bit of searching on hackage I settled on etc. It turned out to be nice
an easy to work with. The configuration is written in JSON and only specifies
environment variables. It's then embedded in the executable using file-embed.
The only thing I miss is a ToJSON
instance for Config
– we've found it
quite useful to log the active configuration when starting a service and that
log entry would become a bit nicer if the message was JSON rather than the
(somewhat difficult to read) string that Config
's Show
instance produces.
Logging
There are two requirements we have when it comes to logging
- All log entries tied to a request should have a correlation ID.
- Log requests and responses
I've written about correlation ID before, Using a configuration in Scotty.
Logging requests and responses is an area where I'm not very happy with scotty.
It feels natural to solve it using middleware (i.e. using middleware
) but the
representation, especially of responses, is a bit complicated so for the time
being I've skipped logging the body of both. I'd be most interested to hear of
libraries that could make that easier.
Data storage and picking up new events
The data stream processing depends heavily on being able to pick up when new
events are written to the database. Especially when there are more than one
instance running (we usually have at least two instance running in the
production environment). To get that working I've used postgresql-simple's
support for LISTEN
and NOTIFY
via the function getNotification
.
When I wrote about this earlier, Conduit and PostgreSQL I got some really good feedback that made my solution more robust.
Delayed actions
Some things in Haskell feel almost like cheating. The light-weight threading
makes me confident that a forkIO
followed by a threadDelay
(or in my case,
the ones from unliftio) will suffice.
Footnotes:
It has happened in the past that we've changed SMS service after finding that they weren't living up to our expectations.
A while ago I was experimenting with other streaming libraries, but I gave up on getting re-combination to work – Zipping streams
Conduit and PostgreSQL
For a while now I've been playing around with an event-drive software design
(EDA) using conduit
for processing of events. For this post the processing can
basically be viewed as the following diagram
+-----------+ +------------+ +---------+ | | | | | | | PG source |-->| Processing |-->| PG sink | | | | | | | +-----------+ +------------+ +---------+ ^ | | +------+ | | | | | | | PG | | +------------| DB |<-----------+ | | +------+
I started out looking for Conduit components for PostgreSQL on Hackage but
failed to find something fitting so I started looking into writing them myself
using postgresql-simple
.
The sink wasn't much of a problem, use await
to get an event (a tuple) and
write it to the database. My almost complete ignorance of using databases
resulted in a first version of the source was rather naive and used
busy-waiting. Then I stumbled on PostgreSQL's support for notifications through
the LISTEN
and NOTIFY
commands. I rather like the result and it seems to
work well.1
It looks like this
import Control.Monad.IO.Class (MonadIO, liftIO) import Data.Aeson (Value) import qualified Data.Conduit as C import qualified Data.Conduit.Combinators as CC import Data.Text (Text) import Data.Time.Clock (UTCTime) import Data.UUID (UUID) import Database.PostgreSQL.Simple (Connection, Only(..), execute, execute_, query) import Database.PostgreSQL.Simple.Notification (getNotification) fst8 :: (a, b, c, d, e, f, g, h) -> a fst8 (a, _, _, _, _, _, _, _) = a dbSource :: MonadIO m => Connection -> Int -> C.ConduitT () (Int, UTCTime, Int, Int, Bool, UUID, Text, Value) m () dbSource conn ver = do res <- liftIO $ query conn "SELECT * from events where id > (?) ORDER BY id" (Only ver) case res of [] -> do liftIO $ execute_ conn "LISTEN MyEvent" liftIO $ getNotification conn dbSource conn ver _ -> do let ver' = maximum $ map fst8 res CC.yieldMany res dbSource conn ver' dbSink :: MonadIO m => Connection -> C.ConduitT (Int, Int, Bool, UUID, Text, Value) C.Void m () dbSink conn = do evt <- C.await case evt of Nothing -> return () Just event -> do liftIO $ execute conn "INSERT INTO events \ \(srv_id, stream_id, cmd, cmd_id, correlation_id, event_data) \ \VALUES (?, ?, ?, ?, ?, ?)" event liftIO $ execute_ conn "NOTIFY MyEvent" dbSink conn
Footnotes:
If I've missed something crucial I would of course love to hear about it.
Choosing a conduit randomly
Lately I've been playing around conduit. One thing I wanted to try out was to set up processing where one processing step was chosen on random from a number of components, based on weights. In short I guess I wanted a function with a type something like this
foo :: [(Int, ConduitT i o m r)] -> ConduitT i o m r
I have to admit I don't even know where to start writing such a function1 but after a little bit of thinking I realised I could get the same effect by controlling how chunks of data is routed. That is, instead of choosing a component randomly, I can choose a route randomly. It would look something like when choosing from three components
+---------+ +----------+ +-------------+ | Filter | | Drop tag | | Component A | +-->| Value-0 |-->| |-->| |--+ | +---------+ +----------+ +-------------+ | +----------------+ | +---------+ +----------+ +-------------+ | | Choose random | | | Filter | | Drop tag | | Component B | | | value based on +----->| Value-1 |-->| |-->| |-----> | weights | | +---------+ +----------+ +-------------+ | +----------------+ | +---------+ +----------+ +-------------+ | | | Filter | | Drop tag | | Component C | | +-->| Value-2 |-->| |-->| |--+ +---------+ +----------+ +-------------+
That is
- For each chunk that comes in, choose a value randomly based on weights and tag the chunk with the choosen value, then
- split the processing into one route for each component,
- in each route filter out chunks tagged with a single value, and
- remove the tag, then
- pass the chunk to the component, and finally
- bring the routes back together again.
Out of these steps all but the very first one are already available in conduit:
- for splitting routes combining them again, use
sequenceConduits
- for filtering, use
filter
- for dropping the tag, use
map
What's left is the beginning. I started with a function to pick a value on random based on weights2
pickByWeight :: [(Int, b)] -> IO b pickByWeight xs = randomRIO (1, tot) >>= \ n -> return (pick n xs) where tot = sum $ map fst xs pick n ((k, x):xs) | n <= k = x | otherwise = pick (n - k) xs pick _ _ = error "pick error"
Using that I then made a component that tags chunks
picker ws = do evt <- await case evt of Nothing -> return () Just e -> do p <- liftIO $ pickByWeight ws yield (p, e) picker ws
I was rather happy with this…
@snoyberg just have to let you know, conduit is a joy to use. Thanks for sharing it.
– Magnus Therning (@magthe) February 6, 2019
Footnotes:
Zipping streams
Writing the following is easy after glancing through the documentation for conduit:
foo = let src = mapM_ C.yield [0..9 :: Int] p0 = CC.map (\ i -> ("p0", succ i)) p1 = CC.filter odd .| CC.map (\ i -> ("p1", i)) p = C.getZipConduit $ C.ZipConduit p0 <* C.ZipConduit p1 sink = CC.mapM_ print in C.runConduit $ src .| p .| sink
Neither pipes nor streaming make it as easy to figure out. I must be missing something! What functions should I be looking at?