-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjmfinder.py
More file actions
491 lines (422 loc) · 16.8 KB
/
jmfinder.py
File metadata and controls
491 lines (422 loc) · 16.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
#!/usr/bin/env python3
"""
Standalone script with no dependencies.
The only requirement is a Bitcoin Core node running with REST server enabled.
Pass -rest through CLI or rest=1 in bitcoin.conf
The node can be pruned and txindex is not required because jmfinder scans the blocks directly.
Most of the raw data parsing code is taken from https://github.com/alecalve/python-bitcoin-blockchain-parser
Thank you!
"""
import hashlib
import json
import struct
import sys
from argparse import ArgumentParser, Namespace
from collections import Counter
from concurrent.futures import ProcessPoolExecutor
from enum import Enum
from itertools import chain
from logging import Logger, getLogger, Formatter, StreamHandler
from math import ceil
from time import monotonic
from typing import Tuple, Dict, Any, List, Generator, Optional
from urllib.error import URLError, HTTPError
from urllib.request import urlopen, Request
# Module-wide logger. main() replaces it with the logger returned by get_logger();
# it stays None in any process where main() has not run.
log: Optional[Logger] = None
# JOINMARKET PATTERN
# A CoinJoin with k participants is assumed to have:
#   - k equal-valued CoinJoin outputs,
#   - k or k - 1 other (change) outputs, hence k == ceil(n_out / 2),
#   - at least k inputs.
# Joins with fewer than MIN_PARTICIPANTS participants are ignored: two are
# technically possible, but that is not really JoinMarket "correct usage" and
# would produce a lot of false positives.
# Joins whose equal output is below MIN_CJ_AMOUNT sats are ignored as well:
# low-value lookalikes are common and unlikely to be JoinMarket.
MIN_PARTICIPANTS = 3
MIN_CJ_AMOUNT = 75000


def is_jm(n_in: int, n_out: int, values: List[int]) -> Tuple[int, int]:
    """
    Test a transaction's shape against the JoinMarket pattern.

    ``values`` are the output amounts in sats. Return ``(amount, count)``:
    the CoinJoin amount and how many outputs carry it, or ``(0, 0)`` when the
    transaction does not look like a JoinMarket CoinJoin.
    """
    # ceil(n_out / 2) with integer arithmetic
    participants = (n_out + 1) // 2
    if participants < MIN_PARTICIPANTS or n_in < participants:
        return 0, 0
    amount, count = Counter(values).most_common(1)[0]
    if amount < MIN_CJ_AMOUNT or count != participants:
        return 0, 0
    return amount, count
def get_logger(verbose: bool) -> Logger:
    """
    Return this module's logger, configured to emit through a StreamHandler.

    ``verbose`` selects DEBUG level, otherwise INFO. Calling this more than once
    only updates the level; it never attaches a second handler, so messages are
    not duplicated.
    """
    from logging import DEBUG, INFO

    logger = getLogger(__name__)
    logger.setLevel(DEBUG if verbose else INFO)
    if not logger.handlers:
        handler = StreamHandler()
        # %H (24-hour clock): the previous %I was ambiguous without an AM/PM marker
        handler.setFormatter(Formatter('%(asctime)s %(levelname)s: %(message)s', datefmt='%m/%d/%Y %H:%M:%S'))
        logger.addHandler(handler)
    return logger
def get_args() -> Namespace:
    """
    Define the command-line interface and parse ``sys.argv`` with it.

    Positional ``start`` (negative = last n blocks) and optional ``end``
    (0 = latest block) give the block range; the options select the REST
    endpoint, the candidate output file, the number of worker processes
    and the log verbosity.
    """
    cli = ArgumentParser(
        usage="jmfinder.py [options] start [end]",
        description='Given a starting and finishing block height, finds JoinMarket CoinJoins.',
    )

    # Block range
    cli.add_argument('start', action='store', type=int,
                     help='Start block height, pass a negative value to scan the last n blocks from end')
    cli.add_argument('end', nargs='?', action='store', type=int, default=0,
                     help='End block height, default is the latest block')

    # Node connection
    cli.add_argument('-o', '--host', action='store', type=str, default='localhost',
                     help='Bitcoin Core REST host, default localhost')
    cli.add_argument('-p', '--port', action='store', type=int, default=8332,
                     help='Bitcoin Core REST port, default 8332')

    # Output and execution
    cli.add_argument('-f', '--filename', action='store', type=str, dest='candidate_file_name',
                     default='candidates.txt',
                     help='Filename to write identifiers of candidate transactions, default candidates.txt')
    cli.add_argument('-j', '--jobs', type=int, metavar='N',
                     help='Use N processes, default to the number of processors on the machine. '
                          'Pass 0 to prevent multiprocessing')
    cli.add_argument('-v', '--verbose', action='store_true',
                     help='Increase logging verbosity to DEBUG')

    return cli.parse_args()
class ExitStatus(Enum):
    """
    Process exit codes handed to ``sys.exit``.
    """
    # Normal termination
    SUCCESS = 0
    # Runtime failure, e.g. a REST request or block parsing failed
    FAILURE = 1
    # Invalid arguments, e.g. a start height below the prune height
    ARGERROR = 2
class ReqType(Enum):
    """
    Response formats of the REST interface.

    Each value is the suffix appended to the request path to select the format.
    """
    # Binary serialization (used for raw blocks)
    BIN = ".bin"
    # Hex-encoded serialization
    HEX = ".hex"
    # JSON document
    JSON = ".json"
class RestApi(Enum):
    """
    Endpoints of the Bitcoin Core REST interface.

    A member's value is the endpoint path; ``to_uri`` expands it into a
    request path with arguments and a format suffix.
    """
    # Transaction by hash. Only the mempool is searched unless the node runs
    # with the transaction index enabled ("txindex=1").
    TX = '/tx'
    # Block by hash, with full transaction details
    BLOCK = '/block'
    # Block by hash, listing only the TXIDs of its transactions
    BLOCK_NO_DETAILS = '/block/notxdetails'
    # Given a count and a block hash, that many headers in upward direction
    HEADERS = '/headers'
    # Hash of the block at a given height
    BLOCKHASH = '/blockhashbyheight'
    # Block chain processing state (JSON format only)
    CHAININFO = '/chaininfo'
    # UTXO set lookup for a set of outpoints
    UTXO = '/getutxos'
    # UTXO set lookup after applying mempool transactions (exposing their
    # UTXOs and removing the outputs they spend)
    UTXO_CHECK_MEMPOOL = '/getutxos/checkmempool'
    # Mempool summary information
    MEMPOOL_INFO = '/mempool/info'
    # Transactions currently in the mempool
    MEMPOOL_CONTENT = '/mempool/contents'

    def to_uri(self, req_type: ReqType, *args) -> str:
        """
        Build the request path (relative to ``/rest``) for this endpoint.

        Every positional argument becomes a ``/``-prefixed path segment and the
        format suffix of ``req_type`` is appended, e.g. BLOCKHASH with JSON and
        100 gives ``/blockhashbyheight/100.json``.
        """
        pieces = [self.value]
        pieces.extend(f'/{segment}' for segment in args)
        pieces.append(req_type.value)
        return ''.join(pieces)
class Btc:
    """
    Small HTTP client for a Bitcoin Core node's REST interface.

    All helpers go through ``get_response``, which logs and terminates the
    script when a request fails.
    """
    __slots__ = ('url',)
    HEADERS = {'User-Agent': 'jmfinder'}
    # Per-request timeout in seconds, passed to urlopen
    TIMEOUT = 3

    def __init__(self, host: str, port: int):
        self.url = f'http://{host}:{port}'

    def get_response(self, method: RestApi, *args, req_type: ReqType = ReqType.JSON) -> bytes:
        """
        GET ``method`` with path arguments ``args`` in format ``req_type``.

        Return the raw response body. On failure log the reason and exit with
        ExitStatus.FAILURE: as a standalone script it is acceptable to stop
        here, although ideally the caller would decide, and some errors could
        be retried.
        """
        endpoint = f'{self.url}/rest{method.to_uri(req_type, *args)}'
        request = Request(endpoint, headers=self.HEADERS)
        try:
            with urlopen(request, timeout=self.TIMEOUT) as reply:
                return reply.read()
        except HTTPError as err:
            # HTTPError is a URLError subclass, so it has to be matched first
            problem = f'Bad status code: {err.code} {err.reason}'
        except URLError as err:
            problem = f'Unable to connect to {endpoint}: {err.reason}'
        except TimeoutError:
            problem = 'Request timed out'
        except Exception as err:
            problem = str(err)
        log.error(problem)
        log.error(f'Request for {endpoint} failed')
        sys.exit(ExitStatus.FAILURE.value)

    def get_json(self, method: RestApi, *args) -> Dict[str, Any]:
        """
        Request ``method`` in JSON format and decode the body.
        """
        body = self.get_response(method, *args, req_type=ReqType.JSON)
        return json.loads(body)

    def get_block(self, blockhash: str, no_details: bool = False) -> Dict[str, Any]:
        """
        Fetch a block as JSON, optionally with TXIDs only (``no_details``).
        """
        if no_details:
            endpoint = RestApi.BLOCK_NO_DETAILS
        else:
            endpoint = RestApi.BLOCK
        return self.get_json(endpoint, blockhash)

    def get_blockhash(self, height: int) -> str:
        """
        Return the hash of the block at ``height``.
        """
        reply = self.get_json(RestApi.BLOCKHASH, height)
        return reply['blockhash']

    def get_info(self) -> Dict[str, Any]:
        """
        Return the node's chain state (the chaininfo endpoint).
        """
        return self.get_json(RestApi.CHAININFO)
def double_sha256(data: bytes) -> bytes:
    """
    Return SHA-256 applied twice to ``data`` (the digest used for TXIDs).
    """
    first_round = hashlib.sha256(data).digest()
    return hashlib.sha256(first_round).digest()
def format_hash(hash_: bytes) -> str:
    """
    Render a hash as hex with its bytes reversed (Bitcoin's display order).
    """
    flipped = hash_[::-1]
    return flipped.hex()
def decode_uint32(data: bytes) -> int:
    """
    Decode exactly four bytes as an unsigned little-endian integer.

    Raise ValueError when ``data`` is not four bytes long.
    """
    if len(data) == 4:
        (value,) = struct.unpack("<I", data)
        return value
    raise ValueError
def decode_uint64(data: bytes) -> int:
    """
    Decode exactly eight bytes as an unsigned little-endian integer.

    Raise ValueError when ``data`` is not eight bytes long.
    """
    if len(data) == 8:
        (value,) = struct.unpack("<Q", data)
        return value
    raise ValueError
def decode_varint(data: bytes) -> Tuple[int, int]:
    """
    Decode the variable-length integer (CompactSize) at the start of ``data``.

    Return ``(value, consumed)``, where ``consumed`` is the number of bytes the
    encoding occupies: 1 for a prefix below 0xfd, otherwise 1 plus the width
    of the little-endian integer selected by the prefix (0xfd: 2 bytes,
    0xfe: 4 bytes, 0xff: 8 bytes).

    Raise ValueError if ``data`` is empty or too short for the encoding.
    """
    if not data:
        raise ValueError("Cannot decode a varint from empty data")
    prefix = int(data[0])
    if prefix < 253:
        return prefix, 1
    format_ = {253: '<H', 254: '<I', 255: '<Q'}.get(prefix)
    if format_ is None:
        raise ValueError(f"Unknown format for size : {prefix}")
    width = struct.calcsize(format_)
    payload = data[1:width + 1]
    if len(payload) != width:
        raise ValueError(f"Truncated varint: expected {width} bytes after the prefix, got {len(payload)}")
    return struct.unpack(format_, payload)[0], width + 1
def get_blocks(start_block: int, end_block: int, btc: Btc) -> Generator[Tuple[bytes, int], Any, None]:
    """
    Lazily download every block in the inclusive range [start_block, end_block].

    For each height the block hash is looked up first, then the block is
    fetched in binary REST format; ``(raw_block, height)`` is yielded.
    """
    for current in range(start_block, end_block + 1):
        block_hash = btc.get_blockhash(current)
        raw_block = btc.get_response(RestApi.BLOCK, block_hash, req_type=ReqType.BIN)
        yield raw_block, current
def _parse_tx(tx: bytes) -> Tuple[int, str, int, int, List[int], int, int, int]:
    """
    Parse the serialized transaction that starts at the beginning of ``tx``.

    ``tx`` may extend past the end of the transaction (the extra bytes are
    ignored) but must contain all of it; a slice that is too short raises
    ValueError, IndexError or struct.error.

    Return ``(tx_size, txid, n_in, n_out, values, vsize, version, locktime)``,
    where ``values`` lists each output's amount in sats.
    """
    # The transaction's version number
    version = decode_uint32(tx[:4])
    offset = 4
    # Segwit serialization puts a 0x00 marker and 0x01 flag after the version
    # - https://bitcoincore.org/en/segwit_wallet_dev/
    # - https://en.bitcoin.it/wiki/Protocol_documentation#BlockTransactions
    is_segwit = tx[offset:offset + 2] == b'\x00\x01'
    if is_segwit:
        offset += 2
    # Inputs: 36 bytes (outpoint), varint-prefixed script, 4 bytes (sequence)
    n_in, varint_size = decode_varint(tx[offset:])
    offset += varint_size
    for _ in range(n_in):
        script_length, varint_size = decode_varint(tx[offset + 36:])
        offset += 36 + varint_size + script_length + 4
    # Outputs: 8-byte value in sats, varint-prefixed script
    n_out, varint_size = decode_varint(tx[offset:])
    offset += varint_size
    values: List[int] = []
    for _ in range(n_out):
        values.append(decode_uint64(tx[offset:offset + 8]))
        script_length, varint_size = decode_varint(tx[offset + 8:])
        offset += 8 + varint_size + script_length
    # Witnesses: for each input, a varint item count and varint-prefixed items
    offset_before_tx_witnesses = offset
    if is_segwit:
        for _ in range(n_in):
            n_items, varint_size = decode_varint(tx[offset:])
            offset += varint_size
            for _ in range(n_items):
                item_length, varint_size = decode_varint(tx[offset:])
                offset += varint_size + item_length
    # The 4-byte locktime ends the transaction
    tx_size = offset + 4
    tx = tx[:tx_size]
    if len(tx) != tx_size:
        raise ValueError("Incomplete transaction")
    # Read the locktime only now that ``tx`` ends exactly at the transaction:
    # the last 4 bytes of the wider slice belong to later transactions.
    locktime = decode_uint32(tx[-4:])
    if is_segwit:
        # The txid hashes only the legacy fields (no marker/flag, no witnesses)
        txid_data = tx[:4] + tx[6:offset_before_tx_witnesses] + tx[-4:]
        # The witness sits between the outputs and the 4-byte locktime
        witness_size = tx_size - offset_before_tx_witnesses - 4
        # Size without the 2-byte segwit marker/flag and the witness
        stripped_size = tx_size - (2 + witness_size)
        weight = stripped_size * 3 + tx_size
        # Vsize is weight / 4 rounded up
        vsize = ceil(weight / 4)
    else:
        txid_data = tx
        vsize = tx_size
    txid = format_hash(double_sha256(txid_data))
    return tx_size, txid, n_in, n_out, values, vsize, version, locktime


def parse_block(block_data: Tuple[bytes, int]) -> List[str]:
    """
    Parse a raw block in binary format.

    ``block_data`` is ``(raw_block, height)`` as yielded by ``get_blocks``.
    Return ``'txid,height,index'`` identifiers of every transaction in the
    block that matches the JoinMarket pattern (see ``is_jm``). If any
    transaction cannot be parsed, log an error and exit with
    ExitStatus.FAILURE.
    """
    # Pool workers started with the "spawn"/"forkserver" methods import this
    # module without running main(), so the global ``log`` is still None there.
    # Fall back to the module logger instead of crashing.
    # NOTE(review): that fallback logger has no handler configured in such a
    # worker, so its INFO messages are not displayed there.
    logger = log if log is not None else getLogger(__name__)
    results: List[str] = []
    processed_txs = 0
    block, height = block_data
    # Skip the 80-byte header
    txs_data = block[80:]
    # The number of transactions contained in this block
    n_txs, block_offset = decode_varint(txs_data)
    for i in range(n_txs):
        parsed = None
        # A transaction's size is only known after parsing it, so parse
        # progressively wider slices: 1 KiB, 2 KiB, ... up to 512 MiB.
        for j in range(20):
            offset_e = block_offset + (1024 << j)
            try:
                parsed = _parse_tx(txs_data[block_offset:offset_e])
                break
            except (ValueError, IndexError, struct.error):
                if offset_e >= len(txs_data):
                    # The slice already covered the rest of the block; wider ones cannot help
                    break
        if parsed is None:
            # Without this transaction's size the following ones cannot be located
            break
        tx_size, txid, n_in, n_out, values, vsize, version, locktime = parsed
        # Check for JoinMarket pattern
        most_common_value, equal_outs = is_jm(n_in, n_out, values)
        if most_common_value > 0:
            results.append(f'{txid},{height},{i}')
            logger.info(f'\n\nFound possible JoinMarket CoinJoin at height {height}\n'
                        f'TXID: {txid}\n'
                        f'Inputs: {n_in}\n'
                        f'Outputs: {n_out}\n'
                        f'Equal value outputs: {equal_outs}\n'
                        f'Equal output amount: {most_common_value}\n'
                        f'Vsize: {vsize}\n'
                        f'Version: {version}\n'
                        f'Locktime: {locktime}\n')
        processed_txs += 1
        # Skip to the next transaction
        block_offset += tx_size
    # Make sure we have parsed all the transactions in the block
    if processed_txs != n_txs:
        logger.error(f'Failed to parse {n_txs - processed_txs} transactions in block at height {height}')
        sys.exit(ExitStatus.FAILURE.value)
    logger.info(f'Processed block {height}.')
    return results
def _candidate_sort_key(candidate: str) -> Tuple[int, int, int, str]:
    """
    Sort key for ``txid,height,index`` candidate lines.

    Order numerically by block height, then by position in the block (a plain
    string sort would put height 99999 after 100000). Lines not in this format
    sort last rather than aborting the run.
    """
    parts = candidate.split(',')
    try:
        return 0, int(parts[1]), int(parts[2]), candidate
    except (IndexError, ValueError):
        return 1, 0, 0, candidate


def _merge_candidates(path: str, candidates: List[str]) -> None:
    """
    Merge ``candidates`` into the candidate file at ``path`` (created if missing).

    Existing and new lines are stripped and de-duplicated, blank lines are
    dropped, and the result is sorted with ``_candidate_sort_key`` and
    written back newline-separated.
    """
    # 'a+' creates the file when missing and still allows reading it
    with open(path, 'a+', encoding='UTF-8') as f:
        # Go to the beginning of the file
        f.seek(0)
        lines = {line.strip() for line in chain(f.readlines(), candidates)}
        lines.discard('')
        merged = sorted(lines, key=_candidate_sort_key)
        # Clear the old content
        f.seek(0)
        f.truncate()
        f.write('\n'.join(merged))


def _ordered_bounded_map(executor: ProcessPoolExecutor, fn: Any, items: Any,
                         max_pending: int) -> Generator[Any, None, None]:
    """
    Like ``executor.map(fn, items)``, but keep at most ``max_pending`` tasks queued.

    ``Executor.map`` collects its whole input iterable immediately, which here
    would download every requested block before any result is consumed.
    Results are yielded in input order.
    """
    from collections import deque

    pending = deque()
    try:
        for item in items:
            pending.append(executor.submit(fn, item))
            if len(pending) >= max_pending:
                yield pending.popleft().result()
        while pending:
            yield pending.popleft().result()
    finally:
        # Drop queued work if a task failed or the consumer stopped early
        for future in pending:
            future.cancel()


def main() -> None:
    """
    Command-line entry point.

    Parse the arguments, validate the block range against the node's chain
    state, scan the blocks for JoinMarket CoinJoin candidates (in a process
    pool unless ``--jobs 0``) and merge the results into the candidate file.
    """
    import os

    global log
    args = get_args()
    log = get_logger(args.verbose)
    if args.jobs is not None and args.jobs < 0:
        log.error(f'Invalid number of jobs ({args.jobs}), pass 0 or a positive value.')
        sys.exit(ExitStatus.ARGERROR.value)
    btc = Btc(args.host, args.port)
    log.debug(f'Started HTTP client to {btc.url}')
    info = btc.get_info()
    tip = info['blocks']
    end_block = args.end if args.end else tip
    if args.start < 0:
        start_block = end_block - abs(args.start) + 1
    else:
        start_block = args.start
    # Reject impossible ranges up front instead of failing later on an HTTP error
    # (or silently scanning nothing when start > end)
    if not 0 <= start_block <= end_block <= tip:
        log.error(f'Invalid block range {start_block}-{end_block}: heights must satisfy '
                  f'0 <= start <= end <= {tip} (latest block height).')
        sys.exit(ExitStatus.ARGERROR.value)
    if info['pruned'] and start_block < info['pruneheight']:
        log.error(f"Can't scan past pruned height. Given start height ({start_block}) is lower than "
                  f"lowest-height complete block stored ({info['pruneheight']}).")
        sys.exit(ExitStatus.ARGERROR.value)
    log.info(f'Scanning from block {start_block} to block {end_block}')
    start_time = monotonic()
    blocks = get_blocks(start_block, end_block, btc)
    if args.jobs == 0:
        # Synchronous, do not use ProcessPoolExecutor
        results = list(chain.from_iterable(map(parse_block, blocks)))
    else:
        # Pool of args.jobs processes; None lets the executor use all processors
        with ProcessPoolExecutor(args.jobs) as executor:
            workers = args.jobs or os.cpu_count() or 1
            parsed = _ordered_bounded_map(executor, parse_block, blocks, 2 * workers)
            results = list(chain.from_iterable(parsed))
    log.info(f'Scan completed in {monotonic() - start_time:.2f}s')
    _merge_candidates(args.candidate_file_name, results)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # ``log`` is only assigned inside main(), so an early Ctrl-C may find it unset
        if log is not None:
            log.info('Shutting down')
        # The scan did not complete, so do not report success to the caller
        sys.exit(ExitStatus.FAILURE.value)