Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ export function AIAssistantPanelWrapper({
}
} else {
// important: determines what ai to be used
options = { ...options, job_id: aiMode?.context.job_id };
options = { ...aiMode?.context, ...options };
Comment thread
doc-han marked this conversation as resolved.
}

// Update store state and send through registry
Expand Down
17 changes: 17 additions & 0 deletions lib/lightning/ai_assistant/ai_assistant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,23 @@ defmodule Lightning.AiAssistant do
|> Map.put("chat_session_id", session.id)
|> Map.put("code", code)
|> maybe_put_job_id_from_session(session)
|> maybe_put_unsaved_job_meta(session)
end

defp maybe_put_unsaved_job_meta(attrs, session) do
is_assistant = to_string(Map.get(attrs, "role")) == "assistant"
unsaved_job = get_in(session.meta || %{}, ["unsaved_job"])

if is_assistant && unsaved_job && unsaved_job["id"] do
existing_meta = Map.get(attrs, "meta", %{})

updated_meta =
Map.put(existing_meta, "from_unsaved_job", unsaved_job["id"])

Map.put(attrs, "meta", updated_meta)
else
attrs
end
end

defp update_session_meta(session, nil),
Expand Down
4 changes: 4 additions & 0 deletions lib/lightning/ai_assistant/chat_message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Lightning.AiAssistant.ChatMessage do
* `status` - Processing status: `:pending`, `:success`, `:error`, or `:cancelled`
* `is_deleted` - Soft deletion flag (defaults to false)
* `is_public` - Whether the message is publicly visible (defaults to true)
* `meta` - Additional metadata (e.g., `"unsaved_job"` for job data not yet saved)
* `chat_session_id` - Reference to the parent chat session
* `user_id` - Reference to the user who sent the message (required for user messages)
"""
Expand All @@ -36,6 +37,7 @@ defmodule Lightning.AiAssistant.ChatMessage do
job_id: Ecto.UUID.t() | nil,
is_deleted: boolean(),
is_public: boolean(),
meta: map() | nil,
processing_started_at: DateTime.t() | nil,
processing_completed_at: DateTime.t() | nil,
chat_session_id: Ecto.UUID.t(),
Expand All @@ -54,6 +56,7 @@ defmodule Lightning.AiAssistant.ChatMessage do

field :is_deleted, :boolean, default: false
field :is_public, :boolean, default: true
field :meta, :map, default: %{}
field :processing_started_at, :utc_datetime_usec
field :processing_completed_at, :utc_datetime_usec

Expand Down Expand Up @@ -89,6 +92,7 @@ defmodule Lightning.AiAssistant.ChatMessage do
:status,
:is_deleted,
:is_public,
:meta,
:chat_session_id,
:processing_started_at,
:processing_completed_at
Expand Down
72 changes: 57 additions & 15 deletions lib/lightning/ai_assistant/message_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,65 @@ defmodule Lightning.AiAssistant.MessageProcessor do
|> Repo.get!(message_id)
|> update_message_status(:processing)

# session.job_id for old job session. message.job_id for newer ones
is_job_chat = !is_nil(session.job_id) || !is_nil(message.job_id)

result =
if is_job_chat do
session =
if message.job_id do
%{session | job_id: message.job_id}
else
session
end
session = update_session_with_job_context(session, message)
result = dispatch_message_processing(session, message)
handle_processing_result(message, result)
end

process_job_message(session, message)
else
process_workflow_message(session, message)
end
@spec update_session_with_job_context(
AiAssistant.ChatSession.t(),
ChatMessage.t()
) :: AiAssistant.ChatSession.t()
defp update_session_with_job_context(session, message) do
message_meta = message.meta || %{}

cond do
message.job_id ->
%{session | job_id: message.job_id}

Map.has_key?(message_meta, "unsaved_job") ->
updated_meta =
Map.put(
session.meta || %{},
"unsaved_job",
message_meta["unsaved_job"]
)

%{session | meta: updated_meta}

true ->
session
end
end

@spec dispatch_message_processing(
AiAssistant.ChatSession.t(),
ChatMessage.t()
) :: {:ok, AiAssistant.ChatSession.t()} | {:error, String.t()}
defp dispatch_message_processing(session, message) do
if job_chat?(session, message) do
process_job_message(session, message)
else
process_workflow_message(session, message)
end
end

@spec job_chat?(AiAssistant.ChatSession.t(), ChatMessage.t()) :: boolean()
defp job_chat?(session, message) do
message_meta = message.meta || %{}

# session.job_id for old job session. message.job_id for newer ones
# Also check for unsaved job data in message.meta
!is_nil(session.job_id) ||
!is_nil(message.job_id) ||
Map.has_key?(message_meta, "unsaved_job")
end

@spec handle_processing_result(
ChatMessage.t(),
{:ok, AiAssistant.ChatSession.t()} | {:error, String.t()}
) :: {:ok, AiAssistant.ChatSession.t()} | {:error, String.t()}
defp handle_processing_result(message, result) do
case result do
{:ok, _} ->
{:ok, updated_session, _updated_message} =
Expand Down
73 changes: 67 additions & 6 deletions lib/lightning_web/channels/ai_assistant_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,16 @@ defmodule LightningWeb.AiAssistantChannel do
end

defp format_message(message) do
job_id =
case message.meta do
%{"from_unsaved_job" => unsaved_job_id}
when not is_nil(unsaved_job_id) ->
unsaved_job_id

_ ->
message.job_id
end

%{
id: message.id,
content: message.content,
Expand All @@ -654,7 +664,7 @@ defmodule LightningWeb.AiAssistantChannel do
inserted_at: message.inserted_at,
user_id: message.user_id,
user: format_user(message.user),
job_id: message.job_id
job_id: job_id
}
end

Expand Down Expand Up @@ -848,9 +858,61 @@ defmodule LightningWeb.AiAssistantChannel do
socket}
end

{:error, msg} ->
{:reply, {:error, %{type: "validation_error", errors: %{base: [msg]}}},
socket}
{:error, :job_not_found} ->
Comment thread
doc-han marked this conversation as resolved.
# Job not found in DB - save message with unsaved job data in meta
handle_unsaved_job_message(
session,
user,
content,
limit_result,
params,
socket
)
end
end

defp handle_unsaved_job_message(
session,
user,
content,
limit_result,
params,
socket
) do
job_id = params["job_id"]
job_name = params["job_name"]
job_body = params["job_body"]
job_adaptor = params["job_adaptor"]
workflow_id = params["workflow_id"]

unsaved_job_data = %{
"id" => job_id,
"name" => job_name,
"body" => job_body,
"adaptor" => job_adaptor || "@openfn/language-common@latest",
"workflow_id" => workflow_id
}

message_attrs =
build_message_attrs(user, nil, content, limit_result)
|> Map.put(:meta, %{"unsaved_job" => unsaved_job_data})

opts = extract_message_options(params)

case AiAssistant.save_message(session, message_attrs, opts) do
{:ok, updated_session} ->
message = find_user_message(updated_session.messages, content)

# Broadcast the user message to all subscribers so other users see it
broadcast(socket, "user_message", %{message: format_message(message)})

response = build_message_response(message, limit_result)
{:reply, {:ok, response}, socket}

{:error, changeset} ->
errors = format_changeset_errors(changeset)

{:reply, {:error, %{type: "validation_error", errors: errors}}, socket}
end
end

Expand All @@ -860,8 +922,7 @@ defmodule LightningWeb.AiAssistantChannel do
{:ok, job}

{:error, :not_found} ->
{:error,
"Job not saved or deleted. Please save if unsaved for AI to work."}
{:error, :job_not_found}
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Lightning.Repo.Migrations.AddMetaToAiChatMessages do
use Ecto.Migration

def up do
alter table(:ai_chat_messages) do
add :meta, :map, default: %{}
end
end

def down do
alter table(:ai_chat_messages) do
remove :meta
end
end
end