Skip to content

Commit 832a6b7

Browse files
committed
howl MORE
1 parent 581bfa5 commit 832a6b7

7 files changed

Lines changed: 106 additions & 32 deletions

File tree

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@ INFO:saluki: 0 - low:7515, high:7551, num_messages:36
5959

6060
`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps.
6161

62-
## `howl` - Produce fake `ev44` messages to a topic
62+
## `howl` - Produce fake run-like messages
63+
64+
`saluki-howl` emits `ev44` events, `pl72` run starts, and `6s4t` run stops to Kafka, in a format which
65+
looks somewhat like a real run.
6366

6467
```
6568
saluki howl mybroker:9092/dest_topic --events-per-frame 200 --frames-per-second 50 --tof-peak 10000000 --tof-sigma 5000000 --det-min 0 --det-max 500

src/saluki/howl.py

Lines changed: 66 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,27 @@
1+
import json
12
import logging
23
import time
4+
import uuid
35

46
import numpy as np
57
from confluent_kafka import Producer
6-
from streaming_data_types import serialise_ev44
8+
from streaming_data_types import serialise_6s4t, serialise_ev44, serialise_pl72
9+
from streaming_data_types.run_start_pl72 import DetectorSpectrumMap
710

811
logger = logging.getLogger("saluki")
912

1013
RNG = np.random.default_rng()
1114

1215

13-
def generate_fake_ev44(
16+
def generate_fake_events(
1417
msg_id: int,
1518
events_per_frame: int,
1619
tof_peak: float,
1720
tof_sigma: float,
1821
det_min: int,
1922
det_max: int,
2023
) -> bytes:
21-
detector_ids = np.random.randint(low=det_min, high=det_max, size=events_per_frame)
24+
detector_ids = RNG.integers(low=det_min, high=det_max, size=events_per_frame)
2225
tofs = np.maximum(0.0, RNG.normal(loc=tof_peak, scale=tof_sigma, size=events_per_frame))
2326

2427
return serialise_ev44(
@@ -31,50 +34,98 @@ def generate_fake_ev44(
3134
)
3235

3336

37+
def generate_run_start(det_max: int) -> bytes:
38+
det_spec_map = DetectorSpectrumMap(
39+
detector_ids=np.arange(0, det_max, dtype=np.int32),
40+
spectrum_numbers=np.arange(0, det_max, dtype=np.int32),
41+
n_spectra=det_max,
42+
)
43+
return serialise_pl72(
44+
start_time=int(time.time() * 1000),
45+
stop_time=None,
46+
run_name=f"saluki-howl-{uuid.uuid4()}",
47+
instrument_name="saluki-howl",
48+
nexus_structure=json.dumps({}),
49+
job_id=str(uuid.uuid4()),
50+
filename=str(uuid.uuid4()),
51+
detector_spectrum_map=det_spec_map,
52+
)
53+
54+
55+
def generate_run_stop() -> bytes:
56+
return serialise_6s4t(
57+
stop_time=int(time.time() * 1000),
58+
job_id=str(uuid.uuid4()),
59+
)
60+
61+
3462
def howl(
3563
broker: str,
36-
topic: str,
64+
topic_prefix: str,
3765
events_per_frame: int,
3866
frames_per_second: int,
67+
frames_per_run: int,
3968
tof_peak: float,
4069
tof_sigma: float,
4170
det_min: int,
4271
det_max: int,
4372
) -> None:
4473
"""
45-
Prints the broker and topic metadata for a given broker.
46-
If a topic is given, only this topic's partitions and watermarks will be printed.
47-
:param broker: The broker address including port number.
48-
:param topic: Optional topic to filter information to.
74+
Send messages vaguely resembling a run to Kafka.
4975
"""
5076

5177
producer = Producer(
5278
{
5379
"bootstrap.servers": broker,
80+
"queue.buffering.max.messages": 100000,
81+
"queue.buffering.max.ms": 20,
5482
}
5583
)
5684

5785
target_frame_time = 1 / frames_per_second
5886

59-
msg_id = 0
87+
frames = 0
6088

61-
ev44_size = len(generate_fake_ev44(0, events_per_frame, tof_peak, tof_sigma, det_min, det_max))
89+
ev44_size = len(
90+
generate_fake_events(0, events_per_frame, tof_peak, tof_sigma, det_min, det_max)
91+
)
6292
rate_bytes_per_sec = ev44_size * frames_per_second
6393
rate_mbit_per_sec = (rate_bytes_per_sec / 1024**2) * 8
64-
logger.info(f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} MBit/s")
94+
logger.info(f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} Mbit/s")
95+
logger.info(f"Each ev44 is {ev44_size} bytes")
96+
97+
producer.produce(
98+
topic=f"{topic_prefix}_runInfo",
99+
key=None,
100+
value=generate_run_start(det_max),
101+
)
65102

66103
while True:
67104
start_time = time.time()
68105
target_end_time = start_time + target_frame_time
69106

70107
producer.produce(
71-
topic=topic,
108+
topic=f"{topic_prefix}_events",
72109
key=None,
73-
value=generate_fake_ev44(
74-
msg_id, events_per_frame, tof_peak, tof_sigma, det_min, det_max
110+
value=generate_fake_events(
111+
frames, events_per_frame, tof_peak, tof_sigma, det_min, det_max
75112
),
76113
)
77-
msg_id += 1
114+
producer.poll(0)
115+
frames += 1
116+
117+
if frames_per_run != 0 and frames % frames_per_run == 0:
118+
logger.info(f"Starting new run after {frames_per_run} simulated frames")
119+
producer.produce(
120+
topic=f"{topic_prefix}_runInfo",
121+
key=None,
122+
value=generate_run_stop(),
123+
)
124+
producer.produce(
125+
topic=f"{topic_prefix}_runInfo",
126+
key=None,
127+
value=generate_run_start(det_max),
128+
)
78129

79130
sleep_time = max(target_end_time - time.time(), 0)
80131
if sleep_time == 0:

src/saluki/main.py

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
import sys
44

55
from saluki.consume import consume
6+
from saluki.howl import howl
67
from saluki.listen import listen
78
from saluki.play import play
89
from saluki.sniff import sniff
9-
from saluki.howl import howl
1010
from saluki.utils import dateutil_parsable_or_unix_timestamp, parse_kafka_uri
1111

1212
logger = logging.getLogger("saluki")
@@ -131,13 +131,28 @@ def main() -> None:
131131
help="replay mode - replay data into another topic",
132132
parents=[common_options],
133133
)
134-
howl_parser.add_argument("topic", type=str, help="Destination topic")
135-
howl_parser.add_argument("--events-per-frame", type=int, help="Events per frame to simulate")
136-
howl_parser.add_argument("--frames-per-second", type=int, help="Frames per second to simulate")
137-
howl_parser.add_argument("--tof-peak", type=float, help="Time-of-flight peak (ns)")
138-
howl_parser.add_argument("--tof-sigma", type=float, help="Time-of-flight sigma (ns)")
139-
howl_parser.add_argument("--det-min", type=int, help="Minimum detector ID")
140-
howl_parser.add_argument("--det-max", type=int, help="Maximum detector ID")
134+
howl_parser.add_argument("broker", type=str, help="Kafka broker URL")
135+
howl_parser.add_argument("topic_prefix", type=str, help="Topic prefix e.g. INSTNAME")
136+
howl_parser.add_argument(
137+
"--events-per-frame", type=int, help="Events per frame to simulate", default=100
138+
)
139+
howl_parser.add_argument(
140+
"--frames-per-second", type=int, help="Frames per second to simulate", default=1
141+
)
142+
howl_parser.add_argument(
143+
"--frames-per-run",
144+
type=int,
145+
help="Frames to take before beginning new run (0 to run forever)",
146+
default=0,
147+
)
148+
howl_parser.add_argument(
149+
"--tof-peak", type=float, help="Time-of-flight peak (ns)", default=10_000_000
150+
)
151+
howl_parser.add_argument(
152+
"--tof-sigma", type=float, help="Time-of-flight sigma (ns)", default=2_000_000
153+
)
154+
howl_parser.add_argument("--det-min", type=int, help="Minimum detector ID", default=0)
155+
howl_parser.add_argument("--det-max", type=int, help="Maximum detector ID", default=1000)
141156

142157
if len(sys.argv) == 1:
143158
parser.print_help()
@@ -189,13 +204,12 @@ def main() -> None:
189204
logger.debug(f"Sniffing whole broker {args.broker}")
190205
sniff(args.broker)
191206
elif args.command == _HOWL:
192-
broker, topic = parse_kafka_uri(args.topic)
193-
logger.debug(f"Howling to topic {topic} on broker {broker}")
194207
howl(
195-
broker,
196-
topic,
208+
args.broker,
209+
args.topic_prefix,
197210
events_per_frame=args.events_per_frame,
198211
frames_per_second=args.frames_per_second,
212+
frames_per_run=args.frames_per_run,
199213
tof_peak=args.tof_peak,
200214
tof_sigma=args.tof_sigma,
201215
det_min=args.det_min,

src/saluki/play.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ def play(
7070
logger.debug(f"finished consuming {num_messages} messages")
7171
consumer.close()
7272
producer.produce_batch(
73-
dest_topic, [{"key": message.key(), "value": message.value()} for message in msgs]
73+
dest_topic,
74+
[{"key": message.key(), "value": message.value()} for message in msgs],
7475
)
7576
logger.debug(f"flushing producer. len(p): {len(producer)}")
7677
producer.flush(timeout=10)

src/saluki/utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ def fallback_deserialiser(payload: bytes) -> str:
3434

3535

3636
def deserialise_and_print_messages(
37-
msgs: List[Message], partition: int | None, schemas_to_filter_to: list[str] | None = None
37+
msgs: List[Message],
38+
partition: int | None,
39+
schemas_to_filter_to: list[str] | None = None,
3840
) -> None:
3941
for msg in msgs:
4042
try:

tests/test_howl.py

Whitespace-only changes.

tests/test_utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ def test_deserialising_message_which_raises_does_not_stop_loop(mock_message):
7878
assert logger.info.call_count == 1
7979

8080

81-
def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(mock_message):
81+
def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(
82+
mock_message,
83+
):
8284
with patch("saluki.utils.logger") as logger:
8385
ok_message = Mock(spec=Message)
8486
ok_message.value.return_value = serialise_fc00(config_change=1, streams=[]) # type: ignore
@@ -179,7 +181,8 @@ def test_uri_with_no_topic():
179181

180182

181183
@pytest.mark.parametrize(
182-
"timestamp", ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"]
184+
"timestamp",
185+
["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"],
183186
)
184187
def test_parses_datetime_properly_with_string(timestamp):
185188
assert dateutil_parsable_or_unix_timestamp(timestamp) == 1763566031000

0 commit comments

Comments
 (0)