swift-server/swift-kafka-client
The Swift Kafka Client library provides a convenient way to interact with Apache Kafka by leveraging Swift's new concurrency features. This package wraps the native [librdkafka](http
Features
- Async/await producer with awaitable delivery acknowledgements (sendAndAwait)
- High-throughput fire-and-forget producer with batched delivery reports
- AsyncSequence-based consumer with automatic rebalancing
- At-least-once delivery semantics via manual offset storage
- Consumer group support with dynamic subscription management
- Pause/resume partition consumption
- Typed error handling with retriable and fatal error classification
- SASL and TLS authentication
- Integration with swift-service-lifecycle, swift-log, and swift-metrics
- Full librdkafka configuration exposed as typed Swift properties
Adding Kafka as a Dependency
To use the Kafka library in a SwiftPM project, add the following line to the dependencies in your Package.swift file:
.package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main")
Include "Kafka" as a dependency for your executable target:
.target(name: "<target>", dependencies: [
    .product(name: "Kafka", package: "swift-kafka-client"),
]),
Finally, add import Kafka to your source code.
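For orientation, the two fragments above might sit in a complete manifest roughly as follows. This is a minimal sketch: the package and target name `MyKafkaApp` and the tools version are assumptions chosen for illustration, and the manifest may also need a `platforms:` entry matching whatever the library requires.

```swift
// swift-tools-version:5.9
// Sketch of a Package.swift; "MyKafkaApp" and the tools version are hypothetical.
import PackageDescription

let package = Package(
    name: "MyKafkaApp",
    dependencies: [
        // Dependency line taken from the instructions above.
        .package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main"),
    ],
    targets: [
        .executableTarget(
            name: "MyKafkaApp",
            dependencies: [
                // Product line taken from the instructions above.
                .product(name: "Kafka", package: "swift-kafka-client"),
            ]
        ),
    ]
)
```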
Usage
Kafka should be used within a Swift Service Lifecycle ServiceGroup for proper startup and shutdown handling. Both the KafkaProducer and the KafkaConsumer implement the Service protocol.
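The examples that follow all run the service group in one child task and the application work in another. The sketch below illustrates that shape without any dependencies. `DemoService` and `PollingClient` are hypothetical stand-ins defined here, not types from swift-service-lifecycle or this package, and the sleep durations are arbitrary.

```swift
// Hypothetical stand-in for a long-running service, defined locally so the
// sketch compiles without swift-service-lifecycle.
protocol DemoService: Sendable {
    func run() async throws
}

// Stand-in for a Kafka client: run() keeps "polling" until its task is cancelled.
struct PollingClient: DemoService {
    func run() async throws {
        while !Task.isCancelled {
            // A real client would poll the broker here; this sketch only sleeps.
            try await Task.sleep(nanoseconds: 10_000_000)
        }
    }
}

let client = PollingClient()

let result: String = try await withThrowingTaskGroup(of: String?.self) { group in
    // Task 1: drive the long-running service until it is cancelled.
    group.addTask {
        do { try await client.run() } catch is CancellationError { }
        return nil
    }
    // Task 2: the application work that would use the client.
    group.addTask {
        try await Task.sleep(nanoseconds: 50_000_000)
        return "work finished"
    }
    var output = ""
    for try await value in group {
        if let value {
            output = value
            // The work is done, so stop the service task as well.
            group.cancelAll()
        }
    }
    return output
}
print(result)
```

In a real application the service task would normally end through graceful shutdown rather than through `cancelAll()`; the cancellation here only keeps the sketch finite.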
Producer API
The sendAndAwait(_:) method produces a message and asynchronously awaits the broker acknowledgement, confirming exactly which partition and offset the message landed at without blocking any threads:
var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]
let producer = try KafkaProducer(config: config, logger: logger)
let serviceGroup = ServiceGroup(
    services: [producer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)
try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { try await serviceGroup.run() }
    group.addTask {
        let message = KafkaProducerMessage(topic: "topic-name", value: "Hello, World!")
        let report = try await producer.sendAndAwait(message)
        switch report.status {
        case .acknowledged(let ack):
            print("Delivered to partition \(ack.partition) at offset \(ack.offset)")
        case .failure(let error):
            print("Delivery failed: \(error)")
        }
    }
}
For high-throughput pipelines where you need to maximize send rate, use the fire-and-forget send(_:) method and process delivery reports in batches through the events sequence:
var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]
let (producer, events) = try KafkaProducer.makeProducerWithEvents(
    config: config,
    logger: logger
)
let serviceGroup = ServiceGroup(
    services: [producer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)
try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { try await serviceGroup.run() }
    group.addTask {
        let message = KafkaProducerMessage(topic: "topic-name", value: "Hello, World!")
        try producer.send(message)
        for await event in events {
            switch event {
            case .deliveryReports(let reports):
                for report in reports {
                    // Handle delivery acknowledgement
                }
            default:
                break
            }
        }
    }
}
Consumer API
Messages are delivered as an AsyncSequence. Consume them with a standard for try await loop, which integrates naturally with Swift concurrency, structured tasks, and cancellation.
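As a library-independent illustration of the `for try await` pattern, the sketch below iterates a standard-library `AsyncThrowingStream`. `DemoMessage` is a hypothetical stand-in for the consumer's message type, and the stream is finite only so that the example terminates; a consumer's sequence would typically keep yielding messages.

```swift
// Hypothetical message type standing in for the library's consumer message.
struct DemoMessage: Sendable {
    let topic: String
    let partition: Int
    let offset: Int
}

// Build a finite stream of three messages using the standard library only.
let messages = AsyncThrowingStream<DemoMessage, Error> { continuation in
    for offset in 0..<3 {
        continuation.yield(DemoMessage(topic: "topic-name", partition: 0, offset: offset))
    }
    continuation.finish()
}

// Iterate exactly as one would iterate any throwing AsyncSequence.
var seenOffsets: [Int] = []
for try await message in messages {
    seenOffsets.append(message.offset)
    print("Received: \(message.topic)/\(message.partition) at offset \(message.offset)")
}
```

Because the loop is ordinary structured-concurrency code, an error thrown by the sequence propagates out of the loop like any other thrown error.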
Consumer Groups
var config = KafkaConsumerConfig()
config.bootstrapServers = ["localhost:9092"]
config.consumptionStrategy = .group(id: "example-group", topics: ["topic-name"])
let consumer = try KafkaConsumer(config: config, logger: logger)
let serviceGroup = ServiceGroup(
    services: [consumer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)
try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { try await serviceGroup.run() }
    group.addTask {
        for try await message in consumer.messages {
            print("Received: \(message.topic)/\(message.partition) at offset \(message.offset)")
        }
    }
}
At-Least-Once Processing
For at-least-once delivery semantics, disable automatic offset storage and manually store offsets after processing:
var config = KafkaConsumerConfig()
config.bootstrapServers = ["localhost:9092"]
config.consumptionStrategy = .group(id: "example-group", topics: ["topic-name"])
config.enableAutoOffsetStore = false
let consumer = try KafkaConsumer(config: config, logger: logger)
let serviceGroup = ServiceGroup(
    services: [consumer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)
try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { try await serviceGroup.run() }
    group.addTask {
        for try await message in consumer.messages {
            // Process message...
            try consumer.storeOffset(message)
            // Offset will be committed automatically by the background auto-commit timer
        }
    }
}
Manual Commits
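Whether offsets are stored or committed by hand, the ordering is what yields at-least-once behaviour: a record is processed first and its offset is recorded only afterwards, so a failure in between causes that record to be delivered again. The following is a purely in-memory sketch of that reasoning. `DemoGroupState` and `runConsumer` are hypothetical names and do not model the library's or the broker's actual behaviour.

```swift
// Hypothetical in-memory model of a consumer group's progress on one partition.
struct DemoGroupState {
    var committedOffset = 0       // next offset to read after a restart
    var processed: [String] = []  // every record the application handled
}

// Processes records starting at the committed offset. `crashOffset` simulates a
// failure after a record was processed but before its offset was committed.
func runConsumer(records: [String], state: inout DemoGroupState, crashAt crashOffset: Int?) {
    var offset = state.committedOffset
    while offset < records.count {
        state.processed.append(records[offset])   // 1. process the record
        if offset == crashOffset { return }       // simulated crash before commit
        state.committedOffset = offset + 1        // 2. record progress
        offset += 1
    }
}

let records = ["a", "b", "c"]
var state = DemoGroupState()
// First run: processes "a" and "b", then crashes before committing "b".
runConsumer(records: records, state: &state, crashAt: 1)
// Restart: resumes at the committed offset, so "b" is processed a second time.
runConsumer(records: records, state: &state, crashAt: nil)
print(state.processed) // ["a", "b", "b", "c"]
```

No record is lost, but "b" is handled twice, which is why processing under this scheme should be idempotent.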
To control exactly when offsets are committed to the broker:
var config = KafkaConsumerConfig()
config.bootstrapServers = ["localhost:9092"]
config.consumptionStrategy = .group(id: "example-group", topics: ["topic-name"])
config.enableAutoCommit = false
let consumer = try KafkaConsumer(config: config, logger: logger)
let serviceGroup = ServiceGroup(
    services: [consumer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)
try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { try await serviceGroup.run() }
    group.addTask {
        for try await message in consumer.messages {
            // Process message...
            try await consumer.commit(message)
        }
    }
}
To commit all previously stored offsets at once:
try await consumer.commit()
Dynamic Subscription Management
Topics can be changed at runtime:
// Subscribe to additional topics
try consumer.subscribe(topics: ["topic-a", "topic-b"])
// Query current subscription
let topics = try consumer.subscribedTopics()
// Unsubscribe from all topics
try consumer.unsubscribe()
Pause and Resume
Partition consumption can be temporarily paused and resumed, useful for applying backpressure or performing maintenance without leaving the consumer group:
let partition = KafkaTopicPartition(topic: "topic-name", partition: KafkaPartition(rawValue: 0))
try consumer.pause(topicPartitions: [partition])
// ... later
try consumer.resume(topicPartitions: [partition])
Security Mechanisms
Both the KafkaProducer and the KafkaConsumer can be configured to use different security mechanisms via the securityProtocol property.
Plaintext
var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]
config.securityProtocol = .plaintext
TLS (SSL)
var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]
config.securityProtocol = .ssl
SASL + Plaintext
var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]
config.securityProtocol = .sasl_plaintext
config.saslMechanisms = "PLAIN"
config.saslUsername = "user"
config.saslPassword = "password"
SASL + TLS
var config = KafkaProducerConfig()
config.bootstrapServers = ["localhost:9092"]
config.securityProtocol = .sasl_ssl
config.saslMechanisms = "SCRAM-SHA-256"
config.saslUsername = "user"
config.saslPassword = "password"
Error Handling
Errors from librdkafka are surfaced through the events sequence with typed error codes:
let (consumer, events) = try KafkaConsumer.makeConsumerWithEvents(config: config, logger: logger)
for await event in events {
    switch event {
    case .error(let error):
        if error.isFatal {
            // Client is irrecoverable: initiate shutdown
        } else if error.isRetriable {
            // Transient error: will likely resolve
        }
        print("Error: \(error.rdKafkaCode)")
    default:
        break
    }
}
librdkafka
The package depends on the librdkafka library, which is included as a git submodule. Some of its source files are excluded in Package.swift.
Dependencies
- macOS: brew install openssl@3
- Linux: apt-get install libssl-dev libsasl2-dev
Development Setup
Running tests locally
Integration tests require a running Kafka broker which can be started with Docker:
docker run -d -p 9092:9092 apache/kafka:3.9.1
# After starting the container run the tests
swift test
Running tests in Docker
We provide a Docker environment for this package. This will automatically start a local Kafka server and run the tests:
docker compose -f docker/docker-compose.yaml run client swift test
Alternatively, you can use a Makefile target:
make docker-test
You can specify the Swift compiler version using the SWIFT_VERSION environment variable:
SWIFT_VERSION=6.2 make docker-test
Package Metadata
Repository: swift-server/swift-kafka-client
Default branch: main
README: README.md