use std::sync::Arc; use futures::{Stream, StreamExt}; use pyo3::{ exceptions::PyStopAsyncIteration, pymethods, pymodule, types::PyModule, PyObject, PyRef, PyResult, Python, }; /// Here we define our Rust type, /// that implements the Stream trait. /// /// It iterates from 1 to i. pub struct RustStreamer { i: u32, current: u32, } impl RustStreamer { pub fn new(i: u32) -> Self { RustStreamer { i, current: 0 } } } /// Here goes stream implementation. /// /// /// It's a simple stream. On each poll_next call it returns next value. /// If current value is equal to i, it returns None, which means, /// that stream is finished. impl Stream for RustStreamer { type Item = u32; fn poll_next( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { let this = self.get_mut(); if this.current < this.i { this.current += 1; std::task::Poll::Ready(Some(this.current)) } else { std::task::Poll::Ready(None) } } } /// Here I defined a class that can be used, /// as an async iterator. /// /// It's a simple class, that has an inner field, /// which is an object that implements the Stream trait. /// /// But I wrap it in Mutex<...> to make it thread safe /// and shareable between tokio-threads. Arc here, because /// it's cheap to clone. /// /// Also, without mutex, it's not possible to mutate /// the data inside the Arc. #[pyo3::pyclass] struct TestIterator { pub inner: Arc>, } #[pymethods] impl TestIterator { #[new] fn new(i: u32) -> Self { TestIterator { inner: Arc::new(tokio::sync::Mutex::new(RustStreamer::new(i))), } } /// We don't want to create another classes, we want this /// class to be iterable. Since we implemented __anext__ method, /// we can return self here. fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { slf } /// This is an anext implementation. /// /// Notable thing here is that we return PyResult>. /// We cannot return &PyAny directly here, because of pyo3 limitations. /// Here's the issue about it: https://github.com/PyO3/pyo3/issues/3190 fn __anext__<'a>(&self, py: Python<'a>) -> PyResult> { // Here we clone the inner field, so we can use it // in our future. let streamer = self.inner.clone(); let future = pyo3_asyncio::tokio::future_into_py(py, async move { // Here we lock the mutex to access the data inside // and call next() method to get the next value. let val = streamer.lock().await.next().await; match val { Some(val) => Ok(val), // Here we return PyStopAsyncIteration error, // because python needs exceptions to tell that iterator // has ended. None => Err(PyStopAsyncIteration::new_err("The iterator is exhausted")), } }); Ok(Some(future?.into())) } } #[pymodule] fn _internal(_py: Python<'_>, pymod: &PyModule) -> PyResult<()> { pymod.add_class::()?; Ok(()) }