forked from dapr/durabletask-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgrpc_interceptor.py
More file actions
75 lines (59 loc) · 2.8 KB
/
grpc_interceptor.py
File metadata and controls
75 lines (59 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from collections import namedtuple
import grpc
class _ClientCallDetails(
namedtuple(
"_ClientCallDetails",
["method", "timeout", "metadata", "credentials", "wait_for_ready", "compression"],
),
grpc.ClientCallDetails,
):
"""This is an implementation of the ClientCallDetails interface needed for interceptors.
This class takes six named values and inherits the ClientCallDetails from grpc package.
This class encloses the values that describe a RPC to be invoked.
"""
pass
class DefaultClientInterceptorImpl(
grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor,
):
"""The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
interceptor to add additional headers to all calls as needed."""
def __init__(self, metadata: list[tuple[str, str]]):
super().__init__()
self._metadata = metadata
def _intercept_call(self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
"""Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
call details."""
if self._metadata is None:
return client_call_details
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
else:
metadata = []
metadata.extend(self._metadata)
client_call_details = _ClientCallDetails(
client_call_details.method,
client_call_details.timeout,
metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
client_call_details.compression,
)
return client_call_details
def intercept_unary_unary(self, continuation, client_call_details, request):
new_client_call_details = self._intercept_call(client_call_details)
return continuation(new_client_call_details, request)
def intercept_unary_stream(self, continuation, client_call_details, request):
new_client_call_details = self._intercept_call(client_call_details)
return continuation(new_client_call_details, request)
def intercept_stream_unary(self, continuation, client_call_details, request):
new_client_call_details = self._intercept_call(client_call_details)
return continuation(new_client_call_details, request)
def intercept_stream_stream(self, continuation, client_call_details, request):
new_client_call_details = self._intercept_call(client_call_details)
return continuation(new_client_call_details, request)