diff --git a/CHANGELOG.md b/CHANGELOG.md index c4f5aeaba8..349b877372 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,8 @@ and this project adheres to auth method and the destination project credential on every proxied request. Feature-gated behind experimental features. [#4541](https://github.com/OpenFn/lightning/issues/4541) +- Support channels in the provisioner API + [#4522](https://github.com/OpenFn/lightning/issues/4522) - Do not persist channel request/response data when project has zero-persistence enabled [#4622](https://github.com/OpenFn/lightning/issues/4622) - Prometheus metrics for the channels HTTP reverse-proxy via a new PromEx diff --git a/lib/lightning/channels.ex b/lib/lightning/channels.ex index b6b00f3281..059b153d78 100644 --- a/lib/lightning/channels.ex +++ b/lib/lightning/channels.ex @@ -201,7 +201,9 @@ defmodule Lightning.Channels do %Ecto.Changeset{} = audit_cs -> Repo.insert(audit_cs) end end) - |> Audit.audit_auth_method_changes(changeset, actor) + |> Multi.merge(fn %{channel: channel} -> + Audit.audit_auth_method_changes(Multi.new(), channel, changeset, actor) + end) |> Repo.transaction() |> case do {:ok, %{channel: channel}} -> {:ok, channel} @@ -225,7 +227,9 @@ defmodule Lightning.Channels do %Ecto.Changeset{} = audit_cs -> Repo.insert(audit_cs) end end) - |> Audit.audit_auth_method_changes(changeset, actor) + |> Multi.merge(fn %{channel: channel} -> + Audit.audit_auth_method_changes(Multi.new(), channel, changeset, actor) + end) |> Repo.transaction() |> case do {:ok, %{channel: channel}} -> {:ok, channel} diff --git a/lib/lightning/channels/audit.ex b/lib/lightning/channels/audit.ex index 8f9bac8e53..d3471306f3 100644 --- a/lib/lightning/channels/audit.ex +++ b/lib/lightning/channels/audit.ex @@ -3,7 +3,7 @@ defmodule Lightning.Channels.Audit do Audit trail for channel CRUD and auth method changes. Provides `event/5` (via `Lightning.Auditing.Audit`) for basic CRUD events, - plus `audit_auth_method_changes/3` which derives fine-grained audit events + plus `audit_auth_method_changes/4` which derives fine-grained audit events for client and destination auth method additions, removals, and swaps. """ use Lightning.Auditing.Audit, @@ -19,6 +19,7 @@ defmodule Lightning.Channels.Audit do ] alias Ecto.Multi + alias Lightning.Channels.Channel @doc """ Appends audit events for auth method changes to the given Multi. @@ -26,16 +27,19 @@ defmodule Lightning.Channels.Audit do Inspects the changeset for changes to `:client_auth_methods` and `:destination_auth_method`, emitting the appropriate added/removed/changed events. No-op when no auth method changes are present. + + Step keys are unique per call, so the helper can be composed into a larger Multi any number of + times — e.g. once per channel when batching audits across channels. """ - def audit_auth_method_changes(multi, changeset, actor) do + def audit_auth_method_changes(multi, %Channel{} = channel, changeset, actor) do multi - |> audit_client_changes(changeset, actor) - |> audit_destination_changes(changeset, actor) + |> audit_client_changes(channel, changeset, actor) + |> audit_destination_changes(channel, changeset, actor) end # --- Client auth methods (has_many) --- - defp audit_client_changes(multi, changeset, actor) do + defp audit_client_changes(multi, channel, changeset, actor) do changes = Ecto.Changeset.get_change(changeset, :client_auth_methods, []) @@ -43,8 +47,8 @@ defmodule Lightning.Channels.Audit do deleted = Enum.filter(changes, &(&1.action == :delete)) multi - |> add_auth_method_audits(:added, inserted, :client, actor) - |> add_auth_method_audits(:removed, deleted, :client, actor) + |> add_auth_method_audits(:added, inserted, :client, channel, actor) + |> add_auth_method_audits(:removed, deleted, :client, channel, actor) end # --- Destination auth method (has_one, on_replace: :delete) --- @@ -55,7 +59,7 @@ defmodule Lightning.Channels.Audit do # 3. Swap (existing → different) → replace produces a delete of old + # insert of new. We emit a single "auth_method_changed" event instead. - defp audit_destination_changes(multi, changeset, actor) do + defp audit_destination_changes(multi, channel, changeset, actor) do old = changeset.data |> Map.get(:destination_auth_method) has_existing? = old != nil and not match?(%Ecto.Association.NotLoaded{}, old) @@ -68,7 +72,7 @@ defmodule Lightning.Channels.Audit do multi nil when has_existing? -> - audit_destination_removed(multi, old, actor) + audit_destination_removed(multi, channel, old, actor) nil -> multi @@ -77,47 +81,54 @@ defmodule Lightning.Channels.Audit do old_fields = fields_for_data(old, :destination) new_fields = fields_for_changeset(new_cs, :destination) - Multi.insert(multi, :audit_destination_changed, fn %{channel: channel} -> + Multi.insert( + multi, + unique_key(:audit_destination_changed), event("auth_method_changed", channel.id, actor, %{ before: old_fields, after: new_fields }) - end) + ) %Ecto.Changeset{action: :insert} = new_cs -> fields = fields_for_changeset(new_cs, :destination) - Multi.insert(multi, :audit_destination_added, fn %{channel: channel} -> + Multi.insert( + multi, + unique_key(:audit_destination_added), event("auth_method_added", channel.id, actor, %{ before: nil, after: fields }) - end) + ) %Ecto.Changeset{action: :delete} when has_existing? -> - audit_destination_removed(multi, old, actor) + audit_destination_removed(multi, channel, old, actor) %Ecto.Changeset{action: :delete} -> multi end end - defp audit_destination_removed(multi, nil, _actor), do: multi + defp audit_destination_removed(multi, _channel, nil, _actor), do: multi - defp audit_destination_removed(multi, old, actor) do + defp audit_destination_removed(multi, channel, old, actor) do old_fields = fields_for_data(old, :destination) - Multi.insert(multi, :audit_destination_removed, fn %{channel: channel} -> + Multi.insert( + multi, + unique_key("audit_destination_removed"), event("auth_method_removed", channel.id, actor, %{ before: old_fields, after: nil }) - end) + ) end - defp add_auth_method_audits(multi, _direction, [], _role, _actor), do: multi + defp add_auth_method_audits(multi, _direction, [], _role, _channel, _actor), + do: multi - defp add_auth_method_audits(multi, direction, changesets, role, actor) do + defp add_auth_method_audits(multi, direction, changesets, role, channel, actor) do {event_name, extract_fields, wrap} = case direction do :added -> @@ -127,24 +138,24 @@ defmodule Lightning.Channels.Audit do {"auth_method_removed", &fields_for_data(&1.data, role), &{&1, nil}} end - changesets - |> Enum.with_index() - |> Enum.reduce(multi, fn {cs, idx}, acc -> + Enum.reduce(changesets, multi, fn cs, acc -> {before, after_val} = wrap.(extract_fields.(cs)) Multi.insert( acc, - :"audit_#{role}_#{event_name}_#{idx}", - fn %{channel: channel} -> - event(event_name, channel.id, actor, %{ - before: before, - after: after_val - }) - end + unique_key("audit_#{role}_#{event_name}"), + event(event_name, channel.id, actor, %{ + before: before, + after: after_val + }) ) end) end + defp unique_key(base) do + "#{base}_#{System.unique_integer([:positive])}" + end + # Extract fields from a changeset (for inserts/new records) defp fields_for_changeset(cs, :client) do %{ diff --git a/lib/lightning/channels/channel.ex b/lib/lightning/channels/channel.ex index 1b67a53608..ee6928be06 100644 --- a/lib/lightning/channels/channel.ex +++ b/lib/lightning/channels/channel.ex @@ -25,6 +25,7 @@ defmodule Lightning.Channels.Channel do field :destination_url, :string field :enabled, :boolean, default: true field :lock_version, :integer, default: 0 + field :delete, :boolean, virtual: true belongs_to :project, Project @@ -57,13 +58,7 @@ defmodule Lightning.Channels.Channel do :enabled ]) |> validate_required([:name, :destination_url, :project_id]) - |> Validators.validate_url(:destination_url) - |> assoc_constraint(:project) - |> unique_constraint([:project_id, :name], - error_key: :name, - message: "A channel with this name already exists in this project" - ) - |> optimistic_lock(:lock_version) + |> validate() |> cast_assoc(:client_auth_methods, with: fn struct, attrs -> ChannelAuthMethod.changeset(struct, put_role(attrs, "client")) @@ -76,6 +71,24 @@ defmodule Lightning.Channels.Channel do ) end + @doc """ + Shared validation rules for a channel changeset. + + Used by `changeset/2` and by callers (such as the provisioner) that + build a channel changeset with a different shape of input fields but + need the same business-rule validations. + """ + def validate(changeset) do + changeset + |> Validators.validate_url(:destination_url) + |> assoc_constraint(:project) + |> unique_constraint([:project_id, :name], + error_key: :name, + message: "A channel with this name already exists in this project" + ) + |> optimistic_lock(:lock_version) + end + defp put_role(attrs, role) when is_map(attrs) do key = if has_string_keys?(attrs), do: "role", else: :role Map.put(attrs, key, role) diff --git a/lib/lightning/export_utils.ex b/lib/lightning/export_utils.ex index e8496ec479..4864723d82 100644 --- a/lib/lightning/export_utils.ex +++ b/lib/lightning/export_utils.ex @@ -22,11 +22,13 @@ defmodule Lightning.ExportUtils do :name, :description, :collections, + :channels, :credentials, :globals, :workflows ], collection: [:name], + channel: [:name, :destination_url, :enabled, :destination_credential], credential: [:name, :owner], workflow: [:name, :jobs, :triggers, :edges], job: [:name, :adaptor, :credential, :globals, :body], @@ -392,13 +394,23 @@ defmodule Lightning.ExportUtils do Map.put(acc, hyphenate(collection.name), ytree) end) + channels_map = + project.channels + |> Enum.sort_by(& &1.inserted_at, NaiveDateTime) + |> Enum.reduce(%{}, fn channel, acc -> + ytree = build_channel_yaml_tree(channel, project.project_credentials) + + Map.put(acc, hyphenate(channel.name), ytree) + end) + %{ name: project.name, description: project.description, node_type: :project, workflows: workflows_map, credentials: credentials_map, - collections: collections_map + collections: collections_map, + channels: channels_map } end @@ -427,6 +439,35 @@ defmodule Lightning.ExportUtils do } end + defp build_channel_yaml_tree(channel, project_credentials) do + project_credential_id = channel_destination_project_credential_id(channel) + + project_credential = + project_credential_id && + Enum.find(project_credentials, fn pc -> + pc.id == project_credential_id + end) + + %{ + name: channel.name, + destination_url: channel.destination_url, + enabled: channel.enabled, + node_type: :channel, + destination_credential: + project_credential && project_credential_key(project_credential) + } + end + + defp channel_destination_project_credential_id(%{destination_auth_method: nil}), + do: nil + + defp channel_destination_project_credential_id(%{ + destination_auth_method: %{project_credential_id: id} + }), + do: id + + defp channel_destination_project_credential_id(_), do: nil + defp build_workflow_yaml_tree(workflow, project_credentials) do jobs = workflow.jobs @@ -459,7 +500,8 @@ defmodule Lightning.ExportUtils do project = Lightning.Repo.preload(project, project_credentials: [credential: :user], - collections: [] + collections: [], + channels: [destination_auth_method: :project_credential] ) yaml = @@ -475,7 +517,8 @@ defmodule Lightning.ExportUtils do project = Lightning.Repo.preload(project, project_credentials: [credential: :user], - collections: [] + collections: [], + channels: [destination_auth_method: :project_credential] ) yaml = diff --git a/lib/lightning/projects/project.ex b/lib/lightning/projects/project.ex index 062585fc16..2d8e481d7e 100644 --- a/lib/lightning/projects/project.ex +++ b/lib/lightning/projects/project.ex @@ -48,11 +48,12 @@ defmodule Lightning.Projects.Project do has_many :workflows, Workflow, where: [deleted_at: nil] has_many :jobs, through: [:workflows, :jobs] + has_many :channels, Lightning.Channels.Channel + has_many :project_credentials, ProjectCredential has_many :credentials, through: [:project_credentials, :credential] has_many :collections, Lightning.Collections.Collection - has_many :channels, Lightning.Channels.Channel timestamps() end diff --git a/lib/lightning/projects/provisioner.ex b/lib/lightning/projects/provisioner.ex index 2b3fc85d97..0b6c846c2d 100644 --- a/lib/lightning/projects/provisioner.ex +++ b/lib/lightning/projects/provisioner.ex @@ -13,6 +13,9 @@ defmodule Lightning.Projects.Provisioner do alias Ecto.Multi alias Lightning.Accounts.User + alias Lightning.Channels.Audit, as: ChannelAudit + alias Lightning.Channels.Channel + alias Lightning.Channels.ChannelAuthMethod alias Lightning.Collections.Collection alias Lightning.Extensions.UsageLimiting.Action alias Lightning.Extensions.UsageLimiting.Context @@ -89,6 +92,8 @@ defmodule Lightning.Projects.Provisioner do updated_project <- preload_dependencies(project), {:ok, _changes} <- audit_workflows(project_changeset, user_or_repo_connection), + {:ok, _changes} <- + audit_channels(project_changeset, user_or_repo_connection), {:ok, _changes} <- update_workflows_version( project_changeset, @@ -118,9 +123,8 @@ defmodule Lightning.Projects.Provisioner do defp build_import_changeset(project, user_or_repo_connection, data) do project |> preload_dependencies() - |> parse_document(data) + |> parse_document(data, user_or_repo_connection) |> maybe_add_project_user(user_or_repo_connection) - |> maybe_add_project_credentials(user_or_repo_connection) end defp audit_workflows(project_changeset, user_or_repo_connection) do @@ -212,6 +216,55 @@ defmodule Lightning.Projects.Provisioner do ) end + defp audit_channels(project_changeset, actor) do + project_changeset + |> get_assoc(:channels) + |> Enum.reduce(Multi.new(), fn channel_cs, multi -> + append_channel_audits(multi, channel_cs, actor) + end) + |> Repo.transaction() + |> normalize_txn() + end + + defp append_channel_audits(multi, channel_cs, actor) do + case classify_channel_audit(channel_cs) do + :skip -> + multi + + {event, channel_id} -> + channel = %Channel{id: channel_id} + + multi + |> append_channel_event_audit(event, channel_id, channel_cs, actor) + |> ChannelAudit.audit_auth_method_changes(channel, channel_cs, actor) + end + end + + defp classify_channel_audit(%{action: :insert} = cs) do + {"created", get_field(cs, :id)} + end + + defp classify_channel_audit(%{ + action: :update, + data: %{id: id}, + changes: changes + }) + when changes != %{} do + {"updated", id} + end + + defp classify_channel_audit(_), do: :skip + + defp append_channel_event_audit(multi, event, channel_id, channel_cs, actor) do + case ChannelAudit.event(event, channel_id, actor, channel_cs) do + :no_changes -> + multi + + %Ecto.Changeset{} = audit_cs -> + Multi.insert(multi, "channel_audit_#{channel_id}", audit_cs) + end + end + defp create_snapshots( project_changeset, inserted_workflows, @@ -251,12 +304,27 @@ defmodule Lightning.Projects.Provisioner do end end - @spec parse_document(Project.t(), map()) :: Ecto.Changeset.t(Project.t()) - def parse_document(%Project{} = project, data) when is_map(data) do - project - |> project_changeset(data) + @spec parse_document( + Project.t(), + map(), + User.t() | ProjectRepoConnection.t() | nil + ) :: + Ecto.Changeset.t(Project.t()) + def parse_document(project, data, user_or_repo_connection \\ nil) + + def parse_document(%Project{} = project, data, user_or_repo_connection) + when is_map(data) do + changeset = + project + |> project_changeset(data) + |> maybe_add_project_credentials(user_or_repo_connection) + + valid_pc_ids = valid_project_credential_ids(changeset) + + changeset |> cast_assoc(:collections, with: &collection_changeset/2) - |> cast_assoc(:workflows, with: &workflow_changeset/2) + |> cast_assoc(:channels, with: &channel_changeset(&1, &2, valid_pc_ids)) + |> cast_assoc(:workflows, with: &workflow_changeset(&1, &2, valid_pc_ids)) |> then(fn changeset -> case WorkflowUsageLimiter.limit_workflows_activation( project, @@ -280,6 +348,13 @@ defmodule Lightning.Projects.Provisioner do end) end + defp valid_project_credential_ids(changeset) do + changeset + |> get_assoc(:project_credentials) + |> Enum.map(&get_field(&1, :id)) + |> Enum.reject(&is_nil/1) + end + defp limit_collection_creation(changeset) do new_collections_count = changeset @@ -405,6 +480,7 @@ defmodule Lightning.Projects.Provisioner do [ :project_users, :collections, + channels: [destination_auth_method: :project_credential], project_credentials: [credential: [:user]], workflows: {w, [:jobs, :triggers, :edges]} ], @@ -435,20 +511,120 @@ defmodule Lightning.Projects.Provisioner do |> Collection.validate() end - defp workflow_changeset(workflow, attrs) do + defp channel_changeset(channel, attrs, valid_project_credentials) do + attrs = maybe_add_destination_auth_param(attrs, channel) + + channel + |> cast(attrs, [ + :id, + :name, + :destination_url, + :enabled, + :delete + ]) + |> validate_required([:id, :name, :destination_url]) + |> block_channel_deletion() + |> cast_assoc(:destination_auth_method, + with: fn struct, attrs -> + struct + |> ChannelAuthMethod.changeset(attrs) + |> validate_project_credential_in_project( + :project_credential_id, + valid_project_credentials + ) + end + ) + |> validate_extraneous_params() + |> Channel.validate() + end + + defp validate_project_credential_in_project( + changeset, + field, + valid_project_credentials + ) do + changeset + |> validate_change(field, fn _ky, value -> + if value in valid_project_credentials do + [] + else + [{field, "credential doesnt exist or isn't available in this project"}] + end + end) + end + + # Channel deletion is intentionally rejected by the provisioner because + # `channel_requests` rows must be drained first (the `on_delete: :restrict` + # FK). Users should delete channels through the dashboard, which handles + # request cleanup transactionally. + defp block_channel_deletion(changeset) do + if get_change(changeset, :delete) == true do + add_error( + changeset, + :delete, + "channel deletion is not supported via the provisioning API; " <> + "delete from the dashboard instead" + ) + else + changeset + end + end + + defp maybe_add_destination_auth_param(attrs, %Channel{} = channel) do + case Map.fetch(attrs, "destination_credential_id") do + :error -> + attrs + + {:ok, project_credential_id} -> + attrs + |> Map.delete("destination_credential_id") + |> Map.put( + "destination_auth_method", + build_destination_auth_method_attrs( + existing_destination_auth_method(channel), + project_credential_id + ) + ) + end + end + + defp existing_destination_auth_method(%Channel{ + destination_auth_method: %ChannelAuthMethod{} = method + }), + do: method + + defp existing_destination_auth_method(_), do: nil + + defp build_destination_auth_method_attrs(current, project_credential_id) do + case {current, project_credential_id} do + {_, nil} -> + nil + + {%{id: id, project_credential_id: pc_id}, pc_id} -> + %{"id" => id} + + {_, _} -> + %{ + "role" => "destination", + "project_credential_id" => project_credential_id + } + end + end + + defp workflow_changeset(workflow, attrs, valid_project_credentials) do workflow |> cast(attrs, [:id, :name, :delete, :deleted_at]) |> optimistic_lock(:lock_version) |> validate_required([:id]) |> maybe_soft_delete_workflow() |> validate_extraneous_params(ignore: ["version_history"]) - |> cast_assoc(:jobs, with: &job_changeset/2) + |> cast_assoc(:jobs, with: &job_changeset(&1, &2, valid_project_credentials)) |> cast_assoc(:triggers, with: &trigger_changeset/2) |> cast_assoc(:edges, with: &edge_changeset/2) |> Workflow.validate() end - defp job_changeset(job, attrs) do + defp job_changeset(job, attrs, valid_project_credentials) do job |> Job.changeset(attrs) |> cast(attrs, [:delete]) @@ -456,6 +632,10 @@ defmodule Lightning.Projects.Provisioner do |> unique_constraint(:id, name: :jobs_pkey) |> validate_extraneous_params() |> maybe_mark_for_deletion() + |> validate_project_credential_in_project( + :project_credential_id, + valid_project_credentials + ) end defp trigger_changeset(trigger, attrs) do diff --git a/lib/lightning_web/controllers/api/provisioning_json.ex b/lib/lightning_web/controllers/api/provisioning_json.ex index ec80d6d585..fa23a8e8e5 100644 --- a/lib/lightning_web/controllers/api/provisioning_json.ex +++ b/lib/lightning_web/controllers/api/provisioning_json.ex @@ -4,6 +4,8 @@ defmodule LightningWeb.API.ProvisioningJSON do import LightningWeb.CoreComponents, only: [translate_error: 1] import Ecto.Changeset + alias Lightning.Channels.Channel + alias Lightning.Channels.ChannelAuthMethod alias Lightning.Collections.Collection alias Lightning.Projects.Project alias Lightning.Projects.ProjectCredential @@ -42,6 +44,12 @@ defmodule LightningWeb.API.ProvisioningJSON do |> Enum.sort_by(& &1.inserted_at, NaiveDateTime) |> Enum.map(&as_json/1) ) + |> Map.put( + :channels, + project.channels + |> Enum.sort_by(& &1.inserted_at, NaiveDateTime) + |> Enum.map(&as_json/1) + ) end def as_json(%module{} = workflow_or_snapshot) @@ -140,6 +148,23 @@ defmodule LightningWeb.API.ProvisioningJSON do %{id: collection.id, name: collection.name} end + def as_json(%Channel{} = channel) do + %{ + id: channel.id, + name: channel.name, + destination_url: channel.destination_url, + enabled: channel.enabled, + destination_credential_id: destination_credential_id(channel) + } + end + + defp destination_credential_id(%Channel{destination_auth_method: method}) do + case method do + %ChannelAuthMethod{project_credential_id: id} -> id + _ -> nil + end + end + defp drop_keys_with_nil_value(map) do Map.reject(map, fn {_, v} -> is_nil(v) end) end diff --git a/test/fixtures/canonical_project.yaml b/test/fixtures/canonical_project.yaml index 9c6efd6194..da678413ce 100644 --- a/test/fixtures/canonical_project.yaml +++ b/test/fixtures/canonical_project.yaml @@ -4,6 +4,7 @@ description: | collections: cannonical-collection: name: cannonical-collection +channels: null credentials: cannonical-user@lightning.com-new-credential: name: new credential diff --git a/test/fixtures/webhook_reply_and_cron_cursor_project.yaml b/test/fixtures/webhook_reply_and_cron_cursor_project.yaml index 90f2d6e027..2d393f8488 100644 --- a/test/fixtures/webhook_reply_and_cron_cursor_project.yaml +++ b/test/fixtures/webhook_reply_and_cron_cursor_project.yaml @@ -1,6 +1,7 @@ name: webhook-reply-and-cron-cursor-project description: null collections: null +channels: null credentials: null workflows: cron-cursor-workflow: diff --git a/test/integration/cli_deploy_test.exs b/test/integration/cli_deploy_test.exs index f15b340426..85f19aac8c 100644 --- a/test/integration/cli_deploy_test.exs +++ b/test/integration/cli_deploy_test.exs @@ -355,7 +355,8 @@ defmodule Lightning.CliDeployTest do Map.merge(state, %{ workflows: workflows, project_credentials: credentials, - collections: collections + collections: collections, + channels: [] }) end diff --git a/test/lightning/projects/provisioner_test.exs b/test/lightning/projects/provisioner_test.exs index 5ab456b91e..27018fee04 100644 --- a/test/lightning/projects/provisioner_test.exs +++ b/test/lightning/projects/provisioner_test.exs @@ -324,6 +324,102 @@ defmodule Lightning.Projects.ProvisionerTest do } = collection end + test "creates channels" do + Mox.verify_on_exit!() + user = insert(:user) + + %{body: body, project_id: project_id} = valid_document() + + channel_id = Ecto.UUID.generate() + + body_with_channels = + Map.put(body, "channels", [ + %{ + id: channel_id, + name: "my-channel", + destination_url: "https://example.com/destination", + enabled: true + } + ]) + + Mox.stub( + Lightning.Extensions.MockUsageLimiter, + :limit_action, + fn _action, _context -> :ok end + ) + + {:ok, project} = + Provisioner.import_document( + %Lightning.Projects.Project{}, + user, + body_with_channels + ) + + assert %{id: ^project_id, channels: [channel]} = project + + assert %{ + id: ^channel_id, + name: "my-channel", + destination_url: "https://example.com/destination", + enabled: true, + project_id: ^project_id + } = channel + end + + test "creates a channel with a destination_credential_id" do + Mox.verify_on_exit!() + user = insert(:user) + + credential = insert(:credential, name: "Dest Cred", user: user) + + %{body: body, project_id: project_id} = valid_document() + + project_credential_id = Ecto.UUID.generate() + channel_id = Ecto.UUID.generate() + + credentials_payload = [ + %{ + "id" => project_credential_id, + "name" => credential.name, + "owner" => user.email + } + ] + + body_with_channels = + body + |> Map.put("project_credentials", credentials_payload) + |> Map.put("channels", [ + %{ + "id" => channel_id, + "name" => "my-channel", + "destination_url" => "https://example.com/destination", + "enabled" => true, + "destination_credential_id" => project_credential_id + } + ]) + + Mox.stub( + Lightning.Extensions.MockUsageLimiter, + :limit_action, + fn _action, _context -> :ok end + ) + + {:ok, project} = + Provisioner.import_document( + %Lightning.Projects.Project{}, + user, + body_with_channels + ) + + assert %{id: ^project_id, channels: [channel]} = project + assert channel.id == channel_id + + assert %Lightning.Channels.ChannelAuthMethod{ + role: :destination, + project_credential_id: ^project_credential_id + } = channel.destination_auth_method + end + test "imports trigger with webhook_reply field" do Mox.verify_on_exit!() user = insert(:user) @@ -1145,6 +1241,409 @@ defmodule Lightning.Projects.ProvisionerTest do assert remaining_collection.id == collection.id end + test "updating a channel", %{ + project: %{id: project_id} = project, + user: user + } do + channel = insert(:channel, project: project, name: "old-name") + channel_id = channel.id + + body = %{ + "id" => project_id, + "name" => "test-project", + "channels" => [ + %{ + "id" => channel_id, + "name" => "new-name", + "destination_url" => "https://example.com/new", + "enabled" => false + } + ] + } + + assert {:ok, %{id: ^project_id, channels: [updated]}} = + Provisioner.import_document(project, user, body) + + assert %{ + id: ^channel_id, + name: "new-name", + destination_url: "https://example.com/new", + enabled: false + } = updated + end + + test "audits channel create, update, and destination auth changes", %{ + project: %{id: project_id} = project, + user: %{id: user_id} = user + } do + pc1 = insert(:project_credential, project: project) + pc2 = insert(:project_credential, project: project) + + # 1. Create a channel with a destination credential + new_channel_id = Ecto.UUID.generate() + + body_create = %{ + "id" => project_id, + "name" => "test-project", + "channels" => [ + %{ + "id" => new_channel_id, + "name" => "audit-channel", + "destination_url" => "https://example.com/destination", + "enabled" => true, + "destination_credential_id" => pc1.id + } + ] + } + + assert {:ok, _project} = + Provisioner.import_document(project, user, body_create) + + assert created_audit = + Repo.one( + from a in Audit, + where: + a.item_id == ^new_channel_id and a.item_type == "channel" and + a.event == "created" + ) + + assert created_audit.actor_id == user_id + + assert auth_added_audit = + Repo.one( + from a in Audit, + where: + a.item_id == ^new_channel_id and a.item_type == "channel" and + a.event == "auth_method_added" + ) + + assert auth_added_audit.actor_id == user_id + + # 2. Update the channel and swap the credential + body_update = %{ + "id" => project_id, + "name" => "test-project", + "channels" => [ + %{ + "id" => new_channel_id, + "name" => "audit-channel-renamed", + "destination_credential_id" => pc2.id + } + ] + } + + assert {:ok, _project} = + Provisioner.import_document(project, user, body_update) + + assert Repo.one( + from a in Audit, + where: + a.item_id == ^new_channel_id and a.item_type == "channel" and + a.event == "updated" + ) + + assert Repo.one( + from a in Audit, + where: + a.item_id == ^new_channel_id and a.item_type == "channel" and + a.event == "auth_method_changed" + ) + + # 3. Clear the credential + body_clear = %{ + "id" => project_id, + "name" => "test-project", + "channels" => [ + %{ + "id" => new_channel_id, + "destination_credential_id" => nil + } + ] + } + + assert {:ok, _project} = + Provisioner.import_document(project, user, body_clear) + + assert Repo.one( + from a in Audit, + where: + a.item_id == ^new_channel_id and a.item_type == "channel" and + a.event == "auth_method_removed" + ) + end + + test "audits multiple channels in one import without step-name collisions", + %{project: %{id: project_id} = project, user: %{id: user_id} = user} do + pc_a = insert(:project_credential, project: project) + pc_b = insert(:project_credential, project: project) + + channel_a_id = Ecto.UUID.generate() + channel_b_id = Ecto.UUID.generate() + + body = %{ + "id" => project_id, + "name" => "test-project", + "channels" => [ + %{ + "id" => channel_a_id, + "name" => "channel-a", + "destination_url" => "https://example.com/a", + "enabled" => true, + "destination_credential_id" => pc_a.id + }, + %{ + "id" => channel_b_id, + "name" => "channel-b", + "destination_url" => "https://example.com/b", + "enabled" => true, + "destination_credential_id" => pc_b.id + } + ] + } + + assert {:ok, _project} = + Provisioner.import_document(project, user, body) + + # Both channels emit "created" and "auth_method_added" audits scoped + # to their own item_id — proves the batched single-Multi path doesn't + # collide on shared audit step keys. + for channel_id <- [channel_a_id, channel_b_id] do + assert created = + Repo.one( + from a in Audit, + where: + a.item_id == ^channel_id and + a.item_type == "channel" and + a.event == "created" + ) + + assert created.actor_id == user_id + + assert Repo.one( + from a in Audit, + where: + a.item_id == ^channel_id and a.item_type == "channel" and + a.event == "auth_method_added" + ) + end + end + + test "audits channel changes when the actor is a ProjectRepoConnection", + %{project: %{id: project_id} = project} do + repo_connection = insert(:project_repo_connection, project: project) + pc = insert(:project_credential, project: project) + + channel_id = Ecto.UUID.generate() + + body = %{ + "id" => project_id, + "name" => "test-project", + "channels" => [ + %{ + "id" => channel_id, + "name" => "repo-sync-channel", + "destination_url" => "https://example.com/destination", + "enabled" => true, + "destination_credential_id" => pc.id + } + ] + } + + assert {:ok, _project} = + Provisioner.import_document(project, repo_connection, body) + + assert created_audit = + Repo.one( + from a in Audit, + where: + a.item_id == ^channel_id and a.item_type == "channel" and + a.event == "created" + ) + + assert created_audit.actor_id == repo_connection.id + assert created_audit.actor_type == :project_repo_connection + + assert auth_audit = + Repo.one( + from a in Audit, + where: + a.item_id == ^channel_id and a.item_type == "channel" and + a.event == "auth_method_added" + ) + + assert auth_audit.actor_id == repo_connection.id + assert auth_audit.actor_type == :project_repo_connection + end + + test "rejects channel deletion with a helpful error", %{ + project: %{id: project_id} = project, + user: user + } do + channel = insert(:channel, project: project) + + body = %{ + "id" => project_id, + "name" => "test-project", + "channels" => [ + %{"id" => channel.id, "delete" => true} + ] + } + + assert {:error, changeset} = + Provisioner.import_document(project, user, body) + + assert %{channels: [%{delete: [msg]}]} = flatten_errors(changeset) + assert msg =~ "deletion is not supported" + + # Channel is unchanged in the database + assert Repo.reload(channel) + end + + test "rejects a destination_credential_id from another project", %{ + project: %{id: project_id} = project, + user: user + } do + other_project = insert(:project) + foreign_pc = insert(:project_credential, project: other_project) + + channel_id = Ecto.UUID.generate() + + body = %{ + "id" => project_id, + "name" => "test-project", + "channels" => [ + %{ + "id" => channel_id, + "name" => "leaky-channel", + "destination_url" => "https://example.com/destination", + "enabled" => true, + "destination_credential_id" => foreign_pc.id + } + ] + } + + assert {:error, changeset} = + Provisioner.import_document(project, user, body) + + assert %{ + channels: [ + %{destination_auth_method: %{project_credential_id: [msg]}} + ] + } = flatten_errors(changeset) + + assert msg =~ "isn't available in this project" + + # Channel was not persisted + refute Repo.get(Lightning.Channels.Channel, channel_id) + end + + test "rejects a job project_credential_id from another project", %{ + project: %{id: project_id} = project, + user: user + } do + other_project = insert(:project) + foreign_pc = insert(:project_credential, project: other_project) + + %{ + body: %{"workflows" => [workflow]} = body, + workflows: [%{first_job_id: first_job_id}] + } = + valid_document(project_id) + + tainted_jobs = + Enum.map(workflow["jobs"], fn job -> + if job["id"] == first_job_id do + Map.put(job, "project_credential_id", foreign_pc.id) + else + job + end + end) + + body = Map.put(body, "workflows", [%{workflow | "jobs" => tainted_jobs}]) + + assert {:error, changeset} = + Provisioner.import_document(project, user, body) + + assert %{ + workflows: [%{jobs: job_errors}] + } = flatten_errors(changeset) + + assert Enum.any?(job_errors, fn job_error -> + case job_error do + %{project_credential_id: [msg]} -> + msg =~ "isn't available in this project" + + _ -> + false + end + end) + end + + test "setting a channel's destination_credential_id replaces the existing auth method", + %{project: %{id: project_id} = project, user: user} do + pc_old = insert(:project_credential, project: project) + pc_new = insert(:project_credential, project: project) + + channel = insert(:channel, project: project) + + insert(:channel_auth_method, + channel: channel, + role: :destination, + webhook_auth_method: nil, + project_credential: pc_old + ) + + body = %{ + "id" => project_id, + "name" => "test-project", + "channels" => [ + %{ + "id" => channel.id, + "destination_credential_id" => pc_new.id + } + ] + } + + assert {:ok, %{channels: [updated]}} = + Provisioner.import_document(project, user, body) + + assert %Lightning.Channels.ChannelAuthMethod{ + role: :destination, + project_credential_id: new_pc_id + } = updated.destination_auth_method + + assert new_pc_id == pc_new.id + end + + test "setting destination_credential_id to nil clears the destination auth method", + %{project: %{id: project_id} = project, user: user} do + pc = insert(:project_credential, project: project) + channel = insert(:channel, project: project) + + insert(:channel_auth_method, + channel: channel, + role: :destination, + webhook_auth_method: nil, + project_credential: pc + ) + + body = %{ + "id" => project_id, + "name" => "test-project", + "channels" => [ + %{ + "id" => channel.id, + "destination_credential_id" => nil + } + ] + } + + assert {:ok, %{channels: [updated]}} = + Provisioner.import_document(project, user, body) + + assert updated.destination_auth_method == nil + end + test "usage limiter is called when creating collection", %{ project: %{id: project_id} = project, user: user diff --git a/test/lightning/projects_test.exs b/test/lightning/projects_test.exs index 1ce03ee040..8b23fbc60b 100644 --- a/test/lightning/projects_test.exs +++ b/test/lightning/projects_test.exs @@ -620,7 +620,7 @@ defmodule Lightning.ProjectsTest do project = project_fixture(name: "newly-created-project") expected_yaml = - "name: newly-created-project\ndescription: null\ncollections: null\ncredentials: null\nworkflows: null" + "name: newly-created-project\ndescription: null\ncollections: null\nchannels: null\ncredentials: null\nworkflows: null" {:ok, generated_yaml} = Projects.export_project(:yaml, project.id) @@ -780,6 +780,55 @@ defmodule Lightning.ProjectsTest do assert generated_yaml =~ expected_yaml_trigger end + test "channels are included in the export with their destination credential" do + user = insert(:user, email: "channel-user@lightning.com") + credential = insert(:credential, name: "channel-cred", user: user) + + project = insert(:project, name: "project-with-channel") + + project_credential = + insert(:project_credential, project: project, credential: credential) + + channel = + insert(:channel, + project: project, + name: "my-channel", + destination_url: "https://example.com/destination", + enabled: true + ) + + insert(:channel_auth_method, + channel: channel, + role: :destination, + webhook_auth_method: nil, + project_credential: project_credential + ) + + channel_only_name = + insert(:channel, + project: project, + name: "no-cred-channel", + destination_url: "https://example.com/other", + enabled: false + ) + + assert {:ok, generated_yaml} = Projects.export_project(:yaml, project.id) + + assert generated_yaml =~ """ + channels: + my-channel: + name: my-channel + destination_url: 'https://example.com/destination' + enabled: true + destination_credential: channel-user@lightning.com-channel-cred + no-cred-channel: + name: #{channel_only_name.name} + destination_url: 'https://example.com/other' + enabled: false + destination_credential: null + """ + end + test "webhook_response_config is included in the export" do project = insert(:project, name: "project 1") diff --git a/test/lightning_web/controllers/api/provisioning_controller_test.exs b/test/lightning_web/controllers/api/provisioning_controller_test.exs index 65aa6bedb4..0b6704d609 100644 --- a/test/lightning_web/controllers/api/provisioning_controller_test.exs +++ b/test/lightning_web/controllers/api/provisioning_controller_test.exs @@ -76,6 +76,71 @@ defmodule LightningWeb.API.ProvisioningControllerTest do ] end + test "returns a project with channels", %{ + conn: conn, + user: user + } do + %{id: project_id} = + project = + insert(:project, + project_users: [%{user_id: user.id}] + ) + + project_credential = + insert(:project_credential, + credential: %{name: "dest-cred", body: %{}, user_id: user.id}, + project: project + ) + + %{id: channel_with_cred_id} = + channel_with_cred = + insert(:channel, + project: project, + name: "with-cred", + destination_url: "https://example.com/a", + enabled: true + ) + + insert(:channel_auth_method, + channel: channel_with_cred, + role: :destination, + webhook_auth_method: nil, + project_credential: project_credential + ) + + %{id: channel_without_cred_id} = + insert(:channel, + project: project, + name: "without-cred", + destination_url: "https://example.com/b", + enabled: false + ) + + conn = get(conn, ~p"/api/provision/#{project_id}") + response = json_response(conn, 200) + + assert %{"channels" => channels_resp} = response["data"] + + expected_pc_id = project_credential.id + + assert [ + %{ + "id" => ^channel_with_cred_id, + "name" => "with-cred", + "destination_url" => "https://example.com/a", + "enabled" => true, + "destination_credential_id" => ^expected_pc_id + }, + %{ + "id" => ^channel_without_cred_id, + "name" => "without-cred", + "destination_url" => "https://example.com/b", + "enabled" => false, + "destination_credential_id" => nil + } + ] = channels_resp + end + test "returns a non empty project without credentials", %{ conn: conn, user: user