Skip to content

Commit

Permalink
Prototype atomic-memory-safe implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
TravisWhitaker committed Jul 29, 2020
1 parent c3d3616 commit 420f559
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 173 deletions.
72 changes: 41 additions & 31 deletions Data/Atomic.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
{-# LANGUAGE BangPatterns, ForeignFunctionInterface #-}
{-# LANGUAGE BangPatterns
, CPP
, ForeignFunctionInterface
, MagicHash
, UnboxedTuples
#-}
-- | An atomic integer value. All operations are thread safe.
module Data.Atomic
(
Expand All @@ -12,33 +17,41 @@ module Data.Atomic
, subtract
) where

import Data.Int (Int64)
import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtr, withForeignPtr)
import Foreign.Ptr (Ptr)
import Foreign.Storable (poke)
import Prelude hiding (read, subtract)

import GHC.Int
import GHC.IO
import GHC.Prim

#include "MachDeps.h"

#if WORD_SIZE_IN_BYTES > 32
#define ARRLEN 8
#else
#define ARRLEN 4
#endif

-- | A mutable, atomic integer.
newtype Atomic = C (ForeignPtr Int64)
--newtype Atomic = C (ForeignPtr Int64)
data Atomic = C (MutableByteArray# RealWorld)

-- | Create a new, zero initialized, atomic.
new :: Int64 -> IO Atomic
new n = do
fp <- mallocForeignPtr
withForeignPtr fp $ \ p -> poke p n
return $ C fp
new :: Int -> IO Atomic
new (I# n) = IO $ \s ->
case newByteArray# ARRLEN# s of { (# s1, mba #) ->
case atomicWriteIntArray# mba 0# n s1 of { s2 ->
(# s2, C mba #) }}

read :: Atomic -> IO Int64
read (C fp) = withForeignPtr fp cRead

foreign import ccall unsafe "hs_atomic_read" cRead :: Ptr Int64 -> IO Int64
read :: Atomic -> IO Int
read (C mba) = IO $ \s ->
case atomicReadIntArray# mba 0# s of { (# s1, n #) ->
(# s1, I# n #)}

-- | Set the atomic to the given value.
write :: Atomic -> Int64 -> IO ()
write (C fp) n = withForeignPtr fp $ \ p -> cWrite p n

foreign import ccall unsafe "hs_atomic_write" cWrite
:: Ptr Int64 -> Int64 -> IO ()
write :: Atomic -> Int -> IO ()
write (C mba) (I# n) = IO $ \s ->
case atomicWriteIntArray# mba 0# n s of { s1 ->
(# s1, () #) }

-- | Increase the atomic by one.
inc :: Atomic -> IO ()
Expand All @@ -49,16 +62,13 @@ dec :: Atomic -> IO ()
dec atomic = subtract atomic 1

-- | Increase the atomic by the given amount.
add :: Atomic -> Int64 -> IO ()
add (C fp) n = withForeignPtr fp $ \ p -> cAdd p n
add :: Atomic -> Int -> IO ()
add (C mba) (I# n) = IO $ \s ->
case fetchAddIntArray# mba 0# n s of { (# s1, _ #) ->
(# s1, () #) }

-- | Decrease the atomic by the given amount.
subtract :: Atomic -> Int64 -> IO ()
subtract (C fp) n = withForeignPtr fp $ \ p -> cSubtract p n

-- | Increase the atomic by the given amount.
foreign import ccall unsafe "hs_atomic_add" cAdd :: Ptr Int64 -> Int64 -> IO ()

-- | Increase the atomic by the given amount.
foreign import ccall unsafe "hs_atomic_subtract" cSubtract
:: Ptr Int64 -> Int64 -> IO ()
subtract :: Atomic -> Int -> IO ()
subtract (C mba) (I# n) = IO $ \s ->
case fetchSubIntArray# mba 0# n s of { (# s1, _ #) ->
(# s1, () #) }
41 changes: 20 additions & 21 deletions System/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ module System.Metrics

import Control.Applicative ((<$>))
import Control.Monad (forM)
import Data.Int (Int64)
import qualified Data.IntMap.Strict as IM
import Data.IORef (IORef, atomicModifyIORef, newIORef, readIORef)
import qualified Data.HashMap.Strict as M
Expand Down Expand Up @@ -133,8 +132,8 @@ data GroupSampler = forall a. GroupSampler
}

-- TODO: Rename this to Metric and Metric to SampledMetric.
data MetricSampler = CounterS !(IO Int64)
| GaugeS !(IO Int64)
data MetricSampler = CounterS !(IO Int)
| GaugeS !(IO Int)
| LabelS !(IO T.Text)
| DistributionS !(IO Distribution.Stats)

Expand All @@ -156,18 +155,18 @@ newStore = do
-- | Register a non-negative, monotonically increasing, integer-valued
-- metric. The provided action to read the value must be thread-safe.
-- Also see 'createCounter'.
registerCounter :: T.Text -- ^ Counter name
-> IO Int64 -- ^ Action to read the current metric value
-> Store -- ^ Metric store
registerCounter :: T.Text -- ^ Counter name
-> IO Int -- ^ Action to read the current metric value
-> Store -- ^ Metric store
-> IO ()
registerCounter name sample store =
register name (CounterS sample) store

-- | Register an integer-valued metric. The provided action to read
-- the value must be thread-safe. Also see 'createGauge'.
registerGauge :: T.Text -- ^ Gauge name
-> IO Int64 -- ^ Action to read the current metric value
-> Store -- ^ Metric store
registerGauge :: T.Text -- ^ Gauge name
-> IO Int -- ^ Action to read the current metric value
-> Store -- ^ Metric store
-> IO ()
registerGauge name sample store =
register name (GaugeS sample) store
Expand Down Expand Up @@ -333,11 +332,11 @@ createDistribution name store = do

#if MIN_VERSION_base(4,10,0)
-- | Convert nanoseconds to milliseconds.
nsToMs :: Int64 -> Int64
nsToMs :: Int -> Int
nsToMs s = round (realToFrac s / (1000000.0 :: Double))
#else
-- | Convert seconds to milliseconds.
sToMs :: Double -> Int64
sToMs :: Double -> Int
sToMs s = round (s * 1000.0)
#endif

Expand Down Expand Up @@ -430,15 +429,15 @@ registerGcMetrics store =
, ("rts.gc.cumulative_bytes_used" , Counter . fromIntegral . Stats.cumulative_live_bytes)
, ("rts.gc.bytes_copied" , Counter . fromIntegral . Stats.copied_bytes)
#if MIN_VERSION_base(4,12,0)
, ("rts.gc.init_cpu_ms" , Counter . nsToMs . Stats.init_cpu_ns)
, ("rts.gc.init_wall_ms" , Counter . nsToMs . Stats.init_elapsed_ns)
, ("rts.gc.init_cpu_ms" , Counter . nsToMs . fromIntegral . Stats.init_cpu_ns)
, ("rts.gc.init_wall_ms" , Counter . nsToMs . fromIntegral . Stats.init_elapsed_ns)
#endif
, ("rts.gc.mutator_cpu_ms" , Counter . nsToMs . Stats.mutator_cpu_ns)
, ("rts.gc.mutator_wall_ms" , Counter . nsToMs . Stats.mutator_elapsed_ns)
, ("rts.gc.gc_cpu_ms" , Counter . nsToMs . Stats.gc_cpu_ns)
, ("rts.gc.gc_wall_ms" , Counter . nsToMs . Stats.gc_elapsed_ns)
, ("rts.gc.cpu_ms" , Counter . nsToMs . Stats.cpu_ns)
, ("rts.gc.wall_ms" , Counter . nsToMs . Stats.elapsed_ns)
, ("rts.gc.mutator_cpu_ms" , Counter . nsToMs . fromIntegral . Stats.mutator_cpu_ns)
, ("rts.gc.mutator_wall_ms" , Counter . nsToMs . fromIntegral . Stats.mutator_elapsed_ns)
, ("rts.gc.gc_cpu_ms" , Counter . nsToMs . fromIntegral . Stats.gc_cpu_ns)
, ("rts.gc.gc_wall_ms" , Counter . nsToMs . fromIntegral . Stats.gc_elapsed_ns)
, ("rts.gc.cpu_ms" , Counter . nsToMs . fromIntegral . Stats.cpu_ns)
, ("rts.gc.wall_ms" , Counter . nsToMs . fromIntegral . Stats.elapsed_ns)
, ("rts.gc.max_bytes_used" , Gauge . fromIntegral . Stats.max_live_bytes)
, ("rts.gc.current_bytes_used" , Gauge . fromIntegral . Stats.gcdetails_live_bytes . Stats.gc)
, ("rts.gc.current_bytes_slop" , Gauge . fromIntegral . Stats.gcdetails_slop_bytes . Stats.gc)
Expand Down Expand Up @@ -615,8 +614,8 @@ sampleGroups cbSamplers = concat `fmap` sequence (map runOne cbSamplers)
return $! map (\ (n, f) -> (n, f a)) (M.toList groupSamplerMetrics)

-- | The value of a sampled metric.
data Value = Counter {-# UNPACK #-} !Int64
| Gauge {-# UNPACK #-} !Int64
data Value = Counter {-# UNPACK #-} !Int
| Gauge {-# UNPACK #-} !Int
| Label {-# UNPACK #-} !T.Text
| Distribution !Distribution.Stats
deriving (Eq, Show)
Expand Down
5 changes: 2 additions & 3 deletions System/Metrics/Counter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ module System.Metrics.Counter
) where

import qualified Data.Atomic as Atomic
import Data.Int (Int64)
import Prelude hiding (read)

-- | A mutable, integer-valued counter.
Expand All @@ -23,13 +22,13 @@ new :: IO Counter
new = C `fmap` Atomic.new 0

-- | Get the current value of the counter.
read :: Counter -> IO Int64
read :: Counter -> IO Int
read = Atomic.read . unC

-- | Increase the counter by one.
inc :: Counter -> IO ()
inc counter = add counter 1

-- | Add the argument to the counter.
add :: Counter -> Int64 -> IO ()
add :: Counter -> Int -> IO ()
add counter = Atomic.add (unC counter)
Loading

0 comments on commit 420f559

Please sign in to comment.