66from sqlalchemy .orm import Session
77
88from api import models , settings
9- from api .core .filesystem import FileSystem , get_user_filesystem
9+ from api .core .filesystem import FileSystem
1010from api .schemas import job as schemas
1111
1212
1313def enqueue_job (
14- job : models .Job , enqueueing_func : Callable [[schemas .QueueJob ], None ]
14+ job : models .Job ,
15+ filesystem : FileSystem ,
16+ enqueueing_func : Callable [[schemas .QueueJob ], None ],
1517) -> None :
16- user_fs = get_user_filesystem (user_id = job .user_id )
17-
1818 app = job .application
1919 job_config = settings .application_config .config [app ["application" ]][app ["version" ]][
2020 app ["entrypoint" ]
@@ -47,14 +47,14 @@ def prepare_files(root_in: str, root_out: str, fs: FileSystem) -> dict[str, str]
4747 f"artifact/{ artifact_id } "
4848 for artifact_id in job .attributes ["files_down" ]["artifact_ids" ]
4949 ]
50- _validate_files (user_fs , [config_path ] + data_paths + artifact_paths )
50+ _validate_files (filesystem , [config_path ] + data_paths + artifact_paths )
5151 roots_down = handler_config ["files_down" ]
52- files_down = prepare_files (config_path , roots_down ["config_id" ], user_fs )
52+ files_down = prepare_files (config_path , roots_down ["config_id" ], filesystem )
5353 for data_path in data_paths :
54- files_down .update (prepare_files (data_path , roots_down ["data_ids" ], user_fs ))
54+ files_down .update (prepare_files (data_path , roots_down ["data_ids" ], filesystem ))
5555 for artifact_path in artifact_paths :
5656 files_down .update (
57- prepare_files (artifact_path , roots_down ["artifact_ids" ], user_fs )
57+ prepare_files (artifact_path , roots_down ["artifact_ids" ], filesystem )
5858 )
5959
6060 app_specs = schemas .AppSpecs (
@@ -76,9 +76,9 @@ def prepare_files(root_in: str, root_out: str, fs: FileSystem) -> dict[str, str]
7676 )
7777
7878 paths_upload = {
79- "output" : user_fs .full_path_uri (job .paths_out ["output" ]),
80- "log" : user_fs .full_path_uri (job .paths_out ["log" ]),
81- "artifact" : user_fs .full_path_uri (job .paths_out ["artifact" ]),
79+ "output" : filesystem .full_path_uri (job .paths_out ["output" ]),
80+ "log" : filesystem .full_path_uri (job .paths_out ["log" ]),
81+ "artifact" : filesystem .full_path_uri (job .paths_out ["artifact" ]),
8282 }
8383
8484 queue_item = schemas .QueueJob (
@@ -117,6 +117,7 @@ def _validate_files(filesystem: FileSystem, paths: list[str]) -> None:
117117
118118def create_job (
119119 db : Session ,
120+ filesystem : FileSystem ,
120121 enqueueing_func : Callable [[schemas .QueueJob ], None ],
121122 job : schemas .JobCreate ,
122123 user_id : int ,
@@ -146,18 +147,17 @@ def create_job(
146147 status_code = status .HTTP_400_BAD_REQUEST ,
147148 detail = ve ,
148149 )
149- enqueue_job (db_job , enqueueing_func )
150+ enqueue_job (db_job , filesystem , enqueueing_func )
150151 db .commit ()
151152 db .refresh (db_job )
152153 return db_job
153154
154155
155- def delete_job (db : Session , db_job : models .Job ) -> models .Job :
156+ def delete_job (db : Session , filesystem : FileSystem , db_job : models .Job ) -> models .Job :
156157 db .delete (db_job )
157- user_fs = get_user_filesystem (user_id = db_job .user_id )
158158 for path in db_job .paths_out .values ():
159159 if path [- 1 ] != "/" :
160160 path += "/"
161- user_fs .delete (path )
161+ filesystem .delete (path )
162162 db .commit ()
163163 return db_job
0 commit comments