diff --git a/Dockerfile b/Dockerfile
index 9c8cde0..c61e124 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,6 @@
 ARG CRYSTAL_VERSION=latest
 
-FROM placeos/crystal:$CRYSTAL_VERSION as build
+FROM placeos/crystal:$CRYSTAL_VERSION AS build
 WORKDIR /app
 
 # Set the commit via a build arg
diff --git a/shard.lock b/shard.lock
index 29a6e73..2279f77 100644
--- a/shard.lock
+++ b/shard.lock
@@ -159,7 +159,7 @@ shards:
 
   openssl_ext:
     git: https://github.com/spider-gazelle/openssl_ext.git
-    version: 2.8.3
+    version: 2.8.4
 
   pars:
     git: https://github.com/spider-gazelle/pars.git
diff --git a/spec/publishing/publish_metadata_spec.cr b/spec/publishing/publish_metadata_spec.cr
index bc9df82..972cb9c 100644
--- a/spec/publishing/publish_metadata_spec.cr
+++ b/spec/publishing/publish_metadata_spec.cr
@@ -17,6 +17,10 @@ module PlaceOS::Source
     def broadcast(message : Publisher::Message)
       messages << message
     end
+
+    def stats : Hash(String, UInt64)
+      {"mock" => 0_u64}
+    end
   end
 
   class Dummy
diff --git a/src/source/publishing/influx_manager.cr b/src/source/publishing/influx_manager.cr
index b2d4800..3c26f8b 100644
--- a/src/source/publishing/influx_manager.cr
+++ b/src/source/publishing/influx_manager.cr
@@ -16,6 +16,10 @@ module PlaceOS::Source
 
     delegate start, stop, to: publisher
 
+    def stats : Hash(String, UInt64)
+      {"influx" => publisher.processed}
+    end
+
    def initialize(
      @influx_host : String = INFLUX_HOST || abort("INFLUX_HOST unset"),
      @influx_api_key : String = INFLUX_API_KEY || abort("INFLUX_API_KEY unset"),
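Note for reviewers: every publisher manager now reports throughput through `stats : Hash(String, UInt64)`, backed by the `processed` counter added to `Publisher` later in this diff. A minimal sketch of how a caller might fold those hashes into one report (the `report_stats` helper is hypothetical, not part of this change):

```crystal
# Hypothetical helper, not part of this change: merge each manager's
# stats hash into a single readable report. It relies only on the
# `stats : Hash(String, UInt64)` method this diff adds to the
# PublisherManager interface.
def report_stats(managers) : String
  String.build do |io|
    managers.each do |manager|
      manager.stats.each do |name, count|
        io << name << " => " << count << '\n'
      end
    end
  end
end
```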
diff --git a/src/source/publishing/influx_publisher.cr b/src/source/publishing/influx_publisher.cr
index 6bdc8d2..0a051a6 100644
--- a/src/source/publishing/influx_publisher.cr
+++ b/src/source/publishing/influx_publisher.cr
@@ -47,19 +47,50 @@ module PlaceOS::Source
     def initialize(@client : Flux::Client, @bucket : String)
     end
 
+    @buffer : Array(Flux::Point) = Array(Flux::Point).new(StatusEvents::BATCH_SIZE)
+
     # Write an MQTT event to InfluxDB
     #
     def publish(message : Publisher::Message)
       points = self.class.transform(message)
-      points.each do |point|
-        Log.trace { {
-          measurement: point.measurement,
-          timestamp: point.timestamp.to_s,
-          tags: point.tags.to_json,
-          fields: point.fields.to_json,
-        } }
-        client.write(bucket, point)
+      @buffer.concat points
+      commit if @buffer.size >= StatusEvents::BATCH_SIZE
+    end
+
+    def commit : Nil
+      return if @buffer.empty?
+      points = @buffer.dup
+      @buffer.clear
+
+      # points.each do |point|
+      #   Log.trace { {
+      #     measurement: point.measurement,
+      #     timestamp: point.timestamp.to_s,
+      #     tags: point.tags.to_json,
+      #     fields: point.fields.to_json,
+      #   } }
+      #   client.write(bucket, point)
+      # end
+
+      Log.debug { "writing #{points.size} points" }
+
+      begin
+        client.write(bucket, points)
+      rescue error
+        Log.error(exception: error) { "error batch writing points" }
+        points.each do |point|
+          client.write(bucket, point) rescue nil
+        end
+      end
+    end
+
+    def self.invalid_string?(str : String) : Bool
+      return true if str.size > 0xFF
+      str.each_byte do |byte|
+        # ASCII control chars + DEL
+        return true if byte < 0x20 || byte == 0x7F
       end
+      false
     end
 
     @@building_timezones = {} of String => Time::Location?
@@ -141,6 +172,8 @@ module PlaceOS::Source
       case raw = Value.from_json(payload)
       in CustomMetrics then parse_custom(raw, fields, tags, data, timestamp)
       in FieldTypes
+        return [] of Flux::Point if raw.is_a?(String) && invalid_string?(raw)
+
         fields[key] = raw
         point = Flux::Point.new!(
           measurement: data.module_name,
@@ -183,8 +216,10 @@ module PlaceOS::Source
         sub_key = sub_key.gsub(/\W/, '_')
 
         if sub_key == "measurement" && value.is_a?(String)
+          return nil if invalid_string?(value)
           measurement = value
         else
+          next if value.is_a?(String) && invalid_string?(value)
           local[sub_key] = value
         end
       end
@@ -213,6 +248,8 @@ module PlaceOS::Source
       points = Array(Flux::Point).new(initial_capacity: raw.value.size)
       default_measurement = raw.measurement
 
+      return [] of Flux::Point if default_measurement.is_a?(String) && invalid_string?(default_measurement)
+
       raw.value.each_with_index do |val, index|
         # Skip if an empty point
         compacted = val.compact
@@ -231,7 +268,9 @@ module PlaceOS::Source
         local_tags = tags.dup
         local_tags["pos_uniq"] = index.to_s
 
-        points << build_custom_point(measurement, data, fields, local_tags, compacted, override_timestamp || timestamp, ts_map, raw.ts_tag_keys)
+        if point = build_custom_point(measurement, data, fields, local_tags, compacted, override_timestamp || timestamp, ts_map, raw.ts_tag_keys)
+          points << point
+        end
       end
 
       points
@@ -250,6 +289,8 @@ module PlaceOS::Source
         end
       end
 
+      return nil if measurement_value.is_a?(String) && invalid_string?(measurement_value)
+
       # convert fields to tags as required
       if ts_tag_keys
         ts_tag_keys.each do |field|
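Two behaviours to note above: points are now buffered and written as a batch, falling back to point-by-point writes when a batch is rejected, and `invalid_string?` filters out values InfluxDB is likely to reject, i.e. strings over 255 characters or containing ASCII control characters. A standalone copy of the check with illustrative inputs:

```crystal
# Standalone copy of the guard used above: a string is rejected when it
# exceeds 255 characters (0xFF) or contains an ASCII control character
# (bytes below 0x20) or DEL (0x7F).
def invalid_string?(str : String) : Bool
  return true if str.size > 0xFF
  str.each_byte do |byte|
    return true if byte < 0x20 || byte == 0x7F
  end
  false
end

invalid_string?("temperature") # => false
invalid_string?("line\nbreak") # => true (0x0A is a control character)
invalid_string?("x" * 300)     # => true (over 255 characters)
```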
diff --git a/src/source/publishing/mqtt_broker_manager.cr b/src/source/publishing/mqtt_broker_manager.cr
index d4f8521..d06bd3d 100644
--- a/src/source/publishing/mqtt_broker_manager.cr
+++ b/src/source/publishing/mqtt_broker_manager.cr
@@ -26,6 +26,16 @@ module PlaceOS::Source
       end
     end
 
+    def stats : Hash(String, UInt64)
+      hash = {} of String => UInt64
+      read_publishers do |publishers|
+        publishers.values.each do |publisher|
+          hash["MQTT: #{publisher.broker.name}"] = publisher.processed
+        end
+      end
+      hash
+    end
+
     def process_resource(action : Resource::Action, resource : Model::Broker) : Resource::Result
       # Don't recreate the publisher if only "safe" attributes have changed
       case action
@@ -131,7 +141,7 @@ module PlaceOS::Source
       end
     end
 
-    # Safe to update iff fields in SAFE_ATTRIBUTES changed
+    # Safe to update if fields in SAFE_ATTRIBUTES changed
     #
     def self.safe_update?(model : Model::Broker)
       # Take the union of the changed fields and the safe fields
diff --git a/src/source/publishing/mqtt_publisher.cr b/src/source/publishing/mqtt_publisher.cr
index 1ac8696..ecd514e 100644
--- a/src/source/publishing/mqtt_publisher.cr
+++ b/src/source/publishing/mqtt_publisher.cr
@@ -30,7 +30,7 @@ module PlaceOS::Source
       Event.new(value, timestamp).to_json
     end
 
-    private getter broker : PlaceOS::Model::Broker
+    getter broker : PlaceOS::Model::Broker
 
     private getter broker_lock : RWLock = RWLock.new
     protected getter client : ::MQTT::V3::Client
diff --git a/src/source/publishing/publisher.cr b/src/source/publishing/publisher.cr
index b8b871f..5154735 100644
--- a/src/source/publishing/publisher.cr
+++ b/src/source/publishing/publisher.cr
@@ -11,12 +11,16 @@ module PlaceOS::Source
       timestamp : Time
     )
 
-    getter message_queue : Channel(Message) = Channel(Message).new
+    getter message_queue : Channel(Message) = Channel(Message).new(StatusEvents::BATCH_SIZE)
+    getter processed : UInt64 = 0_u64
 
     abstract def publish(message : Message)
 
+    def commit : Nil
+    end
+
     def start
-      consume_messages
+      spawn { consume_messages }
     end
 
     def stop
@@ -24,13 +28,20 @@ module PlaceOS::Source
     end
 
     private def consume_messages
-      spawn do
-        while message = message_queue.receive?
-          begin
-            publish(message)
-          rescue error
-            Log.warn(exception: error) { "publishing message: #{message}" }
+      while !message_queue.closed?
+        select
+        when message = message_queue.receive?
+          if message
+            begin
+              publish(message)
+              @processed += 1_u64
+            rescue error
+              Log.warn(exception: error) { "publishing message: #{message}" }
+            end
           end
+        when timeout(10.seconds)
+          # commit any buffered messages that have not been published yet
+          commit
         end
       end
     end
diff --git a/src/source/publishing/publisher_manager.cr b/src/source/publishing/publisher_manager.cr
index 9083dbc..76423cf 100644
--- a/src/source/publishing/publisher_manager.cr
+++ b/src/source/publishing/publisher_manager.cr
@@ -5,5 +5,7 @@ module PlaceOS::Source
   abstract def broadcast(message : Publisher::Message)
   abstract def start
   abstract def stop
+
+  abstract def stats : Hash(String, UInt64)
 end
end
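`Publisher#start` now spawns the consumer fiber itself, and the loop pairs a bounded-channel receive with a `timeout` clause so buffered points still get committed when the queue goes quiet. A self-contained sketch of the same pattern (the channel and messages here are illustrative):

```crystal
# Minimal sketch of the drain-or-flush loop: receive while messages are
# available, fall through to a periodic flush when none arrive in time.
channel = Channel(String).new(100) # bounded, like message_queue

spawn do
  until channel.closed?
    select
    when message = channel.receive?
      puts "publish: #{message}" if message
    when timeout(10.seconds)
      puts "flushing buffered writes" # stands in for `commit`
    end
  end
end

channel.send("status update")
sleep 100.milliseconds
channel.close
```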
diff --git a/src/source/status_events.cr b/src/source/status_events.cr
index 83fdaba..fe518cc 100644
--- a/src/source/status_events.cr
+++ b/src/source/status_events.cr
@@ -12,9 +12,9 @@ module PlaceOS::Source
     Log = ::Log.for(self)
 
     STATUS_CHANNEL_PATTERN      = "status/#{Model::Module.table_name}-*"
-    MAX_CONTAINER_SIZE          = 50_000
+    MAX_CONTAINER_SIZE          = 40_000
     BATCH_SIZE                  = 100
-    PROCESSING_INTERVAL         = 100.milliseconds
+    PROCESSING_INTERVAL         = 40.milliseconds
     CONTAINER_WARNING_THRESHOLD = MAX_CONTAINER_SIZE * 0.8
 
     private getter! redis : Redis
@@ -66,30 +66,51 @@ module PlaceOS::Source
       redis.close
     end
 
+    def paginate_modules(&)
+      batch_size = 64
+      last_created_at = Time.unix(0)
+      last_id = ""
+
+      loop do
+        modules = PlaceOS::Model::Module
+          .where("created_at > ? OR (created_at = ? AND id > ?)", last_created_at, last_created_at, last_id)
+          .order(created_at: :asc, id: :asc)
+          .limit(batch_size)
+          .to_a
+
+        # process
+        break if modules.empty?
+        modules.each do |mod|
+          yield mod
+        end
+        break if modules.size < batch_size
+
+        last_created_at = modules.last.created_at
+        last_id = modules.last.id
+      end
+    end
+
     def update_values
       mods_mapped = 0_u64
       status_updated = 0_u64
       pattern = "initial_sync"
 
-      PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
-        modules.each do |mod|
-          next unless mod
-          mods_mapped += 1_u64
-          module_id = mod.id.to_s
-          store = PlaceOS::Driver::RedisStorage.new(module_id)
-          store.each do |key, value|
-            status_updated += 1_u64
-            add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
-          end
+      paginate_modules do |mod|
+        mods_mapped += 1_u64
+        module_id = mod.id.to_s
+        store = PlaceOS::Driver::RedisStorage.new(module_id)
+        store.each do |key, value|
+          status_updated += 1_u64
+          add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
+        end
 
-          # Backpressure if event container is growing too fast
-          if event_container.size >= MAX_CONTAINER_SIZE / 2
-            until event_container.size < MAX_CONTAINER_SIZE / 4
-              sleep 10.milliseconds
-            end
+        # Backpressure if event container is growing too fast
+        if event_container.size > MAX_CONTAINER_SIZE // 2
+          until event_container.size < MAX_CONTAINER_SIZE // 4
+            sleep 10.milliseconds
           end
-        rescue error
-          Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" }
         end
+      rescue error
+        Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" }
       end
 
       Log.info { {
         message: "initial status sync complete",
@@ -109,26 +130,23 @@ module PlaceOS::Source
       status_updated = 0_u64
       pattern = "broker_resync"
 
-      PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
-        modules.each do |mod|
-          next unless mod
-          mods_mapped += 1_u64
-          module_id = mod.id.to_s
-          store = PlaceOS::Driver::RedisStorage.new(module_id)
-          store.each do |key, value|
-            status_updated += 1_u64
-            add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
-          end
+      paginate_modules do |mod|
+        mods_mapped += 1_u64
+        module_id = mod.id.to_s
+        store = PlaceOS::Driver::RedisStorage.new(module_id)
+        store.each do |key, value|
+          status_updated += 1_u64
+          add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
+        end
 
-          # Backpressure if event container is growing too fast
-          if event_container.size >= MAX_CONTAINER_SIZE / 2
-            until event_container.size < MAX_CONTAINER_SIZE / 4
-              sleep 10.milliseconds
-            end
+        # Backpressure if event container is growing too fast
+        if event_container.size >= MAX_CONTAINER_SIZE // 2
+          until event_container.size < MAX_CONTAINER_SIZE // 4
+            sleep 10.milliseconds
           end
-        rescue error
-          Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
         end
+      rescue error
+        Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
       end
 
       Log.info { {
@@ -169,6 +187,18 @@ module PlaceOS::Source
       else
         process_batch(batch)
         Fiber.yield
+
+        # This outputs how many writes have occurred for each publisher
+        # stats = String.build do |io|
+        #   io << "\n\n\nNEXT BATCH:\n"
+        #   publisher_managers.each do |manager|
+        #     manager.stats.each do |name, count|
+        #       io << " * #{name} => #{count}"
+        #     end
+        #   end
+        #   io << "\n\n"
+        # end
+        # puts stats
       end
     rescue error
       Log.error(exception: error) { "error processing events" }
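The sync paths previously loaded every module via `.all` and grouped the rows in memory; `paginate_modules` instead performs keyset pagination on `(created_at, id)`, seeking past the last row of each batch so rows are neither skipped nor repeated while modules are being created. The same traversal in miniature, over an in-memory array (the `Row` record and data are illustrative):

```crystal
# Keyset pagination in miniature: order by (created_at, id) and seek
# past the last row seen, mirroring paginate_modules above. Tuples
# compare lexicographically, matching the SQL
# `created_at > ? OR (created_at = ? AND id > ?)` predicate.
record Row, created_at : Time, id : String

def each_page(rows : Array(Row), batch_size = 64, &)
  last = {Time.unix(0), ""}
  loop do
    page = rows
      .select { |r| {r.created_at, r.id} > last }
      .sort_by { |r| {r.created_at, r.id} }
      .first(batch_size)
    break if page.empty?
    page.each { |row| yield row }
    break if page.size < batch_size
    last = {page.last.created_at, page.last.id}
  end
end

rows = [
  Row.new(Time.unix(1), "mod-1"),
  Row.new(Time.unix(1), "mod-2"),
  Row.new(Time.unix(2), "mod-3"),
]
each_page(rows, batch_size: 2) { |row| puts row.id } # => mod-1, mod-2, mod-3
```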