'Convert Tick Data to OHLC in Realtime with Python Pandas?
I know how to convert static Tick data to OHLC Candlestick data using resampling with Pandas Module of Python.
But how can I do this in Realtime (Just like a Websocket sends it)?
Let's say I am using this while loop code to prepare Tick Data
import time, requests
ohlc []
while True:
r = requests.get('https://api1.binance.com/api/v3/ticker/price?symbol=ETHUSDT')
resp_dict = r.json()
time.time()
print({'time' : time.time(), 'price' : resp_dict["price"]})
Now, how can I keep resample this data in Realtime with Pandas (Just like a Websocket keeps sending us this OHLC data every second which makes it possible to plot candlestick data in realtime)?
Thanks in Advance
Solution 1:[1]
I mean, unless you wanna deploy a data provider business it's all cool to make a standalone autonomous bot. In realtime you gotta actually aggregate OHLC bars live.
- Ticks(trades) via WebSocket (or another streaming API); ->
- Queue to hold dem ticks; ->
- logic to take em ticks from the queue, send em to an aggregator logic and pop em out of the queue so we won't accidentally process one tick several times; ->
- Logic to aggregate OHLCV live;
- Logic to tell when to start constructing a new bar.
FTX exchange & Python, 15 sec chart aggregation. FTX has an "official" WebSocket Client, I modified it to process all the ticks correctly;
def _handle_trades_message(self, message: Dict) -> None:
self._trades[message['market']].extend(reversed(message['data']))
After that, that's what I got:
import ftx
import time
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
def process_tick(tick):
global data
global flag
global frontier
if (flag == True) and (tick['time'] >= frontier):
start_time = datetime.utcnow().isoformat() #"almost"
time_ = time.time() * 1000 # with higher precision
op = tick['price']
hi = tick['price']
lo = tick['price']
cl = tick['price']
vol = tick['size' ]
row = {'startTime' : start_time,
'time' : time_ ,
'open' : op ,
'high' : hi ,
'low' : lo ,
'close' : cl ,
'volume' : vol }
data = data.append(row, True)
flag = False
print('Opened')
print(tick)
else:
if (tick['price'] > data['high'].iloc[-1]):
data['high'].iloc[-1] = tick['price']
elif (tick['price'] < data['low' ].iloc[-1]):
data['low' ].iloc[-1] = tick['price']
data['close' ].iloc[-1] = tick['price']
data['volume'].iloc[-1] += tick['size' ]
print(tick)
def on_tick():
while True:
try:
try:
process_tick(trades[-1])
trades.pop()
except IndexError:
pass
time.sleep(0.001)
except KeyboardInterrupt:
client_ws._reset_data()
scheduler.remove_job('onClose')
scheduler.shutdown()
print('Shutdown')
break
def on_close():
global flag
global frontier
flag = True
frontier = pd.Timestamp.utcnow().floor('15S').isoformat() #floor to 15 secs
print('Closed')
ASSET = 'LTC-PERP'
RES = '15'
flag = True
frontier = pd.Timestamp.utcnow().floor('15S').isoformat() #floor to 15 secs
cols = ['startTime', 'time', 'open', 'high', 'low', 'close', 'volume']
data = pd.DataFrame(columns=cols)
client_ws = ftx.FtxClientWs()
client_ws._subscribe({'channel': 'trades', 'market': ASSET})
trades = client_ws._trades[ASSET]
scheduler = BackgroundScheduler()
scheduler.configure(timezone='utc')
scheduler.add_job(on_close, trigger='cron', second='0, 15, 30, 45', id='onClose')
scheduler.start()
on_tick()
On top of my stuff, just make on_tick() run in its own thread, data variable will be constantly updated in the background and hold the OHLCV, so you'll be able to use this data within the same program.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | gorx1 |
