Pub/Sub
The TopicClient and SubscriptionClient classes in the multicloudj library provide a comprehensive, cloud-agnostic interface to interact with publish/subscribe messaging services like Google Cloud Pub/Sub, AWS SNS/SQS, and Alibaba Cloud Message Service.
These clients enable sending messages to topics, receiving messages from subscriptions, and managing message acknowledgment across multiple cloud providers with a consistent API.
Feature Support Across Providers
Core API Features
| Feature Name | GCP | AWS | ALI | Comments |
|---|---|---|---|---|
| Send Messages | β Supported | β Supported | π In Roadmap | Send messages to topics |
| Receive Messages | β Supported | β Supported | π In Roadmap | Pull messages from subscriptions |
| Acknowledge Messages | β Supported | β Supported | π In Roadmap | Confirm message processing |
| Batch Acknowledgment | β Supported | β Supported | π In Roadmap | Acknowledge multiple messages at once |
| Negative Acknowledgment | β Supported | β Supported | π In Roadmap | Reject messages for redelivery |
| Subscription Attributes | β Supported | β Supported | π In Roadmap | Retrieve subscription name and topic |
Advanced Features
| Feature Name | GCP | AWS | ALI | Comments |
|---|---|---|---|---|
| Async Batch Acknowledgment | β Supported | β Supported | π In Roadmap | CompletableFuture-based async ack/nack |
| Double Acknowledgment Safety | β Supported | β Supported | π In Roadmap | Safe to ack same message multiple times |
| Nack Visibility Timeout | β Supported | β Supported | π In Roadmap | Control redelivery delay on nack (default and per-call) |
| Retryable Error Detection | β Supported | β Supported | π In Roadmap | isRetryable() classifies errors as transient or permanent |
Configuration Options
| Configuration | GCP | AWS | ALI | Comments |
|---|---|---|---|---|
| Region | β Supported | β Supported | π In Roadmap | Target region for the topic/subscription |
| Endpoint Override | β Supported | β Supported | π In Roadmap | Custom endpoint configuration |
| Proxy Support | β Supported | β Supported | π In Roadmap | HTTP proxy configuration |
| Credentials Override | β Supported | β Supported | π In Roadmap | Custom credential providers via STS |
Provider IDs
The provider ID passed to TopicClient.builder(...) and SubscriptionClient.builder(...) selects the backing implementation:
| Provider | Topic provider ID | Subscription provider ID |
|---|---|---|
| GCP (Google Cloud Pub/Sub) | gcp | gcp |
| AWS SNS | awssns | aws |
| AWS SQS | awssqs | aws |
On AWS, messages are published through either SNS (awssns) or SQS (awssqs), while messages are always received from an SQS queue using the aws subscription provider.
Provider-Specific Notes
GCP (Google Cloud Pub/Sub)
- Topic names must use the full resource format:
projects/{projectId}/topics/{topicId} - Subscription names must use the full resource format:
projects/{projectId}/subscriptions/{subscriptionId}
AWS (SNS / SQS)
- SNS topics (
awssns) are identified by their topic ARN, e.g.arn:aws:sns:us-west-2:123456789012:my-topic. The topic is validated to exist when the client is built. - SQS topics (
awssqs) accept either a queue name (resolved to a queue URL automatically) or a full queue URL. - Subscriptions (
aws) read from an SQS queue, identified by a queue name or queue URL.
Creating Clients
Topic Client
// GCP
TopicClient topicClient = TopicClient.builder("gcp")
.withTopicName("projects/my-project/topics/my-topic")
.build();
// AWS SNS
TopicClient snsTopicClient = TopicClient.builder("awssns")
.withTopicName("arn:aws:sns:us-west-2:123456789012:my-topic")
.withRegion("us-west-2")
.build();
// AWS SQS
TopicClient sqsTopicClient = TopicClient.builder("awssqs")
.withTopicName("my-queue")
.withRegion("us-west-2")
.build();
You can also configure advanced options:
URI endpoint = URI.create("https://custom-endpoint.com");
URI proxy = URI.create("https://proxy.example.com");
topicClient = TopicClient.builder("gcp")
.withTopicName("projects/my-project/topics/my-topic")
.withEndpoint(endpoint)
.withProxyEndpoint(proxy)
.build();
Subscription Client
// GCP
SubscriptionClient subscriptionClient = SubscriptionClient.builder("gcp")
.withSubscriptionName("projects/my-project/subscriptions/my-subscription")
.build();
// AWS
SubscriptionClient awsSubscriptionClient = SubscriptionClient.builder("aws")
.withSubscriptionName("my-queue")
.withRegion("us-west-2")
.build();
You can also configure advanced options:
URI endpoint = URI.create("https://custom-endpoint.com");
URI proxy = URI.create("https://proxy.example.com");
subscriptionClient = SubscriptionClient.builder("gcp")
.withSubscriptionName("projects/my-project/subscriptions/my-subscription")
.withEndpoint(endpoint)
.withProxyEndpoint(proxy)
.build();
To delay redelivery whenever a message is nacked, set a default nack visibility timeout on the builder:
SubscriptionClient subscriptionClient = SubscriptionClient.builder("aws")
.withSubscriptionName("my-queue")
.withRegion("us-west-2")
.withNackVisibilityTimeout(Duration.ofSeconds(30))
.build();
Duration.ZERO (the default) makes nacked messages immediately available for redelivery; a positive value delays redelivery by that amount.
Sending Messages
Basic Message
try (TopicClient topic = topicClient) {
Message message = Message.builder()
.withBody("Hello, World!".getBytes())
.build();
topic.send(message);
}
Message with Metadata
Message message = Message.builder()
.withBody("Order processed".getBytes())
.withMetadata(Map.of(
"order-id", "12345",
"priority", "high",
"source", "api-service"
))
.build();
topic.send(message);
Sending Multiple Messages
List<Message> messages = List.of(
Message.builder().withBody("Message 1".getBytes()).withMetadata(Map.of("batch-id", "1")).build(),
Message.builder().withBody("Message 2".getBytes()).withMetadata(Map.of("batch-id", "2")).build(),
Message.builder().withBody("Message 3".getBytes()).withMetadata(Map.of("batch-id", "3")).build()
);
for (Message message : messages) {
topic.send(message);
}
Receiving Messages
Single Message Receive
The receive() method blocks until a message is available:
try (SubscriptionClient subscription = subscriptionClient) {
Message message = subscription.receive();
String data = new String(message.getBody());
System.out.println("Received: " + data);
// Process the message...
// Acknowledge successful processing
subscription.sendAck(message.getAckID());
}
Continuous Message Processing
try (SubscriptionClient subscription = subscriptionClient) {
while (true) {
Message message = subscription.receive();
try {
String data = new String(message.getBody());
// Process the message
processMessage(data);
// Acknowledge on success
subscription.sendAck(message.getAckID());
} catch (Exception e) {
// Nack on failure for redelivery (if supported)
if (subscription.canNack()) {
subscription.sendNack(message.getAckID());
}
}
}
}
Message Acknowledgment
Single Acknowledgment
Message message = subscription.receive();
if (message != null && message.getAckID() != null) {
// Process the message...
// Acknowledge successful processing
subscription.sendAck(message.getAckID());
}
Batch Acknowledgment
Batch acknowledgment returns a CompletableFuture:
List<AckID> ackIDs = new ArrayList<>();
// Collect messages
for (int i = 0; i < 10; i++) {
Message message = subscription.receive();
if (message != null && message.getAckID() != null) {
// Process message...
ackIDs.add(message.getAckID());
}
}
// Acknowledge all at once
subscription.sendAcks(ackIDs).join();
Negative Acknowledgment (Nack)
Use nack to reject a message and make it available for redelivery. Check if nacking is supported first:
Message message = subscription.receive();
if (message != null) {
try {
// Try to process
processMessage(message);
subscription.sendAck(message.getAckID());
} catch (Exception e) {
// Failed to process, nack for redelivery (if supported)
if (subscription.canNack()) {
subscription.sendNack(message.getAckID());
} else {
// Provider doesn't support nack
System.err.println("Nack not supported");
}
}
}
Nack with a Custom Visibility Timeout
Override the subscriptionβs default nack visibility timeout for an individual message. This requests a specific redelivery delay without reconfiguring the subscription:
Message message = subscription.receive();
if (message != null && subscription.canNack()) {
// Delay redelivery of this message by 60 seconds
subscription.sendNack(message.getAckID(), Duration.ofSeconds(60));
}
Batch Negative Acknowledgment
List<AckID> nackIDs = new ArrayList<>();
for (Message message : messages) {
if (shouldReject(message)) {
nackIDs.add(message.getAckID());
}
}
if (subscription.canNack()) {
subscription.sendNacks(nackIDs).join();
}
You can also apply a visibility timeout to an entire nack batch:
if (subscription.canNack()) {
// Delay redelivery of all nacked messages by 60 seconds
subscription.sendNacks(nackIDs, Duration.ofSeconds(60)).join();
}
Passing null for the timeout uses the subscriptionβs default nack visibility timeout.
Subscription Information
Check Nack Support
Not all providers support negative acknowledgment. Check before using:
if (subscription.canNack()) {
System.out.println("This subscription supports nacking");
} else {
System.out.println("This subscription does not support nacking");
}
Get Subscription Attributes
Retrieve common subscription metadata via getAttributes(), which returns a GetAttributeResult:
GetAttributeResult attributes = subscription.getAttributes();
System.out.println("Subscription name: " + attributes.getName());
System.out.println("Bound topic: " + attributes.getTopic());
Error Handling
Null Check for AckID
Always validate that an AckID is not null before acknowledging:
Message message = subscription.receive();
if (message != null && message.getAckID() != null) {
subscription.sendAck(message.getAckID());
}
Attempting to acknowledge with a null AckID will throw InvalidArgumentException:
try {
subscription.sendAck(null);
} catch (InvalidArgumentException e) {
System.err.println("Cannot acknowledge null AckID");
}
Retryable Errors
Use isRetryable() to decide whether a failed operation is worth retrying or should be treated as a permanent failure:
try {
Message message = subscription.receive();
subscription.sendAck(message.getAckID());
} catch (SubstrateSdkException e) {
if (subscription.isRetryable(e)) {
// Transient error - safe to retry
} else {
// Permanent failure - handle accordingly
}
}
Exception Handling
All topic operations may throw SubstrateSdkException:
try {
topic.send(message);
} catch (SubstrateSdkException e) {
// Handle access denied, quota exceeded, network errors, etc.
e.printStackTrace();
}
All subscription operations may throw SubstrateSdkException:
try {
Message message = subscription.receive();
subscription.sendAck(message.getAckID());
} catch (SubstrateSdkException e) {
// Handle access denied, network errors, etc.
e.printStackTrace();
}