gradyzhuo/swift-kurrentdb

**A modern, type-safe Swift client for Kurrent (formerly EventStoreDB)**

Why swift-kurrentdb?

Event Sourcing is a powerful pattern for building scalable, auditable systems. swift-kurrentdb brings this capability to the Swift ecosystem with a modern, type-safe client.

  • Native Swift — Designed for Swift from the ground up, not a wrapper
  • Modern Concurrency — Full async/await with Swift 6 data-race safety
  • Compile-Time Safety — Swift 6 strict concurrency compliance with typed throws
  • Cluster-Ready — First-class support for multi-node TLS clusters
  • Well-Documented — Comprehensive guides on Swift Package Index
  • Typed Errors — All operations throw KurrentError for precise error handling
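
The typed-errors bullet builds on Swift 6 typed throws. The sketch below shows that language feature in isolation. `SketchError`, `loadRevision`, and `describe` are hypothetical stand-ins for illustration; they are not the library's `KurrentError` or any of its methods.

```swift
// Hypothetical stand-in for a library error type; NOT the real KurrentError.
enum SketchError: Error, Equatable {
    case streamNotFound(String)
    case wrongExpectedVersion(expected: Int, actual: Int)
}

// `throws(SketchError)` tells the compiler that only SketchError can escape.
func loadRevision(of stream: String, in store: [String: Int]) throws(SketchError) -> Int {
    guard let revision = store[stream] else {
        throw .streamNotFound(stream)
    }
    return revision
}

// The thrown type is known, so `error` is a SketchError in the catch block
// and the switch needs no generic fallback case.
func describe(_ stream: String, in store: [String: Int]) -> String {
    do {
        let revision = try loadRevision(of: stream, in: store)
        return "revision \(revision)"
    } catch {
        switch error {
        case .streamNotFound(let name):
            return "no stream named \(name)"
        case .wrongExpectedVersion(let expected, let actual):
            return "expected \(expected), found \(actual)"
        }
    }
}
```

With an untyped `throws`, the same `catch` block would receive `any Error` and would need a cast or a default branch.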

Quick Start

Installation

Add to your Package.swift:

// Stable release
dependencies: [
    .package(url: "https://github.com/gradyzhuo/swift-kurrentdb.git", from: "1.12.1")
]
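
For context, a complete manifest might look like the sketch below. The package and target name `OrdersService` is hypothetical, and the product name `KurrentDB` is an assumption based on the `import KurrentDB` module name used in this README; check the package's own manifest if it does not resolve.

```swift
// swift-tools-version: 6.0
import PackageDescription

let package = Package(
    name: "OrdersService",                 // hypothetical package name
    platforms: [.macOS(.v15)],             // minimum macOS version from the Requirements section
    dependencies: [
        .package(url: "https://github.com/gradyzhuo/swift-kurrentdb.git", from: "1.12.1")
    ],
    targets: [
        .executableTarget(
            name: "OrdersService",         // hypothetical target name
            dependencies: [
                // Assumed product name, matching the `import KurrentDB` module.
                .product(name: "KurrentDB", package: "swift-kurrentdb")
            ]
        )
    ]
)
```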

2.0.0 Beta available — A major new release is in active development. The 2.x API introduces a target-based design with improved type safety and composability.

.package(url: "https://github.com/gradyzhuo/swift-kurrentdb.git", from: "2.0.0")

See the Migration Guide below for what's changed.


Connect to a Cluster

import KurrentDB

// Local development — single node
let settings = ClientSettings.localhost()
    .authenticated(.credentials(username: "admin", password: "changeit"))

// Local development — multi-node TLS cluster
let settings = ClientSettings.localhost(ports: 2111, 2112, 2113)
    .secure(true)
    .tlsVerifyCert(false)
    .authenticated(.credentials(username: "admin", password: "changeit"))
    .cerificate(path: "/path/to/ca.crt")

// Production — remote cluster (TLS enabled by default)
let settings = ClientSettings.remote(
    "node1.example.com:2113",
    "node2.example.com:2113",
    "node3.example.com:2113"
).authenticated(.credentials(username: "admin", password: "changeit"))

// Connection string
let settings: ClientSettings = "esdb://admin:changeit@node1:2113,node2:2113?tls=true"

let client = KurrentDBClient(settings: settings)
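
To make the connection string's anatomy concrete, here is a minimal sketch that splits one into its parts. `ParsedConnection` and `parseConnectionString` are hypothetical helpers written for this illustration; the library does its own parsing when a string literal is assigned to `ClientSettings`, and its rules may differ.

```swift
import Foundation

// Hypothetical illustration type; the library's ClientSettings is richer.
struct ParsedConnection: Equatable {
    var scheme: String
    var username: String?
    var password: String?
    var endpoints: [String]
    var options: [String: String]
}

func parseConnectionString(_ text: String) -> ParsedConnection? {
    // Split "scheme://rest".
    guard let schemeRange = text.range(of: "://") else { return nil }
    let scheme = String(text[..<schemeRange.lowerBound])
    var rest = String(text[schemeRange.upperBound...])

    // Peel off "?key=value&key=value".
    var options: [String: String] = [:]
    if let queryStart = rest.firstIndex(of: "?") {
        let query = String(rest[rest.index(after: queryStart)...])
        for pair in query.split(separator: "&") {
            let parts = pair.split(separator: "=", maxSplits: 1)
            if parts.count == 2 { options[String(parts[0])] = String(parts[1]) }
        }
        rest = String(rest[..<queryStart])
    }

    // Peel off "user:password@".
    var username: String?
    var password: String?
    if let at = rest.lastIndex(of: "@") {
        let userInfo = rest[..<at].split(separator: ":", maxSplits: 1)
        username = userInfo.first.map { String($0) }
        password = userInfo.count == 2 ? String(userInfo[1]) : nil
        rest = String(rest[rest.index(after: at)...])
    }

    // What remains is a comma-separated list of "host:port" endpoints.
    let endpoints = rest.split(separator: ",").map { String($0) }
    guard !endpoints.isEmpty else { return nil }
    return ParsedConnection(scheme: scheme, username: username, password: password,
                            endpoints: endpoints, options: options)
}
```

Applied to the string above, this yields the scheme `esdb`, the credentials `admin` / `changeit`, two endpoints, and a single `tls` option.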

Append and Read Events

// Create an event
let event = EventData(
    eventType: "OrderPlaced",
    model: ["orderId": "order-123", "total": 99.99]
)

// Append to stream
try await client.appendToStream("orders", events: [event]) {
    $0.revision(expected: .any)
}

// Read events
let responses = try await client.readStream("orders") {
    $0.startFrom(revision: .start).limit(10)
}

for try await response in responses {
    if let event = try response.event {
        print("Event: \(event.record.eventType)")
    }
}
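
The `model:` argument above is shown as a dictionary literal. A typed payload is often easier to maintain. The sketch below uses only Foundation to show a `Codable` struct round-tripping through JSON; `OrderPlaced` is a hypothetical type, and whether `EventData(model:)` accepts it directly is an assumption the README does not confirm.

```swift
import Foundation

// Hypothetical payload type for the "OrderPlaced" event in the example above.
struct OrderPlaced: Codable, Equatable {
    let orderId: String
    let total: Double
}

let payload = OrderPlaced(orderId: "order-123", total: 99.99)

// Encode with sorted keys so the JSON bytes are stable across runs.
let encoder = JSONEncoder()
encoder.outputFormatting = [.sortedKeys]
let json = try encoder.encode(payload)

// Decode it back, as a reader of the stream would.
let decoded = try JSONDecoder().decode(OrderPlaced.self, from: json)
```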

API Overview

Streams

// Append
try await client.appendToStream("orders", events: [event]) {
    $0.revision(expected: .streamExists)
}

// Read forward
let responses = try await client.readStream("orders") {
    $0.startFrom(revision: .start).limit(50)
}

// Read backward
let responses = try await client.readStream("orders") {
    $0.startFrom(revision: .end).limit(10).backward()
}

// Read $all
let allResponses = try await client.readAllStreams {
    $0.limit(100)
}

// Subscribe (catch-up)
let subscription = try await client.subscribeStream("orders")
for try await event in subscription.events { ... }

// Subscribe to $all
let subscription = try await client.subscribeAllStreams()

// Delete / tombstone
try await client.deleteStream("orders")
try await client.tombstoneStream("orders")

// Stream metadata
try await client.setStreamMetadata("orders", metadata: metadata)
let metadata = try await client.getStreamMetadata("orders")

Projections

// Create
try await client.createContinuousProjection(name: "order-count", query: js)
try await client.createOneTimeProjection(query: js)
try await client.createTransientProjection(name: "temp", query: js)

// Lifecycle
try await client.enableProjection(name: "order-count")
try await client.disableProjection(name: "order-count")
try await client.abortProjection(name: "order-count")
try await client.resetProjection(name: "order-count")
try await client.deleteProjection(name: "order-count")

// Query state / result
let state: CountResult? = try await client.getProjectionState(of: CountResult.self, name: "order-count")
let result: Int? = try await client.getProjectionResult(of: Int.self, name: "order-count")

// List
let continuous = try await client.listAllProjections(mode: .continuous)
let all = try await client.listAllProjections(mode: .any)

Persistent Subscriptions

// Create a subscription group
try await client.createPersistentSubscription(
    stream: "orders",
    groupName: "order-workers"
) {
    $0.startFrom(revision: .start)
      .maxRetryCount(5)
}

// Subscribe and process events
let subscription = try await client.subscribePersistentSubscription(
    stream: "orders",
    groupName: "order-workers"
)

for try await result in subscription.events {
    do {
        // handle event
        try await subscription.ack(readEvents: result.event)
    } catch {
        try await subscription.nack(readEvents: result.event, action: .park, reason: "\(error)")
    }
}

// $all persistent subscription
try await client.createPersistentSubscriptionToAllStream(groupName: "all-workers")
let allSub = try await client.subscribePersistentSubscriptionToAllStreams(groupName: "all-workers")

// Update / delete
try await client.updatePersistentSubscription(stream: "orders", groupName: "order-workers") { $0 }
try await client.deletePersistentSubscription(stream: "orders", groupName: "order-workers")

User Management

// Create a user
try await client.createUser(
    loginName: "jane",
    password: "secure_password",
    fullName: "Jane Doe",
    groups: ["ops"]
)

// Manage user
try await client.enableUser(loginName: "jane")
try await client.disableUser(loginName: "jane")
try await client.changeUserPassword(loginName: "jane", currentPassword: "old", newPassword: "new")
try await client.resetUserPassword(loginName: "jane", newPassword: "reset")

Server Operations

// Scavenge
let response = try await client.startScavenge(threadCount: 2, startFromChunk: 0)
try await client.stopScavenge(scavengeId: response.scavengeId)

// System
try await client.mergeIndexes()
try await client.restartPersistentSubscriptions()

// Node
try await client.resignNode()
try await client.setNodePriority(priority: 10)

Cluster Gossip

let members = try await client.readCluster()

for member in members {
    print("\(member.httpEndPoint.host):\(member.httpEndPoint.port) \(member.state)")
}

if let leader = members.first(where: { $0.state == .leader && $0.isAlive }) {
    print("Leader: \(leader.httpEndPoint)")
}

Monitoring

let stats = try await client.stats(refreshTimePeriodInMs: 5000)

for try await snapshot in stats {
    print("Metrics: \(snapshot.stats.count) entries")
}

Migration Guide

Version 2.0.0 introduces a breaking redesign of the API. The flat methods on KurrentDBClient are replaced by a target-based, hierarchical style:

// 1.x
try await client.appendToStream("orders", events: [event]) { ... }

// 2.x
try await client.streams(of: .specified("orders")).append(events: [event]) { ... }

The 1.x API moves to KurrentDB_V1

In 2.x the old flat-method API is no longer part of the KurrentDB module. It has been moved to a separate KurrentDB_V1 library that ships in the same package. If you are not ready to migrate immediately, switch your dependency target and import:

// Package.swift: depend on the V1 product
.product(name: "KurrentDB_V1", package: "swift-kurrentdb")

// Source files: replace your existing import
import KurrentDB_V1   // was: import KurrentDB

KurrentDB_V1 gives you access to all 1.x methods (marked as deprecated) while you migrate to the new API at your own pace.

👉 Full Migration Guide — 1.x to 2.x


Features

| Category | Operations |
|----------|-----------|
| Streams | Append, read, delete, subscribe (catch-up), $all stream |
| Persistent Subscriptions | Create, subscribe, update, delete, ACK/NAK, $all support |
| Projections | Create (continuous/one-time/transient), enable, disable, state, result |
| Users | Create, enable, disable, update, change/reset password |
| Operations | Scavenge (start/stop), merge indexes, shutdown, node priority |
| Gossip | Cluster discovery, node health, leader detection |
| Monitoring | Real-time server statistics |
| Connection | TLS/SSL, cluster gossip discovery, auto-reconnection, keep-alive |

Test Coverage

75% line coverage across the KurrentDB module, measured by running all 174 tests against a live 3-node TLS KurrentDB cluster.

Coverage by Subsystem

| Subsystem | Line Coverage | Lines |
|-----------|:------------:|------:|
| Monitoring | 88.4% | 95 |
| ServerFeatures | 90.2% | 61 |
| Users | 87.6% | 403 |
| Streams | 86.1% | 1,560 |
| Operations | 83.0% | 235 |
| Projections | 77.3% | 865 |
| PersistentSubscriptions | 71.9% | 1,555 |
| Gossip | 66.9% | 142 |
| Core | 66.8% | 2,650 |
| KurrentDB (total) | 75.0% | 7,581 |

Test Suites

174 tests across 9 suites: 7 integration suites and 2 unit/mock suites. All integration tests run against a live 3-node TLS KurrentDB cluster.

| Suite | Tests | Type | Key Scenarios |
|-------|------:|------|---------------|
| StreamsTests | 35 | Integration | Append, read (forward/backward/limit/revision), subscribe, metadata, optimistic concurrency, delete, tombstone |
| ProjectionsTests | 16 | Integration | Create (continuous/one-time/transient), enable/disable, abort, reset, state/result query, list |
| PersistentSubscriptionsTests | 11 | Integration | Create, subscribe, ACK, NACK (park/retry), getInfo, update settings, list, delete, replay parked |
| UsersTests | 7 | Integration | Create, enable/disable, update, change/reset password |
| OperationsTests | 6 | Integration | Scavenge (start/stop), merge indexes, restart persistent subscriptions, node priority, resign |
| GossipTests | 3 | Integration | Read cluster members, node state, custom timeout |
| MonitoringTests | 3 | Integration | Server stats, refresh interval, metadata flag |
| KurrentCoreTests | 67 | Unit | Connection string parsing, EventData, projection status, stream identifiers, metadata, subscription filters |
| MockClientTests | 26 | Mock/DI | KurrentDBClientProtocol conformance, all factory call patterns, 5 domain service scenarios |
| Total | 174 | | 0 commented-out tests |

Optimistic Concurrency

The write-side error paths for streams are explicitly covered:

| Scenario | Expected Error |
|----------|---------------|
| Append at stale revision (.at(99), stream at 0) | wrongExpectedVersion |
| Append with .noStream to an existing stream | wrongExpectedVersion |
| Two concurrent writers at the same revision | One succeeds, one wrongExpectedVersion |
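
The scenarios in the table can be reasoned about with a small in-memory model of expected-revision checks. This is a sketch under stated assumptions: `ExpectedRevision`, `WrongExpectedVersion`, and `InMemoryStore` are hypothetical types defined here, not the client's or the server's implementation. Only the case names `.any`, `.noStream`, `.streamExists`, and `.at` are borrowed from the README.

```swift
// Hypothetical in-memory model; NOT the client's or the server's code.
enum ExpectedRevision: Equatable {
    case any
    case noStream
    case streamExists
    case at(Int)
}

struct WrongExpectedVersion: Error, Equatable {
    let expected: ExpectedRevision
    let current: Int?   // nil means the stream does not exist yet
}

struct InMemoryStore {
    private var streams: [String: [String]] = [:]

    // Revision of the last event, or nil when the stream is missing.
    func currentRevision(of stream: String) -> Int? {
        guard let events = streams[stream], !events.isEmpty else { return nil }
        return events.count - 1
    }

    // Appends only if the caller's expectation matches the current state.
    @discardableResult
    mutating func append(_ event: String, to stream: String,
                         expected: ExpectedRevision) throws(WrongExpectedVersion) -> Int {
        let current = currentRevision(of: stream)
        let matches: Bool
        switch expected {
        case .any:              matches = true
        case .noStream:         matches = current == nil
        case .streamExists:     matches = current != nil
        case .at(let revision): matches = current == revision
        }
        guard matches else {
            throw WrongExpectedVersion(expected: expected, current: current)
        }
        streams[stream, default: []].append(event)
        return streams[stream]!.count - 1
    }
}

var store = InMemoryStore()
try store.append("OrderPlaced", to: "orders", expected: .noStream)   // creates revision 0
try store.append("OrderPaid", to: "orders", expected: .at(0))        // first writer wins

do {
    // A second writer still holding revision 0 is rejected.
    try store.append("OrderCancelled", to: "orders", expected: .at(0))
} catch {
    print("rejected: expected \(error.expected), current \(String(describing: error.current))")
}
```

The last call models the "two concurrent writers" row: both writers read revision 0, but only the first append can still match it.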

Persistent Subscription Lifecycle

| Scenario | Verified |
|----------|---------|
| Create → subscribe → append → ACK | ✓ |
| NACK with park (dead-letter queue) | ✓ |
| NACK with retry (re-delivery, deliveries == 2) | ✓ |
| getInfo (groupName, eventSource, $all) | ✓ |
| Update settings → getInfo confirms change | ✓ |
| park → replayParked → re-delivered → ACK | ✓ |
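
As a rough mental model of the lifecycle rows above, the sketch below simulates a subscription group in memory. `NackAction`, `Delivery`, and `SubscriptionGroupModel` are hypothetical types invented for this illustration; the real server's queueing, checkpointing, and retry limits are more involved.

```swift
// Hypothetical in-memory model of a subscription group; NOT the real server.
enum NackAction { case park, retry }

struct Delivery: Equatable {
    let event: String
    let deliveries: Int   // how many times this event has been handed out
}

struct SubscriptionGroupModel {
    private var pending: [Delivery] = []
    private(set) var parked: [String] = []

    mutating func publish(_ event: String) {
        pending.append(Delivery(event: event, deliveries: 0))
    }

    // Hand out the next event, counting the delivery attempt.
    mutating func next() -> Delivery? {
        guard !pending.isEmpty else { return nil }
        let head = pending.removeFirst()
        return Delivery(event: head.event, deliveries: head.deliveries + 1)
    }

    // ACK: the event is done, so nothing is re-queued in this model.
    func ack(_ delivery: Delivery) {}

    mutating func nack(_ delivery: Delivery, action: NackAction) {
        switch action {
        case .park:  parked.append(delivery.event)    // set aside for later inspection
        case .retry: pending.insert(delivery, at: 0)  // re-deliver, keeping the attempt count
        }
    }

    // Move parked events back into the delivery queue.
    mutating func replayParked() {
        pending.append(contentsOf: parked.map { Delivery(event: $0, deliveries: 0) })
        parked.removeAll()
    }
}
```

In this model, a retry hands the same event out again with `deliveries == 2`, while a parked event stays out of the queue until `replayParked()` is called.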

Requirements

  • Swift 6.0+
  • macOS 15+ / iOS 18+ / tvOS 18+ / watchOS 11+ / visionOS 2+ / Linux

KurrentDB Server Compatibility

| Server Version | Status | Notes |
|:--------------:|:------:|-------|
| KurrentDB 26.0 | ✅ Supported | Full feature support |
| KurrentDB 25.1 | ✅ Supported | Full feature support |
| EventStoreDB 24.x | ✅ Supported | Core features supported; KurrentDB v2 batch append not available |

Local Development with Docker

Start a 3-node TLS cluster:

cd server
docker compose up -d

This generates TLS certificates automatically and starts nodes on ports 2111, 2112, and 2113.

Or a single insecure node for quick testing:

docker run --rm -d -p 2113:2113 \
  -e KURRENTDB_CLUSTER_SIZE=1 \
  -e KURRENTDB_RUN_PROJECTIONS=All \
  -e KURRENTDB_START_STANDARD_PROJECTIONS=true \
  -e KURRENTDB_INSECURE=true \
  -e KURRENTDB_ENABLE_ATOM_PUB_OVER_HTTP=true \
  docker.kurrent.io/kurrent-latest/kurrentdb:25.1

Documentation

| Guide | Description |
|-------|-------------|
| Migration Guide (1.x → 2.x) | What changed in 2.0 and how to update your code |
| Getting Started | Connection settings, first event, basic usage |
| Appending Events | EventData, concurrency control, idempotency |
| Reading Events | Forward/backward reading, $all stream, filters |
| Projections | Create, manage, and query projection state |
| Persistent Subscriptions | Competing consumers, ACK/NAK, subscription groups |
| User Management | Create, enable, disable, password management |
| Cluster Gossip | Cluster discovery, node health monitoring |
| Monitoring | Real-time server statistics |
| Server Operations | Scavenge, index merge, shutdown, node management |
| Full API Reference | Complete API documentation |

Contributing

Contributions are welcome, whether they are bug reports, feature requests, documentation improvements, or code.

License

MIT License — see LICENSE for details.

Acknowledgments

Built with:

Inspired by official Kurrent/EventStoreDB clients.


Made by Grady Zhuo

Package Metadata

Repository: gradyzhuo/swift-kurrentdb

Default branch: main

README: README.md