Skip to content

Commit 3fcdc69

Browse files
authored
Pull updating commit hashes out of SnapshotWorker (#56)
* Pull updating commits hashes out of SnapshotWorker * Implement IComparable in CommitBase * Fix test * Call SaveChanges only after new commits have been added * Remove redundant SaveChanges which bypasses save parameter * Remove unnecessary async/await syntax * Remove outdated deferred SaveChanges * Add clarifying comments * Add test to demonstrate hash validation
1 parent 1ac7462 commit 3fcdc69

6 files changed

Lines changed: 111 additions & 55 deletions

File tree

src/SIL.Harmony.Core/CommitBase.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace SIL.Harmony.Core;
77
/// most basic commit, does not contain any change data, that's stored in <see cref="CommitBase{TChange}"/>
88
/// this class is not meant to be inherited from directly, use <see cref="ServerCommit"/> or <see cref="SIL.Harmony.Commit"/> instead
99
/// </summary>
10-
public abstract class CommitBase
10+
public abstract class CommitBase : IComparable<CommitBase>
1111
{
1212
public const string NullParentHash = "0000";
1313
[JsonConstructor]
@@ -45,6 +45,12 @@ public override string ToString()
4545
{
4646
return $"{Id} [{DateTime}]";
4747
}
48+
49+
public int CompareTo(CommitBase? other)
50+
{
51+
if (other is null) return 1;
52+
return CompareKey.CompareTo(other.CompareKey);
53+
}
4854
}
4955

5056
/// <inheritdoc cref="CommitBase"/>

src/SIL.Harmony.Core/QueryHelpers.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,21 @@ public static async IAsyncEnumerable<TCommit> GetMissingCommits<TCommit, TChange
6060
}
6161
}
6262

63+
public static SortedSet<T> ToSortedSet<T>(this IEnumerable<T> queryable) where T : CommitBase
64+
{
65+
return [.. queryable];
66+
}
67+
68+
public static async Task<SortedSet<T>> ToSortedSetAsync<T>(this IQueryable<T> queryable) where T : CommitBase
69+
{
70+
var set = new SortedSet<T>();
71+
await foreach (var item in queryable.AsAsyncEnumerable())
72+
{
73+
set.Add(item);
74+
}
75+
return set;
76+
}
77+
6378
public static IQueryable<T> DefaultOrder<T>(this IQueryable<T> queryable) where T: CommitBase
6479
{
6580
return queryable

src/SIL.Harmony.Tests/DataModelIntegrityTests.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,21 @@ public async Task CanAddTheSameCommitMultipleTimesVisSync()
2929
var entry = await DataModel.GetLatest<Word>(entity1Id);
3030
entry!.Text.Should().Be("entity1.1");
3131
}
32+
33+
[Fact]
34+
public async Task InvalidCommitHashesResultInException()
35+
{
36+
// arrange
37+
var addedCommit = await WriteNextChange(SetWord(Guid.NewGuid(), "word"));
38+
39+
// act - break the hash
40+
addedCommit.SetParentHash("BBAADD");
41+
DbContext.SaveChanges();
42+
43+
// act - add another commit to trigger validation
44+
Func<Task> act = async () => await WriteNextChange(SetWord(Guid.NewGuid(), "word"));
45+
46+
// assert - validation detects the broken hash
47+
await act.Should().ThrowAsync<CommitValidationException>().WithMessage("*does not match expected hash*");
48+
}
3249
}

src/SIL.Harmony/DataModel.cs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ public async Task AddManyChanges(Guid clientId,
8181
repo.ClearChangeTracker();
8282

8383
await using var transaction = await repo.BeginTransactionAsync();
84-
await repo.AddCommits(commits);
85-
await UpdateSnapshots(repo, commits.First(), commits);
84+
var updatedCommits = await repo.AddCommits(commits);
85+
await UpdateSnapshots(repo, updatedCommits);
8686
await ValidateCommits(repo);
8787
await transaction.CommitAsync();
8888
}
@@ -119,8 +119,8 @@ private async Task Add(Commit commit)
119119
repo.ClearChangeTracker();
120120

121121
await using var transaction = repo.IsInTransaction ? null : await repo.BeginTransactionAsync();
122-
await repo.AddCommit(commit);
123-
await UpdateSnapshots(repo, commit, [commit]);
122+
var updatedCommits = await repo.AddCommit(commit);
123+
await UpdateSnapshots(repo, updatedCommits);
124124

125125
if (AlwaysValidate) await ValidateCommits(repo);
126126

@@ -155,9 +155,8 @@ async Task ISyncable.AddRangeFromSync(IEnumerable<Commit> commits)
155155
if (oldestChange is null || newCommits is []) return;
156156

157157
await using var transaction = await repo.BeginTransactionAsync();
158-
//don't save since UpdateSnapshots will also modify newCommits with hashes, so changes will be saved once that's done
159-
await repo.AddCommits(newCommits, false);
160-
await UpdateSnapshots(repo, oldestChange, newCommits);
158+
var updatedCommits = await repo.AddCommits(newCommits);
159+
await UpdateSnapshots(repo, updatedCommits);
161160
await ValidateCommits(repo);
162161
await transaction.CommitAsync();
163162
}
@@ -194,13 +193,16 @@ ValueTask<bool> ISyncable.ShouldSync()
194193
return ValueTask.FromResult(true);
195194
}
196195

197-
private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit, Commit[] newCommits)
196+
private async Task UpdateSnapshots(CrdtRepository repo, SortedSet<Commit> commitsToApply)
198197
{
198+
if (commitsToApply.Count == 0) return;
199+
var oldestAddedCommit = commitsToApply.First();
199200
await repo.DeleteStaleSnapshots(oldestAddedCommit);
200201
Dictionary<Guid, Guid?> snapshotLookup;
201-
if (newCommits.Length > 10)
202+
if (commitsToApply.Count > 10)
202203
{
203-
var entityIds = newCommits.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId));
204+
// Bulk-load relevant snapshots to minimize DB queries
205+
var entityIds = commitsToApply.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId));
204206
snapshotLookup = await repo.CurrentSnapshots()
205207
.Where(s => entityIds.Contains(s.EntityId))
206208
.Select(s => new KeyValuePair<Guid, Guid?>(s.EntityId, s.Id))
@@ -212,7 +214,7 @@ private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit
212214
}
213215

214216
var snapshotWorker = new SnapshotWorker(snapshotLookup, repo, _crdtConfig.Value);
215-
await snapshotWorker.UpdateSnapshots(oldestAddedCommit, newCommits);
217+
await snapshotWorker.UpdateSnapshots(commitsToApply);
216218
}
217219

218220
private async Task ValidateCommits(CrdtRepository repo)
@@ -240,8 +242,10 @@ public async Task RegenerateSnapshots()
240242
await using var repo = await _crdtRepositoryFactory.CreateRepository();
241243
await repo.DeleteSnapshotsAndProjectedTables();
242244
repo.ClearChangeTracker();
243-
var allCommits = await repo.CurrentCommits().AsNoTracking().ToArrayAsync();
244-
await UpdateSnapshots(repo, allCommits.First(), allCommits);
245+
var allCommits = await repo.CurrentCommits()
246+
.Include(c => c.ChangeEntities)
247+
.ToSortedSetAsync();
248+
await UpdateSnapshots(repo, allCommits);
245249
}
246250

247251
public async Task<ObjectSnapshot> GetLatestSnapshotByObjectId(Guid entityId)
@@ -305,7 +309,7 @@ public async Task<Dictionary<Guid, ObjectSnapshot>> GetSnapshotsAtCommit(Commit
305309
var repository = repo.GetScopedRepository(commit);
306310
var (snapshots, pendingCommits) = await repository.GetCurrentSnapshotsAndPendingCommits();
307311

308-
if (pendingCommits.Length != 0)
312+
if (pendingCommits.Count != 0)
309313
{
310314
snapshots = await SnapshotWorker.ApplyCommitsToSnapshots(snapshots,
311315
repository,
@@ -340,8 +344,8 @@ public async Task<T> GetAtCommit<T>(Commit commit, Guid entityId)
340344
var newCommits = await repository.CurrentCommits()
341345
.Include(c => c.ChangeEntities)
342346
.WhereAfter(snapshot.Commit)
343-
.ToArrayAsync();
344-
if (newCommits.Length > 0)
347+
.ToSortedSetAsync();
348+
if (newCommits.Count > 0)
345349
{
346350
var snapshots = await SnapshotWorker.ApplyCommitsToSnapshots(
347351
new Dictionary<Guid, ObjectSnapshot>([

src/SIL.Harmony/Db/CrdtRepository.cs

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public IAsyncEnumerable<SimpleSnapshot> CurrenSimpleSnapshots(bool includeDelete
207207
return snapshots;
208208
}
209209

210-
public async Task<(Dictionary<Guid, ObjectSnapshot> currentSnapshots, Commit[] pendingCommits)> GetCurrentSnapshotsAndPendingCommits()
210+
public async Task<(Dictionary<Guid, ObjectSnapshot> currentSnapshots, SortedSet<Commit> pendingCommits)> GetCurrentSnapshotsAndPendingCommits()
211211
{
212212
var snapshots = await CurrentSnapshots().Include(s => s.Commit).ToDictionaryAsync(s => s.EntityId);
213213

@@ -217,7 +217,7 @@ public IAsyncEnumerable<SimpleSnapshot> CurrenSimpleSnapshots(bool includeDelete
217217
var newCommits = await CurrentCommits()
218218
.Include(c => c.ChangeEntities)
219219
.WhereAfter(lastCommit)
220-
.ToArrayAsync();
220+
.ToSortedSetAsync();
221221
return (snapshots, newCommits);
222222
}
223223

@@ -326,25 +326,11 @@ public async Task AddSnapshots(IEnumerable<ObjectSnapshot> snapshots)
326326
catch (DbUpdateException e)
327327
{
328328
var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString()));
329-
var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}";
329+
var message = $"Error saving snapshots (deleted: {grouping.Key}): {e.Message}{Environment.NewLine}{entries}";
330330
_logger.LogError(e, message);
331331
throw new DbUpdateException(message, e);
332332
}
333333
}
334-
335-
// this extra try/catch was added as a quick way to get the NewEntityOnExistingEntityIsNoOp test to pass
336-
// it will be removed again in a larger refactor in https://github.com/sillsdev/harmony/pull/56
337-
try
338-
{
339-
await _dbContext.SaveChangesAsync();
340-
}
341-
catch (DbUpdateException e)
342-
{
343-
var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString()));
344-
var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}";
345-
_logger.LogError(e, message);
346-
throw new DbUpdateException(message, e);
347-
}
348334
}
349335

350336
private async ValueTask ProjectSnapshot(ObjectSnapshot objectSnapshot)
@@ -392,16 +378,54 @@ public CrdtRepository GetScopedRepository(Commit excludeChangesAfterCommit)
392378
return new CrdtRepository(_dbContext, _crdtConfig, _logger, excludeChangesAfterCommit);
393379
}
394380

395-
public async Task AddCommit(Commit commit)
381+
/// <summary>
382+
/// Adds a commit to the database. If the new commit was authored before any commits that
383+
/// are already in the database, then history will be rewritten by updating those commit hashes.
384+
/// </summary>
385+
/// <returns>All added and updated commits.</returns>
386+
public async Task<SortedSet<Commit>> AddCommit(Commit commit)
396387
{
397-
_dbContext.Add(commit);
388+
var updatedCommits = await AddNewCommits([commit]);
398389
await _dbContext.SaveChangesAsync();
390+
return updatedCommits;
399391
}
400392

401-
public async Task AddCommits(IEnumerable<Commit> commits, bool save = true)
393+
/// <summary>
394+
/// Adds commits to the database. If any of the new commits were authored before any commits that
395+
/// are already in the database, then history will be rewritten by updating those commit hashes.
396+
/// </summary>
397+
/// <returns>All added and updated commits.</returns>
398+
public async Task<SortedSet<Commit>> AddCommits(IEnumerable<Commit> commits)
402399
{
403-
_dbContext.AddRange(commits);
404-
if (save) await _dbContext.SaveChangesAsync();
400+
var updatedCommits = await AddNewCommits(commits);
401+
await _dbContext.SaveChangesAsync();
402+
return updatedCommits;
403+
}
404+
405+
private async Task<SortedSet<Commit>> AddNewCommits(IEnumerable<Commit> newCommits)
406+
{
407+
if (newCommits is null || !newCommits.Any()) return [];
408+
var oldestAddedCommit = newCommits.MinBy(c => c.CompareKey)
409+
?? throw new ArgumentException("Couldn't find oldest commit", nameof(newCommits));
410+
var parentCommit = await FindPreviousCommit(oldestAddedCommit);
411+
var existingCommitsToUpdate = await GetCommitsAfter(parentCommit);
412+
var commitsToApply = existingCommitsToUpdate
413+
.UnionBy(newCommits, c => c.Id)
414+
.ToSortedSet();
415+
//we're inserting commits in the past/rewriting history, so we need to update the previous commit hashes
416+
UpdateCommitHashes(commitsToApply, parentCommit);
417+
_dbContext.AddRange(newCommits);
418+
return commitsToApply;
419+
}
420+
421+
private void UpdateCommitHashes(SortedSet<Commit> commits, Commit? parentCommit = null)
422+
{
423+
var previousCommitHash = parentCommit?.Hash ?? CommitBase.NullParentHash;
424+
foreach (var commit in commits)
425+
{
426+
commit.SetParentHash(previousCommitHash);
427+
previousCommitHash = commit.Hash;
428+
}
405429
}
406430

407431
public HybridDateTime? GetLatestDateTime()

src/SIL.Harmony/SnapshotWorker.cs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ private SnapshotWorker(Dictionary<Guid, ObjectSnapshot> snapshots,
3131
internal static async Task<Dictionary<Guid, ObjectSnapshot>> ApplyCommitsToSnapshots(
3232
Dictionary<Guid, ObjectSnapshot> snapshots,
3333
CrdtRepository crdtRepository,
34-
ICollection<Commit> commits,
34+
SortedSet<Commit> commits,
3535
CrdtConfig crdtConfig)
3636
{
3737
//we need to pass in the snapshots because we expect it to be modified, this is intended.
3838
//if the constructor makes a copy in the future this will need to be updated
39-
await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits, false, null);
39+
await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits);
4040
return snapshots;
4141
}
4242

@@ -49,32 +49,22 @@ internal SnapshotWorker(Dictionary<Guid, Guid?> snapshotLookup,
4949
{
5050
}
5151

52-
public async Task UpdateSnapshots(Commit oldestAddedCommit, Commit[] newCommits)
52+
public async Task UpdateSnapshots(SortedSet<Commit> commits)
5353
{
54-
var previousCommit = await _crdtRepository.FindPreviousCommit(oldestAddedCommit);
55-
var commits = await _crdtRepository.GetCommitsAfter(previousCommit);
56-
await ApplyCommitChanges(commits.UnionBy(newCommits, c => c.Id), true, previousCommit?.Hash ?? CommitBase.NullParentHash);
57-
54+
await ApplyCommitChanges(commits);
5855
await _crdtRepository.AddSnapshots([
5956
.._rootSnapshots.Values,
6057
.._newIntermediateSnapshots,
6158
.._pendingSnapshots.Values
6259
]);
6360
}
6461

65-
private async ValueTask ApplyCommitChanges(IEnumerable<Commit> commits, bool updateCommitHash, string? previousCommitHash)
62+
private async ValueTask ApplyCommitChanges(SortedSet<Commit> commits)
6663
{
6764
var intermediateSnapshots = new Dictionary<Guid, ObjectSnapshot>();
6865
var commitIndex = 0;
69-
foreach (var commit in commits.DefaultOrder())
66+
foreach (var commit in commits)
7067
{
71-
if (updateCommitHash && previousCommitHash is not null)
72-
{
73-
//we're rewriting history, so we need to update the previous commit hash
74-
commit.SetParentHash(previousCommitHash);
75-
}
76-
77-
previousCommitHash = commit.Hash;
7868
commitIndex++;
7969
foreach (var commitChange in commit.ChangeEntities.OrderBy(c => c.Index))
8070
{

0 commit comments

Comments
 (0)