'How to get a response with data from CoinAPI?
I have been working on a data science project and wrote these two scripts to write crypto data from api or update data if already present, I wrote one script to write and update data and one scripts full of helper functions, I tried de-bugging using print statements and traced back to get_lists() in helper script, when I am running my main scripts I am getting empty response from server, response.status_code returns 200 but response.text is empty. Please help, I am not posting any links to api website(coinapi) but if you want I can provide those in comments. Also I am trying to write a function to make api calls with different keys if limit exceeds with current key, any suggestion on that.
download_update_csv.py
#!/usr/bin/env python3
"""
Download or Update data
"""
import os
import sys
import traceback
from datetime import date, timedelta
from preprocess import Preprocess
preprocess = Preprocess()
date_today = date.today()
start_date = date_today - timedelta(days=180)
filepath = './unprocessed_data/'
coins = {'btcusd': ['BTC/USD', './unprocessed_data/btcusd.csv'], 'ethusd': ['ETH/USD', './unprocessed_data/ethusd.csv']}
key = "get your key from coinapi it's free"
headers = {'X-CoinAPI-Key' : key}
def download_csv(coin: list, start: date, end: date) -> None:
"""
def download_csv(coin: list, start: date, end: date) -> None
writes coin.csv files in uprocessed_data folder
"""
url = f"https://rest.coinapi.io/v1/exchangerate/{coin[0]}/history?period_id=1HRS&time_start={start}T00:00:00&time_end={end}T00:00:00&limit=100000"
lists = preprocess.get_lists(url, headers)
time_step, rate_close = preprocess.list_from_dict(lists)
print(f"Writing {coin[0]}.csv file")
preprocess.write_csv(filepath=coin[1], time_steps=time_step, rate_close=rate_close)
def update_csv(coin: list, start: date, end: date) -> None:
"""
def update_csv(coin: list) -> None
updates coin.csv with new data in uprocessed_data folder
"""
url = f"https://rest.coinapi.io/v1/exchangerate/{coin[0]}/history?period_id=1HRS&time_start={start}T00:00:00&time_end={end}T00:00:00&limit=100000"
lists = preprocess.get_lists(url, headers)
time_steps, rate_close = preprocess.list_from_dict(lists)
preprocess.write_csv(filepath=coin[1], time_steps=time_steps, rate_close=rate_close)
def main() -> None:
"""
Driver Function
"""
try:
if not os.path.exists(filepath):
print(f'Creating folder {filepath}')
os.mkdir(filepath)
for coin in coins.values():
if os.path.exists(coin[1]):
last_date = preprocess.get_last_date(coin[1])
update_csv(coin=coin, start=date_today, end=last_date)
else:
download_csv(coin=coin, start=date_today, end=start_date)
except Exception:
print(traceback.format_exc())
if __name__ == "__main__":
sys.exit(main())
preprocess.py
import requests
import json
import csv
import matplotlib.pyplot as plt
import os
from datetime import datetime
class Preprocess:
"""Helper functions for preprocessing and saving and retrieving crypto-exchangerate data"""
def __init__(self):
pass
def get_lists(self, url: str, headers: dict) -> list:
"""
def get_lists(self, url: str, headers: dict) -> list
returns list_of_dictionaries
"""
response = requests.get(url=url, headers=headers)
lists = json.loads(response.text)
return lists
def list_from_dict(self, lists: list) -> tuple:
"""
def list_from_dict(self, lists: list) -> tuple
returns (time_steps, rate_close)
"""
time_steps = ['time']
rate_close = ['close']
for dictionary in lists:
date_string = dictionary.get('time_close')[:-9]
datetimeObj = datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%S")
time_steps.append(datetimeObj.timestamp())
rate_close.append(dictionary.get('rate_close'))
return (time_steps, rate_close)
def write_csv(self, filepath: str, time_steps: list, rate_close: list) -> None:
"""
def write_csv(self, filename: str, time_steps: list, rate_close: list) -> None
writes time_steps and rate_low to a csv file
"""
if len(time_steps) == 0 or len(rate_close) == 0:
raise Exception('Empty lists, no useful data returned.')
with open(filepath, 'a+') as csvfile:
writer = csv.writer(csvfile)
for i in range(len(time_steps)):
writer.writerow([time_steps[i], rate_close[i]])
def read_csv(self, filepath: str) -> tuple:
"""
def read_csv(self, filepath: str) -> tuple
returns time_steps and rate_low lists
"""
time_steps = []
rate_close = []
with open(filepath) as csvfile:
reader = csv.reader(csvfile, delimiter=',')
next(reader)
for column in reader:
time_steps.append(float(column[0]))
rate_close.append(float(column[1]))
return (time_steps, rate_close)
def plot_series(time: list, series: list, format="-", start=0, end=None, name='fig.jpg', figsize=(10, 6)) -> None:
"""
def plot_series(time: list, series: list, format="-", start=0, end=None, name='fig.jpg', figsize=(10, 6))
plots figure for given parameters
"""
plt.figure(figsize=figsize)
plt.plot(time[start:end], series[start:end], format)
plt.xlabel("Time")
plt.ylabel("Value")
plt.grid(True)
plt.savefig(name)
def get_last_date(self, filepath: str) -> datetime:
"""
def get_last_date(coin: list
returns last date when coin.csv file was updated
"""
time_steps_coin, _ = self.read_csv(filepath)
last_date_coin = datetime.fromtimestamp(time_steps_coin[-1])
return last_date_coin
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
