diff --git a/dashboard/src/components/debug/MessageTraceDialog.tsx b/dashboard/src/components/debug/MessageTraceDialog.tsx new file mode 100644 index 0000000..7707e6c --- /dev/null +++ b/dashboard/src/components/debug/MessageTraceDialog.tsx @@ -0,0 +1,315 @@ +import { + Dialog, + DialogContent, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog"; +import { Card } from "@/components/ui/card"; +import { BACKEND_URL } from "@/consts/config"; +import axios from "axios"; +import { useEffect, useMemo, useState } from "react"; +import { Loader2, AlertCircle } from "lucide-react"; +import { formatTimeWithMillis } from "@/lib/utils"; + +interface CANField { + name: string; + offset: number; + size: number; + sign: "signed" | "unsigned"; + endian: "big" | "little"; + bytes: string; + raw_value: number; + signal_names: string[]; +} + +interface CANSignal { + id: string; + timestamp: number; + vehicle_id: string; + name: string; + value: number; + raw_value: number; + produced_at: string; + created_at: string; +} + +interface CANMessage { + id: string; + vehicle_id: string; + node_id: string; + timestamp: number; + can_id: number; + bytes: string; + upload_key: number; + metadata?: { status: string; note?: string }; + produced_at: string; + created_at: string; + fields: CANField[] | null; + signals: CANSignal[]; +} + +interface Props { + // signalId is the streamed mapache.Signal id; the endpoint joins to + // gr26_can_signal to find the originating frame and returns the same + // shape as /gr26/messages/:id. + signalId: string | null; + vehicleType: string; + highlightSignal?: string; + onOpenChange: (open: boolean) => void; +} + +// Tailwind palette cycled per field so adjacent fields are visually +// distinct. Numerals are arbitrary; just need enough variety. +const FIELD_COLORS = [ + "bg-pink-500/30 text-pink-200", + "bg-purple-500/30 text-purple-200", + "bg-cyan-500/30 text-cyan-200", + "bg-amber-500/30 text-amber-200", + "bg-emerald-500/30 text-emerald-200", + "bg-blue-500/30 text-blue-200", + "bg-rose-500/30 text-rose-200", + "bg-lime-500/30 text-lime-200", +]; + +export default function MessageTraceDialog({ + signalId, + vehicleType, + highlightSignal, + onOpenChange, +}: Props) { + const [data, setData] = useState(null); + const [error, setError] = useState(null); + const [loading, setLoading] = useState(false); + + useEffect(() => { + if (!signalId) { + setData(null); + setError(null); + return; + } + let cancelled = false; + setLoading(true); + setError(null); + axios + .get(`${BACKEND_URL}/${vehicleType}/signals/${signalId}`, { + headers: { + Authorization: `Bearer ${localStorage.getItem("sentinel_access_token")}`, + }, + }) + .then((res) => { + if (cancelled) return; + setData(res.data as CANMessage); + }) + .catch((err) => { + if (cancelled) return; + setError( + err.response?.data?.error ?? err.message ?? "Failed to load trace", + ); + }) + .finally(() => { + if (!cancelled) setLoading(false); + }); + return () => { + cancelled = true; + }; + }, [signalId, vehicleType]); + + // Map every byte offset to the field that owns it (if any) so the hex + // grid can color bytes by field with a single lookup. + const byteToField = useMemo(() => { + const map = new Map(); + if (!data?.fields) return map; + data.fields.forEach((f, idx) => { + for (let i = 0; i < f.size; i++) { + map.set(f.offset + i, idx); + } + }); + return map; + }, [data]); + + const totalBytes = data ? Math.floor(data.bytes.length / 2) : 0; + + return ( + + + + CAN frame trace + + + {loading && ( +
+ + Loading trace... +
+ )} + + {error && ( +
+ +
{error}
+
+ )} + + {data && ( +
+ +
+ + + + + + + + +
+ {data.metadata?.note && ( +
+ {data.metadata.note} +
+ )} +
+ +
+
+ Bytes +
+ +
+ {Array.from({ length: totalBytes }).map((_, i) => { + const hex = data.bytes + .slice(i * 2, i * 2 + 2) + .toUpperCase(); + const fieldIdx = byteToField.get(i); + const cls = + fieldIdx != null + ? FIELD_COLORS[fieldIdx % FIELD_COLORS.length] + : "bg-muted text-muted-foreground"; + return ( +
+ {i} + {hex} +
+ ); + })} +
+
+
+ + {data.fields && data.fields.length > 0 && ( +
+
+ Fields +
+ +
+ {data.fields.map((f, idx) => { + const cls = + FIELD_COLORS[idx % FIELD_COLORS.length].split(" ")[0]; + return ( +
+
+
+ + + {f.name} + +
+ + raw {f.raw_value} · 0x + {f.bytes.toUpperCase() || "00"} + +
+
+ + offset {f.offset} · {f.size} byte + {f.size === 1 ? "" : "s"} + + + {f.sign} · {f.endian} + + {f.signal_names.length > 0 && ( + + → {f.signal_names.join(", ")} + + )} +
+
+ ); + })} +
+
+
+ )} + +
+
+ Signals from this frame +
+ + {data.signals.length === 0 ? ( +
+ None linked. +
+ ) : ( +
+ {data.signals.map((s) => { + const isHighlight = s.name === highlightSignal; + return ( +
+ {s.name} + + {s.value} · raw {s.raw_value} + +
+ ); + })} +
+ )} +
+
+
+ )} +
+
+ ); +} + +function Row({ + label, + value, + mono, +}: { + label: string; + value: string; + mono?: boolean; +}) { + return ( + <> + {label} + {value} + + ); +} diff --git a/dashboard/src/models/signal.tsx b/dashboard/src/models/signal.tsx index 51156b1..94b0de7 100644 --- a/dashboard/src/models/signal.tsx +++ b/dashboard/src/models/signal.tsx @@ -1,4 +1,5 @@ export interface Signal { + id?: string; timestamp: number; vehicle_id: string; name: string; diff --git a/dashboard/src/pages/debug/DebugPage.tsx b/dashboard/src/pages/debug/DebugPage.tsx index b691d27..87b4083 100644 --- a/dashboard/src/pages/debug/DebugPage.tsx +++ b/dashboard/src/pages/debug/DebugPage.tsx @@ -9,6 +9,7 @@ import { TableRow, } from "@/components/ui/table"; import { Input } from "@/components/ui/input"; +import MessageTraceDialog from "@/components/debug/MessageTraceDialog"; import { BACKEND_WS_URL } from "@/consts/config"; import { useVehicle, useVehicleList } from "@/lib/store"; import { Signal } from "@/models/signal"; @@ -27,6 +28,7 @@ import useWebSocket, { ReadyState } from "react-use-websocket"; import { ArrowDown, ArrowUp, ArrowUpDown } from "lucide-react"; interface SignalState { + id?: string; name: string; value: number; rawValue: number; @@ -88,6 +90,7 @@ const WsBridge = memo(function WsBridge({ totalRef.current += 1; const existing = signalsRef.current.get(parsed.name); signalsRef.current.set(parsed.name, { + id: parsed.id, name: parsed.name, value: parsed.value, rawValue: parsed.raw_value, @@ -101,7 +104,15 @@ const WsBridge = memo(function WsBridge({ }); const SignalRowView = memo( - function SignalRowView({ s, now }: { s: SignalState; now: number }) { + function SignalRowView({ + s, + now, + onSelect, + }: { + s: SignalState; + now: number; + onSelect: (s: SignalState) => void; + }) { const ageMs = Math.max(0, now - s.lastSeen); const ageColor = ageMs < 500 @@ -109,8 +120,12 @@ const SignalRowView = memo( : ageMs < 5000 ? "text-yellow-500" : "text-red-500"; + const clickable = s.id != null; return ( - + onSelect(s) : undefined} + > {s.name} {s.value} @@ -136,7 +151,9 @@ const SignalRowView = memo( // Skip the row when neither the signal nor the wall clock changed enough // to nudge the age column. 100ms is below human perception. (prev, next) => - prev.s === next.s && Math.abs(prev.now - next.now) < 100, + prev.s === next.s && + Math.abs(prev.now - next.now) < 100 && + prev.onSelect === next.onSelect, ); interface DebugTableProps { @@ -145,6 +162,7 @@ interface DebugTableProps { sortKey: SortKey; sortDir: SortDir; onSort: (key: SortKey) => void; + onSelect: (s: SignalState) => void; emptyMessage: string; } @@ -154,6 +172,7 @@ const DebugTable = memo(function DebugTable({ sortKey, sortDir, onSort, + onSelect, emptyMessage, }: DebugTableProps) { const SortIcon = ({ k }: { k: SortKey }) => { @@ -201,7 +220,14 @@ const DebugTable = memo(function DebugTable({ ) : ( - rows.map((s) => ) + rows.map((s) => ( + + )) )} @@ -239,6 +265,15 @@ export default function DebugPage() { const [readyState, setReadyState] = useState( ReadyState.UNINSTANTIATED, ); + const [selected, setSelected] = useState<{ + signalId: string; + signalName: string; + } | null>(null); + + const onSelect = useCallback((s: SignalState) => { + if (!s.id) return; + setSelected({ signalId: s.id, signalName: s.name }); + }, []); useEffect(() => { signalsRef.current = new Map(); @@ -381,9 +416,18 @@ export default function DebugPage() { sortKey={sortKey} sortDir={sortDir} onSort={onSort} + onSelect={onSelect} emptyMessage={emptyMessage} /> + { + if (!open) setSelected(null); + }} + /> ); } diff --git a/gr26/api/api.go b/gr26/api/api.go index bca039b..410d781 100644 --- a/gr26/api/api.go +++ b/gr26/api/api.go @@ -36,4 +36,6 @@ func InitializeRouter() *gin.Engine { func InitializeRoutes(router *gin.Engine) { router.GET("/gr26/ping", Ping) router.GET("/gr26/live", GetLatestSignalWebSocket) + router.GET("/gr26/messages/:id", GetCANMessage) + router.GET("/gr26/signals/:id", GetCANBySignalID) } diff --git a/gr26/api/can.go b/gr26/api/can.go new file mode 100644 index 0000000..eee7d59 --- /dev/null +++ b/gr26/api/can.go @@ -0,0 +1,168 @@ +package api + +import ( + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "net/http" + + "github.com/gaucho-racing/mapache/gr26/model" + "github.com/gaucho-racing/mapache/gr26/service" + + mapache "github.com/gaucho-racing/mapache/mapache-go/v3" + + "github.com/gin-gonic/gin" + "gorm.io/gorm" +) + +// canMessageResponse is the GET /gr26/messages/:id wire shape. We +// hex-encode the bytes (rather than the default base64 for []byte) so the +// dashboard can render the hex grid without re-encoding. +type canMessageResponse struct { + ID string `json:"id"` + VehicleID string `json:"vehicle_id"` + NodeID string `json:"node_id"` + Timestamp int `json:"timestamp"` + CANID int `json:"can_id"` + Bytes string `json:"bytes"` + UploadKey int `json:"upload_key"` + Metadata map[string]any `json:"metadata,omitempty"` + ProducedAt string `json:"produced_at"` + CreatedAt string `json:"created_at"` + Fields []canFieldTrace `json:"fields"` + Signals []mapache.Signal `json:"signals"` +} + +// canFieldTrace describes one field of a decoded CAN frame: its byte +// range within the frame, decoded raw value, and which signal names it +// produces. +type canFieldTrace struct { + Name string `json:"name"` + Offset int `json:"offset"` + Size int `json:"size"` + Sign string `json:"sign"` + Endian string `json:"endian"` + Bytes string `json:"bytes"` + RawValue int `json:"raw_value"` + SignalNames []string `json:"signal_names"` +} + +func GetCANMessage(c *gin.Context) { + id := c.Param("id") + if id == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "id is required"}) + return + } + respondWithCAN(c, service.GetCAN, id, "can message not found") +} + +// GetCANBySignalID returns the same trace shape as GetCANMessage but +// looks up the CAN frame by the signal id that came from it. Lets the +// dashboard go straight from a streamed signal.id (which is just +// mapache.Signal — no extra wire fields) to its source frame in one +// call. +func GetCANBySignalID(c *gin.Context) { + id := c.Param("id") + if id == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "id is required"}) + return + } + respondWithCAN(c, service.GetCANForSignal, id, "no can frame linked to this signal") +} + +func respondWithCAN( + c *gin.Context, + lookup func(string) (model.CAN, error), + id string, + notFoundMsg string, +) { + can, err := lookup(id) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": notFoundMsg}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + signals, err := service.GetSignalsForCAN(can.ID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + fields := decodeFieldTrace(can) + + var meta map[string]any + if len(can.Metadata) > 0 { + _ = json.Unmarshal(can.Metadata, &meta) + } + + c.JSON(http.StatusOK, canMessageResponse{ + ID: can.ID, + VehicleID: can.VehicleID, + NodeID: can.NodeID, + Timestamp: can.Timestamp, + CANID: can.CANID, + Bytes: hex.EncodeToString(can.Bytes), + UploadKey: can.UploadKey, + Metadata: meta, + ProducedAt: can.ProducedAt.UTC().Format("2006-01-02T15:04:05.000000Z"), + CreatedAt: can.CreatedAt.UTC().Format("2006-01-02T15:04:05.000000Z"), + Fields: fields, + Signals: signals, + }) +} + +// decodeFieldTrace re-runs the gr26 decoder over the stored bytes so the +// response carries per-field metadata (offset, size, sign, endian, the +// bytes that contributed, the raw value, and the signal names produced). +// Returns nil if the can id has no registered decoder or if the bytes +// don't fit the field layout — both of those are already captured in +// can.Metadata. +func decodeFieldTrace(can model.CAN) []canFieldTrace { + messageStruct := model.GetMessage(can.CANID) + if messageStruct == nil { + return nil + } + if err := messageStruct.FillFromBytes(can.Bytes); err != nil { + return nil + } + + out := make([]canFieldTrace, 0, len(messageStruct)) + offset := 0 + for _, f := range messageStruct { + signalNames := make([]string, 0) + for _, s := range f.ExportSignals() { + signalNames = append(signalNames, fmt.Sprintf("%s_%s", can.NodeID, s.Name)) + } + out = append(out, canFieldTrace{ + Name: f.Name, + Offset: offset, + Size: f.Size, + Sign: signMode(f.Sign), + Endian: endian(f.Endian), + Bytes: hex.EncodeToString(f.Bytes), + RawValue: f.Value, + SignalNames: signalNames, + }) + offset += f.Size + } + return out +} + +func signMode(s mapache.SignMode) string { + if s == mapache.Signed { + return "signed" + } + return "unsigned" +} + +func endian(e mapache.Endian) string { + if e == mapache.BigEndian { + return "big" + } + return "little" +} diff --git a/gr26/database/db.go b/gr26/database/db.go index d6ad7f4..48be14a 100644 --- a/gr26/database/db.go +++ b/gr26/database/db.go @@ -5,6 +5,7 @@ import ( "time" "github.com/gaucho-racing/mapache/gr26/config" + "github.com/gaucho-racing/mapache/gr26/model" "github.com/gaucho-racing/mapache/gr26/pkg/logger" "github.com/gaucho-racing/mapache/mapache-go/v3" @@ -30,7 +31,7 @@ func Init() { } } else { logger.SugarLogger.Infoln("Connected to database") - db.AutoMigrate(&mapache.Signal{}, &mapache.Ping{}) + db.AutoMigrate(&mapache.Signal{}, &mapache.Ping{}, &model.CAN{}, &model.CANSignal{}) logger.SugarLogger.Infoln("AutoMigration complete") DB = db } diff --git a/gr26/model/can.go b/gr26/model/can.go new file mode 100644 index 0000000..84f9d69 --- /dev/null +++ b/gr26/model/can.go @@ -0,0 +1,41 @@ +package model + +import "time" + +// CAN is a stored record of a single decoded CAN frame received from a +// gr26 vehicle. The composite (vehicle_id, node_id, timestamp) tuple +// uniquely identifies a frame; ulid is the surrogate key used by the +// gr26_can_signal join table. +type CAN struct { + ID string `json:"id" gorm:"primaryKey"` + VehicleID string `json:"vehicle_id" gorm:"uniqueIndex:idx_gr26_can_unique"` + NodeID string `json:"node_id" gorm:"uniqueIndex:idx_gr26_can_unique"` + Timestamp int `json:"timestamp" gorm:"uniqueIndex:idx_gr26_can_unique"` + CANID int `json:"can_id"` + Bytes []byte `json:"bytes" gorm:"type:bytea"` + UploadKey int `json:"upload_key"` + // Metadata always carries a {"status": ...} field describing the + // decode outcome (ok, unknown_can_id, decode_error) and an optional + // "note" with the human-readable detail. + Metadata []byte `json:"metadata,omitempty" gorm:"type:jsonb"` + ProducedAt time.Time `json:"produced_at" gorm:"precision:6"` + CreatedAt time.Time `json:"created_at" gorm:"autoCreateTime;precision:6"` +} + +func (CAN) TableName() string { + return "gr26_can" +} + +// CANSignal joins each persisted signal back to the CAN frame it was +// decoded from. SignalID is the primary key because the relationship is +// one-to-many (one frame, many signals; each signal traces to exactly +// one frame). +type CANSignal struct { + SignalID string `json:"signal_id" gorm:"primaryKey"` + CANMessageID string `json:"can_message_id" gorm:"index"` + CreatedAt time.Time `json:"created_at" gorm:"autoCreateTime;precision:6"` +} + +func (CANSignal) TableName() string { + return "gr26_can_signal" +} diff --git a/gr26/service/can.go b/gr26/service/can.go new file mode 100644 index 0000000..4fc1e38 --- /dev/null +++ b/gr26/service/can.go @@ -0,0 +1,93 @@ +package service + +import ( + "github.com/gaucho-racing/mapache/gr26/database" + "github.com/gaucho-racing/mapache/gr26/model" + + mapache "github.com/gaucho-racing/mapache/mapache-go/v3" + + ulid "github.com/gaucho-racing/ulid-go" + "gorm.io/gorm/clause" +) + +// GetCAN looks up a stored CAN frame by its ulid. Returns the zero value +// if no row matches. +func GetCAN(id string) (model.CAN, error) { + var can model.CAN + if err := database.DB.Where("id = ?", id).First(&can).Error; err != nil { + return model.CAN{}, err + } + return can, nil +} + +// GetCANForSignal looks up the CAN frame that produced a given signal, +// joining via gr26_can_signal. Returns gorm.ErrRecordNotFound if the +// signal isn't linked to any frame (e.g., signal predates the trace +// feature or belongs to a different service). +func GetCANForSignal(signalID string) (model.CAN, error) { + var can model.CAN + err := database.DB. + Joins("JOIN gr26_can_signal ON gr26_can_signal.can_message_id = gr26_can.id"). + Where("gr26_can_signal.signal_id = ?", signalID). + First(&can).Error + if err != nil { + return model.CAN{}, err + } + return can, nil +} + +// GetSignalsForCAN returns every signal currently linked to the given +// CAN message via the gr26_can_signal join table. The query orders by +// signal name so the response is stable and easy to scan. +func GetSignalsForCAN(canMessageID string) ([]mapache.Signal, error) { + var signals []mapache.Signal + err := database.DB. + Joins("JOIN gr26_can_signal ON gr26_can_signal.signal_id = signal.id"). + Where("gr26_can_signal.can_message_id = ?", canMessageID). + Order("signal.name ASC"). + Find(&signals).Error + if err != nil { + return nil, err + } + return signals, nil +} + +// CreateCAN inserts (or upserts) a CAN frame record and returns it with +// the stored id populated. Conflicts on (vehicle_id, node_id, timestamp) +// update the payload columns and leave the id stable. +func CreateCAN(can model.CAN) (model.CAN, error) { + can.ID = ulid.Make().Prefixed("can") + result := database.DB.Clauses(clause.OnConflict{ + Columns: []clause.Column{ + {Name: "vehicle_id"}, + {Name: "node_id"}, + {Name: "timestamp"}, + }, + DoUpdates: clause.AssignmentColumns([]string{"can_id", "bytes", "upload_key", "metadata", "produced_at"}), + }, clause.Returning{Columns: []clause.Column{{Name: "id"}}}).Create(&can) + if result.Error != nil { + return model.CAN{}, result.Error + } + return can, nil +} + +// CreateCANSignals links each signal id to the CAN frame it was decoded +// from. Conflicts on signal_id update the can_message_id so the link +// always points at the most recent frame that produced the signal. +func CreateCANSignals(canMessageID string, signalIDs []string) error { + if len(signalIDs) == 0 { + return nil + } + rows := make([]model.CANSignal, len(signalIDs)) + for i, sid := range signalIDs { + rows[i] = model.CANSignal{ + SignalID: sid, + CANMessageID: canMessageID, + } + } + result := database.DB.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "signal_id"}}, + DoUpdates: clause.AssignmentColumns([]string{"can_message_id"}), + }).Create(&rows) + return result.Error +} diff --git a/gr26/service/message.go b/gr26/service/message.go index f0a124e..14ea75f 100644 --- a/gr26/service/message.go +++ b/gr26/service/message.go @@ -2,15 +2,19 @@ package service import ( "encoding/binary" + "encoding/json" "fmt" "strconv" "strings" "time" + "github.com/gaucho-racing/mapache/gr26/config" "github.com/gaucho-racing/mapache/gr26/model" "github.com/gaucho-racing/mapache/gr26/mqtt" "github.com/gaucho-racing/mapache/gr26/pkg/logger" + mapache "github.com/gaucho-racing/mapache/mapache-go/v3" + mq "github.com/eclipse/paho.mqtt.golang" ) @@ -62,34 +66,95 @@ func HandleMessage(vehicleID string, nodeID string, canID int, message []byte) { uploadKey := message[8:10] data := message[10:] - if !ValidateUploadKey(vehicleID, int(binary.BigEndian.Uint16(uploadKey))) { + uploadKeyInt := int(binary.BigEndian.Uint16(uploadKey)) + if !ValidateUploadKey(vehicleID, uploadKeyInt) { logger.SugarLogger.Infof("Upload key validation failed for vehicle %s, ignoring", vehicleID) return } + ts := int(binary.BigEndian.Uint64(timestamp)) + producedAt := time.UnixMicro(int64(ts)) + + // Attempt to decode first. If anything goes wrong, capture why in + // metadata so a "what bytes did we get that we couldn't parse" view + // has the answer alongside the raw frame. + var ( + signals []mapache.Signal + meta []byte + ) messageStruct := model.GetMessage(canID) - if messageStruct == nil { - logger.SugarLogger.Infof("Received unknown message id: %d, ignoring", canID) - return + switch { + case messageStruct == nil: + logger.SugarLogger.Infof("Received unknown message id: %d, frame stored without signals", canID) + meta = mustJSON(map[string]any{ + "status": "unknown_can_id", + "note": fmt.Sprintf("no decoder registered for can id 0x%X", canID), + }) + default: + if err := messageStruct.FillFromBytes(data); err != nil { + logger.SugarLogger.Infof("Error deserializing message id %d, frame stored without signals: %s", canID, err) + meta = mustJSON(map[string]any{ + "status": "decode_error", + "note": err.Error(), + }) + } else { + signals = messageStruct.ExportSignals() + meta = mustJSON(map[string]any{"status": "ok"}) + } } - err := messageStruct.FillFromBytes(data) + can, err := CreateCAN(model.CAN{ + VehicleID: vehicleID, + NodeID: nodeID, + Timestamp: ts, + CANID: canID, + Bytes: data, + UploadKey: uploadKeyInt, + Metadata: meta, + ProducedAt: producedAt, + }) if err != nil { - logger.SugarLogger.Infof("Error deserializing message: %s", err) + logger.SugarLogger.Infof("Error creating CAN record: %s", err) return } - signals := messageStruct.ExportSignals() - ts := int(binary.BigEndian.Uint64(timestamp)) + if len(signals) == 0 { + return + } now := time.Now().Truncate(time.Microsecond) for i := range signals { signals[i].Name = fmt.Sprintf("%s_%s", nodeID, signals[i].Name) signals[i].Timestamp = ts signals[i].VehicleID = vehicleID - signals[i].ProducedAt = time.UnixMicro(int64(ts)) + signals[i].ProducedAt = producedAt signals[i].CreatedAt = now } if err := CreateSignals(signals); err != nil { logger.SugarLogger.Infof("Error creating signals: %s", err) + return + } + + signalIDs := make([]string, len(signals)) + for i, s := range signals { + signalIDs[i] = s.ID + } + if err := CreateCANSignals(can.ID, signalIDs); err != nil { + logger.SugarLogger.Infof("Error creating CAN-signal links: %s", err) + } + + if config.EnableSignalWS { + for _, s := range signals { + Hub.Publish(s) + } + } +} + +func mustJSON(v any) []byte { + b, err := json.Marshal(v) + if err != nil { + // json.Marshal on a map of strings/strings can't fail in practice; + // fall back to a literal so callers always get a valid jsonb value. + return []byte(`{"status":"marshal_error"}`) } + return b } diff --git a/gr26/service/signal.go b/gr26/service/signal.go index dc8c6c7..9981679 100644 --- a/gr26/service/signal.go +++ b/gr26/service/signal.go @@ -12,6 +12,10 @@ import ( "gorm.io/gorm/clause" ) +// CreateSignal/CreateSignals are pure DB writes. WebSocket publishing +// lives in HandleMessage now so the published payload can carry the +// can_message_id alongside the signal. + func GetSignal(timestamp int, vehicleID string, name string) mapache.Signal { var signal mapache.Signal database.DB.Where("timestamp = ?", timestamp).Where("vehicle_id = ?", vehicleID).Where("name = ?", name).First(&signal) @@ -29,9 +33,6 @@ func CreateSignal(signal mapache.Signal) error { return fmt.Errorf("signal name cannot be empty") } signal.ID = ulid.Make().Prefixed("sgnl") - if config.EnableSignalWS { - Hub.Publish(signal) - } if config.EnableSignalDB { if database.DB.Where("timestamp = ?", signal.Timestamp).Where("vehicle_id = ?", signal.VehicleID).Where("name = ?", signal.Name).Updates(&signal).RowsAffected == 0 { logger.SugarLogger.Infow("[DB] New signal created", @@ -66,16 +67,15 @@ func CreateSignals(signals []mapache.Signal) error { } signals[i].ID = ulid.Make().Prefixed("sgnl") } - if config.EnableSignalWS { - for _, signal := range signals { - Hub.Publish(signal) - } - } if config.EnableSignalDB { + // Preserve the existing id on conflict so the join table in + // gr26_can_signal stays valid across retransmits. Returning id + // rewrites our locally-generated ULID with the actually-stored + // one when a row already existed. result := database.DB.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "timestamp"}, {Name: "vehicle_id"}, {Name: "name"}}, - DoUpdates: clause.AssignmentColumns([]string{"id", "value", "raw_value", "produced_at"}), - }).Create(&signals) + DoUpdates: clause.AssignmentColumns([]string{"value", "raw_value", "produced_at"}), + }, clause.Returning{Columns: []clause.Column{{Name: "id"}}}).Create(&signals) if result.Error != nil { return result.Error }