Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions lib/falcon/environment/cluster.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Samuel Williams.

require_relative "server"
require_relative "../service/cluster"

module Falcon
module Environment
# Provides an environment for hosting a cluster of Falcon server workers, where each worker binds its own endpoint.
module Cluster
include Server

# The service class to use for the cluster.
# @returns [Class]
def service_class
Service::Cluster
end

# The host that this server will receive connections for.
def url
"http://[::]:0"
end

# The endpoint bound by the current worker.
# @returns [IO::Endpoint::BoundEndpoint | Nil]
def bound_endpoint
@bound_endpoint
end

# Set the endpoint bound by the current worker.
# @parameter bound_endpoint [IO::Endpoint::BoundEndpoint]
def bound_endpoint=(bound_endpoint)
@bound_endpoint = bound_endpoint
end

# The first socket address bound by the current worker.
# @returns [Addrinfo | Nil]
def bound_address
@bound_endpoint&.sockets&.first&.to_io&.local_address
end

# The port bound by the current worker.
# @returns [Integer | Nil]
def bound_port
bound_address&.ip_port
end
end
end
end
65 changes: 65 additions & 0 deletions lib/falcon/service/cluster.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Samuel Williams.

require_relative "server"

module Falcon
# @namespace
module Service
# A managed service for running Falcon workers with independently bound endpoints.
class Cluster < Server
# Cluster workers bind independently in their own process.
def bind_endpoint
end

# Setup the service into the specified container.
# @parameter container [Async::Container] The container to configure.
def setup(container)
container_options = @evaluator.container_options
health_check_timeout = container_options[:health_check_timeout]

container.run(**container_options) do |instance|
clock = Async::Clock.start
bound_endpoint = nil

begin
Async do |task|
evaluator = self.environment.evaluator
server = nil

health_checker(instance, health_check_timeout) do
if server
instance.name = format_title(evaluator, server)
end
end

instance.status!("Preparing...")

bound_endpoint = evaluator.endpoint.bound
evaluator.bound_endpoint = bound_endpoint

evaluator.prepare!(instance)
emit_prepared(instance, clock)

instance.status!("Running...")
server = run(instance, evaluator, bound_endpoint)
instance.name = format_title(evaluator, server)
emit_running(instance, clock)

instance.ready!

sleep
ensure
bound_endpoint&.close
task&.children&.each(&:stop)
end
ensure
bound_endpoint&.close
end
end
end
end
end
end
20 changes: 15 additions & 5 deletions lib/falcon/service/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@ def initialize(...)
@bound_endpoint = nil
end

# Prepare the bound endpoint for the server.
def start
# Bind the endpoint used by each server worker.
def bind_endpoint
@endpoint = @evaluator.endpoint

Sync do
@bound_endpoint = @endpoint.bound
end

Console.info(self){"Starting #{self.name} on #{@endpoint}"}
end

# Prepare the bound endpoint for the server.
def start
bind_endpoint

super
end
Expand All @@ -38,20 +43,25 @@ def start
#
# @parameter instance [Object] The container instance.
# @parameter evaluator [Environment::Evaluator] The environment evaluator.
# @parameter bound_endpoint [IO::Endpoint] The endpoint bound by this worker.
# @returns [Falcon::Server] The server instance.
def run(instance, evaluator)
def run(instance, evaluator, bound_endpoint = @bound_endpoint)
if evaluator.respond_to?(:make_supervised_worker)
Console.warn(self, "Async::Container::Supervisor is replaced by Async::Service::Supervisor, please update your service definition.")

evaluator.make_supervised_worker(instance).run
end

server = evaluator.make_server(@bound_endpoint)
server = evaluator.make_server(bound_endpoint)

Async do |task|
server.run

task.children&.each(&:wait)
begin
task.children&.each(&:wait)
rescue IOError
raise unless bound_endpoint.respond_to?(:sockets) && bound_endpoint.sockets.empty?
end
end

server
Expand Down
1 change: 1 addition & 0 deletions releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Unreleased

- Add `Falcon::Environment::Cluster` and `Falcon::Service::Cluster` for running workers with independently bound endpoints.
- Move Falcon middleware trace providers to `traces/provider/falcon/middleware`.

## v0.55.5
Expand Down
24 changes: 24 additions & 0 deletions test/falcon/environment/cluster.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Samuel Williams.

require "falcon/environment/cluster"
require "async/service/environment"

describe Falcon::Environment::Cluster do
let(:evaluator) do
Async::Service::Environment.build(subject, name: "localhost").evaluator
end

it "provides default cluster settings" do
expect(evaluator).to have_attributes(
service_class: be == Falcon::Service::Cluster,
url: be == "http://[::]:0",
authority: be == "localhost",
bound_endpoint: be == nil,
bound_address: be == nil,
bound_port: be == nil,
)
end
end
84 changes: 84 additions & 0 deletions test/falcon/service/cluster.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Samuel Williams.

require "falcon/service/cluster"
require "falcon/environment/cluster"
require "async/container"
require "async/service/environment"
require "fileutils"
require "net/http"
require "protocol/http/middleware"

describe Falcon::Service::Cluster do
let(:ports_path) {File.expand_path(".cluster/ports.txt", __dir__)}

let(:recorder) do
path = ports_path

Module.new do
def middleware
Protocol::HTTP::Middleware::HelloWorld
end

def container_options
super.merge(restart: false)
end

define_method(:prepare!) do |instance|
super(instance)

File.open(path, "a") do |file|
file.puts(bound_port)
end
end
end
end

let(:environment) do
Async::Service::Environment.new(Falcon::Environment::Cluster).with(
recorder,
name: "hello",
root: File.expand_path(".cluster/hello", __dir__),
url: "http://127.0.0.1:0",
count: 2,
)
end

let(:server) do
subject.new(environment)
end

before do
FileUtils.rm_rf(File.dirname(ports_path))
FileUtils.mkdir_p(File.dirname(ports_path))
end

after do
FileUtils.rm_rf(File.dirname(ports_path))
end

it "binds a unique port for each worker before preparing the instance" do
container = Async::Container.new

server.start
server.setup(container)
container.wait_until_ready

ports = File.readlines(ports_path, chomp: true).map(&:to_i)

expect(ports).to have_attributes(size: be == 2)
expect(ports).to have_value(be > 0)
expect(ports.uniq).to be == ports

ports.each do |port|
response = Net::HTTP.get_response(URI("http://127.0.0.1:#{port}/"))

expect(response).to have_attributes(code: be == "200")
end
ensure
server.stop
container&.stop
end
end
Loading