Skip to content

Commit 95e70fc

Browse files
committed
ft: ARSN-65 oplog pattern library
Snapshot-scan-oplog pattern with state persistence for applications requiring reading the oplog
1 parent c95f84e commit 95e70fc

13 files changed

Lines changed: 1687 additions & 1 deletion
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* Main interface for bucketd oplog management
3+
*/
4+
const async = require('async');
5+
const BucketClient = require('bucketclient').RESTClient;
6+
const { jsutil } = require('arsenal');
7+
const LogConsumer = require('arsenal').storage.metadata.bucketclient.LogConsumer;
8+
const { isMasterKey } = require('arsenal/lib/versioning/Version');
9+
const OplogInterface = require('./OplogInterface');
10+
11+
/**
 * Oplog interface backed by bucketd.
 *
 * `start()` resolves the raft session of a bucket, restores (or acquires)
 * the sequence number to resume from, optionally initializes the persisted
 * state, then reads the raft log in batches and applies every batch to the
 * persisted state.
 *
 * `this.logger`, `this.persist` and `this.persistData` are used but never
 * assigned here; they are presumably provided by OplogInterface from
 * `params` - TODO confirm against the parent class.
 */
class BucketdOplogInterface extends OplogInterface {

    /**
     * @param {object} [params] - forwarded unchanged to the OplogInterface
     *   constructor
     * @param {*} [params.bootstrap=['localhost:9000']] - bootstrap value
     *   handed to the bucketd REST client
     * @param {number} [params.stopAt=-1] - the oplog loop stops once the
     *   persisted sequence number is strictly greater than this value
     */
    constructor(params) {
        super(params);
        this.stopAt = -1;
        this.backendRetryTimes = 3;
        this.backendRetryInterval = 300;
        this.bucketdOplogQuerySize = 20;
        let bkBootstrap = ['localhost:9000'];
        if (params && params.bootstrap !== undefined) {
            bkBootstrap = params.bootstrap;
        }
        if (params && params.stopAt !== undefined) {
            this.stopAt = params.stopAt;
        }
        this.bkClient = new BucketClient(bkBootstrap);
    }

    /**
     * Run the snapshot-then-oplog sequence for one bucket.
     *
     * @param {string} bucketName - bucket whose oplog entries are consumed
     * @param {function} cb - cb(err) called once the oplog loop has ended
     *   or as soon as one of the steps fails
     * @return {undefined}
     */
    start(bucketName, cb) {
        async.waterfall([
            // get the raftId for bucketName
            next => {
                this._getRaftId(bucketName, next);
            },
            // get the stored offset if we have it
            (raftId, next) => {
                this._loadOffset(bucketName, raftId, next);
            },
            // acquire the offset if we don't already have it
            (raftId, cseq, next) => {
                this._acquireCseq(bucketName, raftId, cseq, next);
            },
            // scan the bucket (skipped when resuming from a stored offset)
            (raftId, cseq, skipListing, next) => {
                this._initListing(bucketName, raftId, cseq, skipListing, next);
            },
            // loop over the oplog
            (raftId, cseq, next) => {
                this._readOplog(bucketName, raftId, cseq, next);
            },
        ], err => {
            if (err) {
                return cb(err);
            }
            this.logger.info('returning',
                { bucketName });
            return cb();
        });
    }

    /**
     * Fetch the raft session id of a bucket, retrying on failure.
     *
     * @param {string} bucketName - bucket name
     * @param {function} cb - cb(err, raftId)
     * @return {undefined}
     */
    _getRaftId(bucketName, cb) {
        this.logger.info('obtaining raftId',
            { bucketName });
        async.retry(
            {
                times: this.backendRetryTimes,
                interval: this.backendRetryInterval,
            },
            done => {
                this.bkClient.getBucketInformation(
                    bucketName,
                    null,
                    (err, info) => {
                        if (err) {
                            this.logger.info('retrying getBucketInformation', { err, bucketName });
                            return done(err);
                        }
                        // A malformed body must go through the retry logic
                        // instead of throwing out of this callback.
                        let parsed;
                        try {
                            parsed = JSON.parse(info);
                        } catch (parseErr) {
                            this.logger.info('retrying getBucketInformation', { err: parseErr, bucketName });
                            return done(parseErr);
                        }
                        return done(null, parsed);
                    });
            },
            (err, res) => {
                if (err) {
                    this.logger.error('getBucketInformation too many failures', { err, bucketName });
                    return cb(err);
                }
                return cb(null, res.raftSessionId);
            });
    }

    /**
     * Load the previously persisted offset for a bucket.
     *
     * @param {string} bucketName - bucket name
     * @param {*} raftId - raft session id, passed through to cb
     * @param {function} cb - cb(err, raftId, offset); a later step treats
     *   an undefined offset as "nothing stored yet"
     * @return {undefined}
     */
    _loadOffset(bucketName, raftId, cb) {
        this.persist.load(bucketName, this.persistData, (err, offset) => {
            if (err) {
                return cb(err);
            }
            return cb(null, raftId, offset);
        });
    }

    /**
     * Obtain the sequence number to start from when none was stored.
     *
     * @param {string} bucketName - bucket name
     * @param {*} raftId - raft session id
     * @param {number|undefined} cseq - stored offset, if any
     * @param {function} cb - cb(err, raftId, cseq, skipListing);
     *   skipListing is true when the stored offset is reused
     * @return {undefined}
     */
    _acquireCseq(bucketName, raftId, cseq, cb) {
        if (cseq !== undefined) {
            this.logger.info(`skipping cseq acquisition (cseq=${cseq})`,
                { bucketName });
            return cb(null, raftId, cseq, true);
        }
        this.logger.info('cseq acquisition',
            { bucketName });
        async.retry(
            {
                times: this.backendRetryTimes,
                interval: this.backendRetryInterval,
            },
            done => {
                this.bkClient.getRaftLog(
                    raftId,
                    1,
                    1,
                    true,
                    null,
                    (err, stream) => {
                        if (err) {
                            this.logger.info('retrying getRaftLog', { err, bucketName });
                            return done(err);
                        }
                        // 'error' and 'end' must not both complete the attempt
                        const doneOnce = jsutil.once(done);
                        const chunks = [];
                        stream.on('error', streamErr => {
                            this.logger.info('retrying getRaftLog', { err: streamErr, bucketName });
                            return doneOnce(streamErr);
                        });
                        stream.on('data', chunk => {
                            chunks.push(chunk);
                        });
                        stream.on('end', () => {
                            let parsed;
                            try {
                                parsed = JSON.parse(Buffer.concat(chunks));
                            } catch (parseErr) {
                                this.logger.info('retrying getRaftLog', { err: parseErr, bucketName });
                                return doneOnce(parseErr);
                            }
                            return doneOnce(null, parsed);
                        });
                        return undefined;
                    });
            },
            (err, res) => {
                if (err) {
                    this.logger.error('getRaftLog too many failures', { err, bucketName });
                    return cb(err);
                }
                if (!res || !res.info) {
                    const formatErr = new Error('getRaftLog response has no info section');
                    this.logger.error('getRaftLog unexpected response', { err: formatErr, bucketName });
                    return cb(formatErr);
                }
                return cb(null, raftId, res.info.cseq, false);
            });
        return undefined;
    }

    /**
     * Initialize the persisted state and store the starting offset, unless
     * the stored offset is being reused.
     *
     * @param {string} bucketName - bucket name
     * @param {*} raftId - raft session id, passed through to cb
     * @param {number} cseq - sequence number to persist
     * @param {boolean} skipListing - true to skip the initialization
     * @param {function} cb - cb(err, raftId, cseq)
     * @return {undefined}
     */
    _initListing(bucketName, raftId, cseq, skipListing, cb) {
        if (skipListing) {
            this.logger.info(`skipping listing cseq=${cseq}`,
                { bucketName });
            return cb(null, raftId, cseq);
        }
        this.logger.info(`listing cseq=${cseq}`,
            { bucketName });
        this.persistData.initState(initErr => {
            if (initErr) {
                return cb(initErr);
            }
            this.persist.save(
                bucketName, this.persistData, cseq, saveErr => {
                    if (saveErr) {
                        return cb(saveErr);
                    }
                    return cb(null, raftId, cseq);
                });
            return undefined;
        });
        return undefined;
    }

    /**
     * Split a batch of oplog records into additions and deletions for one
     * bucket. Records of other databases are skipped, and so are deletions
     * of keys for which isMasterKey() is false.
     *
     * @param {string} bucketName - bucket name
     * @param {object[]} seqs - records read from the log stream; each one is
     *   read as { db, entries: [{ key, type, value }] }
     * @return {object} { addQueue, delQueue }
     */
    _buildQueues(bucketName, seqs) {
        const addQueue = [];
        const delQueue = [];
        for (const seq of seqs) {
            if (seq.db !== bucketName) {
                continue;
            }
            for (const entry of seq.entries) {
                const item = { bucketName, key: entry.key };
                if (entry.type === 'del') {
                    if (!isMasterKey(item.key)) {
                        // ignore for now: skip this entry only. Returning
                        // here would drop the rest of the batch and leave
                        // the read loop waiting forever.
                        continue;
                    }
                    delQueue.push(item);
                } else {
                    item.value = Object.assign({}, entry.value);
                    addQueue.push(item);
                }
            }
        }
        return { addQueue, delQueue };
    }

    /**
     * Read the oplog in batches starting at cseq, apply each batch to the
     * persisted state and save the new offset after each batch.
     *
     * Read errors and empty reads are retried after a fixed delay; state
     * update and save errors end the loop.
     *
     * NOTE(review): the loop stops when the saved offset is greater than
     * this.stopAt, so with the default stopAt of -1 it ends after the first
     * persisted batch (assuming non-negative sequence numbers) - confirm
     * this is the intended meaning of the default.
     *
     * @param {string} bucketName - bucket name
     * @param {*} raftId - raft session id
     * @param {number} cseq - sequence number to start reading from
     * @param {function} cb - cb(err) called when the loop ends
     * @return {undefined}
     */
    _readOplog(bucketName, raftId, cseq, cb) {
        this.logger.info(`reading oplog raftId=${raftId} cseq=${cseq}`,
            { bucketName });
        const retryDelayMs = 5000;
        // only way to get out of the loop in all cases
        const cbOnce = jsutil.once(cb);
        let doStop = false;
        // resume reading the oplog from cseq. changes are idempotent
        const logConsumer = new LogConsumer({
            bucketClient: this.bkClient,
            raftSession: raftId,
        });
        let currentSeq = cseq;
        async.until(
            () => doStop,
            iterDone => {
                logConsumer.readRecords({
                    startSeq: currentSeq,
                    limit: this.bucketdOplogQuerySize,
                }, (err, record) => {
                    if (err) {
                        this.logger.error('readRecords error', { err, bucketName });
                        return setTimeout(() => iterDone(), retryDelayMs);
                    }
                    if (!record.log) {
                        // nothing to read
                        return setTimeout(() => iterDone(), retryDelayMs);
                    }
                    const iterDoneOnce = jsutil.once(iterDone);
                    const seqs = [];
                    let failed = false;
                    // A broken stream is handled like a failed read: the
                    // partial batch is discarded and read again later.
                    record.log.on('error', logErr => {
                        failed = true;
                        this.logger.error('readRecords error', { err: logErr, bucketName });
                        return setTimeout(() => iterDoneOnce(), retryDelayMs);
                    });
                    record.log.on('data', chunk => {
                        seqs.push(chunk);
                    });
                    record.log.on('end', () => {
                        if (failed) {
                            return undefined;
                        }
                        const { addQueue, delQueue } = this._buildQueues(bucketName, seqs);
                        this.persistData.updateState(
                            addQueue, delQueue, updateErr => {
                                if (updateErr) {
                                    return iterDoneOnce(updateErr);
                                }
                                currentSeq += seqs.length;
                                this.persist.save(
                                    bucketName, this.persistData, currentSeq, saveErr => {
                                        if (saveErr) {
                                            return iterDoneOnce(saveErr);
                                        }
                                        if (currentSeq > this.stopAt) {
                                            doStop = true;
                                        }
                                        return iterDoneOnce();
                                    });
                                return undefined;
                            });
                        return undefined;
                    });
                    return undefined;
                });
            }, err => {
                if (err) {
                    return cbOnce(err);
                }
                return cbOnce();
            });
    }
}
250+
251+
// The bucketd-backed oplog interface class is this module's only export.
module.exports = BucketdOplogInterface;

0 commit comments

Comments
 (0)