-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathCoreService.swift
More file actions
1886 lines (1617 loc) · 78.8 KB
/
CoreService.swift
File metadata and controls
1886 lines (1617 loc) · 78.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import BitkitCore
import Combine
import Foundation
import LDKNode
// MARK: - Activity Service
class ActivityService {
/// Parent service; the visible code uses it to reach `blocktank` and `utility`.
private let coreService: CoreService
/// Subject fired (with no payload) after activity mutations performed by this service.
private let activitiesChangedSubject = PassthroughSubject<Void, Never>()
/// Type-erased, read-only view of `activitiesChangedSubject`.
var activitiesChangedPublisher: AnyPublisher<Void, Never> {
activitiesChangedSubject.eraseToAnyPublisher()
}
/// Subject for metadata changes. NOTE(review): no `send()` on it appears in the visible part of this file.
private let metadataChangedSubject = PassthroughSubject<Void, Never>()
/// Type-erased, read-only view of `metadataChangedSubject`.
var metadataChangedPublisher: AnyPublisher<Void, Never> {
metadataChangedSubject.eraseToAnyPublisher()
}
// MARK: - Constants
/// Exclusive upper bound on the address index scanned by `findReceivingAddress` when a current wallet address is stored.
private static let maxAddressSearchIndex: UInt32 = 100_000
// MARK: - BoostTxIds Cache
/// Transaction IDs that occur in the `boostTxIds` of any onchain activity, kept in memory
/// so replaced transactions can be filtered without re-reading every activity.
private var cachedTxIdsInBoostTxIds: Set<String> = []

/// Returns the set of transaction IDs referenced by some activity's `boostTxIds`.
///
/// The cache is reloaded first whenever it is currently empty.
/// NOTE(review): an empty set is also a legitimate result, so while no activity has any
/// boost IDs every call performs a full refresh.
func getTxIdsInBoostTxIds() async -> Set<String> {
    guard cachedTxIdsInBoostTxIds.isEmpty else {
        return cachedTxIdsInBoostTxIds
    }
    await refreshBoostTxIdsCache()
    return cachedTxIdsInBoostTxIds
}
/// Adds the `boostTxIds` of an onchain activity to the in-memory cache.
/// Lightning activities are ignored.
private func updateBoostTxIdsCache(for activity: Activity) {
    guard case let .onchain(onchainActivity) = activity else {
        return
    }
    cachedTxIdsInBoostTxIds.formUnion(onchainActivity.boostTxIds)
}
/// Rebuilds the boostTxIds cache from all onchain activities.
///
/// The new set is assigned inside `MainActor.run`. If loading the activities fails the
/// error is logged and the previous cache contents are kept.
private func refreshBoostTxIdsCache() async {
    do {
        let onchainActivities = try await get(filter: .onchain)
        let collected = onchainActivities.reduce(into: Set<String>()) { ids, activity in
            guard case let .onchain(onchain) = activity else { return }
            ids.formUnion(onchain.boostTxIds)
        }
        await MainActor.run {
            self.cachedTxIdsInBoostTxIds = collected
        }
    } catch {
        Logger.error("Failed to refresh boostTxIds cache: \(error)", context: "ActivityService")
    }
}
/// Converts LDK transaction details into the BitkitCore representation.
///
/// Inputs and outputs are copied field by field; `txid` becomes the `txId` of the result.
private func mapToCoreTransactionDetails(txid: String, _ details: LDKNode.TransactionDetails) -> BitkitCore.TransactionDetails {
    BitkitCore.TransactionDetails(
        txId: txid,
        amountSats: details.amountSats,
        inputs: details.inputs.map {
            BitkitCore.TxInput(
                txid: $0.txid,
                vout: $0.vout,
                scriptsig: $0.scriptsig,
                witness: $0.witness,
                sequence: $0.sequence
            )
        },
        outputs: details.outputs.map {
            BitkitCore.TxOutput(
                scriptpubkey: $0.scriptpubkey,
                scriptpubkeyType: $0.scriptpubkeyType,
                scriptpubkeyAddress: $0.scriptpubkeyAddress,
                value: $0.value,
                n: $0.n
            )
        }
    )
}
/// Non-throwing variant of `getTransactionDetails(txid:)`.
/// - Returns: The stored details, or `nil` when none exist or the lookup failed (the failure is logged as a warning).
private func fetchTransactionDetails(txid: String) async -> BitkitCore.TransactionDetails? {
    let stored: BitkitCore.TransactionDetails?
    do {
        stored = try await getTransactionDetails(txid: txid)
    } catch {
        Logger.warn("Failed to fetch stored transaction details for \(txid): \(error)", context: "ActivityService")
        stored = nil
    }
    return stored
}
/// Loads the stored transaction details for `txid` from BitkitCore on the `.core` service queue.
/// - Returns: Whatever `BitkitCore.getTransactionDetails` returns for the id (optional).
/// - Throws: Rethrows any error raised by the queued call.
func getTransactionDetails(txid: String) async throws -> BitkitCore.TransactionDetails? {
try await ServiceQueue.background(.core) {
try BitkitCore.getTransactionDetails(txId: txid)
}
}
// MARK: - Seen Tracking
/// Reports whether the activity with the given id has a `seenAt` value.
/// - Returns: `false` when the activity is missing or the lookup throws (the error is logged).
func isActivitySeen(id: String) async -> Bool {
    let found: Activity?
    do {
        found = try await getActivityById(activityId: id)
    } catch {
        Logger.error("Failed to check seen status for activity \(id): \(error)", context: "ActivityService")
        return false
    }
    guard let found else {
        return false
    }
    switch found {
    case let .onchain(onchain):
        return onchain.seenAt != nil
    case let .lightning(lightning):
        return lightning.seenAt != nil
    }
}
/// Reports whether the onchain activity for `txid` has a `seenAt` value.
/// Lookup errors and missing activities both yield `false`.
func isOnchainActivitySeen(txid: String) async -> Bool {
    guard let activity = try? await getOnchainActivityByTxId(txid: txid) else {
        return false
    }
    return activity.seenAt != nil
}
/// Marks one activity as seen and notifies `activitiesChangedPublisher` subscribers.
/// - Parameters:
///   - id: Activity identifier.
///   - seenAt: Unix timestamp in seconds; defaults to the current time when `nil`.
/// Failures are logged and not propagated.
func markActivityAsSeen(id: String, seenAt: UInt64? = nil) async {
    let resolvedSeenAt: UInt64
    if let seenAt {
        resolvedSeenAt = seenAt
    } else {
        resolvedSeenAt = UInt64(Date().timeIntervalSince1970)
    }
    do {
        try await ServiceQueue.background(.core) {
            try BitkitCore.markActivityAsSeen(activityId: id, seenAt: resolvedSeenAt)
            self.activitiesChangedSubject.send()
        }
    } catch {
        Logger.error("Failed to mark activity \(id) as seen: \(error)", context: "ActivityService")
    }
}
/// Marks the onchain activity belonging to `txid` as seen.
/// Does nothing when no such activity exists; lookup errors are logged and swallowed.
func markOnchainActivityAsSeen(txid: String, seenAt: UInt64? = nil) async {
    do {
        if let onchainActivity = try await getOnchainActivityByTxId(txid: txid) {
            await markActivityAsSeen(id: onchainActivity.id, seenAt: seenAt)
        }
    } catch {
        Logger.error("Failed to mark onchain activity for \(txid) as seen: \(error)", context: "ActivityService")
    }
}
/// Marks every activity without a `seenAt` value as seen, using one shared timestamp.
///
/// Errors are logged and not propagated. Subscribers of `activitiesChangedPublisher` are
/// notified once if at least one activity was marked, including the case where a later
/// activity failed after earlier ones were already persisted.
func markAllUnseenActivitiesAsSeen() async {
    let timestamp = UInt64(Date().timeIntervalSince1970)
    // Tracked outside the `do` so a mid-loop failure still reports the partial change.
    var didMarkAny = false
    do {
        let activities = try await get()
        for activity in activities {
            let (id, isSeen): (String, Bool) = switch activity {
            case let .onchain(onchain): (onchain.id, onchain.seenAt != nil)
            case let .lightning(lightning): (lightning.id, lightning.seenAt != nil)
            }
            guard !isSeen else { continue }
            try await ServiceQueue.background(.core) {
                try BitkitCore.markActivityAsSeen(activityId: id, seenAt: timestamp)
            }
            didMarkAny = true
        }
    } catch {
        Logger.error("Failed to mark all activities as seen: \(error)", context: "ActivityService")
    }
    if didMarkAny {
        activitiesChangedSubject.send()
    }
}
// MARK: - Transaction Status Checks
/// Reports whether the onchain activity for `txid` exists and has `doesExist == false`.
/// Missing activities and lookup errors yield `false`.
func wasTransactionReplaced(txid: String) async -> Bool {
    guard let onchain = try? await getOnchainActivityByTxId(txid: txid) else {
        return false
    }
    return !onchain.doesExist
}
/// Decides whether the "received" sheet should be shown for an incoming transaction.
///
/// Returns `false` when the value is zero, the transaction matches a closed channel, or
/// the stored activity is a transfer, carries a channel id, was already seen, or replaces
/// a transaction of the same value. Returns `true` otherwise, including when no activity
/// is stored for `txid`.
func shouldShowReceivedSheet(txid: String, value: UInt64) async -> Bool {
    guard value != 0 else {
        return false
    }
    // Channel closure transactions (commitment tx) never show the sheet
    if await findClosedChannelForTransaction(txid: txid, transactionDetails: nil) != nil {
        Logger.info("Skipping received sheet for channel close transaction \(txid)", context: "CoreService.shouldShowReceivedSheet")
        return false
    }
    // Without a stored activity none of the remaining exclusions can apply
    guard let stored = try? await getOnchainActivityByTxId(txid: txid) else {
        return true
    }
    // Transfer transactions (channel open/close)
    if stored.isTransfer {
        Logger.info("Skipping received sheet for transfer transaction \(txid)", context: "CoreService.shouldShowReceivedSheet")
        return false
    }
    // Transactions with a channel ID are part of a channel lifecycle
    if stored.channelId != nil {
        Logger.info("Skipping received sheet for channel-related transaction \(txid)", context: "CoreService.shouldShowReceivedSheet")
        return false
    }
    if stored.seenAt != nil {
        return false
    }
    // A replacement transaction with the same value as the one it replaced is not announced again
    for replacedTxid in stored.boostTxIds {
        guard let replaced = try? await getOnchainActivityByTxId(txid: replacedTxid),
              replaced.value == value
        else {
            continue
        }
        Logger.info(
            "Skipping received sheet for replacement transaction \(txid) with same value as replaced transaction \(replacedTxid)",
            context: "CoreService.shouldShowReceivedSheet"
        )
        return false
    }
    return true
}
/// Reports whether the LDK payment list contains an onchain payment for `txid` whose direction is inbound.
/// Returns `false` when the payment list is unavailable or has no matching entry.
func isReceivedTransaction(txid: String) async -> Bool {
    guard let payments = LightningService.shared.payments else {
        return false
    }
    let match = payments.first { candidate in
        guard case let .onchain(paymentTxid, _) = candidate.kind else { return false }
        return paymentTxid == txid
    }
    guard let match else {
        return false
    }
    return match.direction == .inbound
}
/// Looks up the `doesExist` flag of every transaction listed in `boostTxIds`, to tell RBF from CPFP.
/// RBF transactions have doesExist = false (replaced); CPFP transactions have doesExist = true (child transactions).
/// - Returns: A map from txid to `doesExist`; ids with no stored activity (or a failed lookup) are omitted.
func getBoostTxDoesExist(boostTxIds: [String]) async -> [String: Bool] {
    var existenceByTxId: [String: Bool] = [:]
    for boostTxId in boostTxIds {
        guard let boostActivity = try? await getOnchainActivityByTxId(txid: boostTxId) else {
            continue
        }
        existenceByTxId[boostTxId] = boostActivity.doesExist
    }
    return existenceByTxId
}
/// Reports whether `txId` is referenced by some activity's `boostTxIds` and its own activity
/// still exists and is not itself boosted.
func isCpfpChildTransaction(txId: String) async -> Bool {
    let referencedTxIds = await getTxIdsInBoostTxIds()
    guard referencedTxIds.contains(txId) else {
        return false
    }
    guard let activity = try? await getOnchainActivityByTxId(txid: txId) else {
        return false
    }
    return activity.doesExist && !activity.isBoosted
}
/// Creates the service and stores the given `CoreService` in `coreService`.
init(coreService: CoreService) {
self.coreService = coreService
}
/// Deletes every stored activity, clears the boostTxIds cache and notifies subscribers.
/// - Throws: Rethrows the first error from listing or deleting; activities deleted before
///   the failure stay deleted, and in that case neither the cache reset nor the notification runs.
func removeAll() async throws {
    try await ServiceQueue.background(.core) {
        // Activities are fetched unfiltered and removed individually
        let allActivities = try getActivities(
            filter: .all, txType: nil, tags: nil, search: nil, minDate: nil, maxDate: nil, limit: nil, sortDirection: nil
        )
        for activity in allActivities {
            let activityId: String
            switch activity {
            case let .lightning(lightning):
                activityId = lightning.id
            case let .onchain(onchain):
                activityId = onchain.id
            }
            _ = try deleteActivityById(activityId: activityId)
        }
        // Nothing is left that could reference a boost txid
        self.cachedTxIdsInBoostTxIds.removeAll()
        self.activitiesChangedSubject.send()
    }
}
/// Inserts one activity on the `.core` queue, adds its boost txids to the cache and
/// notifies `activitiesChangedPublisher` subscribers.
/// - Throws: Rethrows the error from `insertActivity`; cache and notification are skipped in that case.
func insert(_ activity: Activity) async throws {
try await ServiceQueue.background(.core) {
try insertActivity(activity: activity)
self.updateBoostTxIdsCache(for: activity)
self.activitiesChangedSubject.send()
}
}
/// Upserts a batch of activities on the `.core` queue and then rebuilds the boostTxIds cache.
/// Unlike `insert`, this method does not itself send on `activitiesChangedSubject`.
/// - Throws: Rethrows the error from `upsertActivities`.
func upsertList(_ activities: [Activity]) async throws {
try await ServiceQueue.background(.core) {
try upsertActivities(activities: activities)
await self.refreshBoostTxIdsCache()
}
}
/// Returns all stored closed channels, fetched on the `.core` queue.
/// - Parameter sortDirection: Passed through to `getAllClosedChannels`; defaults to `.asc`.
/// - Throws: Rethrows the error from `getAllClosedChannels`.
func closedChannels(sortDirection: SortDirection = .asc) async throws -> [ClosedChannelDetails] {
try await ServiceQueue.background(.core) {
try getAllClosedChannels(sortDirection: sortDirection)
}
}
/// Upserts a batch of closed-channel records on the `.core` queue.
/// - Throws: Rethrows the error from `upsertClosedChannels`.
func upsertClosedChannelList(_ closedChannels: [ClosedChannelDetails]) async throws {
try await ServiceQueue.background(.core) {
try upsertClosedChannels(channels: closedChannels)
}
}
/// Upserts a single closed-channel record on the `.core` queue.
/// The call is module-qualified (`BitkitCore.`), presumably to avoid resolving to this method itself.
/// - Throws: Rethrows the error from `BitkitCore.upsertClosedChannel`.
func upsertClosedChannel(_ closedChannel: ClosedChannelDetails) async throws {
try await ServiceQueue.background(.core) {
try BitkitCore.upsertClosedChannel(channel: closedChannel)
}
}
// MARK: - Payment Processing
/// Creates or updates the onchain activity that corresponds to an LDK onchain payment.
///
/// Payments whose kind is not `.onchain` are ignored. Fields that only exist locally
/// (boost state, transfer/channel info, fee rate, address, `doesExist`, `seenAt`) are carried
/// over from the stored activity when there is one; the rest comes from the payment.
/// - Parameters:
///   - payment: The LDK payment to mirror.
///   - transactionDetails: Optional details forwarded to the channel and address lookups.
/// - Throws: Errors from the activity lookups and from `update`/`upsert`. A failed
///   receiving-address lookup is logged and does not abort processing.
private func processOnchainPayment(
_ payment: PaymentDetails,
transactionDetails: BitkitCore.TransactionDetails? = nil
) async throws {
guard case let .onchain(txid, _) = payment.kind else { return }
let paymentTimestamp = payment.latestUpdateTimestamp
// Look for existing activity by id first, then by txid (for migrated activities)
var existingActivity = try getActivityById(activityId: payment.id)
if existingActivity == nil {
existingActivity = try BitkitCore.getActivityByTxId(txId: txid).map { .onchain($0) }
}
// Skip if existing activity has newer timestamp to avoid overwriting local data
if let existingActivity, case let .onchain(existing) = existingActivity {
// A missing `updatedAt` counts as 0, so such an activity is always overwritten
let existingUpdatedAt = existing.updatedAt ?? 0
if existingUpdatedAt > paymentTimestamp {
return
}
}
// Determine confirmation status from payment's txStatus
var blockTimestamp: UInt64?
let isConfirmed: Bool
if case let .onchain(_, txStatus) = payment.kind,
case let .confirmed(_, _, bts) = txStatus
{
isConfirmed = true
blockTimestamp = bts
} else {
isConfirmed = false
}
// Extract existing activity data
let existingOnchain: OnchainActivity? = {
if let existingActivity, case let .onchain(existing) = existingActivity {
return existing
}
return nil
}()
// Defaults on the right-hand side apply only when no onchain activity is stored yet
let isBoosted = existingOnchain?.isBoosted ?? false
let boostTxIds = existingOnchain?.boostTxIds ?? []
var isTransfer = existingOnchain?.isTransfer ?? false
var channelId = existingOnchain?.channelId
let transferTxId = existingOnchain?.transferTxId
let feeRate = existingOnchain?.feeRate ?? 1
let preservedAddress = existingOnchain?.address ?? "Loading..."
let doesExist = existingOnchain?.doesExist ?? true
let seenAt = existingOnchain?.seenAt
let ldkValue = payment.amountSats ?? 0
// Keep the stored value when it is larger than the amount LDK reports
let value: UInt64 = if let existingValue = existingOnchain?.value, existingValue > ldkValue {
existingValue
} else {
ldkValue
}
// Check if this transaction is a channel transfer
if channelId == nil || !isTransfer {
let foundChannelId = await findChannelForTransaction(
txid: txid,
direction: payment.direction,
transactionDetails: transactionDetails
)
if let foundChannelId {
channelId = foundChannelId
isTransfer = true
}
}
// Find receiving address for inbound transactions
var address = preservedAddress
if payment.direction == .inbound {
do {
if let foundAddress = try await findReceivingAddress(
for: txid,
value: value,
transactionDetails: transactionDetails
) {
address = foundAddress
}
} catch {
Logger.error("Failed to find address for txid \(txid): \(error)", context: "CoreService.processOnchainPayment")
}
}
// Build and save the activity
// A confirmed transaction always exists, regardless of the stored flag
let finalDoesExist = isConfirmed ? true : doesExist
// Start from the stored (or payment) timestamp and move it back to the block timestamp when that is earlier
let activityTimestamp: UInt64 = {
let baseTimestamp = existingOnchain?.timestamp ?? paymentTimestamp
if let bts = blockTimestamp, bts < baseTimestamp {
return bts
}
return baseTimestamp
}()
let onchain = OnchainActivity(
id: payment.id,
txType: payment.direction == .outbound ? .sent : .received,
txId: txid,
value: value,
fee: (payment.feePaidMsat ?? 0) / 1000,
feeRate: feeRate,
address: address,
confirmed: isConfirmed,
timestamp: activityTimestamp,
isBoosted: isBoosted,
boostTxIds: boostTxIds,
isTransfer: isTransfer,
doesExist: finalDoesExist,
confirmTimestamp: blockTimestamp,
channelId: channelId,
transferTxId: transferTxId,
createdAt: UInt64(payment.creationTime.timeIntervalSince1970),
updatedAt: paymentTimestamp,
seenAt: seenAt
)
// Update under the stored activity's id (it may differ from `payment.id` when the match was by txid)
if let existingActivity, case let .onchain(existing) = existingActivity {
try await update(id: existing.id, activity: .onchain(onchain))
} else {
try await upsert(.onchain(onchain))
}
}
// MARK: - Onchain Event Handlers
/// Finds the LDK payment for `txid` and processes it with the supplied transaction details.
///
/// Logs a warning (tagged with `context`) and returns without error when the payment list
/// is unavailable or holds no onchain payment with that txid.
private func processOnchainTransaction(txid: String, details: BitkitCore.TransactionDetails, context: String) async throws {
    guard let payments = LightningService.shared.payments else {
        Logger.warn("No payments available for transaction \(txid)", context: context)
        return
    }
    let matchingPayment = payments.first { candidate in
        guard case let .onchain(paymentTxid, _) = candidate.kind else { return false }
        return paymentTxid == txid
    }
    guard let matchingPayment else {
        Logger.warn("Payment not found for transaction \(txid) - LDK should have updated payment store before emitting event", context: context)
        return
    }
    try await processOnchainPayment(matchingPayment, transactionDetails: details)
}
/// Handles a "transaction received" event: stores the transaction details in BitkitCore,
/// then processes the matching payment into an activity.
/// - Throws: Rethrows errors from storing the details or processing the payment.
func handleOnchainTransactionReceived(txid: String, details: LDKNode.TransactionDetails) async throws {
let coreDetails = mapToCoreTransactionDetails(txid: txid, details)
try await ServiceQueue.background(.core) {
try BitkitCore.upsertTransactionDetails(detailsList: [coreDetails])
try await self.processOnchainTransaction(txid: txid, details: coreDetails, context: "CoreService.handleOnchainTransactionReceived")
}
}
/// Handles a "transaction confirmed" event. Performs the same steps as
/// `handleOnchainTransactionReceived`; only the logging context differs.
/// - Throws: Rethrows errors from storing the details or processing the payment.
func handleOnchainTransactionConfirmed(txid: String, details: LDKNode.TransactionDetails) async throws {
let coreDetails = mapToCoreTransactionDetails(txid: txid, details)
try await ServiceQueue.background(.core) {
try BitkitCore.upsertTransactionDetails(detailsList: [coreDetails])
try await self.processOnchainTransaction(txid: txid, details: coreDetails, context: "CoreService.handleOnchainTransactionConfirmed")
}
}
/// Handles a "transaction replaced" event.
///
/// Marks the activity of the replaced `txid` as non-existent, then for every conflicting
/// (replacement) txid records `txid` in its `boostTxIds`, flags it as boosted and copies
/// the replaced activity's tags onto it. Subscribers are notified once at the end.
/// - Parameters:
///   - txid: The transaction that was replaced.
///   - conflicts: Txids of the transactions that replaced it.
/// - Throws: Errors from looking up or updating the replaced activity and from updating a
///   replacement. Failures while creating a missing replacement activity or copying tags
///   are logged and skipped.
func handleOnchainTransactionReplaced(txid: String, conflicts: [String]) async throws {
try await ServiceQueue.background(.core) {
// Find the activity for the replaced transaction
let replacedActivity = try await self.getOnchainActivityByTxId(txid: txid)
if var existing = replacedActivity {
Logger.info(
"Transaction \(txid) replaced by \(conflicts.count) conflict(s): \(conflicts.joined(separator: ", "))",
context: "CoreService.handleOnchainTransactionReplaced"
)
// Mark the replaced transaction as not existing
existing.doesExist = false
existing.isBoosted = false
existing.updatedAt = UInt64(Date().timeIntervalSince1970)
try await self.update(id: existing.id, activity: .onchain(existing))
Logger.info("Marked transaction \(txid) as replaced", context: "CoreService.handleOnchainTransactionReplaced")
} else {
Logger.info(
"Activity not found for replaced transaction \(txid) - will be created when transaction is processed",
context: "CoreService.handleOnchainTransactionReplaced"
)
}
// For each replacement transaction, update its boostTxIds to include the replaced txid
for conflictTxid in conflicts {
// Try to get the replacement activity, or process it if it doesn't exist
var replacementActivity = try? await self.getOnchainActivityByTxId(txid: conflictTxid)
if replacementActivity == nil,
let payments = LightningService.shared.payments,
let replacementPayment = payments.first(where: { payment in
if case let .onchain(paymentTxid, _) = payment.kind {
return paymentTxid == conflictTxid
}
return false
})
{
Logger.info(
"Processing replacement transaction \(conflictTxid) that was already in payments list",
context: "CoreService.handleOnchainTransactionReplaced"
)
do {
try await self.processOnchainPayment(replacementPayment, transactionDetails: nil)
replacementActivity = try? await self.getOnchainActivityByTxId(txid: conflictTxid)
} catch {
Logger.error(
"Failed to process replacement transaction \(conflictTxid): \(error)",
context: "CoreService.handleOnchainTransactionReplaced"
)
continue
}
}
// Update the replacement transaction's boostTxIds to include the replaced txid
// (skipped when the replacement is still unknown or already lists this txid)
if var activity = replacementActivity,
!activity.boostTxIds.contains(txid)
{
activity.boostTxIds.append(txid)
activity.isBoosted = true
activity.updatedAt = UInt64(Date().timeIntervalSince1970)
try await self.update(id: activity.id, activity: .onchain(activity))
// Copy tags from the replaced transaction (they are appended to the replacement; nothing here removes them from the original)
if let replacedActivity {
do {
let replacedTags = try await self.tags(forActivity: replacedActivity.id)
if !replacedTags.isEmpty {
try await self.appendTags(toActivity: activity.id, replacedTags)
}
} catch {
Logger.error(
"Failed to copy tags from replaced transaction \(txid) to replacement transaction \(conflictTxid): \(error)",
context: "CoreService.handleOnchainTransactionReplaced"
)
}
}
Logger.info(
"Updated replacement transaction \(conflictTxid) with boostTxId \(txid)",
context: "CoreService.handleOnchainTransactionReplaced"
)
}
}
self.activitiesChangedSubject.send()
}
}
/// Handles a "transaction reorged" event: resets the activity for `txid` to unconfirmed
/// (clears `confirmed` and `confirmTimestamp`) and bumps `updatedAt` to now.
/// Logs a warning and returns when no activity exists for the txid.
/// - Throws: Rethrows errors from the lookup or the update.
func handleOnchainTransactionReorged(txid: String) async throws {
try await ServiceQueue.background(.core) {
guard var onchain = try await self.getOnchainActivityByTxId(txid: txid) else {
Logger.warn("Activity not found for reorged transaction \(txid)", context: "CoreService.handleOnchainTransactionReorged")
return
}
onchain.confirmed = false
onchain.confirmTimestamp = nil
onchain.updatedAt = UInt64(Date().timeIntervalSince1970)
try await self.update(id: onchain.id, activity: .onchain(onchain))
}
}
/// Handles a "transaction evicted" event: sets `doesExist` to `false` on the activity for
/// `txid` and bumps `updatedAt` to now.
/// Logs a warning and returns when no activity exists for the txid.
/// - Throws: Rethrows errors from the lookup or the update.
func handleOnchainTransactionEvicted(txid: String) async throws {
try await ServiceQueue.background(.core) {
guard var onchain = try await self.getOnchainActivityByTxId(txid: txid) else {
Logger.warn("Activity not found for evicted transaction \(txid)", context: "CoreService.handleOnchainTransactionEvicted")
return
}
onchain.doesExist = false
onchain.updatedAt = UInt64(Date().timeIntervalSince1970)
try await self.update(id: onchain.id, activity: .onchain(onchain))
}
}
// MARK: - Lightning Event Handlers
/// Processes the payment whose id equals `paymentHash`.
///
/// When the payment list is unavailable a warning is logged and nothing happens. When the
/// list holds no payment with that id, the whole list is synced instead.
/// - Throws: Rethrows errors from processing the payment or from the full sync.
func handlePaymentEvent(paymentHash: String) async throws {
    try await ServiceQueue.background(.core) {
        guard let payments = LightningService.shared.payments else {
            Logger.warn("No payments available for hash \(paymentHash)", context: "CoreService.handlePaymentEvent")
            return
        }
        guard let payment = payments.first(where: { $0.id == paymentHash }) else {
            Logger.info("Payment not found for hash \(paymentHash) - syncing all payments", context: "CoreService.handlePaymentEvent")
            try await self.syncLdkNodePayments(payments)
            return
        }
        try await self.processLightningPayment(payment)
    }
}
/// Creates or updates the lightning activity that mirrors a bolt11 LDK payment.
///
/// Ignored are payments of any other kind, pending inbound payments, and payments whose
/// stored activity has a newer `updatedAt` than the payment. Only `seenAt` is carried over
/// from an existing activity.
/// - Throws: Errors from the activity lookup and from `update`/`upsert`.
private func processLightningPayment(_ payment: PaymentDetails) async throws {
    guard case let .bolt11(_, preimage, _, description, bolt11) = payment.kind else { return }
    // A pending inbound payment only means an invoice was created
    if payment.status == .pending, payment.direction == .inbound {
        return
    }
    let paymentTimestamp = UInt64(payment.latestUpdateTimestamp)
    let existingActivity = try getActivityById(activityId: payment.id)
    let existingLightning: LightningActivity?
    if case let .lightning(stored)? = existingActivity {
        existingLightning = stored
    } else {
        existingLightning = nil
    }
    // Local data that is newer than the payment must not be overwritten
    if let storedUpdatedAt = existingLightning?.updatedAt, storedUpdatedAt > paymentTimestamp {
        return
    }
    let state: BitkitCore.PaymentState
    switch payment.status {
    case .failed:
        state = .failed
    case .pending:
        state = .pending
    case .succeeded:
        state = .succeeded
    }
    let lightning = LightningActivity(
        id: payment.id,
        txType: payment.direction == .outbound ? .sent : .received,
        status: state,
        value: UInt64(payment.amountSats ?? 0),
        fee: (payment.feePaidMsat ?? 0) / 1000,
        invoice: bolt11 ?? "No invoice",
        message: description ?? "",
        timestamp: paymentTimestamp,
        preimage: preimage,
        createdAt: paymentTimestamp,
        updatedAt: paymentTimestamp,
        seenAt: existingLightning?.seenAt
    )
    if existingActivity == nil {
        try await upsert(.lightning(lightning))
    } else {
        try await update(id: payment.id, activity: .lightning(lightning))
    }
}
/// Sync all LDK node payments to activities.
///
/// Use for initial wallet load, manual refresh, or after operations that create new payments.
/// Events handle individual payment updates, so this should not be called on every event.
///
/// Onchain and bolt11 payments are processed one by one; other payment kinds are ignored.
/// A failing payment is logged and does not stop the loop.
/// - Parameter payments: The payments to mirror into activities.
/// - Throws: The most recent per-payment error, after all payments were attempted. In that
///   case the summary log and the change notification are skipped.
func syncLdkNodePayments(_ payments: [PaymentDetails]) async throws {
    try await ServiceQueue.background(.core) {
        var addedCount = 0
        var updatedCount = 0
        var latestCaughtError: Error?
        for payment in payments {
            if case let .onchain(txid, _) = payment.kind {
                do {
                    // Checked before processing so the payment counts as added or updated correctly
                    let hadExistingActivity = try getActivityById(activityId: payment.id) != nil
                    try await self.processOnchainPayment(payment, transactionDetails: nil)
                    if hadExistingActivity {
                        updatedCount += 1
                    } else {
                        addedCount += 1
                    }
                } catch {
                    Logger.error("Error processing onchain payment \(txid): \(error)", context: "CoreService.syncLdkNodePayments")
                    latestCaughtError = error
                }
            } else if case .bolt11 = payment.kind {
                do {
                    let hadExistingActivity = try getActivityById(activityId: payment.id) != nil
                    try await self.processLightningPayment(payment)
                    if hadExistingActivity {
                        updatedCount += 1
                    } else {
                        addedCount += 1
                    }
                } catch {
                    Logger.error("Error processing lightning payment \(payment.id): \(error)", context: "CoreService.syncLdkNodePayments")
                    latestCaughtError = error
                }
            }
        }
        // If any of the inserts failed, we want to throw the error up
        if let error = latestCaughtError {
            throw error
        }
        Logger.info("Synced LDK payments - Added: \(addedCount) - Updated: \(updatedCount)", context: "CoreService")
        self.activitiesChangedSubject.send()
    }
}
/// Finds the channel ID associated with a transaction based on its direction.
///
/// Inbound transactions are checked against closed channels (channel close / sweep);
/// outbound transactions are checked against open channels (channel funding).
/// - Returns: The matching channel id, or `nil` when the delegated lookup finds none.
private func findChannelForTransaction(txid: String, direction: PaymentDirection,
transactionDetails: BitkitCore.TransactionDetails? = nil) async -> String?
{
switch direction {
case .inbound:
// Check if this transaction is a channel close by checking if it spends a closed channel's funding UTXO
return await findClosedChannelForTransaction(txid: txid, transactionDetails: transactionDetails)
case .outbound:
// Check if this transaction is a channel open by checking if it's the funding transaction for an open channel
return await findOpenChannelForTransaction(txid: txid)
}
}
/// Check if a transaction spends a closed channel's funding UTXO or is a force close sweep.
///
/// Lookup order: (1) LDK's pending sweep balances, matched by their latest spending txid;
/// (2) stored closed channels, matched against the transaction's inputs.
/// - Parameters:
///   - txid: Transaction to classify.
///   - transactionDetails: Details to inspect; when `nil` the stored details are fetched.
/// - Returns: The channel id, or `nil` when nothing matches, details are unavailable, or
///   the closed-channel lookup throws (logged as a warning).
private func findClosedChannelForTransaction(txid: String, transactionDetails: BitkitCore.TransactionDetails? = nil) async -> String? {
do {
// First, check if this txid is a known sweep transaction using LDK's pending sweep balances
let pendingSweeps = await MainActor.run { LightningService.shared.balances?.pendingBalancesFromChannelClosures }
if let pendingSweeps {
for sweepBalance in pendingSweeps {
switch sweepBalance {
case let .broadcastAwaitingConfirmation(channelId, _, latestSpendingTxid, _):
// A sweep without a channel id cannot be attributed, so it is not treated as a match
if latestSpendingTxid.description == txid, let channelId {
Logger.info(
"Matched sweep tx \(txid) to channel \(channelId) via pendingSweepBalance (awaiting confirmation)",
context: "findClosedChannelForTransaction"
)
return channelId.description
}
case let .awaitingThresholdConfirmations(channelId, latestSpendingTxid, _, _, _):
if latestSpendingTxid.description == txid, let channelId {
Logger.info(
"Matched sweep tx \(txid) to channel \(channelId) via pendingSweepBalance (threshold confirmations)",
context: "findClosedChannelForTransaction"
)
return channelId.description
}
case .pendingBroadcast:
// No txid yet, skip
break
}
}
}
let closedChannels = try await getAllClosedChannels(sortDirection: .desc)
guard !closedChannels.isEmpty else { return nil }
// Prefer the caller-supplied details; otherwise fall back to what is stored
let details = if let provided = transactionDetails { provided } else { await fetchTransactionDetails(txid: txid) }
guard let details else {
Logger.warn("Transaction details not available for \(txid)", context: "CoreService.findClosedChannelForTransaction")
return nil
}
// Check if any input spends a closed channel's funding UTXO (commitment transaction)
for input in details.inputs {
let inputTxid = input.txid
let inputVout = Int(input.vout)
if let matchingChannel = closedChannels.first(where: { channel in
channel.fundingTxoTxid == inputTxid && channel.fundingTxoIndex == UInt32(inputVout)
}) {
return matchingChannel.channelId
}
}
} catch {
Logger.warn(
"Failed to check if transaction \(txid) spends closed channel funding UTXO: \(error)",
context: "CoreService.findClosedChannelForTransaction"
)
}
return nil
}
/// Finds the open channel that `txid` funded, either directly or via a Blocktank order.
///
/// A channel matches directly when its funding outpoint's txid equals `txid`. Otherwise
/// the Blocktank orders are searched for one paid by `txid`, and the channel funded by
/// that order's funding transaction is returned.
/// - Returns: The channel id, or `nil` when there are no channels, nothing matches, or
///   fetching the orders fails (logged as a warning).
private func findOpenChannelForTransaction(txid: String) async -> String? {
    let openChannels = await MainActor.run { LightningService.shared.channels }
    guard let openChannels, !openChannels.isEmpty else {
        return nil
    }
    // Direct match: the transaction itself is a channel's funding transaction
    let directMatch = openChannels.first { $0.fundingTxo?.txid.description == txid }
    if let directMatch {
        return directMatch.channelId.description
    }
    // Indirect match: the transaction paid a Blocktank order that opened a channel
    do {
        let orders = try await coreService.blocktank.orders(orderIds: nil, filter: nil, refresh: false)
        let paidOrder = orders.first { order in
            guard let transactions = order.payment?.onchain?.transactions else { return false }
            return transactions.contains { $0.txId == txid }
        }
        guard let orderChannel = paidOrder?.channel else {
            return nil
        }
        let fundedChannel = openChannels.first { channel in
            channel.fundingTxo?.txid.description == orderChannel.fundingTx.id
        }
        if let fundedChannel {
            return fundedChannel.channelId.description
        }
    } catch {
        Logger.warn(
            "Failed to fetch Blocktank orders: \(error)",
            context: "CoreService.findOpenChannelForTransaction"
        )
    }
    return nil
}
/// Returns the first output address of the transaction that has receive-type pre-activity metadata.
///
/// Outputs without an address, addresses without metadata, and failed metadata lookups are skipped.
/// - Parameter value: Currently unused by this lookup.
private func findAddressInPreActivityMetadata(details: BitkitCore.TransactionDetails, value: UInt64) async -> String? {
    for output in details.outputs {
        guard let candidate = output.scriptpubkeyAddress,
              let metadata = try? await getPreActivityMetadata(searchKey: candidate, searchByAddress: true),
              metadata.isReceive
        else {
            continue
        }
        return candidate
    }
    return nil
}
/// Find the receiving address for an onchain transaction.
///
/// Candidates are tried in this order and the first hit wins:
/// 1. an output address that has receive-type pre-activity metadata,
/// 2. the current wallet address stored in `UserDefaults` under `"onchainAddress"`,
/// 3. wallet receive addresses, scanned in batches,
/// 4. wallet change addresses, scanned in batches,
/// 5. the address of the transaction's first output.
/// - Parameters:
///   - txid: Transaction whose stored details are fetched when `transactionDetails` is `nil`.
///   - value: Amount used to prefer an output with exactly this value during the batch scan.
///   - transactionDetails: Details to inspect instead of fetching them.
/// - Returns: The address, or `nil` when details are unavailable or the first output has no address.
/// - Throws: Rethrows errors from `getAccountAddresses`.
private func findReceivingAddress(for txid: String, value: UInt64,
transactionDetails: BitkitCore.TransactionDetails? = nil) async throws -> String?
{
let details = if let provided = transactionDetails { provided } else { await fetchTransactionDetails(txid: txid) }
guard let details else {
Logger.warn("Transaction details not available for \(txid)", context: "CoreService.findReceivingAddress")
return nil
}
let batchSize: UInt32 = 20
let currentWalletAddress = UserDefaults.standard.string(forKey: "onchainAddress") ?? ""
// Check if an address matches any transaction output
func matchesTransaction(_ address: String) -> Bool {
details.outputs.contains { output in
output.scriptpubkeyAddress == address
}
}
// Find matching address from a list, preferring exact value match
func findMatch(in addresses: [String]) -> String? {
// Try exact value match first
for address in addresses {
for output in details.outputs {
if output.scriptpubkeyAddress == address,
output.value == value
{
return address
}
}
}
// Fallback to any address match
for address in addresses {
if matchesTransaction(address) {
return address
}
}
return nil
}
// First, check pre-activity metadata for addresses in the transaction
if let address = await findAddressInPreActivityMetadata(details: details, value: value) {
return address
}
// Check current address if it exists
if !currentWalletAddress.isEmpty && matchesTransaction(currentWalletAddress) {
return currentWalletAddress
}
// Search addresses forward in batches
func searchAddresses(isChange: Bool) async throws -> String? {
var index: UInt32 = 0
var currentAddressIndex: UInt32? = nil
let hasCurrentAddress = !currentWalletAddress.isEmpty
// Without a stored current address only the first batch is scanned
let maxIndex: UInt32 = hasCurrentAddress ? Self.maxAddressSearchIndex : batchSize
while index < maxIndex {
let accountAddresses = try await coreService.utility.getAccountAddresses(
walletIndex: 0,
isChange: isChange,
startIndex: index,
count: batchSize
)
let addresses = accountAddresses.unused.map(\.address) + accountAddresses.used.map(\.address)
// Track when we find the current address
if hasCurrentAddress, currentAddressIndex == nil, addresses.contains(currentWalletAddress) {
currentAddressIndex = index
}
// Check for matches
if let match = findMatch(in: addresses) {
return match
}
// Stop if we've checked one batch after finding current address
if let foundIndex = currentAddressIndex, index >= foundIndex + batchSize {
break
}
// Stop if we've reached the end
if addresses.count < Int(batchSize) {
break
}
index += batchSize
}
return nil
}
// Try receiving addresses first, then change addresses
if let address = try await searchAddresses(isChange: false) {
return address
}
if let address = try await searchAddresses(isChange: true) {
return address
}
// Fallback: return first output address
return details.outputs.first?.scriptpubkeyAddress
}
/// Loads a single activity by id on the `.core` service queue.
/// - Returns: Whatever `getActivityById` returns for the id (optional).
/// - Throws: Rethrows the error from `getActivityById`.
func getActivity(id: String) async throws -> Activity? {
try await ServiceQueue.background(.core) {
try getActivityById(activityId: id)
}
}