will-lumley/asynccombine
AsyncCombine brings familiar Combine-style operators like `sink`, `assign`, and `store(in:)` to the world of Swift Concurrency.
✨ Features
🔗 Combine-like Syntax
- Write familiar, declarative pipelines without pulling in Combine.
- Use
.sink {}to respond to values from anyAsyncSequence. - Use
.assign(to:on:)to bind values directly to object properties (e.g.label.textColor). - Manage lifetimes with
.store(in:)on Task sets, just likeAnyCancellable.
👀 Observation Integration
- Seamlessly connect to Swift’s new Observation framework.
- Turn
@Observableproperties into streams withobserved(\.property). - Automatically replay the current value, then emit fresh values whenever the property changes.
- Perfect for keeping UI state in sync with your models.
🔁 CurrentValueSubject Replacement
- Ship values through your app with a hot, replay-1 async primitive.
CurrentValueRelay<Value>holds the latest value and broadcasts it to all listeners.- Similar to Combine’s
CurrentValueSubject, but actor-isolated and async-first. - Exposes an AsyncStream for easy consumption in UI or domain code.
🔗 Publishers.CombineLatest Replacement
- Pair two AsyncSequences and emit the latest tuple whenever either side produces a new element (after both have emitted at least once).
- Finishes when both upstream sequences finish (CombineLatest semantics).
- Cancellation of the downstream task cancels both upstream consumers.
- Plays nicely with Swift Async Algorithms (e.g. you can map, debounce, etc. before/after).
⚡ Async Algorithms Compatible
- Compose richer pipelines using Swift Async Algorithms.
- Add
.debounce,.throttle,.merge,.zip, and more to your async streams. - Chain seamlessly with AsyncCombine operators (
sink,assign, etc.). - Great for smoothing UI inputs, combining event streams, and building complex state machines.
🌍 Cross-Platform
- AsyncCombine doesn’t rely on Combine or other Apple-only frameworks.
- Runs anywhere Swift Concurrency works: iOS, macOS, tvOS, watchOS.
- Fully portable to Linux and even SwiftWasm for server-side and web targets.
- Ideal for writing platform-agnostic domain logic and unit tests.
🚀 Usage
Observe @Observable properties
Turn any @Observable property into an AsyncStream that replays the current value and then emits on every change. Chain standard AsyncSequence operators (map, filter, compactMap, ...) and finish with sink or assign.
import AsyncCombine
import Observation
@Observable @MainActor
final class CounterViewModel {
var count: Int = 0
}
let viewModel = CounterViewModel()
var subscriptions = Set<SubscriptionTask>()
// $viewModel.count → viewModel.observed(\.count)
viewModel.observed(\.count)
.map { "Count: \($0)" }
.sink { print($0) }
.store(in: &subscriptions)
viewModel.count += 1 // prints "Count: 1"Why it works: observed(_:) uses withObservationTracking under the hood and reads on MainActor, so you always get the fresh value (no stale reads).
Bind to UI (UIKit / AppKit / SpriteKit / custom objects)
// UILabel example
let label = UILabel()
viewModel.observed(\.count)
.map {
UIColor(
hue: CGFloat($0 % 360) / 360,
saturation: 1,
brightness: 1,
alpha: 1
)
}
.assign(to: \.textColor, on: label)
.store(in: &subscriptions)Works the same for NSTextField.textColor, SKShapeNode.fillColor`, your own class properties, etc.
Use CurrentValueRelay for hot, replay-1 state
CurrentValueRelay<Value> holds the latest value and broadcasts it to all listeners. stream() yields the current value immediately, then subsequent updates.
let relay = CurrentValueRelay(false)
var subs = Set<SubscriptionTask>()
relay.stream()
.map { $0 ? "ON" : "OFF" }
.sink { print($0) } // "OFF" immediately (replay)
.store(in: &subs)
Task {
await relay.send(true) // prints "ON"
await relay.send(false) // prints "OFF"
}Cancel tasks when you’re done (e.g., deinit).
subs.cancelAll()Combine multiple AsyncSequences into a single AsyncSequence
import AsyncAlgorithms
import AsyncCombine
// Two arbitrary async streams
let a = AsyncStream<Int> { cont in
Task {
for i in 1...3 {
try? await Task.sleep(nanoseconds: 100_000_000)
cont.yield(i) // 1, 2, 3
}
cont.finish()
}
}
let b = AsyncStream<String> { cont in
Task {
for s in ["A", "B"] {
try? await Task.sleep(nanoseconds: 150_000_000)
cont.yield(s) // "A", "B"
}
cont.finish()
}
}
// combineLatest-style pairing
var tasks = Set<SubscriptionTask>()
AsyncCombine.CombineLatest(a, b)
.map { i, s in "Pair: \(i) & \(s)" }
.sink { print($0) }
.store(in: &tasks)
// Prints (timing-dependent, after both have emitted once):
// "Pair: 2 & A"
// "Pair: 3 & A"
// "Pair: 3 & B"
Debounce, throttle, merge (with Swift Async Algorithms)
AsyncCombine plays nicely with [Swift Async Algorithms]. Import it to get reactive operators you know from Combine.
import AsyncAlgorithms
viewModel.observed(\.count)
.debounce(for: .milliseconds(250)) // smooth noisy inputs
.map { "Count: \($0)" }
.sink { print($0) }
.store(in: &subscriptions)You can also merge multiple streams, zip them, removeDuplicates, etc.
Lifecycle patterns (Combine-style ergonomics)
Keep your subscriptions alive as long as you need them:
final class Monitor {
private var subscriptions = Set<SubscriptionTask>()
private let vm: CounterViewModel
init(vm: CounterViewModel) {
self.vm = vm
vm.observed(\.count)
.map(String.init)
.sink { print("Count:", $0) }
.store(in: &subscriptions)
}
deinit {
subscriptions.cancelAll()
}
}Handle throwing streams (works for both throwing & non-throwing)
sink(catching:_:) uses an iterator under the hood, so you can consume throwing sequences too. If your pipeline introduces errors, add an error handler:
someThrowingAsyncSequence // AsyncSequence whose iterator `next()` can throw
.map { $0 } // your transforms here
.sink(catching: { error in
print("Stream error:", error)
}) { value in
print("Value:", value)
}
.store(in: &subscriptions)If your stream is non-throwing (e.g., AsyncStream, relay.stream()), just omit catching:.
Quick Reference
observed(\.property)→AsyncStream<Value>(replay-1, Observation-backed)sink { value in … }→ consume elements (returns Task you can cancel or.store(in:))assign(to:on:)→ main-actor property bindingCurrentValueRelay<Value>→send(_:),stream(replay: true)subscriptions.cancelAll()→ cancel everything (like clearing AnyCancellables)
SwiftUI Tip
SwiftUI already observes @Observable models. You usually don’t need observed(:) inside a View for simple UI updates—bind directly to the model. Use observed(:) when you need pipelines (debounce, merge, etc) or when binding to non-SwiftUI objects (eg., SpriteKit, UIKit).
🧪 Testing
AsyncCombine ships with lightweight testing utilities that make it easy to record, inspect, and assert values emitted by AsyncSequences. This lets you write deterministic async tests without manual loops, sleeps, or boilerplate cancellation logic.
📹 Testing with Recorder
The Recorder class helps you capture and assert values emitted by any AsyncSequence.
It continuously consumes the sequence on a background task and buffers each element, allowing your test to await them one by one with predictable timing.
import AsyncCombine
import Testing // or XCTest
@Test
func testRelayEmitsExpectedValues() async throws {
let relay = CurrentValueRelay(0)
let recorder = relay.stream().record()
await relay.send(1)
await relay.send(2)
let first = try await recorder.next()
let second = try await recorder.next()
#expect(first == 1)
#expect(second == 2)
}Recorder makes it easy to verify asynchronous behaviour without juggling timers or nested loops.
If the next value doesn’t arrive within the timeout window, it automatically reports a failure (via Issue.record or XCTFail, depending on your test framework).
🥇 Finding the First Matching Value
AsyncCombine also extends AsyncSequence with a convenience helper for asserting specific values without fully consuming the sequence.
import AsyncCombine
import Testing
@Test
func testStreamEmitsSpecificValue() async throws {
let stream = AsyncStream<Int> { cont in
cont.yield(1)
cont.yield(2)
cont.yield(3)
cont.finish()
}
// Wait for the first element equal to 2.
let match = await stream.first(equalTo: 2)
#expect(match == 2)
}This suspends until the first matching element arrives, or returns nil if the sequence finishes first.
It’s ideal when you just need to confirm that a certain value appears somewhere in an async stream.
📦 Installation
Add this to your Package.swift:
dependencies: [
.package(url: "https://github.com/will-lumley/AsyncCombine.git", from: "1.0.3")
]Or in Xcode: File > Add Packages... and paste the repo URL.
License
AsyncCombine is available under the MIT license. See the LICENSE file for more info.
Package Metadata
Repository: will-lumley/asynccombine
Default branch: main
README: README.md