1+ # Refinitiv Data Platform Library for Python
2+ ## Delivery - OMMItemStream - Market Price data via callback inside a Python Script
3+
4+ #This demonstrates how to use the OMM Item Stream interface to request streaming Quote and Trade data
5+ #via a Script as opposed to running in a Jupyter Notebook
6+
7+ import refinitiv .data as rd
8+ from refinitiv .data ._data .legacy import get_default_session , set_default_session
9+ from refinitiv .data .delivery import omm_item_stream
10+ import datetime
11+ import json
12+ import time
13+ import logging .config
14+ import configparser as cp
15+ import asyncio
16+
17+ APP_KEY = 'YOUR_APP_KEY'
18+ RDP_LOGIN = 'YOUR_REFINITIV_DATA_PLATFORM_LOGIN'
19+ RDP_PASSWORD = 'YOUR_REFINITIV_DATA_PLATFORM_PASSWORD'
20+ DEPLOYED_PLATFORM_HOST = 'THE_HOST:PORT_OF_YOUR_DEPLOYED_PLATFORM' # e.g. 'myADS:15000'
21+ DEPLOYED_PLATFORM_USER_NAME = 'YOUR_USER_NAME_ON_YOUR_DEPLOYED_PLATFORM' # DACS user name
22+
23+ # Some Global variables
24+ session = None # Our session
25+
26+ # Run for 180 seconds
27+ exit_time = time .time () + 180
28+
29+
30+ def open_session (session_type = None ):
31+ session = get_default_session ()
32+ if session is None :
33+ if session_type == "desktop" :
34+ session = rd .session .desktop .Definition (APP_KEY ).get_session ()
35+ elif session_type == "rdp" :
36+ session = rd .session .platform .Definition (
37+ app_key = APP_KEY ,
38+ grant = rd .session .platform .GrantPassword (
39+ username = RDP_LOGIN ,
40+ password = RDP_PASSWORD
41+ )
42+ ).get_session ()
43+ elif session_type == "deployed" :
44+ session = rd .session .platform .Definition (
45+ app_key = APP_KEY ,
46+ deployed_platform_host = DEPLOYED_PLATFORM_HOST ,
47+ deployed_platform_username = DEPLOYED_PLATFORM_USER_NAME
48+ ).get_session ()
49+
50+ if session is None :
51+ raise Exception (f"Wrong session_type: { session_type } . It must be ['desktop', 'rdp', 'deployed']" )
52+ else :
53+ #session.set_log_level(logging.DEBUG)
54+ session .set_log_level (logging .WARNING )
55+ set_default_session (session )
56+ session .open ()
57+ return session
58+
59+ def close_session ():
60+ session = get_default_session ()
61+ if session :
62+ session .close ()
63+
64+ # Callback function to display data or status events
65+ # Function to handle the intial Refresh for each item
66+ def handle_refresh (streaming_prices , instrument_name , fields ):
67+ # One way to access data - get dataframe
68+ print (f"Refresh : { streaming_prices .get_snapshot ()} " )
69+ return
70+
71+ # Function to update dataframe, when we receive updates for individual items
72+ def handle_update (streaming_prices , instrument_name , fields ):
73+ # Alternative way of accesing data - access the updated fields
74+ print (f"Update : { instrument_name } :{ fields } " )
75+
76+ # Function to extract status code for an item as & when received from server
77+ # Status contains a 'code' and a more detailed 'message'
78+ def handle_status (streaming_prices , instrument_name , status ):
79+ print (f"Status : { instrument_name } :{ status ['code' ]} :{ status ['message' ]} " )
80+
81+ # Our main code section
82+
83+ # Open a session using the helper functions in the above Credentials section
84+ open_session ('deployed' )
85+
86+ # Define our Streaming Price object
87+ streams = rd .content .pricing .Definition (
88+ ['EUR=' , 'GBP=' , 'JPY=' , 'BADRIC' ],
89+ fields = ['BID' , 'ASK' ]
90+ ).get_stream ()
91+
92+ # Callback for if we wanted to handle invidiual Refresh for each item
93+ streams .on_refresh (handle_refresh )
94+ # Specify callback handler for any updates
95+ streams .on_update (handle_update )
96+ # Specify callback handler for any updates
97+ streams .on_status (handle_status )
98+
99+ # Send the requests to the server and open the streams for all items
100+ streams .open ()
101+ # We should now start to receive the initial Refresh for the current field values
102+ # followed by updates for the fields as and when they occur
103+
104+ # Loop until specified end time
105+ while (time .time () < exit_time ):
106+ # The following line ensures the async event callback mechanism works
107+ asyncio .get_event_loop ().run_until_complete (asyncio .sleep (1 ))
108+
109+ streams .close ()
110+ close_session ()
0 commit comments