Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,24 @@ jobs:
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.11
go install connectrpc.com/connect/cmd/protoc-gen-connect-go@v1.19.1
- name: Buf lint
run: buf lint
run: |
# Resolve shared proto dependency for buf to find cross-repo imports.
shared_dir="$(go list -m -f '{{.Dir}}' github.com/evalops/proto 2>/dev/null)/proto" || true
if [ -d "$shared_dir" ]; then
ln -sf "$shared_dir/identity" proto/identity
ln -sf "$shared_dir/audit" proto/audit
ln -sf "$shared_dir/meter" proto/meter
ln -sf "$shared_dir/memory" proto/memory
fi
buf lint
- name: Buf breaking change detection
if: github.event_name == 'pull_request'
# TODO: remove continue-on-error after merging shared proto migration.
# tenant.proto intentionally moved Organization/OrgMember/APIKey to
# shared identity.v1 types, which is a breaking wire change.
continue-on-error: true
run: |
git fetch origin main --depth=1 2>/dev/null || true
if git show origin/main:buf.yaml >/dev/null 2>&1; then
buf breaking --against '.git#branch=origin/main'
else
Expand Down
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ lint:
golangci-lint run

generate-proto:
buf generate
@shared_dir=$$(go list -m -f '{{.Dir}}' github.com/evalops/proto 2>/dev/null)/proto; \
if [ -d "$$shared_dir" ]; then \
for pkg in identity audit meter memory; do ln -sfn "$$shared_dir/$$pkg" proto/$$pkg 2>/dev/null || true; done; \
fi
buf generate --path proto/gate/v1
@for pkg in identity audit meter memory; do [ -L proto/$$pkg ] && rm -f proto/$$pkg || true; done

verify-generated: generate-proto
git diff --exit-code -- internal/gen/
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.25.0
require (
connectrpc.com/connect v1.19.1
github.com/coreos/go-oidc/v3 v3.17.0
github.com/evalops/proto v0.0.0-20260412001136-61b4bcf4dd10
github.com/evalops/service-runtime v0.1.1-0.20260411182605-97604898cea1
github.com/go-chi/chi/v5 v5.1.0
github.com/go-jose/go-jose/v4 v4.1.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ github.com/dgryski/trifles v0.0.0-20230903005119-f50d829f2e54 h1:SG7nF6SRlWhcT7c
github.com/dgryski/trifles v0.0.0-20230903005119-f50d829f2e54/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/evalops/proto v0.0.0-20260412001136-61b4bcf4dd10 h1:PtFnD76H6A1iXxpBr1HgxijkT8Tz/zenVvF/HrhHx3w=
github.com/evalops/proto v0.0.0-20260412001136-61b4bcf4dd10/go.mod h1:EXB8IcqMaV58Tt0w2GaQia3YOwzrEvnIWFZetHX+IxE=
github.com/evalops/service-runtime v0.1.1-0.20260411182605-97604898cea1 h1:9ATVPLIqSZnnoKcG2GXR6cUuPlDamXgBTLO+YUPVuRg=
github.com/evalops/service-runtime v0.1.1-0.20260411182605-97604898cea1/go.mod h1:5+UWdH1P7neHK15xlMkqxKm8ujoo85JiGfrMDapImPg=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
Expand Down
182 changes: 100 additions & 82 deletions internal/audit/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,22 @@ package audit

import (
"context"
"encoding/json"
"sync"
"time"

"github.com/rs/zerolog/log"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"

auditv1 "github.com/evalops/proto/gen/go/audit/v1"

"github.com/evalops/gate/internal/protocol"
)

// Entry is a single audit log entry.
type Entry struct {
Timestamp time.Time `json:"timestamp"`
ConnID string `json:"conn_id"`
EventType string `json:"event_type"` // session, request, response
Username string `json:"username"`
Resource string `json:"resource"`
Action string `json:"action"` // allow, block, mask
Statement string `json:"statement,omitempty"`
QueryType string `json:"query_type,omitempty"`
ClientAddr string `json:"client_addr"`
Details map[string]interface{} `json:"details,omitempty"`
}

// Logger implements protocol.AuditLogger with structured logging and buffered storage.
type Logger struct {
mu sync.Mutex
entries []Entry
entries []*auditv1.Event
sink Sink
stopCh chan struct{}
done chan struct{}
Expand All @@ -37,7 +26,7 @@ type Logger struct {

// Sink is where audit entries get persisted (DB, file, external service).
type Sink interface {
Write(ctx context.Context, entries []Entry) error
Write(ctx context.Context, entries []*auditv1.Event) error
}

type closeableSink interface {
Expand All @@ -47,7 +36,7 @@ type closeableSink interface {
// NewLogger creates an audit logger.
func NewLogger(sink Sink) *Logger {
l := &Logger{
entries: make([]Entry, 0, 1000),
entries: make([]*auditv1.Event, 0, 1000),
sink: sink,
stopCh: make(chan struct{}),
done: make(chan struct{}),
Expand All @@ -57,103 +46,127 @@ func NewLogger(sink Sink) *Logger {
}

func (l *Logger) LogSession(ctx context.Context, session *protocol.SessionContext, action string) error {
entry := Entry{
Timestamp: time.Now().UTC(),
ConnID: session.ConnID,
EventType: "session",
Username: session.Identity.Username,
Resource: session.Resource.Name,
Action: action,
ClientAddr: session.ClientAddr.String(),
event := &auditv1.Event{
Timestamp: timestamppb.Now(),
Actor: &auditv1.Actor{
Id: session.Identity.Username,
Type: "user",
IpAddress: session.ClientAddr.String(),
},
Resource: &auditv1.Resource{
Type: "database",
Name: session.Resource.Name,
},
Action: "session",
Outcome: action,
Metadata: map[string]string{
"conn_id": session.ConnID,
},
}
l.append(entry)
l.append(event)
log.Info().
Str("event", "session").
Str("user", entry.Username).
Str("resource", entry.Resource).
Str("user", session.Identity.Username).
Str("resource", session.Resource.Name).
Str("action", action).
Msg("audit")
return nil
}

func (l *Logger) LogRequest(ctx context.Context, session *protocol.SessionContext, req *protocol.Request, result *protocol.PolicyResult) error {
details := metadataDetails(nil, req.Metadata)
metadata := make(map[string]string)
mergeMetadata(metadata, "", req.Metadata)
if len(result.Meta) > 0 {
details = metadataDetailsWithPrefix(details, "policy.", result.Meta)
mergeMetadata(metadata, "policy.", result.Meta)
}
if result.Reason != "" {
if details == nil {
details = make(map[string]interface{})
}
details["reason"] = result.Reason
metadata["reason"] = result.Reason
}
entry := Entry{
Timestamp: time.Now().UTC(),
ConnID: session.ConnID,
EventType: "request",
Username: session.Identity.Username,
Resource: session.Resource.Name,
Action: result.Action,
Statement: req.Statement,
QueryType: req.Type,
ClientAddr: session.ClientAddr.String(),
Details: details,
// Set dedicated fields last so protocol metadata cannot overwrite them.
metadata["conn_id"] = session.ConnID
if req.Statement != "" {
metadata["statement"] = req.Statement
}
l.append(entry)
if req.Type != "" {
metadata["query_type"] = req.Type
}

event := &auditv1.Event{
Timestamp: timestamppb.Now(),
Actor: &auditv1.Actor{
Id: session.Identity.Username,
Type: "user",
IpAddress: session.ClientAddr.String(),
},
Resource: &auditv1.Resource{
Type: "database",
Name: session.Resource.Name,
},
Action: "request",
Outcome: result.Action,
Metadata: metadata,
}
l.append(event)
log.Info().
Str("event", "request").
Str("user", entry.Username).
Str("user", session.Identity.Username).
Str("query_type", req.Type).
Str("action", result.Action).
Msg("audit")
return nil
}

func (l *Logger) LogResponse(ctx context.Context, session *protocol.SessionContext, req *protocol.Request, resp *protocol.Response) error {
action := "allow"
details := metadataDetails(nil, req.Metadata)
outcome := "allow"
metadata := make(map[string]string)
mergeMetadata(metadata, "", req.Metadata)
if resp != nil && len(resp.Metadata) > 0 {
details = metadataDetails(details, resp.Metadata)
mergeMetadata(metadata, "", resp.Metadata)
if respAction := resp.Metadata["action"]; respAction != "" {
action = respAction
outcome = respAction
}
}
entry := Entry{
Timestamp: time.Now().UTC(),
ConnID: session.ConnID,
EventType: "response",
Username: session.Identity.Username,
Resource: session.Resource.Name,
Action: action,
Statement: req.Statement,
ClientAddr: session.ClientAddr.String(),
Details: details,
// Set dedicated fields last so protocol metadata cannot overwrite them.
metadata["conn_id"] = session.ConnID
if req.Statement != "" {
metadata["statement"] = req.Statement
}
l.append(entry)
return nil
}

func metadataDetailsWithPrefix(existing map[string]interface{}, prefix string, metadata map[string]string) map[string]interface{} {
if len(metadata) == 0 {
return existing
}
if existing == nil {
existing = make(map[string]interface{}, len(metadata))
event := &auditv1.Event{
Timestamp: timestamppb.Now(),
Actor: &auditv1.Actor{
Id: session.Identity.Username,
Type: "user",
IpAddress: session.ClientAddr.String(),
},
Resource: &auditv1.Resource{
Type: "database",
Name: session.Resource.Name,
},
Action: "response",
Outcome: outcome,
Metadata: metadata,
}
for key, value := range metadata {
existing[prefix+key] = value
}
return existing
l.append(event)
log.Info().
Str("event", "response").
Str("user", session.Identity.Username).
Str("resource", session.Resource.Name).
Str("outcome", outcome).
Msg("audit")
return nil
}

func metadataDetails(existing map[string]interface{}, metadata map[string]string) map[string]interface{} {
return metadataDetailsWithPrefix(existing, "", metadata)
func mergeMetadata(dst map[string]string, prefix string, src map[string]string) {
for key, value := range src {
dst[prefix+key] = value
}
}

func (l *Logger) append(entry Entry) {
func (l *Logger) append(event *auditv1.Event) {
l.mu.Lock()
defer l.mu.Unlock()
l.entries = append(l.entries, entry)
l.entries = append(l.entries, event)
}

func (l *Logger) flushLoop() {
Expand All @@ -179,7 +192,7 @@ func (l *Logger) flush(ctx context.Context) error {
return nil
}
batch := l.entries
l.entries = make([]Entry, 0, 1000)
l.entries = make([]*auditv1.Event, 0, 1000)
l.mu.Unlock()

if l.sink != nil {
Expand Down Expand Up @@ -216,9 +229,14 @@ var _ protocol.AuditLogger = (*Logger)(nil)
// StdoutSink writes audit entries as JSON to stdout (useful for dev/debugging).
type StdoutSink struct{}

func (s *StdoutSink) Write(ctx context.Context, entries []Entry) error {
func (s *StdoutSink) Write(ctx context.Context, entries []*auditv1.Event) error {
marshaler := protojson.MarshalOptions{EmitUnpopulated: false}
for _, e := range entries {
data, _ := json.Marshal(e)
data, err := marshaler.Marshal(e)
if err != nil {
log.Error().Err(err).Msg("audit_flush: failed to marshal event")
continue
}
log.Info().RawJSON("audit_entry", data).Msg("audit_flush")
}
return nil
Expand Down
Loading
Loading