Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .commitlintrc.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
module.exports = {
extends: ['@commitlint/config-conventional'],
rules: {
'body-max-line-length': [0],
'type-enum': [2, 'always', [
'build',
'chore',
'ci',
'docs',
'wiki',
'feat',
'fix',
'perf',
'refactor',
'revert',
'style',
'test',
'vibe'
'plan'
'vision',
]],
},
};
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,20 @@ export PSSTD_GOSSIP=":7947" # peer sync listen address, de
export PSSTD_SEEDS="10.0.1.20:7946,10.0.1.21:7946" # explicit peer sync addresses
export PSSTD_DB="./data" # local state directory
export PSSTD_WEB="true" # set false for sync-only nodes
export PSSTD_NODE_NAME="rack-a-01" # optional stable node identity override
export PSSTD_NODE_TTL="15s" # how long this node's heartbeat stays online
./psstd
```

By default, psstd uses the OS hostname as the node identity. Set
`PSSTD_NODE_NAME` for cloned hosts, containers, or multiple test instances that
would otherwise publish the same hostname. The override must be non-empty and
must not contain whitespace.

Each node publishes its own heartbeat TTL with `PSSTD_NODE_TTL`. Shorter values
make stale/offline indication react faster; longer values are better for slow or
lossy networks. The default is `15s`, and values must be at least `2s`.

## Discovery

| Environment | How nodes find each other |
Expand Down
1 change: 0 additions & 1 deletion discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func discoverPeers() []string {
continue
}
peer := fmt.Sprintf("%s:%d", addr.String(), entry.Port)
log.Printf("mDNS: discovered peer %s (%s)", entry.Name, peer)
peers = append(peers, peer)
}
return peers
Expand Down
20 changes: 10 additions & 10 deletions gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"log"
"sync"
"time"

"github.com/cockroachdb/pebble/v2"
"github.com/hashicorp/memberlist"
Expand All @@ -14,14 +13,15 @@ import (
var appVersion = "dev"

type NodeStats struct {
Name string `json:"name"`
Version string `json:"version,omitempty"`
WebURL string `json:"web,omitempty"`
CPU []float64 `json:"cpu"`
MemUsed uint64 `json:"mu"`
MemTotal uint64 `json:"mt"`
Load [3]float64 `json:"ld"`
UpdatedAt int64 `json:"ts"` // unix nano, LWW key
Name string `json:"name"`
Version string `json:"version,omitempty"`
WebURL string `json:"web,omitempty"`
TTLSeconds int `json:"ttl,omitempty"`
CPU []float64 `json:"cpu"`
MemUsed uint64 `json:"mu"`
MemTotal uint64 `json:"mt"`
Load [3]float64 `json:"ld"`
UpdatedAt int64 `json:"ts"` // unix nano, LWW key
}

var errStaleVersion = errors.New("stale version")
Expand Down Expand Up @@ -107,7 +107,7 @@ func purgeOfflineDifferentVersion(db *pebble.DB, version string) error {
}

func nodeRecordOffline(s NodeStats) bool {
return s.UpdatedAt == 0 || time.Since(time.Unix(0, s.UpdatedAt)) > 15*time.Second
return nodeHealth(s).State == healthOffline
}

// ── Delegate ──────────────────────────────────────────────────────────────────
Expand Down
150 changes: 123 additions & 27 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"encoding/json"
"flag"
"fmt"
"log"
"net"
Expand All @@ -15,19 +16,34 @@ import (
"github.com/hashicorp/memberlist"
)

type cliOptions struct {
List bool
}

const (
envDB = "PSSTD_DB"
envHTTP = "PSSTD_HTTP"
envGossip = "PSSTD_GOSSIP"
envSeeds = "PSSTD_SEEDS"
envHTTPAd = "PSSTD_ADVERTISE_HTTP"
envWeb = "PSSTD_WEB" // "true" to enable HTTP, default true
gossipPort = 7946
httpPort = 8080
envDB = "PSSTD_DB"
envHTTP = "PSSTD_HTTP"
envGossip = "PSSTD_GOSSIP"
envSeeds = "PSSTD_SEEDS"
envHTTPAd = "PSSTD_ADVERTISE_HTTP"
envWeb = "PSSTD_WEB" // "true" to enable HTTP, default true
envNodeName = "PSSTD_NODE_NAME"
envNodeTTL = "PSSTD_NODE_TTL"
gossipPort = 7946
httpPort = 8080
)

func main() {
opts := parseCLI(os.Args[1:])
hostname, _ := os.Hostname()
nodeName, err := nodeNameFromEnv(hostname)
if err != nil {
log.Fatalf("node name: %v", err)
}
nodeTTL, err := nodeTTLFromEnv()
if err != nil {
log.Fatalf("node ttl: %v", err)
}

dbPath := envOr(envDB, "./data")
httpAddr := envOr(envHTTP, fmt.Sprintf(":%d", httpPort))
Expand All @@ -47,7 +63,7 @@ func main() {
if err != nil {
if pebbleLockHeld(err) {
log.Printf("psstd already appears to own %s; starting terminal mirror instead", dbPath)
runTerminalMirror(hostname, gossipAddr, seeds)
runTerminalMirror(nodeName, gossipAddr, seeds, opts.List)
return
}
log.Fatalf("pebble open: %v", err)
Expand All @@ -65,7 +81,7 @@ func main() {
delegate := newKVDelegate(db, appVersion)

cfg := memberlist.DefaultLANConfig()
cfg.Name = hostname
cfg.Name = nodeName
cfg.BindAddr, cfg.BindPort = splitHostPort(gossipAddr)
cfg.Delegate = delegate
cfg.Events = newEventDelegate(db, appVersion)
Expand All @@ -79,7 +95,7 @@ func main() {
}
db = nil
log.Printf("psstd already appears to be listening on %s; starting terminal mirror instead", gossipAddr)
runTerminalMirror(hostname, gossipAddr, seeds)
runTerminalMirror(nodeName, gossipAddr, seeds, opts.List)
return
}
log.Fatalf("memberlist create: %v", err)
Expand All @@ -88,39 +104,87 @@ func main() {

// ── Discovery ────────────────────────────────────────────────────────────
// 1. Register ourselves via mDNS so peers can find us on LAN
stopMDNS := registerMDNS(hostname, cfg.BindPort)
stopMDNS := registerMDNS(nodeName, cfg.BindPort)
defer stopMDNS()

// 2. Scan for existing peers (mDNS + any explicit seeds)
discovered := discoverPeers()
allSeeds := append(seeds, discovered...)
joinedPeers := 0
joinErr := error(nil)
if len(allSeeds) > 0 {
if n, err := list.Join(allSeeds); err != nil {
log.Printf("join warning (joined %d): %v", n, err)
} else {
log.Printf("joined cluster, %d peer(s)", n)
}
} else {
log.Println("no peers found — running solo, will be discovered by others")
joinedPeers, joinErr = list.Join(allSeeds)
}
logStartupConfig(startupConfig{
NodeName: nodeName,
DBPath: dbPath,
HTTPAddr: httpAddr,
WebURL: webURL,
GossipAddr: gossipAddr,
WebEnabled: webEnabled,
Version: appVersion,
NodeTTL: nodeTTL,
SeedCount: len(seeds),
MDNSCount: len(discovered),
JoinedPeers: joinedPeers,
JoinErr: joinErr,
})

// ── Stats heartbeat ─────────────────────────────────────────────────────
go statsLoop(hostname, webURL, appVersion, db, delegate)
go statsLoop(nodeName, webURL, appVersion, nodeTTL, db, delegate)

// ── HTTP ─────────────────────────────────────────────────────────────────
if webEnabled {
mux := http.NewServeMux()
mux.HandleFunc("/", makeHandler(db, hostname))
log.Printf("psstd version=%s node=%s http=%s advertise=%s gossip=%s web=true", appVersion, hostname, httpAddr, webURL, gossipAddr)
mux.HandleFunc("/", makeHandler(db, nodeName))
if err := http.ListenAndServe(httpAddr, mux); err != nil {
log.Fatalf("http: %v", err)
}
} else {
log.Printf("psstd version=%s node=%s gossip=%s web=false", appVersion, hostname, gossipAddr)
select {} // block forever
}
}

func parseCLI(args []string) cliOptions {
fs := flag.NewFlagSet("psstd", flag.ExitOnError)
fs.SetOutput(os.Stderr)
var opts cliOptions
fs.BoolVar(&opts.List, "l", false, "render terminal mirror as a vertical node list")
fs.BoolVar(&opts.List, "list", false, "render terminal mirror as a vertical node list")
_ = fs.Parse(args)
return opts
}

type startupConfig struct {
NodeName string
DBPath string
HTTPAddr string
WebURL string
GossipAddr string
WebEnabled bool
Version string
NodeTTL time.Duration
SeedCount int
MDNSCount int
JoinedPeers int
JoinErr error
}

func logStartupConfig(cfg startupConfig) {
log.Print(startupSummary(cfg))
}

func startupSummary(cfg startupConfig) string {
join := "solo"
if cfg.JoinErr != nil {
join = fmt.Sprintf("warning joined=%d error=%q", cfg.JoinedPeers, cfg.JoinErr)
} else if cfg.JoinedPeers > 0 {
join = fmt.Sprintf("joined=%d", cfg.JoinedPeers)
}
return fmt.Sprintf("psstd startup: version=%s node=%s db=%s web=%t http=%s advertise=%s gossip=%s ttl=%s seeds=%d mdns=%d join=%s",
cfg.Version, cfg.NodeName, cfg.DBPath, cfg.WebEnabled, cfg.HTTPAddr, cfg.WebURL, cfg.GossipAddr, cfg.NodeTTL, cfg.SeedCount, cfg.MDNSCount, join)
}

func pebbleLockHeld(err error) bool {
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "lock") &&
Expand All @@ -137,7 +201,7 @@ func addressInUse(err error) bool {
strings.Contains(msg, "bind: only one usage of each socket address")
}

func runTerminalMirror(hostname, gossipAddr string, seeds []string) {
func runTerminalMirror(hostname, gossipAddr string, seeds []string, listMode bool) {
tmpDir, err := os.MkdirTemp("", "psstd-view-*")
if err != nil {
log.Fatalf("terminal mirror temp db: %v", err)
Expand Down Expand Up @@ -172,7 +236,7 @@ func runTerminalMirror(hostname, gossipAddr string, seeds []string) {
log.Printf("terminal mirror joined cluster, %d peer(s)", n)
}

terminalRenderLoop(db)
terminalRenderLoop(db, listMode)
}

func terminalMirrorSeeds(gossipAddr string, seeds []string) []string {
Expand All @@ -188,11 +252,11 @@ func terminalMirrorSeeds(gossipAddr string, seeds []string) []string {

// ── Stats loop ───────────────────────────────────────────────────────────────

func statsLoop(hostname, webURL, version string, db *pebble.DB, d *kvDelegate) {
func statsLoop(hostname, webURL, version string, ttl time.Duration, db *pebble.DB, d *kvDelegate) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for range ticker.C {
stats, err := collectStats(hostname, webURL, version)
stats, err := collectStats(hostname, webURL, version, ttl)
if err != nil {
log.Printf("stats error: %v", err)
continue
Expand Down Expand Up @@ -255,6 +319,38 @@ func envOr(key, def string) string {
return def
}

func nodeNameFromEnv(hostname string) (string, error) {
override, ok := os.LookupEnv(envNodeName)
if !ok || override == "" {
if strings.TrimSpace(hostname) == "" {
return "", fmt.Errorf("hostname is empty; set %s", envNodeName)
}
return hostname, nil
}
if strings.TrimSpace(override) != override || override == "" {
return "", fmt.Errorf("%s must not be empty or have leading/trailing whitespace", envNodeName)
}
if strings.ContainsAny(override, " \t\r\n") {
return "", fmt.Errorf("%s must not contain whitespace", envNodeName)
}
return override, nil
}

func nodeTTLFromEnv() (time.Duration, error) {
value, ok := os.LookupEnv(envNodeTTL)
if !ok || value == "" {
return defaultNodeTTL, nil
}
ttl, err := time.ParseDuration(value)
if err != nil {
return 0, fmt.Errorf("%s must be a duration such as 15s or 1m: %w", envNodeTTL, err)
}
if ttl < 2*time.Second {
return 0, fmt.Errorf("%s must be at least 2s", envNodeTTL)
}
return ttl, nil
}

func splitCSV(s string) []string {
var out []string
for _, p := range strings.Split(s, ",") {
Expand Down
Loading
Loading