Ten Cache Misses

Crushing Haskell like a Tin Can

Updatable timeouts for your transformer stacks

A relatively common issue we have dealing with pubsub services on the internet is reliability. Conceptually it should be simple: connect, subscribe and then data flows magically for the rest of time. Reality comes well short of our desires. We use a very large pubsub service to consume 140 character messages… and sometimes it just decides to stop sending new messages after a few days. They’re not alone either. The timeout function in base almost does what we want but falls short in one key area: extending the timeout duration.

I was dredging through the GHC event manager a while ago for an unrelated reason and stumbled upon updatable timers, hidden in the depths of GHC.Event. I figured it would be more useful as a monad transformer, and so timeout-control was born.

Like many unsupported features in GHC, it melted down two of our servers and taught me exactly what 42 means… but I think most of the kinks have been worked out since. We’ve been using it in production for quite a while now without issue. It was originally written for use with our (legacy) enumerator codebase and now with conduits. Providing your code is exception safe it may work for you too. Find it on Hackage or fork it on Github.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import Control.Monad.Trans (liftIO)
import Data.Conduit (($$), (=$), runResourceT)
import qualified Data.Conduit.List as Cl
import Data.Conduit.Network (ClientSettings(..), runTCPClient)
import System.Timeout.Control (Microseconds, runTimeout, updateTimeout)

main :: IO ()
main = do
  -- the event manager uses microsecond resolution
  let oneSecond :: Microseconds
      oneSecond = 1000000
      settings  = ClientSettings{clientHost = "127.0.0.1", clientPort = 8888}

  -- evaluate a timeout action, starting with a 10 second timeout
  res <- runTimeout (10 * oneSecond) $ do
         runTCPClient settings $ \ source sink ->
              source
           -- set the timeout to t+5 seconds whenever data arrives from the server
           $$ Cl.mapM  (\ val -> updateTimeout (5 * oneSecond) >> return val)
           -- and print out the received bytes
           =$ Cl.mapM_ (liftIO . print)

  case res of
    Left exc -> putStrLn $ "Action timed out: " ++ show exc
    Right () -> putStrLn $ "Action ran to completion"

Comments