Posts tagged "postgresql":
Early this summer it was finally time to put this one service I've been working on into our sandbox environment. It's been running without hiccups, so last week I turned it on in production as well. In this post I thought I'd document the how and why of the service in the hope that someone will find it useful.
The service functions as an interface to external SMS-sending services, offering a single place to change if we find that we are unhappy with the provider we're using. It replaces an older service, written in Ruby, that no one really dares touch. Hopefully the Haskell version will prove to be a joy to work with over time.
Overview of the architecture
The service is split into two parts: a web server using scotty, and streaming data processing using conduit. Persistent storage is provided by a PostgreSQL database. The general idea is that events are picked up from the database and acted upon, which in turn results in new events that are written to the database. Those are then picked up, and round and round we go. The web service accepts requests, turns them into events, and writes them to the database.
Hopefully this crude diagram clarifies it somewhat.
There are a few things that might need some explanation:

- In the past we've wanted to have the option to use multiple external SMS services at the same time. One is randomly chosen as the request comes in. There's also a possibility to configure the frequency for each external service. Picker implements the random picking, and I've written about that earlier in Choosing a conduit randomly.
- Success and fail are dummy senders. They don't actually send anything; the former succeeds at it while the latter fails. I found them useful for manual testing.
- Successfully sending off a request to an external SMS service and getting status 200 back doesn't actually mean that the SMS has been sent, or even that it ever will be. Due to the nature of SMS messaging there are no guarantees of timeliness at all. Since we are interested in finding out whether an SMS actually is sent, a delayed action is scheduled that fetches the status of a sent SMS after a certain time (currently 2 minutes). If an SMS hasn't been sent after that time it might as well never be – it's too slow for our end-users. This is what report-fetcher and fetcher-func do.
- The queue sink and queue src are actually sinkTQueue and sourceTQueue. Splitting the stream like that makes it trivial to push in events by using writeTQueue.
- I use sequenceConduits in order to send a single event to multiple Conduits and then combine all their results back into a single stream. The ease with which this can be done in conduit is one of the main reasons why I chose to use it.
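As a sketch of that fan-out (with made-up stand-in conduits rather than the service's real senders), sequenceConduits feeds every input element to each conduit in the list and merges their outputs into a single downstream stream:

```haskell
import Data.Conduit (ConduitT, runConduitPure, sequenceConduits, (.|))
import qualified Data.Conduit.Combinators as CC

-- Two stand-in conduits; in the real service these would be senders.
double :: Monad m => ConduitT Int Int m ()
double = CC.map (* 2)

negated :: Monad m => ConduitT Int Int m ()
negated = CC.map negate

-- Every input element is fed to both conduits, and their outputs
-- end up in a single downstream stream.
fanOut :: Monad m => ConduitT Int Int m ()
fanOut = () <$ sequenceConduits [double, negated]

main :: IO ()
main = print $ runConduitPure $ CC.yieldMany [1, 2, 3] .| fanOut .| CC.sinkList
```

Each input element thus produces one output per inner conduit; sequenceConduits also collects the results of the inner conduits (a `[()]` here, discarded with `() <$`).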
Effects and tests
I started out writing everything based on a type like ReaderT <my cfg type> IO, using liftIO for effects that needed lifting. This worked nicely while I was setting up the basic structure of the service, but as soon as I hooked in the database I really wanted to do some testing of the effectful code as well.
After reading Introduction to Tagless Final and The ReaderT Design Pattern, playing a bit with both approaches, and writing Tagless final and Scotty and The ReaderT design pattern or tagless final?, I finally chose to go down the route of tagless final. There's no strong reason for that decision; maybe it was just because I read about it first and found it very easy to move in that direction in small steps.
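A minimal sketch of what that looks like, with a made-up EventStore effect rather than the service's actual classes: the effect is captured in a type class, production code gets an instance talking to PostgreSQL, and tests get a pure one backed by State.

```haskell
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Control.Monad.State (State, get, modify, runState)

-- A hypothetical effect for storing events, in tagless-final style.
class Monad m => EventStore m where
    writeEvent :: String -> m ()
    readEvents :: m [String]

-- Code written against the class works with any instance.
handleRequest :: EventStore m => String -> m ()
handleRequest msg = writeEvent ("SendSms: " ++ msg)

-- A pure instance for tests, backed by State instead of PostgreSQL.
newtype TestM a = TestM (State [String] a)
    deriving (Functor, Applicative, Monad)

instance EventStore TestM where
    writeEvent e = TestM (modify (++ [e]))
    readEvents = TestM get

runTestM :: TestM a -> (a, [String])
runTestM (TestM m) = runState m []
```

With this in place the effectful code can be exercised in tests without a database in sight.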
There's a split between property tests and unit tests:
- Data types, their typeclass instances (like JSON (de-)serialisation), pure functions, and a few effects are tested using properties. I'm using QuickCheck for that. I've since looked a little closer at hedgehog, and if I were to do a major overhaul of the property tests I might be tempted to rewrite them using that library instead.
- Most of the Conduits are tested using HUnit.
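A typical property of the first kind is a JSON round-trip; the event type below is invented for the example and stands in for the service's real ones:

```haskell
{-# LANGUAGE DeriveGeneric #-}

import Data.Aeson (FromJSON, ToJSON, decode, encode)
import GHC.Generics (Generic)
import Test.QuickCheck

-- An invented event type standing in for the service's real ones.
data SmsEvent = SmsEvent
    { smsTo   :: String
    , smsBody :: String
    } deriving (Eq, Show, Generic)

instance ToJSON SmsEvent
instance FromJSON SmsEvent

instance Arbitrary SmsEvent where
    arbitrary = SmsEvent <$> arbitrary <*> arbitrary

-- Decoding an encoded event must give back the original.
prop_jsonRoundtrip :: SmsEvent -> Bool
prop_jsonRoundtrip e = decode (encode e) == Just e

main :: IO ()
main = quickCheck prop_jsonRoundtrip
```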
Configuration

The service will be run in a container, and we try to follow the 12-factor app rules, where the third one says that configuration should be stored in the environment. All previous Haskell projects I've worked on have been command line tools where configuration is done (mostly) using command line arguments. For that I usually use optparse-applicative, but it's not applicable in this setting.

After a bit of searching on Hackage I settled on etc. It turned out to be nice and easy to work with. The configuration is written in JSON and only specifies environment variables. It's then embedded in the executable using file-embed.
The only thing I miss is a ToJSON instance for Config – we've found it quite useful to log the active configuration when starting a service, and that log entry would become a bit nicer if the message was JSON rather than the (somewhat difficult to read) string that the Show instance produces.
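Writing such an instance by hand isn't much work, though. A sketch with an invented config type (the real Config comes from etc, and its fields differ):

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson (ToJSON, encode, object, toJSON, (.=))
import qualified Data.ByteString.Lazy.Char8 as BL

-- An invented config type; the real Config comes from etc.
data AppConfig = AppConfig
    { smsServiceUrl :: String
    , httpPort      :: Int
    }

instance ToJSON AppConfig where
    toJSON c = object
        [ "smsServiceUrl" .= smsServiceUrl c
        , "httpPort"      .= httpPort c
        ]

-- Log the active configuration as JSON at startup.
logConfig :: AppConfig -> IO ()
logConfig = BL.putStrLn . encode
```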
Logging

There are two requirements we have when it comes to logging:

- All log entries tied to a request should have a correlation ID.
- Requests and responses should be logged.
I've written about correlation IDs before, in Using a configuration in Scotty.
Logging requests and responses is an area where I'm not very happy with scotty. It feels natural to solve it using WAI middleware (i.e. using scotty's middleware function), but the representation, especially of responses, is a bit complicated, so for the time being I've skipped logging the bodies of both. I'd be most interested to hear of libraries that could make that easier.
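For completeness, a minimal sketch of such a middleware that logs only the request line and the response status (the bodies are the awkward part and are skipped here too):

```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.ByteString.Char8 as BS
import Network.HTTP.Types (statusCode)
import Network.Wai (Middleware, rawPathInfo, requestMethod, responseStatus)

-- Log the request line before handling, and the status code after.
logRequests :: Middleware
logRequests app req respond = do
    BS.putStrLn $ requestMethod req <> " " <> rawPathInfo req
    app req $ \resp -> do
        putStrLn $ "-> " ++ show (statusCode (responseStatus resp))
        respond resp
```

In scotty it would be installed with `middleware logRequests`.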
Data storage and picking up new events
The data stream processing depends heavily on being able to pick up when new events are written to the database, especially when there is more than one instance running (we usually have at least two instances running in the production environment). To get that working I've used postgresql-simple's support for PostgreSQL's LISTEN/NOTIFY via the function getNotification.
When I wrote about this earlier, in Conduit and PostgreSQL, I got some really good feedback that made my solution more robust.
For a while now I've been playing around with an event-driven software design, using conduit for the processing of events. For this post the processing can basically be viewed as the following diagram:
+-----------+   +------------+   +---------+
|           |   |            |   |         |
| PG source |-->| Processing |-->| PG sink |
|           |   |            |   |         |
+-----------+   +------------+   +---------+
      ^                               |
      |            +------+           |
      |            |      |           |
      |            |  PG  |           |
      +------------|  DB  |<----------+
                   |      |
                   +------+
The sink wasn't much of a problem: use await to get an event (a tuple) and write it to the database. My almost complete ignorance of using databases resulted in a first version of the source that was rather naive and used busy-waiting. Then I stumbled on PostgreSQL's support for notifications through NOTIFY commands. I rather like the result, and it seems to work well. It looks like this:
```haskell
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson (Value)
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as CC
import Data.Text (Text)
import Data.Time.Clock (UTCTime)
import Data.UUID (UUID)
import Database.PostgreSQL.Simple (Connection, Only(..), execute, execute_, query)
import Database.PostgreSQL.Simple.Notification (getNotification)

fst8 :: (a, b, c, d, e, f, g, h) -> a
fst8 (a, _, _, _, _, _, _, _) = a

dbSource :: MonadIO m => Connection -> Int -> C.ConduitT () (Int, UTCTime, Int, Int, Bool, UUID, Text, Value) m ()
dbSource conn ver = do
    res <- liftIO $ query conn "SELECT * from events where id > (?) ORDER BY id" (Only ver)
    case res of
        [] -> do
            -- No new events: block on a notification before querying again.
            liftIO $ execute_ conn "LISTEN MyEvent"
            liftIO $ getNotification conn
            dbSource conn ver
        _ -> do
            -- Yield the new events and remember the highest id seen.
            let ver' = maximum $ map fst8 res
            CC.yieldMany res
            dbSource conn ver'

dbSink :: MonadIO m => Connection -> C.ConduitT (Int, Int, Bool, UUID, Text, Value) C.Void m ()
dbSink conn = do
    evt <- C.await
    case evt of
        Nothing -> return ()
        Just event -> do
            liftIO $ execute conn
                "INSERT INTO events \
                \(srv_id, stream_id, cmd, cmd_id, correlation_id, event_data) \
                \VALUES (?, ?, ?, ?, ?, ?)"
                event
            -- Wake up any sources waiting on LISTEN.
            liftIO $ execute_ conn "NOTIFY MyEvent"
            dbSink conn
```
If I've missed something crucial I would of course love to hear about it.