Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/configuration/sinks/zulip.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ Settings
* ``stream_name`` : Name of the channel to send the message to
* ``topic_name`` : Name of the topic of the stream to send messages to
* ``topic_override`` : Dynamic topic override, same as the channel_override in the slack sink
* ``topic_autoresolve`` : [Optional - default: ``false``] When enabled, Prometheus alerts use the alert
title as the Zulip topic. When a resolved alert is sent, Robusta looks for an existing topic whose name
contains that alert title and resolves it by adding Zulip's resolved-topic prefix. This requires the Zulip
bot to have permission to resolve or move topics.
* ``log_preview_char_limit`` : [Optional - default: ``500``] The amount of log characters to append to the alert message (zulip doesnt have a builtin text file preview). If set to ``0`` a text file will be sent

When ``topic_autoresolve`` is enabled, ``topic_name`` and ``topic_override`` are not used for Prometheus
findings. They are still used for non-Prometheus notifications.

Configuring the Zulip sink
---------------------------

Expand All @@ -56,6 +63,7 @@ Configuring the Zulip sink
bot_api_key: very_secret_key
stream_name: Monitoring
topic_name: Robusta
topic_autoresolve: false

Save the file and run

Expand Down
3 changes: 2 additions & 1 deletion src/robusta/core/sinks/zulip/zulip_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def __init__(self, sink_config: ZulipSinkConfigWrapper, registry):
self.bot_email = sink_config.zulip_sink.bot_email
self.bot_api_key = sink_config.zulip_sink.bot_api_key
self.api_url = sink_config.zulip_sink.api_url
self.stream_name = sink_config.zulip_sink.stream_name

self.zclient = requests.Session()
self.zclient.auth = (self.bot_email, self.bot_api_key.get_secret_value())
Expand All @@ -25,7 +26,7 @@ def __init__(self, sink_config: ZulipSinkConfigWrapper, registry):
self.zclient.verify = False

self.zulip_sender = zulip_module.ZulipSender(
self.api_url, self.zclient, self.account_id, self.cluster_name, self.signing_key
self.api_url, self.stream_name, self.zclient, self.account_id, self.cluster_name, self.signing_key
)

def write_finding(self, finding: Finding, platform_enabled: bool):
Expand Down
1 change: 1 addition & 0 deletions src/robusta/core/sinks/zulip/zulip_sink_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class ZulipSinkParams(SinkBaseParams):
stream_name: str = "Monitoring"
topic_name: str = "Robusta"
topic_override: Optional[str] = None
topic_autoresolve: bool = False
log_preview_char_limit: int = 500

@classmethod
Expand Down
116 changes: 101 additions & 15 deletions src/robusta/integrations/zulip/sender.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from typing import List
from urllib.parse import urlencode

from requests.sessions import Session

Expand All @@ -24,22 +25,38 @@


class ZulipSender:
def __init__(self, api_url: str, zclient: Session, account_id: str, cluster_name: str, signing_key: str):
def __init__(
self, api_url: str, stream_name: str, zclient: Session, account_id: str, cluster_name: str, signing_key: str
):
self.signing_key = signing_key
self.account_id = account_id
self.cluster_name = cluster_name
self.api_url = api_url
self.zclient = zclient

self.max_topic_len: int | None = None
self.max_message_len = self.__get_max_msg_len()

self.stream_name = stream_name
self.stream_id = self.__get_stream_id(stream_name)
self.topic_cache = self.__load_topics(self.stream_id)

def __get_max_msg_len(self):
try:
r = self.zclient.post(f"{self.api_url}/api/v1/register")
r.raise_for_status()
self.max_message_len = int(r.json()["max_message_length"])
config = r.json()
max_topic_len = config.get("max_topic_length")
if max_topic_len is not None:
self.max_topic_len = int(max_topic_len)

return int(config["max_message_length"])

except Exception as e:
logging.exception(
f"Zulip Sink: failed to fetch max_message_length: {e}. Using default value of: {ZULIP_MESSAGE_DEFAULT_LEN}"
)
self.max_message_len = ZULIP_MESSAGE_DEFAULT_LEN
return ZULIP_MESSAGE_DEFAULT_LEN

def __to_zulip_bold(self, text: str):
return f"**{text}**"
Expand All @@ -56,6 +73,60 @@ def __to_zulip_link(self, name: str, url: str):
def __to_zulip_table(self, block: TableBlock):
return block.to_table_string(table_fmt="pipe")

def __build_msg_data(self, stream_name: str, topic: str, content: str):
return {"type": "stream", "to": stream_name, "topic": topic, "content": content}

def __to_topic_name(self, title: str, resolved: bool = False):
topic_name = f"✔ {title}" if resolved else title
return topic_name[: self.max_topic_len] if self.max_topic_len is not None else topic_name

def __get_stream_id(self, stream_name: str) -> int | None:
try:
params = {"stream": stream_name}
r = self.zclient.get(f"{self.api_url}/api/v1/get_stream_id?{urlencode(params)}")
r.raise_for_status()

return int(r.json()["stream_id"])
except Exception as e:
logging.exception(f"Zulip Sink: failed to fetch stream_id: {e}")

def __load_topics(self, stream_id: int | None):
if stream_id is None:
logging.warning("stream_id is None")
return []

try:
r = self.zclient.get(f"{self.api_url}/api/v1/users/me/{stream_id}/topics")
r.raise_for_status()

topics = r.json().get("topics", [])
logging.debug(f"topics: {topics}")

return topics
except Exception as e:
logging.exception(f"Zulip Sink: could not fetch topics for stream: {stream_id}: {e}")
return []

# because there's no direct topic access, they are only identifiable by a message they are part of
def __find_msg_id_for_topic_title(self, title_name: str) -> int | None:
def find_msg_id(topics, title):
topic_names = [self.__to_topic_name(title), self.__to_topic_name(title, resolved=True)]
return [topic["max_id"] for topic in topics if topic["name"] in topic_names]

msg_id = find_msg_id(self.topic_cache, title_name)

if not msg_id:
self.topic_cache = self.__load_topics(self.stream_id)
msg_id = find_msg_id(self.topic_cache, title_name)

if len(msg_id) > 1:
logging.warning(f"Zulip Sink: found multiple topics: {msg_id} that match the title: {title_name}")

if len(msg_id) == 0:
logging.warning(f"Zulip Sink: topic not found: {title_name}")

return msg_id[0] if msg_id else None
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def __upload_to_zulip(self, filename: str, content: bytes):
try:
r = self.zclient.post(f"{self.api_url}/api/v1/user_uploads", files={filename: content})
Expand Down Expand Up @@ -139,7 +210,7 @@ def send_finding_to_zulip(self, finding: Finding, sink_params: ZulipSinkParams,
if finding.add_silence_url:
silence_url = finding.get_prometheus_silence_url(self.account_id, self.cluster_name)
message_lines.append(self.__to_zulip_link("🔕 Silence", silence_url))

for link in finding.links:
message_lines.append(f"🎬 {self.__to_zulip_link(link.name, link.url)}")

Expand All @@ -156,18 +227,33 @@ def send_finding_to_zulip(self, finding: Finding, sink_params: ZulipSinkParams,
if self.__enough_msg_bytes_free(message, formatted_line):
message += formatted_line

templated_channel_topic = ChannelTransformer.template(
sink_params.topic_override,
sink_params.topic_name,
self.cluster_name,
finding.subject.labels,
finding.subject.annotations,
)
try:
if sink_params.topic_autoresolve and finding.source == FindingSource.PROMETHEUS:
title = finding.title.removeprefix("[RESOLVED] ")
msg_id = self.__find_msg_id_for_topic_title(title)

data = {"type": "stream", "to": sink_params.stream_name, "topic": templated_channel_topic, "content": message}
logging.warning(f"sending with msg_id: {msg_id}")
channel_topic = self.__to_topic_name(title, status == FindingStatus.RESOLVED and bool(msg_id))

try:
r = self.zclient.post(f"{self.api_url}/api/v1/messages", data=data)
r.raise_for_status()
data = self.__build_msg_data(self.stream_name, channel_topic, message)

r = self.zclient.post(f"{self.api_url}/api/v1/messages", data=data)
if msg_id:
patch_data = {"topic": channel_topic, "propagate_mode": "change_all"}
r = self.zclient.patch(f"{self.api_url}/api/v1/messages/{msg_id}", data=patch_data)

r.raise_for_status()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
else:
channel_topic = ChannelTransformer.template(
sink_params.topic_override,
sink_params.topic_name,
self.cluster_name,
finding.subject.labels,
finding.subject.annotations,
)
data = self.__build_msg_data(self.stream_name, channel_topic, message)

r = self.zclient.post(f"{self.api_url}/api/v1/messages", data=data)
r.raise_for_status()
except Exception as e:
logging.exception(f"Zulip Sink: failed to send data: {e}")
Loading