Skip to content

Commit 21d1c12

Browse files
authored
Merge pull request #1 from arago/remove_deadlock
[bugfix] Avoid read-in deadlock
2 parents c281c5b + 327fef0 commit 21d1c12

File tree

3 files changed

+86
-53
lines changed

3 files changed

+86
-53
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
# v1.0.1
2+
3+
[bugfix] Avoid read-in deadlock
4+
5+
* When an error is thrown while data is read in, the queues
6+
within HiroGraphBatch need to be closed cleanly or a
7+
deadlock will occur. Fixed.
8+
19
# v1.0.0
210

311
Created project by splitting `batchclient.py` from [hiro-graph-client](https://github.com/arago/hiro-client-python).

src/hiro_batch_client/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.0.0.dev0
1+
1.0.1

src/hiro_batch_client/batchclient.py

Lines changed: 77 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
from enum import Enum
55
from typing import Optional, Tuple, Any, Iterator, IO
66

7-
from requests.exceptions import RequestException
8-
97
from hiro_graph_client.client import HiroGraph
108
from hiro_graph_client.clientlib import AbstractTokenApiHandler
9+
from requests.exceptions import RequestException
1110

1211

1312
class Result(Enum):
@@ -491,7 +490,8 @@ def error_message(entity: Entity,
491490
action: Action,
492491
error: Exception,
493492
original: dict,
494-
status_code: int = None) -> dict:
493+
status_code: int = None,
494+
interrupted: bool = None) -> dict:
495495
"""
496496
Failure message format
497497
@@ -513,16 +513,23 @@ def error_message(entity: Entity,
513513
:param error: The exception raised
514514
:param original: The data that lead to the exception
515515
:param status_code: HTTP status code if available
516+
:param interrupted: Indicates, that the current batch processing has been interrupted.
516517
:return: The message
517518
"""
519+
520+
message = str(error)
521+
if interrupted:
522+
message = "BATCH PROCESSING ABORTED! " + message + \
523+
" All further data has been ignored after this error occurred."
524+
518525
return {
519526
"status": Result.FAILURE.value,
520527
"entity": entity.value,
521528
"action": action.value,
522529
"data": {
523530
"error": error.__class__.__name__,
524531
"code": status_code,
525-
"message": str(error),
532+
"message": message,
526533
"original_data": original
527534
}
528535
}
@@ -1037,67 +1044,85 @@ def _request_queue_put(_command: str,
10371044
self.request_queue.put((_command, _attributes))
10381045
return _parallel_workers
10391046

1040-
session = SessionData(self.use_xid_cache)
1047+
parallel_workers = 0
10411048

1042-
collected_results = [] if self.callback is None else None
1049+
try:
1050+
session = SessionData(self.use_xid_cache)
10431051

1044-
parallel_workers = 0
1052+
collected_results = [] if self.callback is None else None
10451053

1046-
executor.submit(HiroGraphBatch._reader, self, collected_results)
1054+
executor.submit(HiroGraphBatch._reader, self, collected_results)
10471055

1048-
handle_session_data = False
1049-
for command_entry in command_iter:
1050-
for command, attributes in command_entry.items():
1056+
handle_session_data = False
1057+
for command_entry in command_iter:
1058+
for command, attributes in command_entry.items():
10511059

1052-
if command == "handle_vertices_combined":
1053-
command = "handle_vertices"
1054-
handle_session_data = True
1060+
if command == "handle_vertices_combined":
1061+
command = "handle_vertices"
1062+
handle_session_data = True
10551063

1056-
try:
1057-
if command not in self.command_map:
1058-
raise SourceValueError("No such command \"{}\".".format(command))
1064+
try:
1065+
if command not in self.command_map:
1066+
raise SourceValueError("No such command \"{}\".".format(command))
10591067

1060-
if isinstance(attributes, list):
1061-
for attribute_entry in attributes:
1068+
if isinstance(attributes, list):
1069+
for attribute_entry in attributes:
1070+
parallel_workers = _request_queue_put(
1071+
command,
1072+
attribute_entry,
1073+
parallel_workers)
1074+
else:
10621075
parallel_workers = _request_queue_put(
10631076
command,
1064-
attribute_entry,
1077+
attributes,
10651078
parallel_workers)
1066-
else:
1067-
parallel_workers = _request_queue_put(
1068-
command,
1069-
attributes,
1070-
parallel_workers)
1071-
1072-
except SourceValueError as err:
1073-
sub_result, sub_code = HiroBatchRunner.error_message(
1074-
Entity.UNDEFINED,
1075-
Action.UNDEFINED,
1076-
err,
1077-
attributes,
1078-
400), 400
1079-
1080-
self.result_queue.put((sub_result, sub_code))
1081-
1082-
if handle_session_data:
1083-
self.request_queue.join()
1084-
self._edges_from_session(session)
1085-
self._timeseries_from_session(session)
1086-
self._attachments_from_session(session)
1087-
# Ensure, that all data related to the original vertex has been sent
1088-
# before creating any issues.
1089-
self.request_queue.join()
1090-
self._issues_from_session(session)
10911079

1092-
self.request_queue.join()
1093-
self.result_queue.join()
1094-
1095-
for _ in range(parallel_workers):
1096-
self.request_queue.put(None)
1080+
# Empty old attributes, so they do not show up on exceptions
1081+
# that might be thrown before the new attributes have been
1082+
# read (i.e. read-in-exceptions while iterating over *command_iter*).
1083+
attributes = None
1084+
1085+
except SourceValueError as err:
1086+
sub_result, sub_code = HiroBatchRunner.error_message(
1087+
entity=Entity.UNDEFINED,
1088+
action=Action.UNDEFINED,
1089+
error=err,
1090+
original=attributes,
1091+
status_code=400), 400
1092+
1093+
self.result_queue.put((sub_result, sub_code))
1094+
1095+
if handle_session_data:
1096+
self.request_queue.join()
1097+
self._edges_from_session(session)
1098+
self._timeseries_from_session(session)
1099+
self._attachments_from_session(session)
1100+
# Ensure, that all data related to the original vertex has been sent
1101+
# before creating any issues.
1102+
self.request_queue.join()
1103+
self._issues_from_session(session)
1104+
1105+
return collected_results
1106+
1107+
except Exception as err:
1108+
sub_result, sub_code = HiroBatchRunner.error_message(
1109+
entity=Entity.UNDEFINED,
1110+
action=Action.UNDEFINED,
1111+
error=err,
1112+
original=attributes,
1113+
status_code=400,
1114+
interrupted=True), 400
1115+
1116+
self.result_queue.put((sub_result, sub_code))
1117+
1118+
finally:
1119+
self.request_queue.join()
1120+
self.result_queue.join()
10971121

1098-
self.result_queue.put(None)
1122+
for _ in range(parallel_workers):
1123+
self.request_queue.put(None)
10991124

1100-
return collected_results
1125+
self.result_queue.put(None)
11011126

11021127

11031128
class SourceValueError(ValueError):

0 commit comments

Comments
 (0)