Skip to content

Commit

Permalink
Merge pull request #18 from tvision-insights/master
Browse files Browse the repository at this point in the history
Export the flush action so that it can be invoked
  • Loading branch information
23Skidoo authored Mar 23, 2018
2 parents e79d886 + 2329afa commit 67464a0
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions System/Remote/Monitoring/Statsd.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ module System.Remote.Monitoring.Statsd
(
-- * The statsd syncer
Statsd
, statsdFlush
, statsdThreadId
, forkStatsd
, StatsdOptions(..)
Expand Down Expand Up @@ -52,6 +53,7 @@ import Prelude hiding (catch)
-- Created by 'forkStatsd'.
data Statsd = Statsd
{ threadId :: {-# UNPACK #-} !ThreadId
, flush :: IO ()
}

-- | The thread ID of the statsd sync thread. You can stop the sync by
Expand All @@ -60,6 +62,10 @@ data Statsd = Statsd
statsdThreadId :: Statsd -> ThreadId
statsdThreadId = threadId

-- | Flush a sample to the statsd server
statsdFlush :: Statsd -> IO ()
statsdFlush = flush

-- | Options to control how to connect to the statsd server and how
-- often to flush metrics. The flush interval should be shorter than
-- the flush interval statsd itself uses to flush data to its
Expand Down Expand Up @@ -131,28 +137,31 @@ forkStatsd opts store = do

return (sendSample, Socket.close socket)

let flush = do
sample <- Metrics.sampleAll store
flushSample sample sendSample opts

me <- myThreadId
tid <- forkFinally (loop store sendSample opts) $ \ r -> do
tid <- forkFinally (loop opts flush) $ \ r -> do
closeSocket
case r of
Left e -> throwTo me e
Right _ -> return ()
return $ Statsd tid

return $ Statsd tid flush
where
unsupportedAddressError = ioError $ userError $
"unsupported address: " ++ T.unpack (host opts)

loop :: Metrics.Store -- ^ Metric store
-> (B8.ByteString -> IO ()) -- ^ Action to send a sample
-> StatsdOptions -- ^ Options
loop :: StatsdOptions -- ^ Options
-> IO () -- ^ Action to flush the sample
-> IO ()
loop store sendSample opts = do
loop opts flush = do
start <- time
sample <- Metrics.sampleAll store
flushSample sample sendSample opts
flush
end <- time
threadDelay (flushInterval opts * 1000 - fromIntegral (end - start))
loop store sendSample opts
loop opts flush

-- | Microseconds since epoch.
time :: IO Int64
Expand Down

0 comments on commit 67464a0

Please sign in to comment.