Skip to content

Commit 8d9925e

Browse files
authored
Merge branch 'dev' into nytian/restart
2 parents 5aae164 + f5e6fbb commit 8d9925e

10 files changed

Lines changed: 536 additions & 30 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
All notable changes to this project will be documented in this file.
44

5+
## Unreleased
6+
7+
### Added
8+
9+
- Client operation correlation logging: `FunctionInvocationId` is now propagated via HTTP headers to the host for client operations, enabling correlation with host logs.
10+
511
## 1.0.0b6
612

713
- [Create timer](https://github.com/Azure/azure-functions-durable-python/issues/35) functionality available

azure/durable_functions/decorators/durable_app.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,14 @@ async def df_client_middleware(*args, **kwargs):
195195
# construct rich object from it,
196196
# and assign parameter to that rich object
197197
starter = kwargs[parameter_name]
198-
client = client_constructor(starter)
198+
199+
# Try to extract the function invocation ID from the context for correlation
200+
function_invocation_id = None
201+
context = kwargs.get('context')
202+
if context is not None and hasattr(context, 'invocation_id'):
203+
function_invocation_id = context.invocation_id
204+
205+
client = client_constructor(starter, function_invocation_id)
199206
kwargs[parameter_name] = client
200207

201208
# Invoke user code with rich DF Client binding

azure/durable_functions/models/DurableOrchestrationClient.py

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,16 @@ class DurableOrchestrationClient:
2626
orchestration instances.
2727
"""
2828

29-
def __init__(self, context: str):
29+
def __init__(self, context: str, function_invocation_id: Optional[str] = None):
30+
"""Initialize a DurableOrchestrationClient.
31+
32+
Parameters
33+
----------
34+
context : str
35+
The JSON-encoded client binding context.
36+
function_invocation_id : Optional[str]
37+
The function invocation ID for correlation with host-side logs.
38+
"""
3039
self.task_hub_name: str
3140
self._uniqueWebHookOrigins: List[str]
3241
self._event_name_placeholder: str = "{eventName}"
@@ -39,6 +48,7 @@ def __init__(self, context: str):
3948
self._show_history_query_key: str = "showHistory"
4049
self._show_history_output_query_key: str = "showHistoryOutput"
4150
self._show_input_query_key: str = "showInput"
51+
self._function_invocation_id: Optional[str] = function_invocation_id
4252
self._orchestration_bindings: DurableOrchestrationBindings = \
4353
DurableOrchestrationBindings.from_json(context)
4454
self._post_async_request = post_async_request
@@ -84,7 +94,8 @@ async def start_new(self,
8494
request_url,
8595
self._get_json_input(client_input),
8696
trace_parent,
87-
trace_state)
97+
trace_state,
98+
self._function_invocation_id)
8899

89100
status_code: int = response[0]
90101
if status_code <= 202 and response[1]:
@@ -256,7 +267,10 @@ async def raise_event(
256267
request_url = self._get_raise_event_url(
257268
instance_id, event_name, task_hub_name, connection_name)
258269

259-
response = await self._post_async_request(request_url, json.dumps(event_data))
270+
response = await self._post_async_request(
271+
request_url,
272+
json.dumps(event_data),
273+
function_invocation_id=self._function_invocation_id)
260274

261275
switch_statement = {
262276
202: lambda: None,
@@ -445,7 +459,10 @@ async def terminate(self, instance_id: str, reason: str) -> None:
445459
"""
446460
request_url = f"{self._orchestration_bindings.rpc_base_url}instances/{instance_id}/" \
447461
f"terminate?reason={quote(reason)}"
448-
response = await self._post_async_request(request_url, None)
462+
response = await self._post_async_request(
463+
request_url,
464+
None,
465+
function_invocation_id=self._function_invocation_id)
449466
switch_statement = {
450467
202: lambda: None, # instance in progress
451468
410: lambda: None, # instance failed or terminated
@@ -564,7 +581,8 @@ async def signal_entity(self, entityId: EntityId, operation_name: str,
564581
request_url,
565582
json.dumps(operation_input) if operation_input else None,
566583
trace_parent,
567-
trace_state)
584+
trace_state,
585+
self._function_invocation_id)
568586

569587
switch_statement = {
570588
202: lambda: None # signal accepted
@@ -714,7 +732,10 @@ async def rewind(self,
714732
raise Exception("The Python SDK only supports RPC endpoints."
715733
+ "Please remove the `localRpcEnabled` setting from host.json")
716734

717-
response = await self._post_async_request(request_url, None)
735+
response = await self._post_async_request(
736+
request_url,
737+
None,
738+
function_invocation_id=self._function_invocation_id)
718739
status: int = response[0]
719740
ex_msg: str = ""
720741
if status == 200 or status == 202:
@@ -753,7 +774,10 @@ async def suspend(self, instance_id: str, reason: str) -> None:
753774
"""
754775
request_url = f"{self._orchestration_bindings.rpc_base_url}instances/{instance_id}/" \
755776
f"suspend?reason={quote(reason)}"
756-
response = await self._post_async_request(request_url, None)
777+
response = await self._post_async_request(
778+
request_url,
779+
None,
780+
function_invocation_id=self._function_invocation_id)
757781
switch_statement = {
758782
202: lambda: None, # instance is suspended
759783
410: lambda: None, # instance completed
@@ -826,7 +850,10 @@ async def resume(self, instance_id: str, reason: str) -> None:
826850
"""
827851
request_url = f"{self._orchestration_bindings.rpc_base_url}instances/{instance_id}/" \
828852
f"resume?reason={quote(reason)}"
829-
response = await self._post_async_request(request_url, None)
853+
response = await self._post_async_request(
854+
request_url,
855+
None,
856+
function_invocation_id=self._function_invocation_id)
830857
switch_statement = {
831858
202: lambda: None, # instance is resumed
832859
410: lambda: None, # instance completed

azure/durable_functions/models/DurableOrchestrationContext.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,10 @@ def call_http(self, method: str, uri: str, content: Optional[str] = None,
246246
The HTTP request method.
247247
uri: str
248248
The HTTP request uri.
249-
content: Optional[str]
250-
The HTTP request content.
249+
content: str or dict, optional
250+
The HTTP request content. Can be a string or a JSON-serializable dictionary.
251+
Note: Although the type hint says 'str', a dictionary is accepted
252+
and will be serialized to JSON.
251253
headers: Optional[Dict[str, str]]
252254
The HTTP request headers.
253255
token_source: TokenSource

azure/durable_functions/models/TaskOrchestrationExecutor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ def get_orchestrator_state_str(self) -> str:
276276
message contains in it the string representation of the orchestration's
277277
state
278278
"""
279-
if(self.output is not None):
279+
if (self.output is not None):
280280
try:
281281
# Attempt to serialize the output. If serialization fails, raise an
282282
# error indicating that the orchestration output is not serializable,
Lines changed: 123 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,87 @@
1-
from typing import Any, List, Union
1+
from typing import Any, List, Union, Optional
2+
import asyncio
23

34
import aiohttp
45

56

7+
# Global session and lock for thread-safe initialization
8+
_client_session: Optional[aiohttp.ClientSession] = None
9+
_session_lock: asyncio.Lock = asyncio.Lock()
10+
11+
12+
async def _get_session() -> aiohttp.ClientSession:
13+
"""Get or create the shared ClientSession.
14+
15+
Returns
16+
-------
17+
aiohttp.ClientSession
18+
The shared client session with configured timeout and connection pooling.
19+
"""
20+
global _client_session
21+
22+
# Double-check locking pattern for async
23+
if _client_session is None or _client_session.closed:
24+
async with _session_lock:
25+
# Check again after acquiring lock
26+
if _client_session is None or _client_session.closed:
27+
# Configure timeout optimized for localhost IPC
28+
timeout = aiohttp.ClientTimeout(
29+
total=240, # 4-minute total timeout for slow operations
30+
sock_connect=10, # Fast connection over localhost
31+
sock_read=None # Covered by total timeout
32+
)
33+
34+
# Configure TCP connector optimized for localhost IPC
35+
connector = aiohttp.TCPConnector(
36+
limit=30, # Maximum connections for single host
37+
limit_per_host=30, # Maximum connections per host
38+
enable_cleanup_closed=True # Enable cleanup of closed connections
39+
)
40+
41+
_client_session = aiohttp.ClientSession(
42+
timeout=timeout,
43+
connector=connector
44+
)
45+
46+
return _client_session
47+
48+
49+
async def _handle_request_error():
50+
"""Handle connection errors by closing and resetting the session.
51+
52+
This handles cases where the remote host process recycles.
53+
"""
54+
global _client_session
55+
async with _session_lock:
56+
if _client_session is not None and not _client_session.closed:
57+
try:
58+
await _client_session.close()
59+
finally:
60+
_client_session = None
61+
62+
63+
async def _close_session() -> None:
64+
"""Close the shared ClientSession if it exists.
65+
66+
Note: This function is currently only called by _handle_request_error().
67+
There is no worker shutdown hook available, but process shutdown will
68+
clean up all resources automatically.
69+
"""
70+
global _client_session
71+
72+
async with _session_lock:
73+
if _client_session is not None and not _client_session.closed:
74+
try:
75+
await _client_session.close()
76+
finally:
77+
_client_session = None
78+
79+
680
async def post_async_request(url: str,
781
data: Any = None,
882
trace_parent: str = None,
9-
trace_state: str = None) -> List[Union[int, Any]]:
83+
trace_state: str = None,
84+
function_invocation_id: str = None) -> List[Union[int, Any]]:
1085
"""Post request with the data provided to the url provided.
1186
1287
Parameters
@@ -19,62 +94,96 @@ async def post_async_request(url: str,
1994
traceparent header to send with the request
2095
trace_state: str
2196
tracestate header to send with the request
97+
function_invocation_id: str
98+
function invocation ID header to send for correlation
2299
23100
Returns
24101
-------
25102
[int, Any]
26103
Tuple with the Response status code and the data returned from the request
27104
"""
28-
async with aiohttp.ClientSession() as session:
29-
headers = {}
30-
if trace_parent:
31-
headers["traceparent"] = trace_parent
32-
if trace_state:
33-
headers["tracestate"] = trace_state
105+
session = await _get_session()
106+
headers = {}
107+
if trace_parent:
108+
headers["traceparent"] = trace_parent
109+
if trace_state:
110+
headers["tracestate"] = trace_state
111+
if function_invocation_id:
112+
headers["X-Azure-Functions-InvocationId"] = function_invocation_id
113+
114+
try:
34115
async with session.post(url, json=data, headers=headers) as response:
35116
# We disable aiohttp's input type validation
36117
# as the server may respond with alternative
37118
# data encodings. This is potentially unsafe.
38119
# More here: https://docs.aiohttp.org/en/stable/client_advanced.html
39120
data = await response.json(content_type=None)
40121
return [response.status, data]
122+
except (aiohttp.ClientError, asyncio.TimeoutError):
123+
# On connection errors, close and recreate session for next request
124+
await _handle_request_error()
125+
raise
41126

42127

43-
async def get_async_request(url: str) -> List[Any]:
128+
async def get_async_request(url: str,
129+
function_invocation_id: str = None) -> List[Any]:
44130
"""Get the data from the url provided.
45131
46132
Parameters
47133
----------
48134
url: str
49135
url to get the data from
136+
function_invocation_id: str
137+
function invocation ID header to send for correlation
50138
51139
Returns
52140
-------
53141
[int, Any]
54142
Tuple with the Response status code and the data returned from the request
55143
"""
56-
async with aiohttp.ClientSession() as session:
57-
async with session.get(url) as response:
144+
session = await _get_session()
145+
headers = {}
146+
if function_invocation_id:
147+
headers["X-Azure-Functions-InvocationId"] = function_invocation_id
148+
149+
try:
150+
async with session.get(url, headers=headers) as response:
58151
data = await response.json(content_type=None)
59152
if data is None:
60153
data = ""
61154
return [response.status, data]
155+
except (aiohttp.ClientError, asyncio.TimeoutError):
156+
# On connection errors, close and recreate session for next request
157+
await _handle_request_error()
158+
raise
62159

63160

64-
async def delete_async_request(url: str) -> List[Union[int, Any]]:
161+
async def delete_async_request(url: str,
162+
function_invocation_id: str = None) -> List[Union[int, Any]]:
65163
"""Delete the data from the url provided.
66164
67165
Parameters
68166
----------
69167
url: str
70168
url to delete the data from
169+
function_invocation_id: str
170+
function invocation ID header to send for correlation
71171
72172
Returns
73173
-------
74174
[int, Any]
75175
Tuple with the Response status code and the data returned from the request
76176
"""
77-
async with aiohttp.ClientSession() as session:
78-
async with session.delete(url) as response:
177+
session = await _get_session()
178+
headers = {}
179+
if function_invocation_id:
180+
headers["X-Azure-Functions-InvocationId"] = function_invocation_id
181+
182+
try:
183+
async with session.delete(url, headers=headers) as response:
79184
data = await response.json(content_type=None)
80185
return [response.status, data]
186+
except (aiohttp.ClientError, asyncio.TimeoutError):
187+
# On connection errors, close and recreate session for next request
188+
await _handle_request_error()
189+
raise

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pytest==7.1.2
44
python-dateutil==2.8.0
55
requests==2.32.4
66
jsonschema==4.25.1
7-
aiohttp==3.12.14
7+
aiohttp==3.13.3
88
azure-functions>=1.11.3b3
99
nox==2019.11.9
1010
furl==2.1.0

setup.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def run(self, *args, **kwargs):
5555
python_requires='>=3.9,<4',
5656
install_requires=[
5757
'azure-functions>=1.12.0',
58-
'aiohttp>=3.12.14',
58+
'aiohttp>=3.13.3',
5959
'requests==2.*',
6060
'python-dateutil>=2.8.0',
6161
'furl>=2.1.0',
@@ -69,7 +69,6 @@ def run(self, *args, **kwargs):
6969
'python-dateutil==2.8.0',
7070
'requests==2.22.0',
7171
'jsonschema==4.25.1',
72-
'aiohttp==3.6.2',
7372
'azure-functions>=1.2.0',
7473
'nox==2019.11.9',
7574
'furl==2.1.0',

0 commit comments

Comments
 (0)