Skip to content

Commit e99867c

Browse files
committed
ft: ARSN-65 oplog pattern library
Snapshot-scan-oplog pattern with state persistence for applications requiring reading the oplog
1 parent c95f84e commit e99867c

13 files changed

Lines changed: 1802 additions & 1 deletion
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
/*
2+
* Main interface for bucketd oplog management
3+
*/
4+
const async = require('async');
5+
const BucketClient = require('bucketclient').RESTClient;
6+
const { jsutil, errors } = require('arsenal');
7+
const LogConsumer = require('arsenal').storage.metadata.bucketclient.LogConsumer;
8+
const { isMasterKey } = require('arsenal/lib/versioning/Version');
9+
const OplogInterface = require('./OplogInterface');
10+
11+
// Delay (ms) before polling the oplog again after a read error or when
// there is nothing new to read.
const OPLOG_POLL_DELAY_MS = 5000;

/**
 * Split a batch of oplog records into additions and deletions.
 *
 * Records are kept when the filter is of type 'raftSession', or when it is
 * of type 'bucket' and the record's `db` equals `filter.bucket.bucketName`.
 * Entries whose `type` is 'del' go to the deletion queue, but only when
 * their key is a master key; deletions of other keys are skipped (ignored
 * for now). Every other entry goes to the addition queue with a shallow
 * copy of its value.
 *
 * @param {object[]} records - oplog records, each with `db` and `entries`
 * @param {object} filter - filter passed to BucketdOplogInterface#start
 * @return {{addQueue: object[], delQueue: object[]}} queued items
 */
function splitOplogRecords(records, filter) {
    const addQueue = [];
    const delQueue = [];
    records.forEach(record => {
        const selected = filter.filterType === 'raftSession' ||
            (filter.filterType === 'bucket' &&
             record.db === filter.bucket.bucketName);
        if (!selected) {
            return;
        }
        record.entries.forEach(entry => {
            const item = { bucketName: record.db, key: entry.key };
            if (entry.type === 'del') {
                // Skip only this entry: the rest of the batch must still
                // be processed and the read loop must still advance.
                if (isMasterKey(item.key)) {
                    delQueue.push(item);
                }
            } else {
                item.value = Object.assign({}, entry.value);
                addQueue.push(item);
            }
        });
    });
    return { addQueue, delQueue };
}

/**
 * Oplog reader backed by bucketd.
 *
 * Uses `this.logger`, `this.persist` and `this.persistData`, which are not
 * set in this class and are presumably provided by OplogInterface from
 * `params` (to be confirmed against OplogInterface).
 */
class BucketdOplogInterface extends OplogInterface {
    /**
     * @param {object} [params] - forwarded to OplogInterface
     * @param {string[]} [params.bootstrap] - bucketd bootstrap list,
     *   defaults to ['localhost:9000']
     * @param {number} [params.stopAt] - the read loop stops once the saved
     *   sequence number is greater than this value (defaults to -1)
     */
    constructor(params) {
        super(params);
        this.stopAt = -1;
        this.backendRetryTimes = 3;
        this.backendRetryInterval = 300;
        this.bucketdOplogQuerySize = 20;
        let bkBootstrap = ['localhost:9000'];
        if (params && params.bootstrap !== undefined) {
            bkBootstrap = params.bootstrap;
        }
        if (params && params.stopAt !== undefined) {
            this.stopAt = params.stopAt;
        }
        this.bkClient = new BucketClient(bkBootstrap);
    }

    /**
     * Resolve the raft session, load or acquire the starting sequence
     * number, initialize the persisted state if needed, then loop over the
     * oplog applying each batch to `this.persistData` and saving the new
     * offset through `this.persist`.
     *
     * @param {object} filter - what to follow
     * @param {string} filter.filterType - 'bucket' or 'raftSession'; any
     *   other value yields errors.NotImplemented
     * @param {string} filter.filterName - name used for persistence/logging
     * @param {object} [filter.bucket] - `{ bucketName }` for 'bucket'
     * @param {object} [filter.raftSession] - `{ raftId }` for 'raftSession'
     * @param {function} cb - cb(err), called when the loop stops or fails
     * @return {undefined}
     */
    start(filter, cb) {
        if (!(filter.filterType === 'bucket' ||
              filter.filterType === 'raftSession')) {
            return cb(errors.NotImplemented);
        }
        const filterName = filter.filterName;
        const retryOpts = {
            times: this.backendRetryTimes,
            interval: this.backendRetryInterval,
        };
        async.waterfall([
            /*
             * In this step we get the raftId for filterName
             */
            next => {
                if (filter.filterType === 'raftSession') {
                    return next(null, filter.raftSession.raftId);
                }
                this.logger.info('obtaining raftId',
                    { filterName });
                async.retry(
                    retryOpts,
                    done => {
                        this.bkClient.getBucketInformation(
                            filter.bucket.bucketName,
                            null,
                            (err, info) => {
                                if (err) {
                                    this.logger.info('retrying getBucketInformation', { err, filterName });
                                    return done(err);
                                }
                                let parsed;
                                try {
                                    parsed = JSON.parse(info);
                                } catch (parseErr) {
                                    // a malformed reply is retried like
                                    // any other backend failure instead of
                                    // throwing out of the callback
                                    this.logger.info('retrying getBucketInformation',
                                        { err: parseErr, filterName });
                                    return done(parseErr);
                                }
                                return done(null, parsed);
                            });
                    },
                    (err, res) => {
                        if (err) {
                            this.logger.error('getBucketInformation too many failures', { err, filterName });
                            return next(err);
                        }
                        return next(null, res.raftSessionId);
                    });
                return undefined;
            },
            /*
             * In this step we get the stored offset if we have it
             */
            (raftId, next) => {
                this.persist.load(filterName, this.persistData, (err, offset) => {
                    if (err) {
                        return next(err);
                    }
                    return next(null, raftId, offset);
                });
            },
            /*
             * In this step we acquire the offset if we don't already have it
             */
            (raftId, cseq, next) => {
                if (cseq !== undefined) {
                    this.logger.info(`skipping cseq acquisition (cseq=${cseq})`,
                        { filterName });
                    return next(null, raftId, cseq, true);
                }
                this.logger.info('cseq acquisition',
                    { filterName });
                async.retry(
                    retryOpts,
                    done => {
                        // the stream may emit both 'error' and 'end':
                        // make sure the attempt completes only once
                        const doneOnce = jsutil.once(done);
                        this.bkClient.getRaftLog(
                            raftId,
                            1,
                            1,
                            true,
                            null,
                            (err, stream) => {
                                if (err) {
                                    this.logger.info('retrying getRaftLog', { err, filterName });
                                    return doneOnce(err);
                                }
                                const chunks = [];
                                stream.on('data', chunk => {
                                    chunks.push(chunk);
                                });
                                stream.on('error', streamErr => {
                                    this.logger.info('retrying getRaftLog',
                                        { err: streamErr, filterName });
                                    return doneOnce(streamErr);
                                });
                                stream.on('end', () => {
                                    let info;
                                    try {
                                        info = JSON.parse(Buffer.concat(chunks));
                                    } catch (parseErr) {
                                        this.logger.info('retrying getRaftLog',
                                            { err: parseErr, filterName });
                                        return doneOnce(parseErr);
                                    }
                                    return doneOnce(null, info);
                                });
                                return undefined;
                            });
                    },
                    (err, res) => {
                        if (err) {
                            this.logger.error('getRaftLog too many failures', { err, filterName });
                            return next(err);
                        }
                        return next(null, raftId, res.info.cseq, false);
                    });
                return undefined;
            },
            /*
             * In this step we init the state (e.g. scan)
             */
            (raftId, cseq, skipInit, next) => {
                if (skipInit) {
                    this.logger.info(`skipping state initialization cseq=${cseq}`,
                        { filterName });
                    return next(null, raftId, cseq);
                }
                this.logger.info(`initializing state cseq=${cseq}`,
                    { filterName });
                this.persistData.initState(err => {
                    if (err) {
                        return next(err);
                    }
                    this.persist.save(
                        filterName, this.persistData, cseq, err => {
                            if (err) {
                                return next(err);
                            }
                            return next(null, raftId, cseq);
                        });
                    return undefined;
                });
                return undefined;
            },
            /*
             * In this step we loop over the oplog
             */
            (raftId, cseq, next) => {
                this.logger.info(`reading oplog raftId=${raftId} cseq=${cseq}`,
                    { filterName });
                // only way to get out of the loop in all cases
                const nextOnce = jsutil.once(next);
                let doStop = false;
                // resume reading the oplog from cseq. changes are idempotent
                const logConsumer = new LogConsumer({
                    bucketClient: this.bkClient,
                    raftSession: raftId,
                });
                let _cseq = cseq;
                async.until(
                    () => doStop,
                    _next => {
                        // each iteration must complete exactly once, even
                        // if the log stream reports both 'error' and 'end'
                        const iterationDone = jsutil.once(_next);
                        const retryLater = () =>
                            setTimeout(() => iterationDone(), OPLOG_POLL_DELAY_MS);
                        logConsumer.readRecords({
                            startSeq: _cseq,
                            limit: this.bucketdOplogQuerySize,
                        }, (err, record) => {
                            if (err) {
                                // read errors are not fatal: log and poll
                                // again from the same offset
                                this.logger.error('readRecords error', { err, filterName });
                                return retryLater();
                            }
                            if (!record.log) {
                                // nothing to read
                                return retryLater();
                            }
                            const seqs = [];
                            record.log.on('data', chunk => {
                                seqs.push(chunk);
                            });
                            record.log.on('error', streamErr => {
                                // same policy as a readRecords error: the
                                // offset was not advanced, so the batch is
                                // read again on the next iteration
                                this.logger.error('readRecords error',
                                    { err: streamErr, filterName });
                                return retryLater();
                            });
                            record.log.on('end', () => {
                                const { addQueue, delQueue } =
                                    splitOplogRecords(seqs, filter);
                                this.persistData.updateState(
                                    addQueue, delQueue, err => {
                                        if (err) {
                                            return iterationDone(err);
                                        }
                                        _cseq += seqs.length;
                                        this.persist.save(
                                            filterName, this.persistData, _cseq, err => {
                                                if (err) {
                                                    return iterationDone(err);
                                                }
                                                // NOTE(review): with the
                                                // default stopAt of -1 this
                                                // holds for any non-negative
                                                // offset, so the loop ends
                                                // after the first saved
                                                // batch - confirm whether -1
                                                // is meant as "never stop"
                                                if (_cseq > this.stopAt) {
                                                    doStop = true;
                                                }
                                                return iterationDone();
                                            });
                                        return undefined;
                                    });
                            });
                            return undefined;
                        });
                    }, err => {
                        if (err) {
                            return nextOnce(err);
                        }
                        return nextOnce();
                    });
            },
        ], err => {
            if (err) {
                return cb(err);
            }
            this.logger.info('returning',
                { filterName });
            return cb();
        });
        return undefined;
    }
}
261+
262+
module.exports = BucketdOplogInterface;

0 commit comments

Comments
 (0)