Last active
November 20, 2024 10:26
-
-
Save OliverVea/af19678f1ab2334bc71f83fe79c33687 to your computer and use it in GitHub Desktop.
Retrieve, Process, Store
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from datetime import datetime, timedelta | |
| from typing import List | |
| import csv | |
# Reference instant for the mock feed: every generated row is stamped
# relative to the moment this module was imported.
START_TIME = datetime.now()


def call_api(symbol_string) -> str:
    """
    Mock of the remote price API -- it only exists for the sake of the example.

    Generates 100 hourly rows walking backwards in time from START_TIME and
    returns them as ';'-separated text: one header line followed by one line
    per row, joined with newlines. ``symbol_string`` is accepted to mirror
    the call signature but is not used.
    """
    import random

    column_names = ['datetime', 'closing', 'opening', 'something else']
    number_of_rows = 100

    timestamp = START_TIME
    closing = random.random() * 100
    table = [column_names]

    for _ in range(number_of_rows):
        timestamp = timestamp - timedelta(hours=1)
        # Each row opens at the closing value of the row generated before it.
        opening = closing
        closing = closing + random.normalvariate(0.01, 0.1)
        something_else = random.random()
        table.append([
            timestamp.strftime("%d/%m/%Y-%H:%M"),
            closing,
            opening,
            something_else,
        ])

    lines = []
    for row in table:
        lines.append(";".join(str(cell) for cell in row))
    return "\n".join(lines)
class PriceHistoryEntry:
    """A single observation in a price series: a date label and a price."""

    def __init__(self, date: str, price: float):
        self.date, self.price = date, price

    def __str__(self):
        """Render as ``(date: <date>, price: <price>)`` with the price at two decimals."""
        return '(date: {}, price: {:.2f})'.format(self.date, self.price)
class PriceHistory:
    """Price series of one symbol quoted in one currency."""

    def __init__(self, symbol: str, currency: str, history: List[PriceHistoryEntry]):
        self.symbol = symbol
        self.currency = currency
        self.history = history

    def summary(self):
        """Return a ``PriceHistory(symbol/currency):`` header line followed by
        the first three entries, one per line."""
        header = f'PriceHistory({self.symbol}/{self.currency}):\n'
        preview = "\n".join(map(str, self.history[:3]))
        return header + preview
def get_prices_from_api(symbol: str, currency: str):
    """Fetch the raw API payload for a symbol/currency pair.

    Both parts are upper-cased and joined as ``SYMBOL/CURRENCY`` before being
    handed to ``call_api``; its return value is passed back unchanged.
    """
    pair = "/".join((symbol.upper(), currency.upper()))
    return call_api(pair)
def parse_prices(prices_from_api: str) -> List[PriceHistoryEntry]:
    """Parse the API's ';'-separated text into price history entries.

    The first non-blank line is treated as the header and skipped. From every
    following row the first cell is kept verbatim as the date and the second
    cell is converted to a float price; any further cells are ignored.

    Blank lines (for example the one produced by a trailing newline) are
    skipped instead of aborting the whole parse.

    Raises:
        ValueError: if a non-blank data row has fewer than two cells, or its
            second cell cannot be converted to float.
    """
    # splitlines() handles "\n" and "\r\n" alike and does not yield a phantom
    # empty final line when the payload ends with a newline.
    reader = csv.reader(prices_from_api.splitlines(), delimiter=';')

    # csv.reader yields [] for an empty line; drop those before unpacking.
    rows = [row for row in reader if row]

    return [
        PriceHistoryEntry(date=row_datetime, price=float(row_closing))
        for row_datetime, row_closing, *_ in rows[1:]
    ]
def get_symbol_price_history(symbol: str, currency: str) -> PriceHistory:
    """Retrieve and parse the price history of ``symbol`` quoted in ``currency``.

    Progress is printed along the way: the request, the first 150 characters
    of the raw payload, and a summary of the parsed result.
    """
    print(f'Getting price history for {symbol} in currency {currency}')

    raw_payload = get_prices_from_api(symbol, currency)
    payload_preview = raw_payload[:150]
    print(f'Got from api:\n{payload_preview}...')

    result = PriceHistory(symbol, currency, parse_prices(raw_payload))
    print(f'Parsed into: {result.summary()}')
    return result
def convert_price_history(symbol_price_history: PriceHistory, currency_price_history: PriceHistory) -> PriceHistory:
    """Re-quote a price history using a second history as the conversion table.

    Both histories must be quoted in the same currency. Every price in
    ``symbol_price_history`` is divided by the price that
    ``currency_price_history`` records for the same date.

    Returns:
        A new PriceHistory that keeps the symbol of ``symbol_price_history``
        and uses the symbol of ``currency_price_history`` as its currency.

    Raises:
        ValueError: if the two histories are not quoted in the same currency.
        KeyError: if a date of ``symbol_price_history`` has no entry in
            ``currency_price_history``.
        ZeroDivisionError: if the conversion price for a date is zero.
    """
    # Explicit raise instead of `assert`, so the check is not stripped when
    # Python runs with -O.
    if symbol_price_history.currency != currency_price_history.currency:
        raise ValueError("Price history currency and conversion currency must match")

    print(f'Converting prices of {symbol_price_history.symbol} from {symbol_price_history.currency} to {currency_price_history.symbol}')

    # Index the conversion prices by date once, so each lookup below is O(1).
    currency_price_by_date = {entry.date: entry.price for entry in currency_price_history.history}

    converted_price_history = [
        PriceHistoryEntry(entry.date, entry.price / currency_price_by_date[entry.date])
        for entry in symbol_price_history.history
    ]

    return PriceHistory(symbol_price_history.symbol, currency_price_history.symbol, converted_price_history)
def save_file_to_s3(price_history: PriceHistory):
    """Stand-in for an S3 upload: prints the ``symbol/currency`` file name and
    the history summary; nothing is actually stored."""
    file_name = '{}/{}'.format(price_history.symbol, price_history.currency)
    overview = price_history.summary()
    print(f'Saving file to s3: "{file_name}\n{overview}"')
def save_data_to_s3(price_histories: List[PriceHistory]):
    """Store every given price history by calling ``save_file_to_s3`` on each,
    in the order given."""
    # Annotated with typing.List like the rest of the file; the builtin
    # generic list[...] is evaluated at definition time and fails before
    # Python 3.9.
    for price_history in price_histories:
        save_file_to_s3(price_history)
def main():
    """Run the example pipeline: retrieve price histories quoted in USD,
    convert them to JPY, then hand the converted histories to the S3 step."""
    base_currency = 'USD'
    target_currency = 'JPY'
    tickers = ['A', 'B', 'C']

    print("> Getting price history")
    # The JPY-quoted-in-USD history is fetched first and reused below as the
    # conversion table for every symbol.
    jpy_in_usd = get_symbol_price_history(target_currency, base_currency)
    usd_histories = [get_symbol_price_history(ticker, base_currency) for ticker in tickers]

    print("> Processing data")
    jpy_histories = [convert_price_history(history, jpy_in_usd) for history in usd_histories]

    print("> Storing data to S3")
    save_data_to_s3(jpy_histories)
# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment