"""Asyncio pipeline example that retrieves the sink speed of the first 100 meetbouten.

The pipeline has three stages connected by asyncio queues:

* A: fetch meetbout ids from the list endpoint;
* B: call the detail endpoint for each id (several concurrent workers);
* C: accumulate the results into the module-level ``result`` list.

P.S. A meetbout is a physical bolt on the outside of a building that is used
to determine the "sink" (subsidence) rate of the structure.
"""
import asyncio
import json
import random
import time
from dataclasses import dataclass
from typing import Optional

import httpx

# Shared HTTP client used by all stages; closed at the end of main().
client = httpx.AsyncClient()

# Base URL shared by the meetbout list endpoint and the per-id detail endpoint.
MEETBOUT_URL = 'https://api.data.amsterdam.nl/meetbouten/meetbout/'


@dataclass
class DataAB:
    """Message passed from stage A to stage B."""

    meetbout_id: int


@dataclass
class DataBC:
    """Message passed from stage B to stage C."""

    meetbout_id: int
    # Value of the "zakkingssnelheid" field of the detail response;
    # None when the field is absent.
    zakkingsnelheid: Optional[float]


# Stage-C output: one dict per meetbout with keys 'id' and 'sink_speed'.
result = []


async def sleep_about(t: float):
    """Sleep for ``t`` seconds plus a random jitter in [0, 0.5) seconds."""
    sleep_s = t + 0.5 * random.random()
    await asyncio.sleep(sleep_s)


async def do_stepA(queue_out: asyncio.Queue, N: int):
    """Stage A: fetch up to ``N`` meetbout ids and enqueue them for stage B.

    The list endpoint is queried for a single page of at most 100 items, so
    no more than 100 ids are produced even when ``N`` is larger.

    Raises:
        httpx.HTTPError: if the list request fails or returns an error status.
    """
    r = await client.get(f'{MEETBOUT_URL}?page_size={min(100, N)}')
    r.raise_for_status()
    data = json.loads(r.content)
    # A missing or null "results" field is treated as an empty page.
    meetbouten = data.get('results') or []
    print(f'A, #meetbouten: {len(meetbouten)}')
    ids = [x.get('id') for x in meetbouten[:N]]
    print(f'A, meetbouten ids: {ids}')
    for meetbout_id in ids:
        await queue_out.put(DataAB(meetbout_id))


async def do_stepB(queue_in: asyncio.Queue, queue_out: asyncio.Queue, task_idx: int):
    """Stage B worker: look up the sink speed for each id taken from ``queue_in``.

    Loops forever; the caller cancels it once ``queue_in`` has been joined.
    A failed request or an unparsable response is reported and skipped.
    ``task_done()`` is always called, so one bad item can neither kill the
    worker nor leave ``queue_in.join()`` waiting forever.
    """
    while True:
        item: DataAB = await queue_in.get()
        meetbout_id = item.meetbout_id
        try:
            r = await client.get(f'{MEETBOUT_URL}{meetbout_id}/')
            r.raise_for_status()
            detail = json.loads(r.content)
            zakkingsnelheid = detail.get("zakkingssnelheid")
            print(f'B {task_idx}, meetbout {meetbout_id} sink speed {zakkingsnelheid}')
            await queue_out.put(DataBC(meetbout_id, zakkingsnelheid))
        except (httpx.HTTPError, ValueError) as exc:
            # ValueError also covers json.JSONDecodeError for non-JSON bodies.
            print(f'B {task_idx}, meetbout {meetbout_id} failed: {exc!r}')
        finally:
            queue_in.task_done()


async def do_stepC(queue_in: asyncio.Queue):
    """Stage C: append every DataBC from ``queue_in`` to ``result`` (demo only).

    Loops forever; the caller cancels it once ``queue_in`` has been joined.
    """
    while True:
        data: DataBC = await queue_in.get()
        result.append({
            'id': data.meetbout_id,
            'sink_speed': data.zakkingsnelheid,
        })
        queue_in.task_done()


async def main():
    """Run the A -> B -> C pipeline, then print a sample of the results and the timing.

    All pipeline tasks are cancelled and awaited, and the shared HTTP client
    is closed, even if a stage fails.
    """
    t0 = time.perf_counter()
    N_MEETBOUTEN = 100
    N_STAGE_B_TASKS = 2

    queue_AB = asyncio.Queue()
    queue_BC = asyncio.Queue()

    stepA = asyncio.create_task(do_stepA(queue_AB, N_MEETBOUTEN))
    stepsB = [
        asyncio.create_task(do_stepB(queue_AB, queue_BC, task_idx))
        for task_idx in range(N_STAGE_B_TASKS)
    ]
    stepC = asyncio.create_task(do_stepC(queue_BC))
    tasks = [stepA, *stepsB, stepC]

    try:
        await stepA
        print('step A done')
        await queue_AB.join()
        print('queue A - B done')
        # No more data is going to show up at B; wait for C to drain.
        await queue_BC.join()
        print('queue B - C done')
    finally:
        # The B and C workers loop forever, so they must be cancelled.
        # Stage A may also still be pending if main itself was cancelled.
        # gather() waits until every task has really finished.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        await client.aclose()

    print(f'main complete, result: {result[:3]}...')
    difference = time.perf_counter() - t0
    print(f'total script time: {round(difference, 3)}s')


if __name__ == '__main__':
    asyncio.run(main())
    print('program complete')