03 Mar 2019

Conduit and PostgreSQL

For a while now I've been playing around with an event-driven architecture (EDA), using conduit to process events. For this post, the processing can be viewed as the following diagram:

+-----------+   +------------+   +---------+
|           |   |            |   |         |
| PG source |-->| Processing |-->| PG sink |
|           |   |            |   |         |
+-----------+   +------------+   +---------+
     ^                                |
     |            +------+            |
     |            |      |            |
     |            |  PG  |            |
     +------------|  DB  |<-----------+
                  |      |
                  +------+
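To make the diagram concrete, here is a minimal sketch of how the three boxes compose with conduit's `.|`. The PG source and sink are replaced by hypothetical in-memory stand-ins (`source`, `sink`), and `processing` is a made-up step that only labels each event, so the sketch runs without a database:

```haskell
import           Data.Conduit (ConduitT, runConduitPure, (.|))
import qualified Data.Conduit.Combinators as CC
import           Data.Void (Void)

-- Hypothetical stand-in for the PG source: emits events from a list.
source :: Monad m => [Int] -> ConduitT () Int m ()
source = CC.yieldMany

-- Hypothetical processing step: turns each incoming event into a new one.
processing :: Monad m => ConduitT Int String m ()
processing = CC.map (\n -> "event " ++ show n)

-- Hypothetical stand-in for the PG sink: collects everything it receives.
sink :: Monad m => ConduitT String Void m [String]
sink = CC.sinkList

main :: IO ()
main = print (runConduitPure (source [1, 2, 3] .| processing .| sink))
-- prints ["event 1","event 2","event 3"]
```

In the real pipeline the source never finishes and both ends do IO, so it would be run with `runConduit` in `IO` rather than with `runConduitPure`.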

I started out looking for Conduit components for PostgreSQL on Hackage, but failed to find anything fitting, so I started writing them myself using postgresql-simple.

The sink wasn't much of a problem: use await to get an event (a tuple) and write it to the database. My almost complete ignorance of databases meant that the first version of the source was rather naive and used busy-waiting. Then I stumbled on PostgreSQL's support for notifications through the LISTEN and NOTIFY commands. I rather like the result, and it seems to work well.[1]
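The difference between busy-waiting and notifications can be modelled without a database. In the sketch below, which uses only `base`, an `IORef` plays the events table and a one-slot `MVar` plays the LISTEN/NOTIFY channel; `Db`, `insertEvent` and `readEvents` are hypothetical names, not part of postgresql-simple. When nothing is new the reader blocks in `takeMVar` instead of re-reading in a loop, and because the `MVar` exists before the first read (the analogue of issuing LISTEN before the first SELECT), a notification sent between an empty read and the wait is not lost:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (forM_)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- Hypothetical model: the table holds (id, payload) rows in id order, and the
-- MVar is the notification channel, already "listening" from creation.
data Db = Db { table :: IORef [(Int, String)], notify :: MVar () }

newDb :: IO Db
newDb = Db <$> newIORef [] <*> newEmptyMVar

-- Sink side: INSERT a row with the next id, then NOTIFY. tryPutMVar coalesces
-- notifications that arrive while one is already pending.
insertEvent :: Db -> String -> IO ()
insertEvent db payload = do
  atomicModifyIORef' (table db) $ \rows ->
    (rows ++ [(length rows + 1, payload)], ())
  _ <- tryPutMVar (notify db) ()
  pure ()

-- Source side: collect n rows with id > ver, blocking on a notification
-- (rather than busy-waiting) whenever there is nothing new to read.
readEvents :: Db -> Int -> Int -> IO [(Int, String)]
readEvents _ _ 0 = pure []
readEvents db ver n = do
  rows <- filter ((> ver) . fst) <$> readIORef (table db)
  case rows of
    [] -> takeMVar (notify db) >> readEvents db ver n
    _ -> do
      let batch = take n rows
      rest <- readEvents db (maximum (map fst batch)) (n - length batch)
      pure (batch ++ rest)

main :: IO ()
main = do
  db <- newDb
  _ <- forkIO $ forM_ ["a", "b", "c"] (insertEvent db)
  events <- readEvents db 0 3
  print events
  -- prints [(1,"a"),(2,"b"),(3,"c")]
```

A stale or coalesced notification only costs an extra read, which is harmless because the reader always re-checks the table. PostgreSQL likewise delivers a NOTIFY only to sessions that have already executed LISTEN on that channel, and only after the notifying transaction commits.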

It looks like this:

{-# LANGUAGE OverloadedStrings #-}

import           Control.Monad.IO.Class (MonadIO, liftIO)
import           Data.Aeson (Value)
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as CC
import           Data.Text (Text)
import           Data.Time.Clock (UTCTime)
import           Data.UUID (UUID)
import           Database.PostgreSQL.Simple (Connection, Only(..), execute, execute_, query)
import           Database.PostgreSQL.Simple.Notification (getNotification)

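-- First element of an 8-tuple; for an event row this is its id.
fst8 :: (a, b, c, d, e, f, g, h) -> a
fst8 (a, _, _, _, _, _, _, _) = a

-- Stream every event with an id greater than ver, blocking on a NOTIFY
-- whenever there is nothing new instead of polling.
dbSource :: MonadIO m => Connection -> Int -> C.ConduitT () (Int, UTCTime, Int, Int, Bool, UUID, Text, Value) m ()
dbSource conn ver0 = do
  -- LISTEN once, before the first query, so that a NOTIFY sent between an
  -- empty query and the call to getNotification is not lost.
  _ <- liftIO $ execute_ conn "LISTEN MyEvent"
  go ver0
  where
    go ver = do
      res <- liftIO $ query conn "SELECT * FROM events WHERE id > ? ORDER BY id" (Only ver)
      case res of
        [] -> do
          -- Nothing new: wait for a notification, then query again.
          _ <- liftIO $ getNotification conn
          go ver
        _ -> do
          let ver' = maximum $ map fst8 res
          CC.yieldMany res
          go ver'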

-- Write each incoming event to the events table and NOTIFY any listener.
dbSink :: MonadIO m => Connection -> C.ConduitT (Int, Int, Bool, UUID, Text, Value) C.Void m ()
dbSink conn = do
  evt <- C.await
  case evt of
    Nothing -> return ()
    Just event -> do
      _ <- liftIO $ execute conn "INSERT INTO events \
                                 \(srv_id, stream_id, cmd, cmd_id, correlation_id, event_data) \
                                 \VALUES (?, ?, ?, ?, ?, ?)" event
      -- Wake up a source blocked in getNotification.
      _ <- liftIO $ execute_ conn "NOTIFY MyEvent"
      dbSink conn

Footnotes:

[1] If I've missed something crucial, I would of course love to hear about it.

Tags: conduit event_sourcing haskell postgresql