Conduit and PostgreSQL
For a while now I've been playing around with an event-driven software design
(EDA), using conduit for processing of events. For this post the processing can
basically be viewed as the following diagram:
    +-----------+   +------------+   +---------+
    |           |   |            |   |         |
    | PG source |-->| Processing |-->| PG sink |
    |           |   |            |   |         |
    +-----------+   +------------+   +---------+
          ^                               |
          |           +------+            |
          |           |      |            |
          |           |  PG  |            |
          +-----------|  DB  |<-----------+
                      |      |
                      +------+
I started out looking for Conduit components for PostgreSQL on Hackage but
failed to find something fitting, so I started looking into writing them myself
using postgresql-simple.
The sink wasn't much of a problem: use await to get an event (a tuple) and
write it to the database. My almost complete ignorance of databases meant that
the first version of the source was rather naive and used busy-waiting. Then I
stumbled on PostgreSQL's support for notifications through the LISTEN and
NOTIFY commands. I rather like the result and it seems to work well.[1]
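To see the notification mechanism on its own, outside any conduit, here is a minimal sketch using two connections to the same database. The connection string `dbname=events` and the one-second delay are assumptions made for illustration; the calls themselves (`execute_`, `getNotification`, `notificationChannel`) come from postgresql-simple. It needs a running PostgreSQL server to try out.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (void)
import qualified Data.ByteString.Char8 as B8
import Database.PostgreSQL.Simple (close, connectPostgreSQL, execute_)
import Database.PostgreSQL.Simple.Notification
  (Notification (..), getNotification)

main :: IO ()
main = do
  -- Hypothetical connection string; adjust to your own setup.
  listener <- connectPostgreSQL "dbname=events"
  notifier <- connectPostgreSQL "dbname=events"

  -- Register interest first: a NOTIFY sent before the LISTEN has run
  -- is not delivered to this session.
  void $ execute_ listener "LISTEN MyEvent"

  -- Send a notification from the other connection after one second.
  void . forkIO $ do
    threadDelay 1000000
    void $ execute_ notifier "NOTIFY MyEvent"

  -- Blocks, without polling, until a notification arrives.
  n <- getNotification listener
  B8.putStrLn ("notified on channel: " <> notificationChannel n)

  close listener
  close notifier
```

The blocking call to `getNotification` is what replaces the busy-waiting loop: the thread sleeps until the server has something to say.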
The source and sink look like this:
    {-# LANGUAGE OverloadedStrings #-}

    import Control.Monad.IO.Class (MonadIO, liftIO)
    import Data.Aeson (Value)
    import qualified Data.Conduit as C
    import qualified Data.Conduit.Combinators as CC
    import Data.Text (Text)
    import Data.Time.Clock (UTCTime)
    import Data.UUID (UUID)
    import Database.PostgreSQL.Simple (Connection, Only(..), execute, execute_, query)
    import Database.PostgreSQL.Simple.Notification (getNotification)

    -- First component of an 8-tuple, i.e. the event id.
    fst8 :: (a, b, c, d, e, f, g, h) -> a
    fst8 (a, _, _, _, _, _, _, _) = a

    -- Yield all events with an id above ver; when there are none, wait
    -- for a notification and then look again.
    dbSource :: MonadIO m => Connection -> Int -> C.ConduitT () (Int, UTCTime, Int, Int, Bool, UUID, Text, Value) m ()
    dbSource conn ver = do
      res <- liftIO $ query conn "SELECT * from events where id > (?) ORDER BY id" (Only ver)
      case res of
        [] -> do
          liftIO $ execute_ conn "LISTEN MyEvent"
          liftIO $ getNotification conn
          dbSource conn ver
        _ -> do
          let ver' = maximum $ map fst8 res
          CC.yieldMany res
          dbSource conn ver'

    -- Insert each incoming event and notify any listening sources.
    dbSink :: MonadIO m => Connection -> C.ConduitT (Int, Int, Bool, UUID, Text, Value) C.Void m ()
    dbSink conn = do
      evt <- C.await
      case evt of
        Nothing -> return ()
        Just event -> do
          liftIO $ execute conn "INSERT INTO events \
              \(srv_id, stream_id, cmd, cmd_id, correlation_id, event_data) \
              \VALUES (?, ?, ?, ?, ?, ?)" event
          liftIO $ execute_ conn "NOTIFY MyEvent"
          dbSink conn
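To make the diagram at the top concrete, here is a rough sketch of how a processing stage could sit between a source and a sink of the shapes used above. Everything named here (`StoredEvent`, `NewEvent`, `process`, `runPipeline`, `processPure`, `sampleRows`) is hypothetical, and the processing logic is an assumption made purely for illustration: it treats rows with the Bool column set as commands and answers each with a row where it is unset. It also assumes that `SELECT *` returns the id and a timestamp first, followed by the six inserted columns. The source and sink are passed in as arguments, so the same pipeline can be run against in-memory data without a database.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson (Value (..))
import Data.Conduit (ConduitT, runConduit, runConduitPure, (.|))
import qualified Data.Conduit.Combinators as CC
import Data.Text (Text)
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..))
import Data.UUID (UUID, nil)
import Data.Void (Void)

-- Row shapes copied from the signatures of dbSource and dbSink.
type StoredEvent = (Int, UTCTime, Int, Int, Bool, UUID, Text, Value)
type NewEvent = (Int, Int, Bool, UUID, Text, Value)

-- Hypothetical processing step (an assumption, not the post's logic):
-- rows whose Bool flag is True are answered with a row whose flag is
-- False, dropping the first two columns. Rows with the flag already
-- False produce nothing, so output written back to the same table is
-- not processed a second time.
process :: Monad m => ConduitT StoredEvent NewEvent m ()
process = CC.filter isCommand .| CC.map respond
  where
    isCommand (_, _, _, _, cmd, _, _, _) = cmd
    respond (_, _, srv, stream, _, cmdId, corr, payload) =
      (srv, stream, False, cmdId, corr, payload)

-- Wire any source and any sink around the processing step.
runPipeline
  :: Monad m
  => ConduitT () StoredEvent m ()
  -> ConduitT NewEvent Void m r
  -> m r
runPipeline source sink = runConduit (source .| process .| sink)

-- The same pipeline with in-memory ends, handy for trying out the
-- processing step on its own.
processPure :: [StoredEvent] -> [NewEvent]
processPure rows =
  runConduitPure (CC.yieldMany rows .| process .| CC.sinkList)

-- Made-up rows: one with the flag set, one without.
sampleRows :: [StoredEvent]
sampleRows =
  [ (1, t, 7, 1, True, nil, "corr-1", Null)
  , (2, t, 7, 1, False, nil, "corr-1", Null)
  ]
  where
    t = UTCTime (fromGregorian 2020 1 1) 0

main :: IO ()
main = print (processPure sampleRows)
```

With the components from this post, the database-backed version would be started with something like `runPipeline (dbSource conn 0) (dbSink conn)`.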
Footnotes:
[1] If I've missed something crucial I would of course love to hear about it.