Posts tagged "AWS":

09 Dec 2023

Getting Amazonka S3 to work with localstack

I'm writing this in case someone else is getting strange errors when trying to use amazonka-s3 with localstack. It took me rather too long finding the answer and neither the errors I got from Amazonka nor from localstack were very helpful.

The code I started with for setting up the connection looked like this

main = do
  awsEnv <- AWS.overrideService localEndpoint <$> AWS.newEnv AWS.discover
  -- do S3 stuff
  where
    localEndpoint = AWS.setEndpoint False "localhost" 4566

A few years ago, when I last wrote some Haskell to talk to S3 this was enough1, but now I got some strange errors.

It turns out there are different ways to address buckets and the default, which is used by AWS itself, isn't used by localstack. The documentation of S3AddressingStyle has more details.

So to get it to work I had to change the S3 addressing style as well and ended up with this code instead

main = do
  awsEnv <- AWS.overrideService (s3AddrStyle . localEndpoint) <$> AWS.newEnv AWS.discover
  -- do S3 stuff
  where
    localEndpoint = AWS.setEndpoint False "localhost" 4566
    s3AddrStyle svc = svc {AWS.s3AddressingStyle = AWS.S3AddressingStylePath}

Footnotes:

1

That was before version 2.0 of Amazonka, so it did look slightly different, but overriding the endpoint was all that was needed.

Tags: amazonka aws haskell localstack
11 Nov 2020

Combining Amazonka and Conduit

Combining amazonka and conduit turned out to be easier than I had expected.

Here's an SNS sink I put together today

snsSink :: (MonadAWS m, MonadIO m) => T.Text -> C.ConduitT Value C.Void m ()
snsSink topic = do
  C.await >>= \case
    Nothing -> pure ()
    Just msg -> do
      _ <- C.lift $ publishSNS topic (TL.toStrict $ TL.decodeUtf8 $ encode msg)
      snsSink topic

Putting it to use can be done with something like

foo = do
  ...
  awsEnv <- newEnv Discover
  runAWSCond awsEnv $
    <source producing Value> .| snsSink topicArn

  where
    runAWSCond awsEnv = runResourceT . runAWS awsEnv . within Frankfurt . C.runConduit
Tags: amazonka aws conduit haskell
08 Apr 2020

X-Ray and WAI

For a while we've been planning on introducing AWS X-Ray into our system at work. There's official support for a few languages, but not too surprisingly Haskell isn't on that list. I found freckle/aws-xray-client on GitHub, which is so unofficial that it isn't even published on Hackage. While it looks very good, I suspect it does more than I need and since it lacks licensing information I decided to instead implement a version tailored to our needs.

As a first step I implemented a WAI middleware that wraps an HTTP request and reports the time it took to produce a response. Between the X-Ray Developer Guide and the code in Freckle's git repo it turned out to be fairly simple.

First off, this is the first step towards X-Ray nirvana, so all I'm aiming for is minimal support. That means all I want is to send minimal X-Ray segments, with the small addition that I want to support parent_id from the start.

The first step then is to parse the HTTP header containing the X-Ray information – X-Amzn-Trace-Id. For now I'm only interested in two parts, Root and Parent, so for simplicity's sake I use a tuple to keep them in. The idea is to take the header's value, split on ; to get the parts, then split each part in two, a key and a value, and put them into an association list ([(Text, Text)]) for easy lookup using, well lookup.

parseXRayTraceIdHdr :: Text -> Maybe (Text, Maybe Text)
parseXRayTraceIdHdr hdr = do
  bits <- traverse parseHeaderComponent $ T.split (== ';') hdr
  traceId <- lookup "Root" bits
  let parent = lookup "Parent" bits
  pure (traceId, parent)

parseHeaderComponent :: Text -> Maybe (Text, Text)
parseHeaderComponent cmp = case T.split (== '=') cmp of
                            [name, value] -> Just (name, value)
                            _ -> Nothing

The start and end times for processing a request are also required. The docs say that using at least millisecond resolution is a good idea, so I decided to do exactly that. NominalDiffTime, which is what getPOSIXTime produces, supports a resolution of picoseconds (though I doubt my system's clock does) which requires a bit of (type-based) converting.

mkTimeInMilli :: IO Milli
mkTimeInMilli = ndfToMilli <$> getPOSIXTime
  where
    ndfToMilli = fromRational . toRational

The last support function needed is one that creates the segment. Just building the JSON object, using aeson's object, is enough at this point.

mkSegment :: Text -> Text -> Milli -> Milli -> (Text, Maybe Text) -> Value
mkSegment name id startTime endTime (root, parent) =
  object $ [ "name" .= name
           , "id" .= id
           , "trace_id" .= root
           , "start_time" .= startTime
           , "end_time" .= endTime
           ] <> p
  where
    p = maybe [] (\ v -> ["parent_id" .= v]) parent

Armed with all this, I can now put together a WAI middleware that

  1. records the start time of the call
  2. processes the request
  3. sends off the response and keeps the result of it
  4. records the end time
  5. parses the tracing header
  6. builds the segment prepended with the X-Ray daemon header
  7. sends the segment to the X-Ray daemon
traceId :: Text -> Middleware
traceId xrayName app req sendResponse = do
  startTime <- mkTimeInMilli
  app req $ \ res -> do
    rr <- sendResponse res
    endTime <- mkTimeInMilli
    theId <- T.pack . (\ v -> showHex v "") <$> randomIO @Word64
    let traceParts = (decodeUtf8 <$> requestHeaderTraceId req) >>= parseXRayTraceIdHdr
        segment = mkSegment xrayName theId startTime endTime <$> traceParts
    case segment of
      Nothing -> pure ()
      Just segment' -> sendXRayPayload $ toStrict $ prepareXRayPayload segment'
    pure rr

  where
    prepareXRayPayload segment =
      let header = object ["format" .= ("json" :: String), "version" .= (1 :: Int)]
      in encode header <> "\n" <> encode segment

    sendXRayPayload payload = do
      addrInfos <- S.getAddrInfo Nothing (Just "127.0.0.1") (Just "2000")
      case addrInfos of
        [] -> pure () -- silently skip
        (xrayAddr:_) -> do
          sock <- S.socket (S.addrFamily xrayAddr) S.Datagram S.defaultProtocol
          S.connect sock (S.addrAddress xrayAddr)
          sendAll sock payload
          S.close sock

The next step will be to instrument the actual processing. The service I'm instrumenting is asynchronous, so all the work happens after the response has been sent. My plan for this is to use subsegments to record it. That means I'll have to

I'm saving that work for a rainy day though, or rather, for a day when I'm so upset at Clojure that I don't want to see another parenthesis.

Edit (2020-04-10): Corrected the segment field name for the parent ID, it should be parent_id.

Tags: AWS haskell XRay
Other posts