diff --git a/README.md b/README.md index a337a88..4b91403 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,29 @@ Here are some examples with inline comments that walk you through how to use the Tests are also a good place to know how the the library works internally: [spec](spec) +### Keep-alive connections + +By default, the client opens a fresh HTTP connection (and TLS handshake) for every request. For high-traffic applications this can dominate request latency. Setting `keep_alive_connections: true` enables persistent connections via the `:net_http_persistent` Faraday adapter: + +```ruby +Typesense::Client.new( + api_key: ENV['TYPESENSE_API_KEY'], + nodes: [{ host: 'localhost', port: 8108, protocol: 'https' }], + connection_timeout_seconds: 3, + num_retries: 1, + keep_alive_connections: true +) +``` + +Notes: + +- Connections are cached per `(thread, node)`. `Net::HTTP` is not thread-safe, so each thread maintains its own keep-alive socket to each Typesense node, and the existing node round-robin still works. +- A cached connection is dropped automatically when a network error occurs, so retries open a fresh socket. We recommend setting `num_retries` to at least `1` so the gem can recover from a server- or load-balancer-side idle timeout transparently. +- Idle sockets are closed after 30 seconds by default. Override with `keep_alive_idle_timeout_seconds` to match or stay under your load balancer's idle timeout. +- The underlying `net_http_persistent` adapter holds at most `keep_alive_pool_size` sockets per origin (default `1`, which matches the per-`(thread, node)` cache above). The default of `1` is the safe choice for the vast majority of users — because we already cache one Faraday connection per `(thread, node)` and `Net::HTTP` is not thread-safe, a single socket per pool is all the adapter needs. Only raise this if you have a specific reason to keep additional sockets warm per origin (e.g. a non-standard concurrency model layered on top of this client); a larger pool will not increase request throughput on its own. +- `keep_alive_idle_timeout_seconds` and `keep_alive_pool_size` are only valid when `keep_alive_connections: true`; setting them otherwise raises `Typesense::Error::MissingConfiguration`. +- The option defaults to `false`, so upgrading the gem does not change behaviour until you opt in. + ## Compatibility | Typesense Server | typesense-ruby | diff --git a/lib/typesense/api_call.rb b/lib/typesense/api_call.rb index 003b6a8..e41fff5 100644 --- a/lib/typesense/api_call.rb +++ b/lib/typesense/api_call.rb @@ -19,9 +19,18 @@ def initialize(configuration) @healthcheck_interval_seconds = @configuration.healthcheck_interval_seconds @num_retries_per_request = @configuration.num_retries @retry_interval_seconds = @configuration.retry_interval_seconds + @keep_alive_connections = @configuration.keep_alive_connections + @keep_alive_idle_timeout_seconds = @configuration.keep_alive_idle_timeout_seconds + @keep_alive_pool_size = @configuration.keep_alive_pool_size @logger = @configuration.logger + # Per-instance key for the thread-local connection cache so multiple + # Typesense::Client instances in the same process do not share sockets. + @thread_connections_key = :"_typesense_api_call_connections_#{object_id}" + + require 'faraday/net_http_persistent' if @keep_alive_connections + initialize_metadata_for_nodes @current_node_index = -1 end @@ -69,14 +78,11 @@ def perform_request(method, endpoint, query_parameters: nil, body_parameters: ni @logger.debug "Attempting #{method.to_s.upcase} request Try ##{num_tries} to Node #{node[:index]}" begin - conn = Faraday.new(uri_for(endpoint, node)) do |f| - f.options.timeout = @connection_timeout_seconds - f.options.open_timeout = @connection_timeout_seconds - end + conn, request_path = connection_and_path_for(endpoint, node) headers = default_headers.merge(additional_headers) - response = conn.send(method) do |req| + response = conn.send(method, request_path) do |req| req.headers = headers req.params = query_parameters unless query_parameters.nil? unless body_parameters.nil? @@ -108,6 +114,9 @@ def perform_request(method, endpoint, query_parameters: nil, body_parameters: ni # Rescue network layer exceptions and HTTP 5xx errors, so the loop can continue. # Using loops for retries instead of rescue...retry to maintain consistency with client libraries in # other languages that might not support the same construct. + # Drop the cached keep-alive connection (if any): the underlying socket is likely + # half-closed and reusing it would just fail again on retry. + discard_connection(node) if @keep_alive_connections set_node_healthcheck(node, is_healthy: false) last_exception = e @logger.warn "Request #{method}:#{uri_for(endpoint, node)} to Node #{node[:index]} failed due to \"#{e.class}: #{e.message}\"" @@ -125,6 +134,57 @@ def uri_for(endpoint, node) "#{node[:protocol]}://#{node[:host]}:#{node[:port]}#{endpoint}" end + # Returns [connection, request_path]. When keep-alive is enabled, the connection + # is cached per (thread, node) and the path is appended at request time. When it + # is disabled, the original behaviour is preserved: a fresh Faraday is built for + # the full per-request URL, so existing callers and stubs see no change. + def connection_and_path_for(endpoint, node) + if @keep_alive_connections + [connection_for(node), endpoint] + else + [build_one_shot_connection(endpoint, node), nil] + end + end + + def build_one_shot_connection(endpoint, node) + Faraday.new(uri_for(endpoint, node)) do |f| + f.options.timeout = @connection_timeout_seconds + f.options.open_timeout = @connection_timeout_seconds + end + end + + # Net::HTTP is not thread-safe, so connections are cached per (thread, node) + # rather than shared across threads. + def connection_for(node) + thread_connections[connection_key(node)] ||= build_keep_alive_connection(node) + end + + def discard_connection(node) + conn = thread_connections.delete(connection_key(node)) + conn&.close if conn.respond_to?(:close) + end + + def thread_connections + Thread.current[@thread_connections_key] ||= {} + end + + def connection_key(node) + "#{node[:protocol]}://#{node[:host]}:#{node[:port]}" + end + + def build_keep_alive_connection(node) + Faraday.new(url: connection_key(node)) do |f| + f.options.timeout = @connection_timeout_seconds + f.options.open_timeout = @connection_timeout_seconds + # pool_size defaults to 1: we already cache one Faraday connection per + # (thread, node), and Net::HTTP is not thread-safe — so a single socket + # per pool is the safe default. Override only with a specific reason. + f.adapter :net_http_persistent, pool_size: @keep_alive_pool_size do |http| + http.idle_timeout = @keep_alive_idle_timeout_seconds + end + end + end + ## Attempts to find the next healthy node, looping through the list of nodes once. # But if no healthy nodes are found, it will just return the next node, even if it's unhealthy # so we can try the request for good measure, in case that node has become healthy since diff --git a/lib/typesense/configuration.rb b/lib/typesense/configuration.rb index 4b57bcf..388caed 100644 --- a/lib/typesense/configuration.rb +++ b/lib/typesense/configuration.rb @@ -4,7 +4,7 @@ module Typesense class Configuration - attr_accessor :nodes, :nearest_node, :connection_timeout_seconds, :healthcheck_interval_seconds, :num_retries, :retry_interval_seconds, :api_key, :logger, :log_level + attr_accessor :nodes, :nearest_node, :connection_timeout_seconds, :healthcheck_interval_seconds, :num_retries, :retry_interval_seconds, :api_key, :logger, :log_level, :keep_alive_connections, :keep_alive_idle_timeout_seconds, :keep_alive_pool_size def initialize(options = {}) @nodes = options[:nodes] || [] @@ -14,16 +14,19 @@ def initialize(options = {}) @num_retries = options[:num_retries] || (@nodes.length + (@nearest_node.nil? ? 0 : 1)) || 3 @retry_interval_seconds = options[:retry_interval_seconds] || 0.1 @api_key = options[:api_key] + @keep_alive_connections = options.fetch(:keep_alive_connections, false) + @keep_alive_idle_timeout_seconds = options[:keep_alive_idle_timeout_seconds] || 30 + @keep_alive_pool_size = options[:keep_alive_pool_size] || 1 @logger = options[:logger] || Logger.new($stdout) @log_level = options[:log_level] || Logger::WARN @logger.level = @log_level show_deprecation_warnings(options) - validate! + validate!(options) end - def validate! + def validate!(options = {}) if @nodes.nil? || @nodes.empty? || @nodes.any? { |node| node_missing_parameters?(node) } @@ -31,6 +34,8 @@ def validate! end raise Error::MissingConfiguration, 'Missing required configuration. Ensure that api_key is set.' if @api_key.nil? + + validate_keep_alive_options!(options) end private @@ -39,6 +44,15 @@ def node_missing_parameters?(node) %i[protocol host port].any? { |attr| node.send(:[], attr).nil? } end + def validate_keep_alive_options!(options) + return if @keep_alive_connections + + dependent_keys = %i[keep_alive_idle_timeout_seconds keep_alive_pool_size].select { |k| options.key?(k) } + return if dependent_keys.empty? + + raise Error::MissingConfiguration, "#{dependent_keys.join(' and ')} require keep_alive_connections: true." + end + def show_deprecation_warnings(options) @logger.warn 'Deprecation warning: timeout_seconds is now renamed to connection_timeout_seconds' unless options[:timeout_seconds].nil? @logger.warn 'Deprecation warning: master_node is now consolidated to nodes, starting with Typesense Server v0.12' unless options[:master_node].nil? diff --git a/spec/typesense/api_call_spec.rb b/spec/typesense/api_call_spec.rb index 5e35313..959d901 100644 --- a/spec/typesense/api_call_spec.rb +++ b/spec/typesense/api_call_spec.rb @@ -258,4 +258,158 @@ it_behaves_like 'General error handling', :delete it_behaves_like 'Node selection', :delete end + + describe 'keep-alive connection caching' do + subject(:api_call) { described_class.new(keep_alive_typesense.configuration) } + + let(:keep_alive_typesense) do + Typesense::Client.new( + api_key: 'abcd', + nodes: typesense.configuration.nodes, + connection_timeout_seconds: 10, + retry_interval_seconds: 0.01, + log_level: Logger::ERROR, + keep_alive_connections: true + ) + end + + let(:node) { keep_alive_typesense.configuration.nodes[0] } + + before do + keep_alive_typesense.configuration.nodes.each do |n| + stub_request(:any, api_call.send(:uri_for, '/', n)) + .to_return(status: 200, body: JSON.dump('ok' => true), headers: { 'Content-Type' => 'application/json' }) + end + end + + it 'reuses the same Faraday connection across calls to the same node on the same thread' do + first = api_call.send(:connection_for, node) + second = api_call.send(:connection_for, node) + + expect(second).to be(first) + end + + it 'caches connections separately per node' do + first_node_conn = api_call.send(:connection_for, keep_alive_typesense.configuration.nodes[0]) + second_node_conn = api_call.send(:connection_for, keep_alive_typesense.configuration.nodes[1]) + + expect(second_node_conn).not_to be(first_node_conn) + end + + it 'isolates the cache per thread' do + main_thread_conn = api_call.send(:connection_for, node) + + other_thread_conn = Thread.new { api_call.send(:connection_for, node) }.value + + expect(other_thread_conn).not_to be(main_thread_conn) + end + + it 'isolates the cache per ApiCall instance' do + other_api_call = described_class.new(keep_alive_typesense.configuration) + + expect(other_api_call.send(:connection_for, node)) + .not_to be(api_call.send(:connection_for, node)) + end + + it 'evicts the cached connection when a network error occurs so retries open a fresh socket' do + timeout_node = keep_alive_typesense.configuration.nodes[0] + keep_alive_typesense.configuration.nodes.each do |n| + stub_request(:any, api_call.send(:uri_for, '/', n)).to_timeout + end + + pre_call_conn = api_call.send(:connection_for, timeout_node) + + begin + api_call.get('/') + rescue StandardError + # expected: all nodes time out + end + + cache = Thread.current[api_call.instance_variable_get(:@thread_connections_key)] || {} + expect(cache[api_call.send(:connection_key, timeout_node)]).to be_nil + + post_retry_conn = api_call.send(:connection_for, timeout_node) + expect(post_retry_conn).not_to be(pre_call_conn) + end + + it 'uses the configured timeouts on the cached connection' do + conn = api_call.send(:connection_for, node) + + expect(conn.options.timeout).to eq(keep_alive_typesense.configuration.connection_timeout_seconds) + expect(conn.options.open_timeout).to eq(keep_alive_typesense.configuration.connection_timeout_seconds) + end + + it 'defaults the idle timeout to 30 seconds' do + expect(keep_alive_typesense.configuration.keep_alive_idle_timeout_seconds).to eq(30) + end + + it 'honours a custom keep_alive_idle_timeout_seconds' do + custom_client = Typesense::Client.new( + api_key: 'abcd', + nodes: typesense.configuration.nodes, + connection_timeout_seconds: 10, + log_level: Logger::ERROR, + keep_alive_connections: true, + keep_alive_idle_timeout_seconds: 5 + ) + + expect(custom_client.configuration.keep_alive_idle_timeout_seconds).to eq(5) + expect(described_class.new(custom_client.configuration).instance_variable_get(:@keep_alive_idle_timeout_seconds)).to eq(5) + end + + it 'defaults the pool size to 1' do + expect(keep_alive_typesense.configuration.keep_alive_pool_size).to eq(1) + end + + it 'honours a custom keep_alive_pool_size' do + custom_client = Typesense::Client.new( + api_key: 'abcd', + nodes: typesense.configuration.nodes, + connection_timeout_seconds: 10, + log_level: Logger::ERROR, + keep_alive_connections: true, + keep_alive_pool_size: 5 + ) + + expect(custom_client.configuration.keep_alive_pool_size).to eq(5) + expect(described_class.new(custom_client.configuration).instance_variable_get(:@keep_alive_pool_size)).to eq(5) + end + end + + describe 'keep-alive disabled (default)' do + it 'is off by default on the configuration' do + expect(typesense.configuration.keep_alive_connections).to be(false) + end + + it 'builds a fresh Faraday connection per request' do + stub_request(:any, api_call.send(:uri_for, '/', typesense.configuration.nodes[0])) + .to_return(status: 200, body: JSON.dump('ok' => true), headers: { 'Content-Type' => 'application/json' }) + + api_call.get('/') + + expect(Thread.current[api_call.instance_variable_get(:@thread_connections_key)]).to be_nil + end + + it 'raises when keep_alive_idle_timeout_seconds is set without keep_alive_connections' do + expect do + Typesense::Client.new( + api_key: 'abcd', + nodes: typesense.configuration.nodes, + log_level: Logger::ERROR, + keep_alive_idle_timeout_seconds: 5 + ) + end.to raise_error(Typesense::Error::MissingConfiguration, /keep_alive_connections: true/) + end + + it 'raises when keep_alive_pool_size is set without keep_alive_connections' do + expect do + Typesense::Client.new( + api_key: 'abcd', + nodes: typesense.configuration.nodes, + log_level: Logger::ERROR, + keep_alive_pool_size: 4 + ) + end.to raise_error(Typesense::Error::MissingConfiguration, /keep_alive_connections: true/) + end + end end diff --git a/typesense.gemspec b/typesense.gemspec index cc8cb4d..9e79299 100644 --- a/typesense.gemspec +++ b/typesense.gemspec @@ -28,6 +28,8 @@ Gem::Specification.new do |spec| spec.add_dependency 'base64', '~> 0.2' spec.add_dependency 'faraday', '~> 2.8' + spec.add_dependency 'faraday-net_http_persistent', '~> 2.0' spec.add_dependency 'json', '~> 2.9' + spec.add_dependency 'net-http-persistent', '~> 4.0' spec.metadata['rubygems_mfa_required'] = 'true' end