swift-server/swift-kafka-client

The Swift Kafka Client library provides a convenient way to interact with Apache Kafka by leveraging Swift's new concurrency features. This package wraps the native [librdkafka](http

Features

  • Async/await producer with awaitable delivery acknowledgements (sendAndAwait)
  • High-throughput fire-and-forget producer with batched delivery reports
  • AsyncSequence-based consumer with automatic rebalancing
  • At-least-once delivery semantics via manual offset storage
  • Consumer group support with dynamic subscription management
  • Pause/resume partition consumption
  • Typed error handling with retriable and fatal error classification
  • SASL and TLS authentication
  • Integration with swift-service-lifecycle, swift-log, and swift-metrics
  • Full librdkafka configuration exposed as typed Swift properties

Adding Kafka as a Dependency

To use the Kafka library in a SwiftPM project, add the following line to the dependencies in your Package.swift file:

.package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main")

Include "Kafka" as a dependency for your executable target:

.target(name: "<target>", dependencies: [
    .product(name: "Kafka", package: "swift-kafka-client"),
]),

Finally, add import Kafka to your source code.
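For orientation, the two snippets above could sit in a complete manifest roughly like the one below. This is a minimal sketch: the package and target name `MyApp` and the tools version are assumptions, and you may also need a `platforms` entry that matches the library's minimum deployment targets.

```swift
// swift-tools-version:5.9
import PackageDescription

let package = Package(
    name: "MyApp", // hypothetical package name
    dependencies: [
        // Tracks the main branch, as in the snippet above.
        .package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main"),
    ],
    targets: [
        .executableTarget(
            name: "MyApp", // hypothetical target name
            dependencies: [
                .product(name: "Kafka", package: "swift-kafka-client"),
            ]
        ),
    ]
)
```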

Usage

Kafka should be used within a Swift Service Lifecycle ServiceGroup for proper startup and shutdown handling. Both the KafkaProducer and the KafkaConsumer implement the Service protocol.

Producer API

The sendAndAwait(_:) method produces a message and asynchronously awaits the broker's acknowledgement. The returned delivery report tells you which partition and offset the message landed at, and no thread is blocked while waiting:

var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]

let producer = try KafkaProducer(config: config, logger: logger)

let serviceGroup = ServiceGroup(
    services: [producer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)

try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { try await serviceGroup.run() }

    group.addTask {
        let message = KafkaProducerMessage(topic: "topic-name", value: "Hello, World!")
        let report = try await producer.sendAndAwait(message)
        switch report.status {
        case .acknowledged(let ack):
            print("Delivered to partition \(ack.partition) at offset \(ack.offset)")
        case .failure(let error):
            print("Delivery failed: \(error)")
        }
    }
}
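To illustrate what "awaiting without blocking a thread" means, here is a self-contained sketch that bridges a callback-based API to async/await with a checked continuation. `CallbackClient`, `Ack`, and `produceAndAwait` are hypothetical stand-ins written for this example; they are not part of the Kafka module and do not claim to show how sendAndAwait(_:) is implemented.

```swift
// Hypothetical acknowledgement type for this sketch only.
struct Ack: Sendable, Equatable {
    let partition: Int32
    let offset: Int64
}

// Hypothetical callback-based client that simulates a broker round-trip.
final class CallbackClient: Sendable {
    func produce(_ value: String, completion: @escaping @Sendable (Result<Ack, Error>) -> Void) {
        Task.detached {
            // Pretend the broker needs a moment to acknowledge.
            try? await Task.sleep(nanoseconds: 1_000_000)
            completion(.success(Ack(partition: 0, offset: 42)))
        }
    }
}

extension CallbackClient {
    // Suspends the caller until the callback fires; the thread is free to run other work meanwhile.
    func produceAndAwait(_ value: String) async throws -> Ack {
        try await withCheckedThrowingContinuation { continuation in
            produce(value) { result in
                // The continuation must be resumed exactly once.
                continuation.resume(with: result)
            }
        }
    }
}
```

Calling `try await CallbackClient().produceAndAwait("Hello, World!")` suspends the current task and resumes it with the simulated acknowledgement once the callback runs.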

For high-throughput pipelines where you need to maximize send rate, use the fire-and-forget send(_:) method and process delivery reports in batches through the events sequence:

var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]

let (producer, events) = try KafkaProducer.makeProducerWithEvents(
    config: config,
    logger: logger
)

let serviceGroup = ServiceGroup(
    services: [producer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)

try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { try await serviceGroup.run() }

    group.addTask {
        let message = KafkaProducerMessage(topic: "topic-name", value: "Hello, World!")
        try producer.send(message)

        for await event in events {
            switch event {
            case .deliveryReports(let reports):
                for report in reports {
                    // Handle delivery acknowledgement
                }
            default:
                break
            }
        }
    }
}
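What to do inside the report loop depends on the application. One option is to reduce each batch to a small summary for logging or metrics. The sketch below uses local stand-in types (`DeliveryReport`, `DeliveryStatus`, `Acknowledgement`) that only mirror the shape used in the sendAndAwait(_:) example; the real types come from the Kafka module and may differ.

```swift
// Hypothetical stand-ins for this sketch; not the Kafka module's types.
struct Acknowledgement {
    let partition: Int32
    let offset: Int64
}

enum DeliveryStatus {
    case acknowledged(Acknowledgement)
    case failure(Error)
}

struct DeliveryReport {
    let status: DeliveryStatus
}

struct BatchSummary: Equatable {
    var acknowledged = 0
    var failed = 0
    var highestOffset: [Int32: Int64] = [:] // per partition
}

// Folds one batch of reports into counts and the highest acknowledged offset per partition.
func summarize(_ reports: [DeliveryReport]) -> BatchSummary {
    var summary = BatchSummary()
    for report in reports {
        switch report.status {
        case .acknowledged(let ack):
            summary.acknowledged += 1
            let previous = summary.highestOffset[ack.partition] ?? ack.offset
            summary.highestOffset[ack.partition] = max(previous, ack.offset)
        case .failure:
            summary.failed += 1
        }
    }
    return summary
}
```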

Consumer API

Messages are delivered as an AsyncSequence. Consume them with a standard for try await loop, which integrates naturally with Swift concurrency, structured tasks, and cancellation:

Consumer Groups

var config = KafkaConsumerConfig()
config.bootstrapServers = ["localhost:9092"]
config.consumptionStrategy = .group(id: "example-group", topics: ["topic-name"])

let consumer = try KafkaConsumer(config: config, logger: logger)

let serviceGroup = ServiceGroup(
    services: [consumer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)

try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { try await serviceGroup.run() }

    group.addTask {
        for try await message in consumer.messages {
            print("Received: \(message.topic)/\(message.partition) at offset \(message.offset)")
        }
    }
}
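The loop above relies only on standard AsyncSequence behaviour, so the same pattern can be tried without a broker. The following sketch uses an `AsyncThrowingStream` as a stand-in for the message sequence; `Message`, `makeMessages`, and `consume` are hypothetical names for this example.

```swift
// Hypothetical message type for this sketch only.
struct Message: Sendable {
    let offset: Int
}

// Yields a message every millisecond until the stream is terminated.
func makeMessages() -> AsyncThrowingStream<Message, Error> {
    AsyncThrowingStream { continuation in
        let producerTask = Task {
            var offset = 0
            while !Task.isCancelled {
                continuation.yield(Message(offset: offset))
                offset += 1
                try? await Task.sleep(nanoseconds: 1_000_000)
            }
            continuation.finish()
        }
        // Stop producing once the consumer goes away or the stream finishes.
        continuation.onTermination = { _ in producerTask.cancel() }
    }
}

// Collects the first `limit` offsets, then leaves the loop.
func consume(limit: Int) async throws -> [Int] {
    var seen: [Int] = []
    for try await message in makeMessages() {
        seen.append(message.offset)
        if seen.count == limit { break }
    }
    return seen
}
```

Cancelling the task that runs such a loop ends the iteration as well, which is what lets a consumer loop take part in structured shutdown.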

At-Least-Once Processing

For at-least-once delivery semantics, disable automatic offset storage and manually store offsets after processing:

var config = KafkaConsumerConfig()
config.bootstrapServers = ["localhost:9092"]
config.consumptionStrategy = .group(id: "example-group", topics: ["topic-name"])
config.enableAutoOffsetStore = false

let consumer = try KafkaConsumer(config: config, logger: logger)

let serviceGroup = ServiceGroup(
    services: [consumer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)

try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { try await serviceGroup.run() }

    group.addTask {
        for try await message in consumer.messages {
            // Process message...
            try consumer.storeOffset(message)
            // Offset will be committed automatically by the background auto-commit timer
        }
    }
}
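Storing the offset only after processing means a crash between the two steps causes the message to be delivered again after a restart. The in-memory simulation below shows that effect. It is a sketch with hypothetical types (`Partition`, `Crash`, `run`), not the library's API, and it models a single partition only.

```swift
// In-memory stand-in for one partition and its stored offset.
struct Partition {
    let log: [String]
    var storedOffset = 0 // next offset to read after a restart
}

enum Crash: Error {
    case afterProcessing
}

// Reads from the stored offset, processes each message, and stores the offset afterwards.
// If `crashAt` matches, the function throws after processing but before storing.
func run(_ partition: inout Partition, processed: inout [String], crashAt: Int? = nil) throws {
    var offset = partition.storedOffset
    while offset < partition.log.count {
        processed.append(partition.log[offset])            // 1. process the message
        if offset == crashAt { throw Crash.afterProcessing } // simulated crash
        partition.storedOffset = offset + 1                // 2. store the offset
        offset += 1
    }
}
```

Running it once with `crashAt: 1` and then again without a crash over the log `["a", "b", "c"]` processes `"b"` twice: nothing is lost, but duplicates are possible, so message handling should be idempotent.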

Manual Commits

To control exactly when offsets are committed to the broker:

var config = KafkaConsumerConfig()
config.bootstrapServers = ["localhost:9092"]
config.consumptionStrategy = .group(id: "example-group", topics: ["topic-name"])
config.enableAutoCommit = false

let consumer = try KafkaConsumer(config: config, logger: logger)

let serviceGroup = ServiceGroup(
    services: [consumer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)

try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { try await serviceGroup.run() }

    group.addTask {
        for try await message in consumer.messages {
            // Process message...
            try await consumer.commit(message)
        }
    }
}

To commit all previously stored offsets at once:

try await consumer.commit()

Dynamic Subscription Management

Topics can be changed at runtime:

// Subscribe to additional topics
try consumer.subscribe(topics: ["topic-a", "topic-b"])

// Query current subscription
let topics = try consumer.subscribedTopics()

// Unsubscribe from all topics
try consumer.unsubscribe()

Pause and Resume

Partition consumption can be temporarily paused and resumed, which is useful for applying backpressure or performing maintenance without leaving the consumer group:

let partition = KafkaTopicPartition(topic: "topic-name", partition: KafkaPartition(rawValue: 0))
try consumer.pause(topicPartitions: [partition])
// ... later
try consumer.resume(topicPartitions: [partition])
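For backpressure, the application still has to decide when to pause and when to resume. One common approach is a pair of watermarks on the amount of unprocessed work. The sketch below is a hypothetical helper (`BackpressureGate` is not part of the library); the caller would translate `.pause` and `.resume` into the calls shown above.

```swift
// Hypothetical helper: tracks pending work against two watermarks.
struct BackpressureGate {
    enum Action: Equatable {
        case pause
        case resume
    }

    let highWatermark: Int
    let lowWatermark: Int
    private(set) var isPaused = false

    init(highWatermark: Int, lowWatermark: Int) {
        precondition(lowWatermark < highWatermark, "low watermark must be below high watermark")
        self.highWatermark = highWatermark
        self.lowWatermark = lowWatermark
    }

    // Returns an action only when the state changes, so each crossing triggers one call.
    mutating func update(pending: Int) -> Action? {
        if !isPaused && pending >= highWatermark {
            isPaused = true
            return .pause
        }
        if isPaused && pending <= lowWatermark {
            isPaused = false
            return .resume
        }
        return nil
    }
}
```

Keeping the two watermarks apart avoids rapid pause/resume flapping when the backlog hovers around a single threshold.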

Security Mechanisms

Both the KafkaProducer and the KafkaConsumer can be configured to use different security mechanisms via the securityProtocol property.

Plaintext

var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]
config.securityProtocol = .plaintext

TLS (SSL)

var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]
config.securityProtocol = .ssl

SASL + Plaintext

var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]
config.securityProtocol = .sasl_plaintext
config.saslMechanisms = "PLAIN"
config.saslUsername = "user"
config.saslPassword = "password"

SASL + TLS

var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]
config.securityProtocol = .sasl_ssl
config.saslMechanisms = "SCRAM-SHA-256"
config.saslUsername = "user"
config.saslPassword = "password"
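The literal credentials above are placeholders. In a deployed service you would typically load them from the environment or a secret store instead of source code. The sketch below reads them from environment variables; the variable names `KAFKA_SASL_USERNAME` and `KAFKA_SASL_PASSWORD` and the helper types are assumptions made for this example.

```swift
import Foundation

// Hypothetical container for the two SASL values.
struct SASLCredentials: Equatable {
    let username: String
    let password: String
}

enum CredentialError: Error, Equatable {
    case missing(String)
}

// Reads both values from the given environment and fails loudly if one is absent or empty.
func loadSASLCredentials(
    from environment: [String: String] = ProcessInfo.processInfo.environment
) throws -> SASLCredentials {
    func require(_ key: String) throws -> String {
        guard let value = environment[key], !value.isEmpty else {
            throw CredentialError.missing(key)
        }
        return value
    }
    return SASLCredentials(
        username: try require("KAFKA_SASL_USERNAME"), // hypothetical variable name
        password: try require("KAFKA_SASL_PASSWORD")  // hypothetical variable name
    )
}
```

The result can then be assigned to `config.saslUsername` and `config.saslPassword` as in the examples above.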

Error Handling

Errors from librdkafka are surfaced through the events sequence with typed error codes:

let (consumer, events) = try KafkaConsumer.makeConsumerWithEvents(config: config, logger: logger)

for await event in events {
    switch event {
    case .error(let error):
        if error.isFatal {
            // Client is irrecoverable — initiate shutdown
        } else if error.isRetriable {
            // Transient error — will likely resolve
        }
        print("Error: \(error.rdKafkaCode)")
    default:
        break
    }
}
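How an application reacts to the two flags is a design choice. The sketch below shows one possible policy: shut down on fatal errors, back off exponentially on retriable ones if the application re-attempts work itself, and only report everything else. `ClientError` is a local stand-in that mirrors the two flags used above, and the delay values are assumptions.

```swift
// Hypothetical stand-in for an error carrying the two classification flags.
struct ClientError: Error {
    let isFatal: Bool
    let isRetriable: Bool
}

enum Reaction: Equatable {
    case shutdown
    case retry(afterMilliseconds: Int)
    case report
}

// Maps an error and the current attempt number (starting at 0) to a reaction.
func reaction(
    to error: ClientError,
    attempt: Int,
    baseMilliseconds: Int = 100,
    capMilliseconds: Int = 10_000
) -> Reaction {
    if error.isFatal { return .shutdown }
    guard error.isRetriable else { return .report }
    // Exponential backoff: base * 2^attempt, capped. The shift is clamped to avoid overflow.
    let shift = min(max(attempt, 0), 20)
    return .retry(afterMilliseconds: min(baseMilliseconds << shift, capMilliseconds))
}
```

A fatal error takes precedence over the retriable flag here, which matches the order of the checks in the event loop above.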

librdkafka

This package depends on the librdkafka library, which is included as a git submodule. Some of its source files are excluded in Package.swift.

Dependencies

  • macOS: brew install openssl@3
  • Linux: apt-get install libssl-dev libsasl2-dev

Development Setup

Running tests locally

Integration tests require a running Kafka broker, which can be started with Docker:

docker run -d -p 9092:9092 apache/kafka:3.9.1

# After starting the container, run the tests
swift test

Running tests in Docker

We provide a Docker environment for this package. This will automatically start a local Kafka server and run the tests:

docker compose -f docker/docker-compose.yaml run client swift test

Alternatively, you can use the Makefile target:

make docker-test

You can specify the Swift compiler version using the SWIFT_VERSION environment variable:

SWIFT_VERSION=6.2 make docker-test

Package Metadata

Repository: swift-server/swift-kafka-client

Default branch: main

README: README.md