-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathTransferRepo.kt
More file actions
227 lines (204 loc) · 9.37 KB
/
TransferRepo.kt
File metadata and controls
227 lines (204 loc) · 9.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package to.bitkit.repositories
import com.synonym.bitkitcore.Activity
import com.synonym.bitkitcore.ActivityFilter
import com.synonym.bitkitcore.SortDirection
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withContext
import org.lightningdevkit.ldknode.ChannelDetails
import org.lightningdevkit.ldknode.PendingSweepBalance
import to.bitkit.data.dao.TransferDao
import to.bitkit.data.entities.TransferEntity
import to.bitkit.di.BgDispatcher
import to.bitkit.ext.channelId
import to.bitkit.ext.latestSpendingTxid
import to.bitkit.models.TransferType
import to.bitkit.services.CoreService
import to.bitkit.utils.BlockTimeHelpers
import to.bitkit.utils.Logger
import java.util.UUID
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Clock
import kotlin.time.ExperimentalTime
/**
 * Repository for [TransferEntity] records.
 *
 * Creates transfers, marks them settled, and reconciles the active ones against the channels and
 * balances reported by [LightningRepo]. [createTransfer], [markSettled] and [syncTransferStates]
 * switch to the injected background dispatcher and report failures as a [Result]; coroutine
 * cancellation is rethrown instead of being wrapped in the result.
 */
@OptIn(ExperimentalTime::class)
@Singleton
class TransferRepo @Inject constructor(
    @BgDispatcher private val bgDispatcher: CoroutineDispatcher,
    private val lightningRepo: LightningRepo,
    private val blocktankRepo: BlocktankRepo,
    private val coreService: CoreService,
    private val transferDao: TransferDao,
    private val clock: Clock,
) {
    /** Active transfers as provided by [TransferDao.getActiveTransfers]. */
    val activeTransfers: Flow<List<TransferEntity>> = transferDao.getActiveTransfers()

    /**
     * Duration string from [BlockTimeHelpers.getDurationForBlocks] for the blocks left until the first
     * active [TransferType.FORCE_CLOSE] transfer reaches its `claimableAtHeight`.
     *
     * Emits `null` when there is no such transfer, it has no claimable height, the current block
     * height is unknown, or no blocks remain.
     */
    val forceCloseRemainingDuration: Flow<String?> = combine(
        activeTransfers,
        lightningRepo.lightningState,
    ) { transfers, lightningState ->
        val forceClose = transfers.firstOrNull { it.type == TransferType.FORCE_CLOSE }
            ?: return@combine null
        val targetHeight = forceClose.claimableAtHeight?.toUInt() ?: return@combine null
        val currentHeight = lightningState.block()?.height ?: return@combine null

        val remaining = BlockTimeHelpers.blocksRemaining(targetHeight, currentHeight)
        if (remaining <= 0) return@combine null

        BlockTimeHelpers.getDurationForBlocks(remaining)
    }

    /**
     * Inserts a new, unsettled transfer with a random UUID as its id.
     *
     * @param type kind of transfer being recorded.
     * @param amountSats transfer amount in sats.
     * @param channelId channel the transfer refers to, if already known.
     * @param fundingTxId funding transaction id, if any.
     * @param lspOrderId LSP order id, if the transfer was created through an order.
     * @param claimableAtHeight block height stored with the transfer (read back for force closes).
     * @return the generated id on success, or a failure wrapping the thrown exception.
     */
    @Suppress("LongParameterList")
    suspend fun createTransfer(
        type: TransferType,
        amountSats: Long,
        channelId: String? = null,
        fundingTxId: String? = null,
        lspOrderId: String? = null,
        claimableAtHeight: UInt? = null,
    ): Result<String> = withContext(bgDispatcher) {
        runCatching {
            val id = UUID.randomUUID().toString()
            transferDao.insert(
                TransferEntity(
                    id = id,
                    type = type,
                    amountSats = amountSats,
                    channelId = channelId,
                    fundingTxId = fundingTxId,
                    lspOrderId = lspOrderId,
                    isSettled = false,
                    createdAt = clock.now().epochSeconds,
                    claimableAtHeight = claimableAtHeight?.toInt(),
                )
            )
            Logger.info("Created transfer: id=$id type=$type channelId=$channelId", context = TAG)
            id
        }.onFailure { e ->
            // runCatching also catches cancellation; propagate it so structured concurrency keeps working.
            if (e is CancellationException) throw e
            Logger.error("Failed to create transfer", e, context = TAG)
        }
    }

    // TODO: consider deleting the transfer instead, or deleting it once the activity item has been
    //  augmented with the transfer's data. There is likely no reason to keep settled transfers around.
    /**
     * Marks the transfer with the given [id] as settled, stamping it with the current epoch seconds.
     *
     * @return success when the DAO call completed, or a failure wrapping the thrown exception.
     */
    suspend fun markSettled(id: String): Result<Unit> = withContext(bgDispatcher) {
        runCatching {
            val settledAt = clock.now().epochSeconds
            transferDao.markSettled(id, settledAt)
            Logger.info("Settled transfer: $id", context = TAG)
        }.onFailure { e ->
            // Do not swallow cancellation inside the Result.
            if (e is CancellationException) throw e
            Logger.error("Failed to settle transfer", e, context = TAG)
        }
    }

    /**
     * Reconciles every active transfer with the current node state.
     *
     * - Transfers to spending are settled once their resolved channel reports `isChannelReady`.
     * - Transfers to savings are settled once no lightning balance is reported for their channel;
     *   force closes additionally go through [settleForceClose].
     *
     * When balances cannot be fetched, transfers to savings are left untouched: a missing balance
     * list says nothing about whether the channel funds were swept.
     */
    suspend fun syncTransferStates(): Result<Unit> = withContext(bgDispatcher) {
        runCatching {
            val activeTransfers = transferDao.getActiveTransfers().first()
            if (activeTransfers.isEmpty()) return@runCatching

            val channels = lightningRepo.getChannels() ?: emptyList()
            val balances = lightningRepo.getBalancesAsync().getOrNull()
            Logger.debug("Syncing ${activeTransfers.size} active transfers", context = TAG)

            val toSpending = activeTransfers.filter { it.type.isToSpending() }
            for (transfer in toSpending) {
                val channelId = resolveChannelIdForTransfer(transfer, channels)
                val channel = channelId?.let { id -> channels.find { it.channelId == id } }
                if (channel != null && channel.isChannelReady) {
                    // Only report the transfer as settled when the write actually succeeded.
                    markSettled(transfer.id).onSuccess {
                        Logger.debug("Channel $channelId ready, settled transfer: ${transfer.id}", context = TAG)
                    }
                }
            }

            val toSavings = activeTransfers.filter { it.type.isToSavings() }
            if (toSavings.isEmpty()) return@runCatching
            if (balances == null) {
                // Without balances every transfer would look "swept" and be settled prematurely.
                Logger.debug(
                    "Balances unavailable, skipping sync of ${toSavings.size} transfers to savings",
                    context = TAG,
                )
                return@runCatching
            }

            for (transfer in toSavings) {
                // NOTE(review): an unresolved (null) channelId never matches a balance, so a
                // non-force-close transfer is settled in that case — confirm this is intended.
                val channelId = resolveChannelIdForTransfer(transfer, channels)
                val hasBalance = balances.lightningBalances.any { it.channelId() == channelId }
                if (hasBalance) continue

                if (transfer.type == TransferType.FORCE_CLOSE) {
                    settleForceClose(transfer, channelId, balances.pendingBalancesFromChannelClosures)
                } else {
                    markSettled(transfer.id).onSuccess {
                        Logger.debug(
                            "Channel $channelId balance swept, settled transfer: ${transfer.id}",
                            context = TAG
                        )
                    }
                }
            }
        }.onSuccess {
            Logger.verbose("syncTransferStates completed", context = TAG)
        }.onFailure { e ->
            if (e is CancellationException) throw e
            Logger.error("syncTransferStates error", e, context = TAG)
        }
    }

    /**
     * Settles a force-close [transfer] once its sweep can be detected.
     *
     * Checks, in order: an on-chain activity linked to [channelId]; the absence of a pending sweep
     * balance for the channel; an on-chain activity matching the pending sweep's latest spending
     * txid. Does nothing when [channelId] is `null` or none of the checks pass.
     */
    private suspend fun settleForceClose(
        transfer: TransferEntity,
        channelId: String?,
        pendingSweeps: List<PendingSweepBalance>?,
    ) {
        if (channelId == null) return

        if (coreService.activity.hasOnchainActivityForChannel(channelId)) {
            markActivityAsTransferByChannel(channelId)
            markSettled(transfer.id).onSuccess {
                Logger.debug("Force close sweep detected, settled transfer: ${transfer.id}", context = TAG)
            }
            return
        }

        // When LDK batches sweeps from multiple channels into one transaction,
        // the on-chain activity may only be linked to one channel. Fall back to
        // checking if there are no remaining pending sweep balances for this channel.
        val pendingSweep = pendingSweeps?.find { it.channelId() == channelId }
        if (pendingSweep == null) {
            markSettled(transfer.id).onSuccess {
                Logger.debug(
                    "Force close sweep completed (no pending sweeps), settled transfer: ${transfer.id}",
                    context = TAG,
                )
            }
            return
        }

        val sweepTxid = pendingSweep.latestSpendingTxid()
        if (sweepTxid != null && coreService.activity.hasOnchainActivityForTxid(sweepTxid)) {
            // The sweep tx was already synced as an on-chain activity (linked to another
            // channel in the same batched sweep). Safe to settle this transfer.
            markActivityAsTransfer(sweepTxid, channelId)
            markSettled(transfer.id).onSuccess {
                Logger.debug(
                    "Force close batched sweep detected via txid $sweepTxid, settled transfer: ${transfer.id}",
                    context = TAG,
                )
            }
            return
        }

        Logger.debug("Force close awaiting sweep detection for transfer: ${transfer.id}", context = TAG)
    }

    /** Flags the on-chain activity with the given [txid] as a transfer for [channelId], unless already flagged. */
    private suspend fun markActivityAsTransfer(txid: String, channelId: String) {
        val activity = coreService.activity.getOnchainActivityByTxId(txid) ?: return
        if (activity.isTransfer) return

        val updated = activity.copy(isTransfer = true, channelId = channelId)
        coreService.activity.update(activity.id, Activity.Onchain(updated))
        Logger.debug("Marked activity ${activity.id} as transfer for channel $channelId", context = TAG)
    }

    /**
     * Flags the first on-chain activity linked to [channelId] as a transfer, unless already flagged.
     *
     * Only the first [CHANNEL_ACTIVITY_LOOKUP_LIMIT] on-chain activities (descending order) are searched.
     */
    private suspend fun markActivityAsTransferByChannel(channelId: String) {
        val activities = coreService.activity.get(
            filter = ActivityFilter.ONCHAIN,
            limit = CHANNEL_ACTIVITY_LOOKUP_LIMIT,
            sortDirection = SortDirection.DESC,
        )
        val onchain = activities
            .filterIsInstance<Activity.Onchain>()
            .firstOrNull { it.v1.channelId == channelId }
            ?: return
        if (onchain.v1.isTransfer) return

        val updated = onchain.v1.copy(isTransfer = true, channelId = channelId)
        coreService.activity.update(onchain.v1.id, Activity.Onchain(updated))
        Logger.debug("Marked activity ${onchain.v1.id} as transfer for channel $channelId", context = TAG)
    }

    /**
     * Resolves the channel id a transfer refers to.
     *
     * Transfers without an `lspOrderId` use `transfer.channelId` directly. For LSP orders the cached
     * order's funding tx id is matched against [channels]:
     * - returns `null` when the order or its funding tx is unavailable;
     * - falls back to `transfer.channelId` when no channel matches the funding tx.
     */
    suspend fun resolveChannelIdForTransfer(
        transfer: TransferEntity,
        channels: List<ChannelDetails>,
    ): String? {
        val orderId = transfer.lspOrderId ?: return transfer.channelId

        val order = blocktankRepo.getOrder(orderId, refresh = false).getOrNull()
        val fundingTxId = order?.channel?.fundingTx?.id ?: return null

        return channels.find { it.fundingTxo?.txid == fundingTxId }?.channelId
            ?: transfer.channelId
    }

    companion object {
        private const val TAG = "TransferRepo"

        /** Maximum number of on-chain activities fetched when looking up a channel's activity. */
        private const val CHANNEL_ACTIVITY_LOOKUP_LIMIT = 50u
    }
}