Posts tagged "AWS":
Getting Amazonka S3 to work with localstack
I'm writing this in case someone else is getting strange errors when trying to use amazonka-s3 with localstack. It took me rather too long to find the answer, and neither the errors I got from Amazonka nor the ones from localstack were very helpful.
The code I started with for setting up the connection looked like this
    main = do
        awsEnv <- AWS.overrideService localEndpoint <$> AWS.newEnv AWS.discover
        -- do S3 stuff
      where
        localEndpoint = AWS.setEndpoint False "localhost" 4566
A few years ago, when I last wrote some Haskell to talk to S3, this was enough [1], but now I got some strange errors.
It turns out there are different ways to address buckets, and the default, which is the one used by AWS itself, isn't used by localstack. The documentation of `S3AddressingStyle` has more details.
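To make the difference concrete, here is a minimal sketch of where the bucket name ends up in the request URL for the two styles. The `Endpoint` type, the helper functions, the bucket and the key are all made up for illustration; Amazonka builds these URLs itself, and the real virtual-hosted URLs on AWS include the service and region in the host name.

```haskell
-- Illustration only: these helpers are hypothetical and not part of Amazonka.

data Endpoint = Endpoint
    { epHost :: String
    , epPort :: Int
    }

-- Virtual-hosted style: the bucket becomes part of the host name.
virtualHostedUrl :: Endpoint -> String -> String -> String
virtualHostedUrl ep bucket key =
    "http://" <> bucket <> "." <> epHost ep <> ":" <> show (epPort ep) <> "/" <> key

-- Path style: the bucket is the first segment of the path.
pathStyleUrl :: Endpoint -> String -> String -> String
pathStyleUrl ep bucket key =
    "http://" <> epHost ep <> ":" <> show (epPort ep) <> "/" <> bucket <> "/" <> key

main :: IO ()
main = do
    let local = Endpoint "localhost" 4566
    putStrLn (virtualHostedUrl local "my-bucket" "some/key")
    -- prints http://my-bucket.localhost:4566/some/key
    putStrLn (pathStyleUrl local "my-bucket" "some/key")
    -- prints http://localhost:4566/my-bucket/some/key
```

With the endpoint overridden to plain `localhost`, only the second form keeps the host name unchanged.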
So to get it to work I had to change the S3 addressing style as well and ended up with this code instead
    main = do
        awsEnv <- AWS.overrideService (s3AddrStyle . localEndpoint) <$> AWS.newEnv AWS.discover
        -- do S3 stuff
      where
        localEndpoint = AWS.setEndpoint False "localhost" 4566
        s3AddrStyle svc = svc {AWS.s3AddressingStyle = AWS.S3AddressingStylePath}
Footnotes:
[1] That was before version 2.0 of Amazonka, so it did look slightly different, but overriding the endpoint was all that was needed.
Combining Amazonka and Conduit
Combining amazonka and conduit turned out to be easier than I had expected.
Here's an SNS sink I put together today
    snsSink :: (MonadAWS m, MonadIO m) => T.Text -> C.ConduitT Value C.Void m ()
    snsSink topic = do
        C.await >>= \case
            Nothing -> pure ()
            Just msg -> do
                _ <- C.lift $ publishSNS topic (TL.toStrict $ TL.decodeUtf8 $ encode msg)
                snsSink topic
Putting it to use can be done with something like
    foo = do
        ...
        awsEnv <- newEnv Discover
        runAWSCond awsEnv $ <source producing Value> .| snsSink topicArn
      where
        runAWSCond awsEnv = runResourceT . runAWS awsEnv . within Frankfurt . C.runConduit
X-Ray and WAI
For a while we've been planning on introducing AWS X-Ray into our system at work. There's official support for a few languages, but not too surprisingly Haskell isn't on that list. I found freckle/aws-xray-client on GitHub, which is so unofficial that it isn't even published on Hackage. While it looks very good, I suspect it does more than I need and since it lacks licensing information I decided to instead implement a version tailored to our needs.
As a first step I implemented a WAI middleware that wraps an HTTP request and reports the time it took to produce a response. Between the X-Ray Developer Guide and the code in Freckle's git repo it turned out to be fairly simple.
First off, this is the first step towards X-Ray nirvana, so all I'm aiming for is minimal support. That means all I want is to send minimal X-Ray segments, with the small addition that I want to support `parent_id` from the start.
The first step then is to parse the HTTP header containing the X-Ray information, `X-Amzn-Trace-Id`. For now I'm only interested in two parts, `Root` and `Parent`, so for simplicity's sake I use a tuple to keep them in. The idea is to take the header's value, split on `;` to get the parts, then split each part in two, a key and a value, and put them into an association list (`[(Text, Text)]`) for easy lookup using, well, `lookup`.
    parseXRayTraceIdHdr :: Text -> Maybe (Text, Maybe Text)
    parseXRayTraceIdHdr hdr = do
        bits <- traverse parseHeaderComponent $ T.split (== ';') hdr
        traceId <- lookup "Root" bits
        let parent = lookup "Parent" bits
        pure (traceId, parent)

    parseHeaderComponent :: Text -> Maybe (Text, Text)
    parseHeaderComponent cmp = case T.split (== '=') cmp of
        [name, value] -> Just (name, value)
        _ -> Nothing
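To see how the parser behaves, here is a small self-contained sketch that repeats the two functions and runs them on a few header values. The sample values are only illustrative, and the `main` wrapper is my addition, not part of the middleware.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Text (Text)
import qualified Data.Text as T

-- Same parser as in the post, repeated so the example is self-contained.
parseXRayTraceIdHdr :: Text -> Maybe (Text, Maybe Text)
parseXRayTraceIdHdr hdr = do
    bits <- traverse parseHeaderComponent $ T.split (== ';') hdr
    traceId <- lookup "Root" bits
    let parent = lookup "Parent" bits
    pure (traceId, parent)

parseHeaderComponent :: Text -> Maybe (Text, Text)
parseHeaderComponent cmp = case T.split (== '=') cmp of
    [name, value] -> Just (name, value)
    _ -> Nothing

main :: IO ()
main = do
    -- Root and Parent present: both end up in the tuple.
    print $ parseXRayTraceIdHdr "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1"
    -- Only Root present: the parent is Nothing.
    print $ parseXRayTraceIdHdr "Root=1-5759e988-bd862e3fe1be46a994272793"
    -- No Root: the whole parse fails with Nothing.
    print $ parseXRayTraceIdHdr "Parent=53995c3f42cd8ad8"
    -- A part without '=' also makes the whole parse fail.
    print $ parseXRayTraceIdHdr "not a trace header"
```

Note that the parse is all-or-nothing: one part that isn't exactly `key=value` rejects the entire header, which is good enough for minimal support.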
The start and end times for processing a request are also required. The docs say that using at least millisecond resolution is a good idea, so I decided to do exactly that. `NominalDiffTime`, which is what `getPOSIXTime` produces, supports a resolution of picoseconds (though I doubt my system's clock does), which requires a bit of (type-based) converting.
    mkTimeInMilli :: IO Milli
    mkTimeInMilli = ndfToMilli <$> getPOSIXTime
      where
        ndfToMilli = fromRational . toRational
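A quick sketch of what the conversion does to a value, with `ndfToMilli` lifted to the top level and a fixed timestamp chosen purely for illustration. Going through `Rational` keeps the value exact until `Milli` cuts it off at three decimals.

```haskell
import Data.Fixed (Milli)
import Data.Time.Clock (NominalDiffTime)
import Data.Time.Clock.POSIX (getPOSIXTime)

-- The same conversion as in mkTimeInMilli, given a top-level name here.
ndfToMilli :: NominalDiffTime -> Milli
ndfToMilli = fromRational . toRational

main :: IO ()
main = do
    -- A made-up timestamp with microsecond precision; everything beyond
    -- the third decimal is dropped by the conversion.
    print (ndfToMilli 1586505600.123456) -- prints 1586505600.123
    -- The current time, as the middleware would record it.
    now <- ndfToMilli <$> getPOSIXTime
    print now
```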
The last support function needed is one that creates the segment. Just building the JSON object, using aeson's `object`, is enough at this point.
    mkSegment :: Text -> Text -> Milli -> Milli -> (Text, Maybe Text) -> Value
    mkSegment name id startTime endTime (root, parent) =
        object $
            [ "name" .= name
            , "id" .= id
            , "trace_id" .= root
            , "start_time" .= startTime
            , "end_time" .= endTime
            ]
                <> p
      where
        p = maybe [] (\v -> ["parent_id" .= v]) parent
Armed with all this, I can now put together a WAI middleware that
- records the start time of the call
- processes the request
- sends off the response and keeps the result of it
- records the end time
- parses the tracing header
- builds the segment prepended with the X-Ray daemon header
- sends the segment to the X-Ray daemon
    traceId :: Text -> Middleware
    traceId xrayName app req sendResponse = do
        startTime <- mkTimeInMilli
        app req $ \res -> do
            rr <- sendResponse res
            endTime <- mkTimeInMilli
            theId <- T.pack . (\v -> showHex v "") <$> randomIO @Word64
            let traceParts = (decodeUtf8 <$> requestHeaderTraceId req) >>= parseXRayTraceIdHdr
                segment = mkSegment xrayName theId startTime endTime <$> traceParts
            case segment of
                Nothing -> pure ()
                Just segment' -> sendXRayPayload $ toStrict $ prepareXRayPayload segment'
            pure rr
      where
        prepareXRayPayload segment =
            let header = object ["format" .= ("json" :: String), "version" .= (1 :: Int)]
             in encode header <> "\n" <> encode segment

        sendXRayPayload payload = do
            addrInfos <- S.getAddrInfo Nothing (Just "127.0.0.1") (Just "2000")
            case addrInfos of
                [] -> pure () -- silently skip
                (xrayAddr : _) -> do
                    sock <- S.socket (S.addrFamily xrayAddr) S.Datagram S.defaultProtocol
                    S.connect sock (S.addrAddress xrayAddr)
                    sendAll sock payload
                    S.close sock
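One detail worth checking in the ID generation: `showHex` doesn't pad, so a small random number gives a short string. As far as I understand the X-Ray segment documentation, the `id` field is expected to be 16 hexadecimal digits, so a padded variant may be safer. The following is a sketch under that assumption; `segmentIdHex` is a hypothetical helper, not something from the middleware above.

```haskell
import Data.Word (Word64)
import Numeric (showHex)
import Text.Printf (printf)

-- Hypothetical helper: render a Word64 as exactly 16 lower-case hex digits,
-- padding with leading zeros where needed.
segmentIdHex :: Word64 -> String
segmentIdHex = printf "%016x"

main :: IO ()
main = do
    putStrLn (showHex (255 :: Word64) "") -- prints ff
    putStrLn (segmentIdHex 255)           -- prints 00000000000000ff
    putStrLn (segmentIdHex maxBound)      -- prints ffffffffffffffff
```

In the middleware this would replace the `showHex` lambda, with `T.pack` applied to the result as before.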
The next step will be to instrument the actual processing. The service I'm instrumenting is asynchronous, so all the work happens after the response has been sent. My plan for this is to use subsegments to record it. That means I'll have to
- keep the `Root` and ID (`theId` in `traceId` above) for use in subsegments
- keep the original tracing header, for use in outgoing calls
- make sure all outgoing HTTP calls include a tracing header with a proper `Parent`
- wrap all outgoing HTTP calls with time keeping and sending a subsegment to the X-Ray daemon
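For the outgoing header, a minimal sketch of what building the value could look like, mirroring the `Root` and `Parent` parts the parser reads. The function name and sample values are hypothetical, and it assumes the current segment's ID is what should go into `Parent`.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Text (Text)
import qualified Data.Text.IO as TIO

-- Hypothetical helper: build the value of an outgoing X-Amzn-Trace-Id header
-- from the trace's Root and the ID of the segment making the call.
mkOutgoingTraceHdr :: Text -> Text -> Text
mkOutgoingTraceHdr root segmentId = "Root=" <> root <> ";Parent=" <> segmentId

main :: IO ()
main =
    TIO.putStrLn $
        mkOutgoingTraceHdr "1-5759e988-bd862e3fe1be46a994272793" "53995c3f42cd8ad8"
    -- prints Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8
```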
I'm saving that work for a rainy day though, or rather, for a day when I'm so upset at Clojure that I don't want to see another parenthesis.
Edit (2020-04-10): Corrected the segment field name for the parent ID; it should be `parent_id`.