When using an AMQP 1.0 consumer on a RabbitMQ stream (including streams within a super stream), users may encounter the following behavior:
* Setting the consumer offset to `FIRST` (e.g., `ConsumerBuilder.StreamOffsetSpecification.FIRST`) allows the consumer to read historical events correctly.
* After reading the historical events, the consumer stops receiving any new events.
* If the offset is not specified (defaulting to the current position), new messages are received as expected.
VMware Tanzu RabbitMQ
AMQP 1.0
RabbitMQ Streams / Super Streams
Java AMQP 1.0 Stream Client
This issue is caused by the "link credit" mechanism inherent to the AMQP 1.0 protocol.
When a consumer starts from the `FIRST` offset, the broker immediately spends the available link credits (typically a default of 100) on delivering historical messages to the client, and each delivery consumes one credit. If the consumer application processes these messages but does not explicitly acknowledge them, the credits are never replenished, so the credit balance at the broker drops to zero and stays there. Once the historical data has exhausted the initial credits, the broker stops sending any further messages, including new events, until more credits are granted.
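The credit exhaustion described above can be illustrated with a small, self-contained simulation. This is only a toy model, not the RabbitMQ client API: the `LinkCreditDemo` class, the `Link` type, and the `run` helper are hypothetical names introduced for illustration. It also assumes that every settled message returns exactly one credit immediately, whereas a real client may replenish credits in batches.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of AMQP 1.0 link credit (hypothetical, not the RabbitMQ client API). */
class LinkCreditDemo {

    /** Stand-in for the broker side of a link: a backlog plus a credit balance. */
    static final class Link {
        private final Deque<Integer> backlog = new ArrayDeque<>();
        private int credit;

        Link(int initialCredit) {
            this.credit = initialCredit;
        }

        void publish(int message) {
            backlog.add(message);
        }

        /** Delivers queued messages while credit remains; returns how many were delivered. */
        int deliver(boolean consumerSettles) {
            int delivered = 0;
            while (credit > 0 && !backlog.isEmpty()) {
                backlog.poll();
                credit--;          // each delivery consumes one credit
                delivered++;
                if (consumerSettles) {
                    credit++;      // simplification: settling returns the credit at once
                }
            }
            return delivered;
        }
    }

    /** Replays historical messages, then publishes fresh ones; returns total deliveries. */
    static int run(int historical, int fresh, boolean consumerSettles) {
        Link link = new Link(100); // initial credit of 100, as in the text above
        for (int i = 0; i < historical; i++) {
            link.publish(i);
        }
        int total = link.deliver(consumerSettles);
        for (int i = 0; i < fresh; i++) {
            link.publish(historical + i);
        }
        total += link.deliver(consumerSettles);
        return total;
    }

    public static void main(String[] args) {
        System.out.println("Delivered without settling: " + run(250, 10, false));
        System.out.println("Delivered with settling:    " + run(250, 10, true));
    }
}
```

In this model, a consumer that never settles receives only the first 100 historical messages and none of the new ones, while a consumer that settles each message receives all 260.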
To resolve this, the consumer must acknowledge received messages to replenish link credits and signal to the broker that it is ready to receive more data.
In the Java client `messageHandler`, you must explicitly call `ctx.accept()` after processing the message.
Example
```java
.messageHandler((ctx, inputMessage) -> {
    try {
        // 1. Process the input message
        var event = messageConverter.convert(inputMessage.body());
        log.info("Received: {}", event);
        // 2. REQUIRED: Acknowledge the message to replenish link credits
        ctx.accept();
    } catch (Exception e) {
        // Pass the exception itself so the logger records the stack trace
        log.error("Error processing message", e);
        // A message that is never settled does not return its credit.
        // Depending on your retry logic requirements, settle it here
        // (for example with ctx.discard() or ctx.requeue()) rather than
        // only rethrowing.
        throw e;
    }
})
```