diff --git a/internal/kube/kube.go b/internal/kube/kube.go index 0b2b9930d..1fc7fe990 100644 --- a/internal/kube/kube.go +++ b/internal/kube/kube.go @@ -15,6 +15,11 @@ import ( "k8s.io/client-go/tools/clientcmd" ) +const ( + clientQPS = 50 + clientBurst = 100 +) + // K8sEnvRequest represents the PUT request body to be sent to kosli from k8s type K8sEnvRequest struct { Artifacts []*PodData `json:"artifacts"` @@ -86,8 +91,8 @@ func NewK8sClientSet(kubeconfigPath string) (*K8SConnection, error) { // set the QPS and burst for the config to control the rate of requests to the API server // defaults are 5 QPS and 10 burst which is too low for large clusters - config.QPS = 50 - config.Burst = 100 + config.QPS = clientQPS + config.Burst = clientBurst clientset, err := kubernetes.NewForConfig(config) if err != nil { @@ -127,10 +132,15 @@ func (clientset *K8SConnection) GetPodsData(filter *filters.ResourceFilterOption ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure it's called to release resources even if no errors + // semaphore to limit the number of concurrent requests to the API server + sem := make(chan struct{}, (clientBurst/2)-1) // max concurrent requests: slightly lower than client's QPS to avoid throttling + for _, ns := range filteredNamespaces { wg.Add(1) go func(ns string) { defer wg.Done() + sem <- struct{}{} // acquire the semaphore + defer func() { <-sem }() // release the semaphore // Check if any error occurred in any other gorouties: select { case <-ctx.Done(): diff --git a/internal/kube/kube_test.go b/internal/kube/kube_test.go index 2bfa24520..ec5140d54 100644 --- a/internal/kube/kube_test.go +++ b/internal/kube/kube_test.go @@ -192,6 +192,19 @@ func (suite *KubeTestSuite) TestGetPodsData() { } } +func (suite *KubeTestSuite) TestGetPodsDataWithThrottling() { + // create a large number of pods + for i := 0; i < 200; i++ { + suite.createNamespace(fmt.Sprintf("ns-%d", i)) + } + // Get pods data with timeout check + startTime := time.Now() + _, err := suite.clientset.GetPodsData(&filters.ResourceFilterOptions{IncludeNamesRegex: []string{"^ns-.*"}}, logger.NewStandardLogger()) + duration := time.Since(startTime) + require.NoErrorf(suite.Suite.T(), err, "error getting pods data for test GetPodsDataWithThrottling") + require.LessOrEqual(suite.Suite.T(), duration, 5*time.Second, "GetPodsData should complete within 5 seconds, but took %v", duration) +} + func (suite *KubeTestSuite) TestFilterNamespaces() { type args struct { nsList []corev1.Namespace