Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions lib/instances/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
restartpolicy "github.com/kernel/hypeman/lib/restart-policy"
"github.com/nrednav/cuid2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"gvisor.dev/gvisor/pkg/cleanup"
)

Expand Down Expand Up @@ -264,7 +265,7 @@ func (m *manager) forkInstanceFromStoppedOrStandby(ctx context.Context, id strin
return nil, false, err
}

existsByMetadata, err := m.instanceNameExists(req.Name)
existsByMetadata, err := m.instanceNameExists(ctx, req.Name, "fork_precheck")
if err != nil {
return nil, false, fmt.Errorf("check instance name availability: %w", err)
}
Expand Down Expand Up @@ -422,31 +423,51 @@ func (m *manager) forkInstanceFromStoppedOrStandby(ctx context.Context, id strin
}

func (m *manager) saveForkMetadata(ctx context.Context, meta *metadata) error {
ctx, span := m.tracerOrDefault().Start(ctx, "instances.fork_metadata.save",
trace.WithAttributes(attribute.String("operation", "save_fork_metadata")),
)
var retErr error
defer func() { finishInstancesSpan(span, retErr) }()

lockCtx, lockWaitSpan := m.tracerOrDefault().Start(ctx, "instances.fork_metadata.lock_wait",
trace.WithAttributes(attribute.String("operation", "fork_metadata_lock_wait")),
)
m.forkMetadataMu.Lock()
finishInstancesSpan(lockWaitSpan, nil)

holdCtx, lockHoldSpan := m.tracerOrDefault().Start(lockCtx, "instances.fork_metadata.lock_hold",
trace.WithAttributes(attribute.String("operation", "fork_metadata_lock_hold")),
)
defer m.forkMetadataMu.Unlock()
defer func() { finishInstancesSpan(lockHoldSpan, retErr) }()

// Earlier name checks are advisory so callers can fail before doing fork
// work when possible. This is the serialized admission point for fork
// metadata, so concurrent forks re-check names immediately before save.
name := meta.Name
existsByMetadata, err := m.instanceNameExists(name)
existsByMetadata, err := m.instanceNameExists(holdCtx, name, "fork_admission")
if err != nil {
return fmt.Errorf("check instance name availability: %w", err)
retErr = fmt.Errorf("check instance name availability: %w", err)
return retErr
}
if existsByMetadata {
return fmt.Errorf("%w: instance name '%s' already exists", ErrAlreadyExists, name)
retErr = fmt.Errorf("%w: instance name '%s' already exists", ErrAlreadyExists, name)
return retErr
}
if meta.NetworkEnabled {
exists, err := m.networkManager.NameExists(ctx, name, "")
exists, err := m.networkManager.NameExists(holdCtx, name, "")
if err != nil {
return fmt.Errorf("check instance name availability: %w", err)
retErr = fmt.Errorf("check instance name availability: %w", err)
return retErr
}
if exists {
return fmt.Errorf("%w: instance name '%s' already exists in network", ErrAlreadyExists, name)
retErr = fmt.Errorf("%w: instance name '%s' already exists in network", ErrAlreadyExists, name)
return retErr
}
}
if err := m.saveMetadata(meta); err != nil {
return fmt.Errorf("save fork metadata: %w", err)
retErr = fmt.Errorf("save fork metadata: %w", err)
return retErr
}
return nil
}
Expand Down
35 changes: 32 additions & 3 deletions lib/instances/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/kernel/hypeman/lib/instances/phasetracking"
"github.com/kernel/hypeman/lib/logger"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// exitSentinelPrefix is the machine-parseable prefix written by init to serial console.
Expand Down Expand Up @@ -879,22 +880,39 @@ func (m *manager) listInstances(ctx context.Context) ([]Instance, error) {
return result, nil
}

func (m *manager) findInstanceMetadataByExactName(name string) (*metadata, error) {
func (m *manager) findInstanceMetadataByExactName(ctx context.Context, name string) (*metadata, error) {
ctx, span := m.tracerOrDefault().Start(ctx, "instances.metadata.find_exact_name",
trace.WithAttributes(attribute.String("operation", "find_exact_name")),
)
defer span.End()

files, err := m.listMetadataFiles()
if err != nil {
span.RecordError(err)
return nil, err
}
span.SetAttributes(attribute.Int("metadata_files", len(files)))

scanned := 0
for _, file := range files {
id := filepath.Base(filepath.Dir(file))
scanned++
meta, err := m.loadMetadata(id)
if err != nil {
continue
}
if meta.Name == name {
span.SetAttributes(
attribute.Int("metadata_files_scanned", scanned),
attribute.Bool("matched", true),
)
return meta, nil
}
}
span.SetAttributes(
attribute.Int("metadata_files_scanned", scanned),
attribute.Bool("matched", false),
)
return nil, ErrNotFound
}

Expand Down Expand Up @@ -949,14 +967,25 @@ func (m *manager) findInstanceMetadataByNameOrIDPrefix(idOrName string, minPrefi
return nil, ErrNotFound
}

func (m *manager) instanceNameExists(name string) (bool, error) {
_, err := m.findInstanceMetadataByExactName(name)
func (m *manager) instanceNameExists(ctx context.Context, name, caller string) (bool, error) {
ctx, span := m.tracerOrDefault().Start(ctx, "instances.metadata.name_exists",
trace.WithAttributes(
attribute.String("operation", "metadata_name_exists"),
attribute.String("caller", caller),
),
)
defer span.End()

_, err := m.findInstanceMetadataByExactName(ctx, name)
if err == nil {
span.SetAttributes(attribute.Bool("exists", true))
return true, nil
}
if err == ErrNotFound {
span.SetAttributes(attribute.Bool("exists", false))
return false, nil
}
span.RecordError(err)
return false, err
}

Expand Down
2 changes: 1 addition & 1 deletion lib/instances/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func (m *manager) ensureSnapshotNameAvailable(sourceInstanceID, snapshotName str
}

func (m *manager) ensureInstanceNameAvailableForSnapshotFork(ctx context.Context, name string, networkEnabled bool) error {
existsByMetadata, err := m.instanceNameExists(name)
existsByMetadata, err := m.instanceNameExists(ctx, name, "snapshot_create")
if err != nil {
return fmt.Errorf("check instance name availability: %w", err)
}
Expand Down
81 changes: 73 additions & 8 deletions lib/instances/snapshot_alias_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (

"github.com/kernel/hypeman/lib/forkvm"
"github.com/kernel/hypeman/lib/hypervisor"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func withSnapshotSourceAliasReadLock(run func() error) error {
Expand All @@ -16,13 +19,19 @@ func withSnapshotSourceAliasReadLock(run func() error) error {
}

func prepareForkWithAliasReadLock(ctx context.Context, starter hypervisor.VMStarter, req hypervisor.ForkPrepareRequest) (hypervisor.ForkPrepareResult, error) {
ctx, span := startInstancesSpan(ctx, otel.Tracer("hypeman/instances"), "instances.snapshot_alias.prepare_fork",
attribute.String("operation", "snapshot_alias_prepare_fork"),
)
var retErr error
defer func() { finishInstancesSpan(span, retErr) }()

var result hypervisor.ForkPrepareResult
err := withSnapshotSourceAliasReadLock(func() error {
retErr = withSnapshotSourceAliasReadLock(func() error {
var err error
result, err = starter.PrepareFork(ctx, req)
return err
})
return result, err
return result, retErr
}

func copyGuestDirectoryWithAliasReadLock(srcDir, dstDir string) error {
Expand All @@ -32,16 +41,42 @@ func copyGuestDirectoryWithAliasReadLock(srcDir, dstDir string) error {
}

func (m *manager) copyForkSourceGuestDirectory(ctx context.Context, sourceState State, sourceID string, stored *StoredMetadata, srcDir, dstDir, deferredSnapshotMemoryPath string) error {
ctx, span := m.tracerOrDefault().Start(ctx, "instances.fork.copy_guest_directory",
trace.WithAttributes(
attribute.String("operation", "fork_copy_guest_directory"),
attribute.String("instance_id", sourceID),
attribute.String("source_state", string(sourceState)),
attribute.Bool("deferred_snapshot_memory", deferredSnapshotMemoryPath != ""),
),
)
var retErr error
defer func() { finishInstancesSpan(span, retErr) }()

if sourceState == StateStandby {
if err := m.ensureSnapshotMemoryReady(ctx, m.paths.InstanceSnapshotLatest(sourceID), m.snapshotJobKeyForInstance(sourceID), stored.HypervisorType); err != nil {
return fmt.Errorf("prepare standby snapshot for fork: %w", err)
readyCtx, readyDone := m.startLifecycleStep(ctx, "instances.fork.copy_guest_directory.ensure_snapshot_memory_ready",
attribute.String("operation", "fork_copy_ensure_snapshot_memory_ready"),
attribute.String("instance_id", sourceID),
attribute.String("hypervisor", string(stored.HypervisorType)),
)
if err := m.ensureSnapshotMemoryReady(readyCtx, m.paths.InstanceSnapshotLatest(sourceID), m.snapshotJobKeyForInstance(sourceID), stored.HypervisorType); err != nil {
readyDone(err)
retErr = fmt.Errorf("prepare standby snapshot for fork: %w", err)
return retErr
}
readyDone(nil)
}

copyOptions := forkvm.CopyOptions{}
if deferredSnapshotMemoryPath != "" {
copyOptions.SkipRelativePaths = map[string]struct{}{firecrackerSnapshotMemoryRelPath: {}}
}
return withSnapshotSourceAliasReadLock(func() error {

_, cloneDone := m.startLifecycleStep(ctx, "instances.fork.copy_guest_directory.clone",
attribute.String("operation", "fork_copy_guest_directory_clone"),
attribute.String("instance_id", sourceID),
attribute.Bool("deferred_snapshot_memory", deferredSnapshotMemoryPath != ""),
)
retErr = withSnapshotSourceAliasReadLock(func() error {
if err := forkvm.CopyGuestDirectoryWithOptions(srcDir, dstDir, copyOptions); err != nil {
if errors.Is(err, forkvm.ErrSparseCopyUnsupported) {
return fmt.Errorf("fork requires sparse-capable filesystem (SEEK_DATA/SEEK_HOLE unsupported): %w", err)
Expand All @@ -50,17 +85,45 @@ func (m *manager) copyForkSourceGuestDirectory(ctx context.Context, sourceState
}
return nil
})
cloneDone(retErr)
return retErr
}

func (m *manager) copySnapshotGuestDirectoryForFork(ctx context.Context, snapshotID string, hvType hypervisor.Type, dstDir, deferredSnapshotMemoryPath string) error {
if err := m.ensureSnapshotMemoryReady(ctx, m.paths.SnapshotGuestDir(snapshotID), "", hvType); err != nil {
return fmt.Errorf("prepare snapshot memory for fork: %w", err)
ctx, span := m.tracerOrDefault().Start(ctx, "instances.snapshot.copy_guest_directory",
trace.WithAttributes(
attribute.String("operation", "snapshot_copy_guest_directory"),
attribute.String("snapshot_id", snapshotID),
attribute.String("hypervisor", string(hvType)),
attribute.Bool("deferred_snapshot_memory", deferredSnapshotMemoryPath != ""),
),
)
var retErr error
defer func() { finishInstancesSpan(span, retErr) }()

readyCtx, readyDone := m.startLifecycleStep(ctx, "instances.snapshot.copy_guest_directory.ensure_snapshot_memory_ready",
attribute.String("operation", "snapshot_copy_ensure_snapshot_memory_ready"),
attribute.String("snapshot_id", snapshotID),
attribute.String("hypervisor", string(hvType)),
)
if err := m.ensureSnapshotMemoryReady(readyCtx, m.paths.SnapshotGuestDir(snapshotID), "", hvType); err != nil {
readyDone(err)
retErr = fmt.Errorf("prepare snapshot memory for fork: %w", err)
return retErr
}
readyDone(nil)

copyOptions := forkvm.CopyOptions{}
if deferredSnapshotMemoryPath != "" {
copyOptions.SkipRelativePaths = map[string]struct{}{firecrackerSnapshotMemoryRelPath: {}}
}
return withSnapshotSourceAliasReadLock(func() error {

_, cloneDone := m.startLifecycleStep(ctx, "instances.snapshot.copy_guest_directory.clone",
attribute.String("operation", "snapshot_copy_guest_directory_clone"),
attribute.String("snapshot_id", snapshotID),
attribute.Bool("deferred_snapshot_memory", deferredSnapshotMemoryPath != ""),
)
retErr = withSnapshotSourceAliasReadLock(func() error {
if err := forkvm.CopyGuestDirectoryWithOptions(m.paths.SnapshotGuestDir(snapshotID), dstDir, copyOptions); err != nil {
if errors.Is(err, forkvm.ErrSparseCopyUnsupported) {
return fmt.Errorf("fork from snapshot requires sparse-capable filesystem (SEEK_DATA/SEEK_HOLE unsupported): %w", err)
Expand All @@ -69,4 +132,6 @@ func (m *manager) copySnapshotGuestDirectoryForFork(ctx context.Context, snapsho
}
return nil
})
cloneDone(retErr)
return retErr
}
Loading