Skip to content

Commit 0b41cbb

Browse files
committed
Test Python code generation
Signed-off-by: "Adriano Marto Reis" <adrianomarto@gmail.com>
1 parent 29c64e6 commit 0b41cbb

File tree

4 files changed

+227
-0
lines changed

4 files changed

+227
-0
lines changed

tests/test_idlc.idl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
module Announcements
2+
{
3+
struct Message
4+
{
5+
string text;
6+
};
7+
};

tests/test_idlc.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import asyncio
2+
import pytest
3+
4+
5+
MESSAGE_TEXT = "Hello, World!"
6+
7+
8+
@pytest.fixture
9+
def announcements_library():
10+
'''
11+
Generates the Announcements library from the test_idlc.idl file.
12+
'''
13+
import os
14+
import shutil
15+
import subprocess
16+
assert 'CYCLONEDDS_HOME' in os.environ, 'CYCLONEDDS_HOME is not set'
17+
idlc_path = os.path.join(os.environ['CYCLONEDDS_HOME'], 'bin', 'idlc')
18+
subprocess.run([idlc_path, '-l', 'py', 'test_idlc.idl'])
19+
yield
20+
shutil.rmtree('Announcements')
21+
22+
23+
@pytest.mark.asyncio
24+
async def test_communication(announcements_library):
25+
'''
26+
Creates a publisher and a subscriber and checks if the subscriber receives
27+
the message sent by the publisher.
28+
'''
29+
tasks = [
30+
_subscriber(),
31+
_publisher(MESSAGE_TEXT),
32+
]
33+
results = await asyncio.gather(*tasks)
34+
assert results[0] == MESSAGE_TEXT
35+
36+
37+
async def _publisher(message_text, timeout=2):
38+
'''
39+
Sends a given message text to the subscriber.
40+
'''
41+
from cyclonedds.domain import DomainParticipant
42+
from cyclonedds.topic import Topic
43+
from cyclonedds.pub import DataWriter
44+
from Announcements import Message
45+
46+
participant = DomainParticipant(0)
47+
topic = Topic(participant, "Announcements", Message)
48+
writer = DataWriter(participant, topic)
49+
message = Message(text=message_text)
50+
writer.write(message)
51+
await asyncio.sleep(timeout)
52+
53+
54+
async def _subscriber(timeout=2):
55+
'''
56+
Receives a message. Returns None if it times out.
57+
'''
58+
from cyclonedds.domain import DomainParticipant
59+
from cyclonedds.topic import Topic
60+
from cyclonedds.sub import DataReader
61+
from Announcements import Message
62+
from cyclonedds.util import duration
63+
64+
participant = DomainParticipant(0)
65+
topic = Topic(participant, "Announcements", Message)
66+
reader = DataReader(participant, topic)
67+
async for update in reader.read_aiter(timeout=duration(seconds=timeout)):
68+
return update.text
69+
return None

tests/test_idlc_hierarchy.idl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
module Hierarchy
2+
{
3+
@mutable
4+
struct Base
5+
{
6+
string fieldA;
7+
};
8+
9+
@mutable
10+
struct Derived : Base
11+
{
12+
string fieldB;
13+
};
14+
};

tests/test_idlc_hierarchy.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import asyncio
2+
import pytest
3+
import sys
4+
5+
6+
IDL_FILE = 'test_idlc_hierarchy.idl'
7+
8+
9+
@pytest.fixture
10+
def hierarchy_library():
11+
'''
12+
Generates the Hierarchy library from the test_idlc_hierarchy.idl file.
13+
'''
14+
import os
15+
import shutil
16+
import subprocess
17+
assert 'CYCLONEDDS_HOME' in os.environ, 'CYCLONEDDS_HOME is not set'
18+
idlc_path = os.path.join(os.environ['CYCLONEDDS_HOME'], 'bin', 'idlc')
19+
subprocess.run([idlc_path, '-l', 'py', IDL_FILE])
20+
yield
21+
shutil.rmtree('Hierarchy')
22+
23+
24+
@pytest.mark.asyncio
25+
async def test_base_topic(hierarchy_library):
26+
'''
27+
Creates a publisher and a subscriber of the Base topic and checks if the
28+
subscriber receives the update sent by the publisher.
29+
'''
30+
from Hierarchy import Base
31+
base = Base(fieldA="Hakuna")
32+
tasks = [
33+
_subscriber(Base),
34+
_publisher(Base, base),
35+
]
36+
results = await asyncio.gather(*tasks)
37+
assert results[0] == results[1]
38+
39+
40+
@pytest.mark.asyncio
41+
async def test_derived_topic(hierarchy_library):
42+
'''
43+
Creates a publisher and a subscriber of the Derived topic and checks if the
44+
subscriber receives the update sent by the publisher.
45+
'''
46+
from Hierarchy import Derived
47+
derived = Derived(
48+
fieldA="Hakuna",
49+
fieldB="Matata",
50+
)
51+
tasks = [
52+
_subscriber(Derived),
53+
_publisher(Derived, derived),
54+
]
55+
results = await asyncio.gather(*tasks)
56+
assert results[0] == results[1]
57+
58+
59+
@pytest.mark.asyncio
60+
@pytest.mark.skipif(sys.platform == 'darwin', reason='I do not know why this fails on macOS')
61+
async def test_cyclonedds_typeof_command(hierarchy_library):
62+
'''
63+
Executes the command "cyclonedds typeof" and compares the result with the
64+
IDL file.
65+
'''
66+
from Hierarchy import Derived
67+
tasks = [
68+
_subscriber(Derived),
69+
_type_checker(Derived, IDL_FILE),
70+
]
71+
await asyncio.gather(*tasks)
72+
73+
74+
async def _publisher(topic_class, value, timeout=2):
75+
'''
76+
Sends a given value update to the subscriber.
77+
'''
78+
from cyclonedds.domain import DomainParticipant
79+
from cyclonedds.topic import Topic
80+
from cyclonedds.pub import DataWriter
81+
82+
participant = DomainParticipant(0)
83+
topic = Topic(participant, topic_class.__name__, topic_class)
84+
writer = DataWriter(participant, topic)
85+
writer.write(value)
86+
await asyncio.sleep(timeout)
87+
return value
88+
89+
90+
async def _subscriber(topic_class, timeout=2):
91+
'''
92+
Receives an update. Returns None if it times out.
93+
'''
94+
from cyclonedds.domain import DomainParticipant
95+
from cyclonedds.topic import Topic
96+
from cyclonedds.sub import DataReader
97+
from cyclonedds.util import duration
98+
99+
participant = DomainParticipant(0)
100+
topic = Topic(participant, topic_class.__name__, topic_class)
101+
reader = DataReader(participant, topic)
102+
async for update in reader.read_aiter(timeout=duration(seconds=timeout)):
103+
return update
104+
return None
105+
106+
107+
async def _type_checker(topic_class, idl_file):
108+
'''
109+
Executes the command "cyclonedds typeof" and compares the result with a
110+
given IDL file.
111+
'''
112+
def _normalise(text):
113+
text = text.replace('\n', ' ')
114+
text = ' '.join(text.split())
115+
return text
116+
117+
# loads the IDL file
118+
with open(idl_file) as file:
119+
expected_idl = file.read().rstrip()
120+
121+
# executes the command
122+
import subprocess
123+
result = subprocess.run(
124+
['cyclonedds', 'typeof', topic_class.__name__, '--suppress-progress-bar'],
125+
stdout=subprocess.PIPE,
126+
check=True,)
127+
command_output = result.stdout.decode().splitlines()
128+
129+
# skips the first lines of the output (As defined in participant...)
130+
first_idl_line = 0
131+
while first_idl_line < len(command_output) and not command_output[first_idl_line].startswith('module'):
132+
first_idl_line += 1
133+
actual_idl = '\n'.join(command_output[first_idl_line:])
134+
135+
# compares the command output with the IDL file
136+
print(actual_idl)
137+
assert _normalise(actual_idl) == _normalise(expected_idl)

0 commit comments

Comments
 (0)