Skip to content
Merged
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ require (
github.com/go-chi/chi/v5 v5.2.2
github.com/google/uuid v1.6.0
github.com/hashicorp/yamux v0.1.2
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/stretchr/testify v1.11.1
golang.org/x/mod v0.33.0
google.golang.org/protobuf v1.36.11
Expand Down Expand Up @@ -177,8 +179,6 @@ require (
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.4 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
Expand Down
19 changes: 19 additions & 0 deletions helm/templates/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ spec:
{{- with .Values.labels }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- if .Values.metrics.enabled }}
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: {{ .Values.metrics.port | quote }}
prometheus.io/path: "/metrics"
{{- end }}
spec:
serviceAccountName: {{ .Values.serviceAccount.name | quote }}
restartPolicy: Always
Expand Down Expand Up @@ -111,10 +117,23 @@ spec:
imagePullPolicy: {{ .Values.image.pullPolicy }}
command:
- /coder-logstream-kube
{{- if .Values.metrics.enabled }}
ports:
- name: metrics
containerPort: {{ .Values.metrics.port }}
protocol: TCP
{{- end }}
resources: {{ toYaml .Values.resources | nindent 12 }}
env:
- name: CODER_URL
value: {{ .Values.url }}
{{- if .Values.metrics.enabled }}
- name: CODER_LOGSTREAM_METRICS_ADDR
value: ":{{ .Values.metrics.port }}"
{{- else }}
- name: CODER_LOGSTREAM_METRICS_ADDR
value: ""
{{- end }}
{{- if .Values.namespaces }}
- name: CODER_NAMESPACES
value: {{ join "," .Values.namespaces }}
Expand Down
7 changes: 7 additions & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ tolerations:
# value: "value"
# effect: "NoSchedule"

# metrics -- Prometheus metrics configuration.
metrics:
# metrics.enabled -- Whether to expose a Prometheus /metrics endpoint.
enabled: false
# metrics.port -- The port to serve Prometheus metrics on.
port: 9100

# labels -- The pod labels for coder-logstream-kube. See:
# https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
labels: {}
Expand Down
34 changes: 9 additions & 25 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"cdr.dev/slog/v3"
"golang.org/x/mod/semver"
"storj.io/drpc"

"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
Expand Down Expand Up @@ -557,7 +556,7 @@ func (l *logQueuer) cleanup() {
}

func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) {
client := agentsdk.New(l.coderURL, agentsdk.WithFixedToken(log.agentToken))
client := newInstrumentedClient(l.coderURL, log.agentToken)

logger := l.logger.With(slog.F("resource_name", log.resourceName))
client.SDK.SetLogger(logger)
Expand Down Expand Up @@ -590,28 +589,11 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLif
}
supportsRole := buildInfoErr == nil && versionAtLeast(buildInfo.Version, "v2.31.0")

var (
logDest agentsdk.LogDest
rpcConn drpc.Conn
)
if supportsRole {
arpc, _, err := client.ConnectRPC28WithRole(gracefulCtx, "logstream-kube")
if err != nil {
logger.Error(ctx, "drpc connect with role", slog.Error(err))
gracefulCancel()
return agentLoggerLifecycle{}, err
}
logDest = arpc
rpcConn = arpc.DRPCConn()
} else {
arpc, err := client.ConnectRPC20(gracefulCtx)
if err != nil {
logger.Error(ctx, "drpc connect", slog.Error(err))
gracefulCancel()
return agentLoggerLifecycle{}, err
}
logDest = arpc
rpcConn = arpc.DRPCConn()
logDest, rpcConn, err := client.connectLogDest(gracefulCtx, supportsRole)
if err != nil {
logger.Error(ctx, "drpc connect", slog.Error(err))
gracefulCancel()
return agentLoggerLifecycle{}, err
}
go func() {
err := ls.SendLoop(gracefulCtx, logDest)
Expand Down Expand Up @@ -690,7 +672,9 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
if len(queuedLogs) == 0 {
return
}
if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil {
sendErr := lgr.scriptLogger.Send(ctx, queuedLogs...)
recordSendResult(sendErr)
if sendErr != nil {
l.scheduleRetry(ctx, log.agentToken)
return
}
Expand Down
25 changes: 24 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"errors"
"fmt"
"net/http"
"net/url"
"os"
"strings"
Expand Down Expand Up @@ -30,6 +31,7 @@ func root() *serpent.Command {
kubeConfig string
namespacesStr string
labelSelector string
metricsAddr string
)
cmd := &serpent.Command{
Use: "coder-logstream-kube",
Expand Down Expand Up @@ -73,6 +75,14 @@ func root() *serpent.Command {
Value: serpent.StringOf(&labelSelector),
Description: "Label selector to use when listing pods.",
},
{
Name: "metrics-addr",
Flag: "metrics-addr",
Env: "CODER_LOGSTREAM_METRICS_ADDR",
Default: "",
Value: serpent.StringOf(&metricsAddr),
Description: "Address to serve Prometheus metrics on. Set to empty to disable.",
},
},
Handler: func(inv *serpent.Invocation) error {
if coderURL == "" {
Expand Down Expand Up @@ -115,13 +125,15 @@ func root() *serpent.Command {
}
}

logger := slog.Make(sloghuman.Sink(inv.Stderr)).Leveled(slog.LevelDebug)

reporter, err := newPodEventLogger(inv.Context(), podEventLoggerOptions{
coderURL: parsedURL,
client: client,
namespaces: namespaces,
fieldSelector: fieldSelector,
labelSelector: labelSelector,
logger: slog.Make(sloghuman.Sink(inv.Stderr)).Leveled(slog.LevelDebug),
logger: logger,
maxRetries: 15, // 15 retries is the default max retries for a log send failure.
})
if err != nil {
Expand All @@ -130,6 +142,17 @@ func root() *serpent.Command {
defer func() {
_ = reporter.Close()
}()

if metricsAddr != "" {
mux := http.NewServeMux()
mux.Handle("/metrics", metricsHandler())
go func() {
if err := http.ListenAndServe(metricsAddr, mux); err != nil {
logger.Error(inv.Context(), "metrics server failed", slog.Error(err))
}
}()
}

select {
case err := <-reporter.errChan:
return fmt.Errorf("pod event reporter: %w", err)
Expand Down
86 changes: 86 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package main

import (
"context"
"net/http"
"net/url"

"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"storj.io/drpc"
)

var (
requestsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "coder_logstream_requests_total",
Help: "Total number of requests to the Coder API.",
}, []string{"method", "status"})
)

func init() {
prometheus.MustRegister(requestsTotal)

// Initialize label combinations so they appear in /metrics at zero.
for _, method := range []string{"PostLogSource", "ConnectRPC", "SendLog"} {
Comment thread
kacpersaw marked this conversation as resolved.
Outdated
requestsTotal.WithLabelValues(method, "success")
requestsTotal.WithLabelValues(method, "failure")
}
}

func metricsHandler() http.Handler {
return promhttp.Handler()
}

// record is a helper that increments the appropriate request counter.
func record(method string, err error) {
if err != nil {
requestsTotal.WithLabelValues(method, "failure").Inc()
} else {
requestsTotal.WithLabelValues(method, "success").Inc()
}
}

// instrumentedClient wraps agentsdk.Client to record Prometheus metrics
// on every API call. This keeps metric instrumentation in one place
// rather than scattered across call sites.
type instrumentedClient struct {
*agentsdk.Client
}

func newInstrumentedClient(coderURL *url.URL, token string) *instrumentedClient {
return &instrumentedClient{
Client: agentsdk.New(coderURL, agentsdk.WithFixedToken(token)),
}
}

func (c *instrumentedClient) PostLogSource(ctx context.Context, req agentsdk.PostLogSourceRequest) (codersdk.WorkspaceAgentLogSource, error) {
resp, err := c.Client.PostLogSource(ctx, req)
record("PostLogSource", err)
return resp, err
}

// connectLogDest establishes the appropriate RPC connection based on
// server capabilities, recording metrics for the attempt.
func (c *instrumentedClient) connectLogDest(ctx context.Context, supportsRole bool) (agentsdk.LogDest, drpc.Conn, error) {
if supportsRole {
arpc, _, err := c.ConnectRPC28WithRole(ctx, "logstream-kube")
record("ConnectRPC", err)
if err != nil {
return nil, nil, err
}
return arpc, arpc.DRPCConn(), nil
}
arpc, err := c.ConnectRPC20(ctx)
record("ConnectRPC", err)
if err != nil {
return nil, nil, err
}
return arpc, arpc.DRPCConn(), nil
}

// recordSendResult records the result of a log send operation.
func recordSendResult(err error) {
record("SendLog", err)
}
91 changes: 91 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package main

import (
"io"
"net"
"net/http"
"strings"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
)

func getCounterValue(t *testing.T, cv *prometheus.CounterVec, labels ...string) float64 {
t.Helper()
m := &dto.Metric{}
c, err := cv.GetMetricWithLabelValues(labels...)
require.NoError(t, err)
require.NoError(t, c.Write(m))
return m.GetCounter().GetValue()
}

func TestMetricsIncrement(t *testing.T) {
t.Parallel()

// Record baseline values (metrics are global and may have been
// incremented by other tests running in the same process).
Comment thread
kacpersaw marked this conversation as resolved.
Outdated
baseSuccess := getCounterValue(t, requestsTotal, "PostLogSource", "success")
baseFailure := getCounterValue(t, requestsTotal, "PostLogSource", "failure")
baseSendSuccess := getCounterValue(t, requestsTotal, "SendLog", "success")

// Simulate success via record helper
record("PostLogSource", nil)
require.Equal(t, baseSuccess+1, getCounterValue(t, requestsTotal, "PostLogSource", "success"))

// Simulate failure via record helper
record("PostLogSource", io.ErrUnexpectedEOF)
require.Equal(t, baseFailure+1, getCounterValue(t, requestsTotal, "PostLogSource", "failure"))

// Simulate send success
recordSendResult(nil)
require.Equal(t, baseSendSuccess+1, getCounterValue(t, requestsTotal, "SendLog", "success"))
}

func TestMetricsHandler(t *testing.T) {
t.Parallel()

handler := metricsHandler()
require.NotNil(t, handler)
}

func TestMetricsEndpoint(t *testing.T) {
t.Parallel()

// Pick a random free port.
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
addr := listener.Addr().String()
_ = listener.Close()

mux := http.NewServeMux()
mux.Handle("/metrics", metricsHandler())
srv := &http.Server{Addr: addr, Handler: mux}
go func() { _ = srv.ListenAndServe() }()
t.Cleanup(func() { _ = srv.Close() })

// Wait for the server to be ready.
require.Eventually(t, func() bool {
resp, err := http.Get("http://" + addr + "/metrics")
if err != nil {
return false
}
_ = resp.Body.Close()
return resp.StatusCode == http.StatusOK
}, 2*time.Second, 50*time.Millisecond)

// Bump a counter and verify it appears in the output.
record("PostLogSource", nil)

resp, err := http.Get("http://" + addr + "/metrics")
require.NoError(t, err)
defer func() { _ = resp.Body.Close() }()

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)

require.True(t, strings.Contains(string(body), "coder_logstream_requests_total"),
"expected coder_logstream_requests_total in metrics output")
}