Convert Tick Data to OHLC in Realtime with Python Pandas?

I know how to convert static tick data to OHLC candlestick data by resampling with pandas.

But how can I do this in real time, the way a WebSocket feed delivers it?

Let's say I am using this while loop to collect tick data:

import time, requests

ohlc = []
while True:
    r = requests.get('https://api1.binance.com/api/v3/ticker/price?symbol=ETHUSDT')
    resp_dict = r.json()
    print({'time' : time.time(), 'price' : resp_dict["price"]})

Now, how can I keep resampling this data in real time with pandas (just like a WebSocket that keeps sending OHLC data every second, which makes it possible to plot candlesticks in real time)?

Thanks in Advance



Solution 1:[1]

Unless you want to run a data-provider business, it's perfectly fine to build a standalone bot. In real time you have to aggregate the OHLC bars live yourself. The pipeline looks like this:

  1. Ticks (trades) via WebSocket (or another streaming API); ->
  2. Queue to hold the ticks; ->
  3. Logic to take ticks from the queue, send them to the aggregator, and pop them off the queue so no tick is accidentally processed several times; ->
  4. Logic to aggregate OHLCV live;
  5. Logic to tell when to start constructing a new bar.
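
The steps above can be sketched with nothing but the standard library. This is a minimal illustration, not the FTX code from this answer: the names BarAggregator and drain, the tick layout (time in seconds, price, size), and the sample ticks are all assumptions made up for the example.

```python
from collections import deque

BAR_SECONDS = 15  # bar length in seconds, matching the 15-second example here


class BarAggregator:
    """Aggregates ticks into OHLCV bars one tick at a time (steps 4 and 5)."""

    def __init__(self, bar_seconds=BAR_SECONDS):
        self.bar_seconds = bar_seconds
        self.bars = []       # finished bars, oldest first
        self.current = None  # the bar still under construction

    def add(self, tick):
        # Step 5: floor the tick time to the start of the bar it belongs to.
        start = tick["time"] - tick["time"] % self.bar_seconds
        if self.current is None or start > self.current["start"]:
            # The tick crossed a bar boundary: close the old bar, open a new one.
            if self.current is not None:
                self.bars.append(self.current)
            self.current = {
                "start": start,
                "open": tick["price"], "high": tick["price"],
                "low": tick["price"], "close": tick["price"],
                "volume": tick["size"],
            }
        else:
            # Step 4: update the open bar. A late tick from an earlier bar
            # is simply folded into the current one in this sketch.
            bar = self.current
            bar["high"] = max(bar["high"], tick["price"])
            bar["low"] = min(bar["low"], tick["price"])
            bar["close"] = tick["price"]
            bar["volume"] += tick["size"]


def drain(queue, aggregator):
    """Step 3: hand ticks to the aggregator oldest first, removing each one."""
    while queue:
        aggregator.add(queue.popleft())


# Step 2: a queue of made-up ticks (time in seconds, like time.time()).
ticks = deque([
    {"time": 1, "price": 100.0, "size": 2.0},
    {"time": 5, "price": 101.5, "size": 1.0},
    {"time": 14, "price": 99.5, "size": 3.0},
    {"time": 16, "price": 100.5, "size": 1.0},
])
aggregator = BarAggregator()
drain(ticks, aggregator)
print(aggregator.bars)     # the finished bar covering seconds 0-15
print(aggregator.current)  # the bar for seconds 15-30, still open
```

Here the bar boundary is derived from each tick's own timestamp rather than from a scheduler, so a bar only closes when the first tick of the next bar arrives. That is a design choice of this sketch; a timer-driven close, as used further down, also closes bars during quiet periods.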

Here is an example with the FTX exchange and Python, aggregating a 15-second chart. FTX has an "official" WebSocket client; I modified it to process all the ticks correctly:

def _handle_trades_message(self, message: Dict) -> None:
    self._trades[message['market']].extend(reversed(message['data']))

After that, this is what I ended up with:

import ftx
import time
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from datetime                          import datetime

def process_tick(tick):
    global data
    global flag
    global frontier
    
    if (flag == True) and (tick['time'] >= frontier):
        start_time = datetime.utcnow().isoformat() #"almost"
        time_      = time.time() * 1000 # with higher precision
        op         = tick['price']
        hi         = tick['price']
        lo         = tick['price']
        cl         = tick['price']
        vol        = tick['size' ]
        row        = {'startTime' : start_time, 
                      'time'      : time_     , 
                      'open'      : op        , 
                      'high'      : hi        , 
                      'low'       : lo        , 
                      'close'     : cl        ,
                      'volume'    : vol        }
        data.loc[len(data)] = pd.Series(row)  # DataFrame.append was removed in pandas 2.0
        flag = False
        print('Opened')
        print(tick)
    else:
        
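        # Write through .loc: chained data['high'].iloc[-1] = ... may not update the frame
        last = data.index[-1]  # label of the bar currently being built
        if   (tick['price'] > data.loc[last, 'high']):
            data.loc[last, 'high'] = tick['price']
        elif (tick['price'] < data.loc[last, 'low']):
            data.loc[last, 'low'] = tick['price']

        data.loc[last, 'close']   = tick['price']
        data.loc[last, 'volume'] += tick['size']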
        print(tick)
     
def on_tick():
    
    while True:

        try:

            try:
                process_tick(trades[-1])
                trades.pop()
            except IndexError:
                pass

            time.sleep(0.001)
        except KeyboardInterrupt:
            client_ws._reset_data()
            scheduler.remove_job('onClose')
            scheduler.shutdown()
            print('Shutdown')
            break
    
def on_close():
    global flag
    global frontier
    flag     = True
    frontier = pd.Timestamp.utcnow().floor('15S').isoformat() #floor to 15 secs
    print('Closed')

ASSET = 'LTC-PERP'
RES   = '15'

flag      = True
frontier  = pd.Timestamp.utcnow().floor('15S').isoformat() #floor to 15 secs
cols      = ['startTime', 'time', 'open', 'high', 'low', 'close', 'volume']
data      = pd.DataFrame(columns=cols)
client_ws = ftx.FtxClientWs()
client_ws._subscribe({'channel': 'trades', 'market': ASSET})
trades    = client_ws._trades[ASSET]
scheduler = BackgroundScheduler()
scheduler.configure(timezone='utc')
scheduler.add_job(on_close, trigger='cron', second='0, 15, 30, 45', id='onClose')
scheduler.start()    
on_tick()

On top of that, just run on_tick() in its own thread. The data variable will then be updated constantly in the background and hold the OHLCV bars, so you can use it within the same program.
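
A minimal sketch of the threading idea, under some assumptions: it swaps the polling loop for a blocking queue.Queue, and the names latest, lock, and STOP are hypothetical stand-ins (latest plays the role of the data variable). The ticks are made up.

```python
import queue
import threading

ticks = queue.Queue()                    # thread-safe hand-off to the worker
latest = {"close": None, "volume": 0.0}  # hypothetical stand-in for `data`
lock = threading.Lock()                  # guards `latest` across threads
STOP = object()                          # sentinel that tells the worker to exit


def on_tick():
    """Consume ticks in the background until the STOP sentinel arrives."""
    while True:
        tick = ticks.get()  # blocks until a tick is available
        if tick is STOP:
            break
        with lock:
            latest["close"] = tick["price"]
            latest["volume"] += tick["size"]


worker = threading.Thread(target=on_tick, daemon=True)
worker.start()

# The main thread stays free; here it just feeds two made-up ticks.
for price, size in [(100.0, 1.0), (101.0, 2.5)]:
    ticks.put({"price": price, "size": size})

ticks.put(STOP)  # ask the worker to finish
worker.join()    # wait until it has processed everything

with lock:
    print(latest)
```

One thing to keep in mind: Python delivers KeyboardInterrupt only to the main thread, so once on_tick() runs in a worker thread, the shutdown logic has to be triggered from the main thread (for example with a sentinel like the one above).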

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 gorx1