Commit 9464505

Asyncio Client Fixes (2) (#780)
Some small and big fixes and improvements, mostly to the asyncio client, but some to asyncore as well:

* Check the existence of event handlers in *Map proxies.
* Cluster get_member must use the correct variable.
* Compact schema send retry, blocking cloud discovery refresh.
* Create tasks for conn.close_connection calls.
* Fix ConnectionManager start_connect_all_members task.
* Fixed the deadlock in asyncio listener.
* Fixed the race condition in ProxyManager.get_or_create.
* Fixed the race condition in _handle_successful_auth.
* Fixed wrong method retries in VC.
* Remove the write loop in reactor.
* VC put_if_absent must copy the passed document before updating it.

1 parent 57b5175 commit 9464505

10 files changed

Lines changed: 165 additions & 106 deletions


hazelcast/internal/asyncio_cluster.py

Lines changed: 2 additions & 2 deletions
@@ -262,15 +262,15 @@ def _fire_membership_events(self, dead_members, new_members):
             if handler:
                 try:
                     handler(dead_member)
-                except:
+                except Exception:
                     _logger.exception("Exception in membership listener")

         for new_member in new_members:
             for handler, _ in self._listeners.values():
                 if handler:
                     try:
                         handler(new_member)
-                    except:
+                    except Exception:
                         _logger.exception("Exception in membership listener")

     def _detect_membership_events(self, previous_members, current_members):
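
Why `except Exception` rather than a bare `except:`: since Python 3.8, asyncio.CancelledError is a BaseException, so in asyncio code a bare `except:` also traps task cancellation (along with KeyboardInterrupt and SystemExit). A minimal standalone sketch of that failure mode, not taken from the client code:

import asyncio

async def worker():
    try:
        await asyncio.sleep(10)
    except:  # bare except also traps asyncio.CancelledError
        pass  # the cancellation is silently swallowed

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start and suspend
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("cancelled cleanly")
    else:
        # this branch runs: the worker absorbed the CancelledError
        print("cancellation was swallowed by the bare except")

asyncio.run(main())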

hazelcast/internal/asyncio_connection.py

Lines changed: 91 additions & 68 deletions
@@ -5,7 +5,7 @@
 import struct
 import time
 import uuid
-from typing import Coroutine
+from typing import Coroutine, Tuple

 from hazelcast import __version__
 from hazelcast.config import ReconnectMode
@@ -307,37 +307,18 @@ async def connect_to_all_cluster_members(self, sync_start):

         self._start_connect_all_members_timer()

-    async def on_connection_close(self, closed_connection):
-        remote_uuid = closed_connection.remote_uuid
-        remote_address = closed_connection.remote_address
-
-        if not remote_address:
+    async def on_connection_close(self, closed_connection, unsafe=False):
+        if not closed_connection.remote_address:
             _logger.debug(
                 "Destroying %s, but it has no remote address, hence nothing is "
                 "removed from the connection dictionary",
                 closed_connection,
             )
             return

-        disconnected = False
-        removed = False
-        trigger_reconnection = False
-        async with self._lock:
-            connection = self.active_connections.get(remote_uuid, None)
-            if connection == closed_connection:
-                self.active_connections.pop(remote_uuid, None)
-                removed = True
-                _logger.info(
-                    "Removed connection to %s:%s, connection: %s",
-                    remote_address,
-                    remote_uuid,
-                    connection,
-                )
-
-            if not self.active_connections:
-                trigger_reconnection = True
-                if self._client_state == ClientState.INITIALIZED_ON_CLUSTER:
-                    disconnected = True
+        disconnected, removed, trigger_reconnection = await self._determine_connection_state(
+            closed_connection, unsafe=unsafe
+        )

         if disconnected:
             self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED)
@@ -359,9 +340,40 @@ async def on_connection_close(self, closed_connection):
             _logger.debug(
                 "Destroying %s, but there is no mapping for %s in the connection dictionary",
                 closed_connection,
+                closed_connection.remote_uuid,
+            )
+
+    async def _determine_connection_state(
+        self, closed_connection, unsafe=False
+    ) -> Tuple[bool, bool, bool]:
+        if unsafe:
+            return self._determine_connection_state_unsafe(closed_connection)
+        async with self._lock:
+            return self._determine_connection_state_unsafe(closed_connection)
+
+    def _determine_connection_state_unsafe(self, closed_connection) -> Tuple[bool, bool, bool]:
+        remote_uuid = closed_connection.remote_uuid
+        disconnected = False
+        removed = False
+        trigger_reconnection = False
+        connection = self.active_connections.get(remote_uuid, None)
+        if connection == closed_connection:
+            self.active_connections.pop(remote_uuid, None)
+            removed = True
+            _logger.info(
+                "Removed connection to %s:%s, connection: %s",
+                closed_connection.remote_address,
                 remote_uuid,
+                connection,
             )

+        if not self.active_connections:
+            trigger_reconnection = True
+            if self._client_state == ClientState.INITIALIZED_ON_CLUSTER:
+                disconnected = True
+
+        return disconnected, removed, trigger_reconnection
+
     def check_invocation_allowed(self):
         state = self._client_state
         if state == ClientState.INITIALIZED_ON_CLUSTER and self.active_connections:
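
The `unsafe` flag exists because asyncio.Lock is not reentrant: `on_connection_close` can now be reached from code that already holds the connection manager's `self._lock` (see the `close_connection(..., unsafe=True)` call sites below), and awaiting the lock again from its holder would hang forever. A minimal sketch of the pattern, with hypothetical names (`Registry`, `discard`):

import asyncio

class Registry:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._items = {}

    async def discard(self, key, unsafe=False):
        # asyncio.Lock is not reentrant: a coroutine that holds it and
        # awaits it again deadlocks, so callers that already hold the
        # lock pass unsafe=True to skip straight to the body.
        if unsafe:
            return self._discard_unsafe(key)
        async with self._lock:
            return self._discard_unsafe(key)

    def _discard_unsafe(self, key):
        return self._items.pop(key, None)

async def main():
    registry = Registry()
    async with registry._lock:
        # already inside the lock: must opt out of re-acquiring it
        await registry.discard("conn-1", unsafe=True)

asyncio.run(main())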
@@ -464,6 +476,12 @@ def _init_wait_strategy(self, config):
     def _start_connect_all_members_timer(self):
         connecting_uuids = set()

+        async def connect_to_member(member):
+            try:
+                await self._get_or_connect_to_member(member)
+            except Exception:
+                _logger.debug("Error connecting to %s in reconnect timer", member, exc_info=True)
+
         async def run():
             await asyncio.sleep(1)
             if not self._lifecycle_service.running:
@@ -480,7 +498,7 @@ async def run():
                         connecting_uuids.add(member_uuid)
                         if not self._lifecycle_service.running:
                             break
-                        tg.create_task(self._get_or_connect_to_member(member))
+                        tg.create_task(connect_to_member(member))
                         member_uuids.append(member_uuid)

             for item in member_uuids:
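
The `connect_to_member` wrapper matters because of asyncio.TaskGroup semantics (Python 3.11+): the first unhandled exception in any child task cancels all sibling tasks and re-raises as an ExceptionGroup, so one unreachable member would have aborted the entire reconnect sweep. A standalone sketch of the behavior the wrapper prevents, with illustrative member addresses:

import asyncio

async def connect(member):
    if member == "10.0.0.2":
        raise ConnectionError("member unreachable")
    await asyncio.sleep(0.01)
    print("connected to", member)

async def connect_quietly(member):
    try:
        await connect(member)
    except Exception:
        # in the client this is a debug log; the point is that the
        # failure never escapes into the TaskGroup
        print("could not connect to", member)

async def main():
    async with asyncio.TaskGroup() as tg:
        for member in ("10.0.0.1", "10.0.0.2", "10.0.0.3"):
            # without the wrapper, the ConnectionError would cancel the
            # sibling tasks and surface here as an ExceptionGroup
            tg.create_task(connect_quietly(member))

asyncio.run(main())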
@@ -658,49 +676,54 @@ async def _handle_successful_auth(self, response, connection):

         existing = self.active_connections.get(remote_uuid, None)

-        if existing:
-            await connection.close_connection(
-                "Duplicate connection to same member with UUID: %s" % remote_uuid, None
-            )
-            return existing
-
-        new_cluster_id = response["cluster_id"]
-        changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id
-        if changed_cluster:
-            await self._check_client_state_on_cluster_change(connection)
-            _logger.warning(
-                "Switching from current cluster: %s to new cluster: %s",
-                self._cluster_id,
-                new_cluster_id,
-            )
-            self._on_cluster_restart()
+            if existing:
+                await connection.close_connection(
+                    "Duplicate connection to same member with UUID: %s" % remote_uuid,
+                    None,
+                    unsafe=True,
+                )
+                return existing
+
+            new_cluster_id = response["cluster_id"]
+            changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id
+            if changed_cluster:
+                await self._check_client_state_on_cluster_change(connection)
+                _logger.warning(
+                    "Switching from current cluster: %s to new cluster: %s",
+                    self._cluster_id,
+                    new_cluster_id,
+                )
+                self._on_cluster_restart()

-        async with self._lock:
             is_initial_connection = not self.active_connections
             self.active_connections[remote_uuid] = connection
             fire_connected_lifecycle_event = False

-            if is_initial_connection:
-                self._cluster_id = new_cluster_id
-                # In split brain, the client might connect to the one half
-                # of the cluster, and then later might reconnect to the
-                # other half, after the half it was connected to is
-                # completely dead. Since the cluster id is preserved in
-                # split brain scenarios, it is impossible to distinguish
-                # reconnection to the same cluster vs reconnection to the
-                # other half of the split brain. However, in the latter,
-                # we might need to send some state to the other half of
-                # the split brain (like Compact schemas). That forces us
-                # to send the client state to the cluster after the first
-                # cluster connection, regardless the cluster id is
-                # changed or not.
-                if self._established_initial_cluster_connection:
-                    self._client_state = ClientState.CONNECTED_TO_CLUSTER
-                    await self._initialize_on_cluster(new_cluster_id)
-                else:
-                    fire_connected_lifecycle_event = True
-                    self._established_initial_cluster_connection = True
-                    self._client_state = ClientState.INITIALIZED_ON_CLUSTER
+            init_on_cluster = False
+            if is_initial_connection:
+                self._cluster_id = new_cluster_id
+                # In split brain, the client might connect to the one half
+                # of the cluster, and then later might reconnect to the
+                # other half, after the half it was connected to is
+                # completely dead. Since the cluster id is preserved in
+                # split brain scenarios, it is impossible to distinguish
+                # reconnection to the same cluster vs reconnection to the
+                # other half of the split brain. However, in the latter,
+                # we might need to send some state to the other half of
+                # the split brain (like Compact schemas). That forces us
+                # to send the client state to the cluster after the first
+                # cluster connection, regardless the cluster id is
+                # changed or not.
+                if self._established_initial_cluster_connection:
+                    self._client_state = ClientState.CONNECTED_TO_CLUSTER
+                    init_on_cluster = True
+                else:
+                    fire_connected_lifecycle_event = True
+                    self._established_initial_cluster_connection = True
+                    self._client_state = ClientState.INITIALIZED_ON_CLUSTER
+
+        if init_on_cluster:
+            await self._initialize_on_cluster(new_cluster_id)

         if fire_connected_lifecycle_event:
             self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED)
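
This hunk is the "deadlock in asyncio listener" fix from the commit message: `_initialize_on_cluster` used to be awaited while `self._lock` was held, so anything on that code path that needed the same lock (a connection-close notification, for instance) could never make progress. The rewrite only records a decision under the lock and performs the await after releasing it. A condensed sketch of the pattern:

import asyncio

lock = asyncio.Lock()

async def initialize():
    # stands in for _initialize_on_cluster; somewhere down this call
    # chain the same lock is needed again
    async with lock:
        await asyncio.sleep(0)

async def handle_auth():
    async with lock:
        # mutate shared state, then only *decide* what to do next
        init_on_cluster = True
    # the slow await happens after the lock is released, so code that
    # needs the lock (like initialize above) cannot deadlock against us
    if init_on_cluster:
        await initialize()

asyncio.run(handle_auth())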
@@ -777,7 +800,7 @@ async def _check_client_state_on_cluster_change(self, connection):
         # we can operate on. In those scenarios, we rely on the fact that we will
         # reopen the connections.
         reason = "Connection does not belong to the cluster %s" % self._cluster_id
-        await connection.close_connection(reason, None)
+        await connection.close_connection(reason, None, unsafe=True)
         raise ValueError(reason)

     def _on_cluster_restart(self):
@@ -985,13 +1008,13 @@ def send_message(self, message):
         self._write(message.buf)
         return True

-    # Not named close to distinguish it from the asyncore.dispatcher.close.
-    async def close_connection(self, reason, cause):
+    async def close_connection(self, reason, cause, unsafe=False):
         """Closes the connection.

         Args:
             reason (str): The reason this connection is going to be closed. Is allowed to be None.
             cause (Exception): The exception responsible for closing this connection. Is allowed to be None.
+            unsafe (bool): Do not acquire a lock
         """
         if not self.live:
             return
@@ -1003,7 +1026,7 @@ async def close_connection(self, reason, cause):
             self._inner_close()
         except Exception:
             _logger.exception("Error while closing the the connection %s", self)
-        await self._connection_manager.on_connection_close(self)
+        await self._connection_manager.on_connection_close(self, unsafe=unsafe)

     def _log_close(self, reason, cause):
         msg = "%s closed. Reason: %s"

hazelcast/internal/asyncio_proxy/manager.py

Lines changed: 18 additions & 4 deletions
@@ -1,3 +1,4 @@
+import asyncio
 import typing

 from hazelcast.internal.asyncio_proxy.vector_collection import (
@@ -29,11 +30,24 @@ def __init__(self, context):

     async def get_or_create(self, service_name, name, create_on_remote=True):
         ns = (service_name, name)
-        if ns in self._proxies:
-            return self._proxies[ns]
+        proxy = self._proxies.get(ns)
+        if proxy is not None:
+            if isinstance(proxy, asyncio.Future):
+                return await proxy
+            return proxy

-        proxy = await self._create_proxy(service_name, name, create_on_remote)
+        # allocate the proxy slot, so a task that tries to access the same proxy knows it's being created
+        fut = asyncio.get_running_loop().create_future()
+        self._proxies[ns] = fut
+        try:
+            proxy = await self._create_proxy(service_name, name, create_on_remote)
+        except BaseException as e:
+            self._proxies.pop(ns, None)
+            fut.set_exception(e)
+            raise
+        # replace the placeholder with the proxy
         self._proxies[ns] = proxy
+        fut.set_result(proxy)
         return proxy

     async def _create_proxy(self, service_name, name, create_on_remote) -> Proxy:
@@ -59,4 +73,4 @@ async def destroy_proxy(self, service_name, name, destroy_on_remote=True):
             return False

     def get_distributed_objects(self):
-        return to_list(self._proxies.values())
+        return to_list(v for v in self._proxies.values() if not isinstance(v, asyncio.Future))
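
The race this fixes: two tasks calling `get_or_create` for the same name could both miss the cache, because the first creator suspends at its first `await` before publishing the proxy, and each caller would end up with a distinct instance. Parking a Future in the slot makes latecomers await the first creator instead. A self-contained sketch of the same pattern (illustrative names, not the client API):

import asyncio

_cache = {}

async def create_remote(name):
    await asyncio.sleep(0.01)  # stands in for the remote round trip
    return object()

async def get_or_create(name):
    value = _cache.get(name)
    if value is not None:
        # a Future in the slot means another task is mid-creation
        if isinstance(value, asyncio.Future):
            return await value
        return value
    fut = asyncio.get_running_loop().create_future()
    _cache[name] = fut  # claim the slot before the first await
    try:
        value = await create_remote(name)
    except BaseException as e:
        _cache.pop(name, None)
        fut.set_exception(e)
        raise
    _cache[name] = value  # swap the placeholder for the real value
    fut.set_result(value)
    return value

async def main():
    a, b = await asyncio.gather(get_or_create("m"), get_or_create("m"))
    assert a is b  # both concurrent callers see the same instance

asyncio.run(main())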

hazelcast/internal/asyncio_proxy/map.py

Lines changed: 18 additions & 9 deletions
@@ -279,23 +279,32 @@ def handle_event_entry(
                 number_of_affected_entries,
             )
             if event.event_type == EntryEventType.ADDED:
-                added_func(event)
+                if added_func:
+                    added_func(event)
             elif event.event_type == EntryEventType.REMOVED:
-                removed_func(event)
+                if removed_func:
+                    removed_func(event)
             elif event.event_type == EntryEventType.UPDATED:
-                updated_func(event)
+                if updated_func:
+                    updated_func(event)
             elif event.event_type == EntryEventType.EVICTED:
-                evicted_func(event)
+                if evicted_func:
+                    evicted_func(event)
             elif event.event_type == EntryEventType.EVICT_ALL:
-                evict_all_func(event)
+                if evict_all_func:
+                    evict_all_func(event)
             elif event.event_type == EntryEventType.CLEAR_ALL:
-                clear_all_func(event)
+                if clear_all_func:
+                    clear_all_func(event)
             elif event.event_type == EntryEventType.MERGED:
-                merged_func(event)
+                if merged_func:
+                    merged_func(event)
             elif event.event_type == EntryEventType.EXPIRED:
-                expired_func(event)
+                if expired_func:
+                    expired_func(event)
             elif event.event_type == EntryEventType.LOADED:
-                loaded_func(event)
+                if loaded_func:
+                    loaded_func(event)

         return await self._register_listener(
             request,
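
Every branch previously called its handler unconditionally, so registering a listener for only some entry events left the other callbacks as None, and any other event type crashed the dispatcher with TypeError: 'NoneType' object is not callable. A usage sketch of the case the guards make safe; the keyword names follow the `*_func` parameters above, but treat the exact listener signature as illustrative:

async def watch_added_only(my_map):
    def on_added(event):
        print("added:", event.key)

    # only added_func is supplied; before this fix, an UPDATED or
    # REMOVED event on the map would have invoked one of the missing
    # (None) callbacks inside handle_event_entry
    await my_map.add_entry_listener(include_value=True, added_func=on_added)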

hazelcast/internal/asyncio_proxy/vector_collection.py

Lines changed: 1 addition & 0 deletions
@@ -410,6 +410,7 @@ def handler(message):
             value_data = self._to_data(document.value)
         except SchemaNotReplicatedError as e:
             return await self._send_schema_and_retry(e, self.put_if_absent, key, document)
+        document = copy.copy(document)
         document.value = value_data
         request = vector_collection_put_if_absent_codec.encode_request(
             self.name,
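
The one-line fix matters because of the retry path two lines above it: `_send_schema_and_retry` re-invokes `put_if_absent` with the caller's original `document`, and without the copy the first attempt would already have replaced `document.value` with serialized data, mutating the caller's object and feeding already-serialized bytes into the retry. A standalone sketch of the aliasing bug, using stand-in types:

import copy

class Document:
    def __init__(self, value):
        self.value = value

def to_data(value):
    return ("serialized", value)  # stands in for _to_data

def put_if_absent(document):
    value_data = to_data(document.value)
    # copy first: otherwise the caller's document.value would now hold
    # serialized data, and a schema-replication retry that re-enters
    # this function would serialize it a second time
    document = copy.copy(document)
    document.value = value_data
    return document

doc = Document([0.1, 0.2])
put_if_absent(doc)
assert doc.value == [0.1, 0.2]  # the caller's document is unchanged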
