Skip to content

Instantly share code, notes, and snippets.

@fabriciovergara
Last active March 27, 2026 14:11
Show Gist options
  • Select an option

  • Save fabriciovergara/ba1332b5a3ff113fb37e2f8880f8da2c to your computer and use it in GitHub Desktop.

Select an option

Save fabriciovergara/ba1332b5a3ff113fb37e2f8880f8da2c to your computer and use it in GitHub Desktop.
extension Observable where Self: AnyObject & Sendable {
    /// Returns an `AsyncStream` of the values found at `keyPath` on this object.
    ///
    /// This is a convenience wrapper that forwards `self` and `keyPath` to
    /// `withObservationTrackingStream(on:for:)`.
    ///
    /// - Parameter keyPath: The property to observe.
    /// - Returns: The stream produced by `withObservationTrackingStream(on:for:)`.
    func stream<T: Sendable>(for keyPath: KeyPath<Self, T>) -> AsyncStream<T> {
        return withObservationTrackingStream(on: self, for: keyPath)
    }
}
/// Builds an `AsyncStream` that emits the value at `keyPath` on `observer`: once on the
/// first pass, then again each time `withObservationTracking` reports a change.
///
/// The reads happen inside a `@MainActor` task that holds `observer` weakly. When a
/// re-read finds the observer gone (or it was `nil` to begin with), the stream finishes.
/// Terminating the returned stream cancels that task.
///
/// NOTE(review): the observer is only re-checked after a change signal arrives. Confirm
/// whether anything wakes the loop if the observer is released while it is idle;
/// otherwise the stream stays open until the consumer terminates it.
///
/// - Parameters:
///   - observer: The object to read from; captured weakly by the observing task.
///   - keyPath: The property to read on every pass.
/// - Returns: A stream of successive values read from `keyPath`.
func withObservationTrackingStream<O, T: Sendable>(
    on observer: O?,
    for keyPath: KeyPath<O, T>
) -> AsyncStream<T> where O: AnyObject, O: Sendable {
    return AsyncStream { output in
        // Internal wake-up channel: `onChange` yields into it, the loop below awaits it.
        let (changes, changeSignal) = AsyncStream<Void>.makeStream()

        let observation = Task { @MainActor [weak observer] in
            defer { changeSignal.finish() }
            var pendingChange = changes.makeAsyncIterator()

            repeat {
                if Task.isCancelled { break }

                // Reading inside the tracking closure registers for the next change.
                // An outer `nil` means the observer itself is gone, independent of
                // whatever `T` is.
                let snapshot: T? = withObservationTracking {
                    observer.map { $0[keyPath: keyPath] }
                } onChange: {
                    changeSignal.yield(())
                }

                guard let snapshot else { break }
                output.yield(snapshot)

                // `next()` yields `nil` once the signal stream ends or the task is
                // cancelled, which ends the loop.
            } while await pendingChange.next() != nil

            output.finish()
        }

        output.onTermination = { _ in observation.cancel() }
    }
}
/// Demo model that bumps `increment` about once per second and prints every value
/// delivered by its own `stream(for:)` observation.
@Observable
final class Foo {
    /// Counter advanced by the producer task started in `init()`.
    var increment: Int = 1

    /// Handles for the tasks started in `init()`, kept so `deinit` can cancel them.
    /// Excluded from observation because it is bookkeeping, not model state.
    @ObservationIgnored private var tasks: [Task<Void, Never>] = []

    init() {
        // Producer. It runs on the main actor because `withObservationTrackingStream`
        // re-reads the property from a `@MainActor` task; mutating it from another
        // executor would be an unsynchronized write racing with that read.
        let producer = Task { @MainActor [weak self] in
            while !Task.isCancelled {
                self?.increment += 1
                // Standard-library sleep; a cancellation error is ignored on purpose
                // because the loop condition handles cancellation on the next pass.
                try? await Task.sleep(for: .seconds(1))
            }
        }

        // Consumer. `self` is only touched to create the stream, so the loop does not
        // keep the instance alive.
        let consumer = Task { [weak self] in
            guard let stream = self?.stream(for: \.increment) else { return }
            for await value in stream {
                print("Incremented \(value)")
            }
        }

        tasks = [producer, consumer]
    }

    deinit {
        // Both tasks capture `self` weakly, so neither loop would end on its own once
        // the instance is released; cancel them explicitly.
        for task in tasks {
            task.cancel()
        }
        print("Foo Deinitialized")
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment