-
Notifications
You must be signed in to change notification settings - Fork 54
Expand file tree
/
Copy pathpolling.rb
More file actions
599 lines (518 loc) · 21.1 KB
/
polling.rb
File metadata and controls
599 lines (518 loc) · 21.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
# frozen_string_literal: true
require "ldclient-rb/interfaces"
require "ldclient-rb/interfaces/data_system"
require "ldclient-rb/impl/data_system"
require "ldclient-rb/impl/data_system/protocolv2"
require "ldclient-rb/impl/data_source/requestor"
require "ldclient-rb/impl/util"
require "concurrent"
require "json"
require "uri"
require "http"
module LaunchDarkly
module Impl
module DataSystem
# Path of the FDv2 polling endpoint; HTTPPollingRequester appends it to the configured base URI.
FDV2_POLLING_ENDPOINT = "/sdk/poll"
# Path of the FDv1 polling endpoint; HTTPFDv1PollingRequester appends it to the configured base URI.
FDV1_POLLING_ENDPOINT = "/sdk/latest-all"
# Lower-cased response header from which the environment ID is read.
LD_ENVID_HEADER = "x-launchdarkly-env-id"
# Lower-cased response header; when its value is 'true' the polling data source
# emits an update with revert_to_fdv1 set and stops polling.
LD_FD_FALLBACK_HEADER = "x-launchdarkly-fd-fallback"
#
# Requester is the protocol a polling data source uses to retrieve data.
# Classes that include this module must override {#fetch}; overriding
# {#stop} is optional.
#
module Requester
  #
  # Fetches the data for the given selector.
  # Returns a Result containing a tuple of [ChangeSet, headers],
  # or an error if the data could not be retrieved.
  #
  # @param selector [LaunchDarkly::Interfaces::DataSystem::Selector, nil]
  # @return [Result]
  # @raise [NotImplementedError] if the including class does not override this method
  #
  def fetch(selector)
    # Name the offending class so a missing override is easy to track down.
    raise NotImplementedError, "#{self.class} must implement fetch(selector)"
  end

  #
  # Closes any persistent connections and releases resources.
  # This method should be called when the requester is no longer needed.
  # Implementations should handle being called multiple times gracefully.
  #
  def stop
    # Optional - implementations may override if they need cleanup
  end
end
#
# PollingDataSource is a data source that can retrieve information from
# LaunchDarkly either as an Initializer (a single {#fetch}) or as a
# Synchronizer (the repeating {#sync} loop).
#
class PollingDataSource
  include LaunchDarkly::Interfaces::DataSystem::Initializer
  include LaunchDarkly::Interfaces::DataSystem::Synchronizer

  attr_reader :name

  #
  # @param poll_interval [Float] Polling interval in seconds
  # @param requester [Requester] The requester to use for fetching data
  # @param logger [Logger] The logger
  #
  def initialize(poll_interval, requester, logger)
    @requester = requester
    @poll_interval = poll_interval
    @logger = logger
    # Set by #stop to wake the sync loop out of its inter-poll wait.
    @interrupt_event = Concurrent::Event.new
    # Set by #stop; checked at the top of every sync iteration.
    @stop = Concurrent::Event.new
    @name = "PollingDataSourceV2"
  end

  #
  # Performs a single poll and returns its outcome. The requester is stopped
  # afterwards, whether or not the poll succeeded.
  #
  # @param ss [LaunchDarkly::Interfaces::DataSystem::SelectorStore]
  # @return [LaunchDarkly::Result] a Result wrapping a
  #   LaunchDarkly::Interfaces::DataSystem::Basis on success, or an error
  #   message (and exception, when available) on failure
  #
  def fetch(ss)
    poll(ss)
  ensure
    # Ensure the requester is stopped to avoid leaving open connections.
    @requester.stop if @requester.respond_to?(:stop)
  end

  #
  # sync begins the synchronization process for the data source, yielding
  # Update objects until {#stop} is called, a fallback to FDv1 is requested,
  # or an unrecoverable error occurs. The requester is stopped when the loop
  # ends.
  #
  # @param ss [LaunchDarkly::Interfaces::DataSystem::SelectorStore]
  # @yieldparam update [LaunchDarkly::Interfaces::DataSystem::Update]
  #
  def sync(ss)
    @logger.info { "[LDClient] Starting PollingDataSourceV2 synchronizer" }

    until @stop.set?
      result = @requester.fetch(ss.selector)

      if result.success?
        change_set, headers = result.value
        fallback = headers[LD_FD_FALLBACK_HEADER] == 'true'
        yield LaunchDarkly::Interfaces::DataSystem::Update.new(
          state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
          change_set: change_set,
          environment_id: headers[LD_ENVID_HEADER],
          revert_to_fdv1: fallback
        )
        break if fallback
      else
        update, terminal = failure_update(result)
        yield update
        break if terminal
      end

      # wait returns true when the event was set (i.e. #stop was called) and
      # false on timeout. Every path honours the interruption so that no
      # further poll is attempted once the data source has been stopped.
      break if @interrupt_event.wait(@poll_interval)
    end
  ensure
    # Ensure the requester is stopped to avoid leaving open connections.
    @requester.stop if @requester.respond_to?(:stop)
  end

  #
  # Stops the synchronizer.
  #
  def stop
    @logger.info { "[LDClient] Stopping PollingDataSourceV2 synchronizer" }
    # Raise the stop flag before waking the loop so that a woken sync loop
    # always observes it.
    @stop.set
    @interrupt_event.set
  end

  #
  # Translates a failed fetch into the Update that sync should yield.
  #
  # HTTP errors (UnexpectedResponseError) are reported as ERROR_RESPONSE and
  # are terminal when the status is not recoverable; every other failure is
  # reported as NETWORK_ERROR and treated as recoverable. A fallback request
  # from the server always makes the failure terminal. Terminal failures
  # produce an OFF update, the rest an INTERRUPTED update.
  #
  # @param result [LaunchDarkly::Result] a failed fetch result
  # @return [Array] a pair of [Update, Boolean]; the boolean is true when
  #   polling must stop after the update has been yielded
  #
  private def failure_update(result)
    response_headers = result.headers
    fallback = false
    envid = nil
    if response_headers
      fallback = response_headers[LD_FD_FALLBACK_HEADER] == 'true'
      envid = response_headers[LD_ENVID_HEADER]
    end

    recoverable = true
    if result.exception.is_a?(LaunchDarkly::Impl::DataSource::UnexpectedResponseError)
      status_code = result.exception.status
      error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
        LaunchDarkly::Interfaces::DataSource::ErrorInfo::ERROR_RESPONSE,
        status_code,
        Impl::Util.http_error_message(
          status_code, "polling request", "will retry"
        ),
        Time.now
      )
      recoverable = Impl::Util.http_error_recoverable?(status_code)
    else
      error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
        LaunchDarkly::Interfaces::DataSource::ErrorInfo::NETWORK_ERROR,
        0,
        result.error,
        Time.now
      )
    end

    terminal = fallback || !recoverable
    state = if terminal
              LaunchDarkly::Interfaces::DataSource::Status::OFF
            else
              LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED
            end

    # revert_to_fdv1 can only be true on a terminal update: a non-terminal
    # update implies that no fallback was requested.
    update = LaunchDarkly::Interfaces::DataSystem::Update.new(
      state: state,
      error: error_info,
      environment_id: envid,
      revert_to_fdv1: fallback
    )
    [update, terminal]
  end

  #
  # Performs one request and converts the outcome into a Result. Any
  # StandardError raised along the way is logged and returned as a failed
  # Result rather than propagated.
  #
  # @param ss [LaunchDarkly::Interfaces::DataSystem::SelectorStore]
  # @return [LaunchDarkly::Result<LaunchDarkly::Interfaces::DataSystem::Basis, String>]
  #
  private def poll(ss)
    result = @requester.fetch(ss.selector)

    unless result.success?
      if result.exception.is_a?(LaunchDarkly::Impl::DataSource::UnexpectedResponseError)
        status_code = result.exception.status
        http_error_message_result = Impl::Util.http_error_message(
          status_code, "polling request", "will retry"
        )
        @logger.warn { "[LDClient] #{http_error_message_result}" } if Impl::Util.http_error_recoverable?(status_code)
        return LaunchDarkly::Result.fail(http_error_message_result, result.exception)
      end

      return LaunchDarkly::Result.fail(result.error || 'Failed to request payload', result.exception)
    end

    change_set, headers = result.value

    # Only a String header value is accepted as the environment ID.
    env_id = headers[LD_ENVID_HEADER]
    env_id = nil unless env_id.is_a?(String)

    basis = LaunchDarkly::Interfaces::DataSystem::Basis.new(
      change_set: change_set,
      persist: change_set.selector.defined?,
      environment_id: env_id
    )
    LaunchDarkly::Result.success(basis)
  rescue => e
    msg = "Error: Exception encountered when updating flags. #{e}"
    @logger.error { "[LDClient] #{msg}" }
    @logger.debug { "[LDClient] Exception trace: #{e.backtrace}" }
    LaunchDarkly::Result.fail(msg, e)
  end
end
#
# HTTPPollingRequester is a Requester that uses HTTP to make
# requests to the FDv2 polling endpoint.
#
class HTTPPollingRequester
  include Requester

  #
  # @param sdk_key [String]
  # @param config [LaunchDarkly::Config]
  #
  def initialize(sdk_key, config)
    @etag = nil
    @config = config
    @sdk_key = sdk_key
    @poll_uri = config.base_uri + FDV2_POLLING_ENDPOINT
    # Ask for gzip-encoded responses and enable the client's auto_inflate feature.
    @http_client = Impl::Util.new_http_client(config.base_uri, config)
      .use(:auto_inflate)
      .headers("Accept-Encoding" => "gzip")
  end

  #
  # Issues one GET against the FDv2 polling endpoint. The configured payload
  # filter key and a defined selector's state are sent as query parameters,
  # and the last accepted ETag (if any) is sent as If-None-Match.
  #
  # Outcomes:
  # - status >= 400: failed Result carrying an UnexpectedResponseError
  # - status 304: successful Result with a "no changes" change set
  # - otherwise: the body is parsed as JSON and converted to a change set
  #
  # Response headers (keys lower-cased) accompany every Result except the
  # one produced by the generic rescue.
  #
  # @param selector [LaunchDarkly::Interfaces::DataSystem::Selector, nil]
  # @return [Result]
  #
  def fetch(selector)
    query_params = []
    query_params << ["filter", @config.payload_filter_key] unless @config.payload_filter_key.nil?
    query_params << ["selector", selector.state] if selector && selector.defined?

    uri = @poll_uri
    uri = "#{uri}?#{URI.encode_www_form(query_params)}" if query_params.any?

    headers = {}
    Impl::Util.default_http_headers(@sdk_key, @config).each { |k, v| headers[k] = v }
    headers["If-None-Match"] = @etag unless @etag.nil?

    begin
      response = @http_client.request("GET", uri, headers: headers)
      status = response.status.code
      response_headers = response.headers.to_h.transform_keys(&:downcase)

      if status >= 400
        return LaunchDarkly::Result.fail(
          "HTTP error #{status}",
          LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(status),
          response_headers
        )
      end

      if status == 304
        return LaunchDarkly::Result.success([LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.no_changes, response_headers])
      end

      body = response.to_s
      data = JSON.parse(body, symbolize_names: true)
      etag = response_headers["etag"]
      @config.logger.debug { "[LDClient] #{uri} response status:[#{status}] ETag:[#{etag}]" }

      changeset_result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(data)
      unless changeset_result.success?
        return LaunchDarkly::Result.fail(changeset_result.error, changeset_result.exception, response_headers)
      end

      # Remember the ETag only once the payload has been accepted. Caching it
      # for a payload that failed conversion would turn the next poll into a
      # 304 "no changes" success even though the data was never applied.
      @etag = etag unless etag.nil?
      LaunchDarkly::Result.success([changeset_result.value, response_headers])
    rescue JSON::ParserError => e
      LaunchDarkly::Result.fail("Failed to parse JSON: #{e.message}", e, response_headers)
    rescue => e
      LaunchDarkly::Result.fail("Network error: #{e.message}", e)
    end
  end

  #
  # Closes the HTTP client and releases any persistent connections.
  #
  def stop
    @http_client.close if @http_client
  rescue StandardError
    # Best-effort cleanup: a failure while closing is deliberately ignored so
    # that stop can be called repeatedly and from ensure blocks.
    nil
  end
end
#
# HTTPFDv1PollingRequester is a Requester that uses HTTP to make
# requests to the FDv1 polling endpoint.
#
class HTTPFDv1PollingRequester
  include Requester

  #
  # @param sdk_key [String]
  # @param config [LaunchDarkly::Config]
  #
  def initialize(sdk_key, config)
    @etag = nil
    @config = config
    @sdk_key = sdk_key
    @poll_uri = config.base_uri + FDV1_POLLING_ENDPOINT
    # Ask for gzip-encoded responses and enable the client's auto_inflate feature.
    @http_client = Impl::Util.new_http_client(config.base_uri, config)
      .use(:auto_inflate)
      .headers("Accept-Encoding" => "gzip")
  end

  #
  # Issues one GET against the FDv1 polling endpoint. The configured payload
  # filter key is sent as a query parameter and the last accepted ETag (if
  # any) as If-None-Match. The selector is accepted for interface
  # compatibility but is not used when building the request.
  #
  # Outcomes:
  # - status >= 400: failed Result carrying an UnexpectedResponseError
  # - status 304: successful Result with a "no changes" change set
  # - otherwise: the body is parsed as JSON and converted to a change set
  #
  # Response headers (keys lower-cased) accompany every Result except the
  # one produced by the generic rescue.
  #
  # @param selector [LaunchDarkly::Interfaces::DataSystem::Selector, nil]
  # @return [Result]
  #
  def fetch(selector)
    query_params = []
    query_params << ["filter", @config.payload_filter_key] unless @config.payload_filter_key.nil?

    uri = @poll_uri
    uri = "#{uri}?#{URI.encode_www_form(query_params)}" if query_params.any?

    headers = {}
    Impl::Util.default_http_headers(@sdk_key, @config).each { |k, v| headers[k] = v }
    headers["If-None-Match"] = @etag unless @etag.nil?

    begin
      response = @http_client.request("GET", uri, headers: headers)
      status = response.status.code
      response_headers = response.headers.to_h.transform_keys(&:downcase)

      if status >= 400
        return LaunchDarkly::Result.fail(
          "HTTP error #{status}",
          LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(status),
          response_headers
        )
      end

      if status == 304
        return LaunchDarkly::Result.success([LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.no_changes, response_headers])
      end

      body = response.to_s
      data = JSON.parse(body, symbolize_names: true)
      etag = response_headers["etag"]
      @config.logger.debug { "[LDClient] #{uri} response status:[#{status}] ETag:[#{etag}]" }

      changeset_result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data)
      unless changeset_result.success?
        return LaunchDarkly::Result.fail(changeset_result.error, changeset_result.exception, response_headers)
      end

      # Remember the ETag only once the payload has been accepted. Caching it
      # for a payload that failed conversion would turn the next poll into a
      # 304 "no changes" success even though the data was never applied.
      @etag = etag unless etag.nil?
      LaunchDarkly::Result.success([changeset_result.value, response_headers])
    rescue JSON::ParserError => e
      LaunchDarkly::Result.fail("Failed to parse JSON: #{e.message}", e, response_headers)
    rescue => e
      LaunchDarkly::Result.fail("Network error: #{e.message}", e)
    end
  end

  #
  # Closes the HTTP client and releases any persistent connections.
  #
  def stop
    @http_client.close if @http_client
  rescue StandardError
    # Best-effort cleanup: a failure while closing is deliberately ignored so
    # that stop can be called repeatedly and from ensure blocks.
    nil
  end
end
#
# Converts a polling payload into a ChangeSet.
#
# Events are processed in order. A server-intent event starts the change set
# (or short-circuits to "no changes" for TRANSFER_NONE), put-object and
# delete-object events add changes, and the first payload-transferred event
# finishes the change set and ends processing. Events whose name is missing,
# not string-like, or unrecognized are skipped.
#
# @param data [Hash] The polling payload (parsed JSON with symbol keys)
# @return [LaunchDarkly::Result<LaunchDarkly::Interfaces::DataSystem::ChangeSet, String>] Result containing ChangeSet on success, or error message on failure
#
def self.polling_payload_to_changeset(data)
  # The body may be any valid JSON value (array, null, number, ...); reject
  # anything that is not an object instead of raising while indexing it.
  unless data.is_a?(Hash) && data[:events].is_a?(Array)
    return LaunchDarkly::Result.fail("Invalid payload: 'events' key is missing or not a list")
  end

  builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new

  data[:events].each do |event|
    unless event.is_a?(Hash)
      return LaunchDarkly::Result.fail("Invalid payload: 'events' must be a list of objects")
    end

    event_name = event[:event]
    # Skip events without a usable name (nil, or a non-string JSON value
    # such as a number, which cannot be converted to a symbol).
    next unless event_name.respond_to?(:to_sym)

    case event_name.to_sym
    when LaunchDarkly::Interfaces::DataSystem::EventName::SERVER_INTENT
      begin
        server_intent = LaunchDarkly::Interfaces::DataSystem::ServerIntent.from_h(event[:data])
      rescue ArgumentError => e
        return LaunchDarkly::Result.fail("Invalid JSON in server intent", e)
      end

      if server_intent.payload.code == LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_NONE
        return LaunchDarkly::Result.success(LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.no_changes)
      end

      builder.start(server_intent.payload.code)
    when LaunchDarkly::Interfaces::DataSystem::EventName::PUT_OBJECT
      begin
        put = LaunchDarkly::Impl::DataSystem::ProtocolV2::PutObject.from_h(event[:data])
      rescue ArgumentError => e
        return LaunchDarkly::Result.fail("Invalid JSON in put object", e)
      end

      builder.add_put(put.kind, put.key, put.version, put.object)
    when LaunchDarkly::Interfaces::DataSystem::EventName::DELETE_OBJECT
      begin
        delete_object = LaunchDarkly::Impl::DataSystem::ProtocolV2::DeleteObject.from_h(event[:data])
      rescue ArgumentError => e
        return LaunchDarkly::Result.fail("Invalid JSON in delete object", e)
      end

      builder.add_delete(delete_object.kind, delete_object.key, delete_object.version)
    when LaunchDarkly::Interfaces::DataSystem::EventName::PAYLOAD_TRANSFERRED
      begin
        selector = LaunchDarkly::Interfaces::DataSystem::Selector.from_h(event[:data])
        changeset = builder.finish(selector)
        return LaunchDarkly::Result.success(changeset)
      rescue ArgumentError, RuntimeError => e
        return LaunchDarkly::Result.fail("Invalid JSON in payload transferred object", e)
      end
    end
  end

  # Reached only when no payload-transferred (or TRANSFER_NONE intent) event was seen.
  LaunchDarkly::Result.fail("didn't receive any known protocol events in polling payload")
end
#
# Converts an FDv1 polling payload into a ChangeSet.
#
# The payload is treated as a full transfer: every entry under :flags and
# :segments becomes a put in the change set, which is finished with the
# "no selector" value. A missing :flags or :segments key is skipped.
#
# @param data [Hash] The FDv1 polling payload (parsed JSON with symbol keys)
# @return [LaunchDarkly::Result<LaunchDarkly::Interfaces::DataSystem::ChangeSet, String>] Result containing ChangeSet on success, or error message on failure
#
def self.fdv1_polling_payload_to_changeset(data)
  # The body may be any valid JSON value (array, null, number, ...); reject
  # anything that is not an object instead of raising while indexing it.
  return LaunchDarkly::Result.fail("Invalid format: payload is not an object") unless data.is_a?(Hash)

  builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
  builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL)
  selector = LaunchDarkly::Interfaces::DataSystem::Selector.no_selector

  # Pairs each object kind with the payload key that holds its items.
  kind_mappings = [
    [LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, :flags],
    [LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT, :segments],
  ]

  kind_mappings.each do |kind, fdv1_key|
    kind_data = data[fdv1_key]
    next if kind_data.nil?

    unless kind_data.is_a?(Hash)
      return LaunchDarkly::Result.fail("Invalid format: #{fdv1_key} is not an object")
    end

    kind_data.each do |key, flag_or_segment|
      unless flag_or_segment.is_a?(Hash)
        return LaunchDarkly::Result.fail("Invalid format: #{key} is not an object")
      end

      version = flag_or_segment[:version]
      return LaunchDarkly::Result.fail("Invalid format: #{key} does not have a version set") if version.nil?

      builder.add_put(kind, key, version, flag_or_segment)
    end
  end

  LaunchDarkly::Result.success(builder.finish(selector))
end
#
# Fluent builder that assembles a PollingDataSource backed by the FDv2
# polling endpoint.
#
class PollingDataSourceBuilder
  #
  # @param sdk_key [String]
  # @param config [LaunchDarkly::Config]
  #
  def initialize(sdk_key, config)
    @requester = nil
    @config = config
    @sdk_key = sdk_key
  end

  #
  # Overrides the Requester that the built PollingDataSource will use.
  #
  # @param requester [Requester]
  # @return [PollingDataSourceBuilder] this builder, to allow chaining
  #
  def requester(requester)
    tap { @requester = requester }
  end

  #
  # Creates the PollingDataSource. When no custom requester was supplied, an
  # HTTPPollingRequester is created from the SDK key and config.
  #
  # @return [PollingDataSource]
  #
  def build
    source = @requester
    source ||= HTTPPollingRequester.new(@sdk_key, @config)
    PollingDataSource.new(@config.poll_interval, source, @config.logger)
  end
end
#
# Fluent builder that assembles a PollingDataSource backed by the FDv1
# polling endpoint.
#
class FDv1PollingDataSourceBuilder
  #
  # @param sdk_key [String]
  # @param config [LaunchDarkly::Config]
  #
  def initialize(sdk_key, config)
    @requester = nil
    @config = config
    @sdk_key = sdk_key
  end

  #
  # Overrides the Requester that the built PollingDataSource will use.
  #
  # @param requester [Requester]
  # @return [FDv1PollingDataSourceBuilder] this builder, to allow chaining
  #
  def requester(requester)
    tap { @requester = requester }
  end

  #
  # Creates the PollingDataSource. When no custom requester was supplied, an
  # HTTPFDv1PollingRequester is created from the SDK key and config.
  #
  # @return [PollingDataSource]
  #
  def build
    source = @requester
    source ||= HTTPFDv1PollingRequester.new(@sdk_key, @config)
    PollingDataSource.new(@config.poll_interval, source, @config.logger)
  end
end
end
end
end