forked from silver6wings/SilverQuant
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_wencai.py
More file actions
326 lines (251 loc) · 10.9 KB
/
run_wencai.py
File metadata and controls
326 lines (251 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
import logging
import math
import time
from typing import Any

import schedule

from credentials import *
from tools.utils_basic import logging_init, is_symbol
from tools.utils_cache import *
from tools.utils_ding import DingMessager
from delegate.xt_delegate import xt_get_ticks
from delegate.xt_subscriber import XtSubscriber, update_position_held
from trader.buyer import BaseBuyer as Buyer
from trader.pools import StocksPoolWhiteIndexes as Pool
from trader.seller_groups import ClassicGroupSeller as Seller
from selector.select_wencai import get_wencai_codes_prices, get_prompt
# Prompt handed to get_wencai_codes_prices() every time the buy scan selects stocks.
select_prompt = get_prompt()

STRATEGY_NAME = '问财选股'
DING_MESSAGER = DingMessager(DING_SECRET, DING_TOKENS)

# True wires up the Xt delegate/callback, False the Gm ones (see the __main__ section).
IS_PROD = False
# When truthy, debug() prints its arguments.
IS_DEBUG = True

PATH_BASE = CACHE_BASE_PATH
# Historical net asset value.
PATH_ASSETS = f'{PATH_BASE}/assets.csv'
# Historical deals.
PATH_DEAL = f'{PATH_BASE}/deal_hist.csv'
# Holding days per position.
PATH_HELD = f'{PATH_BASE}/held_days.json'
# Historical maximum price per position.
PATH_MAXP = f'{PATH_BASE}/max_price.json'
# Log of stock selections and order operations.
PATH_LOGS = f'{PATH_BASE}/logs.txt'

# Lock guarding operations on the on-disk cache files.
lock_of_disk_cache = threading.Lock()
# Selection history used for de-duplication: { date: set of codes }.
cache_selected: Dict[str, Set] = {}
# Historical daily bar data: { code: DataFrame }.
cache_history: Dict[str, pd.DataFrame] = {}
def debug(*args):
    """Print ``args`` to stdout, but only while ``IS_DEBUG`` is truthy."""
    if not IS_DEBUG:
        return
    print(*args)
class PoolConf:
    """Parameters passed to the stock pool (``Pool(parameters=PoolConf)``)."""

    # presumably the indexes whose constituents form the whitelist — confirm in trader.pools
    white_indexes = [
        IndexSymbol.INDEX_ZZ_ALL,
    ]
    # presumably prompts/keywords used to build the blacklist — confirm in trader.pools
    black_prompts = [
        'ST',
        '退市',
    ]
class BuyConf:
    """Buy-side parameters: given to the buyer and read by the buy scan."""

    # 'HH:MM' windows (both ends inclusive) in which the buy scan may run.
    time_ranges = [
        ['14:47', '14:57'],
    ]

    # Keep the wencai polling interval as long as practical, otherwise the IP gets banned.
    # Buy scan interval, compared against the seconds of the current tick;
    # use a divisor of 60: 1-6, 10, 12, 15, 20, 30.
    interval = 30

    # Premium (in yuan) that makes sure a market order gets filled.
    order_premium = 0.02

    # Upper bound on the number of stocks held.
    slot_count = 10

    # Upper bound on the cash put into each slot.
    slot_capacity = 10000

    # Maximum number of stocks bought per selection round (a stock that is
    # not bought in its round will not be bought again that day).
    once_buy_limit = 10
class SellConf:
    """Sell-side parameters: given to the seller and read by the sell scan."""

    # 'HH:MM' windows (both ends inclusive) in which the sell scan may run.
    time_ranges = [
        ['09:31', '11:30'],
        ['13:00', '14:57'],
    ]

    # Sell scan interval, compared against the seconds of the current tick;
    # use a divisor of 60: 1-6, 10, 12, 15, 20, 30.
    interval = 1

    # Premium (in yuan) that makes sure a market order gets filled.
    order_premium = 0.02

    hard_time_range = ['09:31', '14:57']
    # Hard take-profit ratio.
    earn_limit = 9.999
    # Hard stop-loss ratio.
    risk_limit = 1 - 0.03
    # Amount by which the hard stop-loss ratio moves up each day.
    risk_tight = 0.002

    # Sell when profit retraces from its highest point.
    fall_time_range = ['09:31', '14:57']
    fall_from_top = [
        (1.08, 9.99, 0.02),
        (1.02, 1.08, 0.05),
    ]

    # When the gain exceeds cost price x A and stays below cost price x B,
    # sell once the price has given back C times the gain.
    # Tuple layout: (A, B, C)
    return_time_range = ['09:31', '14:57']
    return_of_profit = [
        (1.20, 9.99, 0.100),
        (1.08, 1.20, 0.200),
        (1.05, 1.08, 0.300),
        (1.03, 1.05, 0.500),
        (1.02, 1.03, 0.800),
    ]
# ======== Pre-market ========
def held_increase() -> None:
    """Scheduled pre-market job that bumps the held-day counters.

    Does nothing unless today is an open day. Otherwise it refreshes the
    held-position file from the delegate and then calls ``all_held_inc``;
    a truthy result is reported through the log and stdout.
    """
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    if not check_is_open_day(today):
        return

    update_position_held(lock_of_disk_cache, my_delegate, PATH_HELD)

    incremented = all_held_inc(lock_of_disk_cache, PATH_HELD)
    if incremented:
        logging.warning('===== 所有持仓计数 +1 =====')
        print('All held stock day +1!')
def refresh_code_list():
    """Scheduled pre-market job: refresh the pool and the subscriber's code list.

    Does nothing unless today is an open day. The subscriber receives the
    codes of the current positions that pass ``is_symbol``.
    """
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    if not check_is_open_day(today):
        return

    my_pool.refresh()

    hold_list = []
    for position in my_delegate.check_positions():
        if is_symbol(position.stock_code):
            hold_list.append(position.stock_code)
    my_suber.update_code_list(hold_list)
# ======== Buy signals ========
def pull_stock_codes() -> List[str]:
    """Query wencai with ``select_prompt`` and filter the result through the pool.

    Returns:
        The returned codes that are in the pool's cached whitelist and not in
        its cached blacklist, in the order the selector produced them.
    """
    candidates = get_wencai_codes_prices(select_prompt)
    return [
        code
        for code in candidates
        if code in my_pool.cache_whitelist and code not in my_pool.cache_blacklist
    ]
def check_stock_codes(selected_codes: List[str], quotes: Dict) -> List[Dict[str, Any]]:
    """Turn the selected codes into buy candidates using their latest ticks.

    Args:
        selected_codes: Codes to look up.
        quotes: Mapping of code to a tick dict. Each tick is read for
            ``lastPrice``, ``askPrice`` (an iterable of ask levels) and
            ``lastClose``.

    Returns:
        One dict per code that has a tick, in input order, with the keys
        ``code``, ``price`` and ``lastClose``. ``price`` is the highest value
        among the ask levels and the last price, rounded to 2 decimals.
        Codes without a tick are skipped and reported through ``debug``.
    """
    selections = []
    for code in selected_codes:
        if code not in quotes:
            debug(code, '本次quotes没数据')
            continue

        quote = quotes[code]
        curr_price = quote['lastPrice']
        # Unpack instead of concatenating so the ask levels may be any
        # iterable (list, tuple, ...), not only a list.
        order_price = max([*quote['askPrice'], curr_price])

        selections.append({
            'code': code,
            'price': round(order_price, 2),
            'lastClose': round(quote['lastClose'], 2),
        })
    return selections
def scan_buy(quotes: Dict, curr_date: str, positions: List) -> None:
    """Select stocks, place buy orders for the free slots and record the picks.

    Args:
        quotes: Ticks supplied by the caller. Not read here: fresh ticks are
            fetched for the selected codes through ``xt_get_ticks``.
        curr_date: Key under which today's selections are kept in
            ``cache_selected``.
        positions: Current positions; each is read for ``stock_code``.

    A candidate is ordered only if at least one lot fits into a slot, it is
    not currently held and it has not been recorded for ``curr_date`` before.
    Every candidate of this scan is recorded afterwards, whether it was
    ordered or not, so it will not be ordered later the same day.
    """
    selected_codes = pull_stock_codes()

    selections = []
    # Only fetch ticks when at least one stock was selected.
    if selected_codes:
        once_quotes = xt_get_ticks(selected_codes)
        selections = check_stock_codes(selected_codes, once_quotes)

    if selections:
        position_codes = {position.stock_code for position in positions}
        position_count = get_holding_position_count(positions)
        available_cash = my_delegate.check_asset().cash
        available_slot = available_cash // BuyConf.slot_capacity

        buy_count = int(min(
            max(0, BuyConf.slot_count - position_count),  # slots still free
            available_slot,                               # slots the cash can pay for
            len(selections),                              # candidates available
            BuyConf.once_buy_limit,                       # cap on orders per scan
        ))

        # Selections recorded earlier today; this scan's picks are added
        # only after the order loop below.
        already_selected = cache_selected.get(curr_date, ())

        for selection in selections:  # buy in selection order
            if buy_count <= 0:
                break

            code = selection['code']
            price = selection['price']
            last_close = selection['lastClose']

            # A tick with no usable price would otherwise raise
            # ZeroDivisionError in the lot-size calculation below.
            if price <= 0:
                debug(f'{code} 价格无效')
                continue

            # Round the affordable volume down to whole lots of 100 shares.
            buy_volume = math.floor(BuyConf.slot_capacity / price / 100) * 100

            if buy_volume <= 0:
                debug(f'{code} 不够一手')
            elif code in position_codes:
                debug(f'{code} 正在持仓')
            elif code in already_selected:
                debug(f'{code} 今日已选')
            else:
                buy_count -= 1
                # Not selected earlier today and not currently held, so an
                # existing position is never added to.
                my_buyer.order_buy(code=code, price=price, last_close=last_close,
                                   volume=buy_volume, remark='买入委托')

    # Record the selection history.
    seen_today = cache_selected.setdefault(curr_date, set())
    for selection in selections:
        code = selection['code']
        if code not in seen_today:
            seen_today.add(code)
            logging.warning(f"记录选股 {code}\t现价: {selection['price']:.2f}")
# ======== Sell signals ========
def scan_sell(quotes: Dict, curr_date: str, curr_time: str, positions: List) -> None:
    """Refresh the max-price / held-day caches, then let the seller act on them."""
    max_prices, held_days = update_max_prices(
        lock_of_disk_cache,
        quotes,
        positions,
        PATH_MAXP,
        PATH_HELD,
    )
    my_seller.execute_sell(
        quotes,
        curr_date,
        curr_time,
        positions,
        held_days,
        max_prices,
        my_suber.cache_history,
    )
# ======== Framework ========
def execute_strategy(curr_date: str, curr_time: str, curr_seconds: str, curr_quotes: Dict) -> bool:
    """Strategy callback: run the sell and buy scans that are due at this tick.

    Args:
        curr_date: Current date string, forwarded to the scans.
        curr_time: Current 'HH:MM' string, compared against the time windows.
        curr_seconds: Seconds of the current minute, as a string.
        curr_quotes: Latest quotes, forwarded to the scans.

    Returns:
        True if a buy scan ran, False otherwise (a sell scan alone does not
        change the result).
    """
    positions = my_delegate.check_positions()

    for begin, end in SellConf.time_ranges:
        if not (begin <= curr_time <= end):
            continue
        if int(curr_seconds) % SellConf.interval == 0:
            scan_sell(curr_quotes, curr_date, curr_time, positions)

    for begin, end in BuyConf.time_ranges:
        if not (begin <= curr_time <= end):
            continue
        if int(curr_seconds) % BuyConf.interval == 0:
            scan_buy(curr_quotes, curr_date, positions)
            return True

    return False
if __name__ == '__main__':
    # Script entry point: wire up delegate, pool, buyer, seller and subscriber,
    # register the daily jobs, then serve the scheduler until interrupted.
    logging_init(path=PATH_LOGS, level=logging.INFO)
    if not IS_PROD:
        STRATEGY_NAME = STRATEGY_NAME + "(模拟)"
    print(f'正在启动 {STRATEGY_NAME}...')

    # Both callback flavours take exactly the same keyword arguments.
    callback_kwargs = dict(
        account_id=QMT_ACCOUNT_ID,
        strategy_name=STRATEGY_NAME,
        ding_messager=DING_MESSAGER,
        lock_of_disk_cache=lock_of_disk_cache,
        path_deal=PATH_DEAL,
        path_held=PATH_HELD,
        path_maxp=PATH_MAXP,
    )

    # get_holding_position_count is imported here on purpose: scan_buy picks
    # it up as a module global, from whichever delegate module is active.
    if IS_PROD:
        from delegate.xt_callback import XtCustomCallback
        from delegate.xt_delegate import XtDelegate, get_holding_position_count

        my_callback = XtCustomCallback(**callback_kwargs)
        my_delegate = XtDelegate(
            account_id=QMT_ACCOUNT_ID,
            client_path=QMT_CLIENT_PATH,
            callback=my_callback,
        )
    else:
        from delegate.gm_callback import GmCallback
        from delegate.gm_delegate import GmDelegate, get_holding_position_count

        my_callback = GmCallback(**callback_kwargs)
        my_delegate = GmDelegate(
            account_id=QMT_ACCOUNT_ID,
            callback=my_callback,
            ding_messager=DING_MESSAGER,
        )

    my_pool = Pool(
        account_id=QMT_ACCOUNT_ID,
        strategy_name=STRATEGY_NAME,
        parameters=PoolConf,
        ding_messager=DING_MESSAGER,
    )
    my_buyer = Buyer(
        account_id=QMT_ACCOUNT_ID,
        strategy_name=STRATEGY_NAME,
        delegate=my_delegate,
        parameters=BuyConf,
    )
    my_seller = Seller(
        strategy_name=STRATEGY_NAME,
        delegate=my_delegate,
        parameters=SellConf,
    )
    my_suber = XtSubscriber(
        account_id=QMT_ACCOUNT_ID,
        strategy_name=STRATEGY_NAME,
        delegate=my_delegate,
        path_deal=PATH_DEAL,
        path_assets=PATH_ASSETS,
        execute_strategy=execute_strategy,
        ding_messager=DING_MESSAGER,
        open_today_deal_report=True,
        open_today_hold_report=True,
    )
    my_suber.start_scheduler()

    temp_now = datetime.datetime.now()
    temp_date = temp_now.strftime('%Y-%m-%d')
    temp_time = temp_now.strftime('%H:%M')

    # Register the daily pre-market jobs.
    schedule.every().day.at('08:05').do(held_increase)
    schedule.every().day.at('08:10').do(refresh_code_list)

    # Catch up on jobs whose scheduled time already passed when the script
    # is (re)started during the day.
    # NOTE(review): the scraped source lost its indentation; placing the two
    # inner checks under the open-day guard is a reconstruction — confirm
    # against the repository.
    if '08:05' < temp_time < '15:30' and check_is_open_day(temp_date):
        held_increase()

        if '08:10' < temp_time < '14:57':
            refresh_code_list()

        if '09:25' < temp_time < '11:30' or '13:00' <= temp_time < '14:57':
            # Restarted during trading hours: subscribe to ticks right away.
            my_suber.subscribe_tick()

    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    finally:
        # Runs on any exit from the loop, including Ctrl+C.
        schedule.clear()
        my_delegate.shutdown()