-
Notifications
You must be signed in to change notification settings - Fork 61
Expand file tree
/
Copy pathtest_idlc.py
More file actions
69 lines (57 loc) · 1.93 KB
/
test_idlc.py
File metadata and controls
69 lines (57 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import asyncio
import pytest
# Text payload handed to the publisher in test_communication and compared
# against what the subscriber returns.
MESSAGE_TEXT = "Hello, World!"
@pytest.fixture
def announcements_library():
    '''
    Generates the Announcements library from the test_idlc.idl file.

    Runs ``$CYCLONEDDS_HOME/bin/idlc -l py test_idlc.idl`` before the test
    and removes the ``Announcements`` directory once the test has finished.
    Both ``test_idlc.idl`` and ``Announcements`` are relative paths, so they
    are resolved against the current working directory.

    Raises:
        AssertionError: if the CYCLONEDDS_HOME environment variable is unset.
        subprocess.CalledProcessError: if idlc exits with a non-zero status.
    '''
    import os
    import shutil
    import subprocess

    assert 'CYCLONEDDS_HOME' in os.environ, 'CYCLONEDDS_HOME is not set'
    idlc_path = os.path.join(os.environ['CYCLONEDDS_HOME'], 'bin', 'idlc')

    # check=True makes a failed code generation abort the fixture right here
    # instead of surfacing later as an unrelated import error in the test or
    # as a missing-directory error during teardown.
    subprocess.run([idlc_path, '-l', 'py', 'test_idlc.idl'], check=True)

    yield

    # Teardown: drop the generated package so it does not leak between runs.
    shutil.rmtree('Announcements')
@pytest.mark.asyncio
async def test_communication(announcements_library):
    '''
    Runs one subscriber and one publisher concurrently and verifies that
    the text returned by the subscriber equals the text handed to the
    publisher.
    '''
    # gather() preserves argument order, so the subscriber's return value
    # comes first; the publisher's result is not needed.
    received, _ = await asyncio.gather(
        _subscriber(),
        _publisher(MESSAGE_TEXT),
    )
    assert received == MESSAGE_TEXT
async def _publisher(message_text, timeout=2):
    '''
    Writes a single Message with the given text on the "Announcements"
    topic of domain 0, then sleeps for `timeout` seconds before returning.
    '''
    from cyclonedds.domain import DomainParticipant
    from cyclonedds.topic import Topic
    from cyclonedds.pub import DataWriter
    from Announcements import Message

    dp = DomainParticipant(0)
    announcements = Topic(dp, "Announcements", Message)
    dw = DataWriter(dp, announcements)
    dw.write(Message(text=message_text))

    # NOTE(review): presumably this keeps the writer alive long enough for
    # the subscriber to pick up the sample - confirm.
    await asyncio.sleep(timeout)
async def _subscriber(timeout=2):
    '''
    Reads from the "Announcements" topic of domain 0 and returns the text
    of the first sample obtained. Returns None when the asynchronous read
    iterator finishes without yielding anything (expected on timeout).
    '''
    from cyclonedds.domain import DomainParticipant
    from cyclonedds.topic import Topic
    from cyclonedds.sub import DataReader
    from Announcements import Message
    from cyclonedds.util import duration

    dp = DomainParticipant(0)
    announcements = Topic(dp, "Announcements", Message)
    dr = DataReader(dp, announcements)

    read_timeout = duration(seconds=timeout)
    async for sample in dr.read_aiter(timeout=read_timeout):
        # Only the first sample matters; stop iterating immediately.
        return sample.text

    # The iterator ended without delivering a sample.
    return None