99
1010namespace OpenShock . Common . Services . BatchUpdate ;
1111
12+ internal sealed class ConcurrentUniqueBatchQueue < TKey , TValue > where TKey : notnull
13+ {
14+ private readonly ReaderWriterLockSlim _lock = new ( ) ;
15+ private readonly ConcurrentDictionary < TKey , TValue > _dictionary = new ( ) ;
16+
17+ public void Enqueue ( TKey key , TValue value )
18+ {
19+ _lock . EnterReadLock ( ) ;
20+ try
21+ {
22+ _dictionary [ key ] = value ;
23+ }
24+ finally
25+ {
26+ _lock . ExitReadLock ( ) ;
27+ }
28+ }
29+
30+ public KeyValuePair < TKey , TValue > [ ] DequeueAll ( )
31+ {
32+ _lock . EnterWriteLock ( ) ;
33+ try
34+ {
35+ var items = _dictionary . ToArray ( ) ;
36+ _dictionary . Clear ( ) ;
37+ return items ;
38+ }
39+ finally
40+ {
41+ _lock . ExitWriteLock ( ) ;
42+ }
43+ }
44+ }
45+
1246public sealed class BatchUpdateService : IHostedService , IBatchUpdateService
1347{
1448 private static readonly TimeSpan Interval = TimeSpan . FromSeconds ( 10 ) ;
@@ -18,8 +52,8 @@ public sealed class BatchUpdateService : IHostedService, IBatchUpdateService
1852 private readonly IConnectionMultiplexer _connectionMultiplexer ;
1953 private readonly Timer _updateTimer ;
2054
21- private readonly ConcurrentDictionary < Guid , bool > _tokenLastUsed = new ( ) ;
22- private readonly ConcurrentDictionary < string , DateTimeOffset > _sessionLastUsed = new ( ) ;
55+ private readonly ConcurrentUniqueBatchQueue < Guid , bool > _tokenLastUsed = new ( ) ;
56+ private readonly ConcurrentUniqueBatchQueue < string , DateTimeOffset > _sessionLastUsed = new ( ) ;
2357
2458 public BatchUpdateService ( IDbContextFactory < OpenShockContext > dbFactory , ILogger < BatchUpdateService > logger , IConnectionMultiplexer connectionMultiplexer )
2559 {
@@ -45,13 +79,10 @@ private async void UpdateTimerOnElapsed(object? sender, ElapsedEventArgs eventAr
4579
4680 private async Task UpdateTokens ( )
4781 {
48- var keys = _tokenLastUsed . Keys . ToArray ( ) ;
82+ var keys = _tokenLastUsed . DequeueAll ( ) . Select ( x => x . Key ) . ToArray ( ) ;
4983
5084 // Skip if there is nothing
5185 if ( keys . Length < 1 ) return ;
52-
53- // Yeah
54- foreach ( var guid in keys ) _tokenLastUsed . TryRemove ( guid , out _ ) ;
5586
5687 await using var db = await _dbFactory . CreateDbContextAsync ( ) ;
5788 await db . ApiTokens . Where ( x => keys . Contains ( x . Id ) )
@@ -66,9 +97,8 @@ private async Task UpdateSessions()
6697
6798 var json = _connectionMultiplexer . GetDatabase ( ) . JSON ( ) ;
6899
69- foreach ( var sessionKey in _sessionLastUsed . Keys )
100+ foreach ( var ( sessionKey , lastUsed ) in _sessionLastUsed . DequeueAll ( ) )
70101 {
71- if ( ! _sessionLastUsed . TryRemove ( sessionKey , out var lastUsed ) ) return ;
72102 sessionsToUpdate . Add ( json . SetAsync ( typeof ( LoginSession ) . FullName + ":" + sessionKey , "LastUsed" , lastUsed . ToUnixTimeMilliseconds ( ) , When . Always ) ) ;
73103 }
74104
@@ -83,12 +113,12 @@ private async Task UpdateSessions()
83113
84114 public void UpdateTokenLastUsed ( Guid tokenId )
85115 {
86- _tokenLastUsed [ tokenId ] = false ;
116+ _tokenLastUsed . Enqueue ( tokenId , false ) ;
87117 }
88118
89119 public void UpdateSessionLastUsed ( string sessionKey , DateTimeOffset lastUsed )
90120 {
91- _sessionLastUsed [ sessionKey ] = lastUsed ;
121+ _sessionLastUsed . Enqueue ( sessionKey , lastUsed ) ;
92122 }
93123
94124 public Task StartAsync ( CancellationToken cancellationToken )
0 commit comments