-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathcreate_limit_order.py
More file actions
90 lines (70 loc) · 2.29 KB
/
create_limit_order.py
File metadata and controls
90 lines (70 loc) · 2.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import asyncio
import json
import os
import time
import uuid
import websockets
from dotenv import load_dotenv
from solders.keypair import Keypair
from common.constants import WS_URL
from common.utils import sign_message
# Pull settings from a local .env file (if any) into the process environment,
# then read the base58 signing key. The value is None when the variable is unset.
load_dotenv()
PRIVATE_KEY = os.environ.get("PRIVATE_KEY")
async def exec_main():
    """Sign a sample BTC limit order and submit it over the WebSocket API.

    Builds a ``create_order`` request (side ``bid``, amount ``0.1``, price
    ``100000``, time-in-force ``GTC``), signs it with the keypair derived
    from ``PRIVATE_KEY``, sends it to ``WS_URL``, waits for one reply, and
    prints the reply followed by debugging details.

    Raises:
        ValueError: If ``PRIVATE_KEY`` is unset or empty.
    """
    # Fail fast with an actionable message rather than handing ``None`` (or
    # an empty string) to the key parser below.
    if not PRIVATE_KEY:
        raise ValueError(
            "PRIVATE_KEY is not set; define it in the environment or a .env file"
        )

    # Generate account based on private key
    keypair = Keypair.from_base58_string(PRIVATE_KEY)
    public_key = str(keypair.pubkey())

    # Scaffold the signature header; the timestamp is milliseconds since epoch.
    # NOTE(review): expiry_window is presumably also in milliseconds, and the
    # timestamp is taken before the socket is opened -- confirm a slow connect
    # cannot exceed the window.
    timestamp = int(time.time() * 1_000)
    signature_header = {
        "timestamp": timestamp,
        "expiry_window": 5_000,
        "type": "create_order",
    }

    # Construct the signature payload
    signature_payload = {
        "symbol": "BTC",
        "price": str(100_000),
        "reduce_only": False,
        "amount": "0.1",
        "side": "bid",
        "tif": "GTC",
        "client_order_id": str(uuid.uuid4()),
    }

    # Use the helper function to sign the message; it returns the signed
    # message together with its signature.
    message, signature = sign_message(signature_header, signature_payload, keypair)

    # Construct the request reusing the payload and constructing common request fields
    request_header = {
        "account": public_key,
        "signature": signature,
        "timestamp": signature_header["timestamp"],
        "expiry_window": signature_header["expiry_window"],
    }

    # Combine headers and payload for the final message
    message_to_send = {
        **request_header,
        **signature_payload,
    }

    # Connect to WebSocket
    async with websockets.connect(WS_URL, ping_interval=30) as websocket:
        # Prepare the WebSocket message according to the backend format
        ws_message = {
            "id": str(uuid.uuid4()),
            "params": {"create_order": message_to_send},
        }

        # Send the message
        await websocket.send(json.dumps(ws_message))

        # Wait for response
        response = await websocket.recv()
        print(f"Response: {response}")

        # Print details for debugging
        print("\nDebug Info:")
        print(f"Address: {public_key}")
        print(f"Message: {message}")
        print(f"Signature: {signature}")
        print(f"WebSocket Message: {ws_message}")
async def main() -> None:
    """Entry-point coroutine: run the order-creation example once."""
    await exec_main()
if __name__ == "__main__":
    # Only run the example when executed as a script, not when imported.
    asyncio.run(main())