-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathBatchUpdateService.cs
More file actions
143 lines (120 loc) · 4.22 KB
/
BatchUpdateService.cs
File metadata and controls
143 lines (120 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
using System.Collections.Concurrent;
using System.Timers;
using Microsoft.EntityFrameworkCore;
using NRedisStack.RedisStackCommands;
using OpenShock.Common.Constants;
using OpenShock.Common.OpenShockDb;
using OpenShock.Common.Redis;
using OpenShock.Common.Utils;
using StackExchange.Redis;
using Timer = System.Timers.Timer;
namespace OpenShock.Common.Services.BatchUpdate;
internal sealed class ConcurrentUniqueBatchQueue<TKey, TValue> where TKey : notnull
{
private readonly ReaderWriterLockSlim _lock = new();
private readonly ConcurrentDictionary<TKey, TValue> _dictionary = new();
public void Enqueue(TKey key, TValue value)
{
_lock.EnterReadLock();
try
{
_dictionary[key] = value;
}
finally
{
_lock.ExitReadLock();
}
}
public KeyValuePair<TKey, TValue>[] DequeueAll()
{
_lock.EnterWriteLock();
try
{
var items = _dictionary.ToArray();
_dictionary.Clear();
return items;
}
finally
{
_lock.ExitWriteLock();
}
}
}
public sealed class BatchUpdateService : IHostedService, IBatchUpdateService
{
private static readonly TimeSpan Interval = TimeSpan.FromSeconds(10);
private readonly IDbContextFactory<OpenShockContext> _dbFactory;
private readonly ILogger<BatchUpdateService> _logger;
private readonly IConnectionMultiplexer _connectionMultiplexer;
private readonly Timer _updateTimer;
private readonly ConcurrentUniqueBatchQueue<Guid, bool> _tokenLastUsed = new();
private readonly ConcurrentUniqueBatchQueue<string, DateTimeOffset> _sessionLastUsed = new();
public BatchUpdateService(IDbContextFactory<OpenShockContext> dbFactory, ILogger<BatchUpdateService> logger, IConnectionMultiplexer connectionMultiplexer)
{
_dbFactory = dbFactory;
_logger = logger;
_connectionMultiplexer = connectionMultiplexer;
_updateTimer = new Timer(Interval);
_updateTimer.Elapsed += UpdateTimerOnElapsed;
}
private async void UpdateTimerOnElapsed(object? sender, ElapsedEventArgs eventArgs)
{
try
{
await Task.WhenAll(UpdateTokens(), UpdateSessions());
}
catch (Exception e)
{
_logger.LogError(e, "Error in batch update loop");
}
}
private async Task UpdateTokens()
{
var keys = _tokenLastUsed.DequeueAll().Select(x => x.Key).ToArray();
// Skip if there is nothing
if (keys.Length < 1) return;
await using var db = await _dbFactory.CreateDbContextAsync();
await db.ApiTokens.Where(x => keys.Contains(x.Id))
.ExecuteUpdateAsync(x => x.SetProperty(y => y.LastUsed, DateTime.UtcNow));
await db.SaveChangesAsync();
}
private async Task UpdateSessions()
{
var sessionsToUpdate = new List<Task<bool>>();
var json = _connectionMultiplexer.GetDatabase().JSON();
foreach (var (sessionToken, lastUsed) in _sessionLastUsed.DequeueAll())
{
sessionsToUpdate.Add(json.SetAsync(typeof(LoginSession).FullName + ":" + sessionToken, "LastUsed", lastUsed.ToUnixTimeMilliseconds(), When.Always));
}
try
{
await Task.WhenAll(sessionsToUpdate);
} catch (Exception e)
{
_logger.LogTrace(e, "Error updating a sessions last used value");
}
}
public void UpdateApiTokenLastUsed(Guid apiTokenId)
{
_tokenLastUsed.Enqueue(apiTokenId, false);
}
public void UpdateSessionLastUsed(string sessionToken, DateTimeOffset lastUsed)
{
// Only hash new tokens, old ones are 64 chars long
if (sessionToken.Length == AuthConstants.GeneratedTokenLength)
{
sessionToken = HashingUtils.HashToken(sessionToken);
}
_sessionLastUsed.Enqueue(sessionToken, lastUsed);
}
public Task StartAsync(CancellationToken cancellationToken)
{
_updateTimer.Start();
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_updateTimer.Stop();
return Task.CompletedTask;
}
}