Skip to content

Commit fe1e050

Browse files
Cloud event support for event grid (#314)
Co-authored-by: hallvictoria <59299039+hallvictoria@users.noreply.github.com>
1 parent 416a4f3 commit fe1e050

5 files changed

Lines changed: 214 additions & 4 deletions

File tree

azure/functions/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from ._abc import TimerRequest, InputStream, Context, Out
55
from ._eventhub import EventHubEvent
6-
from ._eventgrid import EventGridEvent, EventGridOutputEvent
6+
from ._eventgrid import CloudEvent, EventGridEvent, EventGridOutputEvent
77
from ._cosmosdb import Document, DocumentList
88
from ._http import HttpRequest, HttpResponse
99
from .decorators import (FunctionApp, Function, Blueprint,
@@ -54,6 +54,7 @@
5454
# Binding rich types, sorted alphabetically.
5555
'Document',
5656
'DocumentList',
57+
'CloudEvent',
5758
'EventGridEvent',
5859
'EventGridOutputEvent',
5960
'EventHubEvent',

azure/functions/_abc.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,54 @@ def data_version(self) -> str:
369369
pass
370370

371371

372+
class CloudEvent(abc.ABC):
373+
"""A CloudEvents v1.0 event message."""
374+
375+
@property
376+
@abc.abstractmethod
377+
def id(self) -> str:
378+
pass
379+
380+
@property
381+
@abc.abstractmethod
382+
def source(self) -> str:
383+
pass
384+
385+
@property
386+
@abc.abstractmethod
387+
def type(self) -> str:
388+
pass
389+
390+
@property
391+
@abc.abstractmethod
392+
def specversion(self) -> str:
393+
pass
394+
395+
@property
396+
@abc.abstractmethod
397+
def time(self) -> typing.Optional[datetime.datetime]:
398+
pass
399+
400+
@property
401+
@abc.abstractmethod
402+
def subject(self) -> typing.Optional[str]:
403+
pass
404+
405+
@property
406+
@abc.abstractmethod
407+
def datacontenttype(self) -> typing.Optional[str]:
408+
pass
409+
410+
@property
411+
@abc.abstractmethod
412+
def dataschema(self) -> typing.Optional[str]:
413+
pass
414+
415+
@abc.abstractmethod
416+
def get_json(self) -> typing.Any:
417+
pass
418+
419+
372420
class Document(abc.ABC):
373421

374422
@classmethod

azure/functions/_eventgrid.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,76 @@ def __repr__(self) -> str:
109109
f'subject={self.subject} '
110110
f'at 0x{id(self):0x}>'
111111
)
112+
113+
114+
class CloudEvent(azf_abc.CloudEvent):
115+
"""A CloudEvents v1.0 event message."""
116+
117+
def __init__(self, *,
118+
id: str,
119+
source: str,
120+
type: str,
121+
specversion: str,
122+
data: typing.Optional[typing.Any],
123+
time: typing.Optional[datetime.datetime] = None,
124+
subject: typing.Optional[str] = None,
125+
datacontenttype: typing.Optional[str] = None,
126+
dataschema: typing.Optional[str] = None,
127+
**extensions: typing.Any) -> None:
128+
self.__id = id
129+
self.__source = source
130+
self.__type = type
131+
self.__specversion = specversion
132+
self.__data = data
133+
self.__time = time
134+
self.__subject = subject
135+
self.__datacontenttype = datacontenttype
136+
self.__dataschema = dataschema
137+
self.__extensions = extensions
138+
139+
@property
140+
def id(self) -> str:
141+
return self.__id
142+
143+
@property
144+
def source(self) -> str:
145+
return self.__source
146+
147+
@property
148+
def type(self) -> str:
149+
return self.__type
150+
151+
@property
152+
def specversion(self) -> str:
153+
return self.__specversion
154+
155+
@property
156+
def time(self) -> typing.Optional[datetime.datetime]:
157+
return self.__time
158+
159+
@property
160+
def subject(self) -> typing.Optional[str]:
161+
return self.__subject
162+
163+
@property
164+
def datacontenttype(self) -> typing.Optional[str]:
165+
return self.__datacontenttype
166+
167+
@property
168+
def dataschema(self) -> typing.Optional[str]:
169+
return self.__dataschema
170+
171+
def get_json(self) -> typing.Any:
172+
return self.__data
173+
174+
@property
175+
def extension_attrs(self) -> typing.Dict[str, typing.Any]:
176+
return dict(self.__extensions)
177+
178+
def __repr__(self) -> str:
179+
return (
180+
f'<azure.CloudEvent id={self.id} '
181+
f'source={self.source} '
182+
f'type={self.type} '
183+
f'at 0x{id(self):0x}>'
184+
)

azure/functions/eventgrid.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@ def check_input_type_annotation(cls, pytype: type) -> bool:
2020
"""
2121
Event Grid always sends an array and may send more than one event in
2222
the array. The runtime invokes function once for each array element,
23-
thus no need to parse List[EventGridEvent]
23+
thus no need to parse List[EventGridEvent].
24+
Accepts both EventGridEvent (Event Grid schema) and CloudEvent
25+
(CloudEvents v1.0 schema).
2426
"""
25-
valid_types = azf_eventgrid.EventGridEvent
27+
valid_types = (azf_eventgrid.EventGridEvent, azf_eventgrid.CloudEvent)
2628
return isinstance(pytype, type) and issubclass(pytype, valid_types)
2729

2830
@classmethod
2931
def decode(cls, data: meta.Datum, *,
30-
trigger_metadata) -> azf_eventgrid.EventGridEvent:
32+
trigger_metadata) -> Union[azf_eventgrid.EventGridEvent,
33+
azf_eventgrid.CloudEvent]:
3134
data_type = data.type
3235

3336
if data_type == 'json':
@@ -36,6 +39,23 @@ def decode(cls, data: meta.Datum, *,
3639
raise NotImplementedError(
3740
f'unsupported event grid payload type: {data_type}')
3841

42+
if 'specversion' in body:
43+
known = {'specversion', 'id', 'source', 'type', 'time',
44+
'subject', 'datacontenttype', 'dataschema', 'data'}
45+
extensions = {k: v for k, v in body.items() if k not in known}
46+
return azf_eventgrid.CloudEvent(
47+
id=body.get('id'),
48+
source=body.get('source'),
49+
type=body.get('type'),
50+
specversion=body.get('specversion'),
51+
time=cls._parse_datetime(body.get('time')),
52+
subject=body.get('subject'),
53+
datacontenttype=body.get('datacontenttype'),
54+
dataschema=body.get('dataschema'),
55+
data=body.get('data'),
56+
**extensions,
57+
)
58+
3959
return azf_eventgrid.EventGridEvent(
4060
id=body.get('id'),
4161
topic=body.get('topic'),

tests/test_eventgrid.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@ def test_eventgrid_input_type(self):
1818
self.assertFalse(check_input_type(str))
1919
self.assertFalse(check_input_type(bytes))
2020

21+
def test_cloudevent_input_type(self):
22+
check_input_type = azf_event_grid.EventGridEventInConverter.\
23+
check_input_type_annotation
24+
self.assertTrue(check_input_type(func.CloudEvent))
25+
self.assertFalse(check_input_type(List[func.CloudEvent]))
26+
self.assertFalse(check_input_type(str))
27+
self.assertFalse(check_input_type(bytes))
28+
2129
def test_eventgrid_output_type(self):
2230
check_output_type = azf_event_grid.EventGridEventOutConverter.\
2331
check_output_type_annotation
@@ -148,6 +156,66 @@ def _generate_multiple_eventgrid_event(with_date=True):
148156
data_version='dataVersion',
149157
)]
150158

159+
def test_cloudevent_decode(self):
160+
event = azf_event_grid.EventGridEventInConverter.decode(
161+
data=self._generate_single_cloudevent_datum(), trigger_metadata=None
162+
)
163+
self.assertIsInstance(event, azf_event_grid.azf_eventgrid.CloudEvent)
164+
self.assertEqual(event.id, "A234-1234-1234")
165+
self.assertEqual(event.source, "/mycontext/subcontext")
166+
self.assertEqual(event.type, "com.example.type")
167+
self.assertEqual(event.specversion, "1.0")
168+
self.assertEqual(event.subject, "mysubject")
169+
self.assertEqual(event.datacontenttype, "application/json")
170+
self.assertIsNotNone(event.time)
171+
self.assertIsNotNone(event.get_json())
172+
self.assertEqual(event.get_json().get("key"), "value")
173+
174+
def test_cloudevent_decode_with_null_data(self):
175+
event = azf_event_grid.EventGridEventInConverter.decode(
176+
data=self._generate_single_cloudevent_datum(with_data=False),
177+
trigger_metadata=None
178+
)
179+
self.assertIsInstance(event, azf_event_grid.azf_eventgrid.CloudEvent)
180+
self.assertEqual(event.id, "A234-1234-1234")
181+
self.assertIsNone(event.get_json())
182+
183+
def test_cloudevent_decode_with_extensions(self):
184+
event = azf_event_grid.EventGridEventInConverter.decode(
185+
data=self._generate_single_cloudevent_datum(with_extensions=True),
186+
trigger_metadata=None
187+
)
188+
self.assertIsInstance(event, azf_event_grid.azf_eventgrid.CloudEvent)
189+
self.assertEqual(event.extension_attrs.get("customext"), "extvalue")
190+
191+
def test_eventgrid_schema_not_broken(self):
192+
event = azf_event_grid.EventGridEventInConverter.decode(
193+
data=self._generate_single_eventgrid_datum(), trigger_metadata=None
194+
)
195+
self.assertIsInstance(event, azf_event_grid.azf_eventgrid.EventGridEvent)
196+
self.assertEqual(event.id, "00010001-0001-0001-0001-000100010001")
197+
self.assertEqual(event.topic, "/TestTopic/namespaces/test")
198+
self.assertEqual(event.event_type, "captureFileCreated")
199+
200+
@staticmethod
201+
def _generate_single_cloudevent_datum(with_data=True,
202+
with_extensions=False):
203+
payload = {
204+
"specversion": "1.0",
205+
"type": "com.example.type",
206+
"source": "/mycontext/subcontext",
207+
"id": "A234-1234-1234",
208+
"time": "2018-04-05T17:31:00Z",
209+
"subject": "mysubject",
210+
"datacontenttype": "application/json",
211+
}
212+
if with_data:
213+
payload["data"] = {"key": "value"}
214+
if with_extensions:
215+
payload["customext"] = "extvalue"
216+
import json as _json
217+
return func.meta.Datum(_json.dumps(payload), 'json')
218+
151219
@staticmethod
152220
def _generate_single_eventgrid_str(in_bytes=False):
153221
string_representation = '{"id": "id", ' \

0 commit comments

Comments
 (0)