|
8 | 8 |
|
9 | 9 | import asyncssh |
10 | 10 | from asyncssh.sftp import SFTPOpUnsupported |
11 | | -from fsspec.asyn import AsyncFileSystem, async_methods, sync, sync_wrapper |
| 11 | +from fsspec.asyn import ( |
| 12 | + AsyncFileSystem, |
| 13 | + FSTimeoutError, |
| 14 | + async_methods, |
| 15 | + sync, |
| 16 | + sync_wrapper, |
| 17 | +) |
12 | 18 | from fsspec.utils import infer_storage_options |
13 | 19 |
|
14 | 20 | from sshfs.file import SSHFile |
@@ -70,9 +76,7 @@ def __init__( |
70 | 76 | max_sftp_channels=max_sessions - _SHELL_CHANNELS, |
71 | 77 | **_client_args, |
72 | 78 | ) |
73 | | - weakref.finalize( |
74 | | - self, sync, self.loop, self._finalize, self._pool, self._stack |
75 | | - ) |
| 79 | + weakref.finalize(self, self._finalize, self.loop, self._pool, self._stack) |
76 | 80 |
|
77 | 81 | @classmethod |
78 | 82 | def _strip_protocol(cls, path): |
@@ -101,15 +105,29 @@ async def _connect( |
101 | 105 | connect = sync_wrapper(_connect) |
102 | 106 |
|
103 | 107 | @staticmethod |
104 | | - async def _finalize(pool, stack): |
105 | | - await pool.close() |
106 | | - |
107 | | - # If an error occurs while the SSHFile is trying to |
108 | | - # open the native file, then the client might get broken |
109 | | - # due to partial initialization. We are just going to ignore |
110 | | - # the errors that arises on the finalization layer |
111 | | - with suppress(BrokenPipeError): |
112 | | - await stack.aclose() |
| 108 | + def _finalize(loop, pool, stack): |
| 109 | + async def close(): |
| 110 | + await pool.close() |
| 111 | + # If an error occurs while the SSHFile is trying to |
| 112 | + # open the native file, then the client might get broken |
| 113 | + # due to partial initialization. We are just going to ignore |
| 114 | + # the errors that arises on the finalization layer |
| 115 | + with suppress(BrokenPipeError): |
| 116 | + await stack.aclose() |
| 117 | + |
| 118 | + if loop is not None and loop.is_running(): |
| 119 | + try: |
| 120 | + loop = asyncio.get_running_loop() |
| 121 | + loop.create_task(close()) |
| 122 | + return |
| 123 | + except RuntimeError: |
| 124 | + pass |
| 125 | + |
| 126 | + try: |
| 127 | + sync(loop, close, timeout=0.1) |
| 128 | + return |
| 129 | + except FSTimeoutError: |
| 130 | + pass |
113 | 131 |
|
114 | 132 | @property |
115 | 133 | def client(self): |
|
0 commit comments