Skip to content

Commit 3120c92

Browse files
committed
feat: add recon service api
1 parent 44fedfa commit 3120c92

11 files changed

Lines changed: 263 additions & 12 deletions

File tree

packages/cli/src/ceramic-daemon.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ export function makeCeramicConfig(opts: DaemonConfig): CeramicConfig {
118118
syncOverride: SYNC_OPTIONS_MAP[opts.node.syncOverride],
119119
streamCacheLimit: opts.node.streamCacheLimit,
120120
indexing: opts.indexing,
121+
reconUrl: opts.node.reconUrl,
121122
}
122123
if (opts.stateStore?.mode == StateStoreMode.FS) {
123124
ceramicConfig.stateStoreDirectory = opts.stateStore.localDirectory

packages/cli/src/daemon-config.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,12 @@ export class DaemonCeramicNodeConfig {
342342
*/
343343
@jsonMember(Number, { name: 'stream-cache-limit' })
344344
streamCacheLimit?: number
345+
346+
/**
347+
* If set, experimental recon is enabled and uses another node to run recon.
348+
*/
349+
@jsonMember(String, { name: 'recon-url' })
350+
reconUrl?: string
345351
}
346352

347353
/**

packages/core/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@
8585
"pg-boss": "^8.2.0",
8686
"rxjs": "^7.5.2",
8787
"sqlite3": "^5.0.8",
88-
"uint8arrays": "^4.0.3"
88+
"uint8arrays": "^4.0.3",
89+
"zcgen-client": "^0.0.4"
8990
},
9091
"devDependencies": {
9192
"@ceramicnetwork/3id-did-resolver": "^2.23.0-rc.0",

packages/core/src/ceramic.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import { SyncApi } from './sync/sync-api.js'
6060
import { ProvidersCache } from './providers-cache.js'
6161
import crypto from 'crypto'
6262
import { SyncJobData } from './sync/interfaces.js'
63+
import { ReconApi, ReconApiHTTP } from './recon.js'
6364

6465
const DEFAULT_CACHE_LIMIT = 500 // number of streams stored in the cache
6566
const DEFAULT_QPS_LIMIT = 10 // Max number of pubsub query messages that can be published per second without rate limiting
@@ -141,6 +142,8 @@ export interface CeramicConfig {
141142
useCentralizedPeerDiscovery?: boolean
142143
syncOverride?: SyncOptions
143144

145+
reconUrl?: string
146+
144147
[index: string]: any // allow arbitrary properties
145148
}
146149

@@ -160,6 +163,7 @@ export interface CeramicModules {
160163
repository: Repository
161164
shutdownSignal: ShutdownSignal
162165
providersCache: ProvidersCache
166+
recon: ReconApi | null
163167
}
164168

165169
/**
@@ -218,6 +222,7 @@ export class Ceramic implements CeramicApi {
218222
private readonly anchorResumingService: AnchorResumingService
219223
private readonly providersCache: ProvidersCache
220224
private readonly syncApi: SyncApi
225+
readonly recon: ReconApi
221226

222227
readonly _streamHandlers: HandlersMap
223228
private readonly _anchorValidator: AnchorValidator
@@ -292,6 +297,7 @@ export class Ceramic implements CeramicApi {
292297
anchorService: modules.anchorService,
293298
conflictResolution: conflictResolution,
294299
indexing: localIndex,
300+
recon: modules.recon,
295301
})
296302
this.syncApi = new SyncApi(
297303
{
@@ -305,7 +311,15 @@ export class Ceramic implements CeramicApi {
305311
)
306312
const pinApi = this._buildPinApi()
307313
this.repository.index.setSyncQueryApi(this.syncApi)
308-
this.admin = new LocalAdminApi(localIndex, this.syncApi, this.nodeStatus.bind(this), pinApi)
314+
this.recon = modules.recon
315+
316+
this.admin = new LocalAdminApi(
317+
localIndex,
318+
this.syncApi,
319+
this.nodeStatus.bind(this),
320+
pinApi,
321+
this.recon
322+
)
309323
}
310324

311325
get index(): LocalIndexApi {
@@ -537,6 +551,10 @@ export class Ceramic implements CeramicApi {
537551
maxQueriesPerSecond
538552
)
539553

554+
const recon = config.reconUrl
555+
? new ReconApiHTTP(config.reconUrl, config.networkName, repository, dispatcher, logger)
556+
: null
557+
540558
const params: CeramicParameters = {
541559
gateway: config.gateway,
542560
stateStoreDirectory: config.stateStoreDirectory,
@@ -557,6 +575,7 @@ export class Ceramic implements CeramicApi {
557575
repository,
558576
shutdownSignal,
559577
providersCache,
578+
recon,
560579
}
561580

562581
return [modules, params]

packages/core/src/dispatcher.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,12 +230,14 @@ export class Dispatcher {
230230
* @param cid - Commit CID
231231
* @param streamId - StreamID of the stream the commit belongs to, used for logging.
232232
*/
233-
async retrieveCommit(cid: CID | string, streamId: StreamID): Promise<any> {
233+
async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise<any> {
234234
try {
235235
return await this._getFromIpfs(cid)
236236
} catch (e) {
237237
this._logger.err(
238-
`Error while loading commit CID ${cid.toString()} from IPFS for stream ${streamId.toString()}: ${e}`
238+
`Error while loading commit CID ${cid.toString()} from IPFS for stream ${
239+
streamId ? streamId.toString() : ''
240+
}: ${e}`
239241
)
240242
throw e
241243
}

packages/core/src/local-admin-api.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
import { StreamID } from '@ceramicnetwork/streamid'
99
import { LocalIndexApi } from './indexing/local-index-api.js'
1010
import { SyncApi } from './sync/sync-api.js'
11+
import { ReconApi } from './recon.js'
1112

1213
type NodeStatusFn = () => Promise<NodeStatusResponse>
1314

@@ -19,7 +20,8 @@ export class LocalAdminApi implements AdminApi {
1920
private readonly indexApi: LocalIndexApi,
2021
private readonly syncApi: SyncApi,
2122
private readonly nodeStatusFn: NodeStatusFn, // TODO(CDB-2293): circular dependency back into Ceramic
22-
private readonly pinApi: PinApi
23+
private readonly pinApi: PinApi,
24+
private readonly recon: ReconApi | undefined
2325
) {}
2426

2527
async nodeStatus(): Promise<NodeStatusResponse> {
@@ -32,7 +34,9 @@ export class LocalAdminApi implements AdminApi {
3234

3335
async startIndexingModelData(modelData: Array<ModelData>): Promise<void> {
3436
await this.indexApi.indexModels(modelData)
35-
await this.syncApi.startModelSync(modelData.map((idx) => idx.streamID.toString()))
37+
const ids = modelData.map((idx) => idx.streamID.toString())
38+
await this.syncApi.startModelSync(ids)
39+
if (this.recon) ids.forEach(this.recon.subscribe.bind(this.recon))
3640
}
3741

3842
async getIndexedModels(): Promise<Array<StreamID>> {
@@ -49,10 +53,12 @@ export class LocalAdminApi implements AdminApi {
4953
}
5054

5155
async stopIndexingModelData(modelData: Array<ModelData>): Promise<void> {
56+
const ids = modelData.map((idx) => idx.streamID.toString())
5257
await Promise.all([
5358
this.indexApi.stopIndexingModels(modelData),
54-
this.syncApi.stopModelSync(modelData.map((data) => data.streamID.toString())),
59+
this.syncApi.stopModelSync(ids),
5560
])
61+
if (this.recon) ids.forEach(this.recon.unsubscribe.bind(this.recon))
5662
}
5763

5864
get pin(): PinApi {

packages/core/src/recon.ts

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import { EventID, StreamID } from '@ceramicnetwork/streamid'
2+
import { DiagnosticsLogger } from '@ceramicnetwork/common'
3+
import { Repository } from './state-management/repository.js'
4+
import * as ReconClient from 'zcgen-client'
5+
import { Dispatcher } from './dispatcher.js'
6+
import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance'
7+
import { from, repeat, timer, switchMap, tap, Subscription } from 'rxjs'
8+
import { retry } from 'rxjs/operators'
9+
10+
/**
11+
* Recon Event
12+
*/
13+
export interface Event {
14+
eventId: string
15+
}
16+
17+
/**
18+
* Describes Recon Service API
19+
*/
20+
export interface ReconApi {
21+
readonly networkName: string
22+
/**
23+
* Recon subscription, subscribe by model
24+
*/
25+
subscribe(model: string): Subscription
26+
/**
27+
* Add event to recon
28+
*/
29+
addEvent(event: Event): Promise<void>
30+
/**
31+
* Unsubscribe to subscription by model
32+
*/
33+
unsubscribe(model: string): void
34+
/**
35+
* Close and unsubscribe to all
36+
*/
37+
close(): void
38+
}
39+
40+
/**
41+
* Recon subscription manager, manages simple map of models to subscriptions
42+
*/
43+
export interface SubManager {
44+
/**
45+
* Add active subscription
46+
*/
47+
add(model: string, sub: Subscription): void
48+
/**
49+
* Get subscription by model
50+
*/
51+
get(model: string): Subscription | undefined
52+
/**
53+
* Unsubscribe
54+
*/
55+
unsubscribe(model: string): void
56+
/**
57+
* Unsubscribe to all known subscriptions
58+
*/
59+
close(): void
60+
}
61+
62+
export class ReconSubManager implements SubManager {
63+
private readonly subscriptions: Record<string, Subscription>
64+
65+
constructor(private readonly logger: DiagnosticsLogger) {
66+
this.subscriptions = {}
67+
}
68+
69+
add(model: string, sub: Subscription): void {
70+
this.subscriptions[model] = sub
71+
this.logger.verbose(`Recon: subscription for model ${model} added`)
72+
}
73+
74+
get(model: string): Subscription | undefined {
75+
return this.subscriptions[model]
76+
}
77+
78+
unsubscribe(model: string): void {
79+
const sub = this.get(model)
80+
if (!sub) return
81+
sub.unsubscribe()
82+
delete this.subscriptions[model]
83+
this.logger.verbose(`Recon: unsubscribed for model ${model}`)
84+
}
85+
86+
close(): void {
87+
Object.keys(this.subscriptions).forEach((model) => {
88+
this.unsubscribe(model)
89+
})
90+
this.logger.verbose(`Recon: closing, unsubscribed to all`)
91+
}
92+
}
93+
94+
/**
95+
* Recon API
96+
*/
97+
export class ReconApiHTTP implements ReconApi {
98+
private readonly api: ReconClient.DefaultApi
99+
private readonly subscriptions: ReconSubManager
100+
101+
constructor(
102+
url: string,
103+
readonly networkName: string,
104+
private readonly repository: Repository,
105+
private readonly dispatcher: Dispatcher,
106+
private readonly logger: DiagnosticsLogger
107+
) {
108+
const baseServer = new ReconClient.ServerConfiguration(url, {})
109+
const config = ReconClient.createConfiguration({ baseServer })
110+
this.api = new ReconClient.DefaultApi(config)
111+
this.subscriptions = new ReconSubManager(logger)
112+
}
113+
114+
subscribe(model: string): Subscription {
115+
if (this.subscriptions.get(model)) return this.subscriptions.get(model)
116+
117+
let offset = 0
118+
const increaseOffset = (val: number): void => {
119+
offset += val
120+
}
121+
122+
const obv$ = from(
123+
this.api.ceramicSubscribeSortKeySortValueGet(
124+
'model',
125+
model,
126+
undefined,
127+
undefined,
128+
offset,
129+
1000
130+
)
131+
).pipe(
132+
tap((arr) => increaseOffset(arr.length)),
133+
switchMap(from),
134+
repeat({ delay: 200 }),
135+
retry({
136+
delay: (error, count) => {
137+
this.logger.warn(`Recon: subscription failed for model ${model}, attempting to retry`)
138+
// exp backoff, max 3 minutes
139+
return timer(count > 11 ? 3 * 60 * 1000 : 2 ^ (count * 100))
140+
},
141+
resetOnSuccess: true,
142+
})
143+
)
144+
145+
// in future could return observable, handler added here to keep recon code together for now
146+
const sub = obv$.subscribe(this._eventHandler)
147+
this.subscriptions.add(model, sub)
148+
return sub
149+
}
150+
151+
unsubscribe(model: string): void {
152+
this.subscriptions.unsubscribe(model)
153+
}
154+
155+
close(): void {
156+
this.subscriptions.close()
157+
}
158+
159+
// messy here, so that recon changes are minimized for now and uses existing apis,
160+
// model/streamids used for lots of caching, but could later be implemented w/o or recon based
161+
async _eventHandler(event: string): Promise<void> {
162+
const eventId = EventID.fromString(event)
163+
const commit = await this.dispatcher.retrieveCommit(eventId.event)
164+
165+
let header, gcid
166+
if (commit.proof) {
167+
gcid = commit.id
168+
} else if (commit.id) {
169+
const genesis = await this.dispatcher.retrieveCommit(commit.id)
170+
header = genesis.header
171+
gcid = commit.id
172+
} else {
173+
header = commit.header
174+
gcid = eventId.event
175+
}
176+
177+
const model = header ? StreamID.fromBytes(header.model) : undefined
178+
// assumes model instance
179+
const streamid = new StreamID(ModelInstanceDocument.STREAM_TYPE_ID, gcid)
180+
181+
this.logger.verbose(`Recon: received eventID ${eventId.toString()} for streamId ${streamid}`)
182+
await this.repository.stateManager.handleUpdate(streamid, eventId.event, model)
183+
}
184+
185+
async addEvent(event: Event): Promise<void> {
186+
try {
187+
await this.api.ceramicEventsPost(event)
188+
this.logger.verbose(`Recon: added event ${event.eventId}`)
189+
} catch (err) {
190+
this.logger.err(`Recon: failed to add event ${event.eventId}`)
191+
}
192+
}
193+
}

0 commit comments

Comments
 (0)