-
Notifications
You must be signed in to change notification settings - Fork 28
Expand file tree
/
Copy pathcommon.py
More file actions
621 lines (515 loc) · 19.8 KB
/
common.py
File metadata and controls
621 lines (515 loc) · 19.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
import contextlib
import logging
import string
import time
import typing as tp
import pytest
from cardano_clusterlib import clusterlib
from cardano_node_tests.cluster_management import cluster_management
from cardano_node_tests.tests import issues
from cardano_node_tests.utils import cluster_nodes
from cardano_node_tests.utils import clusterlib_utils
from cardano_node_tests.utils import configuration
from cardano_node_tests.utils import helpers
from cardano_node_tests.utils import pytest_utils
from cardano_node_tests.utils.versions import VERSIONS
LOGGER = logging.getLogger(__name__)

# Numeric boundary values commonly used in boundary/property tests
MAX_INT64 = (2**63) - 1
MAX_UINT64 = (2**64) - 1

# Eras covered by backwards-compatibility parametrization (see `PARAM_COMPAT_ERAS`)
COMPAT_ERAS = ("shelley", "allegra", "mary", "alonzo", "babbage")

# Lowercase letters + digits - alphabet used for address strings
ADDR_ALPHABET = string.ascii_lowercase + string.digits

# Byron-related markers; turned into no-ops on "_fast" testnet variants
ORDER5_BYRON = (
    pytest.mark.order(5) if "_fast" not in configuration.TESTNET_VARIANT else pytest.mark.noop
)
LONG_BYRON = pytest.mark.long if "_fast" not in configuration.TESTNET_VARIANT else pytest.mark.noop

# `transaction build` is usable only when the Tx era matches the cluster (node) era
_BLD_SKIP_REASON = ""
if VERSIONS.transaction_era != VERSIONS.cluster_era:
    _BLD_SKIP_REASON = "transaction era must be the same as node era"
BUILD_UNUSABLE = bool(_BLD_SKIP_REASON)

# Common `skipif`s
SKIPIF_BUILD_UNUSABLE = pytest.mark.skipif(
    BUILD_UNUSABLE,
    reason=(
        f"cannot use `build` with Tx era '{VERSIONS.transaction_era_name}': {_BLD_SKIP_REASON}"
    ),
)
SKIPIF_BUILD_EST_1199 = pytest.mark.skipif(
    True,  # We don't want to execute `issues.cli_1199.is_blocked()` during import time
    reason="`build-estimate` fails to balance tx with no txouts",
)
SKIPIF_WRONG_ERA = pytest.mark.skipif(
    not (
        VERSIONS.cluster_era >= VERSIONS.DEFAULT_CLUSTER_ERA
        and VERSIONS.transaction_era == VERSIONS.cluster_era
    ),
    reason="meant to run with default era or higher, where cluster era == Tx era",
)
SKIPIF_TOKENS_UNUSABLE = pytest.mark.skipif(
    VERSIONS.transaction_era < VERSIONS.MARY,
    reason="native tokens are available only in Mary+ eras",
)
_PLUTUS_SKIP_REASON = ""
if VERSIONS.transaction_era < VERSIONS.ALONZO:
    _PLUTUS_SKIP_REASON = "Plutus is available only in Alonzo+ eras"
SKIPIF_PLUTUS_UNUSABLE = pytest.mark.skipif(
    bool(_PLUTUS_SKIP_REASON),
    reason=_PLUTUS_SKIP_REASON,
)
SKIPIF_PLUTUSV2_UNUSABLE = pytest.mark.skipif(
    VERSIONS.transaction_era < VERSIONS.BABBAGE,
    reason="Plutus V2 is available only in Babbage+ eras",
)
_PLUTUSV3_SKIP_REASON = ""
if VERSIONS.transaction_era < VERSIONS.CONWAY:
    _PLUTUSV3_SKIP_REASON = "Plutus V3 is available only in Conway+ eras"
PLUTUSV3_UNUSABLE = bool(_PLUTUSV3_SKIP_REASON)
SKIPIF_PLUTUSV3_UNUSABLE = pytest.mark.skipif(
    PLUTUSV3_UNUSABLE,
    reason=_PLUTUSV3_SKIP_REASON,
)
SKIPIF_ON_TESTNET = pytest.mark.skipif(
    cluster_nodes.get_cluster_type().type != cluster_nodes.ClusterType.LOCAL,
    reason="not supposed to run on long-running testnet",
)
SKIPIF_ON_LOCAL = pytest.mark.skipif(
    cluster_nodes.get_cluster_type().type == cluster_nodes.ClusterType.LOCAL,
    reason="supposed to run on long-running testnet",
)
# Common parametrization
PARAM_BUILD_METHOD = pytest.mark.parametrize(
    "build_method",
    (
        clusterlib_utils.BuildMethods.BUILD_RAW,
        pytest.param(clusterlib_utils.BuildMethods.BUILD, marks=SKIPIF_BUILD_UNUSABLE),
        clusterlib_utils.BuildMethods.BUILD_EST,
    ),
)
PARAM_BUILD_METHOD_NO_EST = pytest.mark.parametrize(
    "build_method",
    (
        clusterlib_utils.BuildMethods.BUILD_RAW,
        pytest.param(
            clusterlib_utils.BuildMethods.BUILD,
            marks=SKIPIF_BUILD_UNUSABLE,
        ),
        pytest.param(
            clusterlib_utils.BuildMethods.BUILD_EST,
            marks=SKIPIF_BUILD_EST_1199,
        ),
    ),
)
PARAM_OFFLINE_BUILD_METHOD = pytest.mark.parametrize(
    "offline_build_method",
    (
        clusterlib_utils.BuildMethods.BUILD_RAW,
        clusterlib_utils.BuildMethods.BUILD_EST,
    ),
)
PARAM_PLUTUS_VERSION = pytest.mark.parametrize(
    "plutus_version",
    (
        "v1",
        pytest.param("v2", marks=SKIPIF_PLUTUSV2_UNUSABLE),
    ),
    ids=("plutus_v1", "plutus_v2"),
)
PARAM_PLUTUS3_VERSION = pytest.mark.parametrize(
    "plutus_version",
    (
        "v1",
        pytest.param("v2", marks=SKIPIF_PLUTUSV2_UNUSABLE),
        pytest.param("v3", marks=SKIPIF_PLUTUSV3_UNUSABLE),
    ),
    ids=("plutus_v1", "plutus_v2", "plutus_v3"),
)
PARAM_PLUTUS2ONWARDS_VERSION = pytest.mark.parametrize(
    "plutus_version",
    (
        "v2",
        pytest.param("v3", marks=SKIPIF_PLUTUSV3_UNUSABLE),
    ),
    ids=("plutus_v2", "plutus_v3"),
)
PARAM_COMPAT_ERAS = pytest.mark.parametrize("era", COMPAT_ERAS)
# Intervals for `wait_for_epoch_interval` (negative values are counted from the end of an epoch)
if cluster_nodes.get_cluster_type().type == cluster_nodes.ClusterType.LOCAL:
    # Time buffer at the end of an epoch, enough to do something that takes several transactions
    EPOCH_STOP_SEC_BUFFER = -40
    # Time when all ledger state info is available for the current epoch
    EPOCH_START_SEC_LEDGER_STATE = -19
    # Time buffer at the end of an epoch after getting ledger state info
    EPOCH_STOP_SEC_LEDGER_STATE = -15
else:
    # We can be more generous on testnets
    EPOCH_STOP_SEC_BUFFER = -200
    EPOCH_START_SEC_LEDGER_STATE = -300
    EPOCH_STOP_SEC_LEDGER_STATE = -200
def hypothesis_settings(max_examples: int = 100) -> tp.Any:
    """Return `hypothesis.settings` suitable for the tests in this repository.

    The import is deferred to call time so that `hypothesis` is only required
    when property-based tests actually run.
    """
    import hypothesis  # noqa: PLC0415

    ignored_checks = (
        hypothesis.HealthCheck.too_slow,
        hypothesis.HealthCheck.function_scoped_fixture,
        hypothesis.HealthCheck.filter_too_much,
    )
    return hypothesis.settings(
        max_examples=max_examples,
        deadline=None,
        suppress_health_check=ignored_checks,
    )
def unique_time_str() -> str:
    """Return unique string based on current timestamp.

    Useful for property-based tests as it isn't possible to use `random` module in hypothesis
    tests.
    """
    timestamp_digits = str(time.time()).replace(".", "")
    return timestamp_digits[-8:]
def get_test_id(
    cluster_or_manager: clusterlib.ClusterLib | cluster_management.ClusterManager,
) -> str:
    """Return unique test ID - function name + assigned cluster instance + random string.

    Log the test ID into cluster manager log file.
    """
    if isinstance(cluster_or_manager, clusterlib.ClusterLib):
        # Got a cluster object - reach its manager through the private attribute
        cm: cluster_management.ClusterManager = cluster_or_manager._cluster_manager  # type: ignore
        cid_part = f"_ci{cluster_or_manager.cluster_id}"
    else:
        cm = cluster_or_manager
        cid_part = ""

    instance_num = cm._cluster_instance_num
    cinstance = "" if instance_num == -1 else str(instance_num)

    curr_test = pytest_utils.get_current_test()
    rand_str = clusterlib.get_rand_str(6)
    test_id = f"{curr_test.test_function}{curr_test.test_params}{cid_part}_{rand_str}"

    # Log test ID to cluster manager log file - getting test ID happens early
    # after the start of a test, so the log entry can be used for determining
    # time of the test start
    cm.log(f"c{cinstance}: got ID `{test_id}` for '{curr_test.full}'")

    return test_id
def get_nodes_missing_utxos(
    cluster_obj: clusterlib.ClusterLib,
    utxos: list[clusterlib.UTXOData],
) -> set[str]:
    """Return set of nodes that don't have the given UTxOs."""
    missing_nodes: set[str] = set()
    known_nodes = cluster_nodes.get_cluster_type().NODES

    # With a single node there is nothing to cross-check
    if len(known_nodes) <= 1:
        return missing_nodes

    instance_num = cluster_nodes.get_instance_num()

    try:
        # Query each node by pointing 'CARDANO_NODE_SOCKET_PATH' at its socket
        for node_name in known_nodes:
            cluster_nodes.set_cluster_env(
                instance_num=instance_num, socket_file_name=f"{node_name}.socket"
            )
            node_sees_utxos = bool(cluster_obj.g_query.get_utxo(utxo=utxos))
            if not node_sees_utxos:
                missing_nodes.add(node_name)
    finally:
        # Restore 'CARDANO_NODE_SOCKET_PATH' to original value
        cluster_nodes.set_cluster_env(instance_num=instance_num)

    return missing_nodes
def check_missing_utxos(
    cluster_obj: clusterlib.ClusterLib,
    utxos: list[clusterlib.UTXOData],
) -> None:
    """Fail if any node is missing the given UTxOs."""
    missing = get_nodes_missing_utxos(cluster_obj=cluster_obj, utxos=utxos)
    if not missing:
        return
    raise AssertionError(f"Following nodes are missing the given UTxOs: {sorted(missing)}")
def detect_fork(
    cluster_manager: cluster_management.ClusterManager,
    cluster_obj: clusterlib.ClusterLib,
    temp_template: str,
) -> tuple[set[str], set[str]]:
    """Detect if one or more nodes have forked blockchain or is out of sync."""
    forked_nodes: set[str] = set()
    unsynced_nodes: set[str] = set()

    known_nodes = cluster_nodes.get_cluster_type().NODES
    if len(known_nodes) <= 1:
        LOGGER.warning("WARNING: Not enough nodes available to detect forks, skipping the check.")
        return forked_nodes, unsynced_nodes

    instance_num = cluster_nodes.get_instance_num()

    # Create a UTxO that every node on the canonical chain should learn about
    payment_rec = cluster_obj.g_address.gen_payment_addr_and_keys(
        name=f"{temp_template}_fork",
    )
    tx_raw_output = clusterlib_utils.fund_from_faucet(
        payment_rec,
        cluster_obj=cluster_obj,
        all_faucets=cluster_manager.cache.addrs_data,
        amount=2_000_000,
        tx_name=f"{temp_template}_fork",
    )
    assert tx_raw_output
    utxos = cluster_obj.g_query.get_utxo(tx_raw_output=tx_raw_output)

    try:
        for node_name in known_nodes:
            # Point 'CARDANO_NODE_SOCKET_PATH' to socket of the inspected node
            cluster_nodes.set_cluster_env(
                instance_num=instance_num, socket_file_name=f"{node_name}.socket"
            )

            # Give the node up to 5 attempts (1s apart) to report full sync
            synced = False
            for __ in range(5):
                if float(cluster_obj.g_query.get_tip()["syncProgress"]) == 100:
                    synced = True
                    break
                time.sleep(1)
            if not synced:
                unsynced_nodes.add(node_name)
                continue

            # A synced node that doesn't know the new UTxO is on a different chain
            if not cluster_obj.g_query.get_utxo(utxo=utxos):
                forked_nodes.add(node_name)
    finally:
        # Restore 'CARDANO_NODE_SOCKET_PATH' to original value
        cluster_nodes.set_cluster_env(instance_num=instance_num)

    # Forked nodes are the ones that differ from the majority of nodes - if most nodes
    # didn't see the UTxO, report the complement instead
    if forked_nodes and len(forked_nodes) > (len(known_nodes) // 2):
        forked_nodes = known_nodes - forked_nodes

    return forked_nodes, unsynced_nodes
def fail_on_fork(
    cluster_manager: cluster_management.ClusterManager,
    cluster_obj: clusterlib.ClusterLib,
    temp_template: str,
) -> None:
    """Fail if one or more nodes have forked blockchain or is out of sync."""
    forked_nodes, unsynced_nodes = detect_fork(
        cluster_manager=cluster_manager, cluster_obj=cluster_obj, temp_template=temp_template
    )

    problems = []
    if forked_nodes:
        problems.append(f"Following nodes appear to have forked blockchain: {sorted(forked_nodes)}")
    if unsynced_nodes:
        problems.append(f"Following nodes appear to be out of sync: {sorted(unsynced_nodes)}")

    if not problems:
        return

    # The local cluster needs to be respun before it is usable again
    cluster_manager.set_needs_respin()
    raise AssertionError("\n".join(problems))
def match_blocker(func: tp.Callable) -> tp.Any:
    """Fail or Xfail the test if CLI error is raised."""
    try:
        return func()
    except clusterlib.CLIError as exc:
        exc_text = str(exc)
        # Known blocker: `transaction build` query mismatch when certificates are used
        is_known_issue = (
            " transaction build " in exc_text
            and "fromConsensusQueryResult: internal query mismatch" in exc_text
            and "--certificate-file" in exc_text
        )
        if is_known_issue:
            issues.cli_268.finish_test()
        raise
def get_conway_address_deposit(cluster_obj: clusterlib.ClusterLib) -> int:
    """Get stake address deposit amount - is required in Conway+."""
    if VERSIONS.transaction_era < VERSIONS.CONWAY:
        # Pre-Conway eras don't need the deposit; -1 acts as "not set"
        return -1
    return cluster_obj.g_query.get_address_deposit()
def _get_funded_addresses(
    name_template: str,
    cluster_manager: cluster_management.ClusterManager,
    cluster_obj: clusterlib.ClusterLib,
    create_func: tp.Callable[[], list],
    fund_idx: list[int] | None = None,
    caching_key: str = "",
    amount: int | None = None,
    min_amount: int | None = None,
) -> list:
    """Create and fund addresses.

    If `amount` and no `min_amount` is provided, fund once and never re-fund.
    If `amount` is not provided, re-fund 3 * `min_amount` when balance drops below `min_amount`.
    If both `amount` and `min_amount` are provided, re-fund `amount` when balance
    drops below `min_amount`.
    """
    no_refund = amount is not None and min_amount is None

    # Set a default minimum amount if none is provided
    drop_amount = min_amount or 50_000_000
    if no_refund:
        assert amount  # For mypy
        # Use the exact specified amount
        fund_amount = amount
        drop_amount = amount
    elif amount is not None:
        # Amount given: use it
        fund_amount = amount
    else:
        # No amount given: fund triple the minimum
        fund_amount = drop_amount * 3

    if caching_key:
        # Reuse previously created addresses when they are cached
        fixture_cache: cluster_management.FixtureCache[list | None]
        with cluster_manager.cache_fixture(key=caching_key) as fixture_cache:
            if fixture_cache.value is None:
                addrs = create_func()
                fixture_cache.value = addrs
            else:
                addrs = fixture_cache.value
                # If amount is explicitly specified, skip re-funding
                if no_refund:
                    return addrs
    else:
        addrs = create_func()

    # Fund source addresses
    selected_addrs = addrs if fund_idx is None else [addrs[i] for i in fund_idx]

    # The `selected_addrs` can be both `AddressRecord`s or `PoolUser`s
    fund_addrs: list[clusterlib.AddressRecord] = []
    for rec in selected_addrs:
        payment = rec.payment if hasattr(rec, "payment") else rec
        if cluster_obj.g_query.get_address_balance(payment.address) < drop_amount:
            fund_addrs.append(payment)

    if fund_addrs:
        clusterlib_utils.fund_from_faucet(
            *fund_addrs,
            cluster_obj=cluster_obj,
            all_faucets=cluster_manager.cache.addrs_data,
            amount=fund_amount,
            tx_name=f"{name_template}_addrs",
            force=True,
        )

    return addrs
def get_payment_addrs(
    name_template: str,
    cluster_manager: cluster_management.ClusterManager,
    cluster_obj: clusterlib.ClusterLib,
    num: int,
    fund_idx: list[int] | None = None,
    caching_key: str = "",
    amount: int | None = None,
    min_amount: int | None = None,
    key_gen_method: clusterlib_utils.KeyGenMethods = clusterlib_utils.KeyGenMethods.DIRECT,
) -> list[clusterlib.AddressRecord]:
    """Create new payment addresses."""
    if num < 1:
        raise ValueError(f"Number of addresses must be at least 1, got: {num}")

    def _create_addrs() -> list[clusterlib.AddressRecord]:
        # One record per requested address, numbered from 1
        addr_names = [f"{name_template}_fund_addr_{i}" for i in range(1, num + 1)]
        return clusterlib_utils.create_payment_addr_records(
            *addr_names,
            cluster_obj=cluster_obj,
            key_gen_method=key_gen_method,
        )

    return _get_funded_addresses(
        name_template=name_template,
        cluster_manager=cluster_manager,
        cluster_obj=cluster_obj,
        create_func=_create_addrs,
        fund_idx=fund_idx,
        caching_key=caching_key,
        amount=amount,
        min_amount=min_amount,
    )
def get_payment_addr(
    name_template: str,
    cluster_manager: cluster_management.ClusterManager,
    cluster_obj: clusterlib.ClusterLib,
    caching_key: str = "",
    amount: int | None = None,
    min_amount: int | None = None,
    key_gen_method: clusterlib_utils.KeyGenMethods = clusterlib_utils.KeyGenMethods.DIRECT,
) -> clusterlib.AddressRecord:
    """Create a single new payment address."""
    addrs = get_payment_addrs(
        name_template=name_template,
        cluster_manager=cluster_manager,
        cluster_obj=cluster_obj,
        num=1,
        caching_key=caching_key,
        amount=amount,
        min_amount=min_amount,
        key_gen_method=key_gen_method,
    )
    return addrs[0]
def get_pool_users(
    name_template: str,
    cluster_manager: cluster_management.ClusterManager,
    cluster_obj: clusterlib.ClusterLib,
    num: int,
    fund_idx: list[int] | None = None,
    caching_key: str = "",
    amount: int | None = None,
    min_amount: int | None = None,
    payment_key_gen_method: clusterlib_utils.KeyGenMethods = clusterlib_utils.KeyGenMethods.DIRECT,
) -> list[clusterlib.PoolUser]:
    """Create new pool users."""
    if num < 1:
        raise ValueError(f"Number of pool users must be at least 1, got: {num}")

    def _create_pool_users() -> list[clusterlib.PoolUser]:
        return clusterlib_utils.create_pool_users(
            cluster_obj=cluster_obj,
            name_template=f"{name_template}_pool_user",
            no_of_addr=num,
            payment_key_gen_method=payment_key_gen_method,
        )

    return _get_funded_addresses(
        name_template=name_template,
        cluster_manager=cluster_manager,
        cluster_obj=cluster_obj,
        create_func=_create_pool_users,
        fund_idx=fund_idx,
        caching_key=caching_key,
        amount=amount,
        min_amount=min_amount,
    )
def get_pool_user(
    name_template: str,
    cluster_manager: cluster_management.ClusterManager,
    cluster_obj: clusterlib.ClusterLib,
    caching_key: str = "",
    amount: int | None = None,
    min_amount: int | None = None,
    payment_key_gen_method: clusterlib_utils.KeyGenMethods = clusterlib_utils.KeyGenMethods.DIRECT,
) -> clusterlib.PoolUser:
    """Create a single new pool user."""
    users = get_pool_users(
        name_template=name_template,
        cluster_manager=cluster_manager,
        cluster_obj=cluster_obj,
        num=1,
        caching_key=caching_key,
        amount=amount,
        min_amount=min_amount,
        payment_key_gen_method=payment_key_gen_method,
    )
    return users[0]
def get_registered_pool_user(
    name_template: str,
    cluster_manager: cluster_management.ClusterManager,
    cluster_obj: clusterlib.ClusterLib,
    caching_key: str = "",
    amount: int | None = None,
    min_amount: int | None = None,
) -> clusterlib.PoolUser:
    """Create a new pool user with its stake address registered."""
    pool_user = get_pool_user(
        name_template=name_template,
        cluster_manager=cluster_manager,
        cluster_obj=cluster_obj,
        caching_key=caching_key,
        amount=amount,
        min_amount=min_amount,
    )

    stake_addr_info = cluster_obj.g_query.get_stake_addr_info(pool_user.stake.address)
    if not stake_addr_info:
        # Register the stake address
        clusterlib_utils.register_stake_address(
            cluster_obj=cluster_obj,
            pool_user=pool_user,
            name_template=f"{name_template}_pool_user",
            deposit_amt=cluster_obj.g_query.get_address_deposit(),
        )

    return pool_user
def is_fee_in_interval(fee: float, expected_fee: float, frac: float = 0.1) -> bool:
    """Check that the fee is within the expected range on local testnet.

    The fee is considered to be within the expected range if it is within the expected_fee +/- frac
    range.
    """
    cluster_type = cluster_nodes.get_cluster_type().type
    if cluster_type == cluster_nodes.ClusterType.TESTNET:
        # We have the fees calibrated only for local testnet
        return True
    return helpers.is_in_interval(fee, expected_fee, frac=frac)
@contextlib.contextmanager
def allow_unstable_error_messages() -> tp.Iterator[None]:
    """Catch AssertionError and either log it or raise it.

    Used in tests where error messages can vary between node/CLI versions.
    """
    suppress_errors = configuration.ALLOW_UNSTABLE_ERROR_MESSAGES
    if not suppress_errors:
        # Suppression disabled - let any AssertionError propagate to the caller
        yield
        return
    try:
        yield
    except AssertionError:
        LOGGER.exception("AssertionError suppressed")