From 21b33e1ddf7fa3882d14a6340a84613726b733cd Mon Sep 17 00:00:00 2001 From: Tetsuya Shiota Date: Mon, 6 Sep 2021 16:49:41 +0900 Subject: [PATCH 1/5] update: use SScan instead of SMembers Command --- pkg/backend/redis/task.go | 41 ++++++++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index dec1c6d..295598a 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -38,11 +38,11 @@ import ( ) const ( - KB = 1 << 10 - PayloadMaxSizeInKB = 1 - MessageMaxSizeInKB = 1 - HistoryLengthMax = 10 - MaxNameLength = 1024 + KB = 1 << 10 + PayloadMaxSizeInKB = 1 + MessageMaxSizeInKB = 1 + HistoryLengthMax = 10 + MaxNameLength = 1024 ) func (b *Backend) ensureQueueAndWorkerExists(queueUID, workerUID uuid.UUID) (*taskqueue.TaskQueue, *worker.Worker, error) { @@ -162,10 +162,7 @@ func (b *Backend) getTasksByUIDs(queueUID string, taskUIDs []string, filter func } func (b *Backend) getTasks(queueUID string, filter func(*task.Task) bool, lggr zerolog.Logger) ([]*task.Task, error) { - taskUIDs, err := b.Client.SMembers(b.tasksKey(queueUID)).Result() - if err == redis.Nil { - return []*task.Task{}, nil - } + taskUIDs, err := b.allTaskUIDsByQueueUID(b.Client, queueUID) if err != nil { return nil, err } @@ -938,10 +935,7 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string) b.deadletterQueueKey(queueUID), b.pendingTaskQueueKey(queueUID), } - taskUIDs, err := rds.SMembers(b.tasksKey(queueUID)).Result() - if err == redis.Nil { - return []string{}, nil - } + taskUIDs, err := b.allTaskUIDsByQueueUID(rds, queueUID) if err != nil { return []string{}, err } @@ -950,3 +944,24 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string) } return keysToDelete, nil } + +func (b *Backend) allTaskUIDsByQueueUID(rds redis.Cmdable, queueUID string) ([]string, error) { + var chunkSize = int64(b.ChunkSizeInGet) + var cursor uint64 + var taskUIDs []string + for { + keys, nextCursor, err := rds.SScan(b.tasksKey(queueUID), cursor, "", chunkSize).Result() + if err == redis.Nil { + return []string{}, nil + } + if err != nil { + return []string{}, err + } + taskUIDs = append(taskUIDs, keys...) + cursor = nextCursor + if cursor == 0 { + break + } + } + return taskUIDs, nil +} From 2b99cf7d427f0a8feab8066b7d93c286163f5378 Mon Sep 17 00:00:00 2001 From: Tetsuya Shiota Date: Tue, 7 Sep 2021 17:12:40 +0900 Subject: [PATCH 2/5] update: use UNLINK and chunking keys to delete task keys --- cmd/root.go | 9 +++++---- pkg/backend/config/config.go | 12 +++++++----- pkg/backend/redis/queue.go | 12 ++++++++++-- pkg/backend/redis/redis_test.go | 9 +++++---- pkg/backend/redis/task.go | 10 +++++----- pkg/worker/worker_test.go | 7 ++++--- 6 files changed, 36 insertions(+), 23 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 82d3e88..72b82a6 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -297,10 +297,11 @@ func mustInitializeQueueBackend() { queueBackend, err = backendfactory.NewBackend(logger, backendconfig.Config{ BackendType: cmdOpts.Backend, Redis: &backendconfig.RedisConfig{ - KeyPrefix: cmdOpts.Redis.KeyPrefix, - Client: cmdOpts.Redis.NewClient(), - Backoff: cmdOpts.Redis.Backoff, - ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, + KeyPrefix: cmdOpts.Redis.KeyPrefix, + Client: cmdOpts.Redis.NewClient(), + Backoff: cmdOpts.Redis.Backoff, + ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, + ChunkSizeInDelete: cmdOpts.Redis.ChunkSizeInDelete, }, }) diff --git a/pkg/backend/config/config.go b/pkg/backend/config/config.go index 1d148ff..66e18ba 100644 --- a/pkg/backend/config/config.go +++ b/pkg/backend/config/config.go @@ -30,10 +30,11 @@ type Config struct { } type RedisConfig struct { - KeyPrefix string - Client *redis.Client - Backoff BackoffConfig - ChunkSizeInGet int + KeyPrefix string + Client *redis.Client + Backoff BackoffConfig + ChunkSizeInGet int + ChunkSizeInDelete int } // TODO: support UniversalOptions @@ -52,7 +53,8 @@ type RedisClientConfig struct { IdleTimeout time.Duration `json:"idleTimeout" yaml:"idleTimeout" default:"5m"` IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"` - ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` + ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` + ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInGet" default:"1000"` } func (c RedisClientConfig) NewClient() *redis.Client { diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index 44ea134..f9873b3 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -218,7 +218,7 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { // .. task_keys = collect task keys // WATCh task_keys // MULTI - // DEL {queue_key} worker_keys task_keys + // UNLINK {queue_key} worker_keys task_keys // HDEL {all_queues_key} {queueName} // EXEC txf := func(tx *redis.Tx) error { @@ -240,8 +240,16 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { tx.Watch(taskKeysToDelete...) keysToDelete = append(keysToDelete, taskKeysToDelete...) + chunkSize := b.ChunkSizeInGet + numOfKeysToDelete := len(keysToDelete) _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { - pipe.Del(keysToDelete...) + for begin := 0; begin < numOfKeysToDelete; begin += chunkSize { + end := begin + chunkSize + if end > numOfKeysToDelete { + end = numOfKeysToDelete + } + pipe.Unlink(keysToDelete[begin:end]...) + } pipe.HDel(b.allQueuesKey(), queue.Spec.Name) return nil }) diff --git a/pkg/backend/redis/redis_test.go b/pkg/backend/redis/redis_test.go index 36ce2c5..d097d55 100644 --- a/pkg/backend/redis/redis_test.go +++ b/pkg/backend/redis/redis_test.go @@ -107,10 +107,11 @@ var _ = Describe("Backend", func() { ibackend, err := NewBackend(logger, backendconfig.Config{ BackendType: "redis", Redis: &backendconfig.RedisConfig{ - KeyPrefix: "test", - Client: client, - Backoff: backoffConfig, - ChunkSizeInGet: 1000, + KeyPrefix: "test", + Client: client, + Backoff: backoffConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, }, }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index 295598a..e90a6a2 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -38,11 +38,11 @@ import ( ) const ( - KB = 1 << 10 - PayloadMaxSizeInKB = 1 - MessageMaxSizeInKB = 1 - HistoryLengthMax = 10 - MaxNameLength = 1024 + KB = 1 << 10 + PayloadMaxSizeInKB = 1 + MessageMaxSizeInKB = 1 + HistoryLengthMax = 10 + MaxNameLength = 1024 ) func (b *Backend) ensureQueueAndWorkerExists(queueUID, workerUID uuid.UUID) (*taskqueue.TaskQueue, *worker.Worker, error) { diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go index e3b67fc..5f32855 100644 --- a/pkg/worker/worker_test.go +++ b/pkg/worker/worker_test.go @@ -74,9 +74,10 @@ var _ = Describe("Worker", func() { bcknd, err = backendfactory.NewBackend(logger, backendconfig.Config{ BackendType: "redis", Redis: &backendconfig.RedisConfig{ - Client: client, - Backoff: backendConfig, - ChunkSizeInGet: 1000, + Client: client, + Backoff: backendConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, }, }) Expect(err).NotTo(HaveOccurred()) From f04fb8c49fe959479c796a52d0e1b65ef7af2207 Mon Sep 17 00:00:00 2001 From: Tetsuya Shiota Date: Wed, 8 Sep 2021 17:19:29 +0900 Subject: [PATCH 3/5] Apply suggestions from code review Co-authored-by: Shingo Omura --- pkg/backend/config/config.go | 2 +- pkg/backend/redis/queue.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/backend/config/config.go b/pkg/backend/config/config.go index 66e18ba..61906e4 100644 --- a/pkg/backend/config/config.go +++ b/pkg/backend/config/config.go @@ -54,7 +54,7 @@ type RedisClientConfig struct { IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"` ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` - ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInGet" default:"1000"` + ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInDelete" default:"1000"` } func (c RedisClientConfig) NewClient() *redis.Client { diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index f9873b3..9b085d9 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -218,7 +218,7 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { // .. task_keys = collect task keys // WATCh task_keys // MULTI - // UNLINK {queue_key} worker_keys task_keys + // UNLINK {queue_key} worker_keys task_keys (chunked) // HDEL {all_queues_key} {queueName} // EXEC txf := func(tx *redis.Tx) error { @@ -240,7 +240,7 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { tx.Watch(taskKeysToDelete...) keysToDelete = append(keysToDelete, taskKeysToDelete...) - chunkSize := b.ChunkSizeInGet + chunkSize := b.ChunkSizeInDelete numOfKeysToDelete := len(keysToDelete) _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { for begin := 0; begin < numOfKeysToDelete; begin += chunkSize { From f2b8503401d9719b8cf674d5b4c53c79def46ce6 Mon Sep 17 00:00:00 2001 From: Tetsuya Shiota Date: Wed, 8 Sep 2021 17:39:41 +0900 Subject: [PATCH 4/5] add test to delete large queue --- pkg/backend/redis/redis_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/pkg/backend/redis/redis_test.go b/pkg/backend/redis/redis_test.go index d097d55..1ca3485 100644 --- a/pkg/backend/redis/redis_test.go +++ b/pkg/backend/redis/redis_test.go @@ -278,6 +278,26 @@ var _ = Describe("Backend", func() { Expect(err).To(Equal(iface.TaskQueueNotFound)) }) }) + When("the large queue exists", func() { + It("can delete the queue", func() { + queue := testutil.MustCreateQueue(backend, SampleQueueSpec) + // numOfTasks % chunkSize != 0 && numOfTasks > chunkSize + numOfTasks := 12345 + for i := 0; i < numOfTasks; i++ { + _, err := backend.AddTask(context.Background(), QueueName, SampleTaskSpec) + Expect(err).NotTo(HaveOccurred()) + } + + Expect(backend.DeleteQueue(context.Background(), SampleQueueSpec.Name)).NotTo(HaveOccurred()) + + queuesHash, err := client.HGetAll(backend.allQueuesKey()).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(queuesHash)).To(Equal(0)) + keys, err := client.Keys(backend.queueKey(queue.UID.String()) + "*").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(keys)).To(Equal(0)) + }) + }) }) }) From 34edb2549b812128b2a40252593ea4b83d4f95b7 Mon Sep 17 00:00:00 2001 From: ota42y Date: Fri, 18 Feb 2022 17:42:32 +0900 Subject: [PATCH 5/5] add without transaction mode --- cmd/root.go | 11 +-- pkg/backend/config/config.go | 16 +++-- pkg/backend/iface/backend.go | 1 + pkg/backend/redis/queue.go | 121 ++++++++++++++++++++++++++++---- pkg/backend/redis/redis_test.go | 72 ++++++++++++++++--- pkg/backend/redis/worker.go | 5 +- 6 files changed, 191 insertions(+), 35 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 72b82a6..d594b54 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -297,11 +297,12 @@ func mustInitializeQueueBackend() { queueBackend, err = backendfactory.NewBackend(logger, backendconfig.Config{ BackendType: cmdOpts.Backend, Redis: &backendconfig.RedisConfig{ - KeyPrefix: cmdOpts.Redis.KeyPrefix, - Client: cmdOpts.Redis.NewClient(), - Backoff: cmdOpts.Redis.Backoff, - ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, - ChunkSizeInDelete: cmdOpts.Redis.ChunkSizeInDelete, + KeyPrefix: cmdOpts.Redis.KeyPrefix, + Client: cmdOpts.Redis.NewClient(), + Backoff: cmdOpts.Redis.Backoff, + ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, + ChunkSizeInDelete: cmdOpts.Redis.ChunkSizeInDelete, + WithoutTransaction: cmdOpts.Redis.WithoutTransaction, }, }) diff --git a/pkg/backend/config/config.go b/pkg/backend/config/config.go index 61906e4..74b9bb0 100644 --- a/pkg/backend/config/config.go +++ b/pkg/backend/config/config.go @@ -30,11 +30,12 @@ type Config struct { } type RedisConfig struct { - KeyPrefix string - Client *redis.Client - Backoff BackoffConfig - ChunkSizeInGet int - ChunkSizeInDelete int + KeyPrefix string + Client *redis.Client + Backoff BackoffConfig + ChunkSizeInGet int + ChunkSizeInDelete int + WithoutTransaction bool } // TODO: support UniversalOptions @@ -53,8 +54,9 @@ type RedisClientConfig struct { IdleTimeout time.Duration `json:"idleTimeout" yaml:"idleTimeout" default:"5m"` IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"` - ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` - ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInDelete" default:"1000"` + ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` + ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInDelete" default:"1000"` + WithoutTransaction bool `json:"withoutTransaction" yaml:"withoutTransaction" default:"false"` } func (c RedisClientConfig) NewClient() *redis.Client { diff --git a/pkg/backend/iface/backend.go b/pkg/backend/iface/backend.go index 69b9a6f..aacabb0 100644 --- a/pkg/backend/iface/backend.go +++ b/pkg/backend/iface/backend.go @@ -32,6 +32,7 @@ var ( TaskQueueNotFound = errors.New("Queue not found") TaskQueueExisted = errors.New("Queue already exists") TaskQueueEmptyError = errors.New("Queue is empty") + TaskQueueIsTooLarge = errors.New("Queue have many tasks so we need --without_transaction option") TaskSuspendedError = errors.New("Queue is suspended") WorkerNotFound = errors.New("Worker not found") WorkerExitedError = errors.New("Worker already exists") diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index 9b085d9..90ec6a4 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -204,7 +204,7 @@ func (b *Backend) UpdateQueue(ctx context.Context, queueSpec taskqueue.TaskQueue return queue, nil } -func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { +func (b *Backend) deleteQueueWithTransaction(ctx context.Context, queueName string) error { if err := taskqueue.ValidateQueueName(queueName); err != nil { return err } @@ -212,13 +212,23 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { if err != nil { return err } + + numOfTasks, err := b.Client.SCard(b.tasksKey(queue.UID.String())).Result() + if err != nil { + return err + } + + if int64(b.ChunkSizeInDelete) <= numOfTasks { + return iface.TaskQueueIsTooLarge + } + // WATCH {all_queues_key} {queue_key} // .. worker_keys = collect worker keys // WATCH worker_keys // .. task_keys = collect task keys // WATCh task_keys // MULTI - // UNLINK {queue_key} worker_keys task_keys (chunked) + // DEL {queue_key} worker_keys task_keys // HDEL {all_queues_key} {queueName} // EXEC txf := func(tx *redis.Tx) error { @@ -240,16 +250,8 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { tx.Watch(taskKeysToDelete...) keysToDelete = append(keysToDelete, taskKeysToDelete...) - chunkSize := b.ChunkSizeInDelete - numOfKeysToDelete := len(keysToDelete) _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { - for begin := 0; begin < numOfKeysToDelete; begin += chunkSize { - end := begin + chunkSize - if end > numOfKeysToDelete { - end = numOfKeysToDelete - } - pipe.Unlink(keysToDelete[begin:end]...) - } + pipe.Del(keysToDelete...) pipe.HDel(b.allQueuesKey(), queue.Spec.Name) return nil }) @@ -260,13 +262,108 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { b.Logger.With(). Str("queueName", queueName). Str("queueUID", queue.UID.String()). - Str("operation", "DeleteQueue"). + Str("operation", "DeleteQueueWithTransaction"). Logger(), txf, b.allQueuesKey(), b.queueKey(queue.UID.String()), ) } +func (b *Backend) deleteQueueWithoutTransaction(ctx context.Context, queueName string) error { + if err := taskqueue.ValidateQueueName(queueName); err != nil { + return err + } + queue, err := b.ensureQueueExistsByName(b.Client, queueName) + if err != nil { + return err + } + // + // .. worker_keys = collect worker keys + // .. task_keys = collect task keys + // Use chunk to divide and loop until UNLINK all keys + // ---loop start--- + // WATCH {all_queues_key} {queue_key} + // UNLINK chulk_keys + // ---loop end--- + // WATCH {all_queues_key} {queue_key} + // MULTI + // DEL {queue_key} worker_keys task_keys + // HDEL {all_queues_key} {queueName} + // EXEC + keysToDelete := []string{} + + workerKeysToDelete, err := b.allWorkersKeysForDeleteQueue(b.Client, queue.UID.String()) + if err != nil { + return err + } + keysToDelete = append(keysToDelete, workerKeysToDelete...) + + taskKeysToDelete, err := b.allTasksKeysForDeleteQueue(b.Client, queue.UID.String()) + if err != nil { + return err + } + keysToDelete = append(keysToDelete, taskKeysToDelete...) + + // chunk delete + for cursor := 0; cursor < len(keysToDelete); cursor += b.ChunkSizeInDelete { + time.Sleep(100 * time.Millisecond) + end := cursor + b.ChunkSizeInDelete + if len(keysToDelete) <= end { + end = len(keysToDelete) + } + + deleteKeys := keysToDelete[cursor:end] + if len(deleteKeys) == 0 { + continue + } + err = b.runTxWithBackOff( + ctx, + b.Logger.With(). + Str("queueName", queueName). + Str("queueUID", queue.UID.String()). + Str("operation", "DeleteQueueWithTransaction"). + Logger(), + func(tx *redis.Tx) error { + _, err = tx.Unlink(deleteKeys...).Result() + return err + }, + b.allQueuesKey(), b.queueKey(queue.UID.String()), + ) + if err != nil { + return err + } + } + + err = b.runTxWithBackOff( + ctx, + b.Logger.With(). + Str("queueName", queueName). + Str("queueUID", queue.UID.String()). + Str("operation", "DeleteQueueWithTransaction"). + Logger(), + func(tx *redis.Tx) error { + _, err = tx.Unlink(b.queueKey(queue.UID.String())).Result() + if err != nil { + return err + } + + _, err = tx.HDel(b.allQueuesKey(), queue.Spec.Name).Result() + return err + }, + b.allQueuesKey(), b.queueKey(queue.UID.String()), + ) + + return err +} + +func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { + if b.WithoutTransaction { + return b.deleteQueueWithoutTransaction(ctx, queueName) + } else { + return b.deleteQueueWithTransaction(ctx, queueName) + } +} + func (b *Backend) ensureQueueExistsByName(rds redis.Cmdable, queueName string) (*taskqueue.TaskQueue, error) { uid, err := b.lookupQueueUID(rds, queueName) if err != nil { diff --git a/pkg/backend/redis/redis_test.go b/pkg/backend/redis/redis_test.go index 1ca3485..c8e864a 100644 --- a/pkg/backend/redis/redis_test.go +++ b/pkg/backend/redis/redis_test.go @@ -280,22 +280,16 @@ var _ = Describe("Backend", func() { }) When("the large queue exists", func() { It("can delete the queue", func() { - queue := testutil.MustCreateQueue(backend, SampleQueueSpec) + testutil.MustCreateQueue(backend, SampleQueueSpec) // numOfTasks % chunkSize != 0 && numOfTasks > chunkSize - numOfTasks := 12345 + numOfTasks := 1000 // numOfTasks < ChunkSizeInDelete for i := 0; i < numOfTasks; i++ { _, err := backend.AddTask(context.Background(), QueueName, SampleTaskSpec) Expect(err).NotTo(HaveOccurred()) } - Expect(backend.DeleteQueue(context.Background(), SampleQueueSpec.Name)).NotTo(HaveOccurred()) - - queuesHash, err := client.HGetAll(backend.allQueuesKey()).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(len(queuesHash)).To(Equal(0)) - keys, err := client.Keys(backend.queueKey(queue.UID.String()) + "*").Result() - Expect(err).NotTo(HaveOccurred()) - Expect(len(keys)).To(Equal(0)) + err := backend.DeleteQueue(context.Background(), SampleQueueSpec.Name) + Expect(err).To(Equal(iface.TaskQueueIsTooLarge)) }) }) }) @@ -1053,3 +1047,61 @@ var _ = Describe("Backend", func() { }) }) }) + +var _ = Describe("BackendWihoutTransaction", func() { + var backend *Backend + BeforeEach(func() { + var err error + backoffConfig := backendconfig.DefaultBackoffConfig() + backoffConfig.MaxRetry = 0 + ibackend, err := NewBackend(logger, backendconfig.Config{ + BackendType: "redis", + Redis: &backendconfig.RedisConfig{ + KeyPrefix: "test", + Client: client, + Backoff: backoffConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, + WithoutTransaction: true, + }, + }) + Expect(err).NotTo(HaveOccurred()) + backend, _ = ibackend.(*Backend) + }) + + AfterEach(func() { + keys, err := client.Keys("*").Result() + Expect(err).NotTo(HaveOccurred()) + if len(keys) > 0 { + num, err := client.Del(keys...).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(num).To(Equal(int64(len(keys)))) + } + }) + + Context("Queue Operation", func() { + Context("DeleteQueue", func() { + When("the large queue exists", func() { + It("can delete the queue", func() { + queue := testutil.MustCreateQueue(backend, SampleQueueSpec) + // numOfTasks % chunkSize != 0 && numOfTasks > chunkSize + // numOfTaksk <= chunSize + numOfTasks := 1000 + for i := 0; i < numOfTasks; i++ { + _, err := backend.AddTask(context.Background(), QueueName, SampleTaskSpec) + Expect(err).NotTo(HaveOccurred()) + } + + Expect(backend.DeleteQueue(context.Background(), SampleQueueSpec.Name)).NotTo(HaveOccurred()) + + queuesHash, err := client.HGetAll(backend.allQueuesKey()).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(queuesHash)).To(Equal(0)) + keys, err := client.Keys(backend.queueKey(queue.UID.String()) + "*").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(keys)).To(Equal(0)) + }) + }) + }) + }) +}) diff --git a/pkg/backend/redis/worker.go b/pkg/backend/redis/worker.go index 79afea5..c205cae 100644 --- a/pkg/backend/redis/worker.go +++ b/pkg/backend/redis/worker.go @@ -483,7 +483,7 @@ func (b *Backend) ensureWorkerExistsByUID(rds redis.Cmdable, queue *taskqueue.Ta } func (b *Backend) allWorkersKeysForDeleteQueue(rds redis.Cmdable, queueUID string) ([]string, error) { - keysToDelete := []string{b.workersKey(queueUID)} + keysToDelete := []string{} workerUIDs, err := rds.SMembers(b.workersKey(queueUID)).Result() if err == redis.Nil { return []string{}, nil @@ -497,5 +497,8 @@ func (b *Backend) allWorkersKeysForDeleteQueue(rds redis.Cmdable, queueUID strin b.workerPendingTaskQueueKey(queueUID, workerUID), b.workerTasksKey(queueUID, workerUID)) } + + // If you are not using transactions, you need to delete the wokers key at the end. + keysToDelete = append(keysToDelete, b.workersKey(queueUID)) return keysToDelete, nil }