Skip to content

Commit fae31b0

Browse files
committed
Test Python code generation
Signed-off-by: "Adriano Marto Reis" <adrianomarto@gmail.com>
1 parent 29c64e6 commit fae31b0

4 files changed

Lines changed: 225 additions & 0 deletions

File tree

tests/test_idlc.idl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
module Announcements
2+
{
3+
struct Message
4+
{
5+
string text;
6+
};
7+
};

tests/test_idlc.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import asyncio
2+
import pytest
3+
4+
5+
MESSAGE_TEXT = "Hello, World!"
6+
7+
8+
@pytest.fixture
9+
def announcements_library():
10+
'''
11+
Generates the Announcements library from the test_idlc.idl file.
12+
'''
13+
import os
14+
import shutil
15+
import subprocess
16+
assert 'CYCLONEDDS_HOME' in os.environ, 'CYCLONEDDS_HOME is not set'
17+
idlc_path = os.path.join(os.environ['CYCLONEDDS_HOME'], 'bin', 'idlc')
18+
subprocess.run([idlc_path, '-l', 'py', 'test_idlc.idl'])
19+
yield
20+
shutil.rmtree('Announcements')
21+
22+
23+
@pytest.mark.asyncio
24+
async def test_communication(announcements_library):
25+
'''
26+
Creates a publisher and a subscriber and checks if the subscriber receives
27+
the message sent by the publisher.
28+
'''
29+
tasks = [
30+
_subscriber(),
31+
_publisher(MESSAGE_TEXT),
32+
]
33+
results = await asyncio.gather(*tasks)
34+
assert results[0] == MESSAGE_TEXT
35+
36+
37+
async def _publisher(message_text, timeout=2):
38+
'''
39+
Sends a given message text to the subscriber.
40+
'''
41+
from cyclonedds.domain import DomainParticipant
42+
from cyclonedds.topic import Topic
43+
from cyclonedds.pub import DataWriter
44+
from Announcements import Message
45+
46+
participant = DomainParticipant(0)
47+
topic = Topic(participant, "Announcements", Message)
48+
writer = DataWriter(participant, topic)
49+
message = Message(text=message_text)
50+
writer.write(message)
51+
await asyncio.sleep(timeout)
52+
53+
54+
async def _subscriber(timeout=2):
55+
'''
56+
Receives a message. Returns None if it times out.
57+
'''
58+
from cyclonedds.domain import DomainParticipant
59+
from cyclonedds.topic import Topic
60+
from cyclonedds.sub import DataReader
61+
from Announcements import Message
62+
from cyclonedds.util import duration
63+
64+
participant = DomainParticipant(0)
65+
topic = Topic(participant, "Announcements", Message)
66+
reader = DataReader(participant, topic)
67+
async for update in reader.read_aiter(timeout=duration(seconds=timeout)):
68+
return update.text
69+
return None

tests/test_idlc_hierarchy.idl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
module Hierarchy
2+
{
3+
@mutable
4+
struct Base
5+
{
6+
string fieldA;
7+
};
8+
9+
@mutable
10+
struct Derived : Base
11+
{
12+
string fieldB;
13+
};
14+
};

tests/test_idlc_hierarchy.py

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import asyncio
2+
import pytest
3+
4+
5+
IDL_FILE = 'test_idlc_hierarchy.idl'
6+
7+
8+
@pytest.fixture
9+
def hierarchy_library():
10+
'''
11+
Generates the Hierarchy library from the test_idlc_hierarchy.idl file.
12+
'''
13+
import os
14+
import shutil
15+
import subprocess
16+
assert 'CYCLONEDDS_HOME' in os.environ, 'CYCLONEDDS_HOME is not set'
17+
idlc_path = os.path.join(os.environ['CYCLONEDDS_HOME'], 'bin', 'idlc')
18+
subprocess.run([idlc_path, '-l', 'py', IDL_FILE])
19+
yield
20+
shutil.rmtree('Hierarchy')
21+
22+
23+
@pytest.mark.asyncio
24+
async def test_base_topic(hierarchy_library):
25+
'''
26+
Creates a publisher and a subscriber of the Base topic and checks if the
27+
subscriber receives the update sent by the publisher.
28+
'''
29+
from Hierarchy import Base
30+
base = Base(fieldA="Hakuna")
31+
tasks = [
32+
_subscriber(Base),
33+
_publisher(Base, base),
34+
]
35+
results = await asyncio.gather(*tasks)
36+
assert results[0] == results[1]
37+
38+
39+
@pytest.mark.asyncio
40+
async def test_derived_topic(hierarchy_library):
41+
'''
42+
Creates a publisher and a subscriber of the Derived topic and checks if the
43+
subscriber receives the update sent by the publisher.
44+
'''
45+
from Hierarchy import Derived
46+
derived = Derived(
47+
fieldA="Hakuna",
48+
fieldB="Matata",
49+
)
50+
tasks = [
51+
_subscriber(Derived),
52+
_publisher(Derived, derived),
53+
]
54+
results = await asyncio.gather(*tasks)
55+
assert results[0] == results[1]
56+
57+
58+
@pytest.mark.asyncio
59+
async def test_cyclonedds_typeof_command(hierarchy_library):
60+
'''
61+
Executes the command "cyclonedds typeof" and compares the result with the
62+
IDL file.
63+
'''
64+
from Hierarchy import Derived
65+
tasks = [
66+
_subscriber(Derived),
67+
_type_checker(Derived, IDL_FILE),
68+
]
69+
await asyncio.gather(*tasks)
70+
71+
72+
async def _publisher(topic_class, value, timeout=2):
73+
'''
74+
Sends a given value update to the subscriber.
75+
'''
76+
from cyclonedds.domain import DomainParticipant
77+
from cyclonedds.topic import Topic
78+
from cyclonedds.pub import DataWriter
79+
80+
participant = DomainParticipant(0)
81+
topic = Topic(participant, topic_class.__name__, topic_class)
82+
writer = DataWriter(participant, topic)
83+
writer.write(value)
84+
await asyncio.sleep(timeout)
85+
return value
86+
87+
88+
async def _subscriber(topic_class, timeout=2):
89+
'''
90+
Receives an update. Returns None if it times out.
91+
'''
92+
from cyclonedds.domain import DomainParticipant
93+
from cyclonedds.topic import Topic
94+
from cyclonedds.sub import DataReader
95+
from cyclonedds.util import duration
96+
97+
participant = DomainParticipant(0)
98+
topic = Topic(participant, topic_class.__name__, topic_class)
99+
reader = DataReader(participant, topic)
100+
async for update in reader.read_aiter(timeout=duration(seconds=timeout)):
101+
return update
102+
return None
103+
104+
105+
async def _type_checker(topic_class, idl_file):
106+
'''
107+
Executes the command "cyclonedds typeof" and compares the result with a
108+
given IDL file.
109+
'''
110+
def _normalise(text):
111+
text = text.replace('\n', ' ')
112+
text = ' '.join(text.split())
113+
return text
114+
115+
# loads the IDL file
116+
with open(idl_file) as file:
117+
expected_idl = file.read().rstrip()
118+
119+
# executes the command
120+
import subprocess
121+
result = subprocess.run(
122+
['cyclonedds', 'typeof', topic_class.__name__, '--suppress-progress-bar'],
123+
stdout=subprocess.PIPE,
124+
check=True,)
125+
command_output = result.stdout.decode().splitlines()
126+
127+
# skips the first lines of the output (As defined in participant...)
128+
first_idl_line = 0
129+
while first_idl_line < len(command_output) and not command_output[first_idl_line].startswith('module'):
130+
first_idl_line += 1
131+
actual_idl = '\n'.join(command_output[first_idl_line:])
132+
133+
# compares the command output with the IDL file
134+
print(actual_idl)
135+
assert _normalise(actual_idl) == _normalise(expected_idl)

0 commit comments

Comments
 (0)