Skip to content

Commit 5406ed4

Browse files
reorder decoded frames according to stream priorization rules
1 parent e197bbb commit 5406ed4

6 files changed

Lines changed: 112 additions & 5 deletions

File tree

lib/http/2/connection.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def initialize(settings = {})
9393
@streams_recently_closed = {}
9494
@pending_settings = []
9595

96-
@framer = Framer.new(@local_settings[:settings_max_frame_size])
96+
@framer = Framer.new(@streams, @local_settings[:settings_max_frame_size])
9797

9898
@local_window_limit = @local_settings[:settings_initial_window_size]
9999
@local_window = @local_window_limit

lib/http/2/framer.rb

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,12 @@ class Framer
115115

116116
# Initializes new framer object.
117117
#
118-
def initialize(local_max_frame_size = DEFAULT_MAX_FRAME_SIZE,
119-
remote_max_frame_size = DEFAULT_MAX_FRAME_SIZE)
118+
def initialize(
119+
streams = {},
120+
local_max_frame_size = DEFAULT_MAX_FRAME_SIZE,
121+
remote_max_frame_size = DEFAULT_MAX_FRAME_SIZE
122+
)
123+
@streams = streams
120124
@local_max_frame_size = local_max_frame_size
121125
@remote_max_frame_size = remote_max_frame_size
122126
@frames = []
@@ -372,13 +376,30 @@ def generate(frame)
372376
#
373377
# @param buf [Buffer]
374378
def parse(buf)
379+
decoded = false
380+
375381
while (frame = decode_frame(buf))
376382
if frame[:type] == :ping
377383
# PING responses SHOULD be given higher priority than any other frame.
378384
@frames.unshift(frame)
379385
else
380386
@frames << frame
381387
end
388+
decoded = true
389+
end
390+
391+
# TODO: support stream prioritization
392+
# WIP
393+
if decoded
394+
@frames.sort! do |f1, f2|
395+
next(0) unless f1.key?(:stream) && f2.key?(:stream)
396+
397+
s1 = @streams[f1[:stream]] or next(0)
398+
399+
s2 = @streams[f2[:stream]] or next(0)
400+
401+
s1 <=> s2
402+
end
382403
end
383404

384405
@frames.shift

lib/http/2/stream.rb

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,14 @@ class Stream
5353

5454
# Stream priority as set by initiator.
5555
attr_reader :weight
56-
attr_reader :dependency, :remote_window
56+
57+
# whether the stream is exclusive in the dependency tree
58+
attr_reader :exclusive
59+
60+
# the parent stream
61+
attr_reader :dependency
62+
63+
attr_reader :remote_window
5764

5865
# Size of current stream flow control window.
5966
attr_reader :local_window
@@ -82,6 +89,7 @@ def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, pa
8289
@id = id
8390
@weight = weight
8491
@dependency = dependency
92+
@exclusive = exclusive
8593

8694
# from mixins
8795
@listeners = Hash.new { |hash, key| hash[key] = [] }
@@ -104,6 +112,28 @@ def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, pa
104112
on(:local_window) { |v| @local_window_max_size = @local_window = v }
105113
end
106114

115+
def <=>(other)
116+
if !@dependency.zero?
117+
if @dependency == other.id
118+
# parent stream processed before
119+
return 1
120+
elsif @dependency == other.dependency
121+
if @exclusive
122+
# exclusive streams from the same dep come first
123+
return -1
124+
elsif other.exclusive
125+
return 1
126+
else
127+
return other.weight <=> @weight
128+
end
129+
end
130+
elsif !other.dependency.zero?
131+
return -1 if @id == other.dependency
132+
end
133+
134+
other.weight <=> @weight
135+
end
136+
107137
def closed?
108138
@state == :closed
109139
end
@@ -647,6 +677,7 @@ def complete_transition(frame)
647677
def process_priority(frame)
648678
@weight = frame[:weight]
649679
@dependency = frame[:dependency]
680+
@exclusive = frame[:exclusive]
650681
emit(
651682
:priority,
652683
weight: frame[:weight],

sig/framer.rbs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ module HTTP2
3535
@local_max_frame_size: Integer
3636
@remote_max_frame_size: Integer
3737
@streams: Hash[Integer, Stream]
38+
@frames: Array[frame]
3839

3940
attr_accessor local_max_frame_size: Integer
4041

@@ -52,7 +53,7 @@ module HTTP2
5253

5354
private
5455

55-
def initialize: (?Integer local_max_frame_size, ?Integer remote_max_frame_size) -> untyped
56+
def initialize: (?Hash[Integer, Stream] streams, ?Integer local_max_frame_size, ?Integer remote_max_frame_size) -> untyped
5657

5758
def decode_frame: (String buf) -> frame?
5859

sig/stream.rbs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module HTTP2
1010
attr_reader parent: Stream?
1111
attr_reader weight: Integer
1212
attr_reader dependency: Integer
13+
attr_reader exclusive: bool
1314
attr_reader remote_window: Integer
1415
attr_reader local_window: Integer
1516
attr_reader closed: Symbol?
@@ -33,6 +34,8 @@ module HTTP2
3334

3435
alias << receive
3536

37+
def <=>: (Stream other) -> Integer
38+
3639
def verify_trailers: (headers_frame frame) -> void
3740

3841
def calculate_content_length: (Integer?) -> void

spec/stream_spec.rb

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,57 @@
662662
end
663663
end
664664

665+
context "prioritization" do
666+
it "should order based on dependency" do
667+
# 5.3.1
668+
# if streams B and C are dependent on stream A, and if stream
669+
# D is created with a dependency on stream A, this results in a
670+
# dependency order of A followed by B, C, and D in any order.
671+
a_stream = client.new_stream
672+
b_stream = client.new_stream(dependency: a_stream.id)
673+
c_stream = client.new_stream(dependency: a_stream.id)
674+
d_stream = client.new_stream(dependency: a_stream.id)
675+
676+
expect([b_stream, c_stream, d_stream, a_stream].sort).to eq(
677+
[a_stream, b_stream, c_stream, d_stream]
678+
)
679+
expect([b_stream, d_stream, c_stream, a_stream].sort).to eq(
680+
[a_stream, b_stream, d_stream, c_stream]
681+
)
682+
end
683+
684+
it "should push exclusive streams up the stack" do
685+
# he exclusive flag causes the stream to become the
686+
# sole dependency of its parent stream, causing other dependencies to
687+
# become dependent on the exclusive stream. In the previous example,
688+
# if stream D is created with an exclusive dependency on stream A, this
689+
# results in D becoming the dependency parent of B and C.
690+
a_stream = client.new_stream
691+
b_stream = client.new_stream(dependency: a_stream.id)
692+
c_stream = client.new_stream(dependency: a_stream.id)
693+
d_stream = client.new_stream(dependency: a_stream.id, exclusive: true)
694+
695+
expect([b_stream, c_stream, d_stream, a_stream].sort).to eq(
696+
[a_stream, d_stream, b_stream, c_stream]
697+
)
698+
end
699+
700+
it "should prioritze based on weight" do
701+
# Streams with the same parent SHOULD be allocated resources
702+
# proportionally based on their weight. Thus, if stream B depends on
703+
# stream A with weight 4, stream C depends on stream A with weight 12,
704+
# and no progress can be made on stream A, stream B ideally receives
705+
# one-third of the resources allocated to stream C.
706+
a_stream = client.new_stream
707+
b_stream = client.new_stream(dependency: a_stream.id, weight: 4)
708+
c_stream = client.new_stream(dependency: a_stream.id, weight: 12)
709+
710+
expect([b_stream, c_stream, a_stream].sort).to eq(
711+
[a_stream, c_stream, b_stream]
712+
)
713+
end
714+
end
715+
665716
context "client API" do
666717
it ".reprioritize should emit PRIORITY frame" do
667718
expect(stream).to receive(:send) do |frame|

0 commit comments

Comments
 (0)