-
Notifications
You must be signed in to change notification settings - Fork 28
Expand file tree
/
Copy pathsubmit_api.py
More file actions
181 lines (142 loc) · 5.95 KB
/
submit_api.py
File metadata and controls
181 lines (142 loc) · 5.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""Utilities for `cardano-submit-api` REST service."""
import binascii
import contextlib
import dataclasses
import json
import logging
import pathlib as pl
import random
import shutil
import time
import requests
from cardano_clusterlib import clusterlib
from cardano_node_tests.utils import cluster_nodes
from cardano_node_tests.utils import custom_clusterlib
from cardano_node_tests.utils import http_client
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
class SubmitApiError(Exception):
    """Error raised by this module when submitting a Tx through `cardano-submit-api` fails."""
@dataclasses.dataclass(frozen=True)
class SubmitApiOut:
    """Immutable result of a Tx submission: the TxId together with the raw HTTP response."""

    # TxId as decoded from the JSON body of the response
    txid: str
    # The HTTP response object returned by the service
    response: requests.Response
def is_running() -> bool:
    """Check if `cardano-submit-api` REST service is running.

    Returns:
        `False` when the `cardano-submit-api` executable cannot be found on `PATH`.
        Otherwise `True` only if the first status record reported for the `submit_api`
        service has the status "RUNNING".
    """
    executable = shutil.which("cardano-submit-api")
    if executable:
        # TODO: `--metrics-port` is not available in older cardano-node releases, see node
        # issue #4280. If the metrics port is not available, we can start the
        # `cardano-submit-api` only on the first cluster instance.
        statuses = cluster_nodes.services_status(service_names=["submit_api"])
        return statuses[0].status == "RUNNING"

    return False
def tx2cbor(*, tx_file: clusterlib.FileType, destination_dir: clusterlib.FileType = ".") -> pl.Path:
    """Convert signed Tx to binary CBOR.

    The Tx file is read as UTF-8 JSON, the hex string stored under its `cborHex` key is
    decoded to bytes, and the bytes are written to `<destination_dir>/<tx file name>.cbor`.

    Args:
        tx_file: A path to the signed transaction file (JSON with a `cborHex` key).
        destination_dir: A directory where the `.cbor` file is written (`~` is expanded).

    Returns:
        A path to the written binary CBOR file.
    """
    src_path = pl.Path(tx_file)
    cbor_path = pl.Path(destination_dir).expanduser() / f"{src_path.name}.cbor"

    tx_json = json.loads(src_path.read_text(encoding="utf-8"))
    cbor_path.write_bytes(binascii.unhexlify(tx_json["cborHex"]))

    return cbor_path
def post_cbor(*, cbor_file: clusterlib.FileType, url: str) -> requests.Response:
    """Post binary CBOR representation of Tx to `cardano-submit-api` service on `url`.

    The request is attempted up to five times. An attempt is repeated when the request
    hits a read timeout, or when the response is unsuccessful and its text contains
    "MempoolTxTooSlow". Any other response (successful or not) is returned right away.

    Args:
        cbor_file: A path to the file with the binary CBOR of the transaction.
        url: The URL of the submit endpoint.

    Returns:
        The first response that does not call for a retry.

    Raises:
        SubmitApiError: When none of the attempts produced a usable response.
    """
    max_attempts = 5
    payload = pl.Path(cbor_file).read_bytes()
    request_headers = {"Content-Type": "application/cbor"}

    for attempt in range(1, max_attempts + 1):
        if attempt > 1:
            LOGGER.warning("Resubmitting transaction to submit-api.")

        try:
            response = http_client.get_session().post(
                url, headers=request_headers, data=payload, timeout=60
            )
        except requests.exceptions.ReadTimeout:
            # No response at all, treat it the same as a retryable failure
            response = None

        if response is not None:
            # The transaction didn't make it to mempool when this marker is present
            # in an unsuccessful response; only then is the request repeated.
            mempool_too_slow = not response and "MempoolTxTooSlow" in response.text
            if not mempool_too_slow:
                return response

        # Short random pause (less than one second) after every failed attempt
        time.sleep(random.random())

    err = f"Failed to submit the tx after {max_attempts} attempts."
    raise SubmitApiError(err)
def submit_tx_bare(*, tx_file: clusterlib.FileType) -> SubmitApiOut:
    """Submit a signed Tx using `cardano-submit-api` service.

    The Tx is converted to binary CBOR and posted to the submit endpoint on localhost,
    using the `submit_api` port of the current cluster instance.

    Args:
        tx_file: A path to signed transaction file.

    Returns:
        A `SubmitApiOut` with the TxId decoded from the JSON response body and the
        response itself.

    Raises:
        SubmitApiError: When the service answers with an unsuccessful response.
    """
    cbor_file = tx2cbor(tx_file=tx_file)

    cluster_scripts = cluster_nodes.get_cluster_type().cluster_scripts
    instance_ports = cluster_scripts.get_instance_ports(
        instance_num=cluster_nodes.get_instance_num()
    )
    submit_api_port = instance_ports.submit_api

    response = post_cbor(
        cbor_file=cbor_file, url=f"http://localhost:{submit_api_port}/api/submit/tx"
    )

    if response:
        return SubmitApiOut(txid=response.json(), response=response)

    msg = (
        f"Failed to submit the tx.\n"
        f"  status: {response.status_code}\n"
        f"  reason: {response.reason}\n"
        f"  error: {response.text}"
    )
    raise SubmitApiError(msg)
def submit_tx(
    *,
    cluster_obj: clusterlib.ClusterLib,
    tx_file: clusterlib.FileType,
    txins: list[clusterlib.UTXOData],
    wait_blocks: int = 2,
) -> str:
    """Submit a transaction, resubmit if the transaction didn't make it to the chain.

    The transaction is submitted and then, for up to 20 rounds, the function waits for
    new blocks and checks whether the first input UTxO can still be queried. While it can,
    the transaction is resubmitted at the start of the next round.

    Args:
        cluster_obj: An instance of `clusterlib.ClusterLib`.
        tx_file: A path to signed transaction file.
        txins: An iterable of `clusterlib.UTXOData`, specifying input UTxOs.
        wait_blocks: A number of new blocks to wait for (default = 2).

    Returns:
        The TxId of the submitted transaction.

    Raises:
        SubmitApiError: When the TxId is not known, when resubmitting fails for a reason
            other than already spent inputs, or when the first input is still unspent
            after all rounds.
    """
    txid = ""
    resubmit_err = None

    for round_num in range(20):
        resubmit_err = None

        if round_num == 0:
            txid = submit_tx_bare(tx_file=tx_file).txid
        elif not txid:
            msg = "The TxId is not known."
            raise SubmitApiError(msg)
        else:
            LOGGER.warning(f"Resubmitting transaction '{txid}' (from '{tx_file}').")
            try:
                submit_tx_bare(tx_file=tx_file)
            except SubmitApiError as exc:
                # Resubmitting is allowed to fail only because an input UTxO was
                # already spent; anything else is propagated unchanged.
                exc_text = str(exc)
                if (
                    "All inputs are spent" not in exc_text  # In cardano-node >= 10.6.0
                    and "BadInputsUTxO" not in exc_text
                ):
                    raise
                # If here, the TX is likely still in mempool and we need to wait
                resubmit_err = exc

        cluster_obj.wait_for_new_block(wait_blocks)

        # Check that one of the input UTxOs can no longer be queried in order to verify
        # the TX was successfully submitted to the chain (that the TX is no longer in
        # mempool). An input is spent when its combination of hash and ix is not found
        # in the list of current UTxOs.
        # TODO: check that the transaction is 1-block deep (can't be done in CLI alone)
        unspent = cluster_obj.g_query.get_utxo(utxo=txins[0])
        if not unspent:
            # Create a `.submitted` status file when the Tx was successfully submitted
            custom_clusterlib.create_submitted_file(tx_file=tx_file)
            return txid

    if resubmit_err is not None:
        # Submitting the TX raised an exception as if the input was already
        # spent, but it was either not the case, or the TX is still in mempool.
        msg = f"Failed to resubmit the transaction '{txid}' (from '{tx_file}')."
        raise SubmitApiError(msg) from resubmit_err

    msg = f"Transaction '{txid}' didn't make it to the chain (from '{tx_file}')."
    raise SubmitApiError(msg)