Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions lib/instances/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,13 +442,8 @@ func (m *manager) createInstance(
// Success - release cleanup stack (prevent cleanup)
cu.Release()

// Return instance with derived state
finalInst := m.toInstance(ctx, meta)
if finalInst.BootMarkersHydrated {
if err := m.saveMetadata(meta); err != nil {
log.WarnContext(ctx, "failed to persist hydrated boot markers after create", "instance_id", id, "error", err)
}
}
// Return instance state from current metadata without forcing a log scan.
finalInst := m.toInstanceWithoutHydration(ctx, meta)
// Record metrics
if m.metrics != nil {
m.recordDuration(ctx, m.metrics.createDuration, start, "success", hvType)
Expand Down
2 changes: 1 addition & 1 deletion lib/instances/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (m *manager) applyForkTargetState(ctx context.Context, forkID string, targe
if err != nil {
return nil, err
}
if inst != nil && inst.State == StateRunning {
if inst != nil && (inst.State == StateRunning || inst.State == StateInitializing) {
if err := ensureGuestAgentReadyForForkPhase(ctx, &inst.StoredMetadata, "before returning running fork instance"); err != nil {
return nil, fmt.Errorf("wait for forked guest agent readiness: %w", err)
}
Expand Down
12 changes: 11 additions & 1 deletion lib/instances/fork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,10 @@ func TestForkCloudHypervisorFromRunningNetwork(t *testing.T) {
NetworkEnabled: true,
})
require.NoError(t, err)
t.Cleanup(func() { _ = manager.DeleteInstance(context.Background(), source.Id) })
sourceID := source.Id
t.Cleanup(func() { _ = manager.DeleteInstance(context.Background(), sourceID) })
source, err = waitForInstanceState(ctx, manager, source.Id, StateRunning, 20*time.Second)
require.NoError(t, err)
require.NoError(t, waitForVMReady(ctx, source.SocketPath, 5*time.Second))

assert.NotEmpty(t, source.IP)
Expand All @@ -399,13 +402,20 @@ func TestForkCloudHypervisorFromRunningNetwork(t *testing.T) {
TargetState: StateRunning,
})
require.NoError(t, err)
require.Contains(t, []State{StateInitializing, StateRunning}, forked.State)
forked, err = waitForInstanceState(ctx, manager, forked.Id, StateRunning, 20*time.Second)
require.NoError(t, err)
require.Equal(t, StateRunning, forked.State)
forkedID := forked.Id
t.Cleanup(func() { _ = manager.DeleteInstance(context.Background(), forkedID) })

// Source should be restored and still reachable by its private IP.
sourceAfterFork, err := manager.GetInstance(ctx, source.Id)
require.NoError(t, err)
if sourceAfterFork.State != StateRunning {
sourceAfterFork, err = waitForInstanceState(ctx, manager, source.Id, StateRunning, 20*time.Second)
require.NoError(t, err)
}
require.Equal(t, StateRunning, sourceAfterFork.State)
require.NotEmpty(t, sourceAfterFork.IP)
assertHostCanReachNginx(t, sourceAfterFork.IP, 80, 60*time.Second)
Expand Down
24 changes: 23 additions & 1 deletion lib/instances/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"strconv"
"strings"
"time"

"github.com/kernel/hypeman/lib/logger"
Expand All @@ -31,6 +32,23 @@ var ErrTailNotFound = fmt.Errorf("tail command not found: required for log strea
// ErrLogNotFound is returned when the requested log file doesn't exist
var ErrLogNotFound = fmt.Errorf("log file not found")

// appLogNoiseMarkers lists the sentinel strings that hypeman's boot/agent
// machinery emits into the app log stream. Lines containing any of these are
// infrastructure noise, not application output, and are filtered out when
// streaming app logs.
var appLogNoiseMarkers = []string{
	"HYPEMAN-PROGRAM-START",
	"HYPEMAN-AGENT-READY",
	"HYPEMAN-HEADERS-START",
	"HYPEMAN-HEADERS-READY",
	"HYPEMAN-HEADERS-FAILED",
}

// shouldSkipAppLogLine reports whether line carries one of the internal
// hypeman sentinel markers and should therefore be dropped from app log output.
func shouldSkipAppLogLine(line string) bool {
	for i := range appLogNoiseMarkers {
		if strings.Contains(line, appLogNoiseMarkers[i]) {
			return true
		}
	}
	return false
}

// streamInstanceLogs streams instance logs from the specified source
// Returns last N lines, then continues following if follow=true
func (m *manager) streamInstanceLogs(ctx context.Context, id string, tail int, follow bool, source LogSource) (<-chan string, error) {
Expand Down Expand Up @@ -91,11 +109,15 @@ func (m *manager) streamInstanceLogs(ctx context.Context, id string, tail int, f

scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
line := scanner.Text()
if source == LogSourceApp && shouldSkipAppLogLine(line) {
continue
}
select {
case <-ctx.Done():
log.DebugContext(ctx, "log stream cancelled", "instance_id", id)
return
case out <- scanner.Text():
case out <- line:
}
}

Expand Down
46 changes: 46 additions & 0 deletions lib/instances/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package instances

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestShouldSkipAppLogLine verifies that hypeman sentinel markers are
// classified as noise while ordinary application output is kept.
func TestShouldSkipAppLogLine(t *testing.T) {
	t.Parallel()

	cases := map[string]struct {
		line string
		want bool
	}{
		"program start marker": {
			line: "2026-03-10T10:00:00Z [INFO] HYPEMAN-PROGRAM-START ts=2026-03-10T10:00:00Z mode=exec",
			want: true,
		},
		"agent ready marker": {
			line: "2026-03-10T10:00:01Z [INFO] HYPEMAN-AGENT-READY ts=2026-03-10T10:00:01Z",
			want: true,
		},
		"headers marker": {
			line: "2026-03-10T10:00:02Z [INFO] HYPEMAN-HEADERS-READY",
			want: true,
		},
		"normal app log line": {
			line: "2026-03-10T10:00:03Z [INFO] build completed successfully",
			want: false,
		},
	}

	for name, tc := range cases {
		tc := tc
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			assert.Equal(t, tc.want, shouldSkipAppLogLine(tc.line))
		})
	}
}
31 changes: 29 additions & 2 deletions lib/instances/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ type stateResult struct {
// deriveState determines instance state by checking socket and querying the hypervisor.
// It may also scan the guest serial log to hydrate missing boot markers (see
// deriveStateWithOptions with hydrateBootMarkers=true).
// Returns StateUnknown with an error message if the socket exists but hypervisor is unreachable.
func (m *manager) deriveState(ctx context.Context, stored *StoredMetadata) stateResult {
	return m.deriveStateWithOptions(ctx, stored, true)
}

// deriveStateWithoutHydration determines instance state without scanning serial logs
// to hydrate missing boot markers. Used on paths that already hold fresh metadata
// (e.g. immediately after create/start/restore) where a log scan would be avoidable work.
func (m *manager) deriveStateWithoutHydration(ctx context.Context, stored *StoredMetadata) stateResult {
	return m.deriveStateWithOptions(ctx, stored, false)
}

func (m *manager) deriveStateWithOptions(ctx context.Context, stored *StoredMetadata, hydrateBootMarkers bool) stateResult {
log := logger.FromContext(ctx)

// 1. Check if socket exists
Expand Down Expand Up @@ -75,7 +85,10 @@ func (m *manager) deriveState(ctx context.Context, stored *StoredMetadata) state
case hypervisor.StateCreated:
return stateResult{State: StateCreated}
case hypervisor.StateRunning:
hydrated := m.hydrateBootMarkersFromLogs(stored)
hydrated := false
if hydrateBootMarkers {
hydrated = m.hydrateBootMarkersFromLogs(stored)
}
return stateResult{
State: deriveRunningState(stored),
BootMarkersHydrated: hydrated,
Expand Down Expand Up @@ -302,7 +315,21 @@ func (m *manager) hasSnapshot(dataDir string) bool {

// toInstance converts stored metadata to Instance with derived fields,
// allowing boot markers to be hydrated from serial logs as part of state derivation.
func (m *manager) toInstance(ctx context.Context, meta *metadata) Instance {
	return m.toInstanceWithStateDerivation(ctx, meta, true)
}

// toInstanceWithoutHydration converts stored metadata to Instance with derived
// fields, skipping the serial-log scan that would hydrate missing boot markers.
func (m *manager) toInstanceWithoutHydration(ctx context.Context, meta *metadata) Instance {
	return m.toInstanceWithStateDerivation(ctx, meta, false)
}

func (m *manager) toInstanceWithStateDerivation(ctx context.Context, meta *metadata, hydrateBootMarkers bool) Instance {
var result stateResult
if hydrateBootMarkers {
result = m.deriveState(ctx, &meta.StoredMetadata)
} else {
result = m.deriveStateWithoutHydration(ctx, &meta.StoredMetadata)
}

inst := Instance{
StoredMetadata: meta.StoredMetadata,
State: result.State,
Expand Down
9 changes: 2 additions & 7 deletions lib/instances/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,8 @@ func (m *manager) restoreInstance(
log.WarnContext(ctx, "failed to update metadata after restore", "instance_id", id, "error", err)
}

// Return instance with derived state (should be Running now)
finalInst := m.toInstance(ctx, meta)
if finalInst.BootMarkersHydrated {
if err := m.saveMetadata(meta); err != nil {
log.WarnContext(ctx, "failed to persist hydrated boot markers after restore", "instance_id", id, "error", err)
}
}
// Return instance state from current metadata without forcing a log scan.
finalInst := m.toInstanceWithoutHydration(ctx, meta)
// Record metrics
if m.metrics != nil {
m.recordDuration(ctx, m.metrics.restoreDuration, start, "success", stored.HypervisorType)
Expand Down
9 changes: 2 additions & 7 deletions lib/instances/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,8 @@ func (m *manager) startInstance(
log.WarnContext(ctx, "failed to update metadata after VM start", "instance_id", id, "error", err)
}

// Return instance with derived state (should be Running now)
finalInst := m.toInstance(ctx, meta)
if finalInst.BootMarkersHydrated {
if err := m.saveMetadata(meta); err != nil {
log.WarnContext(ctx, "failed to persist hydrated boot markers after start", "instance_id", id, "error", err)
}
}
// Return instance state from current metadata without forcing a log scan.
finalInst := m.toInstanceWithoutHydration(ctx, meta)
// Record metrics
if m.metrics != nil {
m.recordDuration(ctx, m.metrics.startDuration, start, "success", stored.HypervisorType)
Expand Down
2 changes: 1 addition & 1 deletion lib/instances/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type StoredMetadata struct {

// Timestamps (stored for historical tracking)
CreatedAt time.Time
StartedAt *time.Time // Last time VM was started
StartedAt *time.Time // Boot epoch start time (set on create/start; preserved across standby restore)
StoppedAt *time.Time // Last time VM was stopped

// Boot progress markers (derived from guest serial log sentinels and persisted)
Expand Down
6 changes: 3 additions & 3 deletions lib/network/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ func (m *manager) getOrInitDefaultNetwork(ctx context.Context) (*Network, error)
return network, nil
}

// Self-heal should never delete TAPs for active instances. We pass an empty
// preserve set so CleanupOrphanedTAPs is skipped in Initialize.
if initErr := m.Initialize(ctx, []string{}); initErr != nil {
// Self-heal should never delete TAPs for active instances. We pass nil so
// CleanupOrphanedTAPs is skipped in Initialize.
if initErr := m.Initialize(ctx, nil); initErr != nil {
return nil, fmt.Errorf("initialize network manager: %w", initErr)
}

Expand Down
71 changes: 53 additions & 18 deletions lib/system/init/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"

"github.com/kernel/hypeman/lib/vmconfig"
)

func main() {
Expand Down Expand Up @@ -50,31 +54,46 @@ func main() {
dropToShell()
}

// Phase 4/5: Run independent setup tasks in parallel.
// Keep strict dependencies around mount -> overlay -> config and
// bind-mount barrier before mode handoff.
var setupWG sync.WaitGroup
if cfg.NetworkEnabled {
setupWG.Add(1)
runNetworkSetup := func() {
if !cfg.NetworkEnabled {
return
}
if err := configureNetwork(log, cfg); err != nil {
log.Error("hypeman-init:network", "failed to configure network", err)
// Continue anyway - network isn't always required
}
}
runVolumesSetup := func() {
if len(cfg.VolumeMounts) == 0 {
return
}
if err := mountVolumes(log, cfg); err != nil {
log.Error("hypeman-init:volumes", "failed to mount volumes", err)
// Continue anyway
}
}

// Phase 4/5: Run setup tasks.
// Network + volume setup are parallelized only when mounted paths are disjoint
// from /etc, because network setup writes /overlay/newroot/etc/resolv.conf.
if shouldRunNetworkAndVolumesInParallel(cfg) {
var setupWG sync.WaitGroup
setupWG.Add(2)
go func() {
defer setupWG.Done()
if err := configureNetwork(log, cfg); err != nil {
log.Error("hypeman-init:network", "failed to configure network", err)
// Continue anyway - network isn't always required
}
runNetworkSetup()
}()
}
if len(cfg.VolumeMounts) > 0 {
setupWG.Add(1)
go func() {
defer setupWG.Done()
if err := mountVolumes(log, cfg); err != nil {
log.Error("hypeman-init:volumes", "failed to mount volumes", err)
// Continue anyway
}
runVolumesSetup()
}()
setupWG.Wait()
} else {
// When /etc (or /etc/*) is volume-mounted, configure network after volumes
// so resolv.conf is written into the mounted path instead of being hidden.
runVolumesSetup()
runNetworkSetup()
}
setupWG.Wait()

// Phase 6: Bind mount filesystems to new root
if err := bindMountsToNewRoot(log); err != nil {
Expand Down Expand Up @@ -106,6 +125,22 @@ func main() {
}
}

// shouldRunNetworkAndVolumesInParallel reports whether network configuration
// and volume mounting can safely run concurrently. Parallelism is only safe
// when no volume mounts over "/" or "/etc" (or any path beneath /etc),
// because network setup writes resolv.conf into the guest's /etc and a
// concurrent mount over that path could hide or race with the write.
func shouldRunNetworkAndVolumesInParallel(cfg *vmconfig.Config) bool {
	// With only one (or neither) task enabled there is nothing to parallelize.
	if !cfg.NetworkEnabled || len(cfg.VolumeMounts) == 0 {
		return false
	}

	for _, vol := range cfg.VolumeMounts {
		// Normalize to a clean absolute guest path; this handles relative
		// inputs ("etc"), dot segments ("a/../etc"), and trailing slashes.
		mountPath := filepath.Clean("/" + strings.TrimPrefix(vol.Path, "/"))
		switch {
		case mountPath == "/", mountPath == "/etc", strings.HasPrefix(mountPath, "/etc/"):
			return false
		}
	}

	return true
}

// dropToShell drops to an interactive shell for debugging when boot fails
func dropToShell() {
fmt.Fprintln(os.Stderr, "FATAL: dropping to shell for debugging")
Expand Down
Loading