Skip to content

Commit 6450edd

Browse files
quinnjclaude
andcommitted
refactor: migrate from Channel/Slot to PipelineState API
Update HTTP server, client, and WebSocket code to use the new closure-based PipelineState from Reseau. Key changes: - Server: Connection.channel → Connection.pipeline, uses pipeline_shutdown!/pipeline_trigger_read/pipeline_schedule_task_now! - Client stream: h1conn.slot → h1conn.pipeline, uses pipeline_write! - WebSockets: WsChannelHandler → WsHandler (no longer extends AbstractChannelHandler), installs read handler via downstream_read_setter instead of slot manipulation - utils: _H2_CHANNEL_SUPPORTED = true (H2Connection no longer extends AbstractChannelHandler but is supported via pipeline API) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 65b739d commit 6450edd

4 files changed

Lines changed: 99 additions & 137 deletions

File tree

src/client/stream.jl

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -172,23 +172,23 @@ end
172172
function _h1_flush_outgoing!(s::Stream)
173173
!isdefined(s, :aws_stream) && return
174174
h1conn = s.aws_stream.owning_connection
175-
slot = h1conn.slot
176-
slot === nothing && return
177-
channel = slot.channel
178-
channel === nothing && return
179-
if !Reseau.Sockets.channel_thread_is_callers_thread(channel)
175+
pipeline = h1conn.pipeline
176+
pipeline === nothing && return
177+
socket = pipeline.socket
178+
socket === nothing && return
179+
if !Reseau.Sockets.pipeline_thread_is_callers_thread(pipeline)
180180
fut = Future{Nothing}()
181-
task = Reseau.Sockets.ChannelTask((task, ctx, status) -> begin
182-
status == Reseau.TaskStatus.RUN_READY || return notify(fut, nothing)
181+
task = Reseau.Sockets.ChannelTask(Reseau.EventCallable(status -> begin
182+
Reseau.TaskStatus.T(status) == Reseau.TaskStatus.RUN_READY || return notify(fut, nothing)
183183
try
184184
_h1_flush_outgoing!(s)
185185
notify(fut, nothing)
186186
catch e
187187
notify(fut, CapturedException(e, catch_backtrace()))
188188
end
189189
return nothing
190-
end, nothing, "http_h1_flush_outgoing")
191-
Reseau.Sockets.channel_schedule_task_now!(channel, task)
190+
end), "http_h1_flush_outgoing")
191+
Reseau.Sockets.pipeline_schedule_task_now!(pipeline, task)
192192
wait(fut)
193193
return
194194
end
@@ -203,10 +203,10 @@ function _h1_flush_outgoing!(s::Stream)
203203
end
204204
buf.len = Csize_t(length(encoded))
205205
try
206-
Reseau.Sockets.channel_slot_send_message(slot, msg, Reseau.Sockets.ChannelDirection.WRITE)
206+
Reseau.Sockets.pipeline_write!(socket, msg)
207207
catch e
208208
e isa Reseau.ReseauError || rethrow()
209-
throw(AWSError("channel slot send failed"))
209+
throw(AWSError("pipeline write failed"))
210210
end
211211
end
212212
return
@@ -857,8 +857,8 @@ function with_stream(conn, req::Request, chunkedbody, on_stream_response_body, d
857857
(@atomic stream.fut.set) != 0 && return
858858
if !stream.http2 && isdefined(stream, :connection)
859859
conn = stream.connection
860-
if conn !== nothing && conn.slot !== nothing && conn.slot.channel !== nothing
861-
Reseau.Sockets.channel_shutdown!(conn.slot.channel, AwsHTTP.ERROR_HTTP_RESPONSE_FIRST_BYTE_TIMEOUT; shutdown_immediately=true)
860+
if conn !== nothing && conn.pipeline !== nothing
861+
Reseau.Sockets.pipeline_shutdown!(conn.pipeline, AwsHTTP.ERROR_HTTP_RESPONSE_FIRST_BYTE_TIMEOUT; shutdown_immediately=true)
862862
elseif conn !== nothing
863863
AwsHTTP.http_connection_close(conn)
864864
end

src/server.jl

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ const _BACKLOG_DEFAULT = 511
3030
mutable struct Connection{S}
3131
const server::S # Server{F, C}
3232
const h1conn::Any # AwsHTTP.H1Connection or AwsHTTP.H2Connection
33-
const channel::Any # Reseau.Channel
33+
const pipeline::Any # Reseau.PipelineState
3434
const streams_lock::ReentrantLock
3535
const streams::Set{Stream}
3636
const remote_addr::String
3737
const remote_port_num::Int
3838

39-
Connection(server::S, h1conn, channel, remote_addr::String, remote_port_num::Int) where {S} =
40-
new{S}(server, h1conn, channel, ReentrantLock(), Set{Stream}(), remote_addr, remote_port_num)
39+
Connection(server::S, h1conn, pipeline, remote_addr::String, remote_port_num::Int) where {S} =
40+
new{S}(server, h1conn, pipeline, ReentrantLock(), Set{Stream}(), remote_addr, remote_port_num)
4141
end
4242

4343
Base.hash(c::Connection, h::UInt) = hash(objectid(c), h)
@@ -267,7 +267,7 @@ function _create_request_handler!(conn::Connection, aws_conn; http2::Bool=false)
267267
end
268268
end
269269
if shutdown_channel
270-
Reseau.Sockets.channel_shutdown!(conn.channel; shutdown_immediately=true)
270+
Reseau.Sockets.pipeline_shutdown!(conn.pipeline; shutdown_immediately=true)
271271
@lock server.connections_lock begin
272272
delete!(server.connections, conn)
273273
end
@@ -395,28 +395,26 @@ function serve!(f, host="127.0.0.1", port=8080;
395395
)
396396
alpn_list = _tls_alpn_list(tls_conn_opts)
397397
initial_window = Csize_t(min(UInt64(initial_window_size), UInt64(typemax(Csize_t))))
398-
on_incoming_channel_setup = (bootstrap, error_code, channel, user_data) -> begin
398+
on_incoming_channel_setup = (bootstrap, error_code, pipeline, user_data) -> begin
399399
Base.CoreLogging.with_logstate(server.logstate) do
400400
if error_code != 0
401401
@error "incoming channel setup error" error_code
402402
return
403403
end
404404
st = @atomic(server.state)
405405
if st == :closing || st == :closed
406-
Reseau.Sockets.channel_shutdown!(channel; shutdown_immediately=true)
406+
Reseau.Sockets.pipeline_shutdown!(pipeline; shutdown_immediately=true)
407407
return
408408
end
409-
slot = Reseau.Sockets.channel_slot_new!(channel)
410-
Reseau.Sockets.channel_slot_insert_end!(channel, slot)
411409
version = AwsHTTP.HttpVersion.HTTP_1_1
412410
if tls_conn_opts !== nothing
413-
tls_slot = slot.adj_left
414-
if tls_slot === nothing || tls_slot.handler === nothing || !(tls_slot.handler isa Reseau.Sockets.TlsChannelHandler)
411+
tls_handler = pipeline.tls_handler
412+
if tls_handler === nothing
415413
@error "incoming channel setup error" error_code=Reseau.ERROR_INVALID_STATE
416-
Reseau.Sockets.channel_shutdown!(channel, Reseau.ERROR_INVALID_STATE)
414+
Reseau.Sockets.pipeline_shutdown!(pipeline, Reseau.ERROR_INVALID_STATE)
417415
return
418416
end
419-
protocol = Reseau.Sockets.tls_handler_protocol(tls_slot.handler)
417+
protocol = Reseau.Sockets.tls_handler_protocol(tls_handler)
420418
if protocol.len > 0
421419
protocol_str = Reseau.byte_buffer_as_string(protocol)
422420
if protocol_str == "h2"
@@ -426,26 +424,28 @@ function serve!(f, host="127.0.0.1", port=8080;
426424
end
427425
end
428426
end
429-
http_conn = AwsHTTP.http_connection_new_channel_handler(;
427+
http_conn = AwsHTTP.http_connection_new_handler(;
430428
is_server=true,
431429
version=version,
432430
initial_window_size=initial_window,
433431
)
434432
http_conn === nothing && return
435-
Reseau.Sockets.channel_slot_set_handler!(slot, http_conn)
436-
http_conn.slot = slot
437-
# Extract remote endpoint from the socket handler (first slot in pipeline)
433+
if version == AwsHTTP.HttpVersion.HTTP_2
434+
AwsHTTP.h2_connection_install!(http_conn, pipeline, pipeline.socket)
435+
else
436+
AwsHTTP.h1_connection_install!(http_conn, pipeline, pipeline.socket)
437+
end
438438
remote_addr = "0.0.0.0"
439439
remote_port_num = 0
440440
try
441-
socket_handler = channel.first.handler
442-
ep = socket_handler.socket.remote_endpoint
441+
socket = pipeline.socket
442+
ep = socket.remote_endpoint
443443
remote_addr = Reseau.Sockets.get_address(ep)
444444
remote_port_num = Int(ep.port)
445445
catch
446446
end
447447
http_conn.remote_endpoint = "$remote_addr:$remote_port_num"
448-
conn = Connection(server, http_conn, channel, remote_addr, remote_port_num)
448+
conn = Connection(server, http_conn, pipeline, remote_addr, remote_port_num)
449449
@lock server.connections_lock begin
450450
push!(server.connections, conn)
451451
end
@@ -470,26 +470,26 @@ function serve!(f, host="127.0.0.1", port=8080;
470470
else
471471
_create_request_handler!(conn, http_conn; http2=false)
472472
end
473-
if Reseau.Sockets.channel_thread_is_callers_thread(channel)
474-
Reseau.Sockets.channel_trigger_read(channel)
473+
if Reseau.Sockets.pipeline_thread_is_callers_thread(pipeline)
474+
Reseau.Sockets.pipeline_trigger_read(pipeline.socket)
475475
else
476476
task = Reseau.Sockets.ChannelTask(Reseau.EventCallable(status -> begin
477477
Reseau.TaskStatus.T(status) == Reseau.TaskStatus.RUN_READY || return nothing
478-
Reseau.Sockets.channel_trigger_read(channel)
478+
Reseau.Sockets.pipeline_trigger_read(pipeline.socket)
479479
return nothing
480480
end), "http_server_trigger_read")
481-
Reseau.Sockets.channel_schedule_task_now!(channel, task)
481+
Reseau.Sockets.pipeline_schedule_task_now!(pipeline, task)
482482
end
483483
end
484484
return
485485
end
486-
on_incoming_channel_shutdown = (bootstrap, error_code, channel, user_data) -> begin
486+
on_incoming_channel_shutdown = (bootstrap, error_code, pipeline, user_data) -> begin
487487
Base.CoreLogging.with_logstate(server.logstate) do
488488
if _should_log_channel_shutdown_error(error_code)
489489
@error "incoming channel shutdown error" error_code
490490
end
491491
@lock server.connections_lock begin
492-
filter!(c -> c.channel !== channel, server.connections)
492+
filter!(c -> c.pipeline !== pipeline, server.connections)
493493
end
494494
end
495495
return
@@ -633,7 +633,7 @@ function _forceclose!(server::Server; skip_shutdown::Bool=false)
633633
append!(conns, server.connections)
634634
end
635635
for conn in conns
636-
Reseau.Sockets.channel_shutdown!(conn.channel; shutdown_immediately=true)
636+
Reseau.Sockets.pipeline_shutdown!(conn.pipeline; shutdown_immediately=true)
637637
end
638638
@atomic server.state = :closed
639639
notify(server.closed)
@@ -658,7 +658,7 @@ function Base.close(server::Server)
658658
_stop_new_requests!(conn)
659659
@lock conn.streams_lock begin
660660
if isempty(conn.streams)
661-
Reseau.Sockets.channel_shutdown!(conn.channel; shutdown_immediately=true)
661+
Reseau.Sockets.pipeline_shutdown!(conn.pipeline; shutdown_immediately=true)
662662
@lock server.connections_lock begin
663663
delete!(server.connections, conn)
664664
end

src/utils.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const HTTP2_MAX_WINDOW_SIZE = 0x7fffffff
66
const AWS_HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS = AwsHTTP.Http2SettingsId.MAX_CONCURRENT_STREAMS
77
const AWS_HTTP2_SETTINGS_INITIAL_WINDOW_SIZE = AwsHTTP.Http2SettingsId.INITIAL_WINDOW_SIZE
88
const AWS_HTTP2_SETTINGS_COUNT = Int(AwsHTTP.HTTP2_SETTINGS_END_RANGE - AwsHTTP.HTTP2_SETTINGS_BEGIN_RANGE)
9-
const _H2_CHANNEL_SUPPORTED = AwsHTTP.H2Connection <: Reseau.Sockets.AbstractChannelHandler
9+
const _H2_CHANNEL_SUPPORTED = true
1010

1111
function _normalize_alpn_list(alpn_list::Union{String, Nothing})
1212
alpn_list === nothing && return nothing

0 commit comments

Comments
 (0)