Skip to content

Error fetching Celery task state #634

@philpanch

Description

@philpanch

Greetings!

I am seeing strange behavior from the scheduler. Whenever a DAG is triggered, either manually or automatically, I receive the following error:

scheduler_1             | Celery Task ID: ('my_task', 'do_my_task', datetime.datetime(2020, 6, 21, 4, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1)
scheduler_1             | Traceback (most recent call last):
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1173, in _execute_context
scheduler_1             |     context = constructor(dialect, self, conn, *args)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 768, in _init_compiled
scheduler_1             |     self.cursor = self.create_cursor()
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 1122, in create_cursor
scheduler_1             |     return self._dbapi_connection.cursor()
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 965, in cursor
scheduler_1             |     return self.connection.cursor(*args, **kwargs)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
scheduler_1             |     raise AirflowTaskTimeout(self.error_message)
scheduler_1             | airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 405
scheduler_1             | 
scheduler_1             | The above exception was the direct cause of the following exception:
scheduler_1             | 
scheduler_1             | Traceback (most recent call last):
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 106, in fetch_celery_task_state
scheduler_1             |     res = (celery_task[0], celery_task[1].state)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 484, in state
scheduler_1             |     return self._get_task_meta()['status']
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 423, in _get_task_meta
scheduler_1             |     return self._maybe_set_cache(self.backend.get_task_meta(self.id))
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 523, in get_task_meta
scheduler_1             |     meta = self._get_task_meta_for(task_id)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/celery/backends/database/__init__.py", line 51, in _inner
scheduler_1             |     return fun(*args, **kwargs)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/celery/backends/database/__init__.py", line 156, in _get_task_meta_for
scheduler_1             |     task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3367, in __iter__
scheduler_1             |     return self._execute_and_instances(context)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3392, in _execute_and_instances
scheduler_1             |     result = conn.execute(querycontext.statement, self._params)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 982, in execute
scheduler_1             |     return meth(self, multiparams, params)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
scheduler_1             |     return connection._execute_clauseelement(self, multiparams, params)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
scheduler_1             |     distilled_params,
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1176, in _execute_context
scheduler_1             |     e, util.text_type(statement), parameters, None, None
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception
scheduler_1             |     util.raise_from_cause(sqlalchemy_exception, exc_info)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
scheduler_1             |     reraise(type(exception), exception, tb=exc_tb, cause=cause)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
scheduler_1             |     raise value.with_traceback(tb)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1173, in _execute_context
scheduler_1             |     context = constructor(dialect, self, conn, *args)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 768, in _init_compiled
scheduler_1             |     self.cursor = self.create_cursor()
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 1122, in create_cursor
scheduler_1             |     return self._dbapi_connection.cursor()
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 965, in cursor
scheduler_1             |     return self.connection.cursor(*args, **kwargs)
scheduler_1             |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
scheduler_1             |     raise AirflowTaskTimeout(self.error_message)
scheduler_1             | sqlalchemy.exc.StatementError: (airflow.exceptions.AirflowTaskTimeout) Timeout, PID: 405
scheduler_1             | [SQL: SELECT celery_taskmeta.id AS celery_taskmeta_id, celery_taskmeta.task_id AS celery_taskmeta_task_id, celery_taskmeta.status AS celery_taskmeta_status, celery_taskmeta.result AS celery_taskmeta_result, celery_taskmeta.date_done AS celery_taskmeta_date_done, celery_taskmeta.traceback AS celery_taskmeta_traceback 
scheduler_1             | FROM celery_taskmeta 
scheduler_1             | WHERE celery_taskmeta.task_id = %(task_id_1)s]
scheduler_1             | [parameters: [immutabledict({})]]
scheduler_1             | 
scheduler_1             | 
scheduler_1             | [2021-03-18 10:25:36,606] {timeout.py:42} ERROR - Process timed out, PID: 409
scheduler_1             | [2021-03-18 10:25:39,750] {{scheduler_job.py:214}} WARNING - Killing PID 395
scheduler_1             | [2021-03-18 10:25:40,196] {{scheduler_job.py:214}} WARNING - Killing PID 396

Does anyone know how to fix this issue? It seems to occur only in my local environment. Thank you in advance!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions