diff --git a/CHANGES.txt b/CHANGES.txt index ecfe21a6c..e12623f5e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,10 @@ CHANGES +8.11.0 (Mar, 12, 2026) +- Added the ability to listen to different events triggered by the SDK. Read more in our docs. + - SDK_UPDATE notify when a flag or user segment has changed + - SDK_READY notify when the SDK is ready to evaluate + 8.10.1 (Jan 28, 2025) - Fixed rule-based segment matcher to exit when a conition is met. - Fixed impressions properties format in redis mode. diff --git a/LICENSE b/LICENSE index 0f9e8a596..0f4dec61f 100644 --- a/LICENSE +++ b/LICENSE @@ -157,7 +157,7 @@ Apache License file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2025 Harness Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/lib/splitclient-rb.rb b/lib/splitclient-rb.rb index 5f4ebcc91..ee3d7b234 100644 --- a/lib/splitclient-rb.rb +++ b/lib/splitclient-rb.rb @@ -66,6 +66,11 @@ require 'splitclient-rb/engine/common/impressions_counter' require 'splitclient-rb/engine/common/impressions_manager' require 'splitclient-rb/engine/common/noop_impressions_counter' +require 'splitclient-rb/engine/events/events_manager_config.rb' +require 'splitclient-rb/engine/events/events_manager.rb' +require 'splitclient-rb/engine/events/events_task.rb' +require 'splitclient-rb/engine/events/events_delivery.rb' +require 'splitclient-rb/engine/events/noop_events_queue.rb' require 'splitclient-rb/engine/parser/condition' require 'splitclient-rb/engine/parser/partition' require 'splitclient-rb/engine/parser/evaluator' @@ -112,6 +117,13 @@ require 'splitclient-rb/engine/models/evaluation_options' require 'splitclient-rb/engine/models/fallback_treatment.rb' require 'splitclient-rb/engine/models/fallback_treatments_configuration.rb' +require 'splitclient-rb/engine/models/events_metadata.rb' +require 'splitclient-rb/engine/models/sdk_event_type.rb' +require 'splitclient-rb/engine/models/sdk_event.rb' +require 'splitclient-rb/engine/models/sdk_internal_event.rb' +require 'splitclient-rb/engine/models/sdk_internal_event_notification.rb' +require 'splitclient-rb/engine/models/valid_sdk_event.rb' +require 'splitclient-rb/engine/models/event_active_subscriptions.rb' require 'splitclient-rb/engine/auth_api_client' require 'splitclient-rb/engine/back_off' require 'splitclient-rb/engine/fallback_treatment_calculator.rb' diff --git a/lib/splitclient-rb/cache/repositories/rule_based_segments_repository.rb b/lib/splitclient-rb/cache/repositories/rule_based_segments_repository.rb index 7f45e3437..67f339feb 100644 --- a/lib/splitclient-rb/cache/repositories/rule_based_segments_repository.rb +++ b/lib/splitclient-rb/cache/repositories/rule_based_segments_repository.rb @@ -28,7 +28,7 @@ class RuleBasedSegmentsRepository < Repository RB_SEGMENTS_PREFIX = '.rbsegment.' REGISTERED_PREFIX = '.segments.registered' - def initialize(config) + def initialize(config, internal_events_queue) super(config) @adapter = case @config.cache_adapter.class.to_s when 'SplitIoClient::Cache::Adapters::RedisAdapter' @@ -40,12 +40,25 @@ def initialize(config) @adapter.set_string(namespace_key(TILL_PREFIX), '-1') @adapter.initialize_map(namespace_key(REGISTERED_PREFIX)) end + @internal_events_queue = internal_events_queue end def update(to_add, to_delete, new_change_number) to_add.each{ |rule_based_segment| add_rule_based_segment(rule_based_segment) } to_delete.each{ |rule_based_segment| remove_rule_based_segment(rule_based_segment) } set_change_number(new_change_number) + + if to_add.length > 0 || to_delete.length > 0 + @internal_events_queue.push( + SplitIoClient::Engine::Models::SdkInternalEventNotification.new( + SplitIoClient::Engine::Models::SdkInternalEvent::RB_SEGMENTS_UPDATED, + SplitIoClient::Engine::Models::EventsMetadata.new( + SplitIoClient::Engine::Models::SdkEventType::SEGMENTS_UPDATE, + [] + ) + ) + ) + end end def get_rule_based_segment(name) diff --git a/lib/splitclient-rb/cache/repositories/segments_repository.rb b/lib/splitclient-rb/cache/repositories/segments_repository.rb index a45ee167a..7115d1d35 100644 --- a/lib/splitclient-rb/cache/repositories/segments_repository.rb +++ b/lib/splitclient-rb/cache/repositories/segments_repository.rb @@ -6,7 +6,7 @@ class SegmentsRepository < Repository attr_reader :adapter - def initialize(config) + def initialize(config, internal_events_queue) super(config) @adapter = case @config.cache_adapter.class.to_s when 'SplitIoClient::Cache::Adapters::RedisAdapter' @@ -15,6 +15,7 @@ def initialize(config) @config.cache_adapter end @adapter.set_bool(namespace_key('.ready'), false) unless @config.mode.equal?(:consumer) + @internal_events_queue = internal_events_queue end # Receives segment data, adds and removes segements from the store @@ -22,9 +23,19 @@ def add_to_segment(segment) name = segment[:name] @adapter.initialize_set(segment_data(name)) unless @adapter.exists?(segment_data(name)) - add_keys(name, segment[:added]) remove_keys(name, segment[:removed]) + if segment[:added].length > 0 || segment[:removed].length > 0 + @internal_events_queue.push( + SplitIoClient::Engine::Models::SdkInternalEventNotification.new( + SplitIoClient::Engine::Models::SdkInternalEvent::SEGMENTS_UPDATED, + SplitIoClient::Engine::Models::EventsMetadata.new( + SplitIoClient::Engine::Models::SdkEventType::SEGMENTS_UPDATE, + [] + ) + ) + ) + end end def get_segment_keys(name) diff --git a/lib/splitclient-rb/cache/repositories/splits_repository.rb b/lib/splitclient-rb/cache/repositories/splits_repository.rb index 9fcae0e04..f9a8acbbb 100644 --- a/lib/splitclient-rb/cache/repositories/splits_repository.rb +++ b/lib/splitclient-rb/cache/repositories/splits_repository.rb @@ -35,7 +35,7 @@ class SplitsRepository < Repository SPLIT_PREFIX = '.split.' READY_PREFIX = '.splits.ready' - def initialize(config, flag_sets_repository, flag_set_filter) + def initialize(config, flag_sets_repository, flag_set_filter, internal_events_queue) super(config) @tt_cache = {} @adapter = case @config.cache_adapter.class.to_s @@ -46,6 +46,7 @@ def initialize(config, flag_sets_repository, flag_set_filter) end @flag_sets = flag_sets_repository @flag_set_filter = flag_set_filter + @internal_events_queue = internal_events_queue initialize_keys end @@ -53,6 +54,18 @@ def update(to_add, to_delete, new_change_number) to_add.each{ |feature_flag| add_feature_flag(feature_flag) } to_delete.each{ |feature_flag| remove_feature_flag(feature_flag) } set_change_number(new_change_number) + + if to_add.length > 0 || to_delete.length > 0 + @internal_events_queue.push( + SplitIoClient::Engine::Models::SdkInternalEventNotification.new( + SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, + SplitIoClient::Engine::Models::EventsMetadata.new( + SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE, + to_add.map {|flag| flag[:name]} | to_delete.map {|flag| flag[:name]} + ) + ) + ) + end end def get_split(name) @@ -140,6 +153,15 @@ def kill(change_number, split_name, default_treatment) split[:changeNumber] = change_number @adapter.set_string(namespace_key(".split.#{split_name}"), split.to_json) + @internal_events_queue.push( + SplitIoClient::Engine::Models::SdkInternalEventNotification.new( + SplitIoClient::Engine::Models::SdkInternalEvent::FLAG_KILLED_NOTIFICATION, + SplitIoClient::Engine::Models::EventsMetadata.new( + SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE, + [split_name] + ) + ) + ) end def splits_count diff --git a/lib/splitclient-rb/clients/split_client.rb b/lib/splitclient-rb/clients/split_client.rb index 2caccf848..c350b719e 100644 --- a/lib/splitclient-rb/clients/split_client.rb +++ b/lib/splitclient-rb/clients/split_client.rb @@ -18,7 +18,7 @@ class SplitClient # @param sdk_key [String] the SDK key for your split account # # @return [SplitIoClient] split.io client instance - def initialize(sdk_key, repositories, status_manager, config, impressions_manager, telemetry_evaluation_producer, evaluator, split_validator, fallback_treatment_calculator) + def initialize(sdk_key, repositories, status_manager, config, impressions_manager, telemetry_evaluation_producer, evaluator, split_validator, fallback_treatment_calculator, events_manager) @api_key = sdk_key @splits_repository = repositories[:splits] @segments_repository = repositories[:segments] @@ -33,6 +33,7 @@ def initialize(sdk_key, repositories, status_manager, config, impressions_manage @split_validator = split_validator @evaluator = evaluator @fallback_treatment_calculator = fallback_treatment_calculator + @events_manager = events_manager end def get_treatment( @@ -117,11 +118,11 @@ def destroy @config.logger.info('Split client shutdown started...') if @config.debug_enabled if !@config.cache_adapter.is_a?(SplitIoClient::Cache::Adapters::RedisAdapter) && @config.impressions_mode != :none && (!@impressions_repository.empty? || !@events_repository.empty?) - @config.logger.debug("Impressions and/or Events cache is not empty") + @config.logger.debug("Impressions and/or Events cache is not empty") if @config.debug_enabled # Adding small delay to ensure sender threads are fully running sleep(0.1) if !@config.threads.key?(:impressions_sender) || !@config.threads.key?(:events_sender) - @config.logger.debug("Periodic data recording thread has not started yet, waiting for service startup.") + @config.logger.debug("Periodic data recording thread has not started yet, waiting for service startup.") if @config.debug_enabled @config.threads[:start_sdk].join(5) if @config.threads.key?(:start_sdk) end end @@ -176,6 +177,14 @@ def block_until_ready(time = nil) @status_manager.wait_until_ready(time) if @status_manager end + def register(sdk_event, handler) + @events_manager.register(sdk_event, handler) + end + + def unregister(sdk_event, handler) + @events_manager.unregister(sdk_event) + end + private def check_properties_size(properties_size, msg = "Event not queued") diff --git a/lib/splitclient-rb/engine/api/faraday_middleware/gzip.rb b/lib/splitclient-rb/engine/api/faraday_middleware/gzip.rb index e1e6baf22..fad8471b8 100644 --- a/lib/splitclient-rb/engine/api/faraday_middleware/gzip.rb +++ b/lib/splitclient-rb/engine/api/faraday_middleware/gzip.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'faraday' +require 'stringio' module SplitIoClient module FaradayMiddleware diff --git a/lib/splitclient-rb/engine/api/splits.rb b/lib/splitclient-rb/engine/api/splits.rb index 887adb26d..6ecd1473d 100644 --- a/lib/splitclient-rb/engine/api/splits.rb +++ b/lib/splitclient-rb/engine/api/splits.rb @@ -24,7 +24,7 @@ def since(since, since_rbs, fetch_options = { cache_control_headers: false, till if check_last_proxy_check_timestamp @spec_version = SplitIoClient::Spec::FeatureFlags::SPEC_VERSION - @config.logger.debug("Switching to new Feature flag spec #{@spec_version} and fetching.") + @config.logger.debug("Switching to new Feature flag spec #{@spec_version} and fetching.") if @config.debug_enabled @old_spec_since = since since = -1 since_rbs = -1 @@ -41,7 +41,7 @@ def since(since, since_rbs, fetch_options = { cache_control_headers: false, till params[:sets] = @flag_sets_filter.join(",") unless @flag_sets_filter.empty? params[:till] = fetch_options[:till] unless fetch_options[:till].nil? - @config.logger.debug("Fetching from splitChanges with #{params}: ") + @config.logger.debug("Fetching from splitChanges with #{params}: ") if @config.debug_enabled response = get_api("#{@config.base_uri}/splitChanges", @api_key, params, fetch_options[:cache_control_headers]) if response.status == 414 @config.logger.error("Error fetching feature flags; the amount of flag sets provided are too big, causing uri length error.") diff --git a/lib/splitclient-rb/engine/auth_api_client.rb b/lib/splitclient-rb/engine/auth_api_client.rb index 475842a22..f3b7c5c6d 100644 --- a/lib/splitclient-rb/engine/auth_api_client.rb +++ b/lib/splitclient-rb/engine/auth_api_client.rb @@ -21,10 +21,12 @@ def authenticate(api_key) return process_error(response) if response.status >= 400 && response.status < 500 @telemetry_runtime_producer.record_sync_error(Telemetry::Domain::Constants::TOKEN_SYNC, response.status.to_i) - @config.logger.debug("Error connecting to: #{@config.auth_service_url}. Response status: #{response.status}") + if @config.debug_enabled + @config.logger.debug("Error connecting to: #{@config.auth_service_url}. Response status: #{response.status}") + end { push_enabled: false, retry: true } rescue StandardError => e - @config.logger.debug("AuthApiClient error: #{e.inspect}.") + @config.logger.debug("AuthApiClient error: #{e.inspect}.") if @config.debug_enabled { push_enabled: false, retry: false } end @@ -51,7 +53,9 @@ def decode_token(token) end def process_error(response) - @config.logger.debug("Error connecting to: #{@config.auth_service_url}. Response status: #{response.status}") + if @config.debug_enabled + @config.logger.debug("Error connecting to: #{@config.auth_service_url}. Response status: #{response.status}") + end @telemetry_runtime_producer.record_auth_rejections if response.status == 401 { push_enabled: false, retry: false } diff --git a/lib/splitclient-rb/engine/events/events_delivery.rb b/lib/splitclient-rb/engine/events/events_delivery.rb new file mode 100644 index 000000000..dd2a093bf --- /dev/null +++ b/lib/splitclient-rb/engine/events/events_delivery.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +module SplitIoClient + module Engine + module Events + class EventsDelivery + def initialize(config) + @config = config + end + + def deliver(sdk_event, event_metadata, event_handler) + event_handler.call(event_metadata) + rescue StandardError => e + @config.logger.error("Exception when calling handler for Sdk Event #{sdk_event}") + @config.log_found_exception(__method__.to_s, e) + end + end + end + end +end diff --git a/lib/splitclient-rb/engine/events/events_manager.rb b/lib/splitclient-rb/engine/events/events_manager.rb new file mode 100644 index 000000000..3cefd4f68 --- /dev/null +++ b/lib/splitclient-rb/engine/events/events_manager.rb @@ -0,0 +1,194 @@ +# frozen_string_literal: true + +module SplitIoClient + module Engine + module Events + class EventsManager + def initialize(events_manager_config, events_delivery, config) + @manager_config = events_manager_config + @events_delivery = events_delivery + @active_subscriptions = {} + @internal_events_status = {} + @mutex = Mutex.new + @config = config + end + + def register(sdk_event, event_handler) + return if @active_subscriptions.key?(sdk_event) && !get_event_handler(sdk_event).nil? + + @mutex.synchronize do + # SDK ready already fired + if sdk_event == Engine::Models::SdkEvent::SDK_READY && event_already_triggered(sdk_event) + @active_subscriptions[sdk_event] = Engine::Models::EventActiveSubscriptions.new(true, event_handler) + @config.logger.debug('EventsManager: Firing SDK_READY event for new subscription') if @config.debug_enabled + fire_sdk_event(sdk_event, nil) + return + end + + @config.logger.debug("EventsManager: Register event: #{sdk_event}") if @config.debug_enabled + @active_subscriptions[sdk_event] = Engine::Models::EventActiveSubscriptions.new(false, event_handler) + end + end + + def unregister(sdk_event) + return unless @active_subscriptions.key?(sdk_event) + + @mutex.synchronize do + @active_subscriptions.delete(sdk_event) + end + end + + def notify_internal_event(sdk_internal_event, event_metadata) + @mutex.synchronize do + update_internal_event_status(sdk_internal_event, true) + @manager_config.evaluation_order.each do |sorted_event| + if get_sdk_event_if_applicable(sdk_internal_event).include?(sorted_event) && + !get_event_handler(sorted_event).nil? + fire_sdk_event(sorted_event, event_metadata) + end + + # if client is not subscribed to SDK_READY + if check_if_register_needed(sorted_event) + @config.logger.debug('EventsManager: Registering SDK_READY event as fired') if @config.debug_enabled + @active_subscriptions[Engine::Models::SdkEvent::SDK_READY] = Engine::Models::EventActiveSubscriptions.new(true, nil) + end + end + end + end + + def destroy + @mutex.synchronize do + @active_subscriptions = {} + @internal_events_status = {} + end + end + + private + + def check_if_register_needed(sorted_event) + sorted_event == Engine::Models::SdkEvent::SDK_READY && + get_event_handler(sorted_event).nil? && + !@active_subscriptions.include?(sorted_event) + end + + def fire_sdk_event(sdk_event, event_metadata) + @config.logger.debug("EventsManager: Firing Sdk event: #{sdk_event}") if @config.debug_enabled + @config.threads[:sdk_event_notify] = Thread.new do + @events_delivery.deliver(sdk_event, event_metadata, get_event_handler(sdk_event)) + end + sdk_event_triggered(sdk_event) + end + + def event_already_triggered(sdk_event) + return @active_subscriptions[sdk_event].triggered if @active_subscriptions.key?(sdk_event) + + false + end + + def get_internal_event_status(sdk_internal_event) + return @internal_events_status[sdk_internal_event] if @internal_events_status.key?(sdk_internal_event) + + false + end + + def update_internal_event_status(sdk_internal_event, status) + @internal_events_status[sdk_internal_event] = status + end + + def sdk_event_triggered(sdk_event) + return unless @active_subscriptions.key?(sdk_event) + + return if @active_subscriptions[sdk_event].triggered + + @active_subscriptions[sdk_event].triggered = true + end + + def get_event_handler(sdk_event) + return nil unless @active_subscriptions.key?(sdk_event) + + @active_subscriptions[sdk_event].handler + end + + def get_sdk_event_if_applicable(sdk_internal_event) + final_sdk_event = Engine::Models::ValidSdkEvent.new(nil, false) + + events_to_fire = [] + require_any_sdk_event = check_require_any(sdk_internal_event) + if require_any_sdk_event.valid + if (!event_already_triggered(require_any_sdk_event.sdk_event) && + execution_limit(require_any_sdk_event.sdk_event) == 1) || + execution_limit(require_any_sdk_event.sdk_event) == -1 + final_sdk_event = Engine::Models::ValidSdkEvent.new( + require_any_sdk_event.sdk_event, + check_prerequisites(require_any_sdk_event.sdk_event) && + check_suppressed_by(require_any_sdk_event.sdk_event) + ) + end + events_to_fire.push(final_sdk_event.sdk_event) if final_sdk_event.valid + end + check_require_all.each { |sdk_event| events_to_fire.push(sdk_event) } + + events_to_fire + end + + def check_require_all + events = [] + @manager_config.require_all.each do |require_name, require_value| + final_status = true + require_value.each { |val| final_status &= get_internal_event_status(val) } + events.push(require_name) if check_event_eligible_conditions(final_status, require_name, require_value) + end + + events + end + + def check_event_eligible_conditions(final_status, require_name, require_value) + final_status && + check_prerequisites(require_name) && + ((!event_already_triggered(require_name) && + execution_limit(require_name) == 1) || + execution_limit(require_name) == -1) && + require_value.length.positive? + end + + def check_prerequisites(sdk_event) + @manager_config.prerequisites.each do |name, value| + value.each do |val| + return false if name == sdk_event && !event_already_triggered(val) + end + end + + true + end + + def check_suppressed_by(sdk_event) + @manager_config.suppressed_by.each do |name, value| + value.each do |val| + return false if name == sdk_event && event_already_triggered(val) + end + end + + true + end + + def execution_limit(sdk_event) + return -1 unless @manager_config.execution_limits.key?(sdk_event) + + @manager_config.execution_limits[sdk_event] + end + + def check_require_any(sdk_internal_event) + valid_sdk_event = Engine::Models::ValidSdkEvent.new(nil, false) + @manager_config.require_any.each do |name, val| + if val.include?(sdk_internal_event) + valid_sdk_event = Engine::Models::ValidSdkEvent.new(name, true) + return valid_sdk_event + end + end + + valid_sdk_event + end + end + end + end +end diff --git a/lib/splitclient-rb/engine/events/events_manager_config.rb b/lib/splitclient-rb/engine/events/events_manager_config.rb new file mode 100644 index 000000000..4fe4b2268 --- /dev/null +++ b/lib/splitclient-rb/engine/events/events_manager_config.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +module SplitIoClient + module Engine + module Events + class EventsManagerConfig + attr_accessor :require_all, :prerequisites, :require_any, :suppressed_by, :execution_limits, :evaluation_order + + def initialize + @require_all = construct_require_all + @prerequisites = construct_prerequisites + @require_any = construct_require_any + @suppressed_by = construct_suppressed_by + @execution_limits = construct_execution_limits + @evaluation_order = construct_sorted_events + end + + private + + def construct_require_all + { + SplitIoClient::Engine::Models::SdkEvent::SDK_READY => Set.new([SplitIoClient::Engine::Models::SdkInternalEvent::SDK_READY]) + } + end + + def construct_prerequisites + { + SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE => Set.new([SplitIoClient::Engine::Models::SdkEvent::SDK_READY]) + } + end + + def construct_require_any + { + SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE => Set.new( + [ + SplitIoClient::Engine::Models::SdkInternalEvent::FLAG_KILLED_NOTIFICATION, + SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, + SplitIoClient::Engine::Models::SdkInternalEvent::RB_SEGMENTS_UPDATED, + SplitIoClient::Engine::Models::SdkInternalEvent::SEGMENTS_UPDATED + ] + ) + } + end + + def construct_suppressed_by + {} + end + + def construct_execution_limits + { + SplitIoClient::Engine::Models::SdkEvent::SDK_READY => 1, + SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE => -1 + } + end + + def construct_sorted_events + sorted_events = [] + [SplitIoClient::Engine::Models::SdkEvent::SDK_READY, SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE].each do |sdk_event| + sorted_events = dfs_recursive(sdk_event, sorted_events) + end + + sorted_events + end + + def dfs_recursive(sdk_event, added) + return added if added.include?(sdk_event) + + get_dependencies(sdk_event).each do |dependent_event| + added = dfs_recursive(dependent_event, added) + end + + added.push(sdk_event) + + added + end + + def get_dependencies(sdk_event) + dependencies = Set.new + @prerequisites.each do |prerequisites_event_name, prerequisites_event_value| + next unless prerequisites_event_name == sdk_event + + prerequisites_event_value.each do |prereq_event| + dependencies.add(prereq_event) + end + end + + @suppressed_by.each do |suppressed_event_name, suppressed_event_value| + dependencies.add(suppressed_event_name) if suppressed_event_value.include?(sdk_event) + end + + dependencies + end + end + end + end +end diff --git a/lib/splitclient-rb/engine/events/events_task.rb b/lib/splitclient-rb/engine/events/events_task.rb new file mode 100644 index 000000000..f1cdb2452 --- /dev/null +++ b/lib/splitclient-rb/engine/events/events_task.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +module SplitIoClient + module Engine + module Events + class EventsTask + attr_accessor :running + + def initialize(notify_internal_events, internal_events_queue, config) + @notify_internal_events = notify_internal_events + @internal_events_queue = internal_events_queue + @config = config + @running = false + end + + def start + return if @running + + @config.logger.info('Starting Internal Events Task.') + @running = true + @config.threads[:internal_events_task] = Thread.new do + worker_thread + end + end + + def stop + return unless @running + + @config.logger.info('Stopping Internal Events Task.') + @running = false + end + + private + + def worker_thread + while (event = @internal_events_queue.pop) + break unless @running + + @config.logger.debug("Processing sdk internal event: #{event.internal_event}") if @config.debug_enabled + begin + @notify_internal_events.call(event.internal_event, event.metadata) + rescue StandardError => e + @config.log_found_exception(__method__.to_s, e) + end + end + end + end + end + end +end diff --git a/lib/splitclient-rb/engine/events/noop_events_queue.rb b/lib/splitclient-rb/engine/events/noop_events_queue.rb new file mode 100644 index 000000000..48989ac4f --- /dev/null +++ b/lib/splitclient-rb/engine/events/noop_events_queue.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module SplitIoClient + module Engine + module Events + class NoOpEventsQueue + def push(sdk_event) + # do nothing + end + end + end + end +end diff --git a/lib/splitclient-rb/engine/models/event_active_subscriptions.rb b/lib/splitclient-rb/engine/models/event_active_subscriptions.rb new file mode 100644 index 000000000..3a0d4909c --- /dev/null +++ b/lib/splitclient-rb/engine/models/event_active_subscriptions.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: false + +module SplitIoClient + module Engine::Models + class EventActiveSubscriptions + attr_accessor :triggered, :handler + + def initialize(triggered, handler) + @triggered = triggered + @handler = handler + end + end + end +end diff --git a/lib/splitclient-rb/engine/models/events_metadata.rb b/lib/splitclient-rb/engine/models/events_metadata.rb new file mode 100644 index 000000000..208c24b55 --- /dev/null +++ b/lib/splitclient-rb/engine/models/events_metadata.rb @@ -0,0 +1,10 @@ +module SplitIoClient::Engine::Models + class EventsMetadata + attr_accessor :type, :names + + def initialize(type, names=nil) + @type = type + @names = names + end + end +end diff --git a/lib/splitclient-rb/engine/models/sdk_event.rb b/lib/splitclient-rb/engine/models/sdk_event.rb new file mode 100644 index 000000000..4d9fa1890 --- /dev/null +++ b/lib/splitclient-rb/engine/models/sdk_event.rb @@ -0,0 +1,4 @@ +class SplitIoClient::Engine::Models::SdkEvent + SDK_READY = 'SDK_READY' + SDK_UPDATE = 'SDK_UPDATE' +end diff --git a/lib/splitclient-rb/engine/models/sdk_event_type.rb b/lib/splitclient-rb/engine/models/sdk_event_type.rb new file mode 100644 index 000000000..a8a8e0fcb --- /dev/null +++ b/lib/splitclient-rb/engine/models/sdk_event_type.rb @@ -0,0 +1,4 @@ +class SplitIoClient::Engine::Models::SdkEventType + FLAG_UPDATE = 'FLAG_UPDATE' + SEGMENTS_UPDATE = 'SEGMENTS_UPDATE' +end diff --git a/lib/splitclient-rb/engine/models/sdk_internal_event.rb b/lib/splitclient-rb/engine/models/sdk_internal_event.rb new file mode 100644 index 000000000..8372a132f --- /dev/null +++ b/lib/splitclient-rb/engine/models/sdk_internal_event.rb @@ -0,0 +1,8 @@ +class SplitIoClient::Engine::Models::SdkInternalEvent + SDK_READY = 'SDK_READY' + FLAGS_UPDATED = 'FLAGS_UPDATED' + FLAG_KILLED_NOTIFICATION = 'FLAG_KILLED_NOTIFICATION' + SEGMENTS_UPDATED = 'SEGMENTS_UPDATED' + RB_SEGMENTS_UPDATED = 'RB_SEGMENTS_UPDATED' + LARGE_SEGMENTS_UPDATED = 'LARGE_SEGMENTS_UPDATED' +end diff --git a/lib/splitclient-rb/engine/models/sdk_internal_event_notification.rb b/lib/splitclient-rb/engine/models/sdk_internal_event_notification.rb new file mode 100644 index 000000000..b88f60d4d --- /dev/null +++ b/lib/splitclient-rb/engine/models/sdk_internal_event_notification.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: false + +module SplitIoClient + module Engine + module Models + class SdkInternalEventNotification + attr_reader :internal_event, :metadata + + def initialize(internal_event, metadata) + @internal_event = internal_event + @metadata = metadata + end + end + end + end +end diff --git a/lib/splitclient-rb/engine/models/valid_sdk_event.rb b/lib/splitclient-rb/engine/models/valid_sdk_event.rb new file mode 100644 index 000000000..0a0cbcd0b --- /dev/null +++ b/lib/splitclient-rb/engine/models/valid_sdk_event.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: false + +module SplitIoClient + module Engine::Models + class ValidSdkEvent + attr_reader :sdk_event, :valid + + def initialize(sdk_event, valid) + @sdk_event = sdk_event + @valid = valid + end + end + end +end diff --git a/lib/splitclient-rb/engine/status_manager.rb b/lib/splitclient-rb/engine/status_manager.rb index c0115ffe0..e5ed19d1f 100644 --- a/lib/splitclient-rb/engine/status_manager.rb +++ b/lib/splitclient-rb/engine/status_manager.rb @@ -3,9 +3,10 @@ module SplitIoClient module Engine class StatusManager - def initialize(config) + def initialize(config, internal_events_queue) @config = config @sdk_ready = Concurrent::CountDownLatch.new(1) + @internal_events_queue = internal_events_queue end def ready? @@ -19,6 +20,11 @@ def ready! @sdk_ready.count_down @config.logger.info('SplitIO SDK is ready') + @internal_events_queue.push( + SplitIoClient::Engine::Models::SdkInternalEventNotification.new( + SplitIoClient::Engine::Models::SdkInternalEvent::SDK_READY, nil + ) + ) end def wait_until_ready(seconds = nil) diff --git a/lib/splitclient-rb/engine/sync_manager.rb b/lib/splitclient-rb/engine/sync_manager.rb index d1f35c662..8bb78b4bd 100644 --- a/lib/splitclient-rb/engine/sync_manager.rb +++ b/lib/splitclient-rb/engine/sync_manager.rb @@ -47,13 +47,13 @@ def start_thread connected = false if @config.streaming_enabled - @config.logger.debug('Starting Streaming mode ...') + @config.logger.debug('Starting Streaming mode ...') if @config.debug_enabled start_push_status_monitor connected = @push_manager.start_sse end unless connected - @config.logger.debug('Starting Polling mode ...') + @config.logger.debug('Starting Polling mode ...') if @config.debug_enabled @synchronizer.start_periodic_fetch record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING) end @@ -92,7 +92,7 @@ def process_push_shutdown def process_connected if @sse_connected.value - @config.logger.debug('Streaming already connected.') + @config.logger.debug('Streaming already connected.') if @config.debug_enabled return end @@ -107,7 +107,7 @@ def process_connected def process_forced_stop unless @sse_connected.value - @config.logger.debug('Streaming already disconnected.') + @config.logger.debug('Streaming already disconnected.') if @config.debug_enabled return end @@ -120,7 +120,7 @@ def process_forced_stop def process_disconnect(reconnect) unless @sse_connected.value - @config.logger.debug('Streaming already disconnected.') + @config.logger.debug('Streaming already disconnected.') if @config.debug_enabled return end @@ -169,12 +169,16 @@ def incoming_push_status_handler when Constants::PUSH_SUBSYSTEM_OFF process_push_shutdown else - @config.logger.debug('Incorrect push status type.') + log_if_debug('Incorrect push status type.') end end rescue StandardError => e @config.logger.error("Push status handler error: #{e.inspect}") end end + + def log_if_debug(msg) + @config.logger.debug(msg) if @config.debug_enabled + end end end diff --git a/lib/splitclient-rb/split_factory.rb b/lib/splitclient-rb/split_factory.rb index ca8fb1bb4..5500d0433 100644 --- a/lib/splitclient-rb/split_factory.rb +++ b/lib/splitclient-rb/split_factory.rb @@ -45,6 +45,7 @@ def initialize(api_key, config_hash = {}) register_factory + build_events_manager build_telemetry_components build_flag_sets_filter build_repositories @@ -53,13 +54,13 @@ def initialize(api_key, config_hash = {}) build_unique_keys_tracker build_impressions_components - @status_manager = Engine::StatusManager.new(@config) + @status_manager = Engine::StatusManager.new(@config, @internal_events_queue) @split_validator = SplitIoClient::Validators.new(@config) @evaluator = Engine::Parser::Evaluator.new(@segments_repository, @splits_repository, @rule_based_segment_repository, @config) start! fallback_treatment_calculator = SplitIoClient::Engine::FallbackTreatmentCalculator.new(@config.fallback_treatments_configuration) - @client = SplitClient.new(@api_key, repositories, @status_manager, @config, @impressions_manager, @evaluation_producer, @evaluator, @split_validator, fallback_treatment_calculator) + @client = SplitClient.new(@api_key, repositories, @status_manager, @config, @impressions_manager, @evaluation_producer, @evaluator, @split_validator, fallback_treatment_calculator, @events_manager) @manager = SplitManager.new(@splits_repository, @status_manager, @config) end @@ -219,9 +220,9 @@ def build_repositories else @flag_sets_repository = SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new(@config.flag_sets_filter) end - @splits_repository = SplitsRepository.new(@config, @flag_sets_repository, @flag_sets_filter) - @segments_repository = SegmentsRepository.new(@config) - @rule_based_segment_repository = RuleBasedSegmentsRepository.new(@config) + @splits_repository = SplitsRepository.new(@config, @flag_sets_repository, @flag_sets_filter, @internal_events_queue) + @segments_repository = SegmentsRepository.new(@config, @internal_events_queue) + @rule_based_segment_repository = RuleBasedSegmentsRepository.new(@config, @internal_events_queue) @impressions_repository = ImpressionsRepository.new(@config) @events_repository = EventsRepository.new(@config, @api_key, @runtime_producer) end @@ -265,5 +266,19 @@ def build_impressions_components def build_flag_sets_filter @flag_sets_filter = SplitIoClient::Cache::Filter::FlagSetsFilter.new(@config.flag_sets_filter) end + + def build_events_manager + @events_manager = Engine::Events::EventsManager.new(Engine::Events::EventsManagerConfig.new, + Engine::Events::EventsDelivery.new(@config), + @config) + if @config.consumer? + @internal_events_queue = Engine::Events::NoOpEventsQueue.new + return + end + + @internal_events_queue = Queue.new + @events_task = Engine::Events::EventsTask.new(@events_manager.method(:notify_internal_event), @internal_events_queue, @config) + @events_task.start + end end end diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index bb07d76a4..5190ad6e4 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -38,23 +38,23 @@ def initialize(config, def close(status = nil) unless connected? - @config.logger.debug('SSEClient already disconected.') + @config.logger.debug('SSEClient already disconected.') if @config.debug_enabled return end - @config.logger.debug("Closing SSEClient socket") + @config.logger.debug("Closing SSEClient socket") if @config.debug_enabled push_status(status) @connected.make_false @socket.sync_close = true if @socket.is_a? OpenSSL::SSL::SSLSocket @socket.close - @config.logger.debug("SSEClient socket state #{@socket.state}") if @socket.is_a? OpenSSL::SSL::SSLSocket + @config.logger.debug("SSEClient socket state #{@socket.state}") if @socket.is_a? OpenSSL::SSL::SSLSocket && @config.debug_enabled rescue StandardError => e @config.logger.error("SSEClient close Error: #{e.inspect}") end def start(url) if connected? - @config.logger.debug('SSEClient already running.') + @config.logger.debug('SSEClient already running.') if @config.debug_enabled return true end @@ -96,18 +96,17 @@ def connect_stream(latch) raise 'eof exception' if partial_data == :eof rescue IO::WaitReadable => e - @config.logger.debug("SSE client IO::WaitReadable transient error: #{e.inspect}") + @config.logger.debug("SSE client IO::WaitReadable transient error: #{e.inspect}") if @config.debug_enabled IO.select([@socket], nil, nil, @read_timeout) retry rescue Errno::EAGAIN => e - @config.logger.debug("SSE client transient error: #{e.inspect}") + @config.logger.debug("SSE client transient error: #{e.inspect}") if @config.debug_enabled IO.select([@socket], nil, nil, @read_timeout) retry rescue Errno::ETIMEDOUT => e @config.logger.error("SSE read operation timed out!: #{e.inspect}") return Constants::PUSH_RETRYABLE_ERROR rescue EOFError => e - puts "SSE read operation EOF Exception!: #{e.inspect}" @config.logger.error("SSE read operation EOF Exception!: #{e.inspect}") raise 'eof exception' rescue Errno::EBADF, IOError => e @@ -125,12 +124,12 @@ def connect_stream(latch) return Constants::PUSH_RETRYABLE_ERROR end rescue Errno::EBADF - @config.logger.debug("SSE socket is not connected (Errno::EBADF)") + @config.logger.debug("SSE socket is not connected (Errno::EBADF)") if @config.debug_enabled break rescue RuntimeError raise 'eof exception' rescue Exception => e - @config.logger.debug("SSE socket is not connected: #{e.inspect}") + @config.logger.debug("SSE socket is not connected: #{e.inspect}") if @config.debug_enabled break end @@ -156,7 +155,7 @@ def read_first_event(data, latch) return unless @first_event.value response_code = @event_parser.first_event(data) - @config.logger.debug("SSE client first event code: #{response_code}") + @config.logger.debug("SSE client first event code: #{response_code}") if @config.debug_enabled error_event = false events = @event_parser.parse(data) @@ -165,7 +164,7 @@ def read_first_event(data, latch) if response_code == OK_CODE && !error_event @connected.make_true - @config.logger.debug("SSE client first event Connected is true") + @config.logger.debug("SSE client first event Connected is true") if @config.debug_enabled @telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SSE_CONNECTION_ESTABLISHED, nil) push_status(Constants::PUSH_CONNECTED) end @@ -202,7 +201,7 @@ def socket_connect end def process_data(partial_data) - @config.logger.debug("Event partial data: #{partial_data}") + @config.logger.debug("Event partial data: #{partial_data}") if @config.debug_enabled return if partial_data.nil? || partial_data == KEEP_ALIVE_RESPONSE events = @event_parser.parse(partial_data) @@ -220,7 +219,7 @@ def build_request(uri) req << "SplitSDKMachineName: #{@config.machine_name}\r\n" req << "SplitSDKClientKey: #{@api_key.split(//).last(4).join}\r\n" unless @api_key.nil? req << "Cache-Control: no-cache\r\n\r\n" - @config.logger.debug("Request info: #{req}") + @config.logger.debug("Request info: #{req}") if @config.debug_enabled req end @@ -255,7 +254,7 @@ def dispatch_event(event) def push_status(status) return if status.nil? - @config.logger.debug("Pushing new sse status: #{status}") + @config.logger.debug("Pushing new sse status: #{status}") if @config.debug_enabled @status_queue.push(status) end end diff --git a/lib/splitclient-rb/sse/event_source/event_parser.rb b/lib/splitclient-rb/sse/event_source/event_parser.rb index 566cb5c24..8983c9d7d 100644 --- a/lib/splitclient-rb/sse/event_source/event_parser.rb +++ b/lib/splitclient-rb/sse/event_source/event_parser.rb @@ -29,14 +29,14 @@ def parse(raw_event) events rescue StandardError => e - @config.logger.debug("Error during parsing a event: #{e.inspect}") + @config.logger.debug("Error during parsing a event: #{e.inspect}") if @config.debug_enabled [] end def first_event(raw_data) raw_data.split("\n")[0].split(' ')[1].to_i rescue StandardError => e - @config.logger.debug("Error parsing first event: #{e.inspect}") + @config.logger.error("Error parsing first event: #{e.inspect}") BAD_REQUEST_CODE end diff --git a/lib/splitclient-rb/sse/notification_manager_keeper.rb b/lib/splitclient-rb/sse/notification_manager_keeper.rb index 33ff2b961..34b245818 100644 --- a/lib/splitclient-rb/sse/notification_manager_keeper.rb +++ b/lib/splitclient-rb/sse/notification_manager_keeper.rb @@ -42,12 +42,12 @@ def process_event_control(type) @telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::STREAMING_STATUS, DISABLED) push_status(Constants::PUSH_SUBSYSTEM_OFF) else - @config.logger.error("Incorrect event type: #{incoming_notification}") + @config.logger.error("Incorrect event type: #{incoming_notification}") if @config.debug_enabled end end def process_event_occupancy(channel, publishers) - @config.logger.debug("Processed occupancy event with #{publishers} publishers. Channel: #{channel}") + @config.logger.debug("Processed occupancy event with #{publishers} publishers. Channel: #{channel}") if @config.debug_enabled update_publishers(channel, publishers) @@ -76,7 +76,7 @@ def are_publishers_available? end def push_status(status) - @config.logger.debug("Pushing occupancy status: #{status}") + @config.logger.debug("Pushing occupancy status: #{status}") if @config.debug_enabled @status_queue.push(status) end end diff --git a/lib/splitclient-rb/sse/workers/segments_worker.rb b/lib/splitclient-rb/sse/workers/segments_worker.rb index a241c72d0..828e7c2d2 100644 --- a/lib/splitclient-rb/sse/workers/segments_worker.rb +++ b/lib/splitclient-rb/sse/workers/segments_worker.rb @@ -14,13 +14,13 @@ def initialize(synchronizer, config, segments_repository) def add_to_queue(change_number, segment_name) item = { change_number: change_number, segment_name: segment_name } - @config.logger.debug("SegmentsWorker add to queue #{item}") + @config.logger.debug("SegmentsWorker add to queue #{item}") if @config.debug_enabled @queue.push(item) end def start if @running.value - @config.logger.debug('segments worker already running.') + @config.logger.debug('segments worker already running.') if @config.debug_enabled return end @@ -30,7 +30,7 @@ def start def stop unless @running.value - @config.logger.debug('segments worker not running.') + @config.logger.debug('segments worker not running.') if @config.debug_enabled return end @@ -44,7 +44,7 @@ def perform while (item = @queue.pop) segment_name = item[:segment_name] cn = item[:change_number] - @config.logger.debug("SegmentsWorker change_number dequeue #{segment_name}, #{cn}") + @config.logger.debug("SegmentsWorker change_number dequeue #{segment_name}, #{cn}") if @config.debug_enabled @synchronizer.fetch_segment(segment_name, cn) end diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index dc15eb2c6..7fffc1f06 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -18,7 +18,7 @@ def initialize(synchronizer, config, feature_flags_repository, telemetry_runtime def start if @running.value - @config.logger.debug('feature_flags_worker already running.') + @config.logger.debug('feature_flags_worker already running.') if @config.debug_enabled return end @@ -28,7 +28,7 @@ def start def stop unless @running.value - @config.logger.debug('feature_flags_worker not running.') + @config.logger.debug('feature_flags_worker not running.') if @config.debug_enabled return end @@ -37,7 +37,7 @@ def stop end def add_to_queue(notification) - @config.logger.debug("feature_flags_worker add to queue #{notification.data['changeNumber']}") + @config.logger.debug("feature_flags_worker add to queue #{notification.data['changeNumber']}") if @config.debug_enabled @queue.push(notification) end @@ -52,7 +52,9 @@ def perform_thread def perform while (notification = @queue.pop) - @config.logger.debug("feature_flags_worker change_number dequeue #{notification.data['changeNumber']}") + if @config.debug_enabled + @config.logger.debug("feature_flags_worker change_number dequeue #{notification.data['changeNumber']}") + end case notification.data['type'] when SSE::EventSource::EventTypes::SPLIT_UPDATE success = update_feature_flag(notification) @@ -117,7 +119,9 @@ def update_rule_based_segment(notification) def kill_feature_flag(notification) return if @feature_flags_repository.get_change_number.to_i > notification.data['changeNumber'] - @config.logger.debug("feature_flags_worker kill #{notification.data['splitName']}, #{notification.data['changeNumber']}") + if @config.debug_enabled + @config.logger.debug("feature_flags_worker kill #{notification.data['splitName']}, #{notification.data['changeNumber']}") + end @feature_flags_repository.kill(notification.data['changeNumber'], notification.data['splitName'], notification.data['defaultTreatment']) diff --git a/lib/splitclient-rb/version.rb b/lib/splitclient-rb/version.rb index 82ab65832..5b66f95fc 100644 --- a/lib/splitclient-rb/version.rb +++ b/lib/splitclient-rb/version.rb @@ -1,3 +1,3 @@ module SplitIoClient - VERSION = '8.10.1' + VERSION = '8.11.0' end diff --git a/spec/allocations/splitclient-rb/clients/split_client_spec.rb b/spec/allocations/splitclient-rb/clients/split_client_spec.rb index 40ab2543d..5ceeeacfc 100644 --- a/spec/allocations/splitclient-rb/clients/split_client_spec.rb +++ b/spec/allocations/splitclient-rb/clients/split_client_spec.rb @@ -4,13 +4,16 @@ describe SplitIoClient::SplitClient do let(:config) { SplitIoClient::SplitConfig.new(impressions_queue_size: 10) } - + let(:events_queue) { Queue.new } + let(:events_manager) { SplitIoClient::Engine::Events::EventsManager.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([])} let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([])} - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } let(:impressions_repository) { SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:impression_counter) { SplitIoClient::Engine::Common::ImpressionCounter.new } let(:evaluation_producer) { SplitIoClient::Telemetry::EvaluationProducer.new(config) } let(:impression_observer) { SplitIoClient::Observers::ImpressionObserver.new } @@ -42,7 +45,7 @@ unique_keys_tracker) end let(:client) do - SplitIoClient::SplitClient.new('', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => nil}, nil, config, impressions_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator) + SplitIoClient::SplitClient.new('', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => nil}, nil, config, impressions_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator, events_manager) end context 'control' do diff --git a/spec/cache/fetchers/segment_fetch_spec.rb b/spec/cache/fetchers/segment_fetch_spec.rb index 6280866ba..8ee1bcddc 100644 --- a/spec/cache/fetchers/segment_fetch_spec.rb +++ b/spec/cache/fetchers/segment_fetch_spec.rb @@ -31,12 +31,13 @@ end context 'memory adapter' do + let(:events_queue) { Queue.new } let(:config) { SplitIoClient::SplitConfig.new } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) } let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) } - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:segment_fetcher) { described_class.new(segments_repository, '', config, telemetry_runtime_producer) } let(:split_fetcher) do @@ -67,13 +68,14 @@ before do Redis.new.flushall end + let(:events_queue) { Queue.new } let(:config) { SplitIoClient::SplitConfig.new(cache_adapter: :redis) } let(:adapter) { SplitIoClient::Cache::Adapters::RedisAdapter.new(config.redis_url) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::RedisFlagSetsRepository.new(config) } let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) } - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:segment_fetcher) { described_class.new(segments_repository, '', config, telemetry_runtime_producer) } let(:split_fetcher) do diff --git a/spec/cache/fetchers/split_fetch_spec.rb b/spec/cache/fetchers/split_fetch_spec.rb index abe65d010..39e086c5d 100644 --- a/spec/cache/fetchers/split_fetch_spec.rb +++ b/spec/cache/fetchers/split_fetch_spec.rb @@ -23,10 +23,11 @@ cache_adapter: :memory ) end + let(:events_queue) { Queue.new } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) } let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) } - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:store) { described_class.new(splits_repository, rule_based_segments_repository, '', config, telemetry_runtime_producer) } @@ -77,10 +78,11 @@ flag_sets_filter: ['set_2'] ) end + let(:events_queue) { Queue.new } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new(['set_2']) } let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new(['set_2']) } - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:store) { described_class.new(splits_repository, rule_based_segments_repository, '', config, telemetry_runtime_producer) } @@ -131,10 +133,11 @@ cache_adapter: :redis ) end + let(:events_queue) { Queue.new } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::RedisFlagSetsRepository.new(config) } let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) } - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:store) { described_class.new(splits_repository, rule_based_segments_repository, '', config, telemetry_runtime_producer) } diff --git a/spec/cache/repositories/rule_based_segments_repository_spec.rb b/spec/cache/repositories/rule_based_segments_repository_spec.rb index 3fdd89ed6..f583105e2 100644 --- a/spec/cache/repositories/rule_based_segments_repository_spec.rb +++ b/spec/cache/repositories/rule_based_segments_repository_spec.rb @@ -6,7 +6,8 @@ describe SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository do RSpec.shared_examples 'RuleBasedSegments Repository' do |cache_adapter| let(:config) { SplitIoClient::SplitConfig.new(cache_adapter: cache_adapter) } - let(:repository) { described_class.new(config) } + let(:queue) {Queue.new} + let(:repository) { described_class.new(config, queue) } before :all do redis = Redis.new @@ -102,6 +103,33 @@ repository.update([rule_based_segment2], [], -1) expect(repository.get_rule_based_segment('corge2')[:conditions]).to eq SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository::DEFAULT_CONDITIONS_TEMPLATE end + + it 'push to internal event queue' do + queue.clear() + + repository.update([{name: 'flag1', trafficTypeName: 'tt_name_1', conditions: []}, + {name: 'flag2', trafficTypeName: 'tt_name_1', conditions: []}], [], -1) + event = queue.pop + expect(event.internal_event).to be(SplitIoClient::Engine::Models::SdkInternalEvent::RB_SEGMENTS_UPDATED) + expect(event.metadata.type).to be(SplitIoClient::Engine::Models::SdkEventType::SEGMENTS_UPDATE) + expect(event.metadata.names).to eq([]) + + repository.update([{name: 'flag1', trafficTypeName: 'tt_name_1', killed: false, default_treatment: 'on', changeNumber: 1, conditions: []}], + [{name: 'flag2', trafficTypeName: 'tt_name_1', conditions: []}], -1) + event = queue.pop + expect(event.internal_event).to be(SplitIoClient::Engine::Models::SdkInternalEvent::RB_SEGMENTS_UPDATED) + expect(event.metadata.type).to be(SplitIoClient::Engine::Models::SdkEventType::SEGMENTS_UPDATE) + expect(event.metadata.names).to eq([]) + + repository.update([], [], 123) + expect(queue.empty?).to eq(true) + + repository.update([], [{name: 'flag1', trafficTypeName: 'tt_name_1', killed: false, default_treatment: 'on', changeNumber: 1, conditions: []}], -1) + event = queue.pop + expect(event.internal_event).to be(SplitIoClient::Engine::Models::SdkInternalEvent::RB_SEGMENTS_UPDATED) + expect(event.metadata.type).to be(SplitIoClient::Engine::Models::SdkEventType::SEGMENTS_UPDATE) + expect(event.metadata.names).to eq([]) + end end describe 'with Memory Adapter' do diff --git a/spec/cache/repositories/segments_repository_spec.rb b/spec/cache/repositories/segments_repository_spec.rb index cd2dd021e..3cfac1aec 100644 --- a/spec/cache/repositories/segments_repository_spec.rb +++ b/spec/cache/repositories/segments_repository_spec.rb @@ -4,10 +4,11 @@ describe SplitIoClient::Cache::Repositories::SegmentsRepository do context 'memory adapter' do - let(:repository) { described_class.new(@default_config) } + let(:queue) {Queue.new} + let(:repository) { described_class.new(@default_config, queue) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) } let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) } - let(:split_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(@default_config, flag_sets_repository, flag_set_filter) } + let(:split_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(@default_config, flag_sets_repository, flag_set_filter, queue) } it 'removes keys' do repository.add_to_segment(name: 'foo', added: [1, 2, 3], removed: []) @@ -27,10 +28,21 @@ expect(repository.segments_count).to be(3) expect(repository.segment_keys_count).to be(7) end + + it 'push to internal event queue' do + queue.clear() + + repository.add_to_segment(name: 'foo', added: [1, 2, 3], removed: []) + event = queue.pop + expect(event.internal_event).to be(SplitIoClient::Engine::Models::SdkInternalEvent::SEGMENTS_UPDATED) + expect(event.metadata.type).to be(SplitIoClient::Engine::Models::SdkEventType::SEGMENTS_UPDATE) + expect(event.metadata.names).to eq([]) + end end context 'redis adapter' do - let(:repository) { described_class.new(SplitIoClient::SplitConfig.new(cache_adapter: :redis)) } + let(:queue) {Queue.new} + let(:repository) { described_class.new(SplitIoClient::SplitConfig.new(cache_adapter: :redis), queue) } it 'removes keys' do repository.add_to_segment(name: 'foo', added: [1, 2, 3], removed: []) diff --git a/spec/cache/repositories/splits_repository_spec.rb b/spec/cache/repositories/splits_repository_spec.rb index b55bfd115..ab18b84d8 100644 --- a/spec/cache/repositories/splits_repository_spec.rb +++ b/spec/cache/repositories/splits_repository_spec.rb @@ -8,7 +8,8 @@ let(:config) { SplitIoClient::SplitConfig.new(cache_adapter: cache_adapter) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::RedisFlagSetsRepository.new(config)} let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([])} - let(:repository) { described_class.new(config, flag_sets_repository, flag_set_filter) } + let(:queue) {Queue.new} + let(:repository) { described_class.new(config, flag_sets_repository, flag_set_filter, queue) } before :all do redis = Redis.new @@ -172,6 +173,39 @@ repository.update([split2], [], -1) expect(repository.get_split('corge2')[:conditions]).to eq SplitIoClient::Cache::Repositories::SplitsRepository::DEFAULT_CONDITIONS_TEMPLATE end + + it 'push to internal event queue' do + queue.clear() + + repository.update([{name: 'flag1', trafficTypeName: 'tt_name_1', conditions: []}, + {name: 'flag2', trafficTypeName: 'tt_name_1', conditions: []}], [], -1) + event = queue.pop + expect(event.internal_event).to be(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED) + expect(event.metadata.type).to be(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE) + expect(event.metadata.names).to eq(['flag1', 'flag2']) + + repository.update([{name: 'flag1', trafficTypeName: 'tt_name_1', killed: false, default_treatment: 'on', changeNumber: 1, conditions: []}], + [{name: 'flag2', trafficTypeName: 'tt_name_1', conditions: []}], -1) + event = queue.pop + expect(event.internal_event).to be(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED) + expect(event.metadata.type).to be(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE) + expect(event.metadata.names).to eq(['flag1', 'flag2']) + + repository.update([], [], 123) + expect(queue.empty?).to eq(true) + + repository.kill(123, 'flag1', 'off') + event = queue.pop + expect(event.internal_event).to be(SplitIoClient::Engine::Models::SdkInternalEvent::FLAG_KILLED_NOTIFICATION) + expect(event.metadata.type).to be(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE) + expect(event.metadata.names).to eq(['flag1']) + + repository.update([], [{name: 'flag1', trafficTypeName: 'tt_name_1', killed: false, default_treatment: 'on', changeNumber: 1, conditions: []}], -1) + event = queue.pop + expect(event.internal_event).to be(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED) + expect(event.metadata.type).to be(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE) + expect(event.metadata.names).to eq(['flag1']) + end end describe 'with Memory Adapter' do diff --git a/spec/cache/stores/localhost_split_store_spec.rb b/spec/cache/stores/localhost_split_store_spec.rb index 73dbb90cf..2458c4cf5 100644 --- a/spec/cache/stores/localhost_split_store_spec.rb +++ b/spec/cache/stores/localhost_split_store_spec.rb @@ -3,11 +3,13 @@ require 'spec_helper' describe SplitIoClient::Cache::Stores::LocalhostSplitStore do + let(:log) { StringIO.new } + let(:events_queue) { Queue.new } let(:config) { SplitIoClient::SplitConfig.new(logger: Logger.new(log)) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) } let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) } - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } let(:split_file) do ['local_feature local_treatment'] diff --git a/spec/engine/api/segments_spec.rb b/spec/engine/api/segments_spec.rb index 755a2890f..88891cd78 100644 --- a/spec/engine/api/segments_spec.rb +++ b/spec/engine/api/segments_spec.rb @@ -11,12 +11,13 @@ ) end let(:log) { StringIO.new } + let(:events_queue) { Queue.new } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:segments_api) { described_class.new('', segments_repository, config, telemetry_runtime_producer) } let(:adapter) do SplitIoClient::Cache::Adapters::MemoryAdapter.new(SplitIoClient::Cache::Adapters::MemoryAdapters::MapAdapter.new) end - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } let(:segments) do File.read(File.expand_path(File.join(File.dirname(__FILE__), '../../test_data/segments/segments.json'))) end diff --git a/spec/engine/events/events_delivery_spec.rb b/spec/engine/events/events_delivery_spec.rb new file mode 100644 index 000000000..e851d20d2 --- /dev/null +++ b/spec/engine/events/events_delivery_spec.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe SplitIoClient::Engine::Events::EventsDelivery do + subject { SplitIoClient::Engine::Events::EventsDelivery } + + it 'calls handler successfully' do + config = SplitIoClient::SplitConfig.new(logger: Logger.new(StringIO.new)) + delivery = subject.new(config) + + delivery.deliver( + SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, + SplitIoClient::Engine::Models::EventsMetadata.new(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE), + method(:call_back) + ) + sleep 0.5 + expect(@metadata.type).to be(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE) + end + + it 'handles exception when calling handler' do + log = StringIO.new + config = SplitIoClient::SplitConfig.new(logger: Logger.new(log)) + delivery = subject.new(config) + + delivery.deliver( + SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, + SplitIoClient::Engine::Models::EventsMetadata.new(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE), + method(:call_with_exception) + ) + sleep 0.5 + expect(log.string).to include('Exception when calling handler for Sdk Event') + end + + it 'logs the sdk event name when handler raises exception' do + log = StringIO.new + config = SplitIoClient::SplitConfig.new(logger: Logger.new(log)) + delivery = subject.new(config) + + delivery.deliver( + SplitIoClient::Engine::Models::SdkInternalEvent::SDK_READY, + nil, + method(:call_with_exception) + ) + sleep 0.5 + expect(log.string).to include('Exception when calling handler for Sdk Event') + expect(log.string).to include(SplitIoClient::Engine::Models::SdkInternalEvent::SDK_READY.to_s) + end + + it 'calls handler with correct metadata' do + config = SplitIoClient::SplitConfig.new(logger: Logger.new(StringIO.new)) + delivery = subject.new(config) + metadata = SplitIoClient::Engine::Models::EventsMetadata.new( + SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE, + ['feature1', 'feature2'] + ) + + delivery.deliver( + SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, + metadata, + method(:call_back) + ) + sleep 0.5 + expect(@metadata).to eq(metadata) + end + + def call_back(metadata) + @metadata = metadata + end + + def call_with_exception(_metadata) + raise StandardError, 'call exception' + end +end diff --git a/spec/engine/events/events_manager_config_spec.rb b/spec/engine/events/events_manager_config_spec.rb new file mode 100644 index 000000000..46c0a5c31 --- /dev/null +++ b/spec/engine/events/events_manager_config_spec.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe SplitIoClient::Engine::Events::EventsManagerConfig do + subject { SplitIoClient::Engine::Events::EventsManagerConfig } + + it 'test_build_instance' do + config = subject.new + + expect(config.require_all[SplitIoClient::Engine::Models::SdkEvent::SDK_READY].length).to eq(1) + expect(config.require_all[SplitIoClient::Engine::Models::SdkEvent::SDK_READY].include?(SplitIoClient::Engine::Models::SdkInternalEvent::SDK_READY)).to eq(true) + + expect(config.prerequisites[SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE].include?(SplitIoClient::Engine::Models::SdkEvent::SDK_READY)).to eq(true) + + expect(config.execution_limits[SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE]).to eq(-1) + expect(config.execution_limits[SplitIoClient::Engine::Models::SdkEvent::SDK_READY]).to eq(1) + + expect(config.require_any[SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE].length).to eq(4) + expect(config.require_any[SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE].include?(SplitIoClient::Engine::Models::SdkInternalEvent::FLAG_KILLED_NOTIFICATION)).to be(true) + expect(config.require_any[SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE].include?(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED)).to be(true) + expect(config.require_any[SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE].include?(SplitIoClient::Engine::Models::SdkInternalEvent::RB_SEGMENTS_UPDATED)).to be(true) + expect(config.require_any[SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE].include?(SplitIoClient::Engine::Models::SdkInternalEvent::SEGMENTS_UPDATED)).to be(true) + + order = 0 + expect(config.evaluation_order.length).to eq(2) + config.evaluation_order.each do |sdk_event| + order += 1 + if order == 1 + expect(sdk_event).to eq(SplitIoClient::Engine::Models::SdkEvent::SDK_READY) + end + if order == 2 + expect(sdk_event).to eq(SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE) + end + end + end +end diff --git a/spec/engine/events/events_manager_spec.rb b/spec/engine/events/events_manager_spec.rb new file mode 100644 index 000000000..0ba9c43d0 --- /dev/null +++ b/spec/engine/events/events_manager_spec.rb @@ -0,0 +1,159 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe SplitIoClient::Engine::Events::EventsManager do + subject { SplitIoClient::Engine::Events::EventsManager } + let(:metadata) { nil } + let(:sdk_ready) { false } + let(:sdk_update) { false } + let(:first_event) { nil } + + + it 'test_firing_events' do + config = SplitIoClient::SplitConfig.new(logger: Logger.new(StringIO.new)) + manager = subject.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) + manager.register(SplitIoClient::Engine::Models::SdkEvent::SDK_READY, method(:ready_call_back)) + manager.register(SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE, method(:update_call_back)) + meta = SplitIoClient::Engine::Models::EventsMetadata.new(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE, ["feature1"]) + + reset_flags + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::SDK_READY, nil) + sleep 0.5 + expect(@metadata).to be(nil) + expect(@sdk_ready).to be(true) + expect(@sdk_update).to be(false) + + reset_flags + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, meta) + sleep 0.5 + expect(@metadata).to eq(meta) + expect(@sdk_update).to be(true) + expect(@sdk_ready).to be(false) + + reset_flags + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::FLAG_KILLED_NOTIFICATION, meta) + sleep 0.5 + expect(@metadata).to eq(meta) + expect(@sdk_update).to be(true) + expect(@sdk_ready).to be(false) + + reset_flags + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::SEGMENTS_UPDATED, meta) + sleep 0.5 + expect(@metadata).to eq(meta) + expect(@sdk_update).to be(true) + expect(@sdk_ready).to be(false) + + reset_flags + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::RB_SEGMENTS_UPDATED, meta) + sleep 0.5 + expect(@metadata).to eq(meta) + expect(@sdk_update).to be(true) + expect(@sdk_ready).to be(false) + end + + it 'events fire only after register' do + config = SplitIoClient::SplitConfig.new(logger: Logger.new(StringIO.new)) + manager = subject.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) + meta = SplitIoClient::Engine::Models::EventsMetadata.new(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE, ["feature1"]) + + reset_flags + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::SDK_READY, nil) + sleep 0.5 + expect(@metadata).to be(nil) + expect(@sdk_ready).to be(false) + expect(@sdk_update).to be(false) + + manager.register(SplitIoClient::Engine::Models::SdkEvent::SDK_READY, method(:ready_call_back)) + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::SDK_READY, nil) + sleep 0.5 + expect(@metadata).to be(nil) + expect(@sdk_ready).to be(true) + expect(@sdk_update).to be(false) + + reset_flags + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, meta) + sleep 0.5 + expect(@metadata).to eq(nil) + expect(@sdk_update).to be(false) + expect(@sdk_ready).to be(false) + + manager.register(SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE, method(:update_call_back)) + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, meta) + sleep 0.5 + expect(@metadata).to eq(meta) + expect(@sdk_update).to be(true) + expect(@sdk_ready).to be(false) + end + + it 'update fires only after ready events' do + config = SplitIoClient::SplitConfig.new(logger: Logger.new(StringIO.new)) + manager = subject.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) + manager.register(SplitIoClient::Engine::Models::SdkEvent::SDK_READY, method(:ready_call_back)) + manager.register(SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE, method(:update_call_back)) + meta = SplitIoClient::Engine::Models::EventsMetadata.new(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE, ["feature1"]) + + reset_flags + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, meta) + sleep 0.5 + expect(@metadata).to eq(nil) + expect(@sdk_update).to be(false) + expect(@sdk_ready).to be(false) + + reset_flags + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::SDK_READY, nil) + sleep 0.5 + expect(@metadata).to be(nil) + expect(@sdk_ready).to be(true) + expect(@sdk_update).to be(false) + + reset_flags + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, meta) + sleep 0.5 + expect(@metadata).to eq(meta) + expect(@sdk_update).to be(true) + expect(@sdk_ready).to be(false) + end + + it 'event ordered correctly' do + config = SplitIoClient::SplitConfig.new(logger: Logger.new(StringIO.new)) + manager = subject.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) + manager.register(SplitIoClient::Engine::Models::SdkEvent::SDK_READY, method(:ready_call_back)) + manager.register(SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE, method(:update_call_back)) + meta = SplitIoClient::Engine::Models::EventsMetadata.new(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE, ["feature1"]) + + reset_flags + @first_event = nil + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::SDK_READY, nil) + manager.notify_internal_event(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, meta) + sleep 0.5 + expect(@first_event).to be("ready") + end + + def ready_call_back(metadata) + @sdk_ready = true + @metadata = metadata + @first_event = "ready" if @first_event.nil? + end + + def update_call_back(metadata) + @sdk_update = true + @metadata = metadata + @first_event = "update" if @first_event.nil? + end + + def reset_flags + @sdk_ready = false + @sdk_update = false + @metadata = nil + end +end diff --git a/spec/engine/events/events_task_spec.rb b/spec/engine/events/events_task_spec.rb new file mode 100644 index 000000000..b9a20efe2 --- /dev/null +++ b/spec/engine/events/events_task_spec.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe SplitIoClient::Engine::Events::EventsTask do + subject { SplitIoClient::Engine::Events::EventsTask } + let(:internal_event) { nil } + let(:metadata) { nil } + + it 'test_task_running' do + queue = Queue.new + config = SplitIoClient::SplitConfig.new(logger: Logger.new(StringIO.new)) + task = subject.new(method(:call_back), queue, config) + task.start + expect(task.running).to be(true) + + queue.push(SplitIoClient::Engine::Models::SdkInternalEventNotification.new(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED, SplitIoClient::Engine::Models::EventsMetadata.new(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE))) + sleep 0.5 + expect(@internal_event).to be(SplitIoClient::Engine::Models::SdkInternalEvent::FLAGS_UPDATED) + expect(@metadata.type).to be(SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE) + + @internal_event = nil + @metadata = nil + queue.push(SplitIoClient::Engine::Models::SdkInternalEventNotification.new(SplitIoClient::Engine::Models::SdkInternalEvent::RB_SEGMENTS_UPDATED, SplitIoClient::Engine::Models::EventsMetadata.new(SplitIoClient::Engine::Models::SdkEventType::SEGMENTS_UPDATE))) + sleep 0.5 + expect(@internal_event).to be(SplitIoClient::Engine::Models::SdkInternalEvent::RB_SEGMENTS_UPDATED) + expect(@metadata.type).to be(SplitIoClient::Engine::Models::SdkEventType::SEGMENTS_UPDATE) + + task.stop + sleep 0.2 + expect(task.running).to be(false) + + end + + def call_back(internal_event, metadata) + @internal_event = internal_event + @metadata = metadata + end + +end diff --git a/spec/engine/matchers/rule_based_segment_matcher_spec.rb b/spec/engine/matchers/rule_based_segment_matcher_spec.rb index 6034dfc32..4ff2a255f 100644 --- a/spec/engine/matchers/rule_based_segment_matcher_spec.rb +++ b/spec/engine/matchers/rule_based_segment_matcher_spec.rb @@ -3,11 +3,12 @@ require 'spec_helper' describe SplitIoClient::RuleBasedSegmentMatcher do + let(:events_queue) { Queue.new } let(:config) { SplitIoClient::SplitConfig.new(debug_enabled: true) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([])} let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([])} - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } context '#string_type' do it 'is not string type matcher' do @@ -17,14 +18,14 @@ context 'test_matcher' do it 'return false if excluded key is passed' do - rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) rbs_repositoy.update([{name: 'foo', trafficTypeName: 'tt_name_1', conditions: [], excluded: {keys: ['key1'], segments: []}}], [], -1) matcher = described_class.new(segments_repository, rbs_repositoy, 'foo', config) expect(matcher.match?(value: 'key1')).to be false end it 'return false if excluded segment is passed' do - rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) evaluator = SplitIoClient::Engine::Parser::Evaluator.new(segments_repository, splits_repository, rbs_repositoy, true) segments_repository.add_to_segment({:name => 'segment1', :added => [], :removed => []}) rbs_repositoy.update([{:name => 'foo', :trafficTypeName => 'tt_name_1', :conditions => [], :excluded => {:keys => ['key1'], :segments => [{:name => 'segment1', :type => 'standard'}]}}], [], -1) @@ -33,7 +34,7 @@ end it 'return false if excluded rb segment is matched' do - rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) rbs = {:name => 'sample_rule_based_segment', :trafficTypeName => 'tt_name_1', :conditions => [{ :matcherGroup => { :combiner => "AND", @@ -106,7 +107,7 @@ }] } - rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) rbs_repositoy.update([rule_based_segment], [], -1) matcher = described_class.new(segments_repository, rbs_repositoy, 'corge', config) expect(matcher.match?({:matching_key => 'user', :attributes => {}})).to be false @@ -114,7 +115,7 @@ end it 'return true if dependent rb segment matches' do - rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) rbs = { :changeNumber => 5, :name => "dependent_rbs", @@ -180,7 +181,7 @@ end it 'return true if has multiple conditions' do - rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rbs_repositoy = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) rbs = { :name => 'sample_rule_based_segment', :trafficTypeName => 'tt_name_1', diff --git a/spec/engine/parser/evaluator_spec.rb b/spec/engine/parser/evaluator_spec.rb index a6beabe98..fbf57acf0 100644 --- a/spec/engine/parser/evaluator_spec.rb +++ b/spec/engine/parser/evaluator_spec.rb @@ -3,11 +3,12 @@ require 'spec_helper' describe SplitIoClient::Engine::Parser::Evaluator do - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(@default_config) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(@default_config) } + let(:events_queue) { Queue.new } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(@default_config, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(@default_config, events_queue) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([])} let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([])} - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(@default_config, flag_sets_repository, flag_set_filter) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(@default_config, flag_sets_repository, flag_set_filter, events_queue) } let(:evaluator) { described_class.new(segments_repository, splits_repository, rule_based_segments_repository, true) } let(:killed_split) { { killed: true, defaultTreatment: 'default' } } diff --git a/spec/engine/push_manager_spec.rb b/spec/engine/push_manager_spec.rb index 4cc3f9dbe..bd3a3540f 100644 --- a/spec/engine/push_manager_spec.rb +++ b/spec/engine/push_manager_spec.rb @@ -6,15 +6,16 @@ describe SplitIoClient::Engine::PushManager do subject { SplitIoClient::Engine::PushManager } + let(:events_queue) { Queue.new } let(:body_response) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/auth_body_response.json')) } let(:api_key) { 'PushManager-key' } let(:log) { StringIO.new } let(:config) { SplitIoClient::SplitConfig.new(logger: Logger.new(log)) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) } let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) } - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:split_fetcher) { SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, rule_based_segments_repository, api_key, config, runtime_producer) } let(:segment_fetcher) { SplitIoClient::Cache::Fetchers::SegmentFetcher.new(segments_repository, api_key, config, runtime_producer) } diff --git a/spec/engine/status_manager_spec.rb b/spec/engine/status_manager_spec.rb index 3805a21a7..c7a6a62d0 100644 --- a/spec/engine/status_manager_spec.rb +++ b/spec/engine/status_manager_spec.rb @@ -6,15 +6,16 @@ subject { SplitIoClient::Engine::StatusManager } let(:config) { SplitIoClient::SplitConfig.new(logger: Logger.new(StringIO.new)) } + let(:queue) {Queue.new} it 'check if sdk is ready - should return false' do - status_manager = subject.new(config) + status_manager = subject.new(config, queue) expect(status_manager.ready?).to eq(false) end it 'check if sdk is ready - should return true' do - status_manager = subject.new(config) + status_manager = subject.new(config, queue) expect(status_manager.ready?).to eq(false) @@ -23,11 +24,23 @@ end it 'wait until ready - should return false' do - status_manager = subject.new(config) + status_manager = subject.new(config, queue) expect { status_manager.wait_until_ready(0.5) }.to raise_error(SplitIoClient::SplitIoError, 'SDK start up timeout expired') status_manager.ready! expect { status_manager.wait_until_ready(0) }.not_to raise_error end + + it 'check if sdk is ready - should fire ready event' do + status_manager = subject.new(config, queue) + + expect(status_manager.ready?).to eq(false) + + status_manager.ready! + expect(status_manager.ready?).to eq(true) + event = queue.pop + expect(event.internal_event).to be(SplitIoClient::Engine::Models::SdkInternalEvent::SDK_READY) + expect(event.metadata).to be(nil) + end end diff --git a/spec/engine/sync_manager_spec.rb b/spec/engine/sync_manager_spec.rb index 68169a7cc..9bbcc13ca 100644 --- a/spec/engine/sync_manager_spec.rb +++ b/spec/engine/sync_manager_spec.rb @@ -7,7 +7,7 @@ subject { SplitIoClient::Engine::SyncManager } let(:event_control) { "d4\r\nid: 123\nevent: message\ndata: {\"id\":\"1\",\"clientId\":\"emptyClientId\",\"timestamp\": 1582056812285,\"encoding\": \"json\",\"channel\": \"control_pri\",\"data\":\"{\\\"type\\\" : \\\"CONTROL\\\",\\\"controlType\\\":\\\"STREAMING_DISABLED\\\"}\"}\n\n\r\n" } - + let(:events_queue) { Queue.new } let(:splits) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/splits.json')) } let(:segment1) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/segment1.json')) } let(:segment2) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/segment2.json')) } @@ -18,9 +18,9 @@ let(:config) { SplitIoClient::SplitConfig.new(logger: Logger.new(log), streaming_enabled: true) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) } let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) } - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:impressions_repository) { SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:events_repository) { SplitIoClient::Cache::Repositories::EventsRepository.new(config, api_key, telemetry_runtime_producer) } @@ -51,7 +51,7 @@ let(:telemetry_consumers) { { init: init_consumer, runtime: runtime_consumer, evaluation: evaluation_consumer } } let(:telemetry_api) { SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer) } let(:telemetry_synchronizer) { SplitIoClient::Telemetry::Synchronizer.new(config, telemetry_consumers, init_producer, repositories, telemetry_api, 0, 0) } - let(:status_manager) { SplitIoClient::Engine::StatusManager.new(config) } + let(:status_manager) { SplitIoClient::Engine::StatusManager.new(config, events_queue) } let(:splits_worker) { SplitIoClient::SSE::Workers::SplitsWorker.new(synchronizer, config, splits_repository, telemetry_runtime_producer, sync_params[:segment_fetcher], rule_based_segments_repository) } let(:segments_worker) { SplitIoClient::SSE::Workers::SegmentsWorker.new(synchronizer, config, segments_repository) } let(:notification_processor) { SplitIoClient::SSE::NotificationProcessor.new(config, splits_worker, segments_worker) } diff --git a/spec/engine/synchronizer_spec.rb b/spec/engine/synchronizer_spec.rb index cef2020d7..c00a3e904 100644 --- a/spec/engine/synchronizer_spec.rb +++ b/spec/engine/synchronizer_spec.rb @@ -4,7 +4,7 @@ describe SplitIoClient::Engine::Synchronizer do subject { SplitIoClient::Engine::Synchronizer } - + let(:events_queue) { Queue.new } let(:splits) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/splits.json')) } let(:segment1) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/segment1.json')) } let(:segment2) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/segment2.json')) } @@ -16,9 +16,9 @@ runtime_producer = SplitIoClient::Telemetry::RuntimeProducer.new(config) flag_sets_repository = SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) flag_set_filter = SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) - splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) - segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) - rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) + segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) + rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) repositories = { splits: splits_repository, diff --git a/spec/integrations/push_client_spec.rb b/spec/integrations/push_client_spec.rb index 689793a5a..9420861fc 100644 --- a/spec/integrations/push_client_spec.rb +++ b/spec/integrations/push_client_spec.rb @@ -39,6 +39,9 @@ let(:auth_body_response) do File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/auth_body_response.json')) end + let(:metadata) { nil } + let(:event_ready) { false } + let(:sdk_update) { false } before do stub_request(:any, /https:\/\/events.*/).to_return(status: 200, body: '') @@ -155,21 +158,25 @@ stub_request(:get, auth_service_url + "?s=1.3").to_return(status: 200, body: auth_body_response) mock_server do |server| - + log = StringIO.new streaming_service_url = server.base_uri factory = SplitIoClient::SplitFactory.new( 'test_api_key', streaming_service_url: streaming_service_url, - auth_service_url: auth_service_url + auth_service_url: auth_service_url, + logger: Logger.new(log), + debug_enabled: true ) + reset_flags client = factory.client + client.register(SplitIoClient::Engine::Models::SdkEvent::SDK_READY, method(:ready_call_back)) + client.register(SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE, method(:update_call_back)) client.block_until_ready server.setup_response('/') do |_, res| send_content(res, event_split_iff_update_no_compression) end - treatment = 'control' for i in 1..10 do sleep(1) @@ -177,7 +184,25 @@ break if treatment != 'control' end + expect(@metadata.length).to eq(2) + check1 = false + check2 = false + @metadata.each do |meta| + if meta.type == SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE + expect(meta.names).to eq(['bilal_split']) + check1 = true + end + if meta.type == SplitIoClient::Engine::Models::SdkEventType::SEGMENTS_UPDATE + expect(meta.names).to eq([]) + check2 = true + end + end + expect(check1).to be(true) + expect(check2).to be(true) + expect(@event_ready).to be(true) + expect(@sdk_update).to eq(true) expect(treatment).to eq('off') + client.destroy end end @@ -319,20 +344,35 @@ stub_request(:get, auth_service_url + "?s=1.3").to_return(status: 200, body: auth_body_response) streaming_service_url = server.base_uri + log = StringIO.new factory = SplitIoClient::SplitFactory.new( 'test_api_key', streaming_enabled: true, streaming_service_url: streaming_service_url, - auth_service_url: auth_service_url + auth_service_url: auth_service_url, + logger: Logger.new(log), + debug_enabled: true ) + reset_flags client = factory.client + client.register(SplitIoClient::Engine::Models::SdkEvent::SDK_UPDATE, method(:update_call_back)) client.block_until_ready(1) sleep(2) expect(client.get_treatment('admin', 'push_test')).to eq('on') expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?s=1.3&since=-1&rbSince=-1')).to have_been_made.times(1) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?s=1.3&since=1585948850109&rbSince=-1')).to have_been_made.times(1) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?s=1.3&since=1585948850110&rbSince=-1')).to have_been_made.times(0) + + check1 = false + @metadata.each do |meta| + if meta.type == SplitIoClient::Engine::Models::SdkEventType::FLAG_UPDATE + expect(meta.names.include?('push_test')).to eq(true) + check1 = true + end + end + expect(check1).to eq(true) + client.destroy end end @@ -572,4 +612,19 @@ def mock_segment_changes(segment_name, segment_json, since) stub_request(:get, "https://sdk.split.io/api/segmentChanges/#{segment_name}?since=#{since}") .to_return(status: 200, body: segment_json) end + + def ready_call_back(metadata) + @event_ready = true + end + + def update_call_back(metadata) + @sdk_update = true + @metadata.push(metadata) + end + + def reset_flags + @event_ready = false + @sdk_update = false + @metadata = [] + end end diff --git a/spec/repository_helper.rb b/spec/repository_helper.rb index 37770a768..dc3157f86 100644 --- a/spec/repository_helper.rb +++ b/spec/repository_helper.rb @@ -3,6 +3,8 @@ require 'set' describe SplitIoClient::Helpers::RepositoryHelper do + let(:events_queue) { Queue.new } + context 'test repository helper' do it 'with flag set filter' do config = SplitIoClient::SplitConfig.new(cache_adapter: :memory) @@ -11,7 +13,8 @@ feature_flag_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new( config, flag_sets_repository, - flag_set_filter) + flag_set_filter, + events_queue) SplitIoClient::Helpers::RepositoryHelper.update_feature_flag_repository(feature_flag_repository, [{:name => 'split1', :status => 'ACTIVE', conditions: [], :sets => []}], -1, config, false) expect(feature_flag_repository.get_split('split1').nil?).to eq(true) @@ -33,7 +36,8 @@ feature_flag_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new( config, flag_sets_repository, - flag_set_filter) + flag_set_filter, + events_queue) SplitIoClient::Helpers::RepositoryHelper.update_feature_flag_repository(feature_flag_repository, [{:name => 'split1', :status => 'ACTIVE', conditions: [], :sets => []}], -1, config, false) expect(feature_flag_repository.get_split('split1').nil?).to eq(false) @@ -55,7 +59,8 @@ feature_flag_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new( config, flag_sets_repository, - flag_set_filter) + flag_set_filter, + events_queue) SplitIoClient::Helpers::RepositoryHelper.update_feature_flag_repository(feature_flag_repository, [{:name => 'split1', :status => 'ACTIVE', conditions: [], :sets => []}], -1, config, false) expect(feature_flag_repository.get_split('split1')[:impressionsDisabled]).to eq(false) @@ -74,7 +79,8 @@ feature_flag_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new( config, flag_sets_repository, - flag_set_filter) + flag_set_filter, + events_queue) SplitIoClient::Helpers::RepositoryHelper.update_feature_flag_repository(feature_flag_repository, [{:name => 'split1', :status => 'ACTIVE', conditions: [], :sets => []}], -1, config, false) expect(feature_flag_repository.get_split('split1').nil?).to eq(false) @@ -91,7 +97,8 @@ feature_flag_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new( config, flag_sets_repository, - flag_set_filter) + flag_set_filter, + events_queue) SplitIoClient::Helpers::RepositoryHelper.update_feature_flag_repository(feature_flag_repository, [{:name => 'split1', :status => 'ACTIVE', conditions: [], :sets => []}], -1, config, false) expect(feature_flag_repository.get_split('split1')[:prerequisites]).to eq([]) diff --git a/spec/splitclient/split_client_spec.rb b/spec/splitclient/split_client_spec.rb index c0596f239..be9805a96 100644 --- a/spec/splitclient/split_client_spec.rb +++ b/spec/splitclient/split_client_spec.rb @@ -4,20 +4,24 @@ describe SplitIoClient::SplitClient do context 'split client methods' do + let(:events_queue) { Queue.new } let(:config) { SplitIoClient::SplitConfig.new(cache_adapter: :memory, impressions_mode: :debug) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } + let(:events_manager) { SplitIoClient::Engine::Events::EventsManager.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) } let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) } - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } let(:impressions_repository) {SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:events_repository) { SplitIoClient::Cache::Repositories::EventsRepository.new(config, 'sdk_key', runtime_producer) } let(:impression_manager) { SplitIoClient::Engine::Common::ImpressionManager.new(config, impressions_repository, SplitIoClient::Engine::Common::NoopImpressionCounter.new, runtime_producer, SplitIoClient::Observers::NoopImpressionObserver.new, SplitIoClient::Engine::Impressions::NoopUniqueKeysTracker.new) } let(:evaluation_producer) { SplitIoClient::Telemetry::EvaluationProducer.new(config) } let(:evaluator) { SplitIoClient::Engine::Parser::Evaluator.new(segments_repository, splits_repository, rule_based_segments_repository, config) } let(:fallback_treatment_calculator) { SplitIoClient::Engine::FallbackTreatmentCalculator.new(SplitIoClient::Engine::Models::FallbackTreatmentsConfiguration.new) } - let(:split_client) { SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository, :rule_based_segments => rule_based_segments_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator) } + let(:split_client) { SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository, :rule_based_segments => rule_based_segments_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator, events_manager) } let(:splits) do File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/splits.json')) @@ -115,12 +119,16 @@ context 'impressions toggle' do it 'optimized mode' do config = SplitIoClient::SplitConfig.new(cache_adapter: :memory, impressions_mode: :optimized) - segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) + events_queue = Queue.new + events_manager = SplitIoClient::Engine::Events::EventsManager.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) + segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) flag_sets_repository = SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) flag_set_filter = SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) - splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) + splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) impressions_repository = SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) - rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) runtime_producer = SplitIoClient::Telemetry::RuntimeProducer.new(config) events_repository = SplitIoClient::Cache::Repositories::EventsRepository.new(config, 'sdk_key', runtime_producer) impressions_counter = SplitIoClient::Engine::Common::ImpressionCounter.new @@ -130,7 +138,7 @@ evaluation_producer = SplitIoClient::Telemetry::EvaluationProducer.new(config) evaluator = SplitIoClient::Engine::Parser::Evaluator.new(segments_repository, splits_repository, rule_based_segments_repository, config) fallback_treatment_calculator = SplitIoClient::Engine::FallbackTreatmentCalculator.new(SplitIoClient::Engine::Models::FallbackTreatmentsConfiguration.new) - split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator) + split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator, events_manager) splits = File.read(File.join(SplitIoClient.root, 'spec/test_data/splits/imp-toggle.json')) splits_repository.update([JSON.parse(splits,:symbolize_names => true)[:ff][:d][0]], [], -1) @@ -156,12 +164,16 @@ it 'debug mode' do config = SplitIoClient::SplitConfig.new(cache_adapter: :memory, impressions_mode: :debug) - segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) + events_queue = Queue.new + events_manager = SplitIoClient::Engine::Events::EventsManager.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) + segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) flag_sets_repository = SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) flag_set_filter = SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) - splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) + splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) impressions_repository = SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) - rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) runtime_producer = SplitIoClient::Telemetry::RuntimeProducer.new(config) events_repository = SplitIoClient::Cache::Repositories::EventsRepository.new(config, 'sdk_key', runtime_producer) impressions_counter = SplitIoClient::Engine::Common::ImpressionCounter.new @@ -171,7 +183,7 @@ evaluation_producer = SplitIoClient::Telemetry::EvaluationProducer.new(config) evaluator = SplitIoClient::Engine::Parser::Evaluator.new(segments_repository, splits_repository, rule_based_segments_repository, config) fallback_treatment_calculator = SplitIoClient::Engine::FallbackTreatmentCalculator.new(SplitIoClient::Engine::Models::FallbackTreatmentsConfiguration.new) - split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator) + split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator, events_manager) splits = File.read(File.join(SplitIoClient.root, 'spec/test_data/splits/imp-toggle.json')) splits_repository.update([JSON.parse(splits,:symbolize_names => true)[:ff][:d][0]], [], -1) @@ -197,12 +209,16 @@ it 'none mode' do config = SplitIoClient::SplitConfig.new(cache_adapter: :memory, impressions_mode: :none) - segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) + events_queue = Queue.new + events_manager = SplitIoClient::Engine::Events::EventsManager.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) + segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) flag_sets_repository = SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) flag_set_filter = SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) - splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) + splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) impressions_repository = SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) - rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) runtime_producer = SplitIoClient::Telemetry::RuntimeProducer.new(config) events_repository = SplitIoClient::Cache::Repositories::EventsRepository.new(config, 'sdk_key', runtime_producer) impressions_counter = SplitIoClient::Engine::Common::ImpressionCounter.new @@ -212,7 +228,7 @@ evaluation_producer = SplitIoClient::Telemetry::EvaluationProducer.new(config) evaluator = SplitIoClient::Engine::Parser::Evaluator.new(segments_repository, splits_repository, rule_based_segments_repository, config) fallback_treatment_calculator = SplitIoClient::Engine::FallbackTreatmentCalculator.new(SplitIoClient::Engine::Models::FallbackTreatmentsConfiguration.new) - split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator) + split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator, events_manager) splits = File.read(File.join(SplitIoClient.root, 'spec/test_data/splits/imp-toggle.json')) splits_repository.update([JSON.parse(splits,:symbolize_names => true)[:ff][:d][0]], [], -1) @@ -242,12 +258,16 @@ context 'fallback treatments' do it 'feature not found ' do config = SplitIoClient::SplitConfig.new(cache_adapter: :memory, impressions_mode: :debug) - segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) + events_queue = Queue.new + events_manager = SplitIoClient::Engine::Events::EventsManager.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) + segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) flag_sets_repository = SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) flag_set_filter = SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) - splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) + splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) impressions_repository = SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) - rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) runtime_producer = SplitIoClient::Telemetry::RuntimeProducer.new(config) events_repository = SplitIoClient::Cache::Repositories::EventsRepository.new(config, 'sdk_key', runtime_producer) impressions_counter = SplitIoClient::Engine::Common::ImpressionCounter.new @@ -257,7 +277,7 @@ evaluation_producer = SplitIoClient::Telemetry::EvaluationProducer.new(config) evaluator = SplitIoClient::Engine::Parser::Evaluator.new(segments_repository, splits_repository, rule_based_segments_repository, config) fallback_treatment_calculator = SplitIoClient::Engine::FallbackTreatmentCalculator.new(SplitIoClient::Engine::Models::FallbackTreatmentsConfiguration.new(SplitIoClient::Engine::Models::FallbackTreatment.new("on-global", '{"prop": "global"}'), {:feature => SplitIoClient::Engine::Models::FallbackTreatment.new("on-local", '{"prop": "local"}')})) - split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator) + split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator, events_manager) treatment = split_client.get_treatment_with_config('key2', 'feature') expect(treatment[:treatment]).to eq('on-local') @@ -286,12 +306,16 @@ it 'exception' do config = SplitIoClient::SplitConfig.new(cache_adapter: :memory, impressions_mode: :debug) - segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) + events_queue = Queue.new + events_manager = SplitIoClient::Engine::Events::EventsManager.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) + segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) flag_sets_repository = SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) flag_set_filter = SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) - splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) + splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) impressions_repository = SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) - rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) runtime_producer = SplitIoClient::Telemetry::RuntimeProducer.new(config) events_repository = SplitIoClient::Cache::Repositories::EventsRepository.new(config, 'sdk_key', runtime_producer) impressions_counter = SplitIoClient::Engine::Common::ImpressionCounter.new @@ -301,7 +325,7 @@ evaluation_producer = SplitIoClient::Telemetry::EvaluationProducer.new(config) evaluator = SplitIoClient::Engine::Parser::Evaluator.new(segments_repository, splits_repository, rule_based_segments_repository, config) fallback_treatment_calculator = SplitIoClient::Engine::FallbackTreatmentCalculator.new(SplitIoClient::Engine::Models::FallbackTreatmentsConfiguration.new(SplitIoClient::Engine::Models::FallbackTreatment.new("on-global", '{"prop": "global"}'), {:feature => SplitIoClient::Engine::Models::FallbackTreatment.new("on-local", '{"prop": "local"}')})) - split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator) + split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, nil, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator, events_manager) splits = File.read(File.join(SplitIoClient.root, 'spec/test_data/splits/imp-toggle.json')) split = JSON.parse(splits,:symbolize_names => true)[:ff][:d][0] @@ -343,12 +367,16 @@ it 'client not ready' do config = SplitIoClient::SplitConfig.new(cache_adapter: :memory, impressions_mode: :debug) - segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) + events_queue = Queue.new + events_manager = SplitIoClient::Engine::Events::EventsManager.new(SplitIoClient::Engine::Events::EventsManagerConfig.new, + SplitIoClient::Engine::Events::EventsDelivery.new(config), + config) + segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) flag_sets_repository = SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([]) flag_set_filter = SplitIoClient::Cache::Filter::FlagSetsFilter.new([]) - splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) + splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) impressions_repository = SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) - rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rule_based_segments_repository = SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) runtime_producer = SplitIoClient::Telemetry::RuntimeProducer.new(config) events_repository = SplitIoClient::Cache::Repositories::EventsRepository.new(config, 'sdk_key', runtime_producer) impressions_counter = SplitIoClient::Engine::Common::ImpressionCounter.new @@ -367,7 +395,7 @@ def wait_until_ready(time) true end end - split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, MyStatusManager.new, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator) + split_client = SplitIoClient::SplitClient.new('sdk_key', {:splits => splits_repository, :segments => segments_repository, :impressions => impressions_repository, :events => events_repository}, MyStatusManager.new, config, impression_manager, evaluation_producer, evaluator, SplitIoClient::Validators.new(config), fallback_treatment_calculator, events_manager) treatment = split_client.get_treatment_with_config('key2', 'feature') expect(treatment[:treatment]).to eq('on-local') diff --git a/spec/splitclient/split_factory_spec.rb b/spec/splitclient/split_factory_spec.rb index dd0ef73ae..42af69048 100644 --- a/spec/splitclient/split_factory_spec.rb +++ b/spec/splitclient/split_factory_spec.rb @@ -129,9 +129,6 @@ expect(factory.instance_variable_get(:@config).valid_mode).to be false expect(factory.manager.split('test_split')) .to be nil - - puts '###### log' - puts log.string end end diff --git a/spec/sse/event_source/client_spec.rb b/spec/sse/event_source/client_spec.rb index 95e0a1f29..ce4cccdd9 100644 --- a/spec/sse/event_source/client_spec.rb +++ b/spec/sse/event_source/client_spec.rb @@ -8,7 +8,8 @@ subject { SplitIoClient::SSE::EventSource::Client } let(:log) { StringIO.new } - let(:config) { SplitIoClient::SplitConfig.new(logger: Logger.new(log)) } + let(:events_queue) { Queue.new } + let(:config) { SplitIoClient::SplitConfig.new(logger: Logger.new(log), debug_enabled: true) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:telemetry_runtime_consumer) { SplitIoClient::Telemetry::RuntimeConsumer.new(config) } let(:api_token) { 'api-token-test' } @@ -18,11 +19,11 @@ let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([])} let(:repositories) do { - splits: SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter), - segments: SplitIoClient::Cache::Repositories::SegmentsRepository.new(config), + splits: SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue), + segments: SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue), impressions: SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config), events: SplitIoClient::Cache::Repositories::EventsRepository.new(config, api_key, telemetry_runtime_producer), - rule_based_segments: SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rule_based_segments: SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } end let(:parameters) do @@ -331,7 +332,7 @@ it 'test retry with IO::WaitReadable exceptions' do log2 = StringIO.new - config2 = SplitIoClient::SplitConfig.new(logger: Logger.new(log2)) + config2 = SplitIoClient::SplitConfig.new(logger: Logger.new(log2), debug_enabled: true) mock_server do |server| server.setup_response('/') do |_, res| diff --git a/spec/sse/sse_handler_spec.rb b/spec/sse/sse_handler_spec.rb index 96fb61414..1c81abcf0 100644 --- a/spec/sse/sse_handler_spec.rb +++ b/spec/sse/sse_handler_spec.rb @@ -8,11 +8,12 @@ let(:api_key) { 'SSEHandler-key' } let(:log) { StringIO.new } + let(:events_queue) { Queue.new } let(:config) { SplitIoClient::SplitConfig.new(logger: Logger.new(log)) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([])} let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([])} - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:push_status_queue) { Queue.new } let(:notification_manager_keeper) { SplitIoClient::SSE::NotificationManagerKeeper.new(config, telemetry_runtime_producer, push_status_queue) } @@ -22,7 +23,7 @@ segments: segments_repository, impressions: SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config), events: SplitIoClient::Cache::Repositories::EventsRepository.new(config, api_key, telemetry_runtime_producer), - rule_based_segments: SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) + rule_based_segments: SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } end let(:parameters) do diff --git a/spec/sse/workers/segments_worker_spec.rb b/spec/sse/workers/segments_worker_spec.rb index 8e3405bc6..918e43291 100644 --- a/spec/sse/workers/segments_worker_spec.rb +++ b/spec/sse/workers/segments_worker_spec.rb @@ -6,6 +6,7 @@ describe SplitIoClient::SSE::Workers::SegmentsWorker do subject { SplitIoClient::SSE::Workers::SegmentsWorker } + let(:events_queue) { Queue.new } let(:splits) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/splits.json')) } let(:segment1) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/segment1.json')) } let(:segment2) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/segment2.json')) } @@ -15,9 +16,9 @@ let(:config) { SplitIoClient::SplitConfig.new(logger: Logger.new(log)) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([])} let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([])} - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:impressions_repository) { SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:events_repository) { SplitIoClient::Cache::Repositories::EventsRepository.new(config, api_key, telemetry_runtime_producer) } diff --git a/spec/sse/workers/splits_worker_spec.rb b/spec/sse/workers/splits_worker_spec.rb index 74e00c519..b3318365d 100644 --- a/spec/sse/workers/splits_worker_spec.rb +++ b/spec/sse/workers/splits_worker_spec.rb @@ -23,16 +23,17 @@ let(:event_split_update_segments) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c":2,"d":"eJzcVFtr20wQ/SvhPK9AsnzTvpnPJp+po0DlppRgzFga2dusJLNapaRG/73Id7sOoU+FvmluZ3TOGXYDayhNVTx9W3NIGUOiKtlAQCWQSNq+FyeJ6yzcBTuex+T0qe86XrfrUkJBzH4AgXw3mVFlivl3eiWIA/BA6yImq4oc0nPdG/mIOYF0gpYfeO3AEyh3Ca/XDfxer+u2BUpLtiohMfhvOn4aQeBFad20paRLFkg4pUrbqWGyGecWEvbwPQ9cCMQrypccVtmCDaTX7feCnu+7nY7nCZBeFpAtgbjIU7WszPbPSshNvc0lah8/b05hoxkkvv4/no4m42gKgYxsvGJzb4pqDdn0ZguVNwsxCIenhh3SPriBk/OSLB/Z/Vgpy1qV9mE3MSRLDfwxD/kMSjKVb1dUpmgwVFxgVtezWmBNxp5RsDdlavkdCJTqJ2+tqmcCmhasIU+LOEEtftfg8+Nk8vjlzxV44beINce2ME3z2TEeDrEWVzKNw3k0un8YhTd0aiaGnKqck4iXDakrwcpdNjzdq9PChxIV+VEXt2F/UUvTC9Guyk/t90dfO+/Xro73w65z7y6cU/ndnvTdge7f9W8wmcw/jb5F1+79yybsX6c7U2lGPat/BQAA//9ygdKB"}'), 'test') } let(:event_split_update_rb_segments) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c":0,"d":"eyJjaGFuZ2VOdW1iZXIiOiAxMCwgInRyYWZmaWNUeXBlTmFtZSI6ICJ1c2VyIiwgIm5hbWUiOiAicmJzX2ZsYWciLCAidHJhZmZpY0FsbG9jYXRpb24iOiAxMDAsICJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOiAxODI4Mzc3MzgwLCAic2VlZCI6IC0yODY2MTc5MjEsICJzdGF0dXMiOiAiQUNUSVZFIiwgImtpbGxlZCI6IGZhbHNlLCAiZGVmYXVsdFRyZWF0bWVudCI6ICJvZmYiLCAiYWxnbyI6IDIsICJjb25kaXRpb25zIjogW3siY29uZGl0aW9uVHlwZSI6ICJST0xMT1VUIiwgIm1hdGNoZXJHcm91cCI6IHsiY29tYmluZXIiOiAiQU5EIiwgIm1hdGNoZXJzIjogW3sia2V5U2VsZWN0b3IiOiB7InRyYWZmaWNUeXBlIjogInVzZXIifSwgIm1hdGNoZXJUeXBlIjogIklOX1JVTEVfQkFTRURfU0VHTUVOVCIsICJuZWdhdGUiOiBmYWxzZSwgInVzZXJEZWZpbmVkU2VnbWVudE1hdGNoZXJEYXRhIjogeyJzZWdtZW50TmFtZSI6ICJzYW1wbGVfcnVsZV9iYXNlZF9zZWdtZW50In19XX0sICJwYXJ0aXRpb25zIjogW3sidHJlYXRtZW50IjogIm9uIiwgInNpemUiOiAxMDB9LCB7InRyZWF0bWVudCI6ICJvZmYiLCAic2l6ZSI6IDB9XSwgImxhYmVsIjogImluIHJ1bGUgYmFzZWQgc2VnbWVudCBzYW1wbGVfcnVsZV9iYXNlZF9zZWdtZW50In0sIHsiY29uZGl0aW9uVHlwZSI6ICJST0xMT1VUIiwgIm1hdGNoZXJHcm91cCI6IHsiY29tYmluZXIiOiAiQU5EIiwgIm1hdGNoZXJzIjogW3sia2V5U2VsZWN0b3IiOiB7InRyYWZmaWNUeXBlIjogInVzZXIifSwgIm1hdGNoZXJUeXBlIjogIkFMTF9LRVlTIiwgIm5lZ2F0ZSI6IGZhbHNlfV19LCAicGFydGl0aW9ucyI6IFt7InRyZWF0bWVudCI6ICJvbiIsICJzaXplIjogMH0sIHsidHJlYXRtZW50IjogIm9mZiIsICJzaXplIjogMTAwfV0sICJsYWJlbCI6ICJkZWZhdWx0IHJ1bGUifV0sICJjb25maWd1cmF0aW9ucyI6IHt9LCAic2V0cyI6IFtdLCAiaW1wcmVzc2lvbnNEaXNhYmxlZCI6IGZhbHNlfQ=="}'), 'test') } let(:event_rb_segment_update) { SplitIoClient::SSE::EventSource::StreamData.new("data", 12345, JSON.parse('{"type":"RB_SEGMENT_UPDATE","changeNumber":5564531221,"pcn":1234,"c":0,"d":"eyJjaGFuZ2VOdW1iZXIiOiA1LCAibmFtZSI6ICJzYW1wbGVfcnVsZV9iYXNlZF9zZWdtZW50IiwgInN0YXR1cyI6ICJBQ1RJVkUiLCAidHJhZmZpY1R5cGVOYW1lIjogInVzZXIiLCAiZXhjbHVkZWQiOiB7ImtleXMiOiBbIm1hdXJvQHNwbGl0LmlvIl0sICJzZWdtZW50cyI6IFt7InR5cGUiOiAic3RhbmRhcmQiLCAibmFtZSI6ICJzZWdtZW50MSJ9XX0sICJjb25kaXRpb25zIjogW3sibWF0Y2hlckdyb3VwIjogeyJjb21iaW5lciI6ICJBTkQiLCAibWF0Y2hlcnMiOiBbeyJrZXlTZWxlY3RvciI6IHsidHJhZmZpY1R5cGUiOiAidXNlciIsICJhdHRyaWJ1dGUiOiAiZW1haWwifSwgIm1hdGNoZXJUeXBlIjogIklOX1NFR01FTlQiLCAibmVnYXRlIjogZmFsc2UsICJ1c2VyRGVmaW5lZFNlZ21lbnRNYXRjaGVyRGF0YSI6IHsic2VnbWVudE5hbWUiOiAiZGVtbyJ9fV19fV19"}'), 'test') } + let(:events_queue) { Queue.new } context 'add change number to queue' do let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([])} let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([])} - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:split_fetcher) { SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, rule_based_segments_repository, api_key, config, telemetry_runtime_producer) } let(:segment_fetcher) { SplitIoClient::Cache::Fetchers::SegmentFetcher.new(segments_repository, api_key, config, telemetry_runtime_producer) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } let(:synchronizer) do telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer) impressions_api = SplitIoClient::Api::Impressions.new(api_key, config, telemetry_runtime_producer) @@ -104,12 +105,12 @@ context 'kill split notification' do let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([])} let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([])} - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:split_fetcher) { SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, rule_based_segments_repository, api_key, config, telemetry_runtime_producer) } let(:segment_fetcher) { SplitIoClient::Cache::Fetchers::SegmentFetcher.new(segments_repository, api_key, config, telemetry_runtime_producer) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:synchronizer) do telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer) impressions_api = SplitIoClient::Api::Impressions.new(api_key, config, telemetry_runtime_producer) @@ -176,12 +177,12 @@ context 'update with flagset filter' do let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new(["set_1"])} let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new(["set_1"])} - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:split_fetcher) { SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, rule_based_segments_repository, api_key, config, telemetry_runtime_producer) } let(:segment_fetcher) { SplitIoClient::Cache::Fetchers::SegmentFetcher.new(segments_repository, api_key, config, telemetry_runtime_producer) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:synchronizer) do telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer) impressions_api = SplitIoClient::Api::Impressions.new(api_key, config, telemetry_runtime_producer) @@ -230,12 +231,12 @@ context 'instant ff update split notification' do let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([])} let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([])} - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:split_fetcher) { SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, rule_based_segments_repository, api_key, config, telemetry_runtime_producer) } let(:segment_fetcher) { SplitIoClient::Cache::Fetchers::SegmentFetcher.new(segments_repository, api_key, config, telemetry_runtime_producer) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } - let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } + let(:rule_based_segments_repository) { SplitIoClient::Cache::Repositories::RuleBasedSegmentsRepository.new(config, events_queue) } let(:synchronizer) do telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer) impressions_api = SplitIoClient::Api::Impressions.new(api_key, config, telemetry_runtime_producer) diff --git a/spec/telemetry/synchronizer_spec.rb b/spec/telemetry/synchronizer_spec.rb index 5448027cb..3f005bb1b 100644 --- a/spec/telemetry/synchronizer_spec.rb +++ b/spec/telemetry/synchronizer_spec.rb @@ -30,14 +30,15 @@ end context 'Memory' do + let(:events_queue) { Queue.new } let(:config) { SplitIoClient::SplitConfig.new(logger: Logger.new(log)) } let(:evaluation_consumer) { SplitIoClient::Telemetry::EvaluationConsumer.new(config) } let(:init_consumer) { SplitIoClient::Telemetry::InitConsumer.new(config) } let(:runtime_consumer) { SplitIoClient::Telemetry::RuntimeConsumer.new(config) } let(:flag_sets_repository) {SplitIoClient::Cache::Repositories::MemoryFlagSetsRepository.new([])} let(:flag_set_filter) {SplitIoClient::Cache::Filter::FlagSetsFilter.new([])} - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } + let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config, flag_sets_repository, flag_set_filter, events_queue) } + let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config, events_queue) } let(:api_key) { 'Synchronizer-key' } let(:runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:evaluation_producer) { SplitIoClient::Telemetry::EvaluationProducer.new(config) }