forked from QuantConnect/Lean.DataSource.DataBento
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDataBentoHistoryProivder.cs
More file actions
262 lines (230 loc) · 10.5 KB
/
DataBentoHistoryProivder.cs
File metadata and controls
262 lines (230 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2026 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using NodaTime;
using QuantConnect.Util;
using QuantConnect.Data;
using QuantConnect.Logging;
using QuantConnect.Securities;
using QuantConnect.Data.Market;
using QuantConnect.Lean.Engine.HistoricalData;
namespace QuantConnect.Lean.DataSource.DataBento;
/// <summary>
/// Implements a history provider for DataBento historical data.
/// Uses consolidators to produce the requested resolution when necessary.
/// </summary>
public partial class DataBentoDataProvider : MappedSynchronizingHistoryProvider
{
/// <summary>
/// Running total of data points emitted through history requests.
/// Declared static, so the value is shared by every instance of this provider.
/// </summary>
private static int _dataPointCount;

/// <summary>
/// Indicates whether the error for an invalid history request date range has already been logged.
/// </summary>
private volatile bool _invalidStartTimeErrorFired;

/// <summary>
/// Indicates whether the warning for invalid <see cref="SecurityType"/> has been fired.
/// </summary>
private volatile bool _invalidSecurityTypeWarningFired;

/// <summary>
/// Indicates whether a DataBento dataset error has already been logged.
/// </summary>
/// <remarks>
/// Declared volatile to match the other "log once" flags of this class.
/// </remarks>
private volatile bool _dataBentoDatasetErrorFired;

/// <summary>
/// Gets the total number of data points emitted by this history provider
/// </summary>
/// <remarks>
/// Backed by a static counter, so the value reflects all instances of this provider.
/// </remarks>
public override int DataPointCount => _dataPointCount;
/// <summary>
/// Initializes this history provider to work for the specified job
/// </summary>
/// <remarks>
/// This override has an empty body: the supplied parameters are not read or stored here.
/// </remarks>
/// <param name="parameters">The initialization parameters</param>
public override void Initialize(HistoryProviderInitializeParameters parameters)
{
    // No work is performed here.
}
/// <summary>
/// Gets the history for the requested security
/// </summary>
/// <remarks>
/// The request is rejected, and <c>null</c> is returned, when:
/// <list type="bullet">
/// <item><description>the symbol cannot be subscribed to;</description></item>
/// <item><description>the start time is not strictly earlier than the end time (UTC);</description></item>
/// <item><description>no DataBento dataset is mapped to the symbol's market;</description></item>
/// <item><description>the tick type is not one of Trade, Quote or OpenInterest.</description></item>
/// </list>
/// Each of the first three rejection reasons is logged only once per provider instance.
/// </remarks>
/// <param name="historyRequest">The historical data request</param>
/// <returns>An enumerable of BaseData points, or <c>null</c> when the request cannot be served</returns>
public override IEnumerable<BaseData>? GetHistory(HistoryRequest historyRequest)
{
    var symbol = historyRequest.Symbol;

    if (!CanSubscribe(symbol))
    {
        if (!_invalidSecurityTypeWarningFired)
        {
            _invalidSecurityTypeWarningFired = true;
            Log.Trace($"{nameof(DataBentoDataProvider)}.{nameof(GetHistory)}: " +
                $"History request not supported for symbol '{symbol}' (SecurityType: {symbol.SecurityType}, Canonical: {symbol.IsCanonical()}).");
        }
        return null;
    }

    // An empty range (start == end) is as invalid as an inverted one: the log message below
    // requires the start to be strictly earlier than the end, so reject both cases up front.
    if (historyRequest.StartTimeUtc >= historyRequest.EndTimeUtc)
    {
        if (!_invalidStartTimeErrorFired)
        {
            _invalidStartTimeErrorFired = true;
            Log.Error($"{nameof(DataBentoDataProvider)}.{nameof(GetHistory)}: Invalid date range: the start date must be earlier than the end date.");
        }
        return null;
    }

    if (!_symbolMapper.DataBentoDataSetByLeanMarket.TryGetValue(symbol.ID.Market, out var dataSetSpecifications) || dataSetSpecifications == null)
    {
        if (!_dataBentoDatasetErrorFired)
        {
            _dataBentoDatasetErrorFired = true;
            Log.Error($"{nameof(DataBentoDataProvider)}.{nameof(GetHistory)}: " +
                $"DataBento dataset not found for symbol '{symbol.Value}', Market = {symbol.ID.Market}.");
        }
        return null;
    }

    if (dataSetSpecifications.TryGetDelayWarningMessage(out var message))
    {
        Log.Trace(message);
    }

    var dataSet = dataSetSpecifications.DataSetID;
    var brokerageSymbol = _symbolMapper.GetBrokerageSymbol(symbol);

    // Pick the data source by tick type and resolution; unsupported tick types yield null.
    IEnumerable<BaseData>? history = historyRequest.TickType switch
    {
        TickType.Trade when historyRequest.Resolution == Resolution.Tick
            => GetTradeTicks(historyRequest, brokerageSymbol, dataSet),
        TickType.Trade
            => GetAggregatedTradeBars(historyRequest, brokerageSymbol, dataSet),
        TickType.Quote when historyRequest.Resolution == Resolution.Tick
            => GetQuoteTicks(historyRequest, brokerageSymbol, dataSet),
        TickType.Quote when historyRequest.Resolution is Resolution.Second or Resolution.Minute
            => GetIntraDayQuoteBars(historyRequest, historyRequest.Resolution, brokerageSymbol, dataSet),
        TickType.Quote
            => GetInterDayQuoteBars(historyRequest, brokerageSymbol, dataSet),
        TickType.OpenInterest
            => GetOpenInterestBars(historyRequest, brokerageSymbol, dataSet),
        _ => null
    };

    if (history == null)
    {
        return null;
    }

    return FilterHistory(history, historyRequest, historyRequest.StartTimeLocal, historyRequest.EndTimeLocal);
}
/// <summary>
/// Lazily filters raw history down to the data points that belong in the response.
/// </summary>
/// <remarks>
/// A point is emitted only when its <c>Time</c> is not before <paramref name="startTimeLocal"/>,
/// its <c>EndTime</c> is not after <paramref name="endTimeLocal"/>, and the request's exchange hours
/// report the market as open for the point's time span (taking the request's extended market hours
/// setting into account). Every emitted point increments the shared data point counter.
/// </remarks>
/// <param name="history">The unfiltered data points</param>
/// <param name="request">The history request supplying exchange hours and the extended hours flag</param>
/// <param name="startTimeLocal">Inclusive lower bound for a point's start time</param>
/// <param name="endTimeLocal">Inclusive upper bound for a point's end time</param>
/// <returns>The data points that passed both the time window and the market hours checks</returns>
private static IEnumerable<BaseData> FilterHistory(IEnumerable<BaseData> history, HistoryRequest request, DateTime startTimeLocal, DateTime endTimeLocal)
{
    foreach (var dataPoint in history)
    {
        // Drop anything that starts before, or ends after, the requested window.
        if (dataPoint.Time < startTimeLocal || dataPoint.EndTime > endTimeLocal)
        {
            continue;
        }

        // Drop anything the exchange hours do not consider part of an open market.
        var marketIsOpen = request.ExchangeHours.IsOpen(dataPoint.Time, dataPoint.EndTime, request.IncludeExtendedMarketHours);
        if (!marketIsOpen)
        {
            continue;
        }

        Interlocked.Increment(ref _dataPointCount);
        yield return dataPoint;
    }
}
/// <summary>
/// Streams open interest data points for the request's symbol and UTC time range.
/// </summary>
/// <param name="request">The history request supplying the symbol, UTC range and exchange time zone</param>
/// <param name="brokerageSymbol">The brokerage symbol passed to the historical API client</param>
/// <param name="dataBentoDataSet">The DataBento dataset passed to the historical API client</param>
/// <returns>A lazily evaluated sequence of <see cref="OpenInterest"/> points with times converted from UTC to the exchange time zone</returns>
private IEnumerable<BaseData> GetOpenInterestBars(HistoryRequest request, string brokerageSymbol, string dataBentoDataSet)
{
    var records = _historicalApiClient.GetOpenInterest(brokerageSymbol, request.StartTimeUtc, request.EndTimeUtc, dataBentoDataSet);
    foreach (var record in records)
    {
        var utcTimestamp = record.Header.UtcDateTime;
        if (utcTimestamp == null)
        {
            // A record without a timestamp cannot be placed on the timeline; skip it.
            continue;
        }

        var exchangeTime = utcTimestamp.Value.ConvertFromUtc(request.ExchangeHours.TimeZone);
        yield return new OpenInterest(exchangeTime, request.Symbol, record.Quantity);
    }
}
/// <summary>
/// Streams trade bars at the request's resolution, built from the OHLCV bars returned by the historical API client.
/// </summary>
/// <param name="request">The history request supplying the symbol, UTC range, resolution and exchange time zone</param>
/// <param name="brokerageSymbol">The brokerage symbol passed to the historical API client</param>
/// <param name="dataBentoDataSet">The DataBento dataset passed to the historical API client</param>
/// <returns>A lazily evaluated sequence of <see cref="TradeBar"/> instances with times converted from UTC to the exchange time zone</returns>
private IEnumerable<BaseData> GetAggregatedTradeBars(HistoryRequest request, string brokerageSymbol, string dataBentoDataSet)
{
    // Every bar spans exactly one unit of the requested resolution.
    var barPeriod = request.Resolution.ToTimeSpan();

    var ohlcvBars = _historicalApiClient.GetHistoricalOhlcvBars(brokerageSymbol, request.StartTimeUtc, request.EndTimeUtc, request.Resolution, dataBentoDataSet);
    foreach (var ohlcv in ohlcvBars)
    {
        var utcTimestamp = ohlcv.Header.UtcDateTime;
        if (utcTimestamp == null)
        {
            // A bar without a timestamp cannot be placed on the timeline; skip it.
            continue;
        }

        var exchangeTime = utcTimestamp.Value.ConvertFromUtc(request.ExchangeHours.TimeZone);
        yield return new TradeBar(exchangeTime, request.Symbol, ohlcv.Open, ohlcv.High, ohlcv.Low, ohlcv.Close, ohlcv.Volume, barPeriod);
    }
}
/// <summary>
/// Streams trade ticks for the request's symbol and UTC time range.
/// </summary>
/// <remarks>
/// Level one records that carry no price, or no timestamp, are skipped.
/// </remarks>
/// <param name="request">The history request supplying the symbol, UTC range and exchange time zone</param>
/// <param name="brokerageSymbol">The brokerage symbol passed to the historical API client</param>
/// <param name="dataBentoDataSet">The DataBento dataset passed to the historical API client</param>
/// <returns>A lazily evaluated sequence of trade <see cref="Tick"/> instances with times converted from UTC to the exchange time zone</returns>
private IEnumerable<BaseData> GetTradeTicks(HistoryRequest request, string brokerageSymbol, string dataBentoDataSet)
{
    var records = _historicalApiClient.GetLevelOneData(brokerageSymbol, request.StartTimeUtc, request.EndTimeUtc, dataBentoDataSet);
    foreach (var record in records)
    {
        // Only records with both a price and a timestamp can become a trade tick.
        if (!record.Price.HasValue || record.Header.UtcDateTime == null)
        {
            continue;
        }

        var exchangeTime = record.Header.UtcDateTime.Value.ConvertFromUtc(request.ExchangeHours.TimeZone);
        yield return new Tick(exchangeTime, request.Symbol, "", "", record.Size, record.Price.Value);
    }
}
/// <summary>
/// Streams quote bars at the request's resolution by aggregating minute quote bars.
/// </summary>
/// <remarks>
/// This is an iterator method, so it always returns a (possibly empty) sequence and never <c>null</c>;
/// the return type is therefore declared non-nullable.
/// </remarks>
/// <param name="request">The history request supplying the symbol, UTC range and target resolution</param>
/// <param name="brokerageSymbol">The brokerage symbol passed to the historical API client</param>
/// <param name="dataBentoDataSet">The DataBento dataset passed to the historical API client</param>
/// <returns>A lazily evaluated sequence of aggregated quote bars</returns>
private IEnumerable<BaseData> GetInterDayQuoteBars(HistoryRequest request, string brokerageSymbol, string dataBentoDataSet)
{
    // Minute bars are the input that gets aggregated up to the requested resolution.
    var minuteBars = GetIntraDayQuoteBars(request, Resolution.Minute, brokerageSymbol, dataBentoDataSet);
    var targetPeriod = request.Resolution.ToTimeSpan();

    foreach (var aggregatedBar in LeanData.AggregateQuoteBars(minuteBars, request.Symbol, targetPeriod))
    {
        yield return aggregatedBar;
    }
}
/// <summary>
/// Streams quote bars of the given resolution, built from the best bid/offer intervals returned by the historical API client.
/// </summary>
/// <remarks>
/// Intervals without level one data, without any bid or ask price, or without a timestamp are skipped.
/// Only the sides (bid and/or ask) that carry a price are applied to the resulting bar.
/// </remarks>
/// <param name="request">The history request supplying the symbol, UTC range and exchange time zone</param>
/// <param name="resolution">The resolution requested from the API, also used as the bar period</param>
/// <param name="brokerageSymbol">The brokerage symbol passed to the historical API client</param>
/// <param name="dataBentoDataSet">The DataBento dataset passed to the historical API client</param>
/// <returns>A lazily evaluated sequence of <see cref="QuoteBar"/> instances with times converted from UTC to the exchange time zone</returns>
private IEnumerable<QuoteBar> GetIntraDayQuoteBars(HistoryRequest request, Resolution resolution, string brokerageSymbol, string dataBentoDataSet)
{
    var barPeriod = resolution.ToTimeSpan();

    var intervals = _historicalApiClient.GetBestBidOfferIntervals(brokerageSymbol, request.StartTimeUtc, request.EndTimeUtc, resolution, dataBentoDataSet);
    foreach (var interval in intervals)
    {
        // An interval needs quote data with at least one price, plus a timestamp, to become a bar.
        if (interval.LevelOne == null || !interval.LevelOne.HasBidOrAskPrice() || interval.UtcDateTime == null)
        {
            continue;
        }

        var exchangeTime = interval.UtcDateTime.Value.ConvertFromUtc(request.ExchangeHours.TimeZone);

        // Start from an empty bar and fill in only the sides that are present.
        var quoteBar = new QuoteBar(exchangeTime, request.Symbol, bid: null, lastBidSize: decimal.Zero, ask: null, lastAskSize: decimal.Zero, barPeriod);

        var levelOne = interval.LevelOne;
        if (levelOne.BidPx.HasValue)
        {
            quoteBar.UpdateBid(levelOne.BidPx.Value, levelOne.BidSz);
        }

        if (levelOne.AskPx.HasValue)
        {
            quoteBar.UpdateAsk(levelOne.AskPx.Value, levelOne.AskSz);
        }

        yield return quoteBar;
    }
}
/// <summary>
/// Streams quote ticks for the request's symbol and UTC time range.
/// </summary>
/// <remarks>
/// Records without level one data, without any bid or ask price, or without a timestamp are skipped.
/// A missing bid or ask price is emitted as zero.
/// </remarks>
/// <param name="request">The history request supplying the symbol, UTC range and exchange time zone</param>
/// <param name="brokerageSymbol">The brokerage symbol passed to the historical API client</param>
/// <param name="dataBentoDataSet">The DataBento dataset passed to the historical API client</param>
/// <returns>A lazily evaluated sequence of quote <see cref="Tick"/> instances with times converted from UTC to the exchange time zone</returns>
private IEnumerable<BaseData> GetQuoteTicks(HistoryRequest request, string brokerageSymbol, string dataBentoDataSet)
{
    var records = _historicalApiClient.GetLevelOneData(brokerageSymbol, request.StartTimeUtc, request.EndTimeUtc, dataBentoDataSet);
    foreach (var record in records)
    {
        // A record needs quote data with at least one price, plus a timestamp, to become a tick.
        if (record.LevelOne == null || !record.LevelOne.HasBidOrAskPrice() || record.Header.UtcDateTime == null)
        {
            continue;
        }

        var exchangeTime = record.Header.UtcDateTime.Value.ConvertFromUtc(request.ExchangeHours.TimeZone);
        var quote = record.LevelOne;

        yield return new Tick(exchangeTime, request.Symbol, quote.BidSz, quote.BidPx ?? 0m, quote.AskSz, quote.AskPx ?? 0m);
    }
}
}