From e829682c1b23e8aa056d0276d2d38233a522c88c Mon Sep 17 00:00:00 2001
From: AlexCXC <1223408988@qq.com>
Date: Thu, 28 Aug 2025 18:32:23 +0800
Subject: [PATCH 1/5] init refactor, add Scheduler

- add a scheduled task that cleans up old logs and statistics
- update ScriptLog.state correctly
- add a created_at column to script_log
- update statistics when a script is added
- return an error response when something goes wrong
- add bind-host env variables for scheduler and starter
- apply black formatting, assorted quality fixes

---
 .gitignore                             |   2 +
 scheduler/app/database/__init__.py     |   4 +-
 scheduler/app/faas_scheduler/models.py |  15 +-
 scheduler/app/faas_scheduler/utils.py  | 214 ++++++++++++-----
 scheduler/app/flask_server.py          |  76 +++---
 scheduler/app/scheduler.py             | 315 ++++++++++++++++++++++---
 starter/runner.py                      |  42 +++-
 7 files changed, 511 insertions(+), 157 deletions(-)

diff --git a/.gitignore b/.gitignore
index cb64eaa..701dcbb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,5 @@ local_settings.py
 
 seatable-python-runner/
 seatable-python-runner.zip
+
+.python-version
diff --git a/scheduler/app/database/__init__.py b/scheduler/app/database/__init__.py
index d12cfaa..fd18bb7 100644
--- a/scheduler/app/database/__init__.py
+++ b/scheduler/app/database/__init__.py
@@ -3,7 +3,7 @@
 
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.orm import sessionmaker, scoped_session
 
 DB_ROOT_USER = os.getenv("DB_ROOT_USER", "root")
 DB_ROOT_PASSWD = os.getenv("DB_ROOT_PASSWD")
@@ -37,4 +37,4 @@
 engine = create_engine(db_url, **db_kwargs)
 Base = declarative_base()
 
-DBSession = sessionmaker(bind=engine)
+DBSession = scoped_session(sessionmaker(bind=engine))
diff --git a/scheduler/app/faas_scheduler/models.py b/scheduler/app/faas_scheduler/models.py
index 5f76c20..dcae19d 100644
--- a/scheduler/app/faas_scheduler/models.py
+++ b/scheduler/app/faas_scheduler/models.py
@@ -31,6 +31,12 @@ class ScriptLog(Base):
     return_code = Column(Integer, nullable=True)
     output = Column(Text, nullable=True)
     operate_from = Column(String(255))
+    state = Column(String(10))
+    created_at = Column(DateTime, index=True)
+
+    PENDING = "pending"
+    RUNNING = "running"
+    FINISHED = "finished"
 
     def __init__(
         self,
@@ -39,7 +45,8 @@ def __init__(
         org_id,
         script_name,
         context_data,
-        started_at,
+        state,
+        created_at,
         operate_from=None,
     ):
         self.dtable_uuid = dtable_uuid
@@ -47,7 +54,8 @@ def __init__(
         self.org_id = org_id
         self.script_name = script_name
         self.context_data = context_data
-        self.started_at = started_at
+        self.state = state
+        self.created_at = created_at
         self.operate_from = operate_from
 
     def to_dict(self, include_context_data=True, include_output=True):
@@ -64,6 +72,9 @@ def to_dict(self, include_context_data=True, include_output=True):
             "success": self.success,
             "return_code": self.return_code,
             "operate_from": self.operate_from,
+            "state": self.state,
+            "created_at": self.created_at
+            and datetime_to_isoformat_timestr(self.created_at),
         }
 
         if include_context_data:
diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py
index 331a3f1..29be3da 100644
--- a/scheduler/app/faas_scheduler/utils.py
+++ b/scheduler/app/faas_scheduler/utils.py
@@ -2,14 +2,19 @@
 import json
 import logging
 import requests
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import List, Optional, Tuple
 from uuid import UUID
 
 from tzlocal import get_localzone
 from sqlalchemy import case, desc, func, text
 from sqlalchemy.orm import load_only
 
-from faas_scheduler.models 
import ScriptLog +from faas_scheduler.models import ( + ScriptLog, + UserRunScriptStatistics, + OrgRunScriptStatistics, + DTableRunScriptStatistics, +) import sys @@ -174,6 +179,7 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None): }, "context_data": context_data, "script_id": script_id, + "timeout": int(SUB_PROCESS_TIMEOUT), } headers = {"User-Agent": "python-scheduler/" + VERSION} logger.debug("I call starter at url %s", RUN_FUNC_URL) @@ -198,66 +204,140 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None): return None -def update_statistics(db_session, dtable_uuid, owner, org_id, spend_time): - if not spend_time: - return - username = owner - - # dtable_run_script_statistcis - sqls = [ - """ - INSERT INTO dtable_run_script_statistics(dtable_uuid, run_date, total_run_count, total_run_time, update_at) VALUES - (:dtable_uuid, :run_date, 1, :spend_time, :update_at) - ON DUPLICATE KEY UPDATE - total_run_count=total_run_count+1, - total_run_time=total_run_time+:spend_time, - update_at=:update_at; - """ - ] +def update_stats_run_count(db_session, dtable_uuid, owner, org_id): + run_date = datetime.today().strftime("%Y-%m-%d") + try: + dtable_stats = ( + db_session.query(DTableRunScriptStatistics) + .filter_by(dtable_uuid=dtable_uuid, run_date=run_date) + .first() + ) + if not dtable_stats: + dtable_stats = DTableRunScriptStatistics( + dtable_uuid=dtable_uuid, + run_date=run_date, + total_run_count=1, + total_run_time=0, + update_at=datetime.now(), + ) + db_session.add(dtable_stats) + else: + dtable_stats.total_run_count += 1 + dtable_stats.update_at = datetime.now() + if org_id == -1: + if "@seafile_group" not in owner: + user_stats = ( + db_session.query(UserRunScriptStatistics) + .filter_by(username=owner, run_date=run_date) + .first() + ) + if not user_stats: + user_stats = UserRunScriptStatistics( + username=owner, + run_date=run_date, + total_run_count=1, + total_run_time=0, + update_at=datetime.now(), + ) + db_session.add(user_stats) + else: + user_stats.total_run_count += 1 + user_stats.update_at = datetime.now() + else: + org_stats = ( + db_session.query(OrgRunScriptStatistics) + .filter_by(org_id=org_id, run_date=run_date) + .first() + ) + if not org_stats: + org_stats = OrgRunScriptStatistics( + org_id=org_id, + run_date=run_date, + total_run_count=1, + total_run_time=0, + update_at=datetime.now(), + ) + db_session.add(org_stats) + else: + org_stats.total_run_count += 1 + org_stats.update_at = datetime.now() + db_session.commit() + except Exception as e: + logger.exception( + "update stats for org_id %s owner %s dtable %s run count error %s", + org_id, + owner, + dtable_uuid, + e, + ) - # org_run_script_statistics - if org_id and org_id != -1: - sqls += [ - """ - INSERT INTO org_run_script_statistics(org_id, run_date, total_run_count, total_run_time, update_at) VALUES - (:org_id, :run_date, 1, :spend_time, :update_at) - ON DUPLICATE KEY UPDATE - total_run_count=total_run_count+1, - total_run_time=total_run_time+:spend_time, - update_at=:update_at; - """ - ] - - # user_run_script_statistics - if "@seafile_group" not in username: - sqls += [ - """ - INSERT INTO user_run_script_statistics(username, org_id, run_date, total_run_count, total_run_time, update_at) VALUES - (:username, :org_id, :run_date, 1, :spend_time, :update_at) - ON DUPLICATE KEY UPDATE - org_id=:org_id, - total_run_count=total_run_count+1, - total_run_time=total_run_time+:spend_time, - update_at=:update_at; - """ - ] +def update_stats_run_time(db_session, 
dtable_uuid, owner, org_id, spend_time): + run_date = datetime.today().strftime("%Y-%m-%d") try: - for sql in sqls: - db_session.execute( - text(sql), - { - "dtable_uuid": dtable_uuid, - "username": username, - "org_id": org_id, - "run_date": datetime.today(), - "spend_time": spend_time, - "update_at": datetime.now(), - }, + dtable_stats = ( + db_session.query(DTableRunScriptStatistics) + .filter_by(dtable_uuid=dtable_uuid, run_date=run_date) + .first() + ) + if not dtable_stats: + dtable_stats = DTableRunScriptStatistics( + dtable_uuid=dtable_uuid, + run_date=run_date, + total_run_count=1, + total_run_time=spend_time, + update_at=datetime.now(), + ) + db_session.add(dtable_stats) + else: + dtable_stats.total_run_time += spend_time + dtable_stats.update_at = datetime.now() + if org_id == -1: + if "@seafile_group" not in owner: + user_stats = ( + db_session.query(UserRunScriptStatistics) + .filter_by(username=owner, run_date=run_date) + .first() + ) + if not user_stats: + user_stats = UserRunScriptStatistics( + username=owner, + run_date=run_date, + total_run_count=1, + total_run_time=spend_time, + update_at=datetime.now(), + ) + db_session.add(user_stats) + else: + user_stats.total_run_time += spend_time + user_stats.update_at = datetime.now() + else: + org_stats = ( + db_session.query(OrgRunScriptStatistics) + .filter_by(org_id=org_id, run_date=run_date) + .first() ) + if not org_stats: + org_stats = OrgRunScriptStatistics( + org_id=org_id, + run_date=run_date, + total_run_count=1, + total_run_time=spend_time, + update_at=datetime.now(), + ) + db_session.add(org_stats) + else: + org_stats.total_run_time += spend_time + org_stats.update_at = datetime.now() db_session.commit() except Exception as e: - logger.exception("update statistics sql error: %s", e) + logger.exception( + "update stats for org_id %s owner %s dtable %s run time error %s", + org_id, + owner, + dtable_uuid, + e, + ) # required to get "script logs" in dtable-web @@ -381,20 +461,27 @@ def add_script( org_id, script_name, context_data, + ScriptLog.PENDING, datetime.now(), operate_from, ) db_session.add(script) db_session.commit() + update_stats_run_count(db_session, dtable_uuid, owner, org_id) + return script -def update_script(db_session, script, success, return_code, output): - script.finished_at = datetime.now() +def update_script( + db_session, script, success, return_code, output, started_at, finished_at +): + script.started_at = started_at + script.finished_at = finished_at script.success = success script.return_code = return_code script.output = output + script.state = ScriptLog.FINISHED db_session.commit() return script @@ -422,11 +509,16 @@ def run_script( return True -def hook_update_script(db_session, script_id, success, return_code, output, spend_time): +def hook_update_script( + db_session, script_id, success, return_code, output, started_at, spend_time +): script = db_session.query(ScriptLog).filter_by(id=script_id).first() if script: - update_script(db_session, script, success, return_code, output) - update_statistics( + finished_at = started_at + timedelta(seconds=spend_time) + update_script( + db_session, script, success, return_code, output, started_at, finished_at + ) + update_stats_run_time( db_session, script.dtable_uuid, script.owner, script.org_id, spend_time ) diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index fd1047e..7bf9759 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -8,7 +8,6 @@ from datetime import datetime, timedelta from flask 
import Flask, request, make_response from gevent.pywsgi import WSGIServer -from concurrent.futures import ThreadPoolExecutor from database import DBSession from faas_scheduler.utils import ( @@ -17,11 +16,8 @@ get_statistics_grouped_by_base, get_statistics_grouped_by_day, is_date_yyyy_mm_dd, - run_script, get_script, - add_script, get_run_script_statistics_by_month, - hook_update_script, can_run_task, get_run_scripts_count_monthly, ping_starter, @@ -30,6 +26,7 @@ uuid_str_to_32_chars, basic_log, ) +from scheduler import scheduler basic_log("scheduler.log") @@ -40,11 +37,16 @@ TIMEOUT_OUTPUT = ( "The script's running time exceeded the limit and the execution was aborted." ) +HOST = os.environ.get("PYTHON_SCHEDULER_BIND_HOST", "127.0.0.1") app = Flask(__name__) logger = logging.getLogger(__name__) -executor = ThreadPoolExecutor(max_workers=SCRIPT_WORKERS) + + +@app.teardown_appcontext +def shutdown_session(exception=None): + DBSession.remove() @app.route("/ping/", methods=["GET"]) @@ -78,8 +80,6 @@ def scripts_api(): context_data = data.get("context_data") owner = data.get("owner") org_id = data.get("org_id") - script_url = data.get("script_url") - temp_api_token = data.get("temp_api_token") scripts_running_limit = data.get("scripts_running_limit", -1) operate_from = data.get("operate_from", "manualy") if not dtable_uuid or not script_name or not owner: @@ -93,27 +93,16 @@ def scripts_api(): owner, org_id, db_session, scripts_running_limit=scripts_running_limit ): return make_response(("The number of runs exceeds the limit"), 400) - script = add_script( - db_session, - dtable_uuid, - owner, + script_log = scheduler.add( + uuid_str_to_32_chars(dtable_uuid), org_id, + owner, script_name, context_data, operate_from, ) - logger.debug("lets call the starter to fire up the runner...") - executor.submit( - run_script, - script.id, - dtable_uuid, - script_name, - script_url, - temp_api_token, - context_data, - ) - return make_response(({"script_id": script.id}, 200)) + return make_response(({"script_id": script_log.id}, 200)) except Exception as e: logger.exception(e) return make_response(("Internal server error", 500)) @@ -146,26 +135,27 @@ def script_api(script_id): script = get_script(db_session, script_id) if not script: return make_response(("Not found", 404)) - if dtable_uuid != script.dtable_uuid or script_name != script.script_name: + if ( + uuid_str_to_32_chars(dtable_uuid) != script.dtable_uuid + or script_name != script.script_name + ): return make_response(("Bad request", 400)) - if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): - now = datetime.now() - duration_seconds = (now - script.started_at).seconds - if duration_seconds > SUB_PROCESS_TIMEOUT: - script.success = False - script.return_code = -1 - script.finished_at = now - script.output = TIMEOUT_OUTPUT - db_session.commit() + # if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): + # now = datetime.now() + # duration_seconds = (now - script.created_at).seconds + # if duration_seconds > SUB_PROCESS_TIMEOUT: + # script.success = False + # script.return_code = -1 + # script.finished_at = now + # script.output = TIMEOUT_OUTPUT + # db_session.commit() return make_response(({"script": script.to_dict()}, 200)) except Exception as e: logger.exception(e) return make_response(("Internal server error", 500)) - finally: - db_session.close() # get python script statistics logs... 
@@ -190,7 +180,9 @@ def task_logs_api(dtable_uuid, script_name):
 
     db_session = DBSession()
     try:
-        task_logs = list_task_logs(db_session, dtable_uuid, script_name, order_by)
+        task_logs = list_task_logs(
+            db_session, uuid_str_to_32_chars(dtable_uuid), script_name, order_by
+        )
         count = task_logs.count()
         task_logs = task_logs[start:end]
         task_log_list = [task_log.to_dict() for task_log in task_logs]
@@ -290,23 +282,19 @@ def record_script_result():
     success = data.get("success", False)
     return_code = data.get("return_code")
     output = data.get("output")
-    spend_time = data.get("spend_time")
+    started_at = datetime.fromisoformat(data.get("started_at"))
+    spend_time = data.get("spend_time") or 0
     script_id = data.get("script_id")
-
-    db_session = DBSession()
-
     # update script_log and run-time statistics
     try:
         if script_id:
-            hook_update_script(
-                db_session, script_id, success, return_code, output, spend_time
+            scheduler.script_done_callback(
+                script_id, success, return_code, output, started_at, spend_time
             )
     except Exception as e:
         logger.exception(e)
         return make_response(("Internal server error", 500))
-    finally:
-        db_session.close()
 
     return "success"
 
@@ -578,5 +566,5 @@ def get_run_statistics_grouped_by_day():
 
 
 if __name__ == "__main__":
-    http_server = WSGIServer(("127.0.0.1", 5055), app)
+    http_server = WSGIServer((HOST, 5055), app)
     http_server.serve_forever()
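
record_script_result() above now trusts the starter to report when the run began. A minimal
sketch of that timestamp round-trip (illustrative values; it assumes scheduler and starter
share clock and timezone, since both ends work with naive local datetimes):

    from datetime import datetime
    import time

    started_at_iso = datetime.fromtimestamp(time.time()).isoformat()  # starter side (runner.py)
    started_at = datetime.fromisoformat(started_at_iso)               # scheduler side, as above
    print(started_at)
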
diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py
index 7d6bfe1..3597aa9 100644
--- a/scheduler/app/scheduler.py
+++ b/scheduler/app/scheduler.py
@@ -1,56 +1,299 @@
+import json
+import logging
 import os
-import gc
 import time
-import logging
-from threading import Thread
+from datetime import datetime, timedelta
+from threading import Lock, Thread
 
 from database import DBSession
+from faas_scheduler.models import ScriptLog
 from faas_scheduler.utils import (
-    check_and_set_tasks_timeout,
+    add_script,
     delete_log_after_days,
     delete_statistics_after_days,
-    basic_log,
+    run_script,
+    get_script_file,
+    hook_update_script,
 )
 
-basic_log("scheduler.log")
+logger = logging.getLogger(__name__)
 
+SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15))
+TIMEOUT_OUTPUT = "The script's running time exceeded the limit and the execution was aborted."
 
-SUB_PROCESS_TIMEOUT = int(
-    os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15)
-)  # 15 minutes
 
-logger = logging.getLogger(__name__)
+class ScriptQueue:
+
+    def __init__(self):
+        self.script_queue = (
+            []
+        )  # a list of ScriptLog instances; these must not be used to update database records!
+        self.script_dict = {}  # a dict of {id: ScriptLog}
+        self.lock = Lock()
+        self.running_count = {}
+        # a dict of
+        # {
+        #     "<org_id/owner>": 0,
+        #     "<org_id/owner>_<dtable_uuid>": 0,
+        #     "<org_id/owner>_<dtable_uuid>_<script_name>": 0
+        # }
+        try:
+            run_limit_per_team = int(os.environ.get("RUN_LIMIT_PER_TEAM", 0))
+        except (TypeError, ValueError):
+            run_limit_per_team = 0
+        try:
+            run_limit_per_base = int(os.environ.get("RUN_LIMIT_PER_BASE", 0))
+        except (TypeError, ValueError):
+            run_limit_per_base = 0
+        try:
+            run_limit_per_script = int(os.environ.get("RUN_LIMIT_PER_SCRIPT", 0))
+        except (TypeError, ValueError):
+            run_limit_per_script = 0
+        self.config = {
+            "run_limit_per_team": run_limit_per_team,
+            "run_limit_per_base": run_limit_per_base,
+            "run_limit_per_script": run_limit_per_script,
+        }
+
+    def can_run_script(self, script_log: ScriptLog):
+        if script_log.org_id != -1:
+            running_team_key = f"{script_log.org_id}"
+        else:
+            running_team_key = f"{script_log.owner}"
+        running_base_key = f"{running_team_key}_{script_log.dtable_uuid}"
+        running_script_key = f"{running_base_key}_{script_log.script_name}"
+
+        if self.config["run_limit_per_team"] > 0 and self.config[
+            "run_limit_per_team"
+        ] <= self.running_count.get(running_team_key, 0):
+            return False
+        if self.config["run_limit_per_base"] > 0 and self.config[
+            "run_limit_per_base"
+        ] <= self.running_count.get(running_base_key, 0):
+            return False
+        if self.config["run_limit_per_script"] > 0 and self.config[
+            "run_limit_per_script"
+        ] <= self.running_count.get(running_script_key, 0):
+            return False
+
+        return True
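
can_run_script() checks three nested counters, keyed team, team_base, team_base_script.
A condensed, runnable model of that check with made-up limits and counts (key names mirror
increase_running() below; a limit of 0 disables that level):

    running_count = {"42": 2, "42_abcd1234": 1}  # org 42 has two scripts running
    config = {"run_limit_per_team": 2, "run_limit_per_base": 0, "run_limit_per_script": 0}

    def may_run(team_key, base_key, script_key):
        for limit_name, key in (
            ("run_limit_per_team", team_key),
            ("run_limit_per_base", base_key),
            ("run_limit_per_script", script_key),
        ):
            limit = config[limit_name]
            if limit > 0 and running_count.get(key, 0) >= limit:
                return False
        return True

    print(may_run("42", "42_abcd1234", "42_abcd1234_task.py"))  # False: team limit reached
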
+    def add(self, script_log: ScriptLog):
+        with self.lock:
+            self.script_queue.append(script_log)
+            self.script_dict[script_log.id] = script_log
+            self.inspect_queue_and_running(
+                pre_msg=f"add script {script_log.to_dict()} to queue"
+            )
+
+    def get(self):
+        """get the first script in self.script_queue that the running limits allow
+
+        Return: a ScriptLog instance or None
+        """
+        with self.lock:
+            return_task = None
+
+            index = 0
+            while index < len(self.script_queue):
+                script_log = self.script_queue[index]
+                if self.can_run_script(script_log):
+                    return_task = script_log
+                    self.script_queue.pop(index)
+                    self.increase_running(script_log)
+                    self.inspect_queue_and_running(
+                        pre_msg=f"get script {script_log.to_dict()} from queue"
+                    )
+                    break
+                index += 1
+
+            return return_task
+
+    def increase_running(self, script_log):
+        if script_log.org_id != -1:
+            running_team_key = f"{script_log.org_id}"
+        else:
+            running_team_key = f"{script_log.owner}"
+        running_base_key = f"{running_team_key}_{script_log.dtable_uuid}"
+        running_script_key = f"{running_base_key}_{script_log.script_name}"
+        self.running_count[running_team_key] = (
+            self.running_count[running_team_key] + 1
+            if self.running_count.get(running_team_key)
+            else 1
+        )
+        self.running_count[running_base_key] = (
+            self.running_count[running_base_key] + 1
+            if self.running_count.get(running_base_key)
+            else 1
+        )
+        self.running_count[running_script_key] = (
+            self.running_count[running_script_key] + 1
+            if self.running_count.get(running_script_key)
+            else 1
+        )
 
+    def decrease_running(self, script_log):
+        if script_log.org_id != -1:
+            running_team_key = f"{script_log.org_id}"
+        else:
+            running_team_key = f"{script_log.owner}"
+        running_base_key = f"{running_team_key}_{script_log.dtable_uuid}"
+        running_script_key = f"{running_base_key}_{script_log.script_name}"
 
-class FAASTaskTimeoutSetter(Thread):
+        if running_team_key in self.running_count:
+            self.running_count[running_team_key] -= 1
+            if not self.running_count.get(running_team_key):
+                self.running_count.pop(running_team_key, None)
+
+        if running_base_key in self.running_count:
+            self.running_count[running_base_key] -= 1
+            if not self.running_count.get(running_base_key):
+                self.running_count.pop(running_base_key, None)
+
+        if running_script_key in self.running_count:
+            self.running_count[running_script_key] -= 1
+            if not self.running_count.get(running_script_key):
+                self.running_count.pop(running_script_key, None)
+
+    def script_done_callback(self, script_log: ScriptLog):
+        with self.lock:
+            self.script_dict.pop(script_log.id, None)
+            self.decrease_running(script_log)
+            self.inspect_queue_and_running(
+                pre_msg=f"script {script_log.to_dict()} run done"
+            )
+
+    def inspect_queue_and_running(self, pre_msg=None):
+        if logger.root.level != logging.DEBUG:
+            return
+        lines = ["\n"]
+        if pre_msg:
+            lines.append(pre_msg)
+        lines.append(f"{'>' * 10} running {'>' * 10}")
+        for key, value in self.running_count.items():
+            lines.append(f"{key}: {value}")
+        lines.append(f"{'<' * 10} running {'<' * 10}")
+
+        lines.append(f"{'>' * 10} queue {'>' * 10}")
+        for script_log in self.script_queue:
+            lines.append(
+                f"org_id: {script_log.org_id} owner: {script_log.owner} dtable_uuid: {script_log.dtable_uuid} script_name: {script_log.script_name}"
+            )
+        lines.append(f"{'<' * 10} queue {'<' * 10}")
+        logger.debug("\n".join(lines))
+
+    def get_script_log_by_id(self, script_id):
+        return self.script_dict.get(script_id)
+
+    def pop_timeout_scripts(self):
+        script_logs = []
+        now_time = datetime.now()
+        with self.lock:
+            for index in range(len(self.script_queue) - 1, -1, -1):
+                script_log = self.script_queue[index]
+                if (
+                    script_log.state == ScriptLog.RUNNING
+                    and (now_time - script_log.started_at).total_seconds()
+                    >= SUB_PROCESS_TIMEOUT
+                ):
+                    script_logs.append(self.script_queue.pop(index))
+                    self.decrease_running(script_log)
+                    self.inspect_queue_and_running(
+                        pre_msg=f"script {script_log.to_dict()} timed out, removed from queue"
+                    )
+        return script_logs
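
pop_timeout_scripts() measures elapsed time with total_seconds(); plain timedelta.seconds
would silently drop the days component and make a very old script look fresh:

    from datetime import timedelta

    delta = timedelta(days=1, seconds=5)
    print(delta.seconds)          # 5 -- the days component is ignored
    print(delta.total_seconds())  # 86405.0 -- the duration a timeout check needs
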
+
+
+class Scheduler:
 
     def __init__(self):
-        super(FAASTaskTimeoutSetter, self).__init__()
-        self.interval = 60 * 30  # every half an hour
-
-    def run(self):
-        if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
-            while True:
-                logger.info("Start automatic cleanup ...")
-                db_session = DBSession()
-                try:
-                    check_and_set_tasks_timeout(db_session)
-                except Exception as e:
-                    logger.exception("task cleaner error: %s", e)
-                finally:
-                    db_session.close()
-
-                # python garbage collection
-                logger.info("gc.collect: %s", str(gc.collect()))
-
-                # remove old script_logs and statistics
+        self.script_queue = ScriptQueue()
+
+    def add(self, dtable_uuid, org_id, owner, script_name, context_data, operate_from):
+        script_log = add_script(
+            DBSession(),
+            dtable_uuid,
+            owner,
+            org_id,
+            script_name,
+            context_data,
+            operate_from,
+        )
+        self.script_queue.add(script_log)
+        return script_log
+
+    def schedule(self):
+        while True:
+            script_log = self.script_queue.get()
+            if not script_log:
+                time.sleep(0.5)
+                continue
+            db_session = DBSession()
+            try:
+                db_session.query(ScriptLog).filter(
+                    ScriptLog.id == script_log.id
+                ).update(
+                    {ScriptLog.state: ScriptLog.RUNNING},
+                    synchronize_session=False,
+                )
+                db_session.commit()
+                script_file_info = get_script_file(
+                    script_log.dtable_uuid, script_log.script_name
+                )
+                run_script(
+                    script_log.id,
+                    script_log.dtable_uuid,
+                    script_log.script_name,
+                    script_file_info["script_url"],
+                    script_file_info["temp_api_token"],
+                    json.loads(script_log.context_data),
+                )
+            except Exception as e:
+                logger.exception(f"run script: {script_log} error {e}")
+                self.script_done_callback(
+                    script_log.id, False, -1, "", datetime.now(), 0
+                )
+            finally:
+                DBSession.remove()
+
+    def script_done_callback(
+        self, script_id, success, return_code, output, started_at, spend_time
+    ):
+        hook_update_script(
+            DBSession(), script_id, success, return_code, output, started_at, spend_time
+        )
+        script_log = self.script_queue.get_script_log_by_id(script_id)
+        if not script_log:  # not counted in memory, only update db record
+            return
+        self.script_queue.script_done_callback(script_log)
+
+    def load_pending_scripts(self):
+        """load pending script logs; should only be called once, at server start"""
+        script_logs = list(
+            DBSession.query(ScriptLog)
+            .filter_by(state=ScriptLog.PENDING)
+            .filter(ScriptLog.created_at > (datetime.now() - timedelta(hours=1)))
+            .order_by(ScriptLog.id)
+        )
+        logger.info(f"loaded {len(script_logs)} pending scripts created within the last hour")
+        for script_log in script_logs:
+            self.script_queue.add(script_log)
+
+    def statistic_cleaner(self):
+        while True:
+            db_session = DBSession()
+            try:
                 delete_log_after_days(db_session)
                 delete_statistics_after_days(db_session)
+            except Exception as e:
+                logger.exception(e)
+            finally:
+                DBSession.remove()
+            time.sleep(24 * 60 * 60)
 
-                # sleep
-                logger.info("Sleep for %d seconds ...", self.interval)
-                time.sleep(self.interval)
+    def start(self):
+        self.load_pending_scripts()
+        Thread(target=self.schedule, daemon=True).start()
+        Thread(target=self.statistic_cleaner, daemon=True).start()
 
-if __name__ == "__main__":
-    task_timeout_setter = FAASTaskTimeoutSetter()
-    task_timeout_setter.start()
+scheduler = Scheduler()
diff --git a/starter/runner.py b/starter/runner.py
index e4cb429..a0d0520 100644
--- a/starter/runner.py
+++ b/starter/runner.py
@@ -7,6 +7,7 @@
 import time
 import ast
 import sys
+from datetime import datetime
 from concurrent.futures import ThreadPoolExecutor
 from uuid import uuid4
 
@@ -70,6 +71,9 @@
 SEATABLE_USER_UID = 1000
 SEATABLE_USER_GID = 1000
 
+# bind host
+HOST = os.environ.get("PYTHON_STARTER_BIND_HOST", "127.0.0.1")
+
 
 def get_log_level(level):
     if level.lower() == "info":
@@ -132,12 +136,15 @@ def to_python_bool(value):
     return value.lower() == "true"
 
 
-def send_to_scheduler(success, return_code, output, spend_time, request_data):
+def send_to_scheduler(
+    success, return_code, output, started_at, spend_time, request_data
+):
     """
     This function is used to send result of script to scheduler
     - success: whether script running successfully
    - return_code: return-code of subprocess
     - output: output of subprocess or error message
+    - started_at: start timestamp of the subprocess (unix time)
     - spend_time: time subprocess took
     - request_data: data from request
     """
@@ -152,7 +159,8 @@ def send_to_scheduler(success, return_code, output, spend_time, request_data):
         "success": success,
         "return_code": return_code,
         "output": output,
-        "spend_time": spend_time,
+        "started_at": datetime.fromtimestamp(started_at).isoformat(),
+        "spend_time": spend_time or 0,
     }
     result_data.update(
         {
@@ -186,9 +194,11 @@ def run_python(data):
 
     logging.info("New python run initalized... 
(v%s)", VERSION) + started_at = time.time() + script_url = data.get("script_url") if not script_url: - send_to_scheduler(False, None, "Script URL is missing", None, data) + send_to_scheduler(False, None, "Script URL is missing", started_at, None, data) return if ( to_python_bool(USE_ALTERNATIVE_FILE_SERVER_ROOT) @@ -228,11 +238,11 @@ def run_python(data): logging.error( "Failed to get script from %s, response: %s", script_url, resp ) - send_to_scheduler(False, None, "Fail to get script", None, data) + send_to_scheduler(False, None, "Fail to get script", started_at, None, data) return except Exception as e: logging.error("Failed to get script from %s, error: %s", script_url, e) - send_to_scheduler(False, None, "Fail to get script", None, data) + send_to_scheduler(False, None, "Fail to get script", started_at, None, data) return logging.debug("Generate temporary random folder directory") @@ -264,6 +274,7 @@ def run_python(data): return_code, output = None, "" # init output except Exception as e: logging.error("Failed to save script %s, error: %s", script_url, e) + send_to_scheduler(False, -1, "", started_at, 0, data) return try: @@ -271,6 +282,7 @@ def run_python(data): os.chown(tmp_dir, SEATABLE_USER_UID, SEATABLE_USER_GID) except Exception as e: logging.error("Failed to chown %s, error: %s", tmp_dir, e) + send_to_scheduler(False, -1, "", started_at, 0, data) return logging.debug("prepare the command to start the python runner") @@ -339,8 +351,6 @@ def run_python(data): command.append("run") # override command logging.debug("command: %s", command) - start_at = time.time() - logging.debug("try to start the python runner image") try: result = subprocess.run( @@ -371,6 +381,7 @@ def run_python(data): False, -1, "The script's running time exceeded the limit and the execution was aborted.", + started_at, DEFAULT_SUB_PROCESS_TIMEOUT, data, ) @@ -378,7 +389,7 @@ def run_python(data): except Exception as e: logging.exception(e) logging.error("Failed to run file %s error: %s", script_url, e) - send_to_scheduler(False, None, None, None, data) + send_to_scheduler(False, None, None, started_at, None, data) return else: logging.debug( @@ -388,7 +399,12 @@ def run_python(data): if os.path.isfile(output_file_path): if os.path.islink(output_file_path): send_to_scheduler( - False, -1, "Script invalid!", time.time() - start_at, data + False, + -1, + "Script invalid!", + started_at, + time.time() - started_at, + data, ) return with open(output_file_path, "r") as f: @@ -418,7 +434,7 @@ def run_python(data): except Exception as e: logging.warning("Fail to remove container error: %s", e) - spend_time = time.time() - start_at + spend_time = time.time() - started_at logging.info("python run finished successful. duration was: %s", spend_time) logging.debug( "send this to the scheduler. 
return_code: %s, output: %s, spend_time: %s, data: %s",
         return_code,
         output,
         spend_time,
         data,
     )
-    send_to_scheduler(return_code == 0, return_code, output, spend_time, data)
+    send_to_scheduler(
+        return_code == 0, return_code, output, started_at, spend_time, data
+    )
 
 
 ####################
 
@@ -459,4 +477,4 @@ def health_check():
 
 
 if __name__ == "__main__":
-    app.run(port=8088, debug=False)
+    app.run(host=HOST, port=8088, debug=False)

From 44685f73cba18a272c5187863835c05523bd94e9 Mon Sep 17 00:00:00 2001
From: AlexHappy <1223408988@qq.com>
Date: Tue, 13 Jan 2026 20:16:13 +0800
Subject: [PATCH 2/5] start scheduler in flask server

---
 scheduler/app/flask_server.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py
index 7bf9759..d8ba308 100644
--- a/scheduler/app/flask_server.py
+++ b/scheduler/app/flask_server.py
@@ -566,5 +566,6 @@ def get_run_statistics_grouped_by_day():
 
 
 if __name__ == "__main__":
+    scheduler.start()
     http_server = WSGIServer((HOST, 5055), app)
     http_server.serve_forever()

From 6cb04627058b76d7b7f491fce861639ccc2f5e03 Mon Sep 17 00:00:00 2001
From: Alex Happy <1223408988@qq.com>
Date: Wed, 14 Jan 2026 10:41:41 +0800
Subject: [PATCH 3/5] construct the scheduler lazily via get_scheduler()

---
 scheduler/app/flask_server.py | 4 +++-
 scheduler/app/scheduler.py    | 8 +++++++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py
index d8ba308..8946509 100644
--- a/scheduler/app/flask_server.py
+++ b/scheduler/app/flask_server.py
@@ -26,11 +26,13 @@
     uuid_str_to_32_chars,
     basic_log,
 )
-from scheduler import scheduler
+from scheduler import get_scheduler
 
 basic_log("scheduler.log")
 
+scheduler = get_scheduler()
+
 # defaults...
 SCRIPT_WORKERS = int(os.environ.get("PYTHON_SCHEDULER_SCRIPT_WORKERS", 5))
 SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15))
diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py
index 3597aa9..7f814e4 100644
--- a/scheduler/app/scheduler.py
+++ b/scheduler/app/scheduler.py
@@ -296,4 +296,10 @@ def start(self):
         Thread(target=self.statistic_cleaner, daemon=True).start()
 
 
-scheduler = Scheduler()
+scheduler = None
+
+def get_scheduler():
+    global scheduler
+    if not scheduler:
+        scheduler = Scheduler()
+    return scheduler
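
With get_scheduler() in place, every entry point shares one lazily constructed Scheduler.
A usage sketch (assuming scheduler.py is on the import path):

    from scheduler import get_scheduler

    scheduler = get_scheduler()          # first call constructs the Scheduler
    assert scheduler is get_scheduler()  # later calls return the same instance
    scheduler.start()                    # spawns the schedule and cleaner threads
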
From 912f632103adea1326f03068fd8715c770e1da6b Mon Sep 17 00:00:00 2001
From: Alex Happy <1223408988@qq.com>
Date: Wed, 14 Jan 2026 11:18:34 +0800
Subject: [PATCH 4/5] remove scoped_session

---
 scheduler/app/database/__init__.py |  4 ++--
 scheduler/app/flask_server.py      | 13 +++++-----
 scheduler/app/scheduler.py         | 38 +++++++++++++++++-------------
 3 files changed, 31 insertions(+), 24 deletions(-)

diff --git a/scheduler/app/database/__init__.py b/scheduler/app/database/__init__.py
index fd18bb7..d12cfaa 100644
--- a/scheduler/app/database/__init__.py
+++ b/scheduler/app/database/__init__.py
@@ -3,7 +3,7 @@
 
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker, scoped_session
+from sqlalchemy.orm import sessionmaker
 
 DB_ROOT_USER = os.getenv("DB_ROOT_USER", "root")
 DB_ROOT_PASSWD = os.getenv("DB_ROOT_PASSWD")
@@ -37,4 +37,4 @@
 engine = create_engine(db_url, **db_kwargs)
 Base = declarative_base()
 
-DBSession = scoped_session(sessionmaker(bind=engine))
+DBSession = sessionmaker(bind=engine)
diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py
index 8946509..08be50b 100644
--- a/scheduler/app/flask_server.py
+++ b/scheduler/app/flask_server.py
@@ -46,11 +46,6 @@
 logger = logging.getLogger(__name__)
 
 
-@app.teardown_appcontext
-def shutdown_session(exception=None):
-    DBSession.remove()
-
-
 @app.route("/ping/", methods=["GET"])
 def ping():
     if not ping_starter():
@@ -96,6 +91,7 @@ def scripts_api():
         ):
             return make_response(("The number of runs exceeds the limit"), 400)
         script_log = scheduler.add(
+            db_session,
             uuid_str_to_32_chars(dtable_uuid),
             org_id,
             owner,
@@ -158,6 +154,8 @@ def script_api(script_id):
     except Exception as e:
         logger.exception(e)
         return make_response(("Internal server error", 500))
+    finally:
+        db_session.close()
 
 
 # get python script statistics logs...
@@ -288,15 +286,18 @@ def record_script_result():
     spend_time = data.get("spend_time") or 0
     script_id = data.get("script_id")
     # update script_log and run-time statistics
+    db_session = DBSession()
     try:
         if script_id:
             scheduler.script_done_callback(
-                script_id, success, return_code, output, started_at, spend_time
+                db_session, script_id, success, return_code, output, started_at, spend_time
             )
     except Exception as e:
         logger.exception(e)
         return make_response(("Internal server error", 500))
+    finally:
+        db_session.close()
 
     return "success"
 
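
After dropping the scoped_session, every caller owns its session explicitly; the two files
in this patch follow a plain session-per-request pattern, sketched here (using the DBSession
factory from database/__init__.py):

    from database import DBSession

    def handle_request():
        db_session = DBSession()  # a plain Session, no thread-local magic
        try:
            pass  # hand db_session to scheduler/faas_scheduler helpers here
        finally:
            db_session.close()  # always return the connection to the pool
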
diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py
index 7f814e4..4a79c7a 100644
--- a/scheduler/app/scheduler.py
+++ b/scheduler/app/scheduler.py
@@ -208,9 +208,9 @@ class Scheduler:
 
     def __init__(self):
         self.script_queue = ScriptQueue()
 
-    def add(self, dtable_uuid, org_id, owner, script_name, context_data, operate_from):
+    def add(self, db_session, dtable_uuid, org_id, owner, script_name, context_data, operate_from):
         script_log = add_script(
-            DBSession(),
+            db_session,
             dtable_uuid,
             owner,
             org_id,
@@ -250,16 +250,16 @@ def schedule(self):
             except Exception as e:
                 logger.exception(f"run script: {script_log} error {e}")
                 self.script_done_callback(
-                    script_log.id, False, -1, "", datetime.now(), 0
+                    db_session, script_log.id, False, -1, "", datetime.now(), 0
                 )
             finally:
-                DBSession.remove()
+                db_session.close()
 
     def script_done_callback(
-        self, script_id, success, return_code, output, started_at, spend_time
+        self, db_session, script_id, success, return_code, output, started_at, spend_time
     ):
         hook_update_script(
-            DBSession(), script_id, success, return_code, output, started_at, spend_time
+            db_session, script_id, success, return_code, output, started_at, spend_time
         )
         script_log = self.script_queue.get_script_log_by_id(script_id)
         if not script_log:  # not counted in memory, only update db record
@@ -268,15 +268,21 @@ def script_done_callback(
 
     def load_pending_scripts(self):
         """load pending script logs; should only be called once, at server start"""
-        script_logs = list(
-            DBSession.query(ScriptLog)
-            .filter_by(state=ScriptLog.PENDING)
-            .filter(ScriptLog.created_at > (datetime.now() - timedelta(hours=1)))
-            .order_by(ScriptLog.id)
-        )
-        logger.info(f"loaded {len(script_logs)} pending scripts created within the last hour")
-        for script_log in script_logs:
-            self.script_queue.add(script_log)
+        db_session = DBSession()
+        try:
+            script_logs = list(
+                db_session.query(ScriptLog)
+                .filter_by(state=ScriptLog.PENDING)
+                .filter(ScriptLog.created_at > (datetime.now() - timedelta(hours=1)))
+                .order_by(ScriptLog.id)
+            )
+            logger.info(f"loaded {len(script_logs)} pending scripts created within the last hour")
+            for script_log in script_logs:
+                self.script_queue.add(script_log)
+        except Exception as e:
+            logger.exception(e)
+        finally:
+            db_session.close()
 
     def statistic_cleaner(self):
         while True:
@@ -287,7 +293,7 @@ def statistic_cleaner(self):
             except Exception as e:
                 logger.exception(e)
             finally:
-                DBSession.remove()
+                db_session.close()
             time.sleep(24 * 60 * 60)
 
     def start(self):

From 564a5e1ba15d3e56f1e1e0c4d728426d2022c975 Mon Sep 17 00:00:00 2001
From: Alex Happy <1223408988@qq.com>
Date: Wed, 14 Jan 2026 16:53:13 +0800
Subject: [PATCH 5/5] improve queue log messages and running_count cleanup

---
 scheduler/app/scheduler.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py
index 4a79c7a..acb5c65 100644
--- a/scheduler/app/scheduler.py
+++ b/scheduler/app/scheduler.py
@@ -82,7 +82,7 @@ def add(self, script_log: ScriptLog):
             self.script_queue.append(script_log)
             self.script_dict[script_log.id] = script_log
             self.inspect_queue_and_running(
-                pre_msg=f"add script {script_log.to_dict()} to queue"
+                pre_msg=f"added script {script_log.to_dict()} to queue"
             )
 
     def get(self):
@@ -101,7 +101,7 @@ def get(self):
                     self.script_queue.pop(index)
                     self.increase_running(script_log)
                     self.inspect_queue_and_running(
-                        pre_msg=f"get script {script_log.to_dict()} from queue"
+                        pre_msg=f"got script {script_log.to_dict()} from queue"
                    )
                     break
                 index += 1
@@ -141,17 +141,17 @@ def decrease_running(self, script_log):
 
         if running_team_key in self.running_count:
             self.running_count[running_team_key] -= 1
-            if not self.running_count.get(running_team_key):
+            if self.running_count.get(running_team_key, 0) <= 0:
                 self.running_count.pop(running_team_key, None)
 
         if running_base_key in self.running_count:
             self.running_count[running_base_key] -= 1
-            if not self.running_count.get(running_base_key):
+            if self.running_count.get(running_base_key, 0) <= 0:
                 self.running_count.pop(running_base_key, None)
 
         if running_script_key in self.running_count:
             self.running_count[running_script_key] -= 1
-            if not self.running_count.get(running_script_key):
+            if self.running_count.get(running_script_key, 0) <= 0:
                 self.running_count.pop(running_script_key, None)
 
     def script_done_callback(self, script_log: ScriptLog):
@@ -159,7 +159,7 @@ def script_done_callback(self, script_log: ScriptLog):
             self.script_dict.pop(script_log.id, None)
             self.decrease_running(script_log)
             self.inspect_queue_and_running(
-                pre_msg=f"script {script_log.to_dict()} run done"
+                pre_msg=f"finished script {script_log.to_dict()}"
             )
 
     def inspect_queue_and_running(self, pre_msg=None):
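
The explicit `<= 0` eviction check also covers a counter that somehow went negative (for
example, a duplicate done-callback), which the old falsiness test would miss. A small
illustration with a made-up counter:

    counts = {"42": -1}              # a counter that somehow went negative
    print(not counts.get("42"))      # False -> the old falsiness test keeps the stale key
    print(counts.get("42", 0) <= 0)  # True  -> the explicit test evicts it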