Skip to content

Commit f2a2cf7

Browse files
committed
feat(queue): simplify process for setting connect state
1 parent 68057cc commit f2a2cf7

5 files changed

Lines changed: 159 additions & 161 deletions

File tree

shard.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: placeos-driver
2-
version: 7.12.0
2+
version: 7.13.0
33

44
dependencies:
55
action-controller:

src/placeos-driver/queue.cr

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class PlaceOS::Driver::Queue
2323
getter current : Task? = nil
2424
getter previous : Task? = nil
2525
getter terminated : Bool
26+
getter connected_state : Bool? = nil
2627
@online : Bool? = nil
2728

2829
# for modifying defaults
@@ -35,7 +36,7 @@ class PlaceOS::Driver::Queue
3536

3637
def online=(state : Bool)
3738
state_changed = state != @online
38-
@online = state
39+
@connected_state = @online = state
3940
@connected_callback.call(state) if state_changed
4041

4142
if @online && @waiting && @queue.size > 0
@@ -71,8 +72,17 @@ class PlaceOS::Driver::Queue
7172

7273
# A helper method for setting the connected state, without effecting queue
7374
# processing. UDP device not responding, incorrect login etc
74-
def set_connected(state)
75-
@connected_callback.call(state)
75+
def set_connected(state : Bool)
76+
current_state = @connected_state
77+
@connected_state = state
78+
79+
if state && !online
80+
self.online = true
81+
elsif state != current_state
82+
@connected_callback.call(state)
83+
end
84+
85+
self
7686
end
7787

7888
# adds a task callback to the queue

src/placeos-driver/transport.cr

Lines changed: 142 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -43,148 +43,148 @@ abstract class PlaceOS::Driver::Transport
4343
end
4444

4545
macro __build_http_helper__
46-
{% if @type.name.id.stringify != "PlaceOS::Driver::TransportHTTP" %}
47-
def http(method, path, body : ::HTTP::Client::BodyType = nil,
48-
params : Hash(String, String?) | URI::Params = URI::Params.new,
49-
headers : Hash(String, String) | HTTP::Headers = HTTP::Headers.new,
50-
secure = false, concurrent = true
51-
) : ::HTTP::Client::Response
52-
{% if @type.name.id.stringify == "PlaceOS::Driver::TransportLogic" %}
53-
raise "HTTP requests are not available in logic drivers"
54-
{% else %}
55-
56-
if uri_override = http_uri_override
57-
uri = uri_override
58-
elsif (uri_config = @uri.try(&.strip)) && !uri_config.empty?
59-
uri = URI.parse uri_config
60-
end
61-
62-
if uri
63-
context = case uri.scheme
64-
when "https", "wss"
65-
uri.scheme = "https"
66-
self.class.default_tls
67-
when "ws"
68-
uri.scheme = "http"
69-
nil
70-
else
71-
nil
72-
end
73-
else
74-
context = if secure
75-
uri = URI.parse "https://#{@ip}"
76-
77-
if secure.is_a?(OpenSSL::SSL::Context::Client)
78-
secure
79-
else
80-
self.class.default_tls
81-
end
82-
else
83-
uri = URI.parse "http://#{@ip}"
84-
nil
85-
end
86-
end
87-
88-
# Build the new URI
89-
uri.path = path
90-
params = if params.is_a?(Hash)
91-
URI::Params.new(params.transform_values { |v| v ? [v] : [] of String })
92-
else
93-
params
94-
end
95-
uri.query_params = params
96-
97-
# Apply headers
98-
headers = headers.is_a?(Hash) ? HTTP::Headers.new.tap { |head| headers.map { |key, value| head[key] = value } } : headers
99-
100-
# Make the request
101-
client = new_http_client(uri, context)
102-
cookies.add_request_headers(headers) unless @settings.get { setting?(Bool, :disable_cookies) } || false
103-
logger.debug { "http helper requesting: #{method.to_s.upcase} #{uri.request_target}" }
104-
check_http_response_encoding client.exec(method.to_s.upcase, uri.request_target, headers, body).tap { client.close }
105-
{% end %}
106-
end
107-
{% end %}
108-
109-
protected def check_http_response_encoding(response)
110-
headers = response.headers
111-
cookies.fill_from_server_headers(headers) unless @settings.get { setting?(Bool, :disable_cookies) } || false
112-
encoding = headers["Content-Encoding"]?
113-
if encoding.in?({"gzip", "deflate"})
114-
response.consume_body_io
115-
body = response.body
116-
117-
if !body.blank?
118-
body_io = IO::Memory.new(body)
119-
body = case encoding
120-
when "gzip"
121-
Compress::Gzip::Reader.open(body_io, &.gets_to_end)
122-
when "deflate"
123-
Compress::Deflate::Reader.open(body_io, &.gets_to_end)
124-
end
125-
126-
headers.delete("Content-Encoding")
127-
headers.delete("Content-Length")
128-
129-
response = HTTP::Client::Response.new(response.status, body, headers, response.status_message, response.version)
130-
end
131-
end
132-
response
133-
end
134-
135-
{% if @type.name.id.stringify != "PlaceOS::Driver::TransportLogic" %}
136-
protected def new_http_client(uri, context)
137-
client = ConnectProxy::HTTPClient.new(uri, context, ignore_env: true)
138-
connect_timeout = (@settings.get { setting?(Int32, :http_connect_timeout) } || 10).seconds
139-
comms_timeout = (@settings.get { setting?(Int32, :http_comms_timeout) } || 120).seconds
140-
client.dns_timeout = connect_timeout
141-
client.connect_timeout = connect_timeout
142-
client.read_timeout = comms_timeout
143-
client.write_timeout = comms_timeout
144-
145-
# Apply basic auth settings
146-
if auth = @settings.get { setting?(NamedTuple(username: String, password: String), :basic_auth) }
147-
client.basic_auth **auth
148-
end
149-
150-
# Apply proxy settings
151-
if proxy_config = @settings.get { setting?(NamedTuple(host: String, port: Int32, auth: NamedTuple(username: String, password: String)?), :proxy) }
152-
# this check is here so we can disable proxies as required
153-
if proxy_config[:host].presence
154-
proxy = ConnectProxy.new(**proxy_config)
155-
client.before_request { client.set_proxy(proxy.not_nil!) }
156-
end
157-
elsif ConnectProxy.behind_proxy?
158-
# Apply environment defined proxy
159-
begin
160-
proxy = ConnectProxy.new(*ConnectProxy.parse_proxy_url)
161-
client.before_request { client.set_proxy(proxy.not_nil!) }
162-
rescue error
163-
logger.warn(exception: error) { "failed to apply environment proxy URI" }
164-
end
165-
end
166-
167-
@proxy_in_use = proxy.try &.proxy_host
168-
169-
# Check if we need to override the Host header
170-
if host_header = @settings.get { setting?(String, :host_header) }
171-
client.before_request { |request| request.headers["Host"] = host_header }
172-
end
173-
174-
client.compress = true
175-
if before_req = @before_request
176-
client.before_request &before_req
177-
end
178-
client
179-
end
180-
{% end %}
181-
182-
def enable_multicast_loop(state = true)
183-
{% if @type.name.id.stringify == "PlaceOS::Driver::TransportUDP" %}
184-
@socket.try &.multicast_loopback = state
185-
{% end %}
186-
state
187-
end
46+
{% if @type.name.id.stringify != "PlaceOS::Driver::TransportHTTP" %}
47+
def http(method, path, body : ::HTTP::Client::BodyType = nil,
48+
params : Hash(String, String?) | URI::Params = URI::Params.new,
49+
headers : Hash(String, String) | HTTP::Headers = HTTP::Headers.new,
50+
secure = false, concurrent = true
51+
) : ::HTTP::Client::Response
52+
{% if @type.name.id.stringify == "PlaceOS::Driver::TransportLogic" %}
53+
raise "HTTP requests are not available in logic drivers"
54+
{% else %}
55+
56+
if uri_override = http_uri_override
57+
uri = uri_override
58+
elsif (uri_config = @uri.try(&.strip)) && !uri_config.empty?
59+
uri = URI.parse uri_config
60+
end
61+
62+
if uri
63+
context = case uri.scheme
64+
when "https", "wss"
65+
uri.scheme = "https"
66+
self.class.default_tls
67+
when "ws"
68+
uri.scheme = "http"
69+
nil
70+
else
71+
nil
72+
end
73+
else
74+
context = if secure
75+
uri = URI.parse "https://#{@ip}"
76+
77+
if secure.is_a?(OpenSSL::SSL::Context::Client)
78+
secure
79+
else
80+
self.class.default_tls
81+
end
82+
else
83+
uri = URI.parse "http://#{@ip}"
84+
nil
85+
end
86+
end
87+
88+
# Build the new URI
89+
uri.path = path
90+
params = if params.is_a?(Hash)
91+
URI::Params.new(params.transform_values { |v| v ? [v] : [] of String })
92+
else
93+
params
94+
end
95+
uri.query_params = params
96+
97+
# Apply headers
98+
headers = headers.is_a?(Hash) ? HTTP::Headers.new.tap { |head| headers.map { |key, value| head[key] = value } } : headers
99+
100+
# Make the request
101+
client = new_http_client(uri, context)
102+
cookies.add_request_headers(headers) unless @settings.get { setting?(Bool, :disable_cookies) } || false
103+
logger.debug { "http helper requesting: #{method.to_s.upcase} #{uri.request_target}" }
104+
check_http_response_encoding client.exec(method.to_s.upcase, uri.request_target, headers, body).tap { client.close }
105+
{% end %}
106+
end
107+
{% end %}
108+
109+
protected def check_http_response_encoding(response)
110+
headers = response.headers
111+
cookies.fill_from_server_headers(headers) unless @settings.get { setting?(Bool, :disable_cookies) } || false
112+
encoding = headers["Content-Encoding"]?
113+
if encoding.in?({"gzip", "deflate"})
114+
response.consume_body_io
115+
body = response.body
116+
117+
if !body.blank?
118+
body_io = IO::Memory.new(body)
119+
body = case encoding
120+
when "gzip"
121+
Compress::Gzip::Reader.open(body_io, &.gets_to_end)
122+
when "deflate"
123+
Compress::Deflate::Reader.open(body_io, &.gets_to_end)
124+
end
125+
126+
headers.delete("Content-Encoding")
127+
headers.delete("Content-Length")
128+
129+
response = HTTP::Client::Response.new(response.status, body, headers, response.status_message, response.version)
130+
end
131+
end
132+
response
133+
end
134+
135+
{% if @type.name.id.stringify != "PlaceOS::Driver::TransportLogic" %}
136+
protected def new_http_client(uri, context)
137+
client = ConnectProxy::HTTPClient.new(uri, context, ignore_env: true)
138+
connect_timeout = (@settings.get { setting?(Int32, :http_connect_timeout) } || 10).seconds
139+
comms_timeout = (@settings.get { setting?(Int32, :http_comms_timeout) } || 120).seconds
140+
client.dns_timeout = connect_timeout
141+
client.connect_timeout = connect_timeout
142+
client.read_timeout = comms_timeout
143+
client.write_timeout = comms_timeout
144+
145+
# Apply basic auth settings
146+
if auth = @settings.get { setting?(NamedTuple(username: String, password: String), :basic_auth) }
147+
client.basic_auth **auth
148+
end
149+
150+
# Apply proxy settings
151+
if proxy_config = @settings.get { setting?(NamedTuple(host: String, port: Int32, auth: NamedTuple(username: String, password: String)?), :proxy) }
152+
# this check is here so we can disable proxies as required
153+
if proxy_config[:host].presence
154+
proxy = ConnectProxy.new(**proxy_config)
155+
client.before_request { client.set_proxy(proxy.not_nil!) }
156+
end
157+
elsif ConnectProxy.behind_proxy?
158+
# Apply environment defined proxy
159+
begin
160+
proxy = ConnectProxy.new(*ConnectProxy.parse_proxy_url)
161+
client.before_request { client.set_proxy(proxy.not_nil!) }
162+
rescue error
163+
logger.warn(exception: error) { "failed to apply environment proxy URI" }
164+
end
165+
end
166+
167+
@proxy_in_use = proxy.try &.proxy_host
168+
169+
# Check if we need to override the Host header
170+
if host_header = @settings.get { setting?(String, :host_header) }
171+
client.before_request { |request| request.headers["Host"] = host_header }
172+
end
173+
174+
client.compress = true
175+
if before_req = @before_request
176+
client.before_request &before_req
177+
end
178+
client
179+
end
180+
{% end %}
181+
182+
def enable_multicast_loop(state = true)
183+
{% if @type.name.id.stringify == "PlaceOS::Driver::TransportUDP" %}
184+
@socket.try &.multicast_loopback = state
185+
{% end %}
186+
state
187+
end
188188
end
189189

190190
# Many devices have a HTTP service. Might as well make it easy to access.

src/placeos-driver/transport/http.cr

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ class PlaceOS::Driver
144144
@max_requests : Int32
145145
@ip : String
146146
@tls : OpenSSL::SSL::Context::Client?
147-
@connected_state : Bool = true
148147

149148
property :received
150149

@@ -162,14 +161,7 @@ class PlaceOS::Driver
162161
# the requests are required to re-enable the queue
163162
# and queue based HTTP drivers are less common
164163
protected def set_connected_state(state : Bool)
165-
current_state = @connected_state
166-
@connected_state = state
167-
168-
if state && !@queue.online
169-
@queue.online = true
170-
else
171-
@queue.set_connected(state) if state != current_state
172-
end
164+
@queue.set_connected(state) if state != @queue.connected_state
173165
end
174166

175167
protected def __is_https?
@@ -273,7 +265,7 @@ class PlaceOS::Driver
273265
end
274266

275267
# we don't want to be calling connected callback each time a request succeeds
276-
set_connected_state(true) unless @connected_state
268+
set_connected_state(true)
277269
@received.call
278270

279271
# fallback in case the HTTP client lib doesn't decompress the response

src/placeos-driver/transport/tcp.cr

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ class PlaceOS::Driver::TransportTCP < PlaceOS::Driver::Transport
1818
@uri : String?
1919
@socket : IO?
2020
@tls : OpenSSL::SSL::Context::Client?
21-
@connected_state : Bool? = nil
2221
property :received
2322

2423
def connect(connect_timeout : Int32 = 10) : Nil
@@ -46,13 +45,10 @@ class PlaceOS::Driver::TransportTCP < PlaceOS::Driver::Transport
4645

4746
# don't stop processing commands on makebreak devices
4847
protected def set_connected_state(state : Bool)
49-
current_state = @connected_state
50-
@connected_state = state
51-
5248
if state && !@queue.online
5349
@queue.online = true
5450
elsif @makebreak
55-
@queue.set_connected(state) if state != current_state
51+
@queue.set_connected(state) if state != @queue.connected_state
5652
else
5753
@queue.online = state
5854
end

0 commit comments

Comments
 (0)