Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cacheStore for hstream #1828

Merged
merged 36 commits into from
Jul 5, 2024
Merged
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
125621c
Basic implementation of CacheStore
YangKian Jun 12, 2024
4677e03
refactor cache store
YangKian Jun 13, 2024
6c37ea2
implement health checker for hstore
YangKian Jun 17, 2024
48d0c20
refactor cacheStore: split read/write mode for cacheStore && add retr…
YangKian Jun 17, 2024
9c33981
fix cacheStore: make sure key is unique
YangKian Jun 19, 2024
448769e
code clean for hstore checker
YangKian Jun 19, 2024
a28aab2
implement health monitor
YangKian Jun 19, 2024
adcc752
handle lookup when meta cluster unavailable
YangKian Jun 19, 2024
1de7b0b
healthMonitor: update cabal file
YangKian Jun 19, 2024
23dc65d
add config
YangKian Jun 19, 2024
41c0df9
integration
YangKian Jun 19, 2024
d0b4009
cache store: fix encode key to make sure record write in order
YangKian Jun 26, 2024
f7c9df9
change ld_checker ffi to safe
YangKian Jun 26, 2024
b90ab56
every time server switch to backup mode, recreate column family itera…
YangKian Jun 26, 2024
73339b8
fix open db
YangKian Jun 26, 2024
71cfb2d
update
YangKian Jun 26, 2024
390b6a2
add more rocksdb options
YangKian Jun 26, 2024
f8f9c9f
bypass reader when server is in backup mode
YangKian Jun 26, 2024
98fbed0
log update
YangKian Jun 28, 2024
7e975b9
add HStreamEnableCacheStore flag
YangKian Jun 28, 2024
2ae14c6
Revert "add HStreamEnableCacheStore flag"
YangKian Jul 1, 2024
e924d36
add cli option to enable server cache
YangKian Jul 1, 2024
350bc1e
add cache store related stats
YangKian Jul 1, 2024
1945152
add healthy monitor related stats
YangKian Jul 2, 2024
35e0cb7
add per_cache_store_stats.inc
YangKian Jul 2, 2024
d9c4318
update stats handler
YangKian Jul 2, 2024
4e6a6a4
change writeRecord method to return an Either result
YangKian Jul 2, 2024
056c4be
fix stats
YangKian Jul 3, 2024
2c5e425
catch meta exception when get ConnectorStatsIsAlive stat
YangKian Jul 3, 2024
3df16e7
fix stream append stats
YangKian Jul 3, 2024
9dd6646
update cache store stats
YangKian Jul 3, 2024
b9e3173
update protocol
YangKian Jul 3, 2024
7f94aaf
disable unsupported rocksdb option
YangKian Jul 3, 2024
7e853c8
fix test
YangKian Jul 4, 2024
f5599c4
update
YangKian Jul 4, 2024
2f10a09
handle rocksdb exception in write path
YangKian Jul 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix cacheStore: make sure key is unique
  • Loading branch information
YangKian committed Jun 19, 2024
commit 9c33981612ccb161f5593bedf024b94095372289
57 changes: 34 additions & 23 deletions hstream/src/HStream/Server/CacheStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@ import Control.Concurrent.STM (TVar, atomically, check, newTVarIO,
readTVar, swapTVar, writeTVar)
import Control.Exception (Exception (displayException), throwIO,
try)
import Control.Monad (void, when)
import Control.Monad (unless, void, when)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BSC
import Data.Int (Int64)
import Data.IORef (IORef, atomicModifyIORef', newIORef)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Word (Word64)
import System.Clock
import Text.Printf (printf)

import Database.RocksDB
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Exception as HE

-- StoreMode is a logical concept that represents the operations that the current CacheStore can perform.
-- Cache mode: data can only be written to the store
Expand All @@ -48,6 +51,7 @@ data CacheStore = CacheStore
, readOptions :: ReadOptions
, enableWrite :: MVar Bool
, dumpState :: TVar DumpState
, counter :: IORef Word64
}

-- Store is actually a column family in rocksdb,
Expand All @@ -69,6 +73,7 @@ mkCacheStore path dbOpts writeOptions readOptions = do
store <- newMVar Nothing
enableWrite <- newMVar False
dumpState <- newTVarIO NoDump
counter <- newIORef 0
return CacheStore {..}

-- Create a rocksdb store
Expand All @@ -80,10 +85,12 @@ initCacheStore CacheStore{..} = do
Just st' -> return $ Just st'
Nothing -> do
db <- open dbOpts path
Log.info $ "Open rocksdb"
name <- show <$> getCurrentTimestamp
columnFamily <- createColumnFamily db dbOpts name
Log.info $ "Create cached store " <> Log.build name
return $ Just Store{..}
atomicModifyIORef' counter $ const (0, ())

-- Destroy both rocksdb and columnFamily
-- Users need to ensure that all other operations are stopped before deleting the store.
Expand All @@ -92,8 +99,10 @@ deleteCacheStore CacheStore{store} = do
modifyMVar_ store $ \st -> do
case st of
Just Store{..} -> do
destroyColumnFamily columnFamily
dropColumnFamily db columnFamily
Log.info $ "Drop column family"
destroyColumnFamily columnFamily
Log.info $ "Destroy column family"
close db
Log.info $ "Delete cached store " <> Log.build name
return Nothing
Expand All @@ -104,20 +113,22 @@ getStore CacheStore{store} = do
st <- readMVar store
case st of
Just st' -> return st'
-- FIXME: handle exception
Nothing -> do
Log.fatal $ "Cached store not initialized."
throwIO $ userError "Store is not initialized."
Log.fatal $ "Cached store not initialized, should not get here"
throwIO $ HE.UnexpectedError "Store is not initialized."

writeRecord :: CacheStore -> T.Text -> Word64 -> ByteString -> IO S.AppendCompletion
writeRecord st@CacheStore{..} streamName shardId payload = do
rMode <- readMVar enableWrite
when rMode $ do
canWrite <- readMVar enableWrite
unless canWrite $ do
Log.warning $ "Cannot write to cached store becasue the store is not write-enabled."
throwIO $ userError "CacheStore is not write-enabled."
-- throw an unavailable exception to client and let it retry
throwIO $ HE.ResourceAllocationException "CacheStore is not write-enabled."

Store{..} <- getStore st
let k = encodeKey streamName shardId
-- Obtain a monotonically increasing offset to ensure that each encoded-key is unique.
offset <- atomicModifyIORef' counter (\x -> (x + 1, x))
let k = encodeKey streamName shardId offset
putCF db writeOptions columnFamily k payload
lsn <- getCurrentTimestamp
return S.AppendCompletion
Expand Down Expand Up @@ -151,17 +162,18 @@ dumpToHStore st@CacheStore{..} ldClient cmpStrategy = do
payload <- value iter
let (_, shardId) = decodeKey k
-- TODO: How to handle LSN?
-- TODO: What if retry failed?
void $ appendHStoreWithRetry st ldClient shardId payload cmpStrategy
next iter

-- FIXME: What if iterator return error when iterating?
errorM <- getError iter
case errorM of
Just msg -> Log.fatal $ "cached store iterator error: " <> Log.build msg
Just msg -> Log.fatal $ "Cached store iterator error: " <> Log.build msg
Nothing -> do
end <- getTime Monotonic
let sDuration = toNanoSecs (diffTimeSpec end start) `div` 1000000000
Log.info $ "Finish dump cached store, total time " <> Log.build sDuration <> "s"
let sDuration = toNanoSecs (diffTimeSpec end start) `div` 1000000
Log.info $ "Finish dump cached store, total time " <> Log.build sDuration <> "ms"
return ()

-- What if dump process error?
Expand All @@ -176,7 +188,7 @@ dumpToHStore st@CacheStore{..} ldClient cmpStrategy = do

appendHStoreWithRetry :: CacheStore -> S.LDClient -> Word64 -> ByteString -> S.Compression -> IO ()
appendHStoreWithRetry CacheStore{..} ldClient shardId payload cmpStrategy = do
void $ loop 3
void $ loop (3 :: Int)
where
loop cnt
| cnt >= 0 = do
Expand All @@ -185,15 +197,15 @@ appendHStoreWithRetry CacheStore{..} ldClient shardId payload cmpStrategy = do
Left (e :: S.SomeHStoreException) -> do
void . atomically $ readTVar dumpState >>= \s -> check (s == Dumping)
let cnt' = cnt - 1
Log.warning $ "dump to shardId " <> Log.build shardId <> " failed"
Log.warning $ "Dump to shardId " <> Log.build shardId <> " failed"
<> ", error: " <> Log.build (displayException e)
<> ", left retries = " <> Log.build (show cnt')
-- sleep 1s
threadDelay $ 1 * 1000 * 1000
loop cnt'
Right lsn -> return $ Just lsn
| otherwise = do
Log.fatal $ "dump to shardId " <> Log.build shardId <> " failed after exausting the retry attempts."
Log.fatal $ "Dump to shardId " <> Log.build shardId <> " failed after exausting the retry attempts, drop the record."
return Nothing

-----------------------------------------------------------------------------------------------------
Expand All @@ -207,15 +219,15 @@ setCacheStoreMode CacheStore{..} CacheMode = do
case state of
Dumping -> writeTVar dumpState Suspend
_ -> pure ()
Log.info $ "set CacheStore to CacheMode"
Log.info $ "Set CacheStore to CacheMode"
setCacheStoreMode CacheStore{..} DumpMode = do
void $ swapMVar enableWrite False
void $ atomically $ do
state <- readTVar dumpState
case state of
Suspend -> writeTVar dumpState Dumping
_ -> pure ()
Log.info $ "set CacheStore to DumpMode"
Log.info $ "Set CacheStore to DumpMode"

whileM :: IO Bool -> IO () -> IO ()
whileM cond act = do
Expand All @@ -225,15 +237,14 @@ whileM cond act = do
whileM cond act
{-# INLINABLE whileM #-}

encodeKey :: T.Text -> Word64 -> ByteString
encodeKey streamName shardId = BS.concat [T.encodeUtf8 streamName, ":", BSC.pack $ show shardId]
encodeKey :: T.Text -> Word64 -> Word64 -> ByteString
encodeKey streamName shardId offset =
BS.concat [T.encodeUtf8 streamName, ":", BSC.pack $ show shardId, ":", BSC.pack $ printf "%032d" offset]

decodeKey :: ByteString -> (T.Text, Word64)
decodeKey bs =
let (textBs, rest) = BS.breakSubstring ":" bs
-- Remove the colon from the start of the rest and convert the remainder to a Word64
shardIdBs = BSC.drop 1 rest
in (T.decodeUtf8 textBs, read $ BSC.unpack shardIdBs)
let (streamName : shardId : _) = BSC.split ':' bs
in (T.decodeUtf8 streamName, read $ BSC.unpack shardId)

getCurrentTimestamp :: IO Int64
getCurrentTimestamp = fromIntegral . toNanoSecs <$> getTime Monotonic