Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ and this project adheres to
auth method and the destination project credential on every proxied request.
Feature-gated behind experimental features.
[#4541](https://github.com/OpenFn/lightning/issues/4541)
- Support channels in the provisioner API
[#4522](https://github.com/OpenFn/lightning/issues/4522)
- Do not persist channel request/response data when project has zero-persistence
enabled [#4622](https://github.com/OpenFn/lightning/issues/4622)
- Prometheus metrics for the channels HTTP reverse-proxy via a new PromEx
Expand Down
8 changes: 6 additions & 2 deletions lib/lightning/channels.ex
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ defmodule Lightning.Channels do
%Ecto.Changeset{} = audit_cs -> Repo.insert(audit_cs)
end
end)
|> Audit.audit_auth_method_changes(changeset, actor)
|> Multi.merge(fn %{channel: channel} ->
Audit.audit_auth_method_changes(Multi.new(), channel, changeset, actor)
end)
|> Repo.transaction()
|> case do
{:ok, %{channel: channel}} -> {:ok, channel}
Expand All @@ -225,7 +227,9 @@ defmodule Lightning.Channels do
%Ecto.Changeset{} = audit_cs -> Repo.insert(audit_cs)
end
end)
|> Audit.audit_auth_method_changes(changeset, actor)
|> Multi.merge(fn %{channel: channel} ->
Audit.audit_auth_method_changes(Multi.new(), channel, changeset, actor)
end)
|> Repo.transaction()
|> case do
{:ok, %{channel: channel}} -> {:ok, channel}
Expand Down
71 changes: 41 additions & 30 deletions lib/lightning/channels/audit.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Lightning.Channels.Audit do
Audit trail for channel CRUD and auth method changes.

Provides `event/5` (via `Lightning.Auditing.Audit`) for basic CRUD events,
plus `audit_auth_method_changes/3` which derives fine-grained audit events
plus `audit_auth_method_changes/4` which derives fine-grained audit events
for client and destination auth method additions, removals, and swaps.
"""
use Lightning.Auditing.Audit,
Expand All @@ -19,32 +19,36 @@ defmodule Lightning.Channels.Audit do
]

alias Ecto.Multi
alias Lightning.Channels.Channel

@doc """
Appends audit events for auth method changes to the given Multi.

Inspects the changeset for changes to `:client_auth_methods` and
`:destination_auth_method`, emitting the appropriate added/removed/changed
events. No-op when no auth method changes are present.

Step keys are unique per call, so the helper can be composed into a larger Multi any number of
times — e.g. once per channel when batching audits across channels.
"""
def audit_auth_method_changes(multi, changeset, actor) do
def audit_auth_method_changes(multi, %Channel{} = channel, changeset, actor) do
multi
|> audit_client_changes(changeset, actor)
|> audit_destination_changes(changeset, actor)
|> audit_client_changes(channel, changeset, actor)
|> audit_destination_changes(channel, changeset, actor)
end

# --- Client auth methods (has_many) ---

defp audit_client_changes(multi, changeset, actor) do
defp audit_client_changes(multi, channel, changeset, actor) do
changes =
Ecto.Changeset.get_change(changeset, :client_auth_methods, [])

inserted = Enum.filter(changes, &(&1.action == :insert))
deleted = Enum.filter(changes, &(&1.action == :delete))

multi
|> add_auth_method_audits(:added, inserted, :client, actor)
|> add_auth_method_audits(:removed, deleted, :client, actor)
|> add_auth_method_audits(:added, inserted, :client, channel, actor)
|> add_auth_method_audits(:removed, deleted, :client, channel, actor)
end

# --- Destination auth method (has_one, on_replace: :delete) ---
Expand All @@ -55,7 +59,7 @@ defmodule Lightning.Channels.Audit do
# 3. Swap (existing → different) → replace produces a delete of old +
# insert of new. We emit a single "auth_method_changed" event instead.

defp audit_destination_changes(multi, changeset, actor) do
defp audit_destination_changes(multi, channel, changeset, actor) do
old = changeset.data |> Map.get(:destination_auth_method)
has_existing? = old != nil and not match?(%Ecto.Association.NotLoaded{}, old)

Expand All @@ -68,7 +72,7 @@ defmodule Lightning.Channels.Audit do
multi

nil when has_existing? ->
audit_destination_removed(multi, old, actor)
audit_destination_removed(multi, channel, old, actor)

nil ->
multi
Expand All @@ -77,47 +81,54 @@ defmodule Lightning.Channels.Audit do
old_fields = fields_for_data(old, :destination)
new_fields = fields_for_changeset(new_cs, :destination)

Multi.insert(multi, :audit_destination_changed, fn %{channel: channel} ->
Multi.insert(
multi,
unique_key(:audit_destination_changed),
event("auth_method_changed", channel.id, actor, %{
before: old_fields,
after: new_fields
})
end)
)

%Ecto.Changeset{action: :insert} = new_cs ->
fields = fields_for_changeset(new_cs, :destination)

Multi.insert(multi, :audit_destination_added, fn %{channel: channel} ->
Multi.insert(
multi,
unique_key(:audit_destination_added),
event("auth_method_added", channel.id, actor, %{
before: nil,
after: fields
})
end)
)

%Ecto.Changeset{action: :delete} when has_existing? ->
audit_destination_removed(multi, old, actor)
audit_destination_removed(multi, channel, old, actor)

%Ecto.Changeset{action: :delete} ->
multi
end
end

defp audit_destination_removed(multi, nil, _actor), do: multi
defp audit_destination_removed(multi, _channel, nil, _actor), do: multi

defp audit_destination_removed(multi, old, actor) do
defp audit_destination_removed(multi, channel, old, actor) do
old_fields = fields_for_data(old, :destination)

Multi.insert(multi, :audit_destination_removed, fn %{channel: channel} ->
Multi.insert(
multi,
unique_key("audit_destination_removed"),
event("auth_method_removed", channel.id, actor, %{
before: old_fields,
after: nil
})
end)
)
end

defp add_auth_method_audits(multi, _direction, [], _role, _actor), do: multi
defp add_auth_method_audits(multi, _direction, [], _role, _channel, _actor),
do: multi

defp add_auth_method_audits(multi, direction, changesets, role, actor) do
defp add_auth_method_audits(multi, direction, changesets, role, channel, actor) do
{event_name, extract_fields, wrap} =
case direction do
:added ->
Expand All @@ -127,24 +138,24 @@ defmodule Lightning.Channels.Audit do
{"auth_method_removed", &fields_for_data(&1.data, role), &{&1, nil}}
end

changesets
|> Enum.with_index()
|> Enum.reduce(multi, fn {cs, idx}, acc ->
Enum.reduce(changesets, multi, fn cs, acc ->
{before, after_val} = wrap.(extract_fields.(cs))

Multi.insert(
acc,
:"audit_#{role}_#{event_name}_#{idx}",
fn %{channel: channel} ->
event(event_name, channel.id, actor, %{
before: before,
after: after_val
})
end
unique_key("audit_#{role}_#{event_name}"),
event(event_name, channel.id, actor, %{
before: before,
after: after_val
})
)
end)
end

defp unique_key(base) do
"#{base}_#{System.unique_integer([:positive])}"
end

# Extract fields from a changeset (for inserts/new records)
defp fields_for_changeset(cs, :client) do
%{
Expand Down
27 changes: 20 additions & 7 deletions lib/lightning/channels/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Lightning.Channels.Channel do
field :destination_url, :string
field :enabled, :boolean, default: true
field :lock_version, :integer, default: 0
field :delete, :boolean, virtual: true

belongs_to :project, Project

Expand Down Expand Up @@ -57,13 +58,7 @@ defmodule Lightning.Channels.Channel do
:enabled
])
|> validate_required([:name, :destination_url, :project_id])
|> Validators.validate_url(:destination_url)
|> assoc_constraint(:project)
|> unique_constraint([:project_id, :name],
error_key: :name,
message: "A channel with this name already exists in this project"
)
|> optimistic_lock(:lock_version)
|> validate()
|> cast_assoc(:client_auth_methods,
with: fn struct, attrs ->
ChannelAuthMethod.changeset(struct, put_role(attrs, "client"))
Expand All @@ -76,6 +71,24 @@ defmodule Lightning.Channels.Channel do
)
end

@doc """
Shared validation rules for a channel changeset.

Used by `changeset/2` and by callers (such as the provisioner) that
build a channel changeset with a different shape of input fields but
need the same business-rule validations.
"""
def validate(changeset) do
changeset
|> Validators.validate_url(:destination_url)
|> assoc_constraint(:project)
|> unique_constraint([:project_id, :name],
error_key: :name,
message: "A channel with this name already exists in this project"
)
|> optimistic_lock(:lock_version)
end

defp put_role(attrs, role) when is_map(attrs) do
key = if has_string_keys?(attrs), do: "role", else: :role
Map.put(attrs, key, role)
Expand Down
49 changes: 46 additions & 3 deletions lib/lightning/export_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ defmodule Lightning.ExportUtils do
:name,
:description,
:collections,
:channels,
:credentials,
:globals,
:workflows
],
collection: [:name],
channel: [:name, :destination_url, :enabled, :destination_credential],
credential: [:name, :owner],
workflow: [:name, :jobs, :triggers, :edges],
job: [:name, :adaptor, :credential, :globals, :body],
Expand Down Expand Up @@ -392,13 +394,23 @@ defmodule Lightning.ExportUtils do
Map.put(acc, hyphenate(collection.name), ytree)
end)

channels_map =
project.channels
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
|> Enum.reduce(%{}, fn channel, acc ->
ytree = build_channel_yaml_tree(channel, project.project_credentials)

Map.put(acc, hyphenate(channel.name), ytree)
end)

%{
name: project.name,
description: project.description,
node_type: :project,
workflows: workflows_map,
credentials: credentials_map,
collections: collections_map
collections: collections_map,
channels: channels_map
}
end

Expand Down Expand Up @@ -427,6 +439,35 @@ defmodule Lightning.ExportUtils do
}
end

defp build_channel_yaml_tree(channel, project_credentials) do
project_credential_id = channel_destination_project_credential_id(channel)

project_credential =
project_credential_id &&
Enum.find(project_credentials, fn pc ->
pc.id == project_credential_id
end)

%{
name: channel.name,
destination_url: channel.destination_url,
enabled: channel.enabled,
node_type: :channel,
destination_credential:
project_credential && project_credential_key(project_credential)
}
end

defp channel_destination_project_credential_id(%{destination_auth_method: nil}),
do: nil

defp channel_destination_project_credential_id(%{
destination_auth_method: %{project_credential_id: id}
}),
do: id

defp channel_destination_project_credential_id(_), do: nil

defp build_workflow_yaml_tree(workflow, project_credentials) do
jobs =
workflow.jobs
Expand Down Expand Up @@ -459,7 +500,8 @@ defmodule Lightning.ExportUtils do
project =
Lightning.Repo.preload(project,
project_credentials: [credential: :user],
collections: []
collections: [],
channels: [destination_auth_method: :project_credential]
)

yaml =
Expand All @@ -475,7 +517,8 @@ defmodule Lightning.ExportUtils do
project =
Lightning.Repo.preload(project,
project_credentials: [credential: :user],
collections: []
collections: [],
channels: [destination_auth_method: :project_credential]
)

yaml =
Expand Down
3 changes: 2 additions & 1 deletion lib/lightning/projects/project.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ defmodule Lightning.Projects.Project do
has_many :workflows, Workflow, where: [deleted_at: nil]
has_many :jobs, through: [:workflows, :jobs]

has_many :channels, Lightning.Channels.Channel

has_many :project_credentials, ProjectCredential
has_many :credentials, through: [:project_credentials, :credential]

has_many :collections, Lightning.Collections.Collection
has_many :channels, Lightning.Channels.Channel

timestamps()
end
Expand Down
Loading