wendylabsinc/pulsar-client

🎉 **Connection Issue Resolved!** The Swift Pulsar client now successfully connects to Apache Pulsar and can perform basic operations.

Features

  • 🚀 Pure Swift Implementation: Built from the ground up using Swift 6.1 with full concurrency support
  • 🔄 Async/Await: Modern Swift concurrency with actors and structured concurrency
  • 📦 Type-Safe Schemas: Comprehensive schema support including primitives, JSON, and custom types
  • 🛡️ Fault Tolerant: Built-in retry policies, automatic reconnection, and circuit breakers
  • 🔐 Security: Multiple authentication methods and encryption policies
  • 📱 Cross-Platform: Works on iOS, macOS, tvOS, watchOS, and Linux
  • 🎯 Producer Features: Batching, compression, partitioned topics, exclusive access modes
  • 📨 Consumer Features: Multiple subscription types, acknowledgment strategies, dead letter queues
  • 📖 Reader API: Sequential message reading with precise position control

Requirements

  • Swift 6.1+
  • iOS 15.0+ / macOS 12.0+ / tvOS 15.0+ / watchOS 8.0+ / Linux (Ubuntu 20.04+)

Installation

Swift Package Manager

Add PulsarClient to your Package.swift:

dependencies: [
    .package(url: "https://github.com/edgeengineer/pulsar-client.git", from: "0.0.1")
]

Then add it to your target dependencies:

.target(
    name: "YourApp",
    dependencies: ["PulsarClient"]
)

Quick Start

import PulsarClient

// Create a client
let client = PulsarClient.builder { builder in
    builder.withServiceUrl("pulsar://localhost:6650")
}

// Create a producer
let producer = try await client.newProducer(
    topic: "my-topic",
    schema: Schemas.string
) { builder in
    builder.withProducerName("my-producer")
}

// Send a message
let messageId = try await producer.send("Hello, Pulsar!")
print("Message sent with ID: \(messageId)")

// Create a consumer
let consumer = try await client.newConsumer(
    topic: "my-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("my-subscription")
           .withSubscriptionType(.exclusive)
}

// Receive messages
let message = try await consumer.receive()
print("Received: \(message.value)")
try await consumer.acknowledge(message)

// Clean up
await producer.dispose()
await consumer.dispose()
await client.dispose()

Detailed Usage

### Client Configuration

```swift
let client = PulsarClient.builder { builder in
    builder.withServiceUrl("pulsar://localhost:6650")
           .withAuthentication(TokenAuthentication(token: "your-token"))
           .withEncryptionPolicy(.enforceEncrypted)
           .withConnectionTimeout(30.0)
           .withOperationTimeout(30.0)
}
```

### Producer Patterns

#### Basic Producer

```swift
let producer = try await client.newProducer(
    topic: "persistent://public/default/my-topic",
    schema: Schemas.string
) { builder in
    builder.withProducerName("my-producer")
}

// Send with metadata
var metadata = MessageMetadata()
metadata.key = "partition-key"
metadata.properties["app"] = "my-app"
metadata.eventTime = Date()

let messageId = try await producer.send("Hello, World!", metadata: metadata)
```
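
The producer above uses a fully qualified topic name, while the Quick Start passes the short name `my-topic`. In Apache Pulsar, a short name refers to the `persistent` domain in the `public/default` namespace. The sketch below illustrates that expansion; `TopicName` is a hypothetical helper, not a type from this package, and whether the client performs the expansion itself is an assumption.

```swift
import Foundation

/// Hypothetical helper (not part of PulsarClient) that follows Pulsar's naming
/// convention: {persistent|non-persistent}://tenant/namespace/topic.
struct TopicName: Equatable {
    let domain: String
    let tenant: String
    let namespace: String
    let localName: String

    /// Fully qualified form, for example "persistent://public/default/my-topic".
    var fullName: String { "\(domain)://\(tenant)/\(namespace)/\(localName)" }

    /// Accepts "topic", "tenant/namespace/topic", or a fully qualified name.
    /// Returns nil for anything else.
    init?(_ raw: String) {
        var domain = "persistent"
        var rest = Substring(raw)
        if let separator = raw.range(of: "://") {
            domain = String(raw[..<separator.lowerBound])
            rest = raw[separator.upperBound...]
            guard domain == "persistent" || domain == "non-persistent" else { return nil }
        }
        let parts = rest.split(separator: "/", omittingEmptySubsequences: false).map(String.init)
        guard !parts.contains(""), parts.count == 1 || parts.count == 3 else { return nil }
        // A name with an explicit domain must also spell out tenant and namespace.
        if raw.contains("://") && parts.count != 3 { return nil }
        self.domain = domain
        tenant = parts.count == 3 ? parts[0] : "public"
        namespace = parts.count == 3 ? parts[1] : "default"
        localName = parts[parts.count - 1]
    }
}

if let name = TopicName("my-topic") {
    print(name.fullName) // prints "persistent://public/default/my-topic"
}
```

Under this convention, `"my-topic"` and `"persistent://public/default/my-topic"` name the same topic, so the Quick Start and the example above can share a subscription.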

#### Batching and Compression

```swift
let producer = try await client.newProducer(
    topic: "high-volume-topic",
    schema: Schemas.bytes
) { builder in
    builder.withBatchingEnabled(true)
           .withBatchingMaxMessages(100)
           .withBatchingMaxDelay(0.01) // 10ms
           .withCompressionType(.lz4)
}
```
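
The two batching limits above suggest two flush triggers: a batch goes out once it holds the maximum number of messages, or once the maximum delay has passed since its first message. The sketch below models that rule in isolation. `BatchAccumulator` is a hypothetical type, time is passed in explicitly instead of using a timer, and the assumption that this package batches exactly this way is not confirmed by the README.

```swift
import Foundation

/// Hypothetical accumulator illustrating size-based and time-based batch flushing.
struct BatchAccumulator<Message> {
    let maxMessages: Int
    let maxDelay: TimeInterval
    private var pending: [Message] = []
    private var firstEnqueuedAt: TimeInterval?

    init(maxMessages: Int, maxDelay: TimeInterval) {
        self.maxMessages = maxMessages
        self.maxDelay = maxDelay
    }

    /// Adds a message; returns the full batch if the size limit was reached.
    mutating func add(_ message: Message, at now: TimeInterval) -> [Message]? {
        if pending.isEmpty { firstEnqueuedAt = now }
        pending.append(message)
        return pending.count >= maxMessages ? drain() : nil
    }

    /// Meant to be called periodically; returns the pending batch once its
    /// oldest message has waited at least maxDelay.
    mutating func flushIfDue(at now: TimeInterval) -> [Message]? {
        guard let start = firstEnqueuedAt, now - start >= maxDelay else { return nil }
        return drain()
    }

    /// Hands back the pending messages and resets the accumulator.
    private mutating func drain() -> [Message] {
        let batch = pending
        pending.removeAll()
        firstEnqueuedAt = nil
        return batch
    }
}
```

With `maxMessages: 100` and `maxDelay: 0.01`, a busy producer flushes on size while a quiet one still sends within roughly 10 ms, which is the latency and throughput trade-off these settings control.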

#### Partitioned Topics

```swift
let producer = try await client.newProducer(
    topic: "partitioned-topic",
    schema: Schemas.string
) { builder in
    builder.withMessageRouter(KeyBasedMessageRouter())
}

// Messages with the same key go to the same partition
let metadata = MessageMetadata().withKey("user-123")
try await producer.send("User event", metadata: metadata)
```

### Consumer Patterns

#### Subscription Types

```swift
// Exclusive - Only one consumer can subscribe
let exclusiveConsumer = try await client.newConsumer(
    topic: "exclusive-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("exclusive-sub")
           .withSubscriptionType(.exclusive)
}

// Shared - Multiple consumers share messages
let sharedConsumer = try await client.newConsumer(
    topic: "shared-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("shared-sub")
           .withSubscriptionType(.shared)
}

// Key_Shared - Messages with same key go to same consumer
let keySharedConsumer = try await client.newConsumer(
    topic: "key-shared-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("key-shared-sub")
           .withSubscriptionType(.keyShared)
}
```

#### Batch Message Processing

```swift
let consumer = try await client.newConsumer(
    topic: "batch-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("batch-processor")
           .withReceiverQueueSize(1000)
}

// Process messages in batches
let messages = try await consumer.receiveBatch(maxMessages: 100)
for message in messages {
    // Process message
    print("Processing: \(message.value)")
}

// Acknowledge all at once
try await consumer.acknowledgeBatch(messages)
```

#### Negative Acknowledgment and Retry

```swift
let consumer = try await client.newConsumer(
    topic: "retry-topic",
    schema: Schemas.string
) { builder in
    builder.withSubscriptionName("retry-sub")
           .withNegativeAckRedeliveryDelay(5.0) // 5 seconds
}

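// Receive outside the do block so `message` is still in scope in `catch`
let message = try await consumer.receive()
do {
    // processMessage(_:) stands in for your application's handler
    try await processMessage(message)
    try await consumer.acknowledge(message)
} catch {
    // Processing failed: ask the broker to redeliver after the configured delay
    try await consumer.negativeAcknowledge(message)
}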
```

### Reader API

```swift
let reader = try await client.newReader(
    topic: "reader-topic",
    schema: Schemas.string
) { builder in
    builder.withStartMessageId(.earliest)
           .withReaderName("my-reader")
}

// Read messages sequentially
while try await reader.hasMessageAvailable() {
    let message = try await reader.readNext()
    print("Read: \(message.value) at \(message.publishTime)")
}

// Seek to specific position
try await reader.seek(to: MessageId.latest)
```

### Schema Types

#### Built-in Schemas

```swift
// Primitive types
let stringSchema = Schemas.string
let int32Schema = Schemas.int32
let int64Schema = Schemas.int64
let boolSchema = Schemas.boolean
let doubleSchema = Schemas.double
let bytesSchema = Schemas.bytes

// Date/Time schemas
let dateSchema = Schemas.date
let timeSchema = Schemas.time
let timestampSchema = Schemas.timestamp
```

#### JSON Schema

```swift
struct UserEvent: Codable {
    let userId: String
    let action: String
    let timestamp: Date
}

let jsonSchema = JSONSchema<UserEvent>()

let producer = try await client.newProducer(
    topic: "user-events",
    schema: jsonSchema
) { builder in
    builder.withProducerName("user-event-producer")
}

let event = UserEvent(
    userId: "user-123",
    action: "login",
    timestamp: Date()
)
try await producer.send(event)
```
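
A schema's job is to turn a typed value into payload bytes and back again. The sketch below shows that round trip for `UserEvent` using Foundation's `JSONEncoder` and `JSONDecoder`. That `JSONSchema` works this way internally is an assumption, the helper names `encodePayload` and `decodePayload` are hypothetical, and the ISO 8601 date strategy is an illustrative choice since the README does not say how dates are encoded.

```swift
import Foundation

struct UserEvent: Codable, Equatable {
    let userId: String
    let action: String
    let timestamp: Date
}

/// Serializes a value to JSON bytes, the form it would take as a message payload.
func encodePayload<T: Encodable>(_ value: T) throws -> Data {
    let encoder = JSONEncoder()
    encoder.dateEncodingStrategy = .iso8601   // illustrative choice, not confirmed for JSONSchema
    encoder.outputFormatting = [.sortedKeys]  // stable key order makes payloads easy to compare
    return try encoder.encode(value)
}

/// Rebuilds the typed value on the consuming side.
func decodePayload<T: Decodable>(_ type: T.Type, from data: Data) throws -> T {
    let decoder = JSONDecoder()
    decoder.dateDecodingStrategy = .iso8601
    return try decoder.decode(type, from: data)
}

do {
    let event = UserEvent(userId: "user-123", action: "login",
                          timestamp: Date(timeIntervalSince1970: 1_700_000_000))
    let payload = try encodePayload(event)
    let decoded = try decodePayload(UserEvent.self, from: payload)
    print(decoded == event ? "round trip ok" : "round trip mismatch")
} catch {
    print("Serialization failed: \(error)")
}
```

Producer and consumer must agree on details such as the date strategy; a mismatch surfaces as a decoding error on the consumer side, not as a send failure.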

### Error Handling

```swift
do {
    let message = try await consumer.receive()
    try await processMessage(message)
    try await consumer.acknowledge(message)
} catch PulsarClientError.timeout(let operation) {
    print("Operation timed out: \(operation)")
} catch PulsarClientError.consumerBusy(let reason) {
    print("Consumer busy: \(reason)")
} catch {
    print("Unexpected error: \(error)")
}
```

### State Management

Monitor component states:

```swift
// Producer states
producer.onStateChange { state in
    switch state {
    case .connected:
        print("Producer connected")
    case .disconnected:
        print("Producer disconnected")
    case .faulted(let error):
        print("Producer faulted: \(error)")
    default:
        break
    }
}

// Consumer states
consumer.onStateChange { state in
    switch state {
    case .active:
        print("Consumer active")
    case .inactive:
        print("Consumer inactive (failover)")
    case .reachedEndOfTopic:
        print("No more messages")
    default:
        break
    }
}
```

### Authentication

```swift
// Token Authentication
let tokenAuth = TokenAuthentication(token: "your-jwt-token")

// OAuth2 Authentication
let oauth2 = OAuth2Authentication(
    issuerUrl: "https://auth.example.com",
    audience: "pulsar",
    privateKey: privateKeyData
)

// TLS Authentication
let tlsAuth = TLSAuthentication(
    certPath: "/path/to/cert.pem",
    keyPath: "/path/to/key.pem"
)

let client = PulsarClient.builder { builder in
    builder.withServiceUrl("pulsar+ssl://localhost:6651")
           .withAuthentication(tlsAuth)
}
```
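
The service URLs in this README differ in scheme and port: `pulsar://` on 6650 for plaintext and `pulsar+ssl://` on 6651 for TLS. The sketch below shows how such a URL could be broken down with `URLComponents`. `ServiceEndpoint` is a hypothetical type, not part of this package, and the fallback ports are Pulsar's conventional defaults.

```swift
import Foundation

/// Hypothetical value type describing where and how to connect.
struct ServiceEndpoint: Equatable {
    let host: String
    let port: Int
    let usesTLS: Bool

    /// Returns nil when the URL is malformed or uses an unsupported scheme.
    init?(serviceUrl: String) {
        guard let components = URLComponents(string: serviceUrl),
              let host = components.host, !host.isEmpty else { return nil }

        let tls: Bool
        switch components.scheme?.lowercased() ?? "" {
        case "pulsar": tls = false
        case "pulsar+ssl": tls = true
        default: return nil
        }

        self.host = host
        self.usesTLS = tls
        // Fall back to Pulsar's conventional ports when the URL omits one.
        self.port = components.port ?? (tls ? 6651 : 6650)
    }
}

if let endpoint = ServiceEndpoint(serviceUrl: "pulsar+ssl://localhost:6651") {
    print("\(endpoint.host):\(endpoint.port) tls=\(endpoint.usesTLS)")
}
```

A check like this at startup makes it easier to notice a TLS authentication setup that was accidentally paired with a plaintext `pulsar://` URL.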

Advanced Features

Message Routing

// Round-robin routing
let roundRobinRouter = RoundRobinMessageRouter()

// Single partition routing
let singleRouter = SinglePartitionMessageRouter(partitionIndex: 0)

// Custom routing
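class CustomRouter: MessageRouter {
    func choosePartition(messageMetadata: MessageMetadata, numberOfPartitions: Int) -> Int {
        // Your routing logic. Take the hash first: `%` binds tighter than `??`.
        // Note that hashValue is seeded per process, so this mapping is not stable across runs.
        let hash = messageMetadata.key?.hashValue ?? 0
        return abs(hash % numberOfPartitions)
    }
}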
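
Swift seeds `hashValue` randomly per process, so a router built on it cannot send the same key to the same partition across restarts or across producers. The sketch below uses the FNV-1a hash as a deterministic alternative. `stablePartition(forKey:numberOfPartitions:)` is a hypothetical helper, and other Pulsar clients may use different hash functions, so this does not promise the same key-to-partition mapping as those clients.

```swift
/// FNV-1a (32-bit) over the key's UTF-8 bytes; the result is the same in every process.
func fnv1a32(_ text: String) -> UInt32 {
    var hash: UInt32 = 2_166_136_261      // FNV offset basis
    for byte in text.utf8 {
        hash ^= UInt32(byte)
        hash = hash &* 16_777_619         // FNV prime, with wrapping multiplication
    }
    return hash
}

/// Hypothetical helper: maps an optional key to an index in 0..<numberOfPartitions.
func stablePartition(forKey key: String?, numberOfPartitions: Int) -> Int {
    precondition(numberOfPartitions > 0, "a topic needs at least one partition")
    // Messages without a key all go to partition 0 in this sketch.
    guard let key else { return 0 }
    return Int(fnv1a32(key) % UInt32(numberOfPartitions))
}

let partition = stablePartition(forKey: "user-123", numberOfPartitions: 4)
print("user-123 -> partition \(partition)")
```

Because the hash is unsigned, the modulo result is never negative, which removes the need for `abs`.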

Fault Tolerance

// Configure retry policy
let retryPolicy = RetryPolicy(
    maxRetries: 3,
    initialDelay: 1.0,
    maxDelay: 30.0,
    backoffMultiplier: 2.0
)

// Custom exception handler
class MyExceptionHandler: ExceptionHandler {
    func handleException(_ context: inout ExceptionContext) async {
        switch context.exception {
        case PulsarClientError.connectionFailed:
            context.result = .retryAfter(5.0)
        default:
            context.result = .fail
        }
    }
}
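
The `RetryPolicy` parameters read like an exponential backoff schedule. Assuming they mean what their names suggest (the README does not define them), the delay before each retry would grow by `backoffMultiplier` from `initialDelay` and be capped at `maxDelay`. `BackoffSchedule` below is a hypothetical type that makes this arithmetic concrete.

```swift
import Foundation

/// Hypothetical mirror of the RetryPolicy parameters shown above.
struct BackoffSchedule {
    let maxRetries: Int
    let initialDelay: TimeInterval
    let maxDelay: TimeInterval
    let backoffMultiplier: Double

    /// Delay before retry number `attempt` (0-based), or nil once retries are exhausted.
    func delay(forAttempt attempt: Int) -> TimeInterval? {
        guard attempt >= 0, attempt < maxRetries else { return nil }
        let uncapped = initialDelay * pow(backoffMultiplier, Double(attempt))
        return min(uncapped, maxDelay)
    }
}

let schedule = BackoffSchedule(maxRetries: 3, initialDelay: 1.0,
                               maxDelay: 30.0, backoffMultiplier: 2.0)
for attempt in 0...3 {
    if let delay = schedule.delay(forAttempt: attempt) {
        print("retry \(attempt + 1) after \(delay)s")
    } else {
        print("giving up")
    }
}
```

With the values from the example, the waits are 1, 2, and 4 seconds; the 30-second cap only comes into play with a higher `maxRetries`.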

Testing

Prerequisites for Running Tests

Before running integration tests, ensure you have the following installed:

  1. Docker - Required for running Apache Pulsar locally
     • macOS: Install Docker Desktop
     • Linux: Install Docker Engine via your package manager

  2. Docker Compose - Usually included with Docker Desktop

```bash
# Verify installation
docker --version
docker-compose --version
```

  3. Swift 6.1+ - Required for building and running tests

```bash
swift --version
```

Running Tests

The project includes both unit tests and integration tests.

Unit Tests

# Run unit tests only
swift test --filter PulsarClientTests

# Or using the helper script
./scripts/run-tests.sh --unit

Integration Tests

Integration tests require a running Pulsar instance. We provide Docker-based infrastructure:

# Option 1: Use the all-in-one script (recommended)
./scripts/run-tests.sh --integration --env standalone

# Option 2: Manual steps
# Start Pulsar for testing
./scripts/test-env.sh start

# Run integration tests
ENABLE_INTEGRATION_TESTS=1 swift test --filter IntegrationTests

# Stop Pulsar when done
./scripts/test-env.sh stop

Note: If you already have a Pulsar instance running on the default ports (6650, 8080), the tests will use that instance instead.

Test Environments

  1. Standalone Mode (default)

```bash
./scripts/test-env.sh start
```

  2. Cluster Mode

```bash
./scripts/test-env.sh start --cluster
```

  3. Authentication Mode

```bash
./scripts/test-env.sh start --auth
```

Docker Infrastructure

The project includes comprehensive Docker Compose configurations:

  • docker/docker-compose.yml - Standalone Pulsar with Toxiproxy
  • docker/docker-compose.cluster.yml - Full Pulsar cluster
  • docker/docker-compose.auth.yml - Pulsar with authentication

See docker/README.md for detailed Docker setup information.

Development Setup

# One-time setup
./scripts/setup-dev.sh

# Common tasks
make test           # Run all tests
make test-unit      # Run unit tests only
make test-integration # Run integration tests
make docker-up      # Start Pulsar
make docker-down    # Stop Pulsar
make docker-logs    # View logs

Best Practices

  1. Always dispose resources: Use defer or structured concurrency to ensure cleanup
  2. Handle backpressure: Configure appropriate queue sizes for consumers
  3. Use schemas: Type-safe schemas prevent serialization errors
  4. Monitor states: React to state changes for robust applications
  5. Configure timeouts: Set appropriate timeouts for your use case
  6. Use batching: For high-throughput scenarios, enable producer batching
  7. Implement error handling: Always handle errors appropriately

Contributing

We welcome contributions! Please see CONTRIBUTING.md for details.

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Acknowledgments

This Swift implementation is inspired by the official Apache Pulsar .NET client (DotPulsar) and follows similar patterns, adapted to Swift's language features.

Package Metadata

Repository: wendylabsinc/pulsar-client

Default branch: main

README: README.md