Skip to content

Commit 6d6e31f

Browse files
committed
implement split/merge shard
1 parent 1e05fd3 commit 6d6e31f

6 files changed

Lines changed: 182 additions & 55 deletions

File tree

Lines changed: 125 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,42 @@
1+
{-# LANGUAGE LambdaCase #-}
2+
13
module HStream.Server.Core.Shard
2-
( readShard,
3-
listShards,
4-
splitShards,
5-
)
4+
( readShard,
5+
listShards,
6+
splitShards,
7+
mergeShards,
8+
)
69
where
710

8-
import Control.Exception (bracket)
9-
import Control.Monad (void)
10-
import Data.Foldable (foldl')
11-
import qualified Data.Map.Strict as M
12-
import qualified Data.Vector as V
13-
import GHC.Stack (HasCallStack)
14-
import Network.GRPC.HighLevel.Generated
15-
import qualified Z.Data.CBytes as CB
16-
17-
import Data.Maybe (fromJust)
18-
import HStream.Connector.HStore (transToStreamName)
19-
import HStream.Server.Exception (InvalidArgument (..),
20-
StreamNotExist (..))
21-
import HStream.Server.Handler.Common (decodeRecordBatch)
22-
import qualified HStream.Server.HStreamApi as API
23-
import HStream.Server.ReaderPool (getReader, putReader)
24-
import HStream.Server.Types (ServerContext (..))
25-
import qualified HStream.Store as S
26-
import HStream.ThirdParty.Protobuf as PB
11+
import Control.Exception (bracket)
12+
import Control.Monad (foldM, void)
13+
import Data.Foldable (foldl')
14+
import qualified Data.HashMap.Strict as HM
15+
import qualified Data.Map.Strict as M
16+
import qualified Data.Vector as V
17+
import GHC.Stack (HasCallStack)
18+
import qualified Z.Data.CBytes as CB
19+
20+
import Control.Concurrent (MVar, modifyMVar, modifyMVar_,
21+
readMVar)
22+
import Data.Maybe (fromJust, fromMaybe)
23+
import Data.Text (Text)
24+
import Data.Word (Word64)
25+
import HStream.Server.Core.Common (decodeRecordBatch)
26+
import qualified HStream.Server.HStreamApi as API
27+
import HStream.Server.ReaderPool (getReader, putReader)
28+
import HStream.Server.Shard (Shard (..), ShardKey (..),
29+
SharedShardMap, cBytesToKey,
30+
mergeTwoShard, mkShard,
31+
mkSharedShardMapWithShards,
32+
shardKeyToText, splitByKey,
33+
splitHalf, textToShardKey)
34+
import HStream.Server.Types (ServerContext (..),
35+
transToStreamName)
36+
import qualified HStream.Store as S
2737
import HStream.Utils
28-
import Proto3.Suite (Enumerated (Enumerated))
38+
import Proto3.Suite (Enumerated (Enumerated))
39+
import Z.Data.CBytes (CBytes)
2940

3041
-------------------------------------------------------------------------------
3142

@@ -36,16 +47,13 @@ listShards
3647
-> IO (V.Vector API.Shard)
3748
listShards ServerContext{..} API.ListShardsRequest{..} = do
3849
shards <- M.elems <$> S.listStreamPartitions scLDClient streamId
39-
V.foldM' getShardInfo V.empty $ V.fromList shards
50+
V.foldM' constructShard V.empty $ V.fromList shards
4051
where
4152
streamId = transToStreamName listShardsRequestStreamName
42-
startKey = CB.pack "startKey"
43-
endKey = CB.pack "endKey"
44-
epoch = CB.pack "epoch"
4553

46-
getShardInfo shards logId = do
54+
constructShard shards logId = do
4755
attr <- S.getStreamPartitionExtraAttrs scLDClient logId
48-
case getInfo attr of
56+
case getShardInfo attr of
4957
Nothing -> return . V.snoc shards $
5058
API.Shard { API.shardStreamName = listShardsRequestStreamName
5159
, API.shardShardId = logId
@@ -54,19 +62,13 @@ listShards ServerContext{..} API.ListShardsRequest{..} = do
5462
Just(sKey, eKey, ep) -> return . V.snoc shards $
5563
API.Shard { API.shardStreamName = listShardsRequestStreamName
5664
, API.shardShardId = logId
57-
, API.shardStartHashRangeKey = sKey
58-
, API.shardEndHashRangeKey = eKey
65+
, API.shardStartHashRangeKey = shardKeyToText sKey
66+
, API.shardEndHashRangeKey = shardKeyToText eKey
5967
, API.shardEpoch = ep
6068
-- FIXME: need a way to find if this shard is active
6169
, API.shardIsActive = True
6270
}
6371

64-
getInfo mp = do
65-
startHashRangeKey <- cBytesToText <$> M.lookup startKey mp
66-
endHashRangeKey <- cBytesToText <$> M.lookup endKey mp
67-
shardEpoch <- read . CB.unpack <$> M.lookup epoch mp
68-
return (startHashRangeKey, endHashRangeKey, shardEpoch)
69-
7072
readShard
7173
:: HasCallStack
7274
=> ServerContext
@@ -101,8 +103,90 @@ readShard ServerContext{..} API.ReadShardRequest{..} = do
101103
return $ foldl' (\acc (_, _, _, record) -> acc <> record) V.empty receivedRecordsVecs
102104

103105
-- | Split one shard of a stream in two and return the two resulting shards.
--
-- @splitShardsRequestSplitKey@ is parsed with 'textToShardKey' and handed to
-- 'splitHalf' when @splitShardsRequestHalfSplit@ is set, to 'splitByKey'
-- otherwise. Afterwards the start key of every new shard is recorded in the
-- stream's entry of @shardTable@.
--
-- The stream's 'SharedShardMap' is looked up (or loaded from the store when
-- it is not cached yet) while @shardInfo@ is held by 'modifyMVar'. Doing the
-- lookup outside of the lock would let two concurrent requests for an
-- uncached stream each load their own map, with the later insert silently
-- discarding the other request's update.
--
-- Exceptions raised by the split helpers propagate to the caller; in that
-- case 'modifyMVar' puts the previous content of @shardInfo@ back.
splitShards
  :: ServerContext
  -> API.SplitShardsRequest
  -> IO (V.Vector API.Shard)
splitShards ServerContext{..} API.SplitShardsRequest{..} = do
  newShards <- splitShard
  updateShardTable newShards
  return . V.map (shardToPb splitShardsRequestStreamName) $ V.fromList newShards
 where
  streamId = transToStreamName splitShardsRequestStreamName
  splitKey = textToShardKey splitShardsRequestSplitKey

  split :: Bool -> ShardKey -> SharedShardMap -> IO (Shard, Shard)
  split True  key mps = splitHalf scLDClient mps key
  split False key mps = splitByKey scLDClient mps key

  -- Look up / load and split under a single lock on shardInfo.
  splitShard =
    modifyMVar shardInfo $ \info -> do
      sharedShardMp <- maybe (loadSharedShardMap scLDClient streamId) pure
                     $ HM.lookup splitShardsRequestStreamName info
      (s1, s2) <- split splitShardsRequestHalfSplit splitKey sharedShardMp
      return (HM.insert splitShardsRequestStreamName sharedShardMp info, [s1, s2])

  -- Register startKey -> shardId for each new shard of this stream.
  updateShardTable newShards =
    modifyMVar_ shardTable $ \mp -> do
      let dict  = fromMaybe M.empty $ HM.lookup splitShardsRequestStreamName mp
          dict' = foldl' (\acc Shard{startKey=sKey, shardId=sId} -> M.insert sKey sId acc) dict newShards
      return $ HM.insert splitShardsRequestStreamName dict' mp
131+
132+
-- | Merge two shards of a stream and return the resulting shard.
--
-- @mergeShardsRequestShardKeys@ must contain exactly two keys; each is parsed
-- with 'textToShardKey' and passed to 'mergeTwoShard'. Any other number of
-- keys is rejected with an 'IOError' before any state is touched, instead of
-- relying on a partial list pattern.
--
-- The stream's 'SharedShardMap' is looked up (or loaded from the store when
-- it is not cached yet) while @shardInfo@ is held by 'modifyMVar', so that
-- concurrent requests for an uncached stream cannot each load their own map
-- and overwrite one another's result.
--
-- After the merge the stream's entry in @shardTable@ gets the start key of
-- the new shard inserted and the removed key deleted, in that order.
mergeShards
  :: ServerContext
  -> API.MergeShardsRequest
  -> IO API.Shard
mergeShards ServerContext{..} API.MergeShardsRequest{..} = do
  (shardKey1, shardKey2) <-
    case V.toList mergeShardsRequestShardKeys of
      [k1, k2] -> return (textToShardKey k1, textToShardKey k2)
      _        -> ioError . userError $
                    "mergeShards: exactly two shard keys are required"
  (newShard, removedKey) <- mergeShard shardKey1 shardKey2
  updateShardTable newShard removedKey
  return . shardToPb mergeShardsRequestStreamName $ newShard
 where
  streamId = transToStreamName mergeShardsRequestStreamName

  -- Look up / load and merge under a single lock on shardInfo.
  mergeShard shardKey1 shardKey2 =
    modifyMVar shardInfo $ \info -> do
      sharedShardMp <- maybe (loadSharedShardMap scLDClient streamId) pure
                     $ HM.lookup mergeShardsRequestStreamName info
      res <- mergeTwoShard scLDClient sharedShardMp shardKey1 shardKey2
      return (HM.insert mergeShardsRequestStreamName sharedShardMp info, res)

  updateShardTable Shard{startKey=sKey, shardId=sId} removedKey =
    modifyMVar_ shardTable $ \mp -> do
      let dict   = fromMaybe M.empty $ HM.lookup mergeShardsRequestStreamName mp
          dict'  = M.insert sKey sId dict
          -- delete after insert: if both keys coincide the entry is removed
          dict'' = M.delete removedKey dict'
      return $ HM.insert mergeShardsRequestStreamName dict'' mp
154+
155+
-- | Fetch the 'SharedShardMap' of a stream.
--
-- The map stored under @streamName@ in @shardInfo@ is returned when present;
-- otherwise a fresh one is built with 'loadSharedShardMap'. The freshly built
-- map is not written back to @shardInfo@ by this function.
getShardMap :: S.LDClient -> MVar (HM.HashMap Text SharedShardMap) -> Text -> IO SharedShardMap
getShardMap client shardInfo streamName = do
  cached <- HM.lookup streamName <$> readMVar shardInfo
  case cached of
    Just mps -> return mps
    Nothing  -> loadSharedShardMap client (transToStreamName streamName)
161+
162+
-- | Build a 'SharedShardMap' for a stream from what the store reports.
--
-- Every partition listed for the stream has its extra attributes read;
-- partitions for which 'getShardInfo' yields 'Nothing' are left out.
loadSharedShardMap :: S.LDClient -> S.StreamId -> IO SharedShardMap
loadSharedShardMap client streamId = do
  partitions <- S.listStreamPartitions client streamId
  shards <- foldM collect [] (M.elems partitions)
  mkSharedShardMapWithShards shards
 where
  -- Prepend the shard described by the partition's attributes, if any.
  collect found logId = do
    extraAttrs <- S.getStreamPartitionExtraAttrs client logId
    return $ case getShardInfo extraAttrs of
      Just (sKey, eKey, epoch) -> mkShard logId streamId sKey eKey epoch : found
      Nothing                  -> found
172+
173+
-- | Extract @(start key, end key, epoch)@ from a partition's extra attributes.
--
-- The values are read from the attributes named @"startKey"@, @"endKey"@ and
-- @"epoch"@. 'Nothing' is returned when any of the three is missing, or when
-- the epoch attribute is not a well-formed number (previously a malformed
-- epoch made 'read' fail lazily with @Prelude.read: no parse@ at whatever
-- point the epoch was first forced).
getShardInfo :: M.Map CBytes CBytes -> Maybe (ShardKey, ShardKey, Word64)
getShardInfo mp = do
  startHashRangeKey <- cBytesToKey <$> M.lookup startKey mp
  endHashRangeKey   <- cBytesToKey <$> M.lookup endKey mp
  shardEpoch        <- parseEpoch . CB.unpack =<< M.lookup epoch mp
  return (startHashRangeKey, endHashRangeKey, shardEpoch)
 where
  startKey = CB.pack "startKey"
  endKey   = CB.pack "endKey"
  epoch    = CB.pack "epoch"

  -- Total counterpart of 'read': accepts exactly the inputs 'read' accepts
  -- (a single unambiguous parse, optional trailing white space).
  parseEpoch :: String -> Maybe Word64
  parseEpoch str =
    case [n | (n, rest) <- reads str, ("", "") <- lex rest] of
      [n] -> Just n
      _   -> Nothing
183+
184+
-- | Convert an internal 'Shard' into its protobuf form for the given stream
-- name. Both hash-range keys are rendered with 'shardKeyToText', and the
-- result is always flagged as active.
shardToPb :: Text -> Shard -> API.Shard
shardToPb sName Shard{shardId = sId, startKey = sKey, endKey = eKey, epoch = ep} =
  API.Shard
    { API.shardShardId           = sId
    , API.shardStreamName        = sName
    , API.shardStartHashRangeKey = shardKeyToText sKey
    , API.shardEndHashRangeKey   = shardKeyToText eKey
    , API.shardEpoch             = ep
    , API.shardIsActive          = True
    }

hstream/src/HStream/Server/Core/Stream.hs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,8 @@ import qualified Data.Text as Text
2828
import qualified Data.Vector as V
2929
import GHC.Stack (HasCallStack)
3030
import Network.GRPC.HighLevel.Generated
31-
import qualified Z.Data.CBytes as CB
3231

33-
import Control.Concurrent (MVar, modifyMVar,
34-
modifyMVar_)
35-
import HStream.Connector.HStore (transToStreamName)
3632
import qualified HStream.Logger as Log
37-
import HStream.Server.Core.Common (decodeRecordBatch)
3833
import HStream.Server.Exception (InvalidArgument (..),
3934
StreamNotExist (..))
4035
import qualified HStream.Server.HStreamApi as API
@@ -53,8 +48,6 @@ import qualified HStream.Stats as Stats
5348
import qualified HStream.Store as S
5449
import HStream.ThirdParty.Protobuf as PB
5550
import HStream.Utils
56-
import Network.GRPC.HighLevel.Generated
57-
import Proto3.Suite (Enumerated (Enumerated))
5851

5952
-------------------------------------------------------------------------------
6053

hstream/src/HStream/Server/Exception.hs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import ZooKeeper.Exception
2121

2222
import qualified HStream.Logger as Log
2323
import HStream.Server.Persistence.Exception (PersistenceException)
24+
import HStream.Server.Shard (CanNotMerge, CanNotSplit,
25+
ShardNotExist)
2426
import qualified HStream.Store as Store
2527
import HStream.Utils (mkServerErrResp)
2628

@@ -102,7 +104,7 @@ finalExceptionHandlers = [
102104
Log.fatal $ Log.buildString' err
103105
return (StatusInternal, mkStatusDetails err)
104106
,
105-
Handler $ \(err :: AsyncCancelled) -> do
107+
Handler $ \(_ :: AsyncCancelled) -> do
106108
return (StatusOk, "")
107109
,
108110
Handler $ \(err :: SomeException) -> do
@@ -140,6 +142,19 @@ zooKeeperExceptionHandler = [
140142
Handler $ \(e :: ZooException ) -> handleZKException e StatusInternal
141143
]
142144

145+
-- | Handlers for the exceptions exported by "HStream.Server.Shard".
--
-- 'CanNotSplit' and 'CanNotMerge' are answered with
-- 'StatusFailedPrecondition', 'ShardNotExist' with 'StatusNotFound'. Every
-- handled exception is logged through 'Log.fatal' first.
shardExceptionHandler :: Handlers (StatusCode, StatusDetails)
shardExceptionHandler =
  [ Handler $ \(e :: CanNotSplit)   -> reportAs StatusFailedPrecondition e
  , Handler $ \(e :: CanNotMerge)   -> reportAs StatusFailedPrecondition e
  , Handler $ \(e :: ShardNotExist) -> reportAs StatusNotFound e
  ]
  where
    -- Log the exception, then pair the given status code with its details.
    reportAs code e = do
      Log.fatal $ Log.buildString' e
      return (code, mkStatusDetails e)
157+
143158
defaultHandlers :: Handlers (StatusCode, StatusDetails)
144159
defaultHandlers = serverExceptionHandlers
145160
++ storeExceptionHandlers

hstream/src/HStream/Server/Handler.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ handlers serverContext@ServerContext{..} =
5757
hstreamApiReadShard = readShardHandler serverContext,
5858
hstreamApiListShards = listShardsHandler serverContext,
5959
hstreamApiSplitShards = splitShardsHandler serverContext,
60+
hstreamApiMergeShards = mergeShardsHandler serverContext,
6061

6162
-- Stats
6263
hstreamApiPerStreamTimeSeriesStats = H.perStreamTimeSeriesStats scStatsHolder,

hstream/src/HStream/Server/Handler/Shard.hs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@ module HStream.Server.Handler.Shard
99
( listShardsHandler
1010
, readShardHandler
1111
, splitShardsHandler
12+
, mergeShardsHandler
1213
)
1314
where
1415

1516
import Control.Exception
1617
import Network.GRPC.HighLevel.Generated
1718

19+
import Control.Monad (when)
20+
import qualified Data.Vector as V
1821
import qualified HStream.Logger as Log
1922
import qualified HStream.Server.Core.Shard as C
2023
import HStream.Server.Exception
@@ -29,7 +32,7 @@ listShardsHandler
2932
:: ServerContext
3033
-> ServerRequest 'Normal ListShardsRequest ListShardsResponse
3134
-> IO (ServerResponse 'Normal ListShardsResponse)
32-
listShardsHandler sc (ServerNormalRequest _metadata request) = do
35+
listShardsHandler sc (ServerNormalRequest _metadata request) = defaultExceptionHandle $ do
3336
Log.debug "Receive List Shards Request"
3437
C.listShards sc request >>= returnResp . ListShardsResponse
3538

@@ -45,14 +48,35 @@ splitShardsHandler
4548
:: ServerContext
4649
-> ServerRequest 'Normal SplitShardsRequest SplitShardsResponse
4750
-> IO (ServerResponse 'Normal SplitShardsResponse)
48-
splitShardsHandler sc (ServerNormalRequest _metadata request) = do
51+
splitShardsHandler sc (ServerNormalRequest _metadata request) = shardExceptionHandle $ do
4952
Log.debug $ "Receive Split Shards Request: " <> Log.buildString' (show request)
5053
C.splitShards sc request >>= returnResp . SplitShardsResponse
5154

55+
-- | gRPC handler for the merge-shards call.
--
-- Throws 'WrongShardCnt' unless the request names exactly two shard keys,
-- then delegates to 'C.mergeShards'. Runs under 'shardExceptionHandle'.
mergeShardsHandler
  :: ServerContext
  -> ServerRequest 'Normal MergeShardsRequest MergeShardsResponse
  -> IO (ServerResponse 'Normal MergeShardsResponse)
mergeShardsHandler sc (ServerNormalRequest _metadata request@MergeShardsRequest{mergeShardsRequestShardKeys = shardKeys}) =
  shardExceptionHandle $ do
    Log.debug $ "Receive Merge Shards Request: " <> Log.buildString' (show request)
    let keyCount = V.length shardKeys
    when (keyCount /= 2) $ throwIO WrongShardCnt
    merged <- C.mergeShards sc request
    returnResp (MergeShardsResponse (Just merged))
63+
5264
-----------------------------------------------------------------------------------
5365

5466
-- | Exception handling for the read-shard call: 'Store.NOTFOUND' is answered
-- with 'StatusUnavailable'; everything else falls through to
-- 'defaultHandlers'.
readShardExceptionHandle :: ExceptionHandle (ServerResponse 'Normal a)
readShardExceptionHandle =
  mkExceptionHandle (setRespType mkServerErrResp (notFoundHandler : defaultHandlers))
  where
    notFoundHandler =
      Handler $ \(err :: Store.NOTFOUND) ->
        return (StatusUnavailable, mkStatusDetails err)
71+
72+
-- | Exception handling for the split/merge calls: 'WrongShardCnt' is answered
-- with 'StatusInvalidArgument'; after that 'shardExceptionHandler' and then
-- 'defaultHandlers' are tried, in that order.
shardExceptionHandle :: ExceptionHandle (ServerResponse 'Normal a)
shardExceptionHandle =
  mkExceptionHandle (setRespType mkServerErrResp allHandlers)
  where
    allHandlers = wrongCountHandler : (shardExceptionHandler ++ defaultHandlers)
    wrongCountHandler =
      Handler $ \(err :: WrongShardCnt) ->
        return (StatusInvalidArgument, mkStatusDetails err)
77+
78+
-- | Thrown by the merge handler when a request does not name exactly two
-- shard keys.
data WrongShardCnt = WrongShardCnt

instance Show WrongShardCnt where
  show = const "Only two shards can be merged at a time"

instance Exception WrongShardCnt

hstream/src/HStream/Server/Shard.hs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ module HStream.Server.Shard(
3939
hashShardKey,
4040
keyToCBytes,
4141
cBytesToKey,
42+
shardKeyToText,
43+
textToShardKey,
4244
shardStartKey,
4345
shardEndKey,
4446
shardEpoch
@@ -79,6 +81,12 @@ hashShardKey key =
7981
let w8KeyList = BA.unpack (CH.hash . encodeUtf8 $ key :: CH.Digest CH.MD5)
8082
in ShardKey $ foldl' (\acc c -> (.|.) (acc `shiftL` 8) (fromIntegral c)) (0 :: Integer) w8KeyList
8183

84+
-- | Render the number wrapped by a 'ShardKey' as text, using its 'show'
-- representation.
shardKeyToText :: ShardKey -> T.Text
shardKeyToText (ShardKey n) = T.pack . show $ n
86+
87+
-- | Parse the textual form of a shard key (the inverse of 'shardKeyToText').
--
-- The accepted inputs are exactly those 'read' accepts: a single unambiguous
-- number, optionally followed by white space. Anything else still raises an
-- 'ErrorCall' when the key is forced, but the message now names this function
-- and quotes the offending input instead of the bare
-- @Prelude.read: no parse@.
textToShardKey :: T.Text -> ShardKey
textToShardKey txt =
  case [n | (n, rest) <- reads str, ("", "") <- lex rest] of
    [n] -> ShardKey n
    _   -> error $ "textToShardKey: invalid shard key " ++ show str
  where
    str = T.unpack txt
89+
8290
-- | Encode the number wrapped by a 'ShardKey' as 'CB.CBytes', using its
-- 'show' representation.
keyToCBytes :: ShardKey -> CB.CBytes
keyToCBytes (ShardKey n) = CB.pack (show n)
8492

@@ -234,14 +242,14 @@ getShardByKey mp key = do
234242
-- | A way of computing the two shards of a split from a 'ShardMap' and a
-- 'ShardKey'; yields a 'ShardException' when no pair can be produced.
-- NOTE(review): presumably the key selects the shard to split — confirm
-- against 'getSplitedShard' / 'getHalfSplitedShard'.
type SplitStrategies = ShardMap -> ShardKey -> Either ShardException (Shard, Shard)
235243

236244
-- | Split a shard at a specific 'ShardKey' and return the two resulting
-- shards. Runs 'splitShardInternal' with the 'getSplitedShard' strategy.
splitByKey :: S.LDClient -> SharedShardMap -> ShardKey -> IO (Shard, Shard)
splitByKey client sharedMp key =
  splitShardInternal getSplitedShard client sharedMp key
239247

240248
-- | Split a shard in half and return the two resulting shards. Runs
-- 'splitShardInternal' with the 'getHalfSplitedShard' strategy.
splitHalf :: S.LDClient -> SharedShardMap -> ShardKey -> IO (Shard, Shard)
splitHalf client sharedMp key =
  splitShardInternal getHalfSplitedShard client sharedMp key
243251

244-
splitShardInternal :: SplitStrategies -> S.LDClient -> SharedShardMap -> ShardKey -> IO ()
252+
splitShardInternal :: SplitStrategies -> S.LDClient -> SharedShardMap -> ShardKey -> IO (Shard, Shard)
245253
splitShardInternal stratege client sharedMp key = do
246254
let hash1 = getShardMapIdx key
247255
bracket
@@ -278,9 +286,10 @@ splitShardInternal stratege client sharedMp key = do
278286
atomically $ do
279287
putShardMap sharedMp hash1' newMp1
280288
putShardMap sharedMp hash2' newMp2
289+
return (s1', s2')
281290
)
282291

283-
mergeTwoShard :: S.LDClient -> SharedShardMap -> ShardKey -> ShardKey -> IO ()
292+
mergeTwoShard :: S.LDClient -> SharedShardMap -> ShardKey -> ShardKey -> IO (Shard, ShardKey)
284293
mergeTwoShard client mp key1 key2 = do
285294
let hash1 = getShardMapIdx key1
286295
let hash2 = getShardMapIdx key2
@@ -303,6 +312,7 @@ mergeTwoShard client mp key1 key2 = do
303312
atomically $ do
304313
putShardMap mp (getShardMapIdx removedKey) removedShardMp
305314
putShardMap mp (getShardMapIdx startKey) updateShardMp
315+
return (newShard, removedKey)
306316
)
307317
where
308318
getShards hash1 hash2

0 commit comments

Comments
 (0)