gradyzhuo/swift-kurrentdb
**A modern, type-safe Swift client for Kurrent (formerly EventStoreDB)**
Why swift-kurrentdb?
Event Sourcing is a powerful pattern for building scalable, auditable systems. swift-kurrentdb brings this capability to the Swift ecosystem with a modern, type-safe client.
- Native Swift — Designed for Swift from the ground up, not a wrapper
- Modern Concurrency — Full async/await with Swift 6 data-race safety
- Compile-Time Safety — Swift 6 strict concurrency compliance with typed throws
- Cluster-Ready — First-class support for multi-node TLS clusters
- Well-Documented — Comprehensive guides on Swift Package Index
- Typed Errors — All operations throw KurrentError for precise error handling
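As a rough illustration of what typed throws offers at the call site, the sketch below uses a hypothetical `DemoError` enum and `loadOrder` function. Both are stand-ins invented for this example; they are not the cases of `KurrentError` or any swift-kurrentdb API.

```swift
// Hypothetical error type for illustration; the real KurrentError cases may differ.
enum DemoError: Error, Equatable {
    case streamNotFound(String)
    case accessDenied
}

// `throws(DemoError)` (Swift 6 typed throws) declares the only error type
// this function can throw.
func loadOrder(from stream: String) throws(DemoError) -> String {
    guard stream == "orders" else {
        throw .streamNotFound(stream)
    }
    return "order-123"
}

// Because the thrown type is known, `error` in the catch block is a DemoError,
// so the switch can be exhaustive without a catch-all clause.
func describeLoad(_ stream: String) -> String {
    do {
        return try loadOrder(from: stream)
    } catch {
        switch error {
        case .streamNotFound(let name): return "missing stream: \(name)"
        case .accessDenied: return "access denied"
        }
    }
}

print(describeLoad("orders"))
print(describeLoad("invoices"))
```

If a new case is later added to the error enum, the compiler flags every switch that does not handle it, which is the main practical benefit over an untyped `catch`.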
Quick Start
Installation
Add to your Package.swift:
// Stable release
dependencies: [
    .package(url: "https://github.com/gradyzhuo/swift-kurrentdb.git", from: "1.12.1")
]
2.0.0 Beta available — A major new release is in active development. The 2.x API introduces a target-based design with improved type safety and composability.
.package(url: "https://github.com/gradyzhuo/swift-kurrentdb.git", from: "2.0.0")
See the Migration Guide below for what's changed.
Connect to a Cluster
import KurrentDB
// Local development — single node
let settings = ClientSettings.localhost()
    .authenticated(.credentials(username: "admin", password: "changeit"))
// Local development — multi-node TLS cluster
let settings = ClientSettings.localhost(ports: 2111, 2112, 2113)
    .secure(true)
    .tlsVerifyCert(false)
    .authenticated(.credentials(username: "admin", password: "changeit"))
    .cerificate(path: "/path/to/ca.crt")
// Production — remote cluster (TLS enabled by default)
let settings = ClientSettings.remote(
    "node1.example.com:2113",
    "node2.example.com:2113",
    "node3.example.com:2113"
).authenticated(.credentials(username: "admin", password: "changeit"))
// Connection string
let settings: ClientSettings = "esdb://admin:changeit@node1:2113,node2:2113?tls=true"
let client = KurrentDBClient(settings: settings)
Append and Read Events
// Create an event
let event = EventData(
    eventType: "OrderPlaced",
    model: ["orderId": "order-123", "total": 99.99]
)
// Append to stream
try await client.appendToStream("orders", events: [event]) {
    $0.revision(expected: .any)
}
// Read events
let responses = try await client.readStream("orders") {
    $0.startFrom(revision: .start).limit(10)
}
for try await response in responses {
    if let event = try response.event {
        print("Event: \(event.record.eventType)")
    }
}
API Overview
Streams
// Append
try await client.appendToStream("orders", events: [event]) {
    $0.revision(expected: .streamExists)
}
// Read forward
let responses = try await client.readStream("orders") {
    $0.startFrom(revision: .start).limit(50)
}
// Read backward
let responses = try await client.readStream("orders") {
    $0.startFrom(revision: .end).limit(10).backward()
}
// Read $all
let allResponses = try await client.readAllStreams {
    $0.limit(100)
}
// Subscribe (catch-up)
let subscription = try await client.subscribeStream("orders")
for try await event in subscription.events { ... }
// Subscribe to $all
let subscription = try await client.subscribeAllStreams()
// Delete / tombstone
try await client.deleteStream("orders")
try await client.tombstoneStream("orders")
// Stream metadata
try await client.setStreamMetadata("orders", metadata: metadata)
let metadata = try await client.getStreamMetadata("orders")
Projections
// Create
try await client.createContinuousProjection(name: "order-count", query: js)
try await client.createOneTimeProjection(query: js)
try await client.createTransientProjection(name: "temp", query: js)
// Lifecycle
try await client.enableProjection(name: "order-count")
try await client.disableProjection(name: "order-count")
try await client.abortProjection(name: "order-count")
try await client.resetProjection(name: "order-count")
try await client.deleteProjection(name: "order-count")
// Query state / result
let state: CountResult? = try await client.getProjectionState(of: CountResult.self, name: "order-count")
let result: Int? = try await client.getProjectionResult(of: Int.self, name: "order-count")
// List
let continuous = try await client.listAllProjections(mode: .continuous)
let all = try await client.listAllProjections(mode: .any)
Persistent Subscriptions
// Create a subscription group
try await client.createPersistentSubscription(
    stream: "orders",
    groupName: "order-workers"
) {
    $0.startFrom(revision: .start)
        .maxRetryCount(5)
}
// Subscribe and process events
let subscription = try await client.subscribePersistentSubscription(
    stream: "orders",
    groupName: "order-workers"
)
for try await result in subscription.events {
    do {
        // handle event
        try await subscription.ack(readEvents: result.event)
    } catch {
        try await subscription.nack(readEvents: result.event, action: .park, reason: "\(error)")
    }
}
// $all persistent subscription
try await client.createPersistentSubscriptionToAllStream(groupName: "all-workers")
let allSub = try await client.subscribePersistentSubscriptionToAllStreams(groupName: "all-workers")
// Update / delete
try await client.updatePersistentSubscription(stream: "orders", groupName: "order-workers") { $0 }
try await client.deletePersistentSubscription(stream: "orders", groupName: "order-workers")
User Management
// Create a user
try await client.createUser(
    loginName: "jane",
    password: "secure_password",
    fullName: "Jane Doe",
    groups: ["ops"]
)
// Manage user
try await client.enableUser(loginName: "jane")
try await client.disableUser(loginName: "jane")
try await client.changeUserPassword(loginName: "jane", currentPassword: "old", newPassword: "new")
try await client.resetUserPassword(loginName: "jane", newPassword: "reset")
Server Operations
// Scavenge
let response = try await client.startScavenge(threadCount: 2, startFromChunk: 0)
try await client.stopScavenge(scavengeId: response.scavengeId)
// System
try await client.mergeIndexes()
try await client.restartPersistentSubscriptions()
// Node
try await client.resignNode()
try await client.setNodePriority(priority: 10)
Cluster Gossip
let members = try await client.readCluster()
for member in members {
    print("\(member.httpEndPoint.host):\(member.httpEndPoint.port) — \(member.state)")
}
if let leader = members.first(where: { $0.state == .leader && $0.isAlive }) {
    print("Leader: \(leader.httpEndPoint)")
}
Monitoring
let stats = try await client.stats(refreshTimePeriodInMs: 5000)
for try await snapshot in stats {
    print("Metrics: \(snapshot.stats.count) entries")
}
Migration Guide
Version 2.0.0 introduces a breaking redesign of the API. The flat methods on KurrentDBClient are replaced by a target-based, hierarchical style:
// 1.x
try await client.appendToStream("orders", events: [event]) { ... }
// 2.x
try await client.streams(of: .specified("orders")).append(events: [event]) { ... }
The 1.x API moves to KurrentDB_V1
In 2.x the old flat-method API is no longer part of the KurrentDB module. It has been moved to a separate KurrentDB_V1 library that ships in the same package. If you are not ready to migrate immediately, switch your dependency target and import:
// Package.swift
.product(name: "KurrentDB_V1", package: "swift-kurrentdb")
// Replace your existing import
import KurrentDB_V1 // was: import KurrentDB
KurrentDB_V1 gives you access to all 1.x methods (marked @deprecated) while you migrate to the new API at your own pace.
👉 Full Migration Guide — 1.x to 2.x
Features
| Category | Operations |
|----------|-----------|
| Streams | Append, read, delete, subscribe (catch-up), $all stream |
| Persistent Subscriptions | Create, subscribe, update, delete, ACK/NAK, $all support |
| Projections | Create (continuous/one-time/transient), enable, disable, state, result |
| Users | Create, enable, disable, update, change/reset password |
| Operations | Scavenge (start/stop), merge indexes, shutdown, node priority |
| Gossip | Cluster discovery, node health, leader detection |
| Monitoring | Real-time server statistics |
| Connection | TLS/SSL, cluster gossip discovery, auto-reconnection, keep-alive |
Test Coverage
75% line coverage across the KurrentDB module, measured by running all 174 tests against a live 3-node TLS KurrentDB cluster.
Coverage by Subsystem
| Subsystem | Line Coverage | Lines |
|-----------|:------------:|------:|
| Monitoring | 88.4% | 95 |
| ServerFeatures | 90.2% | 61 |
| Users | 87.6% | 403 |
| Streams | 86.1% | 1,560 |
| Operations | 83.0% | 235 |
| Projections | 77.3% | 865 |
| PersistentSubscriptions | 71.9% | 1,555 |
| Gossip | 66.9% | 142 |
| Core | 66.8% | 2,650 |
| KurrentDB (total) | 75.0% | 7,581 |
Test Suites
174 tests across 9 suites: 7 integration suites and 2 unit/mock suites. All integration tests run against a live 3-node TLS KurrentDB cluster.
| Suite | Tests | Type | Key Scenarios |
|-------|------:|------|---------------|
| StreamsTests | 35 | Integration | Append, read (forward/backward/limit/revision), subscribe, metadata, optimistic concurrency, delete, tombstone |
| ProjectionsTests | 16 | Integration | Create (continuous/one-time/transient), enable/disable, abort, reset, state/result query, list |
| PersistentSubscriptionsTests | 11 | Integration | Create, subscribe, ACK, NACK (park/retry), getInfo, update settings, list, delete, replay parked |
| UsersTests | 7 | Integration | Create, enable/disable, update, change/reset password |
| OperationsTests | 6 | Integration | Scavenge (start/stop), merge indexes, restart persistent subscriptions, node priority, resign |
| GossipTests | 3 | Integration | Read cluster members, node state, custom timeout |
| MonitoringTests | 3 | Integration | Server stats, refresh interval, metadata flag |
| KurrentCoreTests | 67 | Unit | Connection string parsing, EventData, projection status, stream identifiers, metadata, subscription filters |
| MockClientTests | 26 | Mock/DI | KurrentDBClientProtocol conformance, all factory call patterns, 5 domain service scenarios |
| Total | 174 | | 0 commented-out tests |
Optimistic Concurrency
Streams write-side error paths are explicitly covered:
| Scenario | Expected Error |
|----------|---------------|
| Append at stale revision (.at(99), stream at 0) | wrongExpectedVersion |
| Append with .noStream to an existing stream | wrongExpectedVersion |
| Two concurrent writers at the same revision | One succeeds, one wrongExpectedVersion |
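The expected-revision check behind these scenarios can be sketched with a small in-memory model. Everything below (`ExpectedRevision`, `WrongExpectedVersion`, `InMemoryStream`, `outcome`) is a hypothetical stand-in written for this example, not swift-kurrentdb's types. The "concurrent writers" case is modelled as two sequential appends that both observed revision 0, since the sketch is single-threaded.

```swift
// Hypothetical in-memory stand-in for a single stream.
enum ExpectedRevision: Equatable {
    case any            // no check
    case noStream       // stream must not exist yet
    case streamExists   // stream must already exist
    case at(UInt64)     // last event must sit at exactly this revision
}

struct WrongExpectedVersion: Error, Equatable {
    let expected: ExpectedRevision
    let actual: UInt64?   // nil means the stream has no events
}

struct InMemoryStream {
    private(set) var events: [String] = []

    // Revision of the last event, or nil for an empty stream.
    var currentRevision: UInt64? {
        events.isEmpty ? nil : UInt64(events.count - 1)
    }

    // Appends only if the caller's expectation matches the current state.
    mutating func append(_ event: String, expected: ExpectedRevision) throws(WrongExpectedVersion) -> UInt64 {
        let actual = currentRevision
        let matches: Bool
        switch expected {
        case .any: matches = true
        case .noStream: matches = (actual == nil)
        case .streamExists: matches = (actual != nil)
        case .at(let revision): matches = (actual == revision)
        }
        guard matches else {
            throw WrongExpectedVersion(expected: expected, actual: actual)
        }
        events.append(event)
        return UInt64(events.count - 1)
    }
}

// Summarises an append attempt as a short string for printing.
func outcome(of stream: inout InMemoryStream, appending event: String, expected: ExpectedRevision) -> String {
    do {
        let revision = try stream.append(event, expected: expected)
        return "ok@\(revision)"
    } catch {
        return "wrongExpectedVersion"
    }
}

var stream = InMemoryStream()
print(outcome(of: &stream, appending: "OrderPlaced", expected: .noStream))  // first write creates the stream
print(outcome(of: &stream, appending: "OrderPaid", expected: .at(99)))      // stale revision
print(outcome(of: &stream, appending: "OrderPaid", expected: .noStream))    // stream already exists
// Two writers that both read revision 0: only the first append can match.
print(outcome(of: &stream, appending: "WriterA", expected: .at(0)))
print(outcome(of: &stream, appending: "WriterB", expected: .at(0)))
```

A writer that loses the race would typically re-read the stream, re-apply its decision against the newer state, and retry with the updated revision.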
Persistent Subscription Lifecycle
| Scenario | Verified |
|----------|---------|
| Create → subscribe → append → ACK | ✓ |
| NACK with park (dead-letter queue) | ✓ |
| NACK with retry (re-delivery, deliveries == 2) | ✓ |
| getInfo (groupName, eventSource, $all) | ✓ |
| Update settings → getInfo confirms change | ✓ |
| park → replayParked → re-delivered → ACK | ✓ |
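To make the retry, park, and replay flow concrete, here is a toy in-memory model of a single subscription group. The types (`NackAction`, `Delivery`, `SubscriptionGroup`) are hypothetical and only mimic the scenarios in the table; how the server actually tracks delivery counts across parking is not something this sketch claims to reproduce.

```swift
// Hypothetical model of one subscription group, for illustration only.
enum NackAction { case park, retry }

struct Delivery: Equatable {
    let event: String
    var deliveries: Int   // how many times this event was handed to a consumer
}

struct SubscriptionGroup {
    private(set) var pending: [Delivery] = []   // waiting to be delivered
    private(set) var parked: [Delivery] = []    // dead-letter queue
    private(set) var acked: [String] = []

    mutating func publish(_ event: String) {
        pending.append(Delivery(event: event, deliveries: 0))
    }

    // Hands the next pending event to a consumer and counts the delivery.
    mutating func next() -> Delivery? {
        guard !pending.isEmpty else { return nil }
        var delivery = pending.removeFirst()
        delivery.deliveries += 1
        return delivery
    }

    mutating func ack(_ delivery: Delivery) {
        acked.append(delivery.event)
    }

    mutating func nack(_ delivery: Delivery, action: NackAction) {
        switch action {
        case .retry: pending.append(delivery)   // re-deliver later
        case .park: parked.append(delivery)     // set aside until replayed
        }
    }

    // Moves every parked event back into the pending queue.
    mutating func replayParked() {
        pending.append(contentsOf: parked)
        parked.removeAll()
    }
}

var group = SubscriptionGroup()
group.publish("OrderPlaced")

// The first attempt fails and asks for a retry; the second attempt is parked.
let first = group.next()!
group.nack(first, action: .retry)
let second = group.next()!
group.nack(second, action: .park)

// Replaying the parked queue makes the event deliverable again.
group.replayParked()
let third = group.next()!
group.ack(third)
print(third.deliveries, group.acked, group.parked.count)
```

The design point the model highlights is that parking removes an event from normal delivery without losing it, so a failing message does not block the rest of the group.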
Requirements
- Swift 6.0+
- macOS 15+ / iOS 18+ / tvOS 18+ / watchOS 11+ / visionOS 2+ / Linux
KurrentDB Server Compatibility
| Server Version | Status | Notes |
|:--------------:|:------:|-------|
| KurrentDB 26.0 | ✅ Supported | Full feature support |
| KurrentDB 25.1 | ✅ Supported | Full feature support |
| EventStoreDB 24.x | ✅ Supported | Core features supported; KurrentDB v2 batch append not available |
Local Development with Docker
Start a 3-node TLS cluster:
cd server
docker compose up -d
This generates TLS certificates automatically and starts nodes on ports 2111, 2112, and 2113.
Or a single insecure node for quick testing:
docker run --rm -d -p 2113:2113 \
    -e KURRENTDB_CLUSTER_SIZE=1 \
    -e KURRENTDB_RUN_PROJECTIONS=All \
    -e KURRENTDB_START_STANDARD_PROJECTIONS=true \
    -e KURRENTDB_INSECURE=true \
    -e KURRENTDB_ENABLE_ATOM_PUB_OVER_HTTP=true \
    docker.kurrent.io/kurrent-latest/kurrentdb:25.1
Documentation
| Guide | Description |
|-------|-------------|
| Migration Guide (1.x → 2.x) | What changed in 2.0 and how to update your code |
| Getting Started | Connection settings, first event, basic usage |
| Appending Events | EventData, concurrency control, idempotency |
| Reading Events | Forward/backward reading, $all stream, filters |
| Projections | Create, manage, and query projection state |
| Persistent Subscriptions | Competing consumers, ACK/NAK, subscription groups |
| User Management | Create, enable, disable, password management |
| Cluster Gossip | Cluster discovery, node health monitoring |
| Monitoring | Real-time server statistics |
| Server Operations | Scavenge, index merge, shutdown, node management |
| Full API Reference | Complete API documentation |
Contributing
Contributions are welcome, whether bug reports, feature requests, documentation improvements, or code.
- GitHub Discussions — Ask questions, share ideas
- Issues — Report bugs
- Contributing Guide — Get started contributing
License
MIT License — see LICENSE for details.
Acknowledgments
Built with:
- grpc-swift — Swift gRPC implementation
- swift-nio — Non-blocking I/O
Inspired by official Kurrent/EventStoreDB clients.
Made by Grady Zhuo
Package Metadata
Repository: gradyzhuo/swift-kurrentdb
Default branch: main
README: README.md