Skip to content

Commit 12ef4c4

Browse files
authored
fix(subscriptions): redis reconnect logic could drop subscriptions (#170)
ensures connections are re-established properly
1 parent e06fcca commit 12ef4c4

4 files changed

Lines changed: 51 additions & 44 deletions

File tree

shard.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: placeos-driver
2-
version: 7.17.4
2+
version: 7.17.5
33

44
dependencies:
55
action-controller:

src/placeos-driver/protocol.cr

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,9 @@ require "tokenizer"
66

77
require "./protocol/request"
88

9-
STDIN.blocking = false
109
STDIN.sync = false
11-
1210
STDERR.flush_on_newline = false
13-
STDERR.blocking = false
14-
STDERR.sync = true # we mark this as false if in use for protocol comms
15-
16-
STDOUT.blocking = false
11+
STDERR.sync = true
1712
STDOUT.sync = true
1813

1914
# :nodoc:

src/placeos-driver/subscriptions.cr

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,16 @@ class PlaceOS::Driver
119119
end
120120

121121
private def monitor_changes
122+
monitor_count = 0
123+
122124
SimpleRetry.try_to(
123125
base_interval: 1.second,
124126
max_interval: 5.seconds,
125127
randomise: 500.milliseconds
126128
) do
129+
return if terminated?
130+
monitor_count += 1
131+
subscribe_count = monitor_count
127132
wait = Channel(Nil).new
128133
begin
129134
# This will run on redis reconnect
@@ -134,39 +139,31 @@ class PlaceOS::Driver
134139
# re-subscribe to existing subscriptions here
135140
# NOTE:: sending an empty array errors
136141
keys = mutex.synchronize { subscriptions.keys }
137-
redis.subscribe(keys) if keys.size > 0
142+
if keys.size > 0
143+
redis.subscribe(keys)
144+
end
138145

139146
spawn(same_thread: true) {
140-
# re-check indirect subscriptions
147+
# re-check indirect subscriptions, these are registered by the subscriptions above
141148
redirections.each_key do |system_id|
142149
remap_indirect(system_id)
143150
end
144151
}
145152

146153
@running = true
147154

148-
# TODO:: check for any value changes
149-
# disconnect might have been a network partition and an update may
150-
# have occurred during this time gap
151-
152155
while details = subscription_channel.receive?
153156
sub, chan = details
154157

155158
begin
156-
SimpleRetry.try_to(
157-
max_attempts: 4,
158-
base_interval: 20.milliseconds,
159-
max_interval: 1.seconds,
160-
randomise: 80.milliseconds
161-
) do
162-
if sub
163-
redis.subscribe [chan]
164-
else
165-
redis.unsubscribe [chan]
166-
end
159+
if sub
160+
redis.subscribe [chan]
161+
else
162+
redis.unsubscribe [chan]
167163
end
168164
rescue error
169165
Log.fatal(exception: error) { "redis subscription failed... some components may not function correctly" }
166+
redis.close
170167
end
171168
end
172169
}
@@ -175,8 +172,19 @@ class PlaceOS::Driver
175172
# requires a block and subsequent ones throw an error with a block.
176173
# NOTE:: this version of subscribe only supports splat arguments
177174
redis.subscribe(SYSTEM_ORDER_UPDATE) do |on|
175+
raise "redis reconnect detected" if subscribe_count != monitor_count
176+
subscribe_count += 1
177+
178178
on.message { |c, m| on_message(c, m) }
179-
spawn(same_thread: true) { wait.close }
179+
spawn(same_thread: true) do
180+
instance = monitor_count
181+
wait.close
182+
loop do
183+
sleep 1.second
184+
break if instance != monitor_count
185+
subscription_channel.send({true, SYSTEM_ORDER_UPDATE})
186+
end
187+
end
180188
end
181189

182190
raise "no subscriptions, restarting loop" unless terminated?
@@ -193,8 +201,15 @@ class PlaceOS::Driver
193201

194202
@running = false
195203

196-
# We need to re-create the subscribe object for our sanity
197-
handle_disconnect unless terminated?
204+
case client = @redis
205+
in ::Redis::Cluster::Client
206+
client.close!
207+
in Redis
208+
client.close rescue nil
209+
in Nil
210+
end
211+
@redis_cluster = nil
212+
@redis = nil
198213
end
199214
end
200215
end
@@ -211,14 +226,16 @@ class PlaceOS::Driver
211226
new_channel = sub.subscribe_to
212227

213228
# Unsubscribe if channel changed
214-
if current_channel && current_channel != new_channel
215-
perform_unsubscribe(sub, current_channel)
216-
else
217-
subscribed = true
229+
if current_channel
230+
if current_channel != new_channel
231+
perform_unsubscribe(sub, current_channel)
232+
else
233+
subscribed = true
234+
end
218235
end
219236
end
220237

221-
perform_subscribe(sub) if !subscribed
238+
perform_subscribe(sub) unless subscribed
222239
end
223240
end
224241
end
@@ -271,15 +288,15 @@ class PlaceOS::Driver
271288
@redis : Redis? = nil
272289

273290
protected def self.new_clustered_redis
274-
Redis::Client.boot(ENV["REDIS_URL"]? || "redis://localhost:6379")
291+
Redis::Client.boot(ENV["REDIS_URL"]? || "redis://localhost:6379", reconnect: false)
275292
end
276293

277294
private def redis_cluster
278-
@redis_cluster_client ||= Subscriptions.new_clustered_redis
295+
@redis_cluster ||= Subscriptions.new_clustered_redis
279296
end
280297

281298
protected def self.new_redis(cluster : Redis::Client = new_clustered_redis) : Redis
282-
client = new_clustered_redis.connect!
299+
client = cluster.connect!
283300

284301
if client.is_a?(::Redis::Cluster::Client)
285302
client.cluster_info.each_nodes do |node_info|
@@ -293,14 +310,9 @@ class PlaceOS::Driver
293310
# Could not connect to any nodes in cluster
294311
raise Redis::ConnectionError.new
295312
else
296-
cluster.close!
297-
cluster.connect!.as(Redis)
313+
client.as(Redis)
298314
end
299315
end
300-
301-
private def handle_disconnect
302-
@redis = Subscriptions.new_redis(redis_cluster)
303-
end
304316
end
305317
end
306318

src/placeos-driver/transport/http.cr

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class PlaceOS::Driver
130130

131131
max_requests = @settings.get { setting?(Int32, :http_max_requests) } || 1000
132132
@max_requests = max_requests
133-
@client_idle = Time.monotonic
133+
@client_idle = Time.instant
134134
@client_requests = 0
135135

136136
@tls = __is_https? ? new_tls_context : nil
@@ -139,7 +139,7 @@ class PlaceOS::Driver
139139

140140
@params_base : URI::Params
141141
@client : ConnectProxy::HTTPClient
142-
@client_idle : Time::Span
142+
@client_idle : Time::Instant
143143
@keep_alive : Time::Span
144144
@max_requests : Int32
145145
@ip : String
@@ -179,7 +179,7 @@ class PlaceOS::Driver
179179

180180
protected def with_shared_client(&)
181181
@http_client_mutex.synchronize do
182-
now = Time.monotonic
182+
now = Time.instant
183183
idle_for = now - @client_idle
184184
__new_http_client if @client.__place_socket_invalid? || idle_for >= @keep_alive || @client_requests >= @max_requests
185185
@client_idle = now

0 commit comments

Comments
 (0)