@@ -393,32 +393,106 @@ def task_signature(self, a):
393393
394394@pytest .mark .usefixtures ("_bind_settings" )
395395def test_task_map (celery_session_worker ):
396- """Tasks executed in a map or starmap are not executed as tasks"""
396+ """Tasks executed via map canvas primitive should be tracked.
397+
398+ Note: The individual function calls within map are not separate Celery tasks,
399+ so we track the map operation itself with the inner task's name.
400+ """
397401
398402 @celery .shared_task (bind = True , base = Task )
399- def task_map (self , a ):
400- assert self .taskbadger_task is None
401- assert Badger .current .session ().client is None
403+ def task_map_fn (self , a ):
402404 return a * 2
403405
404406 celery_session_worker .reload ()
405407
406- task_map = task_map .map (list (range (5 )))
408+ map_canvas = task_map_fn .map (list (range (5 )))
407409
408410 with (
409411 mock .patch ("taskbadger.celery.create_task_safe" ) as create ,
410412 mock .patch ("taskbadger.celery.update_task_safe" ) as update ,
411- mock .patch ("taskbadger.celery.get_task" ) as get_task ,
413+ mock .patch ("taskbadger.celery.get_task" ),
412414 ):
413- result = task_map .delay ()
415+ tb_task = task_for_test ()
416+ create .return_value = tb_task
417+ result = map_canvas .delay ()
414418 assert result .get (timeout = 10 , propagate = True ) == [0 , 2 , 4 , 6 , 8 ]
415419
416- assert create .call_count == 0
417- assert get_task .call_count == 0
418- assert update .call_count == 0
420+ # Map operation should create one TaskBadger task
421+ assert create .call_count == 1
422+ # Verify the task name includes inner task name and canvas type/count suffix
423+ call_args = create .call_args
424+ task_name = call_args [0 ][0 ]
425+ assert "task_map_fn" in task_name
426+ assert task_name .endswith ("(map 5)" )
427+ assert update .call_count == 2 # PROCESSING and SUCCESS
419428 assert Badger .current .session ().client is None
420429
421430
431+ @pytest .mark .usefixtures ("_bind_settings" )
432+ def test_task_starmap (celery_session_worker ):
433+ """Tasks executed via starmap canvas primitive should be tracked."""
434+
435+ @celery .shared_task (bind = True , base = Task )
436+ def task_starmap_fn (self , a , b ):
437+ return a + b
438+
439+ celery_session_worker .reload ()
440+
441+ starmap_canvas = task_starmap_fn .starmap ([(1 , 2 ), (3 , 4 ), (5 , 6 )])
442+
443+ with (
444+ mock .patch ("taskbadger.celery.create_task_safe" ) as create ,
445+ mock .patch ("taskbadger.celery.update_task_safe" ) as update ,
446+ mock .patch ("taskbadger.celery.get_task" ),
447+ ):
448+ tb_task = task_for_test ()
449+ create .return_value = tb_task
450+ result = starmap_canvas .delay ()
451+ assert result .get (timeout = 10 , propagate = True ) == [3 , 7 , 11 ]
452+
453+ # Starmap operation should create one TaskBadger task
454+ assert create .call_count == 1
455+ # Verify the task name includes inner task name and canvas type/count suffix
456+ call_args = create .call_args
457+ task_name = call_args [0 ][0 ]
458+ assert "task_starmap_fn" in task_name
459+ assert task_name .endswith ("(starmap 3)" )
460+ assert update .call_count == 2 # PROCESSING and SUCCESS
461+
462+
463+ @pytest .mark .usefixtures ("_bind_settings" )
464+ def test_task_chunks (celery_session_worker ):
465+ """Tasks executed via chunks canvas primitive should be tracked."""
466+
467+ @celery .shared_task (bind = True , base = Task )
468+ def task_chunks_fn (self , a ):
469+ return a * 2
470+
471+ celery_session_worker .reload ()
472+
473+ # chunks creates multiple starmap tasks
474+ chunks_canvas = task_chunks_fn .chunks ([(x ,) for x in range (6 )], 2 ).group ()
475+
476+ with (
477+ mock .patch ("taskbadger.celery.create_task_safe" ) as create ,
478+ mock .patch ("taskbadger.celery.update_task_safe" ) as update ,
479+ mock .patch ("taskbadger.celery.get_task" ),
480+ ):
481+ tb_task = task_for_test ()
482+ create .return_value = tb_task
483+ result = chunks_canvas .delay ()
484+ assert result .get (timeout = 10 , propagate = True ) == [[0 , 2 ], [4 , 6 ], [8 , 10 ]]
485+
486+ # Each chunk should create a TaskBadger task (3 chunks of 2)
487+ assert create .call_count == 3
488+ # Verify the task names include inner task name and canvas type/count suffix
489+ for call in create .call_args_list :
490+ task_name = call [0 ][0 ]
491+ assert "task_chunks_fn" in task_name
492+ assert task_name .endswith ("(starmap 2)" ) # chunks uses starmap internally
493+ assert update .call_count == 6 # 3 tasks * 2 updates each
494+
495+
422496@pytest .mark .usefixtures ("_bind_settings" )
423497def test_celery_task_already_in_terminal_state (celery_session_worker ):
424498 @celery .shared_task (bind = True , base = Task )
0 commit comments