diff --git a/lib/sequin/consumers/sink_consumer.ex b/lib/sequin/consumers/sink_consumer.ex index 13bbb61a6..8e0e0aea0 100644 --- a/lib/sequin/consumers/sink_consumer.ex +++ b/lib/sequin/consumers/sink_consumer.ex @@ -207,6 +207,7 @@ defmodule Sequin.Consumers.SinkConsumer do |> cast_embed(:source, required: true) |> cast_embed(:source_tables) |> put_defaults() + |> validate_type() |> validate_message_grouping() |> validate_enrichment() |> validate_required([:name, :status, :replication_slot_id, :batch_size]) @@ -226,6 +227,16 @@ defmodule Sequin.Consumers.SinkConsumer do |> Sequin.Changeset.annotations_check_constraint() end + defp validate_type(changeset) do + sink = get_field(changeset, :sink) + + if sink && sink.type == :benchmark && Application.get_env(:sequin, :env) == :prod do + add_error(changeset, :type, "invalid type: #{inspect(sink.type)}") + else + changeset + end + end + defp validate_message_grouping(changeset) do message_grouping = get_field(changeset, :message_grouping) source_tables = get_field(changeset, :source_tables) || [] diff --git a/lib/sequin/runtime/sqs_pipeline.ex b/lib/sequin/runtime/sqs_pipeline.ex index 17287793c..64945df33 100644 --- a/lib/sequin/runtime/sqs_pipeline.ex +++ b/lib/sequin/runtime/sqs_pipeline.ex @@ -54,18 +54,20 @@ defmodule Sequin.Runtime.SqsPipeline do def handle_batch(:default, messages, %{batch_key: queue_url}, context) do %{ consumer: %SinkConsumer{} = consumer, - sqs_client: sqs_client, test_pid: test_pid } = context setup_allowances(test_pid) + # Credentials may have expired if we are using task role + context = maybe_refresh_client(context) + sqs_messages = Enum.map(messages, fn %{data: data} -> build_sqs_message(consumer, data) end) - case SQS.send_messages(sqs_client, queue_url, sqs_messages) do + case SQS.send_messages(context.sqs_client, queue_url, sqs_messages) do :ok -> {:ok, messages, context} @@ -94,6 +96,25 @@ defmodule Sequin.Runtime.SqsPipeline do end end + defp maybe_refresh_client(%{consumer: %SinkConsumer{} = consumer} = context) do + # Only refresh for task role credentials, as they expire + # Explicit credentials keep using the same client + if consumer.sink.use_task_role do + case SqsSink.aws_client(consumer.sink) do + {:ok, fresh_client} -> + Map.put(context, :sqs_client, fresh_client) + + {:error, reason} -> + # Log but continue (may be transient) + Logger.warning("Failed to refresh AWS client for task role: #{inspect(reason)}") + context + end + else + # Not using task roles, no refresh needed + context + end + end + defp setup_allowances(nil), do: :ok defp setup_allowances(test_pid) do diff --git a/lib/sequin_web/live/sink_consumers/show.ex b/lib/sequin_web/live/sink_consumers/show.ex index 1a5c7a96a..baaa918d7 100644 --- a/lib/sequin_web/live/sink_consumers/show.ex +++ b/lib/sequin_web/live/sink_consumers/show.ex @@ -1411,6 +1411,7 @@ defmodule SequinWeb.SinkConsumersLive.Show do defp consumer_title(%{sink: %{type: :sqs}}), do: "SQS Sink" defp consumer_title(%{sink: %{type: :typesense}}), do: "Typesense Sink" defp consumer_title(%{sink: %{type: :meilisearch}}), do: "Meilisearch Sink" + defp consumer_title(%{sink: %{type: :benchmark}}), do: "Benchmark Sink" defp put_health(%SinkConsumer{} = consumer) do with {:ok, health} <- Health.health(consumer),