Skip to content

Commit ef81044

Browse files
committed
only fix start/finish fields
1 parent ee272e7 commit ef81044

4 files changed

Lines changed: 18 additions & 250 deletions

File tree

scheduler/app/faas_scheduler/models.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,15 @@ class ScriptLog(Base):
3131
return_code = Column(Integer, nullable=True)
3232
output = Column(Text, nullable=True)
3333
operate_from = Column(String(255))
34-
state = Column(String(10))
3534
created_at = Column(DateTime, index=True)
3635

37-
PENDING = "pending"
38-
RUNNING = "running"
39-
FINISHED = "finished"
40-
4136
def __init__(
4237
self,
4338
dtable_uuid,
4439
owner,
4540
org_id,
4641
script_name,
4742
context_data,
48-
state,
4943
created_at,
5044
operate_from=None,
5145
):
@@ -54,7 +48,6 @@ def __init__(
5448
self.org_id = org_id
5549
self.script_name = script_name
5650
self.context_data = context_data
57-
self.state = state
5851
self.created_at = created_at
5952
self.operate_from = operate_from
6053

@@ -86,7 +79,6 @@ def to_dict(self):
8679
"return_code": self.return_code,
8780
"output": self.output,
8881
"operate_from": self.operate_from,
89-
"state": self.state,
9082
"created_at": self.created_at
9183
and datetime_to_isoformat_timestr(self.created_at),
9284
}

scheduler/app/faas_scheduler/utils.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def ping_starter():
8383
## triggered from scheduler.py to remove old script_logs
8484
def delete_log_after_days(db_session):
8585
clean_script_logs = (
86-
"DELETE FROM `script_log` WHERE `started_at` < DATE_SUB(NOW(), INTERVAL %s DAY)"
86+
"DELETE FROM `script_log` WHERE `created_at` < DATE_SUB(NOW(), INTERVAL %s DAY)"
8787
% DELETE_LOG_DAYS
8888
)
8989
logger.debug(clean_script_logs)
@@ -459,7 +459,6 @@ def add_script(
459459
org_id,
460460
script_name,
461461
context_data,
462-
ScriptLog.PENDING,
463462
datetime.now(),
464463
operate_from,
465464
)
@@ -479,7 +478,6 @@ def update_script(
479478
script.success = success
480479
script.return_code = return_code
481480
script.output = output
482-
script.state = ScriptLog.FINISHED
483481
db_session.commit()
484482

485483
return script

scheduler/app/flask_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def ping():
5959

6060
# called from dtable-web to start the python run
6161
@app.route("/run-script/", methods=["POST"])
62-
def scripts_api():
62+
def run_script_api():
6363
if not check_auth_token(request):
6464
return make_response(("Forbidden: the auth token is not correct.", 403))
6565

@@ -108,7 +108,7 @@ def scripts_api():
108108

109109
# called from dtable-web to get the status of a specific run.
110110
@app.route("/run-script/<script_id>/", methods=["GET"])
111-
def script_api(script_id):
111+
def get_script_api(script_id):
112112
if not check_auth_token(request):
113113
return make_response(("Forbidden: the auth token is not correct.", 403))
114114

scheduler/app/scheduler.py

Lines changed: 15 additions & 237 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import os
44
import time
5+
from concurrent.futures import ThreadPoolExecutor
56
from datetime import datetime, timedelta
67
from threading import Lock, Thread
78

@@ -14,199 +15,17 @@
1415
run_script,
1516
get_script_file,
1617
hook_update_script,
18+
check_and_set_tasks_timeout,
1719
)
1820

1921
logger = logging.getLogger(__name__)
2022
SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15))
21-
TIMEOUT_OUTPUT = "Script running for too long time!"
22-
23-
24-
class ScriptQueue:
25-
26-
def __init__(self):
27-
self.script_queue = (
28-
[]
29-
) # a list of ScriptLog instances, but can not be used to update database records!!!
30-
self.script_dict = {} # a dict of {id: ScriptLog}
31-
self.lock = Lock()
32-
self.running_count = {}
33-
# a dict of
34-
# {
35-
# "<team>": 0,
36-
# "<team>_<dtable_uuid>": 0,
37-
# "<team>_<dtable_uuid>_<script_name>": 0
38-
# }
39-
try:
40-
run_limit_per_team = int(os.environ.get("RUN_LIMIT_PER_TEAM", 0))
41-
except:
42-
run_limit_per_team = 0
43-
try:
44-
run_limit_per_base = int(os.environ.get("RUN_LIMIT_PER_BASE", 0))
45-
except:
46-
run_limit_per_base = 0
47-
try:
48-
run_limit_per_script = int(os.environ.get("RUN_LIMIT_PER_SCRIPT", 0))
49-
except:
50-
run_limit_per_script = 0
51-
self.config = {
52-
"run_limit_per_team": run_limit_per_team,
53-
"run_limit_per_base": run_limit_per_base,
54-
"run_limit_per_script": run_limit_per_script,
55-
}
56-
57-
def can_run_script(self, script_log: ScriptLog):
58-
if script_log.org_id != -1:
59-
running_team_key = f"{script_log.org_id}"
60-
else:
61-
running_team_key = f"{script_log.owner}"
62-
running_base_key = f"{running_team_key}_{script_log.dtable_uuid}"
63-
running_script_key = f"{running_base_key}_{script_log.script_name}"
64-
65-
if self.config["run_limit_per_team"] > 0 and self.config[
66-
"run_limit_per_team"
67-
] <= self.running_count.get(running_team_key, 0):
68-
return False
69-
if self.config["run_limit_per_base"] > 0 and self.config[
70-
"run_limit_per_base"
71-
] <= self.running_count.get(running_base_key, 0):
72-
return False
73-
if self.config["run_limit_per_script"] > 0 and self.config[
74-
"run_limit_per_script"
75-
] <= self.running_count.get(running_script_key, 0):
76-
return False
77-
78-
return True
79-
80-
def add(self, script_log: ScriptLog):
81-
with self.lock:
82-
self.script_queue.append(script_log)
83-
self.script_dict[script_log.id] = script_log
84-
self.inspect_queue_and_running(
85-
pre_msg=f"add script {script_log.get_info()} to queue"
86-
)
87-
88-
def get(self):
89-
"""get the first valid task from self.q
90-
91-
Return: an instance of ScriptTask or None
92-
"""
93-
with self.lock:
94-
return_task = None
95-
96-
index = 0
97-
while index < len(self.script_queue):
98-
script_log = self.script_queue[index]
99-
if self.can_run_script(script_log):
100-
return_task = script_log
101-
self.script_queue.pop(index)
102-
self.increase_running(script_log)
103-
self.inspect_queue_and_running(
104-
pre_msg=f"get script {script_log.get_info()} from queue"
105-
)
106-
break
107-
index += 1
108-
109-
return return_task
110-
111-
def increase_running(self, script_log):
112-
if script_log.org_id != -1:
113-
running_team_key = f"{script_log.org_id}"
114-
else:
115-
running_team_key = f"{script_log.owner}"
116-
running_base_key = f"{running_team_key}_{script_log.dtable_uuid}"
117-
running_script_key = f"{running_base_key}_{script_log.script_name}"
118-
self.running_count[running_team_key] = (
119-
self.running_count[running_team_key] + 1
120-
if self.running_count.get(running_team_key)
121-
else 1
122-
)
123-
self.running_count[running_base_key] = (
124-
self.running_count[running_base_key] + 1
125-
if self.running_count.get(running_base_key)
126-
else 1
127-
)
128-
self.running_count[running_script_key] = (
129-
self.running_count[running_script_key] + 1
130-
if self.running_count.get(running_script_key)
131-
else 1
132-
)
133-
134-
def decrease_running(self, script_log):
135-
if script_log.org_id != -1:
136-
running_team_key = f"{script_log.org_id}"
137-
else:
138-
running_team_key = f"{script_log.owner}"
139-
running_base_key = f"{running_team_key}_{script_log.dtable_uuid}"
140-
running_script_key = f"{running_base_key}_{script_log.script_name}"
141-
142-
if running_team_key in self.running_count:
143-
self.running_count[running_team_key] -= 1
144-
if not self.running_count.get(running_team_key):
145-
self.running_count.pop(running_team_key, None)
146-
147-
if running_base_key in self.running_count:
148-
self.running_count[running_base_key] -= 1
149-
if not self.running_count.get(running_base_key):
150-
self.running_count.pop(running_base_key, None)
151-
152-
if running_script_key in self.running_count:
153-
self.running_count[running_script_key] -= 1
154-
if not self.running_count.get(running_script_key):
155-
self.running_count.pop(running_script_key, None)
156-
157-
def script_done_callback(self, script_log: ScriptLog):
158-
with self.lock:
159-
self.script_dict.pop(script_log.id, None)
160-
self.decrease_running(script_log)
161-
self.inspect_queue_and_running(
162-
pre_msg=f"script {script_log.get_info()} run done"
163-
)
164-
165-
def inspect_queue_and_running(self, pre_msg=None):
166-
if logger.root.level != logging.DEBUG:
167-
return
168-
lines = ["\n"]
169-
if pre_msg:
170-
lines.append(pre_msg)
171-
lines.append(f"{'>' * 10} running {'>' * 10}")
172-
for key, value in self.running_count.items():
173-
lines.append(f"{key}: {value}")
174-
lines.append(f"{'<' * 10} running {'<' * 10}")
175-
176-
lines.append(f"{'>' * 10} queue {'>' * 10}")
177-
for script_log in self.script_queue:
178-
lines.append(
179-
f"org_id: {script_log.org_id} owner: {script_log.owner} dtable_uuid: {script_log.dtable_uuid} script_name: {script_log.script_name}"
180-
)
181-
lines.append(f"{'<' * 10} queue {'<' * 10}")
182-
logger.debug("\n".join(lines))
183-
184-
def get_script_log_by_id(self, script_id):
185-
return self.script_dict.get(script_id)
186-
187-
def pop_timeout_scripts(self):
188-
script_logs = []
189-
now_time = datetime.now()
190-
with self.lock:
191-
for index in range(len(self.script_queue) - 1, -1, -1):
192-
script_log = self.script_queue[index]
193-
if (
194-
script_log.state == ScriptLog.RUNNING
195-
and (now_time - script_log.started_at).seconds
196-
>= SUB_PROCESS_TIMEOUT
197-
):
198-
script_logs.append(self.script_queue.pop(index))
199-
self.decrease_running(script_log)
200-
self.inspect_queue_and_running(
201-
pre_msg=f"set script {script_log.get_info()} timeout from queue"
202-
)
203-
return script_logs
20423

20524

20625
class Scheduelr:
20726

20827
def __init__(self):
209-
self.script_queue = ScriptQueue()
28+
self.executor = ThreadPoolExecutor()
21029

21130
def add(self, dtable_uuid, org_id, owner, script_name, context_data, operate_from):
21231
script_log = add_script(
@@ -218,65 +37,26 @@ def add(self, dtable_uuid, org_id, owner, script_name, context_data, operate_fro
21837
context_data,
21938
operate_from,
22039
)
221-
self.script_queue.add(script_log)
40+
script_file_info = get_script_file(
41+
script_log.dtable_uuid, script_log.script_name
42+
)
43+
self.executor.submit(
44+
run_script,
45+
script_log.id,
46+
dtable_uuid,
47+
script_name,
48+
script_file_info["script_url"],
49+
script_file_info["temp_api_token"],
50+
context_data,
51+
)
22252
return script_log
22353

224-
def schedule(self):
225-
while True:
226-
script_log = self.script_queue.get()
227-
if not script_log:
228-
time.sleep(0.5)
229-
continue
230-
db_session = DBSession()
231-
try:
232-
db_session.query(ScriptLog).filter(
233-
ScriptLog.id == script_log.id
234-
).update(
235-
{ScriptLog.state: ScriptLog.RUNNING},
236-
synchronize_session=False,
237-
)
238-
db_session.commit()
239-
script_file_info = get_script_file(
240-
script_log.dtable_uuid, script_log.script_name
241-
)
242-
run_script(
243-
script_log.id,
244-
script_log.dtable_uuid,
245-
script_log.script_name,
246-
script_file_info["script_url"],
247-
script_file_info["temp_api_token"],
248-
json.loads(script_log.context_data),
249-
)
250-
except Exception as e:
251-
logger.exception(f"run script: {script_log} error {e}")
252-
self.script_done_callback(
253-
script_log.id, False, -1, "", datetime.now(), 0
254-
)
255-
finally:
256-
DBSession.remove()
257-
25854
def script_done_callback(
25955
self, script_id, success, return_code, output, started_at, spend_time
26056
):
26157
hook_update_script(
26258
DBSession(), script_id, success, return_code, output, started_at, spend_time
26359
)
264-
script_log = self.script_queue.get_script_log_by_id(script_id)
265-
if not script_log: # not counted in memory, only update db record
266-
return
267-
self.script_queue.script_done_callback(script_log)
268-
269-
def load_pending_scripts(self):
270-
"""load pending script logs, should be called only when server start"""
271-
script_logs = list(
272-
DBSession.query(ScriptLog)
273-
.filter_by(state=ScriptLog.PENDING)
274-
.filter(ScriptLog.created_at > (datetime.now() - timedelta(hours=1)))
275-
.order_by(ScriptLog.id)
276-
)
277-
logger.info(f"load {len(script_logs)} pending scripts created within 1 hour")
278-
for script_log in script_logs:
279-
self.script_queue.add(script_log)
28060

28161
def statistic_cleaner(self):
28262
while True:
@@ -291,8 +71,6 @@ def statistic_cleaner(self):
29171
time.sleep(24 * 60 * 60)
29272

29373
def start(self):
294-
self.load_pending_scripts()
295-
Thread(target=self.schedule, daemon=True).start()
29674
Thread(target=self.statistic_cleaner, daemon=True).start()
29775

29876

0 commit comments

Comments
 (0)