-
Notifications
You must be signed in to change notification settings - Fork 85
Expand file tree
/
Copy pathrun_remote.py
More file actions
304 lines (237 loc) · 10.3 KB
/
run_remote.py
File metadata and controls
304 lines (237 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import logging
from credentials import *
from tools.utils_basic import logging_init, is_symbol, debug
from tools.utils_cache import *
from tools.utils_ding import DingMessager
from tools.utils_remote import DataSource, ExitRight
from delegate.base_delegate import update_position_held
from delegate.xt_subscriber import XtSubscriber, xt_get_ticks
from trader.pools import StocksPoolBlackEmpty as Pool
from trader.buyer import BaseBuyer as Buyer
from trader.seller_groups import LTT2GroupSeller as Seller
from tools.utils_remote import pull_stock_codes
data_source = DataSource.AKSHARE
STRATEGY_NAME = '远程选股'
SELECTION_ID = 'REMOTE'
DING_MESSAGER = DingMessager(DING_SECRET, DING_TOKENS)
IS_PROD = False # 生产环境标志:False 表示使用掘金模拟盘 True 表示使用QMT账户下单交易
IS_DEBUG = True # 日志输出标记:控制台是否打印debug方法的输出
PATH_BASE = CACHE_PROD_PATH if IS_PROD else CACHE_TEST_PATH
PATH_ASSETS = PATH_BASE + '/assets.csv' # 记录历史净值
PATH_DEAL = PATH_BASE + '/deal_hist.csv' # 记录历史成交
PATH_HELD = PATH_BASE + '/positions.json' # 记录持仓信息
PATH_MAXP = PATH_BASE + '/max_price.json' # 记录建仓后历史最高
PATH_MINP = PATH_BASE + '/min_price.json' # 记录建仓后历史最低
PATH_LOGS = PATH_BASE + '/logs.txt' # 记录策略的历史日志
PATH_INFO = PATH_BASE + '/tmp_{}.pkl' # 用来缓存当天的指标信息
disk_lock = threading.Lock() # 操作磁盘文件缓存的锁
cache_selected: dict[str, set] = {} # 记录选股历史,去重
class PoolConf:
day_count = 100 # 100个自然日里有 > 60个交易日
price_adjust = ExitRight.QFQ # 历史价格复权
columns = ['datetime', 'open', 'high', 'low', 'close', 'volume', 'amount']
class BuyConf:
time_ranges = [['14:45', '14:57']]
interval = 10
order_premium = 0.09 # 保证成功买入成交的溢价
slot_count = 20 # 持股数量上限
slot_capacity = 15000 # 每个仓的资金上限
daily_buy_max = 10 # 单日买入股票上限
once_buy_limit = 20 # 单次选股最多买入股票数量
min_price = 2.00 # 限制最低可买入股票的现价
inc_limit = 1.20 # 相对于昨日收盘的涨幅限制
class SellConf:
time_ranges = [['09:30', '11:30'], ['13:00', '15:00']]
interval = 2
order_premium = 0.09 # 保证成功卖出成交的溢价
switch_time_range = ['14:30', '14:57']
switch_hold_days = 5 # 持仓天数
switch_demand_daily_up = 0.003 # 换仓上限乘数
hard_time_range = ['09:31', '14:57']
earn_limit = 9.999 # 硬性止盈率
risk_limit = 1 - 0.07 # 硬性止损率
risk_tight = 0.002 # 硬性止损率每日上移
return_time_range = ['09:31', '14:57']
return_of_profit = [
(1.16, 9.99, 0.16),
(1.07, 1.16, 0.33),
(1.04, 1.07, 0.55),
(1.02, 1.04, 0.66),
]
cci_time_range = ['09:31', '14:57']
cci_upper = 370.0 # CCI 高卖点阈值
cci_lower = -70.0 # CCI 低卖点阈值
opening_time_range = ['14:40', '14:57']
open_low_rate = 0.99 # 低于开仓日最低价比例
open_vol_rate = 0.30 # 低于开仓日成交量比例
ma_time_range = ['09:31', '14:57']
ma_above = 30
# ======== 盘前 ========
def before_trade_day() -> None:
# held_increase() -> None:
update_position_held(disk_lock, my_delegate, PATH_HELD)
if all_held_inc(disk_lock, PATH_HELD):
logging.warning('===== 所有持仓计数 +1 =====')
print(f'[持仓计数] All stocks held day +1')
# refresh_code_list() -> None:
my_pool.refresh()
positions = my_delegate.check_positions()
hold_list = [position.stock_code for position in positions if is_symbol(position.stock_code)]
my_suber.update_code_list(my_pool.get_code_list() + hold_list)
# prepare_history() -> None:
now = datetime.datetime.now()
for i in range(15, 30):
delete_file(PATH_INFO.format((now - datetime.timedelta(days=i)).strftime('%Y_%m_%d')))
cache_path = PATH_INFO.format(now.strftime('%Y_%m_%d'))
start = get_prev_trading_date(now, PoolConf.day_count)
end = get_prev_trading_date(now, 1)
# 只有持仓列表
positions = my_delegate.check_positions()
holding_list = [position.stock_code for position in positions if is_symbol(position.stock_code)]
my_suber.download_cache_history(
cache_path=cache_path,
code_list=holding_list,
start=start,
end=end,
adjust=PoolConf.price_adjust,
columns=PoolConf.columns,
data_source=data_source
)
def near_trade_begin():
now = datetime.datetime.now()
start = get_prev_trading_date(now, PoolConf.day_count)
end = get_prev_trading_date(now, 1)
positions = my_delegate.check_positions()
history_list = my_pool.get_code_list()
history_list += [position.stock_code for position in positions if is_symbol(position.stock_code)]
my_suber.refresh_memory_history(code_list=history_list, start=start, end=end, data_source=data_source)
# ======== 买点 ========
def check_stock_codes(selected_codes: list[str], quotes: Dict) -> dict[str, dict]:
selections = {}
for code in selected_codes:
if code not in quotes:
debug(code, f'本次quotes没数据')
continue
# if code not in my_pool.cache_whitelist:
# debug(code, f'不在白名单')
# continue
# if code in my_pool.cache_blacklist:
# debug(code, f'在黑名单')
# continue
quote = quotes[code]
curr_price = quote['lastPrice']
if not curr_price > BuyConf.min_price:
debug(code, f'价格小于{BuyConf.min_price}')
continue
# if not curr_open < curr_price < prev_close * BuyConf.inc_limit:
# debug(code, f'涨幅不符合区间 {curr_open} < {curr_price} < {prev_close * BuyConf.inc_limit}')
# continue
selections[code] = {
'price': max(quote['askPrice'] + [quote['lastPrice']]),
'lastClose': quote['lastClose'],
}
return selections
def scan_buy(quotes: Dict, curr_date: str, positions: List) -> None:
selected_codes, err_msg = pull_stock_codes(SELECTION_ID, RECOMMEND_HOST, AUTHENTICATION)
if selected_codes is None:
print(f'[{err_msg}]', end='')
return
else:
print(f'[{len(selected_codes)}]')
selections = {}
# 选出一个以上的股票
if selected_codes is not None and len(selected_codes) > 0:
once_quotes = xt_get_ticks(selected_codes)
selections = check_stock_codes(selected_codes, once_quotes)
global cache_selected
cache_selected = my_buyer.buy_selections(selections, cache_selected, curr_date, positions)
# ======== 卖点 ========
def scan_sell(quotes: Dict, curr_date: str, curr_time: str, positions: List) -> None:
max_prices, held_info = update_max_prices(disk_lock, quotes, positions, PATH_MAXP, PATH_MINP, PATH_HELD)
my_seller.execute_sell(quotes, curr_date, curr_time, positions, held_info, max_prices, my_suber.cache_history)
# ======== 框架 ========
def execute_strategy(curr_date: str, curr_time: str, curr_seconds: str, curr_quotes: Dict) -> bool:
positions = my_delegate.check_positions()
for time_range in SellConf.time_ranges:
if time_range[0] <= curr_time <= time_range[1]:
if int(curr_seconds) % SellConf.interval == 0:
scan_sell(curr_quotes, curr_date, curr_time, positions)
for time_range in BuyConf.time_ranges:
if time_range[0] <= curr_time <= time_range[1]:
if int(curr_seconds) % BuyConf.interval == 0:
scan_buy(curr_quotes, curr_date, positions)
return True
return True
if __name__ == '__main__':
logging_init(path=PATH_LOGS, level=logging.INFO)
STRATEGY_NAME = STRATEGY_NAME if IS_PROD else STRATEGY_NAME + '[测]'
print(f'[正在启动] {STRATEGY_NAME}')
if IS_PROD:
from delegate.xt_callback import XtCustomCallback
from delegate.xt_delegate import XtDelegate
my_callback = XtCustomCallback(
account_id=QMT_ACCOUNT_ID,
strategy_name=STRATEGY_NAME,
ding_messager=DING_MESSAGER,
disk_lock=disk_lock,
path_deal=PATH_DEAL,
path_held=PATH_HELD,
path_max_prices=PATH_MAXP,
path_min_prices=PATH_MINP,
)
my_delegate = XtDelegate(
account_id=QMT_ACCOUNT_ID,
client_path=QMT_CLIENT_PATH,
callback=my_callback,
ding_messager=DING_MESSAGER,
)
else:
from delegate.gm_callback import GmCallback
from delegate.gm_delegate import GmDelegate
my_callback = GmCallback(
account_id=QMT_ACCOUNT_ID,
strategy_name=STRATEGY_NAME,
ding_messager=DING_MESSAGER,
disk_lock=disk_lock,
path_deal=PATH_DEAL,
path_held=PATH_HELD,
path_max_prices=PATH_MAXP,
path_min_prices=PATH_MINP,
)
my_delegate = GmDelegate(
account_id=QMT_ACCOUNT_ID,
callback=my_callback,
ding_messager=DING_MESSAGER,
)
my_pool = Pool(
account_id=QMT_ACCOUNT_ID,
strategy_name=STRATEGY_NAME,
parameters=PoolConf,
ding_messager=DING_MESSAGER,
)
my_buyer = Buyer(
strategy_name=STRATEGY_NAME,
delegate=my_delegate,
parameters=BuyConf,
)
my_seller = Seller(
strategy_name=STRATEGY_NAME,
delegate=my_delegate,
parameters=SellConf,
)
my_suber = XtSubscriber(
account_id=QMT_ACCOUNT_ID,
strategy_name=STRATEGY_NAME,
delegate=my_delegate,
path_deal=PATH_DEAL,
path_assets=PATH_ASSETS,
execute_strategy=execute_strategy,
before_trade_day=before_trade_day,
near_trade_begin=near_trade_begin,
use_ap_scheduler=True,
ding_messager=DING_MESSAGER,
open_tick_memory_cache=True,
open_today_deal_report=True,
open_today_hold_report=True,
)
my_suber.start_scheduler()