Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions internal/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"k8s.io/client-go/tools/clientcmd"
)

const (
clientQPS = 50
clientBurst = 100
)

// K8sEnvRequest represents the PUT request body to be sent to kosli from k8s
type K8sEnvRequest struct {
Artifacts []*PodData `json:"artifacts"`
Expand Down Expand Up @@ -86,8 +91,8 @@ func NewK8sClientSet(kubeconfigPath string) (*K8SConnection, error) {

// set the QPS and burst for the config to control the rate of requests to the API server
// defaults are 5 QPS and 10 burst which is too low for large clusters
config.QPS = 50
config.Burst = 100
config.QPS = clientQPS
config.Burst = clientBurst

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
Expand Down Expand Up @@ -127,10 +132,15 @@ func (clientset *K8SConnection) GetPodsData(filter *filters.ResourceFilterOption
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors

// semaphore to limit the number of concurrent requests to the API server
sem := make(chan struct{}, (clientBurst/2)-1) // max concurrent requests: slightly lower than client's QPS to avoid throttling

for _, ns := range filteredNamespaces {
wg.Add(1)
go func(ns string) {
defer wg.Done()
sem <- struct{}{} // acquire the semaphore
defer func() { <-sem }() // release the semaphore
// Check if any error occurred in any other gorouties:
select {
case <-ctx.Done():
Expand Down
13 changes: 13 additions & 0 deletions internal/kube/kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,19 @@ func (suite *KubeTestSuite) TestGetPodsData() {
}
}

func (suite *KubeTestSuite) TestGetPodsDataWithThrottling() {
// create a large number of pods
for i := 0; i < 200; i++ {
suite.createNamespace(fmt.Sprintf("ns-%d", i))
}
// Get pods data with timeout check
startTime := time.Now()
_, err := suite.clientset.GetPodsData(&filters.ResourceFilterOptions{IncludeNamesRegex: []string{"^ns-.*"}}, logger.NewStandardLogger())
duration := time.Since(startTime)
require.NoErrorf(suite.Suite.T(), err, "error getting pods data for test GetPodsDataWithThrottling")
require.LessOrEqual(suite.Suite.T(), duration, 5*time.Second, "GetPodsData should complete within 5 seconds, but took %v", duration)
}

func (suite *KubeTestSuite) TestFilterNamespaces() {
type args struct {
nsList []corev1.Namespace
Expand Down