-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbroker_sync_spec.cr
More file actions
173 lines (129 loc) · 5.04 KB
/
broker_sync_spec.cr
File metadata and controls
173 lines (129 loc) · 5.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
require "./spec_helper"
module PlaceOS::Source
describe "Broker State Sync" do
# Checks that `on_broker_ready` is called, with the new broker's id, when a
# `:created` event arrives after `MqttBrokerManager#start` has been called.
it "callback is invoked when new broker is added after startup" do
  # Create initial broker
  test_broker

  mqtt_manager = MqttBrokerManager.new

  # Record whether the callback fired and which broker id it was given
  callback_invoked = false
  callback_broker_id = ""
  mqtt_manager.on_broker_ready = ->(broker_id : String) {
    callback_invoked = true
    callback_broker_id = broker_id
  }

  # Set once the broker has been persisted, so teardown knows what to remove
  new_broker = nil

  # Start the manager to mark startup as finished
  mqtt_manager.start

  begin
    # Wait for startup to complete
    sleep 200.milliseconds

    # Create a new broker after startup
    broker = PlaceOS::Model::Broker.new(
      name: "new-broker-#{Time.utc.to_unix}",
      host: ENV["MQTT_HOST"]?.presence || "mqtt",
      port: ENV["MQTT_PORT"]?.presence.try(&.to_i?) || 1883,
      auth_type: :no_auth,
    ).save!
    new_broker = broker
    broker_id = broker.id.as(String)

    # Push the creation event straight onto the manager's event channel
    event = Resource::Event(PlaceOS::Model::Broker).new(:created, broker)
    mqtt_manager.@event_channel.send(event)

    # Wait for broker to be processed
    sleep 300.milliseconds

    # A publisher must now be registered under the new broker's id
    mqtt_manager.@publishers[broker_id]?.should_not be_nil

    # The callback must have fired with that same id
    callback_invoked.should be_true
    callback_broker_id.should eq broker_id
  ensure
    # Teardown runs even when an expectation above fails, so a failing run
    # does not leave a started manager or a stray broker record behind.
    mqtt_manager.stop
    new_broker.try &.destroy
  end
end
# Checks that a broker which already exists when `MqttBrokerManager#start` is
# called gets a publisher but does not trigger `on_broker_ready`.
it "callback is not invoked for brokers created during startup" do
  mqtt_manager = MqttBrokerManager.new

  # Record whether the callback fired at all
  callback_invoked = false
  mqtt_manager.on_broker_ready = ->(_broker_id : String) {
    callback_invoked = true
  }

  # Create broker before starting (simulating existing broker)
  startup_broker = test_broker

  # Start the manager (this will load existing brokers)
  mqtt_manager.start

  begin
    # Wait for startup to complete
    sleep 200.milliseconds

    # The pre-existing broker must have a publisher
    mqtt_manager.@publishers[startup_broker.id.as(String)]?.should_not be_nil

    # The callback must NOT have fired during startup
    callback_invoked.should be_false
  ensure
    # Stop the manager even when an expectation above fails
    mqtt_manager.stop
  end
end
# Checks that `StatusEvents#resync_state` publishes nothing when called before
# `StatusEvents#start`, then calls it again once `start` is running.
it "resync_state only runs after initial sync completes" do
  mock_mappings_state = mock_state(module_id: "mod-test")
  mock_mappings = Mappings.new(mock_mappings_state)
  mock_publisher = MockManager.new
  status_events = StatusEvents.new(mock_mappings, [mock_publisher] of PublisherManager)

  # Before initial sync, resync should not run
  status_events.resync_state
  mock_publisher.messages.size.should eq 0

  # Run `start` in its own fiber so the example can continue
  spawn { status_events.start }

  begin
    # Wait for initial sync
    sleep 300.milliseconds

    # Clear messages from initial sync
    mock_publisher.messages.clear

    # Now resync should work
    status_events.resync_state

    # Wait for resync to process
    sleep 200.milliseconds

    # NOTE(review): nothing is asserted after the second resync, so this
    # example only proves that the call does not raise. The expected message
    # count depends on fixture state not visible here - TODO add an
    # expectation on `mock_publisher.messages` once that is confirmed.
  ensure
    # Stop the spawned listener even if something above raised
    status_events.stop
  end
end
# End-to-end check: with a `Manager` running over a mock publisher and an
# `MqttBrokerManager`, a broker created after startup gets a publisher and the
# manager has an `on_broker_ready` callback installed.
it "full integration: new broker receives state via resync" do
  # Create initial broker
  test_broker

  # Mock publisher records the messages it is given
  mock_publisher = MockManager.new
  publisher_managers = [mock_publisher] of PublisherManager

  # Add MQTT broker manager
  mqtt_manager = MqttBrokerManager.new
  publisher_managers << mqtt_manager

  # Mock data with a module that has proper mappings
  module_id = "mod-integration-test"
  status_key = "power"
  redis_key = "status/#{module_id}/#{status_key}"
  mock_mappings_state = mock_state(module_id: module_id)
  mock_mappings = Mappings.new(mock_mappings_state)

  # Set once the broker has been persisted, so teardown knows what to remove
  new_broker = nil

  # Start application manager
  manager = Manager.new(publisher_managers, mock_mappings)
  manager.start

  begin
    # Wait for initial sync to complete
    sleep 300.milliseconds

    # Store module state in Redis
    Redis.open(url: REDIS_URL) do |client|
      client.set(redis_key, "on".to_json)
    end

    # Clear any messages from initial sync
    mock_publisher.messages.clear

    # Create a new broker after startup
    broker = PlaceOS::Model::Broker.new(
      name: "integration-broker-#{Time.utc.to_unix}",
      host: ENV["MQTT_HOST"]?.presence || "mqtt",
      port: ENV["MQTT_PORT"]?.presence.try(&.to_i?) || 1883,
      auth_type: :no_auth,
    ).save!
    new_broker = broker

    # Push the creation event straight onto the manager's event channel
    event = Resource::Event(PlaceOS::Model::Broker).new(:created, broker)
    mqtt_manager.@event_channel.send(event)

    # Wait for broker to be processed and state resync to occur
    sleep 500.milliseconds

    # A publisher must now be registered under the new broker's id
    mqtt_manager.@publishers[broker.id.as(String)]?.should_not be_nil

    # The manager must have installed a broker-ready callback
    mqtt_manager.on_broker_ready.should_not be_nil

    # NOTE(review): despite the example's title, nothing here checks that any
    # state was actually delivered after the new broker appeared -
    # `mock_publisher.messages` is cleared but never inspected. TODO confirm
    # the expected messages and assert on them.
  ensure
    # Teardown runs even when an expectation above fails, so a failing run
    # does not leave a started manager, a stray broker record, or the test
    # status key in Redis behind.
    manager.stop
    new_broker.try &.destroy
    Redis.open(url: REDIS_URL) do |client|
      client.del(redis_key)
    end
  end
end
end
end