Posts tagged "conduit":

11 Nov 2020

Combining Amazonka and Conduit

Combining amazonka and conduit turned out to be easier than I had expected.

Here's an SNS sink I put together today

snsSink :: (MonadAWS m, MonadIO m) => T.Text -> C.ConduitT Value C.Void m ()
snsSink topic = do
  C.await >>= \case
    Nothing -> pure ()
    Just msg -> do
      _ <- C.lift $ publishSNS topic (TL.toStrict $ TL.decodeUtf8 $ encode msg)
      snsSink topic

Putting it to use can be done with something like

foo = do
  ...
  awsEnv <- newEnv Discover
  runAWSCond awsEnv $
    <source producing Value> .| snsSink topicArn

  where
    runAWSCond awsEnv = runResourceT . runAWS awsEnv . within Frankfurt . C.runConduit
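
The await loop in snsSink has the same shape as conduit's mapM_C, so the plumbing can be tried out without AWS by passing the publishing action in as a parameter. This is only a sketch: sinkWith, collect and demo are names made up for the illustration, and are not part of amazonka or of the code above.

```haskell
import Conduit
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Data.Void (Void)

-- Hypothetical generalisation of snsSink: the publishing action is a
-- parameter, so a test can pass in something that doesn't talk to AWS.
sinkWith :: Monad m => (a -> m ()) -> ConduitT a Void m ()
sinkWith = mapM_C

-- Stand-in "publisher" that records each message in an IORef.
collect :: IORef [a] -> a -> IO ()
collect ref x = modifyIORef' ref (x :)

-- Run the sink over a few messages and return what was "published".
demo :: IO [String]
demo = do
  ref <- newIORef []
  runConduit $ yieldMany ["a", "b", "c"] .| sinkWith (collect ref)
  reverse <$> readIORef ref
```

With the real thing, the action passed in would be the publishSNS call wrapped so that it returns ().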
Tags: amazonka aws conduit haskell
10 Aug 2019

Architecture of a service

Early this summer it was finally time to put this one service I've been working on into our sandbox environment. It's been running without hiccups, so last week I turned it on for production as well. In this post I thought I'd document the how and why of the service in the hope that someone will find it useful.

The service functions as an interface to external SMS-sending services, offering a single place to change if we find that we are unhappy with the service we're using.[1] This service replaces an older one, written in Ruby, that no one really dares touch. Hopefully the Haskell version will prove to be a joy to work with over time.

Overview of the architecture

The service is split into two parts: a web server using scotty, and streaming data processing using conduit. Persistent storage is provided by a PostgreSQL database. The general idea is that events are picked up from the database and acted upon, which in turn results in other events which are written to the database. Those are then picked up and round and round we go. The web service accepts requests, turns them into events and writes them to the database.

Hopefully this crude diagram clarifies it somewhat.

2019-08-10-architecture.png
Figure 1: Diagram of the service architecture

There are a few things that might need some explanation

  • In the past we've wanted to have the option to use multiple external SMS services at the same time. One is randomly chosen as the request comes in. There's also a possibility to configure the frequency for each external service.

    Picker implements the random picking and I've written about that earlier in Choosing a conduit randomly.

    Success and fail are dummy senders. They don't actually send anything, and the former succeeds at it while the latter fails. I found them useful for manual testing.

  • Successfully sending off a request to an external SMS service, getting status 200 back, doesn't actually mean that the SMS has been sent, or even that it ever will be. Due to the nature of SMS messaging there are no guarantees of timeliness at all. Since we are interested in finding out whether an SMS actually is sent, a delayed action is scheduled, which fetches the status of a sent SMS after a certain time (currently 2 minutes). If an SMS hasn't been sent after that time it might as well never be – it's too slow for our end-users.

    This is what report-fetcher and fetcher-func do.

  • The queue sink and queue src are actually sinkTQueue and sourceTQueue. Splitting the stream like that makes it trivial to push in events by using writeTQueue.
  • I use sequenceConduits in order to send a single event to multiple Conduits and then combine all their results back into a single stream. The ease with which this can be done in conduit is one of the main reasons why I chose to use it.[2]
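
The queue split can be sketched as follows, using only stm and conduit. queueSource and queueSink are hand-written stand-ins for sourceTQueue and sinkTQueue, written out here so the example doesn't depend on the exact signatures of the library versions; demo is likewise made up for the illustration.

```haskell
import Conduit
import Control.Concurrent.STM (TQueue, atomically, newTQueueIO, readTQueue, writeTQueue)
import Control.Monad (forever)
import Control.Monad.IO.Class (MonadIO, liftIO)

-- Stand-in for sourceTQueue: block until an item is available, yield it,
-- and repeat forever.
queueSource :: MonadIO m => TQueue a -> ConduitT i a m ()
queueSource q = forever $ liftIO (atomically (readTQueue q)) >>= yield

-- Stand-in for sinkTQueue: write every incoming item to the queue.
queueSink :: MonadIO m => TQueue a -> ConduitT a o m ()
queueSink q = awaitForever $ liftIO . atomically . writeTQueue q

demo :: IO [Int]
demo = do
  q <- newTQueueIO
  -- An event pushed in from outside the stream, using plain writeTQueue.
  atomically $ writeTQueue q 100
  -- Events arriving through the stream end up on the same queue.
  runConduit $ yieldMany [1, 2, 3] .| queueSink q
  -- The source never ends by itself, so only take what is known to be there.
  runConduit $ queueSource q .| takeC 4 .| sinkList
```

Anything holding on to the queue, for instance a delayed action running on another thread, can inject events without knowing about the conduit at all.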

Effects and tests

I started out writing everything based on a type like ReaderT <my cfg type> IO and using liftIO for effects that needed lifting. This worked nicely while I was setting up the basic structure of the service, but as soon as I hooked in the database I really wanted to do some testing also of the effectful code.

After reading Introduction to Tagless Final and The ReaderT Design Pattern, playing a bit with both approaches, and writing Tagless final and Scotty and The ReaderT design pattern or tagless final?, I finally chose to go down the route of tagless final. There's no strong reason for that decision, maybe it was just because I read about it first and found it very easy to move in that direction in small steps.
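
For readers who haven't seen the style, here is a minimal sketch of what tagless final buys when testing effectful code. Everything in it (MonadSms, notify, Recorder) is invented for the illustration and isn't taken from the service.

```haskell
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
import Control.Monad.Trans.Writer.Strict (Writer, runWriter, tell)

-- Hypothetical effect: sending an SMS to a number, reporting success.
class Monad m => MonadSms m where
  sendSms :: String -> String -> m Bool

-- Logic written against the class only; it doesn't know about IO.
-- Returns the number of successful sends.
notify :: MonadSms m => [String] -> String -> m Int
notify numbers msg = length . filter id <$> mapM (`sendSms` msg) numbers

-- "Production" interpreter (here it only prints).
instance MonadSms IO where
  sendSms to msg = putStrLn ("SMS to " ++ to ++ ": " ++ msg) >> pure True

-- Test interpreter: record what would have been sent.
newtype Recorder a = Recorder (Writer [(String, String)] a)
  deriving (Functor, Applicative, Monad)

instance MonadSms Recorder where
  sendSms to msg = Recorder (tell [(to, msg)]) >> pure True

runRecorder :: Recorder a -> (a, [(String, String)])
runRecorder (Recorder w) = runWriter w
```

A test can then call runRecorder (notify numbers msg) and inspect both the result and the recorded messages without touching the outside world.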

There's a split between property tests and unit tests:

  • Data types, their class instances (like JSON (de-)serialisation), pure functions and a few effects are tested using properties. I'm using QuickCheck for that. I've since looked a little closer at hedgehog and if I were to do a major overhaul of the property tests I might be tempted to rewrite them using that library instead.
  • Most of the Conduits are tested using HUnit.
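
As an illustration of the kind of property meant here, this is a sketch of a JSON round-trip test with QuickCheck and aeson. SmsEvent is a hypothetical type standing in for the service's real data types, and the generator deliberately sticks to plain characters.

```haskell
{-# LANGUAGE DeriveGeneric #-}
import Data.Aeson (FromJSON, ToJSON, decode, encode)
import GHC.Generics (Generic)
import Test.QuickCheck

-- Hypothetical event type, not taken from the service.
data SmsEvent = SmsEvent
  { recipient :: String
  , body      :: String
  , attempts  :: Int
  } deriving (Eq, Show, Generic)

-- Generic-derived JSON instances.
instance ToJSON SmsEvent
instance FromJSON SmsEvent

instance Arbitrary SmsEvent where
  arbitrary = SmsEvent <$> text <*> text <*> arbitrary
    where
      -- Lower-case letters, digits and spaces only.
      text = listOf (elements (['a' .. 'z'] ++ " 0123456789"))

-- Decoding an encoded value should give back the original.
prop_jsonRoundTrip :: SmsEvent -> Bool
prop_jsonRoundTrip e = decode (encode e) == Just e
```

Running quickCheck prop_jsonRoundTrip exercises the property on randomly generated events.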

Configuration

The service will be run in a container and we try to follow the 12-factor app rules, where the third one says that configuration should be stored in the environment. All previous Haskell projects I've worked on have been command line tools where configuration is done (mostly) using command line arguments. For that I usually use optparse-applicative, but it's not applicable in this setting.

After a bit of searching on hackage I settled on etc. It turned out to be nice and easy to work with. The configuration is written in JSON and only specifies environment variables. It's then embedded in the executable using file-embed. The only thing I miss is a ToJSON instance for Config – we've found it quite useful to log the active configuration when starting a service and that log entry would become a bit nicer if the message was JSON rather than the (somewhat difficult to read) string that Config's Show instance produces.
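
To make the idea of environment-only configuration concrete, here is a sketch using nothing but base. It does not use the etc library or its API; the Config fields and the variable names PORT and DATABASE_URL are made up for the example.

```haskell
import System.Environment (getEnvironment)
import Text.Read (readMaybe)

-- Hypothetical configuration; the fields are not from the service.
data Config = Config
  { cfgPort  :: Int
  , cfgDbUrl :: String
  } deriving (Eq, Show)

-- Pure parser over an environment given as key/value pairs, which makes
-- it easy to test without touching the real environment.
parseConfig :: [(String, String)] -> Either String Config
parseConfig env =
  Config <$> (required "PORT" >>= readInt "PORT") <*> required "DATABASE_URL"
  where
    required name = maybe (Left ("missing " ++ name)) Right (lookup name env)
    readInt name s = maybe (Left ("not a number: " ++ name)) Right (readMaybe s)

-- Read the process environment and parse it.
loadConfig :: IO (Either String Config)
loadConfig = parseConfig <$> getEnvironment
```

Since Config derives Show here, logging it at start-up gives the same kind of not-so-pretty string mentioned above.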

Logging

There are two requirements we have when it comes to logging

  1. All log entries tied to a request should have a correlation ID.
  2. Log requests and responses

I've written about correlation IDs before, in Using a configuration in Scotty.

Logging requests and responses is an area where I'm not very happy with scotty. It feels natural to solve it using middleware (i.e. using scotty's middleware function) but the representation, especially of responses, is a bit complicated, so for the time being I've skipped logging the body of both. I'd be most interested to hear of libraries that could make that easier.
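
A sketch of what body-less logging can look like as a WAI middleware: it logs the method and path of the request and the status of the response, and nothing else. logRequests and demo are names invented for the example, and the demo uses ResponseReceived from Network.Wai.Internal only to be able to call the application directly, without a web server.

```haskell
import qualified Data.ByteString.Char8 as B8
import Data.IORef (modifyIORef', newIORef, readIORef)
import Network.HTTP.Types (status200, statusCode)
import Network.Wai
  (Application, Middleware, defaultRequest, rawPathInfo, requestMethod,
   responseLBS, responseStatus)
import Network.Wai.Internal (ResponseReceived (..))

-- Log method and path before the application runs, and the status code
-- when the application responds. Bodies are not touched.
logRequests :: (String -> IO ()) -> Middleware
logRequests logLine app req respond = do
  logLine ("request " ++ B8.unpack (requestMethod req) ++ " " ++ B8.unpack (rawPathInfo req))
  app req $ \res -> do
    logLine ("response status " ++ show (statusCode (responseStatus res)))
    respond res

-- Call a trivial application through the middleware and collect the log.
demo :: IO [String]
demo = do
  ref <- newIORef []
  let app :: Application
      app _ respond = respond (responseLBS status200 [] mempty)
      req = defaultRequest { rawPathInfo = B8.pack "/sms" }
  _ <- logRequests (\l -> modifyIORef' ref (l :)) app req (\_ -> pure ResponseReceived)
  reverse <$> readIORef ref
```

In a scotty application something like this would be installed with middleware (logRequests putStrLn); adding the correlation ID to each line is left out of the sketch.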

Data storage and picking up new events

The data stream processing depends heavily on being able to pick up when new events are written to the database, especially when more than one instance is running (we usually have at least two instances running in the production environment). To get that working I've used postgresql-simple's support for LISTEN and NOTIFY via the function getNotification.

When I wrote about this earlier, in Conduit and PostgreSQL, I got some really good feedback that made my solution more robust.

Delayed actions

Some things in Haskell feel almost like cheating. The light-weight threading makes me confident that a forkIO followed by a threadDelay (or in my case, the ones from unliftio) will suffice.
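
A minimal sketch of that idea using only base (the service itself uses the unliftio versions). delayed and demo are made-up names, and the MVar is only there so the demo can wait for the delayed action to finish.

```haskell
import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Run an action on its own lightweight thread after the given number
-- of microseconds.
delayed :: Int -> IO () -> IO ThreadId
delayed micros action = forkIO (threadDelay micros >> action)

-- Schedule an action 50 ms into the future and wait for its result.
demo :: IO String
demo = do
  done <- newEmptyMVar
  _ <- delayed 50000 (putMVar done "status fetched")
  takeMVar done
```

In the service the delayed action would fetch the status of the SMS rather than fill an MVar.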

Footnotes:

1

It has happened in the past that we've changed SMS service after finding that they weren't living up to our expectations.

2

A while ago I was experimenting with other streaming libraries, but I gave up on getting re-combination to work – Zipping streams

Tags: haskell conduit scotty postgresql tagless_final
03 Mar 2019

Conduit and PostgreSQL

For a while now I've been playing around with an event-driven software design (EDA) using conduit for processing of events. For this post the processing can basically be viewed as the following diagram

+-----------+   +------------+   +---------+
|           |   |            |   |         |
| PG source |-->| Processing |-->| PG sink |
|           |   |            |   |         |
+-----------+   +------------+   +---------+
     ^                                |
     |            +------+            |
     |            |      |            |
     |            |  PG  |            |
     +------------|  DB  |<-----------+
                  |      |
                  +------+

I started out looking for Conduit components for PostgreSQL on Hackage but failed to find something fitting so I started looking into writing them myself using postgresql-simple.

The sink wasn't much of a problem: use await to get an event (a tuple) and write it to the database. My almost complete ignorance of using databases resulted in a first version of the source that was rather naive and used busy-waiting. Then I stumbled on PostgreSQL's support for notifications through the LISTEN and NOTIFY commands. I rather like the result and it seems to work well.[1]

It looks like this

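{-# LANGUAGE OverloadedStrings #-}
-- OverloadedStrings is needed for the SQL string literals below (they are Query values).

import           Control.Monad.IO.Class (MonadIO, liftIO)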
import           Data.Aeson (Value)
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as CC
import           Data.Text (Text)
import           Data.Time.Clock (UTCTime)
import           Data.UUID (UUID)
import           Database.PostgreSQL.Simple (Connection, Only(..), execute, execute_, query)
import           Database.PostgreSQL.Simple.Notification (getNotification)

fst8 :: (a, b, c, d, e, f, g, h) -> a
fst8 (a, _, _, _, _, _, _, _) = a

dbSource :: MonadIO m => Connection -> Int -> C.ConduitT () (Int, UTCTime, Int, Int, Bool, UUID, Text, Value) m ()
dbSource conn ver = do
  res <- liftIO $ query conn "SELECT * from events where id > (?) ORDER BY id" (Only ver)
  case res of
    [] -> do
      liftIO $ execute_ conn "LISTEN MyEvent"
      liftIO $ getNotification conn
      dbSource conn ver
    _ -> do
      let ver' = maximum $ map fst8 res
      CC.yieldMany res
      dbSource conn ver'

dbSink :: MonadIO m => Connection -> C.ConduitT (Int, Int, Bool, UUID, Text, Value) C.Void m ()
dbSink conn = do
  evt <- C.await
  case evt of
    Nothing -> return ()
    Just event -> do
      liftIO $ execute conn "INSERT INTO events \
                            \(srv_id, stream_id, cmd, cmd_id, correlation_id, event_data) \
                            \VALUES (?, ?, ?, ?, ?, ?)" event
      liftIO $ execute_ conn "NOTIFY MyEvent"
      dbSink conn

Footnotes:

1

If I've missed something crucial I would of course love to hear about it.

Tags: conduit event_sourcing haskell postgresql
17 Feb 2019

Choosing a conduit randomly

Lately I've been playing around with conduit. One thing I wanted to try out was to set up processing where one processing step was chosen at random from a number of components, based on weights. In short I guess I wanted a function with a type something like this

foo :: [(Int, ConduitT i o m r)] -> ConduitT i o m r

I have to admit I don't even know where to start writing such a function[1] but after a little bit of thinking I realised I could get the same effect by controlling how chunks of data are routed. That is, instead of choosing a component randomly, I can choose a route randomly. When choosing from three components it would look something like this

                        +---------+   +----------+   +-------------+
                        | Filter  |   | Drop tag |   | Component A |
                    +-->| Value-0 |-->|          |-->|             |--+
                    |   +---------+   +----------+   +-------------+  |
+----------------+  |   +---------+   +----------+   +-------------+  |
| Choose random  |  |   | Filter  |   | Drop tag |   | Component B |  |
| value based on +----->| Value-1 |-->|          |-->|             |----->
| weights        |  |   +---------+   +----------+   +-------------+  |
+----------------+  |   +---------+   +----------+   +-------------+  |
                    |   | Filter  |   | Drop tag |   | Component C |  |
                    +-->| Value-2 |-->|          |-->|             |--+
                        +---------+   +----------+   +-------------+

That is

  1. For each chunk that comes in, choose a value randomly based on weights and tag the chunk with the chosen value, then
  2. split the processing into one route for each component,
  3. in each route filter out chunks tagged with a single value, and
  4. remove the tag, then
  5. pass the chunk to the component, and finally
  6. bring the routes back together again.

Out of these steps all but the very first one are already available in conduit.
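
One way to wire up steps 2 to 6 with what conduit offers is sketched below, using sequenceConduits, filterC and mapC. This is a guess at a workable arrangement, not necessarily how the original was put together; route, routes and demo are names made up for the sketch.

```haskell
import Conduit
import Data.Conduit (sequenceConduits)
import Data.Functor (void)

-- One route: keep only chunks carrying the given tag (step 3), drop the
-- tag (step 4), and pass the chunk on to the component (step 5).
route :: (Eq t, Monad m) => t -> ConduitT i o m () -> ConduitT (t, i) o m ()
route tag component = filterC ((== tag) . fst) .| mapC snd .| component

-- Split into one route per component and bring the results back into a
-- single stream (steps 2 and 6).
routes :: (Eq t, Monad m) => [(t, ConduitT i o m ())] -> ConduitT (t, i) o m ()
routes = void . sequenceConduits . map (uncurry route)

-- Already-tagged input, so the example is deterministic.
demo :: [String]
demo =
  runConduitPure $
    yieldMany [(0, "x"), (1, "y"), (0, "z")]
      .| routes [(0 :: Int, mapC ("A:" ++)), (1, mapC ("B:" ++))]
      .| sinkList
```

Every chunk is offered to every route, and the filter makes sure only one of them acts on it.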

What's left is the beginning. I started with a function to pick a value at random based on weights[2]

import System.Random (randomRIO)

pickByWeight :: [(Int, b)] -> IO b
pickByWeight xs = randomRIO (1, tot) >>= \ n -> return (pick n xs)
  where
    tot = sum $ map fst xs

    -- Walk the list, subtracting weights until n falls within one of them.
    pick n ((k, x):rest)
      | n <= k = x
      | otherwise = pick (n - k) rest
    pick _ _ = error "pick error"
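
To see how a random number maps to a choice, the inner pick can be pulled out as a pure function and run over every possible value. pickAt and demo are hypothetical names for this sketch; unlike pick above, pickAt returns Nothing instead of calling error when it runs out of list.

```haskell
-- Pure core of the weighted choice: for 1 <= n <= total weight, walk
-- the list and subtract weights until n falls within one of them.
pickAt :: Int -> [(Int, b)] -> Maybe b
pickAt _ [] = Nothing
pickAt n ((k, x) : rest)
  | n <= k    = Just x
  | otherwise = pickAt (n - k) rest

-- With weights 1, 2 and 3 the values 1..6 map to one 'a', two 'b's
-- and three 'c's.
demo :: String
demo = [c | n <- [1 .. 6], Just c <- [pickAt n weights]]
  where
    weights = [(1, 'a'), (2, 'b'), (3, 'c')]
```

So a uniformly random n between 1 and the total weight picks each item with probability proportional to its weight.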

Using that I then made a component that tags chunks

picker ws = do
  evt <- await
  case evt of
    Nothing -> return ()
    Just e -> do
      p <- liftIO $ pickByWeight ws
      yield (p, e)
      picker ws

I was rather happy with this…

@snoyberg just have to let you know, conduit is a joy to use. Thanks for sharing it.

– Magnus Therning (@magthe) February 6, 2019

Footnotes:

1

Except maybe by using Template Haskell to generate the code I did come up with.

2

I used QuickCheck's frequency as inspiration for writing it.

Tags: haskell conduit
16 Oct 2018

Zipping streams

Writing the following is easy after glancing through the documentation for conduit:

foo = let src = mapM_ C.yield [0..9 :: Int]
          p0 = CC.map (\ i -> ("p0", succ i))
          p1 = CC.filter odd .| CC.map (\ i -> ("p1", i))
          p = C.getZipConduit $ C.ZipConduit p0 <* C.ZipConduit p1
          sink = CC.mapM_ print
      in C.runConduit $ src .| p .| sink

Neither pipes nor streaming make it as easy to figure out. I must be missing something! What functions should I be looking at?

Tags: haskell conduit pipes streaming