-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathDatastore.swift
More file actions
1684 lines (1519 loc) · 83.1 KB
/
Datastore.swift
File metadata and controls
1684 lines (1519 loc) · 83.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//
// Datastore.swift
// CodableDatastore
//
// Created by Dimitri Bouniol on 2023-05-10.
// Copyright © 2023-24 Mochi Development, Inc. All rights reserved.
//
#if canImport(Darwin)
import Foundation
#else
@preconcurrency import Foundation
#endif
/// A store for a homogenous collection of instances.
///
/// A datastore is bound to a single persistence and ``DatastoreFormat``, and is created either as read-write (with an encoder) or as read-only (decoders only).
public actor Datastore<Format: DatastoreFormat, AccessMode: _AccessMode>: Sendable {
    /// A type representing the version of the datastore within the persistence.
    ///
    /// - SeeAlso: ``DatastoreFormat/Version``
    public typealias Version = Format.Version

    /// The instance type to use when persisting and loading values from the datastore.
    ///
    /// - SeeAlso: ``DatastoreFormat/Instance``
    public typealias InstanceType = Format.Instance

    /// The identifier to be used when de-duplicating instances saved in the persistence.
    ///
    /// - SeeAlso: ``DatastoreFormat/Identifier``
    public typealias IdentifierType = Format.Identifier

    /// The persistence that transactions are opened against.
    let persistence: any Persistence

    /// The format instance used to generate index representations and descriptors.
    let format: Format

    /// The key identifying this datastore within the persistence.
    let key: DatastoreKey

    /// The version this datastore was configured with.
    let version: Version

    /// Encodes an instance into the data that gets persisted.
    let encoder: @Sendable (_ instance: InstanceType) async throws -> Data

    /// Decoders keyed by the version of the persisted entry they can read.
    let decoders: [Version: @Sendable (_ data: Data) async throws -> (id: IdentifierType, instance: InstanceType)]

    /// The indexes generated from the format, keyed by their type-erased representation.
    let indexRepresentations: [AnyIndexRepresentation<InstanceType> : GeneratedIndexRepresentation<InstanceType>]

    /// A cached descriptor for the current format and version, populated on first request.
    var updatedDescriptor: DatastoreDescriptor?

    /// The state of the warm-up task, and the handlers to notify while it runs.
    fileprivate var warmupStatus: TaskStatus<Progress> = .waiting
    fileprivate var warmupProgressHandlers: [ProgressHandler] = []

    /// Intended for tracking whole-store migrations (see the migration TODOs).
    fileprivate var storeMigrationStatus: TaskStatus<Progress> = .waiting
    fileprivate var storeMigrationProgressHandlers: [ProgressHandler] = []

    /// Intended for tracking per-index migrations (see the migration TODOs).
    fileprivate var indexMigrationStatus: [AnyIndexRepresentation<InstanceType> : TaskStatus<Progress>] = [:]
    fileprivate var indexMigrationProgressHandlers: [AnyIndexRepresentation<InstanceType> : ProgressHandler] = [:]

    /// Creates a read-write datastore.
    ///
    /// - Parameters:
    ///   - persistence: The read-write persistence to store instances in.
    ///   - format: The format type. Only used for generic inference; a fresh `Format()` is created internally.
    ///   - key: The key identifying the datastore. Defaults to `Format.defaultKey`.
    ///   - version: The version to configure the datastore with. Defaults to `Format.currentVersion`.
    ///   - encoder: A closure that encodes an instance for persisting.
    ///   - decoders: A decoder for each version. A version without a decoder triggers an assertion failure.
    ///   - configuration: NOTE(review): not read by this initializer — confirm whether it should be stored.
    public init(
        persistence: some Persistence<AccessMode>,
        format: Format.Type = Format.self,
        key: DatastoreKey = Format.defaultKey,
        version: Version = Format.currentVersion,
        encoder: @Sendable @escaping (_ instance: InstanceType) async throws -> Data,
        decoders: [Version: @Sendable (_ data: Data) async throws -> (id: IdentifierType, instance: InstanceType)],
        configuration: Configuration = .init()
    ) where AccessMode == ReadWrite {
        self.persistence = persistence

        let format = Format()
        self.format = format
        self.indexRepresentations = format.generateIndexRepresentations(assertIdentifiable: true)

        self.key = key
        self.version = version
        self.encoder = encoder
        self.decoders = decoders

        Self.assertValidDeclarations(indexRepresentations: indexRepresentations, decoders: decoders)
    }

    /// Creates a read-only datastore.
    ///
    /// The encoder of a read-only datastore traps with a precondition failure if it is ever called.
    ///
    /// - Parameters:
    ///   - persistence: The persistence to read instances from.
    ///   - format: The format type. Only used for generic inference; a fresh `Format()` is created internally.
    ///   - key: The key identifying the datastore. Defaults to `Format.defaultKey`.
    ///   - version: The version to configure the datastore with. Defaults to `Format.currentVersion`.
    ///   - decoders: A decoder for each version. A version without a decoder triggers an assertion failure.
    ///   - configuration: NOTE(review): not read by this initializer — confirm whether it should be stored.
    public init(
        persistence: some Persistence,
        format: Format.Type = Format.self,
        key: DatastoreKey = Format.defaultKey,
        version: Version = Format.currentVersion,
        decoders: [Version: @Sendable (_ data: Data) async throws -> (id: IdentifierType, instance: InstanceType)],
        configuration: Configuration = .init()
    ) where AccessMode == ReadOnly {
        self.persistence = persistence

        let format = Format()
        self.format = format
        self.indexRepresentations = format.generateIndexRepresentations(assertIdentifiable: true)

        self.key = key
        self.version = version
        self.encoder = { _ in preconditionFailure("Encode called on read-only instance.") }
        self.decoders = decoders

        Self.assertValidDeclarations(indexRepresentations: indexRepresentations, decoders: decoders)
    }

    /// Sanity-checks the declarations shared by both initializers.
    ///
    /// Verifies that no index name is declared twice, that every index key matches its generated representation, and that every version has a decoder. The checks use `assert`/`assertionFailure`, so they only trap in builds where assertions are enabled.
    ///
    /// - Parameters:
    ///   - indexRepresentations: The index representations generated from the format.
    ///   - decoders: The decoders passed to the initializer.
    private static func assertValidDeclarations(
        indexRepresentations: [AnyIndexRepresentation<InstanceType> : GeneratedIndexRepresentation<InstanceType>],
        decoders: [Version: @Sendable (_ data: Data) async throws -> (id: IdentifierType, instance: InstanceType)]
    ) {
        var usedIndexNames: Set<IndexName> = []
        for (indexKey, indexRepresentation) in indexRepresentations {
            assert(!usedIndexNames.contains(indexRepresentation.indexName), "Index \"\(indexRepresentation.indexName.rawValue)\" (\(indexRepresentation.index.indexType.rawValue)) was used more than once, which will lead to undefined behavior on every run. Please make sure \(String(describing: Format.self)) only declares a single index for \"\(indexRepresentation.indexName.rawValue)\".")
            usedIndexNames.insert(indexRepresentation.indexName)

            assert(indexKey == AnyIndexRepresentation(indexRepresentation: indexRepresentation.index), "The key returned for index \"\(indexRepresentation.indexName.rawValue)\" does not match the generated representation. Please double check to make sure that these values are aligned!")
        }

        for decoderVersion in Version.allCases {
            guard decoders[decoderVersion] == nil else { continue }
            assertionFailure("Decoders missing case for \(decoderVersion). Please make sure you have a decoder configured for this version or you may encounter errors at runtime.")
        }
    }
}
// MARK: - Helper Methods
extension Datastore {
    /// Returns the descriptor for the current format and version, creating and caching it on first use.
    ///
    /// - Returns: The cached descriptor when one exists, otherwise a newly created one.
    /// - Throws: Any error thrown while creating the `DatastoreDescriptor`.
    func generateUpdatedDescriptor() throws -> DatastoreDescriptor {
        guard let cachedDescriptor = updatedDescriptor else {
            /// Nothing cached yet: build it once and remember it for later calls.
            let freshDescriptor = try DatastoreDescriptor(format: format, version: version)
            updatedDescriptor = freshDescriptor
            return freshDescriptor
        }
        return cachedDescriptor
    }

    /// Looks up the decoder registered for a given version.
    ///
    /// - Parameter version: The version of the persisted entry to decode.
    /// - Returns: The decoder closure registered for `version`.
    /// - Throws: `DatastoreError.missingDecoder` when no decoder was registered for `version`.
    func decoder(for version: Version) throws -> @Sendable (_ data: Data) async throws -> (id: IdentifierType, instance: InstanceType) {
        if let matchingDecoder = decoders[version] {
            return matchingDecoder
        }
        throw DatastoreError.missingDecoder(version: String(describing: version))
    }
}
// MARK: - Warmup
extension Datastore {
/// Migrates and warms the data store ahead of time.
///
/// It is recommended you call this method before accessing any data, as it will offer you an opportunity to show a loading screen during potentially long migrations, rather than leaving it for the first read or write on the data store.
///
/// - Parameter progressHandler: A closure that will be regularly called with progress during the migration. If no migration needs to occur, it won't be called, so setup and tear down any UI within the handler.
/// - Throws: Any error thrown by the underlying warm-up.
public func warm(progressHandler: ProgressHandler? = nil) async throws {
/// The resulting progress is intentionally discarded; callers observe progress through the handler.
try await warmupIfNeeded(progressHandler: progressHandler)
}
/// Registers and migrates the datastore if that has not happened yet, sharing a single warm-up task between callers.
///
/// - Parameter progressHandler: An optional closure to notify of progress. It is retained only while the warm-up is waiting or in progress.
/// - Returns: The progress recorded for a completed warm-up, or the value produced by the warm-up task.
/// - Throws: Any error thrown by the warm-up task.
@discardableResult
func warmupIfNeeded(
    @_inheritActorContext progressHandler: ProgressHandler? = nil
) async throws -> Progress {
    /// A completed warm-up has nothing more to report, so the handler is not retained.
    if case .complete(let finishedProgress) = warmupStatus {
        return finishedProgress
    }

    /// Both remaining states register the handler before awaiting the task.
    if let progressHandler {
        warmupProgressHandlers.append(progressHandler)
    }

    /// Join the warm-up that is already running rather than starting a second one.
    if case .inProgress(let runningTask) = warmupStatus {
        return try await runningTask.value
    }

    /// Still waiting: start the warm-up and record the task before suspending on it.
    let migrationTask = Task {
        try await persistence._withTransaction(
            actionName: "Migrate \(key) Instances",
            options: []
        ) { transaction, _ in
            try await self.registerAndMigrate(with: transaction)
        }
    }
    warmupStatus = .inProgress(migrationTask)
    return try await migrationTask.value
}
/// Registers this datastore with the transaction and rebuilds any indexes that no longer match the declared format.
///
/// Read-only datastores, and datastores with no persisted descriptor or no entries, return early with an empty progress.
///
/// - Parameter transaction: The transaction to register with and to perform all index work in.
/// - Returns: A completed progress whose total is the persisted entry count, or `0` when the method returns early.
/// - Throws: `DatastoreError.incompatibleVersion` when the persisted version's raw value is greater than the current one, along with any error thrown by the transaction, the encoder, or while loading instances.
/// - Note: The early returns happen before `warmupStatus` and `warmupProgressHandlers` are updated at the end of this method.
func registerAndMigrate(with transaction: DatastoreInterfaceProtocol) async throws -> Progress {
let persistedDescriptor = try await transaction.register(datastore: self)
/// Only operate on read-write datastores beyond this point.
guard let self = self as? Datastore<Format, ReadWrite>
else { return .complete(total: 0) }
/// Make sure we have a descriptor, and that there is at least one entry, otherwise stop here.
guard let persistedDescriptor, persistedDescriptor.size > 0
else { return .complete(total: 0) }
/// Check the version to see if the current one is greater or equal to the one in the existing descriptor. If we can't decode it, stop here and throw an error — the data store is unsupported.
let persistedVersion = try Version(persistedDescriptor.version)
guard persistedVersion.rawValue <= version.rawValue
else { throw DatastoreError.incompatibleVersion(version: String(describing: persistedVersion)) }
/// Notify progress handlers we are evaluating for possible migrations.
for handler in warmupProgressHandlers {
await handler(.evaluating)
}
/// Grab an up-to-date descriptor and check the indexes against it
let updatedDescriptor = try generateUpdatedDescriptor()
var rebuildPrimaryIndex = false
var directIndexesToBuild: Set<IndexName> = []
var secondaryIndexesToBuild: Set<IndexName> = []
/// Running count of entries processed so far, reported to the progress handlers.
var index = 0
/// Check the primary index for compatibility.
if persistedDescriptor.identifierType != updatedDescriptor.identifierType {
try await transaction.resetPrimaryIndex(datastoreKey: key)
rebuildPrimaryIndex = true
}
/// Check existing direct indexes for compatibility
for (_, persistedIndex) in persistedDescriptor.directIndexes {
if let updatedIndex = updatedDescriptor.directIndexes[persistedIndex.name] {
/// If the index still exists, make sure it is compatible by checking their types, or checking if the primary index must be re-built.
if persistedIndex.type != updatedIndex.type || rebuildPrimaryIndex {
/// They were not compatible, so delete the bad index, and queue it to be re-built.
try await transaction.deleteDirectIndex(indexName: persistedIndex.name, datastoreKey: key)
directIndexesToBuild.insert(persistedIndex.name)
}
} else {
/// The index is no longer needed, delete it.
try await transaction.deleteDirectIndex(indexName: persistedIndex.name, datastoreKey: key)
}
}
/// Check for new direct indexes to build
for (_, updatedIndex) in updatedDescriptor.directIndexes {
guard persistedDescriptor.directIndexes[updatedIndex.name] == nil else { continue }
/// The index does not yet exist, so queue it to be built.
directIndexesToBuild.insert(updatedIndex.name)
}
/// Check existing reference indexes for compatibility
for (_, persistedIndex) in persistedDescriptor.referenceIndexes {
if let updatedIndex = updatedDescriptor.referenceIndexes[persistedIndex.name] {
/// If the index still exists, make sure it is compatible
if persistedIndex.type != updatedIndex.type {
/// They were not compatible, so delete the bad index, and queue it to be re-built.
try await transaction.deleteSecondaryIndex(indexName: persistedIndex.name, datastoreKey: key)
secondaryIndexesToBuild.insert(persistedIndex.name)
}
} else {
/// The index is no longer needed, delete it.
try await transaction.deleteSecondaryIndex(indexName: persistedIndex.name, datastoreKey: key)
}
}
/// Check for new reference indexes to build
for (_, updatedIndex) in updatedDescriptor.referenceIndexes {
guard persistedDescriptor.referenceIndexes[updatedIndex.name] == nil else { continue }
/// The index does not yet exist, so queue it to be built.
secondaryIndexesToBuild.insert(updatedIndex.name)
}
/// Remove any direct indexes from the secondary ones we may have requested.
secondaryIndexesToBuild.subtract(directIndexesToBuild)
/// Only perform work if we need to rebuild anything.
if rebuildPrimaryIndex || !directIndexesToBuild.isEmpty || !secondaryIndexesToBuild.isEmpty {
/// Create any missing indexes and prime the datastore for writing.
try await transaction.apply(descriptor: updatedDescriptor, for: key)
/// Load every entry without awaiting warm-up, since this method is itself part of the warm-up.
let primaryIndex = _load(range: IndexRange(), order: .ascending, awaitWarmup: false)
/// Every entry written below is stamped with the current version.
let versionData = try Data(self.version)
for try await (identifier, instance) in primaryIndex {
defer { index += 1 }
/// Notify progress handlers we are starting an entry.
for handler in warmupProgressHandlers {
await handler(.working(current: index, total: persistedDescriptor.size))
}
/// Re-encode the instance once and reuse the data for every index written for this entry.
let instanceData = try await encoder(instance)
if rebuildPrimaryIndex {
let insertionCursor = try await transaction.primaryIndexCursor(inserting: identifier, datastoreKey: key)
try await transaction.persistPrimaryIndexEntry(
versionData: versionData,
identifierValue: identifier,
instanceData: instanceData,
cursor: insertionCursor,
datastoreKey: key
)
}
/// Tracks the index names already handled for this entry so each one is written at most once.
var queriedIndexes: Set<IndexName> = []
for (_, generatedRepresentation) in indexRepresentations {
let indexName = generatedRepresentation.indexName
switch generatedRepresentation.storage {
case .direct:
guard
directIndexesToBuild.contains(indexName),
!queriedIndexes.contains(indexName)
else { continue }
queriedIndexes.insert(indexName)
for updatedValue in instance[index: generatedRepresentation.index] {
/// Grab a cursor to insert the new value in the index.
let updatedValueCursor = try await transaction.directIndexCursor(
inserting: updatedValue.indexed,
identifier: identifier,
indexName: indexName,
datastoreKey: key
)
/// Insert it.
try await transaction.persistDirectIndexEntry(
versionData: versionData,
indexValue: updatedValue.indexed,
identifierValue: identifier,
instanceData: instanceData,
cursor: updatedValueCursor,
indexName: indexName,
datastoreKey: key
)
}
case .reference:
guard
secondaryIndexesToBuild.contains(indexName),
!queriedIndexes.contains(indexName)
else { continue }
queriedIndexes.insert(indexName)
for updatedValue in instance[index: generatedRepresentation.index] {
/// Grab a cursor to insert the new value in the index.
let updatedValueCursor = try await transaction.secondaryIndexCursor(
inserting: updatedValue.indexed,
identifier: identifier,
indexName: indexName,
datastoreKey: self.key
)
/// Insert it.
try await transaction.persistSecondaryIndexEntry(
indexValue: updatedValue.indexed,
identifierValue: identifier,
cursor: updatedValueCursor,
indexName: indexName,
datastoreKey: self.key
)
}
}
}
}
}
/// Report completion to every registered handler, then release them and record the final state.
let completeProgress = Progress.complete(total: persistedDescriptor.size)
for handler in warmupProgressHandlers {
await handler(completeProgress)
}
warmupProgressHandlers.removeAll()
warmupStatus = .complete(completeProgress)
return completeProgress
}
}
// MARK: - Migrations
extension Datastore where AccessMode == ReadWrite {
/// Force a full migration of an index if the version persisted is less than the specified minimum version.
///
/// Only use this if you must force an index to be re-calculated, which is sometimes necessary when the implementation of the compare method changes between releases.
///
/// - Parameters:
///   - index: The index to migrate.
///   - minimumVersion: The minimum valid version for an index to not be migrated.
///   - progressHandler: A closure that will be regularly called with progress during the migration. If no migration needs to occur, it won't be called, so setup and tear down any UI within the handler.
/// - Throws: Any error thrown by the transaction or by the warm-up.
/// - Note: The final step forwards to the internal `migrate(index:progressHandler:)`, which is currently an unimplemented TODO.
public func migrate<Index: IndexRepresentation<InstanceType>>(
index: KeyPath<Format, Index>,
ifLessThan minimumVersion: Version,
progressHandler: ProgressHandler? = nil
) async throws {
let indexRepresentation = AnyIndexRepresentation(indexRepresentation: self.format[keyPath: index])
try await persistence._withTransaction(
actionName: "Migrate \(key) Instances",
options: []
) { transaction, _ in
guard
/// If we have no descriptor, then no data exists to be migrated.
let descriptor = try await transaction.datastoreDescriptor(for: self.key),
descriptor.size > 0,
/// If we didn't declare the index, we can't do anything. This is likely an error only encountered to self-implementers of ``DatastoreFormat``'s ``DatastoreFormat/generateIndexRepresentations``.
let declaredIndex = self.indexRepresentations[indexRepresentation],
/// If we don't have an index stored, there is nothing to do here. This means we can skip checking it on the type.
let matchingDescriptor =
descriptor.directIndexes[declaredIndex.indexName.rawValue] ?? descriptor.referenceIndexes[declaredIndex.indexName.rawValue],
/// We don't care in this method if the version is incompatible — the index will be discarded.
let version = try? Version(matchingDescriptor.version),
/// Make sure the stored version is smaller than the one we require, otherwise stop early.
version.rawValue < minimumVersion.rawValue
else { return }
/// Warm up first, offsetting reported progress by the number of entries the index migration would add.
let warmUpProgress = try await self.warmupIfNeeded { progress in
await progressHandler?(progress.adding(current: 0, total: descriptor.size))
}
/// Make sure we still need to do the work, as the warm up may have made changes anyways due to incompatible types.
guard
/// If we have no descriptor, then no data exists to be migrated.
let descriptor = try await transaction.datastoreDescriptor(for: self.key),
descriptor.size > 0,
/// If we didn't declare the index, we can't do anything. This is likely an error only encountered to self-implementers of ``DatastoreFormat``'s ``DatastoreFormat/generateIndexRepresentations``.
let declaredIndex = self.indexRepresentations[indexRepresentation],
/// If we don't have an index stored, there is nothing to do here. This means we can skip checking it on the type.
let matchingDescriptor =
descriptor.directIndexes[declaredIndex.indexName.rawValue] ?? descriptor.referenceIndexes[declaredIndex.indexName.rawValue],
/// We don't care in this method if the version is incompatible — the index will be discarded.
let version = try? Version(matchingDescriptor.version),
/// Make sure the stored version is smaller than the one we require, otherwise stop early.
version.rawValue < minimumVersion.rawValue
else {
/// Nothing left to migrate: report the index portion as fully complete. `descriptor` here is the one read before the warm-up.
await progressHandler?(warmUpProgress.adding(current: descriptor.size, total: descriptor.size))
return
}
try await self.migrate(index: index) { migrateProgress in
await progressHandler?(warmUpProgress.adding(migrateProgress))
}
}
}
/// **Internal:** Migrates a single index unconditionally.
///
/// Currently unimplemented: the body contains only a TODO, so calling this does nothing.
///
/// - Parameters:
///   - index: The index to migrate.
///   - progressHandler: A closure intended to receive migration progress. It is not called by the current implementation.
func migrate<Index: IndexRepresentation<InstanceType>>(index: KeyPath<Format, Index>, progressHandler: ProgressHandler? = nil) async throws {
// TODO: Migrate just that index, use indexMigrationStatus and indexMigrationProgressHandlers to record progress.
}
/// Manually migrate the entire store if the primary index version persisted is less than a given minimum version.
///
/// Only use this if you must force the entire store to be re-calculated, which is sometimes necessary when the implementation of the `IdentifierType`'s compare method changes between releases.
///
/// - Parameters:
///   - minimumVersion: The minimum valid version for an index to not be migrated.
///   - progressHandler: A closure that will be regularly called with progress during the migration. If no migration needs to occur, it won't be called, so setup and tear down any UI within the handler.
/// - Note: Currently unimplemented: the body contains only a TODO, so calling this does nothing.
public func migrateEntireStore(ifLessThan minimumVersion: Version, progressHandler: ProgressHandler? = nil) async throws {
// TODO: Like the method above, check the description to see if a migration is needed
}
/// **Internal:** Migrates every index in the store unconditionally.
///
/// Currently unimplemented: the body contains only a TODO, so calling this does nothing.
///
/// - Parameter progressHandler: A closure intended to receive migration progress. It is not called by the current implementation.
func migrateEntireStore(progressHandler: ProgressHandler?) async throws {
// TODO: Migrate all indexes, use storeMigrationStatus and storeMigrationProgressHandlers to record progress.
}
}
// MARK: - Loading
extension Datastore {
/// The number of instances currently stored in the datastore.
///
/// Reads the size recorded in the persisted descriptor, reporting `0` when no descriptor exists yet.
///
/// - Note: This count may not reflect an up to date value while instances are being written concurrently, but will be accurate after such a transaction finishes.
public var count: Int {
    get async throws {
        try await warmupIfNeeded()

        return try await persistence._withTransaction(
            actionName: "Check \(key) Count",
            options: [.idempotent, .readOnly]
        ) { transaction, _ in
            /// No descriptor means nothing has been persisted for this key.
            guard let persistedDescriptor = try await transaction.datastoreDescriptor(for: self.key)
            else { return 0 }

            return persistedDescriptor.size
        }
    }
}
/// Load the instance stored under a given identifier, if there is one.
///
/// - Parameter identifier: The identifier of the instance to load.
/// - Returns: The instance keyed to the identifier, or `nil` if none are found.
/// - Throws: Any error from warm-up, the transaction, version decoding, or the decoder, other than the two "not found" errors which are reported as `nil`.
public func load(id identifier: IdentifierType) async throws -> InstanceType? {
    try await warmupIfNeeded()

    return try await persistence._withTransaction(
        actionName: "Load \(key) ID",
        options: [.idempotent, .readOnly]
    ) { transaction, _ in
        do {
            let entry = try await transaction.primaryIndexCursor(for: identifier, datastoreKey: self.key)
            let storedVersion = try Version(entry.versionData)
            let decode = try self.decoder(for: storedVersion)
            return try await decode(entry.instanceData).instance
        } catch DatastoreInterfaceError.instanceNotFound, DatastoreInterfaceError.datastoreKeyNotFound {
            /// Either the entry is missing, or there isn't a datastore yet, so no entries would exist either.
            return nil
        }
    }
}
/// **[Elided Form]** Load an instance with a given identifier, or return `nil` if one is not found.
///
/// - SeeAlso: This form elides the first argument label. You may however prefer to use ``load(id:)`` instead for better completion support, type inference, and indentation.
/// - Parameter identifier: The identifier of the instance to load.
/// - Returns: The instance keyed to the identifier, or `nil` if none are found.
/// - Throws: Any error thrown by ``load(id:)``.
@inlinable
public func load(_ identifier: IdentifierType) async throws -> InstanceType? {
try await load(id: identifier)
}
/// **Internal:** Load a range of instances from a datastore based on the identifier range passed in as an async sequence.
///
/// Each entry is decoded with the decoder registered for the version stored alongside it. Errors thrown while warming up, scanning, or decoding are thrown from within the stream's closure.
///
/// - Parameters:
///   - identifierRange: The range to load.
///   - order: The order to process instances in.
///   - awaitWarmup: Whether the sequence should await warmup or jump right into loading.
/// - Returns: An asynchronous sequence of identifier and instance pairs for the entries matching the range.
nonisolated func _load(
range identifierRange: some IndexRangeExpression<IdentifierType>,
order: RangeOrder,
awaitWarmup: Bool
) -> some TypedAsyncSequence<(id: IdentifierType, instance: InstanceType)> {
AsyncThrowingBackpressureStream { provider in
/// Callers that are themselves part of the warm-up pass `false` here.
if awaitWarmup {
try await self.warmupIfNeeded()
}
try await self.persistence._withTransaction(
actionName: "Load \(self.key) ID Range",
options: [.readOnly]
) { transaction, _ in
do {
try await transaction.primaryIndexScan(
range: identifierRange.applying(order),
datastoreKey: self.key
) { versionData, instanceData in
/// Pick the decoder that matches the version this entry was persisted with.
let entryVersion = try Version(versionData)
let decoder = try await self.decoder(for: entryVersion)
let decodedValue = try await decoder(instanceData)
try await provider.yield(decodedValue)
}
} catch DatastoreInterfaceError.datastoreKeyNotFound {
/// There isn't a datastore yet, so no entries would exist either. Do nothing and let the stream end.
}
}
}
}
/// Load a range of instances from a datastore based on the identifier range passed in as an async sequence.
///
/// Requests that warm-up be awaited (`awaitWarmup: true`) and drops the identifier from each loaded pair, leaving only the instance.
///
/// - Important: The sequence should be consumed at most a single time, ideally within the same transaction it was created in as it holds a reference to that transaction and thus snapshot of the datastore for data consistency.
/// - Parameters:
///   - identifierRange: The range to load.
///   - order: The order to process instances in.
/// - Returns: An asynchronous sequence containing the instances matching the range of identifiers.
public nonisolated func load(
range identifierRange: some IndexRangeExpression<IdentifierType>,
order: RangeOrder = .ascending
) -> some TypedAsyncSequence<InstanceType> where IdentifierType: RangedIndexable {
_load(range: identifierRange, order: order, awaitWarmup: true)
.map { $0.instance }
}
/// **[Elided Form]** Load a range of instances from a datastore based on the identifier range passed in as an async sequence.
///
/// - Important: The sequence should be consumed at most a single time, ideally within the same transaction it was created in as it holds a reference to that transaction and thus snapshot of the datastore for data consistency.
/// - SeeAlso: This form elides the first argument label. You may however prefer to use ``load(range:order:)-(IndexRangeExpression<IdentifierType>,_)`` instead for better completion support, type inference, and indentation.
/// - Parameters:
///   - identifierRange: The range to load.
///   - order: The order to process instances in.
/// - Returns: An asynchronous sequence containing the instances matching the range of identifiers.
@inlinable
public nonisolated func load(
_ identifierRange: some IndexRangeExpression<IdentifierType>,
order: RangeOrder = .ascending
) -> some TypedAsyncSequence<InstanceType> where IdentifierType: RangedIndexable {
load(range: identifierRange, order: order)
}
/// Load a range of instances from a datastore based on the identifier range passed in as an async sequence.
///
/// This overload accepts a concrete ``IndexRange`` and is marked `@_disfavoredOverload`; it forwards to a `load(range:order:)` call with the same arguments.
///
/// - Important: The sequence should be consumed at most a single time, ideally within the same transaction it was created in as it holds a reference to that transaction and thus snapshot of the datastore for data consistency.
/// - Parameters:
///   - identifierRange: The range to load.
///   - order: The order to process instances in.
/// - Returns: An asynchronous sequence containing the instances matching the range of identifiers.
@_disfavoredOverload
@inlinable
public nonisolated func load(
range identifierRange: IndexRange<IdentifierType>,
order: RangeOrder = .ascending
) -> some TypedAsyncSequence<InstanceType> where IdentifierType: RangedIndexable {
load(range: identifierRange, order: order)
}
/// **[Elided Form]** Load a range of instances from a datastore based on the identifier range passed in as an async sequence.
///
/// - Important: The sequence should be consumed at most a single time, ideally within the same transaction it was created in as it holds a reference to that transaction and thus snapshot of the datastore for data consistency.
/// - SeeAlso: This form elides the first argument label. You may however prefer to use ``load(range:order:)-(IndexRange<IdentifierType>,_)`` instead for better completion support, type inference, and indentation.
/// - Parameters:
///   - identifierRange: The range to load.
///   - order: The order to process instances in.
/// - Returns: An asynchronous sequence containing the instances matching the range of identifiers.
@_disfavoredOverload
@inlinable
public nonisolated func load(
_ identifierRange: IndexRange<IdentifierType>,
order: RangeOrder = .ascending
) -> some TypedAsyncSequence<InstanceType> where IdentifierType: RangedIndexable {
load(range: identifierRange, order: order)
}
/// Load all instances in a datastore as an async sequence.
///
/// The `unboundedRange` argument only selects this overload; the body always loads `IndexRange.unbounded`, requests that warm-up be awaited, and drops the identifier from each loaded pair.
///
/// - Important: The sequence should be consumed at most a single time, ideally within the same transaction it was created in as it holds a reference to that transaction and thus snapshot of the datastore for data consistency.
/// - Parameters:
///   - unboundedRange: The range to load. Specify `...` to load every instance.
///   - order: The order to process instances in.
/// - Returns: An asynchronous sequence containing all the instances.
public nonisolated func load(
range unboundedRange: Swift.UnboundedRange,
order: RangeOrder = .ascending
) -> some TypedAsyncSequence<InstanceType> {
_load(range: IndexRange.unbounded, order: order, awaitWarmup: true)
.map { $0.instance }
}
/// **[Elided Form]** Load all instances in a datastore as an async sequence.
///
/// - Important: The sequence should be consumed at most a single time, ideally within the same transaction it was created in as it holds a reference to that transaction and thus snapshot of the datastore for data consistency.
/// - SeeAlso: This form elides the first argument label. You may however prefer to use ``load(range:order:)-(UnboundedRange,_)`` instead for better completion support, type inference, and indentation.
/// - Parameters:
///   - unboundedRange: The range to load. Specify `...` to load every instance.
///   - order: The order to process instances in.
/// - Returns: An asynchronous sequence containing all the instances.
@inlinable
public nonisolated func load(
_ unboundedRange: Swift.UnboundedRange,
order: RangeOrder = .ascending
) -> some TypedAsyncSequence<InstanceType> {
load(range: ..., order: order)
}
/// **Internal:** Load a range of instances from a given index as an async sequence.
///
/// The index is looked up in `indexRepresentations` when this method is called. If no matching index was found, `DatastoreError.missingIndex` is thrown from inside the closure that feeds the returned stream. A `DatastoreInterfaceError.datastoreKeyNotFound` thrown by the scan is swallowed so the stream simply ends.
/// - Parameters:
///   - index: The index to load from.
///   - range: The range to load.
///   - order: The order to process instances in.
/// - Returns: An asynchronous sequence containing the instances matching the range of values in that sequence.
@usableFromInline
nonisolated func _load<
    Index: IndexRepresentation<InstanceType>,
    Bound: Indexable
>(
    index: KeyPath<Format, Index>,
    range: some IndexRangeExpression<Bound>,
    order: RangeOrder = .ascending
) -> some TypedAsyncSequence<InstanceType> {
    let resolvedIndex = self.indexRepresentations[
        AnyIndexRepresentation(indexRepresentation: self.format[keyPath: index])
    ]

    return AsyncThrowingBackpressureStream { provider in
        guard let resolvedIndex else { throw DatastoreError.missingIndex }

        try await self.warmupIfNeeded()
        try await self.persistence._withTransaction(
            actionName: "Load \(self.key) \(resolvedIndex.indexName)",
            options: [.readOnly]
        ) { transaction, _ in
            do {
                switch resolvedIndex.storage {
                case .direct:
                    // The scan hands over version and instance data directly.
                    try await transaction.directIndexScan(
                        range: range.applying(order),
                        indexName: resolvedIndex.indexName,
                        datastoreKey: self.key
                    ) { rawVersion, rawInstance in
                        let decode = try await self.decoder(for: Version(rawVersion))
                        try await provider.yield(decode(rawInstance).instance)
                    }
                case .reference:
                    // The scan yields identifiers; each entry is then fetched via the primary index.
                    try await transaction.secondaryIndexScan(
                        range: range.applying(order),
                        indexName: resolvedIndex.indexName,
                        datastoreKey: self.key
                    ) { (id: IdentifierType) in
                        let storedEntry = try await transaction.primaryIndexCursor(for: id, datastoreKey: self.key)
                        let decode = try await self.decoder(for: Version(storedEntry.versionData))
                        try await provider.yield(decode(storedEntry.instanceData).instance)
                    }
                }
            } catch DatastoreInterfaceError.datastoreKeyNotFound {
                // There isn't a datastore yet, so no entries would exist either. Do nothing and let the stream end.
            }
        }
    }
}
/// Load all instances with the matching indexed value as an async sequence.
///
/// This is conceptually similar to loading all instances and filtering only those whose indexed key path matches the specified value, but is much more efficient as an index is already maintained for that value.
///
/// - Important: Consume the sequence at most once, ideally within the same transaction it was created in, as it holds a reference to that transaction and thus a snapshot of the datastore for data consistency.
/// - Parameters:
///   - index: The index to load from.
///   - value: The value to match against.
///   - order: The order to process instances in.
/// - Returns: An asynchronous sequence containing the instances matching the specified indexed value.
public nonisolated func load<
    Value: DiscreteIndexable,
    Index: RetrievableIndexRepresentation<InstanceType, Value>
>(
    index: KeyPath<Format, Index>,
    value: Value,
    order: RangeOrder = .ascending
) -> some TypedAsyncSequence<InstanceType> {
    // A single value is expressed as a range containing only that value.
    let exactMatch = IndexRange(only: value)
    return _load(index: index, range: exactMatch, order: order)
}
/// **[Elided Form]** Load all instances with the matching indexed value as an async sequence.
///
/// This is conceptually similar to loading all instances and filtering only those whose indexed key path matches the specified value, but is much more efficient as an index is already maintained for that value.
///
/// - Important: The sequence should be consumed at most a single time, ideally within the same transaction it was created in as it holds a reference to that transaction and thus snapshot of the datastore for data consistency.
/// - SeeAlso: This form elides the first argument label. You may however prefer to use ``load(index:value:order:)`` instead for better completion support, type inference, and indentation.
/// - Parameters:
///   - value: The value to match against.
///   - order: The order to process instances in.
///   - index: The index to load from.
/// - Returns: An asynchronous sequence containing the instances matching the specified indexed value.
@inlinable
public nonisolated func load<
    Value: DiscreteIndexable,
    Index: RetrievableIndexRepresentation<InstanceType, Value>
>(
    _ value: Value,
    order: RangeOrder = .ascending,
    from index: KeyPath<Format, Index>
) -> some TypedAsyncSequence<InstanceType> {
    // Pure forwarding shim; marked `@inlinable` like the other elided overloads since it only calls public API.
    load(index: index, value: value, order: order)
}
/// Load an instance with the matching indexed value, or return `nil` if one is not found.
///
/// This requires either a ``DatastoreFormat/OneToOneIndex`` or ``DatastoreFormat/ManyToOneIndex`` to be declared as the index, and a guarantee on the caller's part that at most a single instance will match the specified value. If multiple instances match, the one with the identifier that sorts first will be returned.
/// - Parameters:
///   - index: The index to load from.
///   - value: The value to match against.
/// - Returns: The instance keyed to the specified indexed value, or `nil` if none are found.
public nonisolated func load<
    Value: DiscreteIndexable,
    Index: SingleInstanceIndexRepresentation<InstanceType, Value>
>(
    index: KeyPath<Format, Index>,
    value: Value
) async throws -> InstanceType? {
    let matches = _load(index: index, range: IndexRange(only: value))
    // Accept whichever element the scan produces first.
    return try await matches.first { _ in true }
}
/// **[Elided Form]** Load an instance with the matching indexed value, or return `nil` if one is not found.
///
/// This requires either a ``DatastoreFormat/OneToOneIndex`` or ``DatastoreFormat/ManyToOneIndex`` to be declared as the index, and a guarantee on the caller's part that at most a single instance will match the specified value. If multiple instances match, the one with the identifier that sorts first will be returned.
/// - SeeAlso: This form elides the first argument label. You may however prefer to use ``load(index:value:)`` instead for better completion support, type inference, and indentation.
/// - Parameters:
///   - value: The value to match against.
///   - index: The index to load from.
/// - Returns: The instance keyed to the specified indexed value, or `nil` if none are found.
@inlinable
public nonisolated func load<
    Value: DiscreteIndexable,
    Index: SingleInstanceIndexRepresentation<InstanceType, Value>
>(
    _ value: Value,
    from index: KeyPath<Format, Index>
) async throws -> InstanceType? {
    // Pure forwarding shim to the labelled single-instance overload.
    return try await load(index: index, value: value)
}
/// Load a range of instances from a given index as an async sequence.
///
/// This is conceptually similar to loading all instances and filtering only those whose indexed key path matches the specified range, but is much more efficient as an index is already maintained for that range of values.
///
/// - Important: Consume the sequence at most once, ideally within the same transaction it was created in, as it holds a reference to that transaction and thus a snapshot of the datastore for data consistency.
/// - Parameters:
///   - index: The index to load from.
///   - range: The range to load.
///   - order: The order to process instances in.
/// - Returns: An asynchronous sequence containing the instances matching the range of values in that sequence.
public nonisolated func load<
    Value: RangedIndexable,
    Index: RetrievableIndexRepresentation<InstanceType, Value>
>(
    index: KeyPath<Format, Index>,
    range: some IndexRangeExpression<Value>,
    order: RangeOrder = .ascending
) -> some TypedAsyncSequence<InstanceType> {
    // Public entry point over the internal index loader; the generic constraints restrict which indexes and values are accepted.
    return _load(index: index, range: range, order: order)
}
/// **[Elided Form]** Load a range of instances from a given index as an async sequence.
///
/// This is conceptually similar to loading all instances and filtering only those whose indexed key path matches the specified range, but is much more efficient as an index is already maintained for that range of values.
///
/// - Important: Consume the sequence at most once, ideally within the same transaction it was created in, as it holds a reference to that transaction and thus a snapshot of the datastore for data consistency.
/// - SeeAlso: This form elides the first argument label. You may however prefer to use ``load(index:range:order:)-(_,IndexRangeExpression<Value>,_)`` instead for better completion support, type inference, and indentation.
/// - Parameters:
///   - range: The range to load.
///   - order: The order to process instances in.
///   - index: The index to load from.
/// - Returns: An asynchronous sequence containing the instances matching the range of values in that sequence.
@inlinable
public nonisolated func load<
    Value: RangedIndexable,
    Index: RetrievableIndexRepresentation<InstanceType, Value>
>(
    _ range: some IndexRangeExpression<Value>,
    order: RangeOrder = .ascending,
    from index: KeyPath<Format, Index>
) -> some TypedAsyncSequence<InstanceType> {
    // Pure forwarding shim to the labelled overload.
    return load(index: index, range: range, order: order)
}
/// Load a range of instances, described by a concrete ``IndexRange``, from a given index as an async sequence.
///
/// This is conceptually similar to loading all instances and filtering only those whose indexed key path matches the specified range, but is much more efficient as an index is already maintained for that range of values.
///
/// - Important: Consume the sequence at most once, ideally within the same transaction it was created in, as it holds a reference to that transaction and thus a snapshot of the datastore for data consistency.
/// - Parameters:
///   - index: The index to load from.
///   - range: The range to load.
///   - order: The order to process instances in.
/// - Returns: An asynchronous sequence containing the instances matching the range of values in that sequence.
@_disfavoredOverload
public nonisolated func load<
    Value: RangedIndexable,
    Index: RetrievableIndexRepresentation<InstanceType, Value>
>(
    index: KeyPath<Format, Index>,
    range: IndexRange<Value>,
    order: RangeOrder = .ascending
) -> some TypedAsyncSequence<InstanceType> {
    // Same behavior as the range-expression overload, taking a concrete `IndexRange` instead.
    return _load(index: index, range: range, order: order)
}
/// **[Elided Form]** Load a range of instances, described by a concrete ``IndexRange``, from a given index as an async sequence.
///
/// This is conceptually similar to loading all instances and filtering only those whose indexed key path matches the specified range, but is much more efficient as an index is already maintained for that range of values.
///
/// - Important: Consume the sequence at most once, ideally within the same transaction it was created in, as it holds a reference to that transaction and thus a snapshot of the datastore for data consistency.
/// - SeeAlso: This form elides the first argument label. You may however prefer to use ``load(index:range:order:)-(_,IndexRange<Value>,_)`` instead for better completion support, type inference, and indentation.
/// - Parameters:
///   - range: The range to load.
///   - order: The order to process instances in.
///   - index: The index to load from.
/// - Returns: An asynchronous sequence containing the instances matching the range of values in that sequence.
@_disfavoredOverload
@inlinable
public nonisolated func load<
    Value: RangedIndexable,
    Index: RetrievableIndexRepresentation<InstanceType, Value>
>(
    _ range: IndexRange<Value>,
    order: RangeOrder = .ascending,
    from index: KeyPath<Format, Index>
) -> some TypedAsyncSequence<InstanceType> {
    // Pure forwarding shim to the labelled overload.
    return load(index: index, range: range, order: order)
}
/// Load every instance in a datastore, in index order, as an async sequence.
///
/// - Important: Consume the sequence at most once, ideally within the same transaction it was created in, as it holds a reference to that transaction and thus a snapshot of the datastore for data consistency.
///
/// - Note: If the index is a Many-to-Any type of index, a smaller or larger number of results may be returned here, as some instances may not be represented in the index, while others are over-represented and may show up multiple times.
/// - Parameters:
///   - index: The index to load from.
///   - unboundedRange: The range to load. Specify `...` to load every instance.
///   - order: The order to process instances in.
/// - Returns: An asynchronous sequence containing all the instances, ordered by the specified index.
public nonisolated func load<Index: IndexRepresentation<InstanceType>>(
    index: KeyPath<Format, Index>,
    range unboundedRange: Swift.UnboundedRange,
    order: RangeOrder = .ascending
) -> some TypedAsyncSequence<InstanceType> {
    // `unboundedRange` is not read; the scan always uses `IndexRange.unbounded`.
    return _load(index: index, range: IndexRange.unbounded, order: order)
}
/// **[Elided Form]** Load every instance in a datastore, in index order, as an async sequence.
///
/// - Important: Consume the sequence at most once, ideally within the same transaction it was created in, as it holds a reference to that transaction and thus a snapshot of the datastore for data consistency.
/// - Note: If the index is a Many-to-Any type of index, a smaller or larger number of results may be returned here, as some instances may not be represented in the index, while others are over-represented and may show up multiple times.
/// - SeeAlso: This form elides the first argument label. You may however prefer to use ``load(index:range:order:)-(_,UnboundedRange,_)`` instead for better completion support, type inference, and indentation.
/// - Parameters:
///   - unboundedRange: The range to load. Specify `...` to load every instance.
///   - order: The order to process instances in.
///   - index: The index to load from.
/// - Returns: An asynchronous sequence containing all the instances, ordered by the specified index.
@inlinable
public nonisolated func load<Index: IndexRepresentation<InstanceType>>(
    _ unboundedRange: Swift.UnboundedRange,
    order: RangeOrder = .ascending,
    from index: KeyPath<Format, Index>
) -> some TypedAsyncSequence<InstanceType> {
    // Pure forwarding shim to the labelled overload.
    return load(index: index, range: ..., order: order)
}
}
// MARK: - Observation
extension Datastore {
    /// Observe changes made to the instance with the given identifier.
    ///
    /// This filters the sequence returned by ``observe()`` down to events whose `id` equals `identifier`.
    ///
    /// - Parameter identifier: The identifier of the instance to observe.
    /// - Returns: An unbounded asynchronous sequence reporting changes to the observed instance.
    public func observe(
        id identifier: IdentifierType
    ) async throws -> some TypedAsyncSequence<ObservedEvent<IdentifierType, InstanceType>> {
        let allEvents = try await observe()
        return allEvents.filter { event in event.id == identifier }
    }

    /// **[Elided Form]** Observe changes made to the instance with the given identifier.
    ///
    /// - SeeAlso: This form elides the first argument label. You may however prefer to use ``observe(id:)`` instead for better completion support, type inference, and indentation.
    /// - Parameter identifier: The identifier of the instance to observe.
    /// - Returns: An unbounded asynchronous sequence reporting changes to the observed instance.
    @inlinable
    public func observe(
        _ identifier: IdentifierType
    ) async throws -> some TypedAsyncSequence<ObservedEvent<IdentifierType, InstanceType>> {
        // Pure forwarding shim to the labelled overload.
        return try await observe(id: identifier)
    }

    /// Observe all changes made to a datastore.
    ///
    /// The observer is created inside a read-only, idempotent transaction with an unbounded buffering policy. Each event's entries are then decoded into instances; an event whose entries fail to decode is dropped from the sequence rather than surfaced as an error.
    ///
    /// - Returns: An unbounded asynchronous sequence reporting changes to the datastore.
    public func observe() async throws -> some TypedAsyncSequence<ObservedEvent<IdentifierType, InstanceType>> {
        try await warmupIfNeeded()

        let rawEvents = try await persistence._withTransaction(
            actionName: "Start \(key) Observations",
            options: [.idempotent, .readOnly]
        ) { transaction, _ in
            try await transaction.makeObserver(
                identifierType: IdentifierType.self,
                datastoreKey: self.key,
                bufferingPolicy: .unbounded
            )
        }

        return rawEvents.compactMap { event in
            // `try?` turns a failed mapping into `nil`, which `compactMap` discards.
            try? await event.mapEntries { entry in
                let decode = try await self.decoder(for: Version(entry.versionData))
                return try await decode(entry.instanceData).instance
            }
        }
    }
}
// MARK: - Writing
extension Datastore where AccessMode == ReadWrite {
/// Persist an instance for a given identifier.
///
/// If an instance does not already exist for the specified identifier, it will be created. If an instance already exists, it will be updated.
/// - Note: If you instance conforms to Identifiable, it it preferable to use ``persist(_:)`` instead.
/// - Parameters:
/// - instance: The instance to persist.
/// - identifier: The unique identifier to use to reference the item being persisted.
@discardableResult
public func persist(
_ instance: InstanceType,
to identifier: IdentifierType
) async throws -> InstanceType? {
try await warmupIfNeeded()
let updatedDescriptor = try self.generateUpdatedDescriptor()
let versionData = try Data(self.version)
let instanceData = try await self.encoder(instance)
return try await persistence._withTransaction(
actionName: "Persist \(key) Instance",
options: [.idempotent]
) { transaction, _ in
/// Create any missing indexes or prime the datastore for writing.
try await transaction.apply(descriptor: updatedDescriptor, for: self.key)
let existingEntry: (cursor: any InstanceCursorProtocol, instance: InstanceType, versionData: Data, instanceData: Data)? = try await {
do {
let existingEntry = try await transaction.primaryIndexCursor(for: identifier, datastoreKey: self.key)
let existingVersion = try Version(existingEntry.versionData)
let decoder = try self.decoder(for: existingVersion)