Skip to content

Commit ca96803

Browse files
authored
feature: add ability to print tap message streams (#130)
* feature: add ability to print tap message streams * bump api
1 parent 3ae1970 commit ca96803

3 files changed

Lines changed: 45 additions & 4 deletions

File tree

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
setup(
2828
name="science-synapse",
29-
version="2.2.7",
29+
version="2.2.8",
3030
description="Client library and CLI for the Synapse API",
3131
author="Science Team",
3232
author_email="team@science.xyz",

synapse/cli/taps.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from synapse.client.taps import Tap
2+
from synapse.api.logging_pb2 import LogEntry
3+
from synapse.utils.log import log_entry_to_str
24

35
from rich.console import Console
46
from rich.table import Table
@@ -16,19 +18,31 @@ def __init__(self, console: Console):
1618
self.message_count = 0
1719
self.total_bytes = 0
1820
self.start_time = None
21+
self.last_log_entry = None # Store the last decoded log entry
1922

2023
def start(self):
2124
"""Start the monitoring session."""
2225
self.start_time = time.time()
2326
self.message_count = 0
2427
self.total_bytes = 0
28+
self.last_log_entry = None
2529

26-
def update(self, message_size: int) -> Text:
30+
def update(self, message_size: int, message_data: bytes = None) -> Text:
2731
"""Update statistics with a new message and return formatted display text."""
2832
current_time = time.time()
2933
self.message_count += 1
3034
self.total_bytes += message_size
3135

36+
# Try to decode as LogEntry if message_data is provided
37+
if message_data:
38+
try:
39+
log_entry = LogEntry()
40+
log_entry.ParseFromString(message_data)
41+
self.last_log_entry = log_entry
42+
except Exception:
43+
# Not a LogEntry or failed to parse, ignore
44+
pass
45+
3246
# Calculate stats
3347
elapsed_time = current_time - self.start_time
3448
msgs_per_sec = self.message_count / elapsed_time if elapsed_time > 0 else 0
@@ -66,6 +80,14 @@ def _create_display_text(
6680
stats_text.append(f"{latest_size:,} bytes", style="magenta")
6781
stats_text.append(" | Runtime: ", style="bold")
6882
stats_text.append(f"{time.time() - self.start_time:.1f}s", style="blue")
83+
84+
# Add log entry information if available
85+
if self.last_log_entry:
86+
stats_text.append("\n")
87+
stats_text.append("Last Log: ", style="bold")
88+
log_str = log_entry_to_str(self.last_log_entry)
89+
stats_text.append(log_str, style="white")
90+
6991
return stats_text
7092

7193

@@ -108,10 +130,24 @@ def stream_taps(args):
108130
print(f"Tap {args.tap_name} not found")
109131
return
110132

133+
# Get the selected tap info to check message type
134+
selected_tap = None
135+
for t in taps:
136+
if t.name == args.tap_name:
137+
selected_tap = t
138+
break
139+
111140
tap.connect(args.tap_name)
112141

113142
console = Console()
114143
console.print(f"[bold cyan]Streaming tap:[/] [green]{args.tap_name}[/]")
144+
145+
# Show message type info if available
146+
if selected_tap and selected_tap.message_type:
147+
console.print(f"[dim]Message type: {selected_tap.message_type}[/]")
148+
if selected_tap.message_type == "synapse.LogEntry":
149+
console.print("[dim]Log messages will be decoded and displayed[/]")
150+
115151
console.print("[dim]Press Ctrl+C to stop[/]\n")
116152

117153
# Initialize health monitor
@@ -127,7 +163,12 @@ def stream_taps(args):
127163
message_size = len(message)
128164

129165
# Update statistics and get formatted display
130-
stats_text = monitor.update(message_size)
166+
# Pass message data if this might be a LogEntry tap
167+
message_data = None
168+
if selected_tap and selected_tap.message_type == "synapse.LogEntry":
169+
message_data = message
170+
171+
stats_text = monitor.update(message_size, message_data)
131172

132173
# Update the live display
133174
live.update(stats_text)

0 commit comments

Comments
 (0)