Skip to content

Commit ed3d8ce

Browse files
committed
Split off DataFrame and non DF example for StreamingEvents
1 parent 039c20e commit ed3d8ce

File tree

2 files changed

+350
-103
lines changed

2 files changed

+350
-103
lines changed

2.Content/2.2-Pricing/2.2.03-Pricing-StreamingEvents.ipynb

Lines changed: 64 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
{
7373
"data": {
7474
"text/plain": [
75-
"<refinitiv.data._data.core.session._platform_session.PlatformSession at 0x228ff300640>"
75+
"<refinitiv.data._data.core.session._platform_session.PlatformSession at 0x2da3a6080a0>"
7676
]
7777
},
7878
"execution_count": 2,
@@ -98,47 +98,21 @@
9898
"metadata": {},
9999
"outputs": [],
100100
"source": [
101-
"# declare a DataFrame to use for storing our streaming data\n",
102-
"df = DataFrame()\n",
103-
"# and one to store status events\n",
104-
"df_status = DataFrame()\n",
105-
"\n",
106101
"# Function to handle the intial Refresh for each item\n",
107102
"def handle_refresh(streaming_prices, instrument_name, fields):\n",
108-
"# We will do nothing in this instance - instead we will use the create_dataframe below\n",
109-
"# to create a dataframe - once we have received initial values for all items\n",
103+
" # One way to access data - get dataframe\n",
104+
" print(f\"Refresh : {streaming_prices.get_snapshot()}\")\n",
110105
" return\n",
111106
"\n",
112-
"# Function to initially populate Dataframe, once initial values received for all items\n",
113-
"def create_dataframe(streaming_prices):\n",
114-
" global df\n",
115-
" snapshot = streaming_prices.get_snapshot()\n",
116-
" field_names = snapshot.columns[1:]\n",
117-
" instrument_names = snapshot['Instrument'].values\n",
118-
" df = DataFrame(index=instrument_names, columns=field_names)\n",
119-
" df['Status']=''\n",
120-
" for price in streaming_prices:\n",
121-
" for field_name, field_value in price.get_fields().items():\n",
122-
" df.at[price.name, field_name] = field_value\n",
123-
" if price.name in df_status.index:\n",
124-
" df.at[price.name, 'Status'] = df_status.code[price.name]\n",
125-
" display(df)\n",
126-
"\n",
127107
"# Function to update dataframe, when we receive updates for individual items\n",
128-
"def update_dataframe(streaming_prices, instrument_name, fields):\n",
129-
" global df\n",
130-
" clear_output(wait=True)\n",
131-
" for field_name, field_value in fields.items():\n",
132-
" df.at[instrument_name, field_name] = field_value\n",
133-
" if instrument_name in df_status.index:\n",
134-
" df.at[instrument_name, 'Status'] = df_status.code[price.name]\n",
135-
" display(df)\n",
108+
"def handle_update(streaming_prices, instrument_name, fields):\n",
109+
" # Alternative way of accesing data - access the updated fields\n",
110+
" print(f\"Update : {instrument_name}:{fields}\")\n",
136111
" \n",
137112
"# Function to extract status code for an item as & when received from server\n",
138-
"# The status object also contains a more detailed 'message'\n",
139-
"def get_status(streaming_prices, instrument_name, status):\n",
140-
" global init_status\n",
141-
" df_status.at[instrument_name, 'code'] = status['code']"
113+
"# Status contains a 'code' and a more detailed 'message'\n",
114+
"def handle_status(streaming_prices, instrument_name, status):\n",
115+
" print(f\"Status : {instrument_name}:{status['code']}:{status['message']}\")"
142116
]
143117
},
144118
{
@@ -153,88 +127,62 @@
153127
"execution_count": 4,
154128
"metadata": {},
155129
"outputs": [
130+
{
131+
"name": "stdout",
132+
"output_type": "stream",
133+
"text": [
134+
"Refresh : Instrument BID ASK\n",
135+
"0 EUR= 1.1934 1.1938\n",
136+
"1 GBP= <NA> <NA>\n",
137+
"2 JPY= <NA> <NA>\n",
138+
"3 BADRIC <NA> <NA>\n",
139+
"Refresh : Instrument BID ASK\n",
140+
"0 EUR= 1.1934 1.1938\n",
141+
"1 GBP= 1.3956 1.396\n",
142+
"2 JPY= <NA> <NA>\n",
143+
"3 BADRIC <NA> <NA>\n",
144+
"Refresh : Instrument BID ASK\n",
145+
"0 EUR= 1.1934 1.1938\n",
146+
"1 GBP= 1.3956 1.396\n",
147+
"2 JPY= 110.92 110.95\n",
148+
"3 BADRIC <NA> <NA>\n",
149+
"Status : BADRIC:NotFound:**The record could not be found\n"
150+
]
151+
},
156152
{
157153
"data": {
158-
"text/html": [
159-
"<div>\n",
160-
"<style scoped>\n",
161-
" .dataframe tbody tr th:only-of-type {\n",
162-
" vertical-align: middle;\n",
163-
" }\n",
164-
"\n",
165-
" .dataframe tbody tr th {\n",
166-
" vertical-align: top;\n",
167-
" }\n",
168-
"\n",
169-
" .dataframe thead th {\n",
170-
" text-align: right;\n",
171-
" }\n",
172-
"</style>\n",
173-
"<table border=\"1\" class=\"dataframe\">\n",
174-
" <thead>\n",
175-
" <tr style=\"text-align: right;\">\n",
176-
" <th></th>\n",
177-
" <th>BID</th>\n",
178-
" <th>ASK</th>\n",
179-
" <th>Status</th>\n",
180-
" </tr>\n",
181-
" </thead>\n",
182-
" <tbody>\n",
183-
" <tr>\n",
184-
" <th>EUR=</th>\n",
185-
" <td>1.1901</td>\n",
186-
" <td>1.1905</td>\n",
187-
" <td></td>\n",
188-
" </tr>\n",
189-
" <tr>\n",
190-
" <th>GBP=</th>\n",
191-
" <td>1.3915</td>\n",
192-
" <td>1.3919</td>\n",
193-
" <td></td>\n",
194-
" </tr>\n",
195-
" <tr>\n",
196-
" <th>JPY=</th>\n",
197-
" <td>110.76</td>\n",
198-
" <td>110.79</td>\n",
199-
" <td></td>\n",
200-
" </tr>\n",
201-
" <tr>\n",
202-
" <th>CAD=</th>\n",
203-
" <td>1.2371</td>\n",
204-
" <td>1.2375</td>\n",
205-
" <td></td>\n",
206-
" </tr>\n",
207-
" </tbody>\n",
208-
"</table>\n",
209-
"</div>"
210-
],
211154
"text/plain": [
212-
" BID ASK Status\n",
213-
"EUR= 1.1901 1.1905 \n",
214-
"GBP= 1.3915 1.3919 \n",
215-
"JPY= 110.76 110.79 \n",
216-
"CAD= 1.2371 1.2375 "
155+
"<StreamState.Open: 3>"
217156
]
218157
},
158+
"execution_count": 4,
219159
"metadata": {},
220-
"output_type": "display_data"
160+
"output_type": "execute_result"
161+
},
162+
{
163+
"name": "stdout",
164+
"output_type": "stream",
165+
"text": [
166+
"Update : GBP=:{'BID': 1.3957, 'ASK': 1.3958}\n",
167+
"Update : JPY=:{'BID': 110.93, 'ASK': 110.94}\n",
168+
"Update : EUR=:{'BID': 1.1934, 'ASK': 1.1937}\n",
169+
"Update : EUR=:{'BID': 1.1936, 'ASK': 1.1937}\n"
170+
]
221171
}
222172
],
223173
"source": [
224174
"# Define our Streaming Price object\n",
225175
"streams = rd.content.pricing.Definition(\n",
226-
" ['EUR=', 'GBP=', 'JPY=', 'CAD='],\n",
176+
" ['EUR=', 'GBP=', 'JPY=', 'BADRIC'],\n",
227177
" fields=['BID', 'ASK']\n",
228178
").get_stream()\n",
229179
"\n",
230180
"# Callback for if we wanted to handle invidiual Refresh for each item\n",
231181
"streams.on_refresh(handle_refresh)\n",
232-
"# Callback for when we have received initial values for all items\n",
233-
"streams.on_complete(create_dataframe)\n",
234182
"# Specify callback handler for any updates\n",
235-
"streams.on_update(update_dataframe)\n",
183+
"streams.on_update(handle_update)\n",
236184
"# Specify callback handler for any updates\n",
237-
"streams.on_status(get_status)\n",
185+
"streams.on_status(handle_status)\n",
238186
"\n",
239187
"# Send the requests to the server and open the streams for all items\n",
240188
"streams.open()"
@@ -249,9 +197,20 @@
249197
},
250198
{
251199
"cell_type": "code",
252-
"execution_count": null,
200+
"execution_count": 5,
253201
"metadata": {},
254-
"outputs": [],
202+
"outputs": [
203+
{
204+
"data": {
205+
"text/plain": [
206+
"<StreamState.Closed: 1>"
207+
]
208+
},
209+
"execution_count": 5,
210+
"metadata": {},
211+
"output_type": "execute_result"
212+
}
213+
],
255214
"source": [
256215
"streams.close()"
257216
]
@@ -265,16 +224,18 @@
265224
},
266225
{
267226
"cell_type": "code",
268-
"execution_count": null,
227+
"execution_count": 6,
269228
"metadata": {},
270229
"outputs": [],
271230
"source": [
272231
"close_session()"
273232
]
274233
},
275234
{
276-
"cell_type": "markdown",
235+
"cell_type": "code",
236+
"execution_count": null,
277237
"metadata": {},
238+
"outputs": [],
278239
"source": []
279240
}
280241
],

0 commit comments

Comments
 (0)