|
| 1 | +import json |
1 | 2 | from pathlib import Path |
2 | | -import tempfile |
3 | 3 | from typing import Any, Dict |
4 | 4 |
|
5 | 5 | import eth_keyfile |
6 | 6 | from eth_typing import Address |
7 | | -from eth_utils import to_bytes |
| 7 | +from eth_utils import add_0x_prefix, to_bytes |
8 | 8 |
|
| 9 | +from ethpm_cli._utils.filesystem import atomic_replace |
| 10 | +from ethpm_cli._utils.input import parse_bool_flag |
| 11 | +from ethpm_cli._utils.logger import cli_logger |
9 | 12 | from ethpm_cli._utils.xdg import get_xdg_ethpmcli_root |
10 | 13 | from ethpm_cli.constants import KEYFILE_PATH |
11 | 14 | from ethpm_cli.exceptions import AuthorizationError |
12 | 15 |
|
| 16 | +PRIVATE_KEY_WARNING = ( |
| 17 | + "Please be careful when using your private key, it is a sensitive piece of information.\n", |
| 18 | + "~ ~ ~ Not your keys, not your crypto. ~ ~ ~\n", |
| 19 | + "ethPM doesn't save your private key, but it is best practice to re-use an already encrypted ", |
| 20 | + "keyfile and link it with `ethpm auth link` rather than regenerating a new encrypted keyfile.", |
| 21 | +) |
| 22 | + |
| 23 | + |
| 24 | +def link_keyfile(keyfile_path: Path) -> None: |
| 25 | + if valid_keyfile_exists(): |
| 26 | + xdg_keyfile = get_keyfile_path() |
| 27 | + cli_logger.info( |
| 28 | + f"Keyfile detected at {xdg_keyfile}. Please use `ethpm auth unlink` to delete this " |
| 29 | + "keyfile before linking a new one." |
| 30 | + ) |
| 31 | + else: |
| 32 | + import_keyfile(keyfile_path) |
| 33 | + address = get_authorized_address() |
| 34 | + cli_logger.info( |
| 35 | + f"Keyfile stored for address: {address}\n" |
| 36 | + "It's now available for use when its password is passed in with the " |
| 37 | + "`--keyfile-password` flag." |
| 38 | + ) |
| 39 | + |
| 40 | + |
| 41 | +def unlink_keyfile() -> None: |
| 42 | + if not valid_keyfile_exists(): |
| 43 | + cli_logger.info("Unable to unlink keyfile: empty keyfile found.") |
| 44 | + else: |
| 45 | + keyfile_path = get_keyfile_path() |
| 46 | + address = get_authorized_address() |
| 47 | + keyfile_path.write_text("") |
| 48 | + cli_logger.info(f"Keyfile removed for address: {address}") |
| 49 | + |
| 50 | + |
| 51 | +def init_keyfile() -> None: |
| 52 | + if valid_keyfile_exists(): |
| 53 | + cli_logger.info( |
| 54 | + f"Keyfile detected. Please use `ethpm auth unlink` to delete this " |
| 55 | + "keyfile before initializing a new one." |
| 56 | + ) |
| 57 | + return |
| 58 | + |
| 59 | + cli_logger.info(PRIVATE_KEY_WARNING) |
| 60 | + agreement = parse_bool_flag( |
| 61 | + "Are you sure you want to proceed with initializing a keyfile? " |
| 62 | + ) |
| 63 | + if not agreement: |
| 64 | + cli_logger.info("Aborting keyfile initialization.") |
| 65 | + return |
| 66 | + |
| 67 | + private_key = to_bytes(text=input("Please enter your 32-length private key: ")) |
| 68 | + validate_private_key_length(private_key) |
| 69 | + password = to_bytes( |
| 70 | + text=input( |
| 71 | + "Please enter a password to encrypt your keyfile with (Don't forget this password!): " |
| 72 | + ) |
| 73 | + ) |
| 74 | + keyfile_json = eth_keyfile.create_keyfile_json(private_key, password) |
| 75 | + ethpm_xdg_root = get_xdg_ethpmcli_root() |
| 76 | + ethpm_cli_keyfile_path = ethpm_xdg_root / KEYFILE_PATH |
| 77 | + with atomic_replace(ethpm_cli_keyfile_path) as file: |
| 78 | + file.write(json.dumps(keyfile_json)) |
| 79 | + address = get_authorized_address() |
| 80 | + cli_logger.info(f"Encrypted keyfile saved for address: {address}") |
| 81 | + |
| 82 | + |
| 83 | +def valid_keyfile_exists() -> bool: |
| 84 | + try: |
| 85 | + get_authorized_address() |
| 86 | + except AuthorizationError: |
| 87 | + return False |
| 88 | + return True |
| 89 | + |
| 90 | + |
| 91 | +def validate_private_key_length(private_key: str) -> None: |
| 92 | + if len(private_key) != 32: |
| 93 | + raise AuthorizationError(f"{private_key} is not 32 long") |
| 94 | + |
13 | 95 |
|
14 | 96 | def import_keyfile(keyfile_path: Path) -> None: |
15 | 97 | validate_keyfile(keyfile_path) |
16 | 98 | ethpm_xdg_root = get_xdg_ethpmcli_root() |
17 | 99 | ethpm_cli_keyfile_path = ethpm_xdg_root / KEYFILE_PATH |
18 | | - tmp_keyfile = Path(tempfile.NamedTemporaryFile().name) |
19 | | - tmp_keyfile.write_text(keyfile_path.read_text()) |
20 | | - tmp_keyfile.replace(ethpm_cli_keyfile_path) |
| 100 | + with atomic_replace(ethpm_cli_keyfile_path) as file: |
| 101 | + file.write(keyfile_path.read_text()) |
21 | 102 |
|
22 | 103 |
|
23 | 104 | def get_keyfile_path() -> Path: |
@@ -49,7 +130,7 @@ def get_authorized_address() -> Address: |
49 | 130 | Returns the address associated with stored keyfile. No password required. |
50 | 131 | """ |
51 | 132 | keyfile = get_keyfile_data() |
52 | | - return keyfile["address"] |
| 133 | + return add_0x_prefix(keyfile["address"]) |
53 | 134 |
|
54 | 135 |
|
55 | 136 | def get_authorized_private_key(password: str) -> str: |
|
0 commit comments