From 7c9036dd133d088bd86337458724412112df0c44 Mon Sep 17 00:00:00 2001 From: Zhao Chen Date: Thu, 26 Mar 2026 23:02:24 +0800 Subject: [PATCH 1/5] feat: log system environment info at startup for debugging Add envinfo package that logs build version, runtime, CPU, memory, and disk usage information at startup. On Linux, cgroup CPU/memory limits are also logged to help diagnose container resource constraints. Signed-off-by: Zhao Chen --- cmd/build.go | 3 + cmd/pull.go | 5 + cmd/root.go | 5 + go.mod | 8 ++ go.sum | 18 +++ pkg/envinfo/cgroup_linux.go | 146 ++++++++++++++++++++++++ pkg/envinfo/cgroup_linux_test.go | 34 ++++++ pkg/envinfo/cgroup_other.go | 22 ++++ pkg/envinfo/envinfo.go | 190 +++++++++++++++++++++++++++++++ pkg/envinfo/envinfo_test.go | 139 ++++++++++++++++++++++ 10 files changed, 570 insertions(+) create mode 100644 pkg/envinfo/cgroup_linux.go create mode 100644 pkg/envinfo/cgroup_linux_test.go create mode 100644 pkg/envinfo/cgroup_other.go create mode 100644 pkg/envinfo/envinfo.go create mode 100644 pkg/envinfo/envinfo_test.go diff --git a/cmd/build.go b/cmd/build.go index c41c0628..cca40fdd 100644 --- a/cmd/build.go +++ b/cmd/build.go @@ -25,6 +25,7 @@ import ( "github.com/modelpack/modctl/pkg/backend" "github.com/modelpack/modctl/pkg/config" + "github.com/modelpack/modctl/pkg/envinfo" ) var buildConfig = config.NewBuild() @@ -69,6 +70,8 @@ func init() { // runBuild runs the build modctl. func runBuild(ctx context.Context, workDir string) error { + envinfo.LogDiskInfo("buildWorkDir", workDir) + b, err := backend.New(rootConfig.StorageDir) if err != nil { return err diff --git a/cmd/pull.go b/cmd/pull.go index f4df1fa9..3a197245 100644 --- a/cmd/pull.go +++ b/cmd/pull.go @@ -25,6 +25,7 @@ import ( "github.com/modelpack/modctl/pkg/backend" "github.com/modelpack/modctl/pkg/config" + "github.com/modelpack/modctl/pkg/envinfo" ) var pullConfig = config.NewPull() @@ -63,6 +64,10 @@ func init() { // runPull runs the pull modctl. 
func runPull(ctx context.Context, target string) error { + if pullConfig.ExtractDir != "" { + envinfo.LogDiskInfo("pullExtractDir", pullConfig.ExtractDir) + } + b, err := backend.New(rootConfig.StorageDir) if err != nil { return err diff --git a/cmd/root.go b/cmd/root.go index 33ca10ce..83dae805 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -31,6 +31,7 @@ import ( "github.com/modelpack/modctl/cmd/modelfile" internalpb "github.com/modelpack/modctl/internal/pb" "github.com/modelpack/modctl/pkg/config" + "github.com/modelpack/modctl/pkg/envinfo" ) var rootConfig *config.Root @@ -78,6 +79,10 @@ var rootCmd = &cobra.Command{ // TODO: need refactor as currently use a global flag to control the progress bar render. internalpb.SetDisableProgress(rootConfig.DisableProgress) + + // Log environment information for debugging. + envinfo.LogEnvironment(rootConfig.StorageDir) + return nil }, PersistentPostRunE: func(cmd *cobra.Command, args []string) error { diff --git a/go.mod b/go.mod index dfb352d6..0fbb1312 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/modelpack/model-spec v0.0.7 github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/image-spec v1.1.1 + github.com/shirou/gopsutil/v4 v4.26.2 github.com/sirupsen/logrus v1.9.4 github.com/spf13/cobra v1.10.2 github.com/spf13/viper v1.21.0 @@ -72,6 +73,7 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect github.com/docker/go-metrics v0.0.1 // indirect + github.com/ebitengine/purego v0.10.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.3.3 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect @@ -79,6 +81,7 @@ require ( github.com/go-git/go-billy/v5 v5.8.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 
// indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect @@ -94,6 +97,7 @@ require ( github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.20 // indirect @@ -103,6 +107,7 @@ require ( github.com/pjbgf/sha1cd v0.3.2 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.67.2 // indirect @@ -118,7 +123,10 @@ require ( github.com/spf13/pflag v1.0.10 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect + github.com/tklauser/go-sysconf v0.3.16 // indirect + github.com/tklauser/numcpus v0.11.0 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect github.com/zeebo/blake3 v0.2.4 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect diff --git a/go.sum b/go.sum index 0db28355..b001e071 100644 --- a/go.sum +++ b/go.sum @@ -117,6 +117,8 @@ github.com/dragonflyoss/model-spec v0.0.6 h1:Q0gsPMPqFapdNsDvBsyQJRJkEiHK1LeBLoi github.com/dragonflyoss/model-spec v0.0.6/go.mod h1:42eygCbweJm2ZJ4XmfyTeu/3Y+5NkSn19J1xNy7s7yw= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/ebitengine/purego v0.10.0 
h1:QIw4xfpWT6GWTzaW5XEKy3HXoqrJGx1ijYHzTF0/ISU= +github.com/ebitengine/purego v0.10.0/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/elazarl/goproxy v1.7.2 h1:Y2o6urb7Eule09PjlhQRGNsqRfPmYI3KKQLFpCAV3+o= github.com/elazarl/goproxy v1.7.2/go.mod h1:82vkLNir0ALaW14Rc399OTTjyNREgmdL2cVoIbS6XaE= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= @@ -151,6 +153,8 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= @@ -183,6 +187,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= @@ -235,6 +240,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/libgit2/git2go/v34 v34.0.0 h1:UKoUaKLmiCRbOCD3PtUi2hD6hESSXzME/9OUZrGcgu8= github.com/libgit2/git2go/v34 v34.0.0/go.mod h1:blVco2jDAw6YTXkErMMqzHLcAjKkwF0aWIRHBqiJkZ0= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= @@ -273,6 +280,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU= +github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQP1xR9D75/vuwEF3g= @@ -303,6 +312,8 @@ github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDc github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik= github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 
h1:n661drycOFuPLCN3Uc8sB6B/s6Z4t2xvBgU1htSHuq8= github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3/go.mod h1:A0bzQcvG0E7Rwjx0REVgAGH58e96+X0MeOfepqsbeW4= +github.com/shirou/gopsutil/v4 v4.26.2 h1:X8i6sicvUFih4BmYIGT1m2wwgw2VG9YgrDTi7cIRGUI= +github.com/shirou/gopsutil/v4 v4.26.2/go.mod h1:LZ6ewCSkBqUpvSOf+LsTGnRinC6iaNUNMGBtDkJBaLQ= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w= @@ -338,10 +349,16 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYICU0nA= +github.com/tklauser/go-sysconf v0.3.16/go.mod h1:/qNL9xxDhc7tx3HSRsLWNnuzbVfh3e7gh/BmM179nYI= +github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9RXw= +github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ= github.com/vbauerster/mpb/v8 v8.12.0 h1:+gneY3ifzc88tKDzOtfG8k8gfngCx615S2ZmFM4liWg= github.com/vbauerster/mpb/v8 v8.12.0/go.mod h1:V02YIuMVo301Y1VE9VtZlD8s84OMsk+EKN6mwvf/588= github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zeebo/assert v1.1.0 h1:hU1L1vLTHsnO8x8c9KAR5GmM5QscxHg5RNU5z5qbUWY= github.com/zeebo/assert v1.1.0/go.mod 
h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/blake3 v0.2.4 h1:KYQPkhpRtcqh0ssGYcKLG1JYvddkEA8QwCM/yBqhaZI= @@ -447,6 +464,7 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/envinfo/cgroup_linux.go b/pkg/envinfo/cgroup_linux.go new file mode 100644 index 00000000..a11aea4f --- /dev/null +++ b/pkg/envinfo/cgroup_linux.go @@ -0,0 +1,146 @@ +/* + * Copyright 2024 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//go:build linux + +package envinfo + +import ( + "os" + "strconv" + "strings" + + "github.com/sirupsen/logrus" +) + +// cgroupLimits holds the CPU and memory limits from cgroup. 
+type cgroupLimits struct { + CPUQuota float64 // effective CPU cores (quota/period), 0 if unlimited + MemLimit uint64 // memory limit in bytes, 0 if unlimited + InCgroup bool // true if running inside a cgroup with limits +} + +// getCgroupLimits reads cgroup v2 then v1 CPU and memory limits. +func getCgroupLimits() *cgroupLimits { + limits := &cgroupLimits{} + + // Try cgroup v2 first. + if tryV2(limits) { + return limits + } + + // Fall back to cgroup v1. + tryV1(limits) + return limits +} + +func tryV2(limits *cgroupLimits) bool { + // cgroup v2 uses unified hierarchy at /sys/fs/cgroup/. + cpuMax, err := os.ReadFile("/sys/fs/cgroup/cpu.max") + if err != nil { + return false + } + + parts := strings.Fields(strings.TrimSpace(string(cpuMax))) + if len(parts) == 2 && parts[0] != "max" { + quota, err1 := strconv.ParseFloat(parts[0], 64) + period, err2 := strconv.ParseFloat(parts[1], 64) + if err1 == nil && err2 == nil && period > 0 { + limits.CPUQuota = quota / period + limits.InCgroup = true + } + } + + memMax, err := os.ReadFile("/sys/fs/cgroup/memory.max") + if err != nil { + return limits.InCgroup + } + + memStr := strings.TrimSpace(string(memMax)) + if memStr != "max" { + memLimit, err := strconv.ParseUint(memStr, 10, 64) + if err == nil { + limits.MemLimit = memLimit + limits.InCgroup = true + } + } + + return limits.InCgroup +} + +func tryV1(limits *cgroupLimits) { + // cgroup v1: CPU quota. + quotaBytes, err := os.ReadFile("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") + if err != nil { + return + } + + quota, err := strconv.ParseFloat(strings.TrimSpace(string(quotaBytes)), 64) + if err != nil || quota <= 0 { + // -1 means no limit. 
+ return + } + + periodBytes, err := os.ReadFile("/sys/fs/cgroup/cpu/cpu.cfs_period_us") + if err != nil { + return + } + + period, err := strconv.ParseFloat(strings.TrimSpace(string(periodBytes)), 64) + if err != nil || period <= 0 { + return + } + + limits.CPUQuota = quota / period + limits.InCgroup = true + + // cgroup v1: Memory limit. + memBytes, err := os.ReadFile("/sys/fs/cgroup/memory/memory.limit_in_bytes") + if err != nil { + return + } + + memLimit, err := strconv.ParseUint(strings.TrimSpace(string(memBytes)), 10, 64) + if err != nil { + return + } + + // Very large values (like 2^63) indicate no limit. + const noLimitThreshold = 1 << 62 + if memLimit < noLimitThreshold { + limits.MemLimit = memLimit + } +} + +// logCgroupInfo logs container resource limits if running in a cgroup. +func logCgroupInfo() { + limits := getCgroupLimits() + if !limits.InCgroup { + return + } + + fields := logrus.Fields{} + if limits.CPUQuota > 0 { + fields["cpuQuota"] = strconv.FormatFloat(limits.CPUQuota, 'f', 2, 64) + } + if limits.MemLimit > 0 { + fields["memoryLimit"] = formatBytes(limits.MemLimit) + } + + if len(fields) > 0 { + logrus.WithFields(fields).Info("cgroup limits") + } +} diff --git a/pkg/envinfo/cgroup_linux_test.go b/pkg/envinfo/cgroup_linux_test.go new file mode 100644 index 00000000..8c781022 --- /dev/null +++ b/pkg/envinfo/cgroup_linux_test.go @@ -0,0 +1,34 @@ +/* + * Copyright 2024 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +//go:build linux + +package envinfo + +import ( + "testing" +) + +func TestGetCgroupLimits(t *testing.T) { + // This test runs on Linux and verifies getCgroupLimits does not panic. + // The actual values depend on the environment (container vs bare metal). + limits := getCgroupLimits() + if limits == nil { + t.Fatal("getCgroupLimits returned nil") + } + + t.Logf("InCgroup=%v CPUQuota=%.2f MemLimit=%d", limits.InCgroup, limits.CPUQuota, limits.MemLimit) +} diff --git a/pkg/envinfo/cgroup_other.go b/pkg/envinfo/cgroup_other.go new file mode 100644 index 00000000..6ad7e885 --- /dev/null +++ b/pkg/envinfo/cgroup_other.go @@ -0,0 +1,22 @@ +/* + * Copyright 2024 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//go:build !linux + +package envinfo + +// logCgroupInfo is a no-op on non-Linux platforms. +func logCgroupInfo() {} diff --git a/pkg/envinfo/envinfo.go b/pkg/envinfo/envinfo.go new file mode 100644 index 00000000..55eb6d1e --- /dev/null +++ b/pkg/envinfo/envinfo.go @@ -0,0 +1,190 @@ +/* + * Copyright 2024 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package envinfo + +import ( + "fmt" + "path/filepath" + "runtime" + "strings" + + "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/disk" + "github.com/shirou/gopsutil/v4/mem" + "github.com/sirupsen/logrus" + + "github.com/modelpack/modctl/pkg/version" +) + +// LogEnvironment collects and logs system environment information. +// Individual collection failures are logged as warnings and do not +// prevent other information from being collected. +func LogEnvironment(storageDir string) { + logVersionInfo() + logRuntimeInfo() + logCPUInfo() + logMemoryInfo() + logCgroupInfo() + LogDiskInfo("storageDir", storageDir) +} + +// LogDiskInfo logs disk usage information for the device that hosts +// the given path. It can be called from any command to log disk info +// for command-specific directories. 
+func LogDiskInfo(name, path string) { + if path == "" { + return + } + + absPath, err := filepath.Abs(path) + if err != nil { + logrus.WithError(err).Warnf("failed to resolve path for %s", name) + return + } + + usage, err := disk.Usage(absPath) + if err != nil { + logrus.WithError(err).Warnf("failed to get disk usage for %s", name) + return + } + + logrus.WithFields(logrus.Fields{ + "name": name, + "path": absPath, + "fstype": usage.Fstype, + "total": formatBytes(usage.Total), + "free": formatBytes(usage.Free), + "usagePercent": fmt.Sprintf("%.1f%%", usage.UsedPercent), + }).Info("disk info") +} + +func logVersionInfo() { + logrus.WithFields(logrus.Fields{ + "version": version.GitVersion, + "commit": version.GitCommit, + "platform": version.Platform, + "buildTime": version.BuildTime, + }).Info("build info") +} + +func logRuntimeInfo() { + logrus.WithFields(logrus.Fields{ + "go": runtime.Version(), + "os": runtime.GOOS, + "arch": runtime.GOARCH, + "gomaxprocs": runtime.GOMAXPROCS(0), + }).Info("runtime info") +} + +func logCPUInfo() { + physicalCount, err := cpu.Counts(false) + if err != nil { + logrus.WithError(err).Warn("failed to get physical CPU count") + } + + logicalCount, err := cpu.Counts(true) + if err != nil { + logrus.WithError(err).Warn("failed to get logical CPU count") + } + + fields := logrus.Fields{ + "physicalCores": physicalCount, + "logicalCores": logicalCount, + } + + infos, err := cpu.Info() + if err != nil { + logrus.WithError(err).Warn("failed to get CPU model info") + } else if len(infos) > 0 { + fields["model"] = infos[0].ModelName + } + + percents, err := cpu.Percent(0, false) + if err != nil { + logrus.WithError(err).Warn("failed to get CPU usage") + } else if len(percents) > 0 { + fields["usagePercent"] = fmt.Sprintf("%.1f%%", percents[0]) + } + + logrus.WithFields(fields).Info("cpu info") +} + +func logMemoryInfo() { + v, err := mem.VirtualMemory() + if err != nil { + logrus.WithError(err).Warn("failed to get memory info") + return + } 
+ + logrus.WithFields(logrus.Fields{ + "total": formatBytes(v.Total), + "available": formatBytes(v.Available), + "usagePercent": fmt.Sprintf("%.1f%%", v.UsedPercent), + }).Info("memory info") +} + +// isVirtualFS checks whether a filesystem type or device path indicates +// a non-block-device filesystem where IO counters are not available. +// This includes FUSE, network filesystems, RAM-based filesystems, and +// container overlay filesystems. +func isVirtualFS(fstype, device string) bool { + fstypeLower := strings.ToLower(fstype) + + virtualFSTypes := []string{ + "fuse", "nfs", "cifs", "smb", "smbfs", + "tmpfs", "ramfs", "devtmpfs", + "overlay", "aufs", + "sshfs", "s3fs", "gcsfuse", "ossfs", + "9p", "virtiofs", + } + + for _, vfs := range virtualFSTypes { + if fstypeLower == vfs || strings.HasPrefix(fstypeLower, vfs+".") { + return true + } + } + + // Devices not under /dev/ are generally not block devices. + // e.g., "s3fs", "sshfs#user@host:", "server:/export" + if device != "" && !strings.HasPrefix(device, "/dev/") { + return true + } + + return false +} + +func formatBytes(b uint64) string { + const ( + KB = 1024 + MB = KB * 1024 + GB = MB * 1024 + TB = GB * 1024 + ) + + switch { + case b >= TB: + return fmt.Sprintf("%.2f TB", float64(b)/float64(TB)) + case b >= GB: + return fmt.Sprintf("%.2f GB", float64(b)/float64(GB)) + case b >= MB: + return fmt.Sprintf("%.2f MB", float64(b)/float64(MB)) + case b >= KB: + return fmt.Sprintf("%.2f KB", float64(b)/float64(KB)) + default: + return fmt.Sprintf("%d B", b) + } +} diff --git a/pkg/envinfo/envinfo_test.go b/pkg/envinfo/envinfo_test.go new file mode 100644 index 00000000..a37020f3 --- /dev/null +++ b/pkg/envinfo/envinfo_test.go @@ -0,0 +1,139 @@ +/* + * Copyright 2024 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package envinfo + +import ( + "bytes" + "testing" + + "github.com/sirupsen/logrus" +) + +func TestLogEnvironment(t *testing.T) { + var buf bytes.Buffer + logrus.SetOutput(&buf) + logrus.SetLevel(logrus.InfoLevel) + logrus.SetFormatter(&logrus.TextFormatter{DisableTimestamp: true}) + defer logrus.SetOutput(nil) + + LogEnvironment(t.TempDir()) + + output := buf.String() + + expectedMessages := []string{ + "build info", + "runtime info", + "cpu info", + "memory info", + "disk info", + } + + for _, msg := range expectedMessages { + if !bytes.Contains([]byte(output), []byte(msg)) { + t.Errorf("expected log output to contain %q, got:\n%s", msg, output) + } + } +} + +func TestLogDiskInfo(t *testing.T) { + var buf bytes.Buffer + logrus.SetOutput(&buf) + logrus.SetLevel(logrus.InfoLevel) + logrus.SetFormatter(&logrus.TextFormatter{DisableTimestamp: true}) + defer logrus.SetOutput(nil) + + LogDiskInfo("testDir", t.TempDir()) + + output := buf.String() + if !bytes.Contains([]byte(output), []byte("testDir")) { + t.Errorf("expected log output to contain 'testDir', got:\n%s", output) + } + if !bytes.Contains([]byte(output), []byte("disk info")) { + t.Errorf("expected log output to contain 'disk info', got:\n%s", output) + } +} + +func TestLogDiskInfoEmptyPath(t *testing.T) { + var buf bytes.Buffer + logrus.SetOutput(&buf) + logrus.SetLevel(logrus.InfoLevel) + defer logrus.SetOutput(nil) + + LogDiskInfo("empty", "") + + if buf.Len() > 0 { + t.Errorf("expected no output for empty path, got:\n%s", buf.String()) + } +} + +func TestIsVirtualFS(t *testing.T) { 
+ tests := []struct { + name string + fstype string + device string + expected bool + }{ + {"ext4 block device", "ext4", "/dev/sda1", false}, + {"xfs block device", "xfs", "/dev/nvme0n1p1", false}, + {"apfs block device", "apfs", "/dev/disk3s3s1", false}, + {"fuse", "fuse", "s3fs", true}, + {"fuse.s3fs", "fuse.s3fs", "s3fs", true}, + {"fuse.sshfs", "fuse.sshfs", "sshfs#user@host:", true}, + {"nfs", "nfs", "server:/export", true}, + {"nfs4", "nfs4", "server:/export", true}, + {"cifs", "cifs", "//server/share", true}, + {"tmpfs", "tmpfs", "tmpfs", true}, + {"overlay", "overlay", "overlay", true}, + {"virtiofs", "virtiofs", "myfs", true}, + {"9p", "9p", "hostshare", true}, + {"unknown non-dev device", "ext4", "some-random-path", true}, + {"empty", "", "", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isVirtualFS(tt.fstype, tt.device) + if result != tt.expected { + t.Errorf("isVirtualFS(%q, %q) = %v, want %v", tt.fstype, tt.device, result, tt.expected) + } + }) + } +} + +func TestFormatBytes(t *testing.T) { + tests := []struct { + input uint64 + expected string + }{ + {0, "0 B"}, + {512, "512 B"}, + {1024, "1.00 KB"}, + {1536, "1.50 KB"}, + {1048576, "1.00 MB"}, + {1073741824, "1.00 GB"}, + {1099511627776, "1.00 TB"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + result := formatBytes(tt.input) + if result != tt.expected { + t.Errorf("formatBytes(%d) = %q, want %q", tt.input, result, tt.expected) + } + }) + } +} From 55c9fc0538fd0db9b7ccbc3cfcb0960e66a586ac Mon Sep 17 00:00:00 2001 From: Zhao Chen Date: Tue, 7 Apr 2026 21:10:12 +0800 Subject: [PATCH 2/5] feat: add IO throughput summary for push/pull/fetch operations Add pkg/iometrics package with a Tracker that wraps source io.Reader using atomic counters to measure bytes and cumulative Read() call duration. TrackTransfer() measures per-goroutine wall-clock time. 
After each operation completes, a summary is output to both terminal (stderr) and log file showing effective throughput, source read throughput, and readFraction (sourceReadTime / transferTime) to help identify whether the bottleneck is in source reading or sink writing. Signed-off-by: Zhao Chen --- pkg/backend/fetch.go | 8 +- pkg/backend/pull.go | 29 ++-- pkg/backend/push.go | 30 ++-- pkg/iometrics/format.go | 52 ++++++ pkg/iometrics/reader.go | 42 +++++ pkg/iometrics/tracker.go | 109 +++++++++++++ pkg/iometrics/tracker_test.go | 291 ++++++++++++++++++++++++++++++++++ 7 files changed, 541 insertions(+), 20 deletions(-) create mode 100644 pkg/iometrics/format.go create mode 100644 pkg/iometrics/reader.go create mode 100644 pkg/iometrics/tracker.go create mode 100644 pkg/iometrics/tracker_test.go diff --git a/pkg/backend/fetch.go b/pkg/backend/fetch.go index 990d1afa..4c822df8 100644 --- a/pkg/backend/fetch.go +++ b/pkg/backend/fetch.go @@ -31,6 +31,7 @@ import ( internalpb "github.com/modelpack/modctl/internal/pb" "github.com/modelpack/modctl/pkg/backend/remote" "github.com/modelpack/modctl/pkg/config" + "github.com/modelpack/modctl/pkg/iometrics" ) // Fetch fetches partial files to the output. 
@@ -101,6 +102,8 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e pb.Start() defer pb.Stop() + tracker := iometrics.NewTracker("fetch") + g, ctx := errgroup.WithContext(ctx) g.SetLimit(cfg.Concurrency) @@ -114,7 +117,9 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e } logrus.Debugf("fetch: processing layer %s", layer.Digest) - if err := pullAndExtractFromRemote(ctx, pb, internalpb.NormalizePrompt("Fetching blob"), client, cfg.Output, layer); err != nil { + if err := tracker.TrackTransfer(func() error { + return pullAndExtractFromRemote(ctx, pb, internalpb.NormalizePrompt("Fetching blob"), client, cfg.Output, layer, tracker) + }); err != nil { return err } @@ -127,6 +132,7 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e return err } + tracker.Summary() logrus.Infof("fetch: fetched %d layers", len(layers)) return nil } diff --git a/pkg/backend/pull.go b/pkg/backend/pull.go index 3cb6a1b0..bb485139 100644 --- a/pkg/backend/pull.go +++ b/pkg/backend/pull.go @@ -33,6 +33,7 @@ import ( "github.com/modelpack/modctl/pkg/backend/remote" "github.com/modelpack/modctl/pkg/codec" "github.com/modelpack/modctl/pkg/config" + "github.com/modelpack/modctl/pkg/iometrics" "github.com/modelpack/modctl/pkg/storage" ) @@ -82,6 +83,8 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err pb.Start() defer pb.Stop() + tracker := iometrics.NewTracker("pull") + // copy the image to the destination, there are three steps: // 1. copy the layers. // 2. copy the config. 
@@ -96,11 +99,11 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err var fn func(desc ocispec.Descriptor) error if cfg.ExtractFromRemote { fn = func(desc ocispec.Descriptor) error { - return pullAndExtractFromRemote(gctx, pb, internalpb.NormalizePrompt("Pulling blob"), src, cfg.ExtractDir, desc) + return pullAndExtractFromRemote(gctx, pb, internalpb.NormalizePrompt("Pulling blob"), src, cfg.ExtractDir, desc, tracker) } } else { fn = func(desc ocispec.Descriptor) error { - return pullIfNotExist(gctx, pb, internalpb.NormalizePrompt("Pulling blob"), src, dst, desc, repo, tag) + return pullIfNotExist(gctx, pb, internalpb.NormalizePrompt("Pulling blob"), src, dst, desc, repo, tag, tracker) } } @@ -117,7 +120,9 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err logrus.Debugf("pull: processing layer %s", layer.Digest) // call the before hook. cfg.Hooks.BeforePullLayer(layer, manifest) - err := fn(layer) + err := tracker.TrackTransfer(func() error { + return fn(layer) + }) // call the after hook. cfg.Hooks.AfterPullLayer(layer, err) if err != nil { @@ -139,19 +144,24 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err // return earlier if extract from remote is enabled as config and manifest // are not needed for this operation. if cfg.ExtractFromRemote { + tracker.Summary() return nil } // copy the config. if err := retry.Do(func() error { - return pullIfNotExist(ctx, pb, internalpb.NormalizePrompt("Pulling config"), src, dst, manifest.Config, repo, tag) + return tracker.TrackTransfer(func() error { + return pullIfNotExist(ctx, pb, internalpb.NormalizePrompt("Pulling config"), src, dst, manifest.Config, repo, tag, tracker) + }) }, append(defaultRetryOpts, retry.Context(ctx))...); err != nil { return fmt.Errorf("failed to pull config to local: %w", err) } // copy the manifest. 
if err := retry.Do(func() error { - return pullIfNotExist(ctx, pb, internalpb.NormalizePrompt("Pulling manifest"), src, dst, manifestDesc, repo, tag) + return tracker.TrackTransfer(func() error { + return pullIfNotExist(ctx, pb, internalpb.NormalizePrompt("Pulling manifest"), src, dst, manifestDesc, repo, tag, tracker) + }) }, append(defaultRetryOpts, retry.Context(ctx))...); err != nil { return fmt.Errorf("failed to pull manifest to local: %w", err) } @@ -166,12 +176,13 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err logrus.Infof("pull: pulled and extracted artifact %s", target) } + tracker.Summary() logrus.Infof("pull: pulled artifact %s", target) return nil } // pullIfNotExist copies the content from the src storage to the dst storage if the content does not exist. -func pullIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src *remote.Repository, dst storage.Storage, desc ocispec.Descriptor, repo, tag string) error { +func pullIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src *remote.Repository, dst storage.Storage, desc ocispec.Descriptor, repo, tag string, tracker *iometrics.Tracker) error { // fetch the content from the source storage. content, err := src.Fetch(ctx, desc) if err != nil { @@ -180,7 +191,7 @@ func pullIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt stri defer content.Close() - reader := pb.Add(prompt, desc.Digest.String(), desc.Size, content) + reader := pb.Add(prompt, desc.Digest.String(), desc.Size, tracker.WrapReader(content)) hash := sha256.New() reader = io.TeeReader(reader, hash) @@ -244,7 +255,7 @@ func pullIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt stri // pullAndExtractFromRemote pulls the layer and extract it to the target output path directly, // and will not store the layer to the local storage. 
-func pullAndExtractFromRemote(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src *remote.Repository, outputDir string, desc ocispec.Descriptor) error { +func pullAndExtractFromRemote(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src *remote.Repository, outputDir string, desc ocispec.Descriptor, tracker *iometrics.Tracker) error { // fetch the content from the source storage. content, err := src.Fetch(ctx, desc) if err != nil { @@ -252,7 +263,7 @@ func pullAndExtractFromRemote(ctx context.Context, pb *internalpb.ProgressBar, p } defer content.Close() - reader := pb.Add(prompt, desc.Digest.String(), desc.Size, content) + reader := pb.Add(prompt, desc.Digest.String(), desc.Size, tracker.WrapReader(content)) hash := sha256.New() reader = io.TeeReader(reader, hash) diff --git a/pkg/backend/push.go b/pkg/backend/push.go index 2674cee2..869e3065 100644 --- a/pkg/backend/push.go +++ b/pkg/backend/push.go @@ -32,6 +32,7 @@ import ( internalpb "github.com/modelpack/modctl/internal/pb" "github.com/modelpack/modctl/pkg/backend/remote" "github.com/modelpack/modctl/pkg/config" + "github.com/modelpack/modctl/pkg/iometrics" "github.com/modelpack/modctl/pkg/storage" ) @@ -70,6 +71,8 @@ func (b *backend) Push(ctx context.Context, target string, cfg *config.Push) err pb.Start() defer pb.Stop() + tracker := iometrics.NewTracker("push") + // copy the image to the destination, there are three steps: // 1. copy the layers. // 2. copy the config. 
@@ -91,7 +94,9 @@ func (b *backend) Push(ctx context.Context, target string, cfg *config.Push) err return retry.Do(func() error { logrus.Debugf("push: processing layer %s", layer.Digest) - if err := pushIfNotExist(gctx, pb, internalpb.NormalizePrompt("Copying blob"), src, dst, layer, repo, tag); err != nil { + if err := tracker.TrackTransfer(func() error { + return pushIfNotExist(gctx, pb, internalpb.NormalizePrompt("Copying blob"), src, dst, layer, repo, tag, tracker) + }); err != nil { return err } logrus.Debugf("push: successfully processed layer %s", layer.Digest) @@ -106,29 +111,34 @@ func (b *backend) Push(ctx context.Context, target string, cfg *config.Push) err // copy the config. if err := retry.Do(func() error { - return pushIfNotExist(ctx, pb, internalpb.NormalizePrompt("Copying config"), src, dst, manifest.Config, repo, tag) + return tracker.TrackTransfer(func() error { + return pushIfNotExist(ctx, pb, internalpb.NormalizePrompt("Copying config"), src, dst, manifest.Config, repo, tag, tracker) + }) }, append(defaultRetryOpts, retry.Context(ctx))...); err != nil { return fmt.Errorf("failed to push config to remote: %w", err) } // copy the manifest. 
if err := retry.Do(func() error { - return pushIfNotExist(ctx, pb, internalpb.NormalizePrompt("Copying manifest"), src, dst, ocispec.Descriptor{ - MediaType: manifest.MediaType, - Size: int64(len(manifestRaw)), - Digest: godigest.FromBytes(manifestRaw), - Data: manifestRaw, - }, repo, tag) + return tracker.TrackTransfer(func() error { + return pushIfNotExist(ctx, pb, internalpb.NormalizePrompt("Copying manifest"), src, dst, ocispec.Descriptor{ + MediaType: manifest.MediaType, + Size: int64(len(manifestRaw)), + Digest: godigest.FromBytes(manifestRaw), + Data: manifestRaw, + }, repo, tag, tracker) + }) }, append(defaultRetryOpts, retry.Context(ctx))...); err != nil { return fmt.Errorf("failed to push manifest to remote: %w", err) } + tracker.Summary() logrus.Infof("push: pushed artifact %s", target) return nil } // pushIfNotExist copies the content from the src storage to the dst storage if the content does not exist. -func pushIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src storage.Storage, dst *remote.Repository, desc ocispec.Descriptor, repo, tag string) error { +func pushIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src storage.Storage, dst *remote.Repository, desc ocispec.Descriptor, repo, tag string, tracker *iometrics.Tracker) error { // check whether the content exists in the destination storage. exist, err := dst.Exists(ctx, desc) if err != nil { @@ -177,7 +187,7 @@ func pushIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt stri return err } - reader := pb.Add(prompt, desc.Digest.String(), desc.Size, content) + reader := pb.Add(prompt, desc.Digest.String(), desc.Size, tracker.WrapReader(content)) // resolve issue: https://github.com/modelpack/modctl/issues/50 // wrap the content to the NopCloser, because the implementation of the distribution will // always return the error when Close() is called. 
diff --git a/pkg/iometrics/format.go b/pkg/iometrics/format.go new file mode 100644 index 00000000..39b8746f --- /dev/null +++ b/pkg/iometrics/format.go @@ -0,0 +1,52 @@ +/* + * Copyright 2024 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package iometrics + +import ( + "fmt" + "time" +) + +func formatBytes(b uint64) string { + const ( + KB = 1024 + MB = KB * 1024 + GB = MB * 1024 + TB = GB * 1024 + ) + + switch { + case b >= TB: + return fmt.Sprintf("%.2f TB", float64(b)/float64(TB)) + case b >= GB: + return fmt.Sprintf("%.2f GB", float64(b)/float64(GB)) + case b >= MB: + return fmt.Sprintf("%.2f MB", float64(b)/float64(MB)) + case b >= KB: + return fmt.Sprintf("%.2f KB", float64(b)/float64(KB)) + default: + return fmt.Sprintf("%d B", b) + } +} + +func formatThroughput(bytes int64, d time.Duration) string { + if d == 0 || bytes == 0 { + return "N/A" + } + mbPerSec := float64(bytes) / (1024 * 1024) / d.Seconds() + return fmt.Sprintf("%.2f MB/s", mbPerSec) +} diff --git a/pkg/iometrics/reader.go b/pkg/iometrics/reader.go new file mode 100644 index 00000000..249bc8f6 --- /dev/null +++ b/pkg/iometrics/reader.go @@ -0,0 +1,42 @@ +/* + * Copyright 2024 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package iometrics + +import ( + "io" + "sync/atomic" + "time" +) + +// countingReader wraps an io.Reader, atomically incrementing shared byte +// and nanosecond counters on every Read() call. Multiple countingReader +// instances can share the same counters for cross-goroutine aggregation. +type countingReader struct { + reader io.Reader + bytes *atomic.Int64 + nanos *atomic.Int64 +} + +func (r *countingReader) Read(p []byte) (int, error) { + start := time.Now() + n, err := r.reader.Read(p) + r.nanos.Add(int64(time.Since(start))) + if n > 0 { + r.bytes.Add(int64(n)) + } + return n, err +} diff --git a/pkg/iometrics/tracker.go b/pkg/iometrics/tracker.go new file mode 100644 index 00000000..36fd8f2d --- /dev/null +++ b/pkg/iometrics/tracker.go @@ -0,0 +1,109 @@ +/* + * Copyright 2024 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package iometrics + +import ( + "fmt" + "io" + "os" + "sync/atomic" + "time" + + "github.com/sirupsen/logrus" +) + +// Tracker aggregates IO throughput metrics from multiple concurrent +// goroutines. Create one per operation (push/pull/fetch) and pass it +// to each goroutine so they share the same atomic counters. +type Tracker struct { + operation string + startTime time.Time + bytes atomic.Int64 // total bytes read from source + sourceNanos atomic.Int64 // cumulative source Read() call durations + transferNanos atomic.Int64 // cumulative per-goroutine wall clock +} + +// NewTracker creates a new Tracker and records the start time. +// Call this before launching the errgroup. +func NewTracker(operation string) *Tracker { + return &Tracker{ + operation: operation, + startTime: time.Now(), + } +} + +// WrapReader wraps an io.Reader to count bytes and accumulate Read() +// call durations into the shared atomic counters. +func (t *Tracker) WrapReader(r io.Reader) io.Reader { + return &countingReader{ + reader: r, + bytes: &t.bytes, + nanos: &t.sourceNanos, + } +} + +// TrackTransfer measures the wall-clock duration of a single transfer +// (one goroutine handling one blob) and accumulates it. Place this +// inside retry.Do so each retry attempt is measured independently. +func (t *Tracker) TrackTransfer(fn func() error) error { + start := time.Now() + err := fn() + t.transferNanos.Add(int64(time.Since(start))) + return err +} + +// Summary outputs a throughput summary to both the log file (logrus) +// and the terminal (stderr). Call this after all goroutines have +// completed (after g.Wait()) — the happens-before from errgroup +// guarantees all atomic stores are visible. 
+func (t *Tracker) Summary() { + wallClock := time.Since(t.startTime) + totalBytes := t.bytes.Load() + sourceNanos := t.sourceNanos.Load() + transferNanos := t.transferNanos.Load() + + if totalBytes == 0 { + return + } + + sourceReadTime := time.Duration(sourceNanos) + + var readFraction float64 + if transferNanos > 0 { + readFraction = float64(sourceNanos) / float64(transferNanos) + } + + // Log structured fields to log file. + logrus.WithFields(logrus.Fields{ + "operation": t.operation, + "totalBytes": formatBytes(uint64(totalBytes)), + "wallClock": wallClock.Round(time.Millisecond).String(), + "sourceReadTime": sourceReadTime.Round(time.Millisecond).String(), + "sourceReadThroughput": formatThroughput(totalBytes, sourceReadTime), + "effectiveThroughput": formatThroughput(totalBytes, wallClock), + "readFraction": fmt.Sprintf("%.2f", readFraction), + }).Info("io throughput summary") + + // Print concise summary to terminal. + fmt.Fprintf(os.Stderr, "IO summary: %s in %s, effective %s, source %s, read ratio %.2f\n", + formatBytes(uint64(totalBytes)), + wallClock.Round(time.Millisecond), + formatThroughput(totalBytes, wallClock), + formatThroughput(totalBytes, sourceReadTime), + readFraction, + ) +} diff --git a/pkg/iometrics/tracker_test.go b/pkg/iometrics/tracker_test.go new file mode 100644 index 00000000..1e43c75d --- /dev/null +++ b/pkg/iometrics/tracker_test.go @@ -0,0 +1,291 @@ +/* + * Copyright 2024 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package iometrics + +import ( + "bytes" + "errors" + "io" + "strings" + "sync" + "testing" + "time" + + "github.com/sirupsen/logrus" +) + +func TestCountingReaderDataIntegrity(t *testing.T) { + data := []byte("hello world, this is a test of the counting reader") + tracker := NewTracker("test") + wrapped := tracker.WrapReader(bytes.NewReader(data)) + + got, err := io.ReadAll(wrapped) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !bytes.Equal(got, data) { + t.Errorf("data mismatch: got %q, want %q", got, data) + } + + if tracker.bytes.Load() != int64(len(data)) { + t.Errorf("bytes = %d, want %d", tracker.bytes.Load(), len(data)) + } + + if tracker.sourceNanos.Load() <= 0 { + t.Error("sourceNanos should be > 0 after reads") + } +} + +// slowReader simulates a slow source by sleeping on each Read call. +type slowReader struct { + data []byte + pos int + delay time.Duration +} + +func (r *slowReader) Read(p []byte) (int, error) { + if r.pos >= len(r.data) { + return 0, io.EOF + } + time.Sleep(r.delay) + n := copy(p, r.data[r.pos:]) + r.pos += n + return n, nil +} + +func TestCountingReaderTimingAccumulation(t *testing.T) { + tracker := NewTracker("test") + sr := &slowReader{data: make([]byte, 128), delay: 10 * time.Millisecond} + wrapped := tracker.WrapReader(sr) + + _, err := io.ReadAll(wrapped) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + sourceTime := time.Duration(tracker.sourceNanos.Load()) + // At least one Read call with 10ms delay. 
+ if sourceTime < 10*time.Millisecond { + t.Errorf("sourceNanos = %v, expected >= 10ms", sourceTime) + } +} + +func TestConcurrentAggregation(t *testing.T) { + tracker := NewTracker("test") + numGoroutines := 10 + dataPerGoroutine := 1024 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for range numGoroutines { + go func() { + defer wg.Done() + data := make([]byte, dataPerGoroutine) + wrapped := tracker.WrapReader(bytes.NewReader(data)) + io.ReadAll(wrapped) + }() + } + + wg.Wait() + + expectedBytes := int64(numGoroutines * dataPerGoroutine) + if tracker.bytes.Load() != expectedBytes { + t.Errorf("bytes = %d, want %d", tracker.bytes.Load(), expectedBytes) + } +} + +func TestTrackTransfer(t *testing.T) { + tracker := NewTracker("test") + + err := tracker.TrackTransfer(func() error { + time.Sleep(20 * time.Millisecond) + return nil + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + transferTime := time.Duration(tracker.transferNanos.Load()) + if transferTime < 20*time.Millisecond { + t.Errorf("transferNanos = %v, expected >= 20ms", transferTime) + } +} + +func TestTrackTransferPropagatesError(t *testing.T) { + tracker := NewTracker("test") + expected := errors.New("transfer failed") + + err := tracker.TrackTransfer(func() error { + return expected + }) + + if !errors.Is(err, expected) { + t.Errorf("err = %v, want %v", err, expected) + } + + // Duration should still be recorded even on error. + if tracker.transferNanos.Load() <= 0 { + t.Error("transferNanos should be > 0 even on error") + } +} + +func TestTrackTransferConcurrent(t *testing.T) { + tracker := NewTracker("test") + numGoroutines := 10 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for range numGoroutines { + go func() { + defer wg.Done() + tracker.TrackTransfer(func() error { + time.Sleep(10 * time.Millisecond) + return nil + }) + }() + } + + wg.Wait() + + // Each goroutine sleeps 10ms. Total cumulative should be >= 10ms * numGoroutines. 
+ transferTime := time.Duration(tracker.transferNanos.Load()) + minExpected := time.Duration(numGoroutines) * 10 * time.Millisecond + if transferTime < minExpected { + t.Errorf("transferNanos = %v, expected >= %v", transferTime, minExpected) + } +} + +func TestSummaryZeroBytes(t *testing.T) { + var buf bytes.Buffer + logrus.SetOutput(&buf) + defer logrus.SetOutput(nil) + + tracker := NewTracker("test") + // Should not panic or produce output when no bytes were transferred. + tracker.Summary() + + if buf.Len() > 0 { + t.Errorf("expected no log output for zero bytes, got: %s", buf.String()) + } +} + +func TestSummaryOutput(t *testing.T) { + var buf bytes.Buffer + logrus.SetOutput(&buf) + logrus.SetLevel(logrus.InfoLevel) + logrus.SetFormatter(&logrus.TextFormatter{DisableTimestamp: true}) + defer logrus.SetOutput(nil) + + tracker := NewTracker("push") + + // Simulate a transfer: read some bytes with source tracking. + data := make([]byte, 1024*1024) // 1MB + wrapped := tracker.WrapReader(bytes.NewReader(data)) + tracker.TrackTransfer(func() error { + _, err := io.ReadAll(wrapped) + return err + }) + + tracker.Summary() + + output := buf.String() + for _, expected := range []string{ + "io throughput summary", + "operation=push", + "totalBytes=", + "readFraction=", + } { + if !strings.Contains(output, expected) { + t.Errorf("log output missing %q, got:\n%s", expected, output) + } + } +} + +func TestReadFractionAccuracy(t *testing.T) { + tracker := NewTracker("test") + + // Simulate: source read takes 80ms, total transfer takes 100ms. + // readFraction should be ~0.8. + sr := &slowReader{data: make([]byte, 64), delay: 80 * time.Millisecond} + wrapped := tracker.WrapReader(sr) + tracker.TrackTransfer(func() error { + _, err := io.ReadAll(wrapped) + if err != nil { + return err + } + // Simulate 20ms of sink write time. 
+ time.Sleep(20 * time.Millisecond) + return nil + }) + + sourceNanos := tracker.sourceNanos.Load() + transferNanos := tracker.transferNanos.Load() + + if transferNanos == 0 { + t.Fatal("transferNanos should be > 0") + } + + readFraction := float64(sourceNanos) / float64(transferNanos) + // Expect readFraction to be roughly 0.8 (with tolerance for scheduling jitter). + if readFraction < 0.5 || readFraction > 0.95 { + t.Errorf("readFraction = %.2f, expected ~0.8 (sourceNanos=%d, transferNanos=%d)", + readFraction, sourceNanos, transferNanos) + } +} + +func TestFormatBytes(t *testing.T) { + tests := []struct { + input uint64 + expected string + }{ + {0, "0 B"}, + {512, "512 B"}, + {1024, "1.00 KB"}, + {1048576, "1.00 MB"}, + {1073741824, "1.00 GB"}, + {1099511627776, "1.00 TB"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + if got := formatBytes(tt.input); got != tt.expected { + t.Errorf("formatBytes(%d) = %q, want %q", tt.input, got, tt.expected) + } + }) + } +} + +func TestFormatThroughput(t *testing.T) { + // 1MB in 1 second = 1.00 MB/s + result := formatThroughput(1024*1024, time.Second) + if result != "1.00 MB/s" { + t.Errorf("formatThroughput(1MB, 1s) = %q, want %q", result, "1.00 MB/s") + } + + // Zero duration returns N/A. + if got := formatThroughput(1024, 0); got != "N/A" { + t.Errorf("formatThroughput(1024, 0) = %q, want %q", got, "N/A") + } + + // Zero bytes returns N/A. + if got := formatThroughput(0, time.Second); got != "N/A" { + t.Errorf("formatThroughput(0, 1s) = %q, want %q", got, "N/A") + } +} From 7dc4574f686ce051fcae8d0d346d7c8f15809314 Mon Sep 17 00:00:00 2001 From: Zhao Chen Date: Tue, 7 Apr 2026 21:27:17 +0800 Subject: [PATCH 3/5] feat: show disk/network throughput with bottleneck indicator Replace generic "source/effective" labels with operation-aware labels (push: disk read / network write, pull: network read / disk write). Derive sink throughput from transferTime - sourceReadTime. 
Mark the slower side as bottleneck. Add overall effective throughput to output. Signed-off-by: Zhao Chen --- pkg/iometrics/tracker.go | 49 +++++++++++++++++++------ pkg/iometrics/tracker_test.go | 68 +++++++++++++++++++++-------------- 2 files changed, 81 insertions(+), 36 deletions(-) diff --git a/pkg/iometrics/tracker.go b/pkg/iometrics/tracker.go index 36fd8f2d..c49de6c5 100644 --- a/pkg/iometrics/tracker.go +++ b/pkg/iometrics/tracker.go @@ -66,6 +66,22 @@ func (t *Tracker) TrackTransfer(fn func() error) error { return err } +// sourceLabel and sinkLabel return human-readable labels for the source +// and sink sides based on the operation type. +func (t *Tracker) sourceLabel() string { + if t.operation == "push" { + return "disk read" + } + return "network read" +} + +func (t *Tracker) sinkLabel() string { + if t.operation == "push" { + return "network write" + } + return "disk write" +} + // Summary outputs a throughput summary to both the log file (logrus) // and the terminal (stderr). Call this after all goroutines have // completed (after g.Wait()) — the happens-before from errgroup @@ -80,11 +96,17 @@ func (t *Tracker) Summary() { return } - sourceReadTime := time.Duration(sourceNanos) + sourceDuration := time.Duration(sourceNanos) + sinkNanos := transferNanos - sourceNanos + sinkDuration := time.Duration(max(sinkNanos, 0)) - var readFraction float64 - if transferNanos > 0 { - readFraction = float64(sourceNanos) / float64(transferNanos) + sourceThroughput := formatThroughput(totalBytes, sourceDuration) + sinkThroughput := formatThroughput(totalBytes, sinkDuration) + + // Identify the bottleneck by comparing cumulative durations. + bottleneck := t.sinkLabel() + if sourceNanos > sinkNanos { + bottleneck = t.sourceLabel() } // Log structured fields to log file. 
@@ -92,18 +114,25 @@ func (t *Tracker) Summary() { "operation": t.operation, "totalBytes": formatBytes(uint64(totalBytes)), "wallClock": wallClock.Round(time.Millisecond).String(), - "sourceReadTime": sourceReadTime.Round(time.Millisecond).String(), - "sourceReadThroughput": formatThroughput(totalBytes, sourceReadTime), "effectiveThroughput": formatThroughput(totalBytes, wallClock), - "readFraction": fmt.Sprintf("%.2f", readFraction), + t.sourceLabel(): sourceThroughput, + t.sinkLabel(): sinkThroughput, + "bottleneck": bottleneck, }).Info("io throughput summary") // Print concise summary to terminal. - fmt.Fprintf(os.Stderr, "IO summary: %s in %s, effective %s, source %s, read ratio %.2f\n", + srcArrow := "" + snkArrow := "" + if bottleneck == t.sourceLabel() { + srcArrow = " ← bottleneck" + } else { + snkArrow = " ← bottleneck" + } + fmt.Fprintf(os.Stderr, "IO summary: %s in %s, %s | %s: %s%s | %s: %s%s\n", formatBytes(uint64(totalBytes)), wallClock.Round(time.Millisecond), formatThroughput(totalBytes, wallClock), - formatThroughput(totalBytes, sourceReadTime), - readFraction, + t.sourceLabel(), sourceThroughput, srcArrow, + t.sinkLabel(), sinkThroughput, snkArrow, ) } diff --git a/pkg/iometrics/tracker_test.go b/pkg/iometrics/tracker_test.go index 1e43c75d..c72d57e7 100644 --- a/pkg/iometrics/tracker_test.go +++ b/pkg/iometrics/tracker_test.go @@ -210,7 +210,9 @@ func TestSummaryOutput(t *testing.T) { "io throughput summary", "operation=push", "totalBytes=", - "readFraction=", + "bottleneck=", + "disk read", + "network write", } { if !strings.Contains(output, expected) { t.Errorf("log output missing %q, got:\n%s", expected, output) @@ -218,36 +220,50 @@ func TestSummaryOutput(t *testing.T) { } } -func TestReadFractionAccuracy(t *testing.T) { - tracker := NewTracker("test") +func TestBottleneckDetection(t *testing.T) { + // Simulate source-bottleneck: source read takes 80ms, sink takes 20ms. 
+ t.Run("source bottleneck", func(t *testing.T) { + tracker := NewTracker("pull") + sr := &slowReader{data: make([]byte, 64), delay: 80 * time.Millisecond} + wrapped := tracker.WrapReader(sr) + tracker.TrackTransfer(func() error { + _, err := io.ReadAll(wrapped) + if err != nil { + return err + } + time.Sleep(20 * time.Millisecond) + return nil + }) - // Simulate: source read takes 80ms, total transfer takes 100ms. - // readFraction should be ~0.8. - sr := &slowReader{data: make([]byte, 64), delay: 80 * time.Millisecond} - wrapped := tracker.WrapReader(sr) - tracker.TrackTransfer(func() error { - _, err := io.ReadAll(wrapped) - if err != nil { - return err + sourceNanos := tracker.sourceNanos.Load() + sinkNanos := tracker.transferNanos.Load() - sourceNanos + if sourceNanos <= sinkNanos { + t.Errorf("expected source > sink for source bottleneck: source=%d, sink=%d", + sourceNanos, sinkNanos) } - // Simulate 20ms of sink write time. - time.Sleep(20 * time.Millisecond) - return nil }) - sourceNanos := tracker.sourceNanos.Load() - transferNanos := tracker.transferNanos.Load() - - if transferNanos == 0 { - t.Fatal("transferNanos should be > 0") - } + // Simulate sink-bottleneck: source read takes 10ms, sink takes 80ms. + t.Run("sink bottleneck", func(t *testing.T) { + tracker := NewTracker("push") + sr := &slowReader{data: make([]byte, 64), delay: 10 * time.Millisecond} + wrapped := tracker.WrapReader(sr) + tracker.TrackTransfer(func() error { + _, err := io.ReadAll(wrapped) + if err != nil { + return err + } + time.Sleep(80 * time.Millisecond) + return nil + }) - readFraction := float64(sourceNanos) / float64(transferNanos) - // Expect readFraction to be roughly 0.8 (with tolerance for scheduling jitter). 
- if readFraction < 0.5 || readFraction > 0.95 { - t.Errorf("readFraction = %.2f, expected ~0.8 (sourceNanos=%d, transferNanos=%d)", - readFraction, sourceNanos, transferNanos) - } + sourceNanos := tracker.sourceNanos.Load() + sinkNanos := tracker.transferNanos.Load() - sourceNanos + if sinkNanos <= sourceNanos { + t.Errorf("expected sink > source for sink bottleneck: source=%d, sink=%d", + sourceNanos, sinkNanos) + } + }) } func TestFormatBytes(t *testing.T) { From 244c286d3449d80d61db7d10fc3f952ab96eb90b Mon Sep 17 00:00:00 2001 From: Zhao Chen Date: Tue, 7 Apr 2026 22:05:03 +0800 Subject: [PATCH 4/5] refactor: address code review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix cgroup v1/v2: detect CPU and memory limits independently so one missing limit doesn't prevent detecting the other - Remove cpu.Percent(0) from startup log — unreliable at process start - Remove unused isVirtualFS function (dead code after IO removal) - Replace duplicated formatBytes with humanize.IBytes from go-humanize Signed-off-by: Zhao Chen --- pkg/envinfo/cgroup_linux.go | 99 ++++++++++++++--------------------- pkg/envinfo/envinfo.go | 69 ++---------------------- pkg/envinfo/envinfo_test.go | 58 -------------------- pkg/iometrics/format.go | 22 -------- pkg/iometrics/tracker.go | 5 +- pkg/iometrics/tracker_test.go | 28 ++-------- 6 files changed, 49 insertions(+), 232 deletions(-) diff --git a/pkg/envinfo/cgroup_linux.go b/pkg/envinfo/cgroup_linux.go index a11aea4f..510f7515 100644 --- a/pkg/envinfo/cgroup_linux.go +++ b/pkg/envinfo/cgroup_linux.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" + "github.com/dustin/go-humanize" "github.com/sirupsen/logrus" ) @@ -49,32 +50,26 @@ func getCgroupLimits() *cgroupLimits { func tryV2(limits *cgroupLimits) bool { // cgroup v2 uses unified hierarchy at /sys/fs/cgroup/. 
- cpuMax, err := os.ReadFile("/sys/fs/cgroup/cpu.max") - if err != nil { - return false - } - - parts := strings.Fields(strings.TrimSpace(string(cpuMax))) - if len(parts) == 2 && parts[0] != "max" { - quota, err1 := strconv.ParseFloat(parts[0], 64) - period, err2 := strconv.ParseFloat(parts[1], 64) - if err1 == nil && err2 == nil && period > 0 { - limits.CPUQuota = quota / period - limits.InCgroup = true + // Detect CPU and memory independently — one may be set without the other. + if cpuMax, err := os.ReadFile("/sys/fs/cgroup/cpu.max"); err == nil { + parts := strings.Fields(strings.TrimSpace(string(cpuMax))) + if len(parts) == 2 && parts[0] != "max" { + quota, err1 := strconv.ParseFloat(parts[0], 64) + period, err2 := strconv.ParseFloat(parts[1], 64) + if err1 == nil && err2 == nil && period > 0 { + limits.CPUQuota = quota / period + limits.InCgroup = true + } } } - memMax, err := os.ReadFile("/sys/fs/cgroup/memory.max") - if err != nil { - return limits.InCgroup - } - - memStr := strings.TrimSpace(string(memMax)) - if memStr != "max" { - memLimit, err := strconv.ParseUint(memStr, 10, 64) - if err == nil { - limits.MemLimit = memLimit - limits.InCgroup = true + if memMax, err := os.ReadFile("/sys/fs/cgroup/memory.max"); err == nil { + memStr := strings.TrimSpace(string(memMax)) + if memStr != "max" { + if memLimit, err := strconv.ParseUint(memStr, 10, 64); err == nil { + limits.MemLimit = memLimit + limits.InCgroup = true + } } } @@ -82,46 +77,28 @@ func tryV2(limits *cgroupLimits) bool { } func tryV1(limits *cgroupLimits) { - // cgroup v1: CPU quota. - quotaBytes, err := os.ReadFile("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") - if err != nil { - return - } - - quota, err := strconv.ParseFloat(strings.TrimSpace(string(quotaBytes)), 64) - if err != nil || quota <= 0 { - // -1 means no limit. 
- return - } - - periodBytes, err := os.ReadFile("/sys/fs/cgroup/cpu/cpu.cfs_period_us") - if err != nil { - return - } - - period, err := strconv.ParseFloat(strings.TrimSpace(string(periodBytes)), 64) - if err != nil || period <= 0 { - return + // cgroup v1: CPU quota. Detect independently from memory. + if quotaBytes, err := os.ReadFile("/sys/fs/cgroup/cpu/cpu.cfs_quota_us"); err == nil { + if quota, err := strconv.ParseFloat(strings.TrimSpace(string(quotaBytes)), 64); err == nil && quota > 0 { + if periodBytes, err := os.ReadFile("/sys/fs/cgroup/cpu/cpu.cfs_period_us"); err == nil { + if period, err := strconv.ParseFloat(strings.TrimSpace(string(periodBytes)), 64); err == nil && period > 0 { + limits.CPUQuota = quota / period + limits.InCgroup = true + } + } + } } - limits.CPUQuota = quota / period - limits.InCgroup = true - // cgroup v1: Memory limit. - memBytes, err := os.ReadFile("/sys/fs/cgroup/memory/memory.limit_in_bytes") - if err != nil { - return - } - - memLimit, err := strconv.ParseUint(strings.TrimSpace(string(memBytes)), 10, 64) - if err != nil { - return - } - - // Very large values (like 2^63) indicate no limit. - const noLimitThreshold = 1 << 62 - if memLimit < noLimitThreshold { - limits.MemLimit = memLimit + if memBytes, err := os.ReadFile("/sys/fs/cgroup/memory/memory.limit_in_bytes"); err == nil { + if memLimit, err := strconv.ParseUint(strings.TrimSpace(string(memBytes)), 10, 64); err == nil { + // Very large values (like 2^63) indicate no limit. 
+ const noLimitThreshold = 1 << 62 + if memLimit < noLimitThreshold { + limits.MemLimit = memLimit + limits.InCgroup = true + } + } } } @@ -137,7 +114,7 @@ func logCgroupInfo() { fields["cpuQuota"] = strconv.FormatFloat(limits.CPUQuota, 'f', 2, 64) } if limits.MemLimit > 0 { - fields["memoryLimit"] = formatBytes(limits.MemLimit) + fields["memoryLimit"] = humanize.IBytes(limits.MemLimit) } if len(fields) > 0 { diff --git a/pkg/envinfo/envinfo.go b/pkg/envinfo/envinfo.go index 55eb6d1e..7f30a8fe 100644 --- a/pkg/envinfo/envinfo.go +++ b/pkg/envinfo/envinfo.go @@ -20,8 +20,8 @@ import ( "fmt" "path/filepath" "runtime" - "strings" + "github.com/dustin/go-humanize" "github.com/shirou/gopsutil/v4/cpu" "github.com/shirou/gopsutil/v4/disk" "github.com/shirou/gopsutil/v4/mem" @@ -66,8 +66,8 @@ func LogDiskInfo(name, path string) { "name": name, "path": absPath, "fstype": usage.Fstype, - "total": formatBytes(usage.Total), - "free": formatBytes(usage.Free), + "total": humanize.IBytes(usage.Total), + "free": humanize.IBytes(usage.Free), "usagePercent": fmt.Sprintf("%.1f%%", usage.UsedPercent), }).Info("disk info") } @@ -113,13 +113,6 @@ func logCPUInfo() { fields["model"] = infos[0].ModelName } - percents, err := cpu.Percent(0, false) - if err != nil { - logrus.WithError(err).Warn("failed to get CPU usage") - } else if len(percents) > 0 { - fields["usagePercent"] = fmt.Sprintf("%.1f%%", percents[0]) - } - logrus.WithFields(fields).Info("cpu info") } @@ -131,60 +124,8 @@ func logMemoryInfo() { } logrus.WithFields(logrus.Fields{ - "total": formatBytes(v.Total), - "available": formatBytes(v.Available), + "total": humanize.IBytes(v.Total), + "available": humanize.IBytes(v.Available), "usagePercent": fmt.Sprintf("%.1f%%", v.UsedPercent), }).Info("memory info") } - -// isVirtualFS checks whether a filesystem type or device path indicates -// a non-block-device filesystem where IO counters are not available. 
-// This includes FUSE, network filesystems, RAM-based filesystems, and -// container overlay filesystems. -func isVirtualFS(fstype, device string) bool { - fstypeLower := strings.ToLower(fstype) - - virtualFSTypes := []string{ - "fuse", "nfs", "cifs", "smb", "smbfs", - "tmpfs", "ramfs", "devtmpfs", - "overlay", "aufs", - "sshfs", "s3fs", "gcsfuse", "ossfs", - "9p", "virtiofs", - } - - for _, vfs := range virtualFSTypes { - if fstypeLower == vfs || strings.HasPrefix(fstypeLower, vfs+".") { - return true - } - } - - // Devices not under /dev/ are generally not block devices. - // e.g., "s3fs", "sshfs#user@host:", "server:/export" - if device != "" && !strings.HasPrefix(device, "/dev/") { - return true - } - - return false -} - -func formatBytes(b uint64) string { - const ( - KB = 1024 - MB = KB * 1024 - GB = MB * 1024 - TB = GB * 1024 - ) - - switch { - case b >= TB: - return fmt.Sprintf("%.2f TB", float64(b)/float64(TB)) - case b >= GB: - return fmt.Sprintf("%.2f GB", float64(b)/float64(GB)) - case b >= MB: - return fmt.Sprintf("%.2f MB", float64(b)/float64(MB)) - case b >= KB: - return fmt.Sprintf("%.2f KB", float64(b)/float64(KB)) - default: - return fmt.Sprintf("%d B", b) - } -} diff --git a/pkg/envinfo/envinfo_test.go b/pkg/envinfo/envinfo_test.go index a37020f3..c00e79cb 100644 --- a/pkg/envinfo/envinfo_test.go +++ b/pkg/envinfo/envinfo_test.go @@ -79,61 +79,3 @@ func TestLogDiskInfoEmptyPath(t *testing.T) { t.Errorf("expected no output for empty path, got:\n%s", buf.String()) } } - -func TestIsVirtualFS(t *testing.T) { - tests := []struct { - name string - fstype string - device string - expected bool - }{ - {"ext4 block device", "ext4", "/dev/sda1", false}, - {"xfs block device", "xfs", "/dev/nvme0n1p1", false}, - {"apfs block device", "apfs", "/dev/disk3s3s1", false}, - {"fuse", "fuse", "s3fs", true}, - {"fuse.s3fs", "fuse.s3fs", "s3fs", true}, - {"fuse.sshfs", "fuse.sshfs", "sshfs#user@host:", true}, - {"nfs", "nfs", "server:/export", true}, - {"nfs4", 
"nfs", "server:/export", true}, - {"cifs", "cifs", "//server/share", true}, - {"tmpfs", "tmpfs", "tmpfs", true}, - {"overlay", "overlay", "overlay", true}, - {"virtiofs", "virtiofs", "myfs", true}, - {"9p", "9p", "hostshare", true}, - {"unknown non-dev device", "ext4", "some-random-path", true}, - {"empty", "", "", false}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := isVirtualFS(tt.fstype, tt.device) - if result != tt.expected { - t.Errorf("isVirtualFS(%q, %q) = %v, want %v", tt.fstype, tt.device, result, tt.expected) - } - }) - } -} - -func TestFormatBytes(t *testing.T) { - tests := []struct { - input uint64 - expected string - }{ - {0, "0 B"}, - {512, "512 B"}, - {1024, "1.00 KB"}, - {1536, "1.50 KB"}, - {1048576, "1.00 MB"}, - {1073741824, "1.00 GB"}, - {1099511627776, "1.00 TB"}, - } - - for _, tt := range tests { - t.Run(tt.expected, func(t *testing.T) { - result := formatBytes(tt.input) - if result != tt.expected { - t.Errorf("formatBytes(%d) = %q, want %q", tt.input, result, tt.expected) - } - }) - } -} diff --git a/pkg/iometrics/format.go b/pkg/iometrics/format.go index 39b8746f..4cb2fb57 100644 --- a/pkg/iometrics/format.go +++ b/pkg/iometrics/format.go @@ -21,28 +21,6 @@ import ( "time" ) -func formatBytes(b uint64) string { - const ( - KB = 1024 - MB = KB * 1024 - GB = MB * 1024 - TB = GB * 1024 - ) - - switch { - case b >= TB: - return fmt.Sprintf("%.2f TB", float64(b)/float64(TB)) - case b >= GB: - return fmt.Sprintf("%.2f GB", float64(b)/float64(GB)) - case b >= MB: - return fmt.Sprintf("%.2f MB", float64(b)/float64(MB)) - case b >= KB: - return fmt.Sprintf("%.2f KB", float64(b)/float64(KB)) - default: - return fmt.Sprintf("%d B", b) - } -} - func formatThroughput(bytes int64, d time.Duration) string { if d == 0 || bytes == 0 { return "N/A" diff --git a/pkg/iometrics/tracker.go b/pkg/iometrics/tracker.go index c49de6c5..4aaa2866 100644 --- a/pkg/iometrics/tracker.go +++ b/pkg/iometrics/tracker.go @@ -23,6 +23,7 
@@ import ( "sync/atomic" "time" + "github.com/dustin/go-humanize" "github.com/sirupsen/logrus" ) @@ -112,7 +113,7 @@ func (t *Tracker) Summary() { // Log structured fields to log file. logrus.WithFields(logrus.Fields{ "operation": t.operation, - "totalBytes": formatBytes(uint64(totalBytes)), + "totalBytes": humanize.IBytes(uint64(totalBytes)), "wallClock": wallClock.Round(time.Millisecond).String(), "effectiveThroughput": formatThroughput(totalBytes, wallClock), t.sourceLabel(): sourceThroughput, @@ -129,7 +130,7 @@ func (t *Tracker) Summary() { snkArrow = " ← bottleneck" } fmt.Fprintf(os.Stderr, "IO summary: %s in %s, %s | %s: %s%s | %s: %s%s\n", - formatBytes(uint64(totalBytes)), + humanize.IBytes(uint64(totalBytes)), wallClock.Round(time.Millisecond), formatThroughput(totalBytes, wallClock), t.sourceLabel(), sourceThroughput, srcArrow, diff --git a/pkg/iometrics/tracker_test.go b/pkg/iometrics/tracker_test.go index c72d57e7..5789b67f 100644 --- a/pkg/iometrics/tracker_test.go +++ b/pkg/iometrics/tracker_test.go @@ -139,9 +139,9 @@ func TestTrackTransferPropagatesError(t *testing.T) { t.Errorf("err = %v, want %v", err, expected) } - // Duration should still be recorded even on error. - if tracker.transferNanos.Load() <= 0 { - t.Error("transferNanos should be > 0 even on error") + // transferNanos should be non-negative (duration is recorded even on error). 
+ if tracker.transferNanos.Load() < 0 { + t.Errorf("transferNanos should be >= 0, got %d", tracker.transferNanos.Load()) } } @@ -266,28 +266,6 @@ func TestBottleneckDetection(t *testing.T) { }) } -func TestFormatBytes(t *testing.T) { - tests := []struct { - input uint64 - expected string - }{ - {0, "0 B"}, - {512, "512 B"}, - {1024, "1.00 KB"}, - {1048576, "1.00 MB"}, - {1073741824, "1.00 GB"}, - {1099511627776, "1.00 TB"}, - } - - for _, tt := range tests { - t.Run(tt.expected, func(t *testing.T) { - if got := formatBytes(tt.input); got != tt.expected { - t.Errorf("formatBytes(%d) = %q, want %q", tt.input, got, tt.expected) - } - }) - } -} - func TestFormatThroughput(t *testing.T) { // 1MB in 1 second = 1.00 MB/s result := formatThroughput(1024*1024, time.Second) From 92e52eb73f1a33ef3cb9003c55900d4c73a97f87 Mon Sep 17 00:00:00 2001 From: Zhao Chen Date: Tue, 7 Apr 2026 22:21:39 +0800 Subject: [PATCH 5/5] fix: resolve process cgroup path and track manifest bytes - Read /proc/self/cgroup to find the process's actual cgroup path before reading limit files, so containers in k8s/Docker sub-cgroups get the correct CPU/memory limits instead of root cgroup values - Wrap manifest bytes.NewReader with tracker.WrapReader so push summaries include manifest bytes in totalBytes Signed-off-by: Zhao Chen --- pkg/backend/push.go | 2 +- pkg/envinfo/cgroup_linux.go | 79 ++++++++++++++++++++++++++++++++----- 2 files changed, 71 insertions(+), 10 deletions(-) diff --git a/pkg/backend/push.go b/pkg/backend/push.go index 869e3065..ef6be956 100644 --- a/pkg/backend/push.go +++ b/pkg/backend/push.go @@ -167,7 +167,7 @@ func pushIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt stri // push the content to the destination, and wrap the content reader for progress bar, // manifest should use dst.Manifests().Push, others should use dst.Blobs().Push. 
if desc.MediaType == ocispec.MediaTypeImageManifest { - reader := pb.Add(prompt, desc.Digest.String(), desc.Size, bytes.NewReader(desc.Data)) + reader := pb.Add(prompt, desc.Digest.String(), desc.Size, tracker.WrapReader(bytes.NewReader(desc.Data))) if err := dst.Manifests().Push(ctx, desc, reader); err != nil { err = fmt.Errorf("failed to push manifest %s, err: %w", desc.Digest.String(), err) pb.Abort(desc.Digest.String(), err) diff --git a/pkg/envinfo/cgroup_linux.go b/pkg/envinfo/cgroup_linux.go index 510f7515..da63ae10 100644 --- a/pkg/envinfo/cgroup_linux.go +++ b/pkg/envinfo/cgroup_linux.go @@ -20,6 +20,7 @@ package envinfo import ( "os" + "path/filepath" "strconv" "strings" @@ -29,9 +30,9 @@ import ( // cgroupLimits holds the CPU and memory limits from cgroup. type cgroupLimits struct { - CPUQuota float64 // effective CPU cores (quota/period), 0 if unlimited - MemLimit uint64 // memory limit in bytes, 0 if unlimited - InCgroup bool // true if running inside a cgroup with limits + CPUQuota float64 // effective CPU cores (quota/period), 0 if unlimited + MemLimit uint64 // memory limit in bytes, 0 if unlimited + InCgroup bool // true if running inside a cgroup with limits } // getCgroupLimits reads cgroup v2 then v1 CPU and memory limits. @@ -48,10 +49,40 @@ func getCgroupLimits() *cgroupLimits { return limits } +// resolveCgroupV2Path returns the filesystem path to the current process's +// cgroup v2 directory by reading /proc/self/cgroup. In cgroup v2 (unified +// hierarchy), the file contains a single line like "0::/kubepods/pod-xxx". +// Returns empty string if the cgroup path cannot be determined. 
+func resolveCgroupV2Path() string {
+	data, err := os.ReadFile("/proc/self/cgroup")
+	if err != nil {
+		return ""
+	}
+
+	for _, line := range strings.Split(strings.TrimSpace(string(data)), "\n") {
+		// cgroup v2 line format: "0::<cgroup-path>"
+		parts := strings.SplitN(line, ":", 3)
+		if len(parts) == 3 && parts[0] == "0" && parts[1] == "" {
+			cgPath := parts[2]
+			if cgPath == "" || cgPath == "/" {
+				// Root cgroup — fall back to /sys/fs/cgroup directly.
+				return "/sys/fs/cgroup"
+			}
+			return filepath.Join("/sys/fs/cgroup", cgPath)
+		}
+	}
+
+	return ""
+}
+
 func tryV2(limits *cgroupLimits) bool {
-	// cgroup v2 uses unified hierarchy at /sys/fs/cgroup/.
+	cgDir := resolveCgroupV2Path()
+	if cgDir == "" {
+		return false
+	}
+
 	// Detect CPU and memory independently — one may be set without the other.
-	if cpuMax, err := os.ReadFile("/sys/fs/cgroup/cpu.max"); err == nil {
+	if cpuMax, err := os.ReadFile(filepath.Join(cgDir, "cpu.max")); err == nil {
 		parts := strings.Fields(strings.TrimSpace(string(cpuMax)))
 		if len(parts) == 2 && parts[0] != "max" {
 			quota, err1 := strconv.ParseFloat(parts[0], 64)
@@ -63,7 +94,7 @@ func tryV2(limits *cgroupLimits) bool {
 		}
 	}
 
-	if memMax, err := os.ReadFile("/sys/fs/cgroup/memory.max"); err == nil {
+	if memMax, err := os.ReadFile(filepath.Join(cgDir, "memory.max")); err == nil {
 		memStr := strings.TrimSpace(string(memMax))
 		if memStr != "max" {
 			if memLimit, err := strconv.ParseUint(memStr, 10, 64); err == nil {
@@ -76,11 +107,40 @@ func tryV2(limits *cgroupLimits) bool {
 	return limits.InCgroup
 }
 
+// resolveCgroupV1Path returns the filesystem path for a given cgroup v1
+// controller (e.g., "cpu", "memory") by reading /proc/self/cgroup.
+// Each line has format: "hierarchy-ID:controller-list:cgroup-path".
+func resolveCgroupV1Path(controller string) string { + data, err := os.ReadFile("/proc/self/cgroup") + if err != nil { + return "" + } + + for _, line := range strings.Split(strings.TrimSpace(string(data)), "\n") { + parts := strings.SplitN(line, ":", 3) + if len(parts) != 3 { + continue + } + + controllers := strings.Split(parts[1], ",") + for _, c := range controllers { + if c == controller { + cgPath := parts[2] + return filepath.Join("/sys/fs/cgroup", controller, cgPath) + } + } + } + + // Fallback to the base controller path. + return filepath.Join("/sys/fs/cgroup", controller) +} + func tryV1(limits *cgroupLimits) { // cgroup v1: CPU quota. Detect independently from memory. - if quotaBytes, err := os.ReadFile("/sys/fs/cgroup/cpu/cpu.cfs_quota_us"); err == nil { + cpuDir := resolveCgroupV1Path("cpu") + if quotaBytes, err := os.ReadFile(filepath.Join(cpuDir, "cpu.cfs_quota_us")); err == nil { if quota, err := strconv.ParseFloat(strings.TrimSpace(string(quotaBytes)), 64); err == nil && quota > 0 { - if periodBytes, err := os.ReadFile("/sys/fs/cgroup/cpu/cpu.cfs_period_us"); err == nil { + if periodBytes, err := os.ReadFile(filepath.Join(cpuDir, "cpu.cfs_period_us")); err == nil { if period, err := strconv.ParseFloat(strings.TrimSpace(string(periodBytes)), 64); err == nil && period > 0 { limits.CPUQuota = quota / period limits.InCgroup = true @@ -90,7 +150,8 @@ func tryV1(limits *cgroupLimits) { } // cgroup v1: Memory limit. - if memBytes, err := os.ReadFile("/sys/fs/cgroup/memory/memory.limit_in_bytes"); err == nil { + memDir := resolveCgroupV1Path("memory") + if memBytes, err := os.ReadFile(filepath.Join(memDir, "memory.limit_in_bytes")); err == nil { if memLimit, err := strconv.ParseUint(strings.TrimSpace(string(memBytes)), 10, 64); err == nil { // Very large values (like 2^63) indicate no limit. const noLimitThreshold = 1 << 62