Merged
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,6 +1,6 @@
ARG CRYSTAL_VERSION=latest

FROM placeos/crystal:$CRYSTAL_VERSION as build
FROM placeos/crystal:$CRYSTAL_VERSION AS build
WORKDIR /app

# Set the commit via a build arg
2 changes: 1 addition & 1 deletion shard.lock
@@ -159,7 +159,7 @@ shards:

openssl_ext:
git: https://github.com/spider-gazelle/openssl_ext.git
version: 2.8.3
version: 2.8.4

pars:
git: https://github.com/spider-gazelle/pars.git
4 changes: 4 additions & 0 deletions spec/publishing/publish_metadata_spec.cr
@@ -17,6 +17,10 @@ module PlaceOS::Source
def broadcast(message : Publisher::Message)
messages << message
end

def stats : Hash(String, UInt64)
{"mock" => 0_u64}
end
end

class Dummy
4 changes: 4 additions & 0 deletions src/source/publishing/influx_manager.cr
@@ -16,6 +16,10 @@ module PlaceOS::Source

delegate start, stop, to: publisher

def stats : Hash(String, UInt64)
{"influx" => publisher.processed}
end

def initialize(
@influx_host : String = INFLUX_HOST || abort("INFLUX_HOST unset"),
@influx_api_key : String = INFLUX_API_KEY || abort("INFLUX_API_KEY unset"),
59 changes: 50 additions & 9 deletions src/source/publishing/influx_publisher.cr
@@ -47,19 +47,50 @@
def initialize(@client : Flux::Client, @bucket : String)
end

@buffer : Array(Flux::Point) = Array(Flux::Point).new(StatusEvents::BATCH_SIZE)
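# Points accumulate here and are flushed in batches (see publish below and the periodic commit from the consume loop)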

# Write an MQTT event to InfluxDB
#
def publish(message : Publisher::Message)
points = self.class.transform(message)
points.each do |point|
Log.trace { {
measurement: point.measurement,
timestamp: point.timestamp.to_s,
tags: point.tags.to_json,
fields: point.fields.to_json,
} }
client.write(bucket, point)
@buffer.concat points
commit if @buffer.size >= StatusEvents::BATCH_SIZE
end

def commit : Nil
return if @buffer.empty?
points = @buffer.dup
@buffer.clear

# points.each do |point|
# Log.trace { {
# measurement: point.measurement,
# timestamp: point.timestamp.to_s,
# tags: point.tags.to_json,
# fields: point.fields.to_json,
# } }
# client.write(bucket, point)
# end

Log.debug { "writing #{points.size} points" }

begin
client.write(bucket, points)
rescue error
Log.error(exception: error) { "error batch writing points" }
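# Fall back to writing points individually so one bad point cannot drop the whole batch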
points.each do |point|
client.write(bucket, point) rescue nil
end
end
end

def self.invalid_string?(str : String) : Bool
return true if str.size > 0xFF
str.each_byte do |byte|
# ASCII control chars + DEL
return true if byte < 0x20 || byte == 0x7F
end
false
end

@@building_timezones = {} of String => Time::Location?
@@ -141,6 +172,8 @@
case raw = Value.from_json(payload)
in CustomMetrics then parse_custom(raw, fields, tags, data, timestamp)
in FieldTypes
return [] of Flux::Point if raw.is_a?(String) && invalid_string?(raw)

fields[key] = raw
point = Flux::Point.new!(
measurement: data.module_name,
@@ -183,8 +216,10 @@
sub_key = sub_key.gsub(/\W/, '_')

if sub_key == "measurement" && value.is_a?(String)
return nil if invalid_string?(value)
measurement = value
else
next if value.is_a?(String) && invalid_string?(value)
local[sub_key] = value
end
end
@@ -199,7 +234,7 @@
).tap &.fields.merge!(local_fields)
end

protected def self.parse_custom(raw, fields, tags, data, timestamp)

Check warning (GitHub Actions / Ameba) on line 237 in src/source/publishing/influx_publisher.cr: Metrics/CyclomaticComplexity: cyclomatic complexity too high [12/10] at protected def self.parse_custom(raw, fields, tags, data, timestamp)
# Add the tags and fields going to all points
if ts_tags = raw.ts_tags
tags.merge!(ts_tags.compact)
@@ -213,6 +248,8 @@
points = Array(Flux::Point).new(initial_capacity: raw.value.size)
default_measurement = raw.measurement

return [] of Flux::Point if default_measurement.is_a?(String) && invalid_string?(default_measurement)

raw.value.each_with_index do |val, index|
# Skip if an empty point
compacted = val.compact
@@ -231,7 +268,9 @@
local_tags = tags.dup
local_tags["pos_uniq"] = index.to_s

points << build_custom_point(measurement, data, fields, local_tags, compacted, override_timestamp || timestamp, ts_map, raw.ts_tag_keys)
if point = build_custom_point(measurement, data, fields, local_tags, compacted, override_timestamp || timestamp, ts_map, raw.ts_tag_keys)
points << point
end
end

points
@@ -250,6 +289,8 @@
end
end

return nil if measurement_value.is_a?(String) && invalid_string?(measurement_value)

# convert fields to tags as required
if ts_tag_keys
ts_tag_keys.each do |field|
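For reviewers, a quick sketch of how the new invalid_string? guard behaves. Hypothetical REPL-style usage; it assumes the method is called on the publisher class exactly as defined above:

invalid_string?("ok value")        # => false
invalid_string?("bad\u0000value")  # => true (contains an ASCII control character)
invalid_string?("x" * 300)         # => true (longer than 0xFF = 255 characters)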
12 changes: 11 additions & 1 deletion src/source/publishing/mqtt_broker_manager.cr
@@ -26,6 +26,16 @@ module PlaceOS::Source
end
end

def stats : Hash(String, UInt64)
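# Snapshot of processed-message counts, one entry per broker publisher, keyed by broker name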
hash = {} of String => UInt64
read_publishers do |publishers|
publishers.values.each do |publisher|
hash["MQTT: #{publisher.broker.name}"] = publisher.processed
end
end
hash
end

def process_resource(action : Resource::Action, resource : Model::Broker) : Resource::Result
# Don't recreate the publisher if only "safe" attributes have changed
case action
@@ -131,7 +141,7 @@ module PlaceOS::Source
end
end

# Safe to update iff fields in SAFE_ATTRIBUTES changed
# Safe to update if fields in SAFE_ATTRIBUTES changed
#
def self.safe_update?(model : Model::Broker)
# Take the union of the changed fields and the safe fields
2 changes: 1 addition & 1 deletion src/source/publishing/mqtt_publisher.cr
@@ -30,7 +30,7 @@ module PlaceOS::Source
Event.new(value, timestamp).to_json
end

private getter broker : PlaceOS::Model::Broker
getter broker : PlaceOS::Model::Broker
private getter broker_lock : RWLock = RWLock.new
protected getter client : ::MQTT::V3::Client

27 changes: 19 additions & 8 deletions src/source/publishing/publisher.cr
@@ -11,26 +11,37 @@ module PlaceOS::Source
timestamp : Time
)

getter message_queue : Channel(Message) = Channel(Message).new
getter message_queue : Channel(Message) = Channel(Message).new(StatusEvents::BATCH_SIZE)
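# Bounded channel: once BATCH_SIZE messages are queued, senders block, applying backpressure upstream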
getter processed : UInt64 = 0_u64

abstract def publish(message : Message)

def commit : Nil
end

def start
consume_messages
spawn { consume_messages }
end

def stop
message_queue.close
end

private def consume_messages
spawn do
while message = message_queue.receive?
begin
publish(message)
rescue error
Log.warn(exception: error) { "publishing message: #{message}" }
while !message_queue.closed?
select
when message = message_queue.receive?
if message
begin
publish(message)
@processed += 1_u64
rescue error
Log.warn(exception: error) { "publishing message: #{message}" }
end
end
when timeout(10.seconds)
# commit any buffered messages that have not been published yet
commit
end
end
end
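A minimal sketch of how a concrete publisher plugs into the reworked consume loop. EchoPublisher is hypothetical, purely for illustration; the real implementations are the MQTT and Influx publishers above:

class EchoPublisher < Publisher
  # Called from the consume fiber for each queued message
  def publish(message : Message)
    puts message
  end

  # No local buffering here, so the inherited no-op commit suffices;
  # InfluxPublisher overrides commit to flush its point buffer.
end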
2 changes: 2 additions & 0 deletions src/source/publishing/publisher_manager.cr
@@ -5,5 +5,7 @@ module PlaceOS::Source
abstract def broadcast(message : Publisher::Message)
abstract def start
abstract def stop

abstract def stats : Hash(String, UInt64)
end
end
102 changes: 66 additions & 36 deletions src/source/status_events.cr
@@ -12,9 +12,9 @@ module PlaceOS::Source
Log = ::Log.for(self)

STATUS_CHANNEL_PATTERN = "status/#{Model::Module.table_name}-*"
MAX_CONTAINER_SIZE = 50_000
MAX_CONTAINER_SIZE = 40_000
BATCH_SIZE = 100
PROCESSING_INTERVAL = 100.milliseconds
PROCESSING_INTERVAL = 40.milliseconds
CONTAINER_WARNING_THRESHOLD = MAX_CONTAINER_SIZE * 0.8

private getter! redis : Redis
@@ -66,30 +66,51 @@
redis.close
end

def paginate_modules(&)
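# Keyset pagination on (created_at, id): pages stay stable while rows are inserted, and no OFFSET rescans are needed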
batch_size = 64
last_created_at = Time.unix(0)
last_id = ""

loop do
modules = PlaceOS::Model::Module
.where("created_at > ? OR (created_at = ? AND id > ?)", last_created_at, last_created_at, last_id)
.order(created_at: :asc, id: :asc)
.limit(batch_size)
.to_a

# process the current page of results
break if modules.empty?
modules.each do |mod|
yield mod
end
break if modules.size < batch_size

last_created_at = modules.last.created_at
last_id = modules.last.id
end
end

def update_values
mods_mapped = 0_u64
status_updated = 0_u64
pattern = "initial_sync"
PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
modules.each do |mod|
next unless mod
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end
paginate_modules do |mod|
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end

# Backpressure if event container is growing too fast
if event_container.size >= MAX_CONTAINER_SIZE / 2
until event_container.size < MAX_CONTAINER_SIZE / 4
sleep 10.milliseconds
end
# Backpressure if event container is growing too fast
if event_container.size > MAX_CONTAINER_SIZE // 2
until event_container.size < MAX_CONTAINER_SIZE // 4
sleep 10.milliseconds
end
rescue error
Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" }
end
rescue error
Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" }
end
Log.info { {
message: "initial status sync complete",
@@ -109,26 +130,23 @@
status_updated = 0_u64
pattern = "broker_resync"

PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
modules.each do |mod|
next unless mod
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end
paginate_modules do |mod|
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end

# Backpressure if event container is growing too fast
if event_container.size >= MAX_CONTAINER_SIZE / 2
until event_container.size < MAX_CONTAINER_SIZE / 4
sleep 10.milliseconds
end
# Backpressure if event container is growing too fast
if event_container.size >= MAX_CONTAINER_SIZE // 2
until event_container.size < MAX_CONTAINER_SIZE // 4
sleep 10.milliseconds
end
rescue error
Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
end
rescue error
Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
end

Log.info { {
@@ -169,6 +187,18 @@
else
process_batch(batch)
Fiber.yield

# This outputs how many writes have occurred for each publisher
# stats = String.build do |io|
# io << "\n\n\nNEXT BATCH:\n"
# publisher_managers.each do |manager|
# manager.stats.each do |name, count|
# io << " * #{name} => #{count}"
# end
# end
# io << "\n\n"
# end
# puts stats
end
rescue error
Log.error(exception: error) { "error processing events" }
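One detail worth calling out in the backpressure changes above: in Crystal, / on integers returns a Float64 while // is floor division, so the rewritten thresholds compare against integers instead of floats. Illustrative values with the new constant:

MAX_CONTAINER_SIZE = 40_000
MAX_CONTAINER_SIZE / 2  # => 20000.0 (Float64)
MAX_CONTAINER_SIZE // 2 # => 20000 (Int32)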