wendylabsinc/pulsar-client
**Connection Issue Resolved!** The Swift Pulsar client now successfully connects to Apache Pulsar and can perform basic operations.
## Features
- Pure Swift Implementation: Built from the ground up using Swift 6.1 with full concurrency support
- Async/Await: Modern Swift concurrency with actors and structured concurrency
- Type-Safe Schemas: Comprehensive schema support including primitives, JSON, and custom types
- Fault Tolerant: Built-in retry policies, automatic reconnection, and circuit breakers
- Security: Multiple authentication methods and encryption policies
- Cross-Platform: Works on iOS, macOS, tvOS, watchOS, and Linux
- Producer Features: Batching, compression, partitioned topics, exclusive access modes
- Consumer Features: Multiple subscription types, acknowledgment strategies, dead letter queues
- Reader API: Sequential message reading with precise position control
## Requirements
- Swift 6.1+
- iOS 15.0+ / macOS 12.0+ / tvOS 15.0+ / watchOS 8.0+ / Linux (Ubuntu 20.04+)
## Installation
### Swift Package Manager
Add PulsarClient to your `Package.swift`:
```swift
dependencies: [
    .package(url: "https://github.com/edgeengineer/pulsar-client.git", from: "0.0.1")
]
```
Then add it to your target dependencies:
```swift
.target(
    name: "YourApp",
    dependencies: ["PulsarClient"]
)
```
## Quick Start
```swift
import PulsarClient
// Create a client
let client = PulsarClient.builder { builder in
    builder.withServiceUrl("pulsar://localhost:6650")
}
// Create a producer
let producer = try await client.newProducer(
    topic: "my-topic",
    schema: Schemas.string
) { builder in
    builder.withProducerName("my-producer")
}
// Send a message
let messageId = try await producer.send("Hello, Pulsar!")
print("Message sent with ID: \(messageId)")
// Create a consumer
let consumer = try await client.newConsumer(
    topic: "my-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("my-subscription")
        .withSubscriptionType(.exclusive)
}
// Receive messages
let message = try await consumer.receive()
print("Received: \(message.value)")
try await consumer.acknowledge(message)
// Clean up
await producer.dispose()
await consumer.dispose()
await client.dispose()
```
## Detailed Usage
### Client Configuration
```swift
let client = PulsarClient.builder { builder in
    builder.withServiceUrl("pulsar://localhost:6650")
        .withAuthentication(TokenAuthentication(token: "your-token"))
        .withEncryptionPolicy(.enforceEncrypted)
        .withConnectionTimeout(30.0)
        .withOperationTimeout(30.0)
}
```
### Producer Patterns
#### Basic Producer
```swift
let producer = try await client.newProducer(
    topic: "persistent://public/default/my-topic",
    schema: Schemas.string
) { builder in
    builder.withProducerName("my-producer")
}
// Send with metadata
var metadata = MessageMetadata()
metadata.key = "partition-key"
metadata.properties["app"] = "my-app"
metadata.eventTime = Date()
let messageId = try await producer.send("Hello, World!", metadata: metadata)
```
#### Batching and Compression
```swift
let producer = try await client.newProducer(
    topic: "high-volume-topic",
    schema: Schemas.bytes
) { builder in
    builder.withBatchingEnabled(true)
        .withBatchingMaxMessages(100)
        .withBatchingMaxDelay(0.01) // 10ms
        .withCompressionType(.lz4)
}
```
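The batching settings above suggest that a batch is sent when it either reaches the message limit or its oldest message has waited for the maximum delay. The following is a minimal sketch of that trigger logic under those assumed semantics. `BatchAccumulator` and its methods are hypothetical names for illustration and are not part of PulsarClient; timestamps are passed in explicitly so the behavior is easy to follow.

```swift
import Foundation

/// Hypothetical accumulator illustrating "flush on size or delay".
/// This is not PulsarClient code.
struct BatchAccumulator<Message> {
    let maxMessages: Int
    let maxDelay: TimeInterval
    private var pending: [Message] = []
    private var firstEnqueuedAt: TimeInterval?

    init(maxMessages: Int, maxDelay: TimeInterval) {
        self.maxMessages = maxMessages
        self.maxDelay = maxDelay
    }

    /// Adds a message; returns a full batch once `maxMessages` is reached.
    mutating func add(_ message: Message, now: TimeInterval) -> [Message]? {
        if pending.isEmpty { firstEnqueuedAt = now }
        pending.append(message)
        return pending.count >= maxMessages ? drain() : nil
    }

    /// Returns a partial batch if the oldest pending message has waited `maxDelay`.
    mutating func flushIfExpired(now: TimeInterval) -> [Message]? {
        guard let start = firstEnqueuedAt, now - start >= maxDelay else { return nil }
        return drain()
    }

    /// Hands back the pending messages and resets the accumulator.
    private mutating func drain() -> [Message] {
        defer {
            pending.removeAll()
            firstEnqueuedAt = nil
        }
        return pending
    }
}
```

A larger `maxMessages` or `maxDelay` trades latency for fewer, bigger sends, which is why batching is usually paired with compression on high-volume topics.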
#### Partitioned Topics
```swift
let producer = try await client.newProducer(
    topic: "partitioned-topic",
    schema: Schemas.string
) { builder in
    builder.withMessageRouter(KeyBasedMessageRouter())
}
// Messages with the same key go to the same partition
let metadata = MessageMetadata().withKey("user-123")
try await producer.send("User event", metadata: metadata)
```
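Key-based routing works because the key is hashed and the hash is reduced modulo the partition count, so equal keys always land on the same partition. The sketch below shows one way to do this with a deterministic FNV-1a hash. It is an illustration only: `stableHash` and `partition(forKey:numberOfPartitions:)` are hypothetical helpers, and the hash that `KeyBasedMessageRouter` actually uses is not documented here.

```swift
/// FNV-1a 64-bit hash over the key's UTF-8 bytes. It is deterministic across
/// runs, unlike `hashValue`, which Swift seeds randomly per process.
func stableHash(_ key: String) -> UInt64 {
    var hash: UInt64 = 0xcbf29ce484222325      // FNV offset basis
    for byte in key.utf8 {
        hash ^= UInt64(byte)
        hash = hash &* 0x100000001b3           // FNV prime, wrapping multiply
    }
    return hash
}

/// Hypothetical helper: maps a key to a partition index in 0..<numberOfPartitions.
func partition(forKey key: String, numberOfPartitions: Int) -> Int {
    precondition(numberOfPartitions > 0, "need at least one partition")
    return Int(stableHash(key) % UInt64(numberOfPartitions))
}
```

A stable hash matters when several producer processes must agree on where a key goes; a per-process seeded hash would scatter the same key across partitions.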
### Consumer Patterns
#### Subscription Types
```swift
// Exclusive - Only one consumer can subscribe
let exclusiveConsumer = try await client.newConsumer(
    topic: "exclusive-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("exclusive-sub")
        .withSubscriptionType(.exclusive)
}
// Shared - Multiple consumers share messages
let sharedConsumer = try await client.newConsumer(
    topic: "shared-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("shared-sub")
        .withSubscriptionType(.shared)
}
// Key_Shared - Messages with same key go to same consumer
let keySharedConsumer = try await client.newConsumer(
    topic: "key-shared-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("key-shared-sub")
        .withSubscriptionType(.keyShared)
}
```
#### Batch Message Processing
```swift
let consumer = try await client.newConsumer(
    topic: "batch-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("batch-processor")
        .withReceiverQueueSize(1000)
}
// Process messages in batches
let messages = try await consumer.receiveBatch(maxMessages: 100)
for message in messages {
    // Process message
    print("Processing: \(message.value)")
}
// Acknowledge all at once
try await consumer.acknowledgeBatch(messages)
```
#### Negative Acknowledgment and Retry
```swift
let consumer = try await client.newConsumer(
    topic: "retry-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("retry-sub")
        .withNegativeAckRedeliveryDelay(5.0) // 5 seconds
}
// Receive outside the `do` block so `message` is still in scope in `catch`
let message = try await consumer.receive()
do {
    // Process message (processMessage is your own handler)
    try await processMessage(message)
    try await consumer.acknowledge(message)
} catch {
    // Message will be redelivered after delay
    try await consumer.negativeAcknowledge(message)
}
```
### Reader API
```swift
let reader = try await client.newReader(
    topic: "reader-topic",
    schema: Schemas.string
) { builder in
    builder.withStartMessageId(.earliest)
        .withReaderName("my-reader")
}
// Read messages sequentially
while try await reader.hasMessageAvailable() {
    let message = try await reader.readNext()
    print("Read: \(message.value) at \(message.publishTime)")
}
// Seek to specific position
try await reader.seek(to: MessageId.latest)
```
### Schema Types
#### Built-in Schemas
```swift
// Primitive types
let stringSchema = Schemas.string
let int32Schema = Schemas.int32
let int64Schema = Schemas.int64
let boolSchema = Schemas.boolean
let doubleSchema = Schemas.double
let bytesSchema = Schemas.bytes
// Date/Time schemas
let dateSchema = Schemas.date
let timeSchema = Schemas.time
let timestampSchema = Schemas.timestamp
```
#### JSON Schema
```swift
struct UserEvent: Codable {
    let userId: String
    let action: String
    let timestamp: Date
}
let jsonSchema = JSONSchema<UserEvent>()
let producer = try await client.newProducer(
    topic: "user-events",
    schema: jsonSchema
) { builder in
    builder.withProducerName("user-event-producer")
}
let event = UserEvent(
    userId: "user-123",
    action: "login",
    timestamp: Date()
)
try await producer.send(event)
```
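`JSONSchema<UserEvent>` presumably relies on `Codable` to turn values into JSON and back. A rough sketch of that round trip with Foundation's `JSONEncoder` and `JSONDecoder` follows. The helper names and the ISO 8601 date strategy are assumptions, since the schema's actual encoding settings are not documented here.

```swift
import Foundation

struct UserEvent: Codable, Equatable {
    let userId: String
    let action: String
    let timestamp: Date
}

/// Hypothetical helper: encodes an event as JSON with ISO 8601 dates.
func encodeEvent(_ event: UserEvent) throws -> Data {
    let encoder = JSONEncoder()
    encoder.dateEncodingStrategy = .iso8601
    return try encoder.encode(event)
}

/// Hypothetical helper: decodes an event, throwing if the payload
/// does not match the `UserEvent` shape.
func decodeEvent(_ data: Data) throws -> UserEvent {
    let decoder = JSONDecoder()
    decoder.dateDecodingStrategy = .iso8601
    return try decoder.decode(UserEvent.self, from: data)
}
```

Producer and consumer must agree on details such as the date strategy; a mismatch surfaces as a decoding error rather than silently corrupted data.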
### Error Handling
```swift
do {
    let message = try await consumer.receive()
    try await processMessage(message)
    try await consumer.acknowledge(message)
} catch PulsarClientError.timeout(let operation) {
    print("Operation timed out: \(operation)")
} catch PulsarClientError.consumerBusy(let reason) {
    print("Consumer busy: \(reason)")
} catch {
    print("Unexpected error: \(error)")
}
```
### State Management
Monitor component states:
```swift
// Producer states
producer.onStateChange { state in
    switch state {
    case .connected:
        print("Producer connected")
    case .disconnected:
        print("Producer disconnected")
    case .faulted(let error):
        print("Producer faulted: \(error)")
    default:
        break
    }
}
// Consumer states
consumer.onStateChange { state in
    switch state {
    case .active:
        print("Consumer active")
    case .inactive:
        print("Consumer inactive (failover)")
    case .reachedEndOfTopic:
        print("No more messages")
    default:
        break
    }
}
```
### Authentication
```swift
// Token Authentication
let tokenAuth = TokenAuthentication(token: "your-jwt-token")
// OAuth2 Authentication (privateKeyData is your key material, loaded elsewhere)
let oauth2 = OAuth2Authentication(
    issuerUrl: "https://auth.example.com",
    audience: "pulsar",
    privateKey: privateKeyData
)
// TLS Authentication
let tlsAuth = TLSAuthentication(
    certPath: "/path/to/cert.pem",
    keyPath: "/path/to/key.pem"
)
let client = PulsarClient.builder { builder in
    builder.withServiceUrl("pulsar+ssl://localhost:6651")
        .withAuthentication(tlsAuth)
}
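```
## Advanced Features
### Message Routing
```swift
// Round-robin routing
let roundRobinRouter = RoundRobinMessageRouter()
// Single partition routing
let singleRouter = SinglePartitionMessageRouter(partitionIndex: 0)
// Custom routing
class CustomRouter: MessageRouter {
    func choosePartition(messageMetadata: MessageMetadata, numberOfPartitions: Int) -> Int {
        // Your routing logic. Parenthesize before `%`, which binds tighter than `??`.
        // abs() keeps the index non-negative, since hashValue can be negative.
        // Note: hashValue is seeded per process, so this mapping is not stable across runs.
        return abs((messageMetadata.key?.hashValue ?? 0) % numberOfPartitions)
    }
}
```
### Fault Tolerance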
```swift
// Configure retry policy
let retryPolicy = RetryPolicy(
    maxRetries: 3,
    initialDelay: 1.0,
    maxDelay: 30.0,
    backoffMultiplier: 2.0
)
```
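
The retry parameters above read like a capped exponential backoff: each retry waits `backoffMultiplier` times longer than the previous one, up to `maxDelay`. The sketch below computes such a schedule. It assumes that interpretation; `BackoffSchedule` is a hypothetical type, and the library's `RetryPolicy` may compute delays differently (for example by adding jitter).

```swift
import Foundation

/// Hypothetical mirror of the parameters shown above.
/// This is not the library's RetryPolicy type.
struct BackoffSchedule {
    let maxRetries: Int
    let initialDelay: TimeInterval
    let maxDelay: TimeInterval
    let backoffMultiplier: Double

    /// Delay before retry `attempt` (0-based), or nil once retries are exhausted.
    func delay(forAttempt attempt: Int) -> TimeInterval? {
        guard attempt >= 0, attempt < maxRetries else { return nil }
        let raw = initialDelay * pow(backoffMultiplier, Double(attempt))
        return min(raw, maxDelay)
    }
}
```

With the values from the example (3 retries, 1.0 s initial delay, multiplier 2.0), this schedule would wait 1, 2, and 4 seconds; the 30-second cap only comes into play with more retries.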
```swift
// Custom exception handler
class MyExceptionHandler: ExceptionHandler {
    func handleException(_ context: inout ExceptionContext) async {
        switch context.exception {
        case PulsarClientError.connectionFailed:
            context.result = .retryAfter(5.0)
        default:
            context.result = .fail
        }
    }
}
```
## Testing
### Prerequisites for Running Tests
Before running integration tests, ensure you have the following installed:
- Docker - Required for running Apache Pulsar locally
  - macOS: Install Docker Desktop
  - Linux: Install Docker Engine via your package manager
- Docker Compose - Usually included with Docker Desktop
  ```bash
  # Verify installation
  docker --version
  docker-compose --version
  ```
- Swift 6.1+ - Required for building and running tests
  ```bash
  swift --version
  ```
### Running Tests
The project includes both unit tests and integration tests.
#### Unit Tests
```bash
# Run unit tests only
swift test --filter PulsarClientTests
# Or using the helper script
./scripts/run-tests.sh --unit
```
#### Integration Tests
Integration tests require a running Pulsar instance. We provide Docker-based infrastructure:
```bash
# Option 1: Use the all-in-one script (recommended)
./scripts/run-tests.sh --integration --env standalone
# Option 2: Manual steps
# Start Pulsar for testing
./scripts/test-env.sh start
# Run integration tests
ENABLE_INTEGRATION_TESTS=1 swift test --filter IntegrationTests
# Stop Pulsar when done
./scripts/test-env.sh stop
```
Note: If you already have a Pulsar instance running on the default ports (6650, 8080), the tests will use that instance instead.
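
The `ENABLE_INTEGRATION_TESTS=1` variable above gates the integration tests. How the test suite reads it is not shown here; one way such a check might look is sketched below, with `integrationTestsEnabled` as a hypothetical helper. The environment is injectable so the check itself can be exercised without changing the process environment.

```swift
import Foundation

/// Hypothetical helper: true only when the environment sets
/// ENABLE_INTEGRATION_TESTS to exactly "1".
func integrationTestsEnabled(
    environment: [String: String] = ProcessInfo.processInfo.environment
) -> Bool {
    environment["ENABLE_INTEGRATION_TESTS"] == "1"
}
```

In an XCTest case, a call such as `try XCTSkipUnless(integrationTestsEnabled(), "set ENABLE_INTEGRATION_TESTS=1")` at the top of a test would skip it when the variable is unset.
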
### Test Environments
- Standalone Mode (default)
  ```bash
  ./scripts/test-env.sh start
  ```
- Cluster Mode
  ```bash
  ./scripts/test-env.sh start --cluster
  ```
- Authentication Mode
  ```bash
  ./scripts/test-env.sh start --auth
  ```
### Docker Infrastructure
The project includes comprehensive Docker Compose configurations:
- `docker/docker-compose.yml` - Standalone Pulsar with Toxiproxy
- `docker/docker-compose.cluster.yml` - Full Pulsar cluster
- `docker/docker-compose.auth.yml` - Pulsar with authentication

See `docker/README.md` for detailed Docker setup information.
### Development Setup
```bash
# One-time setup
./scripts/setup-dev.sh
# Common tasks
make test              # Run all tests
make test-unit         # Run unit tests only
make test-integration  # Run integration tests
make docker-up         # Start Pulsar
make docker-down       # Stop Pulsar
make docker-logs       # View logs
```
## Best Practices
- Always dispose resources: Use `defer` or structured concurrency to ensure cleanup
- Handle backpressure: Configure appropriate queue sizes for consumers
- Use schemas: Type-safe schemas prevent serialization errors
- Monitor states: React to state changes for robust applications
- Configure timeouts: Set appropriate timeouts for your use case
- Use batching: For high-throughput scenarios, enable producer batching
- Implement error handling: Always handle errors appropriately
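
For the first practice, note that `dispose()` is async and, at least as of Swift 6.1, a `defer` body cannot contain `await`. One possible pattern is a small scoping helper that disposes on both the success and the error path. This is a sketch only: `AsyncDisposable` and `withDisposable` are hypothetical names, not PulsarClient API, and the library's producers and consumers would need to be adapted to such a protocol.

```swift
/// Hypothetical protocol standing in for anything with an async `dispose()`.
protocol AsyncDisposable: Sendable {
    func dispose() async
}

/// Runs `body`, then disposes `resource` whether `body` returned or threw.
func withDisposable<R: AsyncDisposable, T: Sendable>(
    _ resource: R,
    _ body: @Sendable (R) async throws -> T
) async throws -> T {
    do {
        let result = try await body(resource)
        await resource.dispose()   // cleanup on the success path
        return result
    } catch {
        await resource.dispose()   // cleanup on the error path
        throw error
    }
}
```

Wrapping work this way keeps the cleanup next to the acquisition, so an early `throw` inside the closure cannot skip it.
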
## Contributing
We welcome contributions! Please see CONTRIBUTING.md for details.
## License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
## Acknowledgments
This Swift implementation is inspired by the official Apache Pulsar DotNet client and follows similar patterns adapted for Swift's unique features.
Package Metadata
Repository: wendylabsinc/pulsar-client
Default branch: main
README: README.md