Pub/Sub

The TopicClient and SubscriptionClient classes in the multicloudj library provide a comprehensive, cloud-agnostic interface to interact with publish/subscribe messaging services like Google Cloud Pub/Sub, AWS SNS/SQS, and Alibaba Cloud Message Service.

These clients enable sending messages to topics, receiving messages from subscriptions, and managing message acknowledgment across multiple cloud providers with a consistent API.


Feature Support Across Providers

Core API Features

Feature Name GCP AWS ALI Comments
Send Messages ✅ Supported End of Nov ‘25 📅 In Roadmap Send messages to topics
Receive Messages ✅ Supported End of Nov ‘25 📅 In Roadmap Pull messages from subscriptions
Acknowledge Messages ✅ Supported End of Nov ‘25 📅 In Roadmap Confirm message processing
Batch Acknowledgment ✅ Supported End of Nov ‘25 📅 In Roadmap Acknowledge multiple messages at once
Negative Acknowledgment ✅ Supported End of Nov ‘25 📅 In Roadmap Reject messages for redelivery

Advanced Features

Feature Name GCP AWS ALI Comments
Async Batch Acknowledgment ✅ Supported End of Nov ‘25 📅 In Roadmap CompletableFuture-based async ack/nack
Double Acknowledgment Safety ✅ Supported End of Nov ‘25 📅 In Roadmap Safe to ack same message multiple times

Configuration Options

Configuration GCP AWS ALI Comments
Endpoint Override ✅ Supported End of Nov ‘25 📅 In Roadmap Custom endpoint configuration
Proxy Support ✅ Supported End of Nov ‘25 📅 In Roadmap HTTP proxy configuration
Credentials Override ✅ Supported End of Nov ‘25 📅 In Roadmap Custom credential providers via STS

Provider-Specific Notes

GCP (Google Cloud Pub/Sub)

  • Topic names must use full resource format: projects/{projectId}/topics/{topicId}
  • Subscription names must use full resource format: projects/{projectId}/subscriptions/{subscriptionId}

Creating Clients

Topic Client

TopicClient topicClient = TopicClient.builder("gcp")
    .withTopicName("projects/my-project/topics/my-topic")
    .build();

You can also configure advanced options:

URI endpoint = URI.create("https://custom-endpoint.com");
URI proxy = URI.create("https://proxy.example.com");

topicClient = TopicClient.builder("gcp")
    .withTopicName("projects/my-project/topics/my-topic")
    .withEndpoint(endpoint)
    .withProxyEndpoint(proxy)
    .build();

Subscription Client

SubscriptionClient subscriptionClient = SubscriptionClient.builder("gcp")
    .withSubscriptionName("projects/my-project/subscriptions/my-subscription")
    .build();

You can also configure advanced options:

URI endpoint = URI.create("https://custom-endpoint.com");
URI proxy = URI.create("https://proxy.example.com");

subscriptionClient = SubscriptionClient.builder("gcp")
    .withSubscriptionName("projects/my-project/subscriptions/my-subscription")
    .withEndpoint(endpoint)
    .withProxyEndpoint(proxy)
    .build();

Sending Messages

Basic Message

try (TopicClient topic = topicClient) {
    
    Message message = Message.builder()
        .withBody("Hello, World!".getBytes())
        .build();
    
    topic.send(message);
}

Message with Metadata

Message message = Message.builder()
    .withBody("Order processed".getBytes())
    .withMetadata(Map.of(
        "order-id", "12345",
        "priority", "high",
        "source", "api-service"
    ))
    .build();

topic.send(message);

Sending Multiple Messages

List<Message> messages = List.of(
    Message.builder().withBody("Message 1".getBytes()).withMetadata(Map.of("batch-id", "1")).build(),
    Message.builder().withBody("Message 2".getBytes()).withMetadata(Map.of("batch-id", "2")).build(),
    Message.builder().withBody("Message 3".getBytes()).withMetadata(Map.of("batch-id", "3")).build()
);

for (Message message : messages) {
    topic.send(message);
}

Receiving Messages

Single Message Receive

The receive() method blocks until a message is available:

try (SubscriptionClient subscription = subscriptionClient) {
    
    Message message = subscription.receive();
    String data = new String(message.getBody());
    System.out.println("Received: " + data);
    
    // Process the message...
    
    // Acknowledge successful processing
    subscription.sendAck(message.getAckID());
}

Continuous Message Processing

try (SubscriptionClient subscription = subscriptionClient) {
    
    while (true) {
        Message message = subscription.receive();
        try {
            String data = new String(message.getBody());
            // Process the message
            processMessage(data);
            
            // Acknowledge on success
            subscription.sendAck(message.getAckID());
        } catch (Exception e) {
            // Nack on failure for redelivery (if supported)
            if (subscription.canNack()) {
                subscription.sendNack(message.getAckID());
            }
        }
    }
}

Message Acknowledgment

Single Acknowledgment

Message message = subscription.receive();
if (message != null && message.getAckID() != null) {
    // Process the message...
    
    // Acknowledge successful processing
    subscription.sendAck(message.getAckID());
}

Batch Acknowledgment

Batch acknowledgment returns a CompletableFuture:

List<AckID> ackIDs = new ArrayList<>();

// Collect messages
for (int i = 0; i < 10; i++) {
    Message message = subscription.receive();
    if (message != null && message.getAckID() != null) {
        // Process message...
        ackIDs.add(message.getAckID());
    }
}

// Acknowledge all at once
subscription.sendAcks(ackIDs).join();

Negative Acknowledgment (Nack)

Use nack to reject a message and make it available for redelivery. Check if nacking is supported first:

Message message = subscription.receive();
if (message != null) {
    try {
        // Try to process
        processMessage(message);
        subscription.sendAck(message.getAckID());
    } catch (Exception e) {
        // Failed to process, nack for redelivery (if supported)
        if (subscription.canNack()) {
            subscription.sendNack(message.getAckID());
        } else {
            // Provider doesn't support nack
            System.err.println("Nack not supported");
        }
    }
}

Batch Negative Acknowledgment

List<AckID> nackIDs = new ArrayList<>();

for (Message message : messages) {
    if (shouldReject(message)) {
        nackIDs.add(message.getAckID());
    }
}

if (subscription.canNack()) {
    subscription.sendNacks(nackIDs).join();
}

Subscription Information

Check Nack Support

Not all providers support negative acknowledgment. Check before using:

if (subscription.canNack()) {
    System.out.println("This subscription supports nacking");
} else {
    System.out.println("This subscription does not support nacking");
}

Get Subscription Attributes

Retrieve provider-specific subscription metadata:

Map<String, String> attributes = subscription.getAttributes();
for (Map.Entry<String, String> entry : attributes.entrySet()) {
    System.out.println(entry.getKey() + ": " + entry.getValue());
}

Important GCP Format Requirements:

  • Topic names: projects/{projectId}/topics/{topicId}
  • Subscription names: projects/{projectId}/subscriptions/{subscriptionId}

Error Handling

Null Check for AckID

Always validate that an AckID is not null before acknowledging:

Message message = subscription.receive();
if (message != null && message.getAckID() != null) {
    subscription.sendAck(message.getAckID());
}

Attempting to acknowledge with a null AckID will throw InvalidArgumentException:

try {
    subscription.sendAck(null);
} catch (InvalidArgumentException e) {
    System.err.println("Cannot acknowledge null AckID");
}

Exception Handling

All topic operations may throw SubstrateSdkException:

try {
    topic.send(message);
} catch (SubstrateSdkException e) {
    // Handle access denied, quota exceeded, network errors, etc.
    e.printStackTrace();
}

All subscription operations may throw SubstrateSdkException:

try {
    Message message = subscription.receive();
    subscription.sendAck(message.getAckID());
} catch (SubstrateSdkException e) {
    // Handle access denied, network errors, etc.
    e.printStackTrace();
}