Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/SIL.Harmony.Core/CommitBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace SIL.Harmony.Core;
/// most basic commit, does not contain any change data, that's stored in <see cref="CommitBase{TChange}"/>
/// this class is not meant to be inherited from directly, use <see cref="ServerCommit"/> or <see cref="SIL.Harmony.Commit"/> instead
/// </summary>
public abstract class CommitBase
public abstract class CommitBase : IComparable<CommitBase>
{
public const string NullParentHash = "0000";
[JsonConstructor]
Expand Down Expand Up @@ -45,6 +45,12 @@ public override string ToString()
{
return $"{Id} [{DateTime}]";
}

public int CompareTo(CommitBase? other)
{
if (other is null) return 1;
return CompareKey.CompareTo(other.CompareKey);
}
}

/// <inheritdoc cref="CommitBase"/>
Expand Down
15 changes: 15 additions & 0 deletions src/SIL.Harmony.Core/QueryHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ public static async IAsyncEnumerable<TCommit> GetMissingCommits<TCommit, TChange
}
}

public static SortedSet<T> ToSortedSet<T>(this IEnumerable<T> queryable) where T : CommitBase
{
return [.. queryable];
}

Comment thread
myieye marked this conversation as resolved.
public static async Task<SortedSet<T>> ToSortedSetAsync<T>(this IQueryable<T> queryable) where T : CommitBase
{
var set = new SortedSet<T>();
await foreach (var item in queryable.AsAsyncEnumerable())
{
set.Add(item);
}
return set;
}

public static IQueryable<T> DefaultOrder<T>(this IQueryable<T> queryable) where T: CommitBase
{
return queryable
Expand Down
34 changes: 19 additions & 15 deletions src/SIL.Harmony/DataModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public async Task AddManyChanges(Guid clientId,
repo.ClearChangeTracker();

await using var transaction = await repo.BeginTransactionAsync();
await repo.AddCommits(commits);
await UpdateSnapshots(repo, commits.First(), commits);
var updatedCommits = await repo.AddCommits(commits);
await UpdateSnapshots(repo, updatedCommits);
await ValidateCommits(repo);
await transaction.CommitAsync();
}
Expand Down Expand Up @@ -119,8 +119,8 @@ private async Task Add(Commit commit)
repo.ClearChangeTracker();

await using var transaction = repo.IsInTransaction ? null : await repo.BeginTransactionAsync();
await repo.AddCommit(commit);
await UpdateSnapshots(repo, commit, [commit]);
var updatedCommits = await repo.AddCommit(commit);
await UpdateSnapshots(repo, updatedCommits);

if (AlwaysValidate) await ValidateCommits(repo);

Expand Down Expand Up @@ -156,8 +156,8 @@ async Task ISyncable.AddRangeFromSync(IEnumerable<Commit> commits)

await using var transaction = await repo.BeginTransactionAsync();
//don't save since UpdateSnapshots will also modify newCommits with hashes, so changes will be saved once that's done
await repo.AddCommits(newCommits, false);
await UpdateSnapshots(repo, oldestChange, newCommits);
var updatedCommits = await repo.AddCommits(newCommits, false);
await UpdateSnapshots(repo, updatedCommits);
await ValidateCommits(repo);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
await transaction.CommitAsync();
}
Expand Down Expand Up @@ -194,13 +194,15 @@ ValueTask<bool> ISyncable.ShouldSync()
return ValueTask.FromResult(true);
}

private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit, Commit[] newCommits)
private async Task UpdateSnapshots(CrdtRepository repo, SortedSet<Commit> commitsToApply)
{
if (commitsToApply.Count == 0) return;
var oldestAddedCommit = commitsToApply.First();
await repo.DeleteStaleSnapshots(oldestAddedCommit);
Dictionary<Guid, Guid?> snapshotLookup;
if (newCommits.Length > 10)
if (commitsToApply.Count > 10)
Comment thread
myieye marked this conversation as resolved.
{
var entityIds = newCommits.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId));
var entityIds = commitsToApply.SelectMany(c => c.ChangeEntities.Select(ce => ce.EntityId));
snapshotLookup = await repo.CurrentSnapshots()
.Where(s => entityIds.Contains(s.EntityId))
.Select(s => new KeyValuePair<Guid, Guid?>(s.EntityId, s.Id))
Expand All @@ -212,7 +214,7 @@ private async Task UpdateSnapshots(CrdtRepository repo, Commit oldestAddedCommit
}

var snapshotWorker = new SnapshotWorker(snapshotLookup, repo, _crdtConfig.Value);
await snapshotWorker.UpdateSnapshots(oldestAddedCommit, newCommits);
await snapshotWorker.UpdateSnapshots(commitsToApply);
}

private async Task ValidateCommits(CrdtRepository repo)
Expand Down Expand Up @@ -240,8 +242,10 @@ public async Task RegenerateSnapshots()
await using var repo = await _crdtRepositoryFactory.CreateRepository();
await repo.DeleteSnapshotsAndProjectedTables();
repo.ClearChangeTracker();
var allCommits = await repo.CurrentCommits().AsNoTracking().ToArrayAsync();
await UpdateSnapshots(repo, allCommits.First(), allCommits);
var allCommits = await repo.CurrentCommits()
.Include(c => c.ChangeEntities)
.ToSortedSetAsync();
await UpdateSnapshots(repo, allCommits);
}

public async Task<ObjectSnapshot> GetLatestSnapshotByObjectId(Guid entityId)
Expand Down Expand Up @@ -296,7 +300,7 @@ public async Task<Dictionary<Guid, ObjectSnapshot>> GetSnapshotsAtCommit(Commit
var repository = repo.GetScopedRepository(commit);
var (snapshots, pendingCommits) = await repository.GetCurrentSnapshotsAndPendingCommits();

if (pendingCommits.Length != 0)
if (pendingCommits.Count != 0)
{
snapshots = await SnapshotWorker.ApplyCommitsToSnapshots(snapshots,
repository,
Expand Down Expand Up @@ -331,8 +335,8 @@ public async Task<T> GetAtCommit<T>(Commit commit, Guid entityId)
var newCommits = await repository.CurrentCommits()
.Include(c => c.ChangeEntities)
.WhereAfter(snapshot.Commit)
.ToArrayAsync();
if (newCommits.Length > 0)
.ToSortedSetAsync();
if (newCommits.Count > 0)
{
var snapshots = await SnapshotWorker.ApplyCommitsToSnapshots(
new Dictionary<Guid, ObjectSnapshot>([
Expand Down
57 changes: 36 additions & 21 deletions src/SIL.Harmony/Db/CrdtRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public IAsyncEnumerable<SimpleSnapshot> CurrenSimpleSnapshots(bool includeDelete
return snapshots;
}

public async Task<(Dictionary<Guid, ObjectSnapshot> currentSnapshots, Commit[] pendingCommits)> GetCurrentSnapshotsAndPendingCommits()
public async Task<(Dictionary<Guid, ObjectSnapshot> currentSnapshots, SortedSet<Commit> pendingCommits)> GetCurrentSnapshotsAndPendingCommits()
{
var snapshots = await CurrentSnapshots().Include(s => s.Commit).ToDictionaryAsync(s => s.EntityId);

Expand All @@ -217,7 +217,7 @@ public IAsyncEnumerable<SimpleSnapshot> CurrenSimpleSnapshots(bool includeDelete
var newCommits = await CurrentCommits()
.Include(c => c.ChangeEntities)
.WhereAfter(lastCommit)
.ToArrayAsync();
.ToSortedSetAsync();
return (snapshots, newCommits);
}

Expand Down Expand Up @@ -326,25 +326,11 @@ public async Task AddSnapshots(IEnumerable<ObjectSnapshot> snapshots)
catch (DbUpdateException e)
{
var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString()));
var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}";
var message = $"Error saving snapshots (deleted: {grouping.Key}): {e.Message}{Environment.NewLine}{entries}";
_logger.LogError(e, message);
throw new DbUpdateException(message, e);
}
}

// this extra try/catch was added as a quick way to get the NewEntityOnExistingEntityIsNoOp test to pass
// it will be removed again in a larger refactor in https://github.com/sillsdev/harmony/pull/56
try
{
await _dbContext.SaveChangesAsync();
}
catch (DbUpdateException e)
{
var entries = string.Join(Environment.NewLine, e.Entries.Select(entry => entry.ToString()));
var message = $"Error saving snapshots: {e.Message}{Environment.NewLine}{entries}";
_logger.LogError(e, message);
throw new DbUpdateException(message, e);
}
}

private async ValueTask ProjectSnapshot(ObjectSnapshot objectSnapshot)
Expand Down Expand Up @@ -389,16 +375,45 @@ public CrdtRepository GetScopedRepository(Commit excludeChangesAfterCommit)
return new CrdtRepository(_dbContext, _crdtConfig, _logger, excludeChangesAfterCommit);
}

public async Task AddCommit(Commit commit)
public async Task<SortedSet<Commit>> AddCommit(Commit commit)
Comment thread
myieye marked this conversation as resolved.
{
_dbContext.Add(commit);
var updatedCommits = await AddNewCommits([commit]);
await _dbContext.SaveChangesAsync();
return updatedCommits;
}

public async Task AddCommits(IEnumerable<Commit> commits, bool save = true)
public async Task<SortedSet<Commit>> AddCommits(IEnumerable<Commit> commits, bool save = true)
{
_dbContext.AddRange(commits);
var updatedCommits = await AddNewCommits(commits);
if (save) await _dbContext.SaveChangesAsync();
return updatedCommits;
}
Comment thread
myieye marked this conversation as resolved.
Outdated
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

private async Task<SortedSet<Commit>> AddNewCommits(IEnumerable<Commit> newCommits)
{
if (newCommits is null || !newCommits.Any()) return [];
var oldestAddedCommit = newCommits.MinBy(c => c.CompareKey)
?? throw new ArgumentException("Couldn't find oldest commit", nameof(newCommits));
var parentCommit = await FindPreviousCommit(oldestAddedCommit);
var existingCommitsToUpdate = await GetCommitsAfter(parentCommit);
var commitsToApply = existingCommitsToUpdate
.UnionBy(newCommits, c => c.Id)
.ToSortedSet();
//we're inserting commits in the past/rewriting history, so we need to update the previous commit hashes
await UpdateCommitHashes(commitsToApply, parentCommit);
_dbContext.AddRange(newCommits);
return commitsToApply;
}
Comment thread
myieye marked this conversation as resolved.

private async Task UpdateCommitHashes(SortedSet<Commit> commits, Commit? parentCommit = null)
{
var previousCommitHash = parentCommit?.Hash ?? CommitBase.NullParentHash;
foreach (var commit in commits)
{
commit.SetParentHash(previousCommitHash);
previousCommitHash = commit.Hash;
}
await _dbContext.SaveChangesAsync();
}

public HybridDateTime? GetLatestDateTime()
Expand Down
22 changes: 6 additions & 16 deletions src/SIL.Harmony/SnapshotWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ private SnapshotWorker(Dictionary<Guid, ObjectSnapshot> snapshots,
internal static async Task<Dictionary<Guid, ObjectSnapshot>> ApplyCommitsToSnapshots(
Dictionary<Guid, ObjectSnapshot> snapshots,
CrdtRepository crdtRepository,
ICollection<Commit> commits,
SortedSet<Commit> commits,
CrdtConfig crdtConfig)
{
//we need to pass in the snapshots because we expect it to be modified, this is intended.
//if the constructor makes a copy in the future this will need to be updated
await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits, false, null);
await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits);
return snapshots;
}

Expand All @@ -49,32 +49,22 @@ internal SnapshotWorker(Dictionary<Guid, Guid?> snapshotLookup,
{
}

public async Task UpdateSnapshots(Commit oldestAddedCommit, Commit[] newCommits)
public async Task UpdateSnapshots(SortedSet<Commit> commits)
{
var previousCommit = await _crdtRepository.FindPreviousCommit(oldestAddedCommit);
var commits = await _crdtRepository.GetCommitsAfter(previousCommit);
await ApplyCommitChanges(commits.UnionBy(newCommits, c => c.Id), true, previousCommit?.Hash ?? CommitBase.NullParentHash);

await ApplyCommitChanges(commits);
await _crdtRepository.AddSnapshots([
.._rootSnapshots.Values,
.._newIntermediateSnapshots,
.._pendingSnapshots.Values
]);
}

private async ValueTask ApplyCommitChanges(IEnumerable<Commit> commits, bool updateCommitHash, string? previousCommitHash)
private async ValueTask ApplyCommitChanges(SortedSet<Commit> commits)
{
var intermediateSnapshots = new Dictionary<Guid, ObjectSnapshot>();
var commitIndex = 0;
foreach (var commit in commits.DefaultOrder())
foreach (var commit in commits)
{
if (updateCommitHash && previousCommitHash is not null)
{
//we're rewriting history, so we need to update the previous commit hash
commit.SetParentHash(previousCommitHash);
Comment thread
myieye marked this conversation as resolved.
}

previousCommitHash = commit.Hash;
commitIndex++;
foreach (var commitChange in commit.ChangeEntities.OrderBy(c => c.Index))
{
Expand Down