Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,29 @@ Here are some examples with inline comments that walk you through how to use the

Tests are also a good place to know how the the library works internally: [spec](spec)

### Keep-alive connections

By default, the client opens a fresh HTTP connection (and TLS handshake) for every request. For high-traffic applications this can dominate request latency. Setting `keep_alive_connections: true` enables persistent connections via the `:net_http_persistent` Faraday adapter:

```ruby
Typesense::Client.new(
api_key: ENV['TYPESENSE_API_KEY'],
nodes: [{ host: 'localhost', port: 8108, protocol: 'https' }],
connection_timeout_seconds: 3,
num_retries: 1,
keep_alive_connections: true
)
```

Notes:

- Connections are cached per `(thread, node)`. `Net::HTTP` is not thread-safe, so each thread maintains its own keep-alive socket to each Typesense node, and the existing node round-robin still works.
- A cached connection is dropped automatically when a network error occurs, so retries open a fresh socket. We recommend setting `num_retries` to at least `1` so the gem can recover from a server- or load-balancer-side idle timeout transparently.
- Idle sockets are closed after 30 seconds by default. Override with `keep_alive_idle_timeout_seconds` to match or stay under your load balancer's idle timeout.
- The underlying `net_http_persistent` adapter holds at most `keep_alive_pool_size` sockets per origin (default `1`, which matches the per-`(thread, node)` cache above). The default of `1` is the safe choice for the vast majority of users — because we already cache one Faraday connection per `(thread, node)` and `Net::HTTP` is not thread-safe, a single socket per pool is all the adapter needs. Only raise this if you have a specific reason to keep additional sockets warm per origin (e.g. a non-standard concurrency model layered on top of this client); a larger pool will not increase request throughput on its own.
- `keep_alive_idle_timeout_seconds` and `keep_alive_pool_size` are only valid when `keep_alive_connections: true`; setting them otherwise raises `Typesense::Error::MissingConfiguration`.
- The option defaults to `false`, so upgrading the gem does not change behaviour until you opt in.

## Compatibility

| Typesense Server | typesense-ruby |
Expand Down
70 changes: 65 additions & 5 deletions lib/typesense/api_call.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@ def initialize(configuration)
@healthcheck_interval_seconds = @configuration.healthcheck_interval_seconds
@num_retries_per_request = @configuration.num_retries
@retry_interval_seconds = @configuration.retry_interval_seconds
@keep_alive_connections = @configuration.keep_alive_connections
@keep_alive_idle_timeout_seconds = @configuration.keep_alive_idle_timeout_seconds
@keep_alive_pool_size = @configuration.keep_alive_pool_size

@logger = @configuration.logger

# Per-instance key for the thread-local connection cache so multiple
# Typesense::Client instances in the same process do not share sockets.
@thread_connections_key = :"_typesense_api_call_connections_#{object_id}"

require 'faraday/net_http_persistent' if @keep_alive_connections

initialize_metadata_for_nodes
@current_node_index = -1
end
Expand Down Expand Up @@ -69,14 +78,11 @@ def perform_request(method, endpoint, query_parameters: nil, body_parameters: ni
@logger.debug "Attempting #{method.to_s.upcase} request Try ##{num_tries} to Node #{node[:index]}"

begin
conn = Faraday.new(uri_for(endpoint, node)) do |f|
f.options.timeout = @connection_timeout_seconds
f.options.open_timeout = @connection_timeout_seconds
end
conn, request_path = connection_and_path_for(endpoint, node)

headers = default_headers.merge(additional_headers)

response = conn.send(method) do |req|
response = conn.send(method, request_path) do |req|
req.headers = headers
req.params = query_parameters unless query_parameters.nil?
unless body_parameters.nil?
Expand Down Expand Up @@ -108,6 +114,9 @@ def perform_request(method, endpoint, query_parameters: nil, body_parameters: ni
# Rescue network layer exceptions and HTTP 5xx errors, so the loop can continue.
# Using loops for retries instead of rescue...retry to maintain consistency with client libraries in
# other languages that might not support the same construct.
# Drop the cached keep-alive connection (if any): the underlying socket is likely
# half-closed and reusing it would just fail again on retry.
discard_connection(node) if @keep_alive_connections
set_node_healthcheck(node, is_healthy: false)
last_exception = e
@logger.warn "Request #{method}:#{uri_for(endpoint, node)} to Node #{node[:index]} failed due to \"#{e.class}: #{e.message}\""
Expand All @@ -125,6 +134,57 @@ def uri_for(endpoint, node)
"#{node[:protocol]}://#{node[:host]}:#{node[:port]}#{endpoint}"
end

# Returns [connection, request_path]. When keep-alive is enabled, the connection
# is cached per (thread, node) and the path is appended at request time. When it
# is disabled, the original behaviour is preserved: a fresh Faraday is built for
# the full per-request URL, so existing callers and stubs see no change.
def connection_and_path_for(endpoint, node)
if @keep_alive_connections
[connection_for(node), endpoint]
else
[build_one_shot_connection(endpoint, node), nil]
end
end

def build_one_shot_connection(endpoint, node)
Faraday.new(uri_for(endpoint, node)) do |f|
f.options.timeout = @connection_timeout_seconds
f.options.open_timeout = @connection_timeout_seconds
end
end

# Net::HTTP is not thread-safe, so connections are cached per (thread, node)
# rather than shared across threads.
def connection_for(node)
thread_connections[connection_key(node)] ||= build_keep_alive_connection(node)
end

def discard_connection(node)
conn = thread_connections.delete(connection_key(node))
conn&.close if conn.respond_to?(:close)
end

def thread_connections
Thread.current[@thread_connections_key] ||= {}
end

def connection_key(node)
"#{node[:protocol]}://#{node[:host]}:#{node[:port]}"
end

def build_keep_alive_connection(node)
Faraday.new(url: connection_key(node)) do |f|
f.options.timeout = @connection_timeout_seconds
f.options.open_timeout = @connection_timeout_seconds
# pool_size defaults to 1: we already cache one Faraday connection per
# (thread, node), and Net::HTTP is not thread-safe — so a single socket
# per pool is the safe default. Override only with a specific reason.
f.adapter :net_http_persistent, pool_size: @keep_alive_pool_size do |http|
http.idle_timeout = @keep_alive_idle_timeout_seconds
end
end
end

## Attempts to find the next healthy node, looping through the list of nodes once.
# But if no healthy nodes are found, it will just return the next node, even if it's unhealthy
# so we can try the request for good measure, in case that node has become healthy since
Expand Down
20 changes: 17 additions & 3 deletions lib/typesense/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

module Typesense
class Configuration
attr_accessor :nodes, :nearest_node, :connection_timeout_seconds, :healthcheck_interval_seconds, :num_retries, :retry_interval_seconds, :api_key, :logger, :log_level
attr_accessor :nodes, :nearest_node, :connection_timeout_seconds, :healthcheck_interval_seconds, :num_retries, :retry_interval_seconds, :api_key, :logger, :log_level, :keep_alive_connections, :keep_alive_idle_timeout_seconds, :keep_alive_pool_size

def initialize(options = {})
@nodes = options[:nodes] || []
Expand All @@ -14,23 +14,28 @@ def initialize(options = {})
@num_retries = options[:num_retries] || (@nodes.length + (@nearest_node.nil? ? 0 : 1)) || 3
@retry_interval_seconds = options[:retry_interval_seconds] || 0.1
@api_key = options[:api_key]
@keep_alive_connections = options.fetch(:keep_alive_connections, false)
@keep_alive_idle_timeout_seconds = options[:keep_alive_idle_timeout_seconds] || 30
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should add a validation about idle_timeout_seconds being defined without the existence of keep_alive_connections

@keep_alive_pool_size = options[:keep_alive_pool_size] || 1

@logger = options[:logger] || Logger.new($stdout)
@log_level = options[:log_level] || Logger::WARN
@logger.level = @log_level

show_deprecation_warnings(options)
validate!
validate!(options)
end

def validate!
def validate!(options = {})
if @nodes.nil? ||
@nodes.empty? ||
@nodes.any? { |node| node_missing_parameters?(node) }
raise Error::MissingConfiguration, 'Missing required configuration. Ensure that nodes[][:protocol], nodes[][:host] and nodes[][:port] are set.'
end

raise Error::MissingConfiguration, 'Missing required configuration. Ensure that api_key is set.' if @api_key.nil?

validate_keep_alive_options!(options)
end

private
Expand All @@ -39,6 +44,15 @@ def node_missing_parameters?(node)
%i[protocol host port].any? { |attr| node.send(:[], attr).nil? }
end

def validate_keep_alive_options!(options)
return if @keep_alive_connections

dependent_keys = %i[keep_alive_idle_timeout_seconds keep_alive_pool_size].select { |k| options.key?(k) }
return if dependent_keys.empty?

raise Error::MissingConfiguration, "#{dependent_keys.join(' and ')} require keep_alive_connections: true."
end

def show_deprecation_warnings(options)
@logger.warn 'Deprecation warning: timeout_seconds is now renamed to connection_timeout_seconds' unless options[:timeout_seconds].nil?
@logger.warn 'Deprecation warning: master_node is now consolidated to nodes, starting with Typesense Server v0.12' unless options[:master_node].nil?
Expand Down
154 changes: 154 additions & 0 deletions spec/typesense/api_call_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,158 @@
it_behaves_like 'General error handling', :delete
it_behaves_like 'Node selection', :delete
end

describe 'keep-alive connection caching' do
subject(:api_call) { described_class.new(keep_alive_typesense.configuration) }

let(:keep_alive_typesense) do
Typesense::Client.new(
api_key: 'abcd',
nodes: typesense.configuration.nodes,
connection_timeout_seconds: 10,
retry_interval_seconds: 0.01,
log_level: Logger::ERROR,
keep_alive_connections: true
)
end

let(:node) { keep_alive_typesense.configuration.nodes[0] }

before do
keep_alive_typesense.configuration.nodes.each do |n|
stub_request(:any, api_call.send(:uri_for, '/', n))
.to_return(status: 200, body: JSON.dump('ok' => true), headers: { 'Content-Type' => 'application/json' })
end
end

it 'reuses the same Faraday connection across calls to the same node on the same thread' do
first = api_call.send(:connection_for, node)
second = api_call.send(:connection_for, node)

expect(second).to be(first)
end

it 'caches connections separately per node' do
first_node_conn = api_call.send(:connection_for, keep_alive_typesense.configuration.nodes[0])
second_node_conn = api_call.send(:connection_for, keep_alive_typesense.configuration.nodes[1])

expect(second_node_conn).not_to be(first_node_conn)
end

it 'isolates the cache per thread' do
main_thread_conn = api_call.send(:connection_for, node)

other_thread_conn = Thread.new { api_call.send(:connection_for, node) }.value

expect(other_thread_conn).not_to be(main_thread_conn)
end

it 'isolates the cache per ApiCall instance' do
other_api_call = described_class.new(keep_alive_typesense.configuration)

expect(other_api_call.send(:connection_for, node))
.not_to be(api_call.send(:connection_for, node))
end

it 'evicts the cached connection when a network error occurs so retries open a fresh socket' do
timeout_node = keep_alive_typesense.configuration.nodes[0]
keep_alive_typesense.configuration.nodes.each do |n|
stub_request(:any, api_call.send(:uri_for, '/', n)).to_timeout
end

pre_call_conn = api_call.send(:connection_for, timeout_node)

begin
api_call.get('/')
rescue StandardError
# expected: all nodes time out
end

cache = Thread.current[api_call.instance_variable_get(:@thread_connections_key)] || {}
expect(cache[api_call.send(:connection_key, timeout_node)]).to be_nil

post_retry_conn = api_call.send(:connection_for, timeout_node)
expect(post_retry_conn).not_to be(pre_call_conn)
end

it 'uses the configured timeouts on the cached connection' do
conn = api_call.send(:connection_for, node)

expect(conn.options.timeout).to eq(keep_alive_typesense.configuration.connection_timeout_seconds)
expect(conn.options.open_timeout).to eq(keep_alive_typesense.configuration.connection_timeout_seconds)
end

it 'defaults the idle timeout to 30 seconds' do
expect(keep_alive_typesense.configuration.keep_alive_idle_timeout_seconds).to eq(30)
end

it 'honours a custom keep_alive_idle_timeout_seconds' do
custom_client = Typesense::Client.new(
api_key: 'abcd',
nodes: typesense.configuration.nodes,
connection_timeout_seconds: 10,
log_level: Logger::ERROR,
keep_alive_connections: true,
keep_alive_idle_timeout_seconds: 5
)

expect(custom_client.configuration.keep_alive_idle_timeout_seconds).to eq(5)
expect(described_class.new(custom_client.configuration).instance_variable_get(:@keep_alive_idle_timeout_seconds)).to eq(5)
end

it 'defaults the pool size to 1' do
expect(keep_alive_typesense.configuration.keep_alive_pool_size).to eq(1)
end

it 'honours a custom keep_alive_pool_size' do
custom_client = Typesense::Client.new(
api_key: 'abcd',
nodes: typesense.configuration.nodes,
connection_timeout_seconds: 10,
log_level: Logger::ERROR,
keep_alive_connections: true,
keep_alive_pool_size: 5
)

expect(custom_client.configuration.keep_alive_pool_size).to eq(5)
expect(described_class.new(custom_client.configuration).instance_variable_get(:@keep_alive_pool_size)).to eq(5)
end
end

describe 'keep-alive disabled (default)' do
it 'is off by default on the configuration' do
expect(typesense.configuration.keep_alive_connections).to be(false)
end

it 'builds a fresh Faraday connection per request' do
stub_request(:any, api_call.send(:uri_for, '/', typesense.configuration.nodes[0]))
.to_return(status: 200, body: JSON.dump('ok' => true), headers: { 'Content-Type' => 'application/json' })

api_call.get('/')

expect(Thread.current[api_call.instance_variable_get(:@thread_connections_key)]).to be_nil
end

it 'raises when keep_alive_idle_timeout_seconds is set without keep_alive_connections' do
expect do
Typesense::Client.new(
api_key: 'abcd',
nodes: typesense.configuration.nodes,
log_level: Logger::ERROR,
keep_alive_idle_timeout_seconds: 5
)
end.to raise_error(Typesense::Error::MissingConfiguration, /keep_alive_connections: true/)
end

it 'raises when keep_alive_pool_size is set without keep_alive_connections' do
expect do
Typesense::Client.new(
api_key: 'abcd',
nodes: typesense.configuration.nodes,
log_level: Logger::ERROR,
keep_alive_pool_size: 4
)
end.to raise_error(Typesense::Error::MissingConfiguration, /keep_alive_connections: true/)
end
end
end
2 changes: 2 additions & 0 deletions typesense.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Gem::Specification.new do |spec|

spec.add_dependency 'base64', '~> 0.2'
spec.add_dependency 'faraday', '~> 2.8'
spec.add_dependency 'faraday-net_http_persistent', '~> 2.0'
spec.add_dependency 'json', '~> 2.9'
spec.add_dependency 'net-http-persistent', '~> 4.0'
spec.metadata['rubygems_mfa_required'] = 'true'
end
Loading