Spring Cloud Stream’s Record Recoverable Processor
The Old Way (and its Pain Points)
In the past, when you had an error in your Spring Cloud Stream processor, often the go-to solution was to configure a Dead Letter Queue. The idea is simple: if a message can’t be processed after a few retries, shunt it off to a separate queue (the DLQ) for later investigation.
Unfortunately, DLQs in Spring Cloud Stream’s Kafka Streams binder are limited to deserialization errors. Once the message enters your defined stream, there’s no configurable DLQ capability; it must be written manually.
Why am I writing this post? Because I requested this feature and wrote the examples!
https://github.com/spring-cloud/spring-cloud-stream/issues/2779
Kudos to Soby Chacko
Documentation on the Spring Cloud Stream Kafka Streams Binder
And while DLQs are useful for capturing messages that are truly unprocessable, they can be unsophisticated for everyday errors. What if you want to do something specific when a particular type of error occurs? DLQs are great for “fire and forget” error handling – the message is bad, let’s just put it aside. But often, we need more control.
Let’s think about a scenario. Imagine you have a processor that enriches data from an external service. If that service is temporarily unavailable, your processor will fail. You might want to retry, log the error, or even send it to a different topic for specific error handling.
Enter RecordRecoverableProcessor
This is where the RecordRecoverableProcessor comes in! Introduced in Spring Cloud
Stream version 4.1.0, this class gives you a much more granular and configurable way to deal with errors directly within your
processor. Instead of just letting errors bubble up, crashing the stream or relying on framework-level retry
and DLQ mechanisms, you can now define exactly what happens when an error occurs inside your processing logic.
So, what happens when an error occurs in a traditional processor versus one using RecordRecoverableProcessor?
Traditional Processor (Error):
- Error occurs in your Function or Consumer.
- Spring Cloud Stream’s error handling kicks in (retry, DLQ, etc., based on your configuration). Potentially crashing your stream!
- Limited control over what happens specifically when an error occurs in your code.
RecordRecoverableProcessor (Error):
- Error occurs in the apply method of your Function.
- The RecordRecoverableProcessor catches the error.
- Your custom error handling logic (provided as a BiConsumer) is executed. This BiConsumer gets access to both the errored record and the exception!
- You can decide what to do: log the error, send it to an error topic, apply custom recovery logic, and more.
The beauty of RecordRecoverableProcessor is that it puts you in the driver’s seat for error handling within your
processing logic.
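To make that flow concrete, here’s a minimal plain-Java sketch of the catch-and-delegate idea. To be clear, this is not the Spring class or its source: RecoveringStep, its downstream callback, and the String "records" are all hypothetical stand-ins so the example runs without Kafka or Spring on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in that mimics the catch-and-delegate idea (NOT the Spring class).
final class RecoveringStep<I, O> {
    private final Function<I, O> delegate;          // core processing logic
    private final BiConsumer<I, Exception> recoverer; // custom error handling
    private final Consumer<O> downstream;           // stands in for forwarding a record

    RecoveringStep(Function<I, O> delegate, BiConsumer<I, Exception> recoverer, Consumer<O> downstream) {
        this.delegate = delegate;
        this.recoverer = recoverer;
        this.downstream = downstream;
    }

    void process(I record) {
        try {
            O result = delegate.apply(record);
            if (result != null) {
                downstream.accept(result);
            }
        } catch (Exception ex) {
            // The failure is handed to the recoverer instead of bubbling up.
            recoverer.accept(record, ex);
        }
    }
}

final class RecoveringStepDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        RecoveringStep<String, String> step = new RecoveringStep<>(
                payment -> {
                    if (payment.startsWith("bad")) {
                        throw new IllegalStateException("Error retrieving payment data!");
                    }
                    return payment.toUpperCase();
                },
                (failed, ex) -> events.add("recovered " + failed + ": " + ex.getMessage()),
                result -> events.add("forwarded " + result));
        step.process("p-1");
        step.process("bad-2"); // fails, but processing continues with the next record
        step.process("p-3");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The point of the sketch is the middle record: it fails, the handler sees both the record and the exception, and the third record is still processed.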
Check out the Spring Cloud Stream Samples under the kafka-recoverable
sub-project if you want to go straight to the published Spring samples.
No More Clunky Workarounds
Before RecordRecoverableProcessor, achieving this level of fine-grained control often involved writing somewhat clunky
and less-than-elegant error handling logic directly within your processing functions. You might have seen (or even
written!) code that tried to catch exceptions, manually send messages to error topics using StreamBridge, and
generally make the core processing logic harder to read and maintain.
RecordRecoverableProcessor sidesteps these older, less ideal approaches. It provides a clean separation of
concerns: your Function focuses on the core processing, and your BiConsumer handles the errors, but right there,
connected to your processor.
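For contrast, here’s roughly what the older inline style tends to look like, sketched in plain Java. Everything here is hypothetical: InlineHandlingEnricher is a made-up processor, and the errorSender callback only stands in for a StreamBridge-style send(bindingName, payload) call so the example runs on its own.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical example of error handling mixed into the processing function itself.
final class InlineHandlingEnricher implements Function<String, String> {
    // Stand-in for a StreamBridge-style send(bindingName, payload) call.
    private final BiConsumer<String, Object> errorSender;

    InlineHandlingEnricher(BiConsumer<String, Object> errorSender) {
        this.errorSender = errorSender;
    }

    @Override
    public String apply(String payment) {
        try {
            return enrich(payment);
        } catch (RuntimeException ex) {
            // Error routing lives right next to the business logic.
            errorSender.accept("error-handler", payment + " failed: " + ex.getMessage());
            return null; // nothing to pass downstream
        }
    }

    private String enrich(String payment) {
        if (payment.isBlank()) {
            throw new IllegalArgumentException("empty payment");
        }
        return payment + ":enriched";
    }
}
```

It works, but every processor written this way has to repeat the try/catch and know where errors go, which is exactly the coupling the separate BiConsumer removes.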
The Code Example!
Let’s get into a practical example. We’ll use a Spring Boot application with Spring Cloud Stream and Kafka (because
that’s a super common setup). Imagine we’re building a system that processes PaymentEvent messages. Sometimes,
processing a payment might fail (maybe a service is down, or the data is temporarily unavailable).
Here’s how we can use RecordRecoverableProcessor to handle these potential errors gracefully.
First, let’s define a BiConsumer that will handle our errors. A good starting point is to send the errored record and
the exception to a dedicated “error topic”. This allows us to inspect these errors later and potentially reprocess them
or take other actions.
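import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.apache.kafka.streams.processor.api.Record;
import org.springframework.cloud.stream.function.StreamBridge;

public class BiConsumerErrorHandlingSupplier<K, V> implements Supplier<BiConsumer<Record<K, V>, Exception>> {
    private final StreamBridge streamBridge;
    private final String errorTopicName;

    public BiConsumerErrorHandlingSupplier(StreamBridge streamBridge, String errorTopicName) {
        this.streamBridge = streamBridge;
        this.errorTopicName = errorTopicName;
    }

    @Override
    public BiConsumer<Record<K, V>, Exception> get() {
        // Wrap the failed record and its exception, then send them to the error binding.
        return (erroredRecord, ex) -> streamBridge.send(errorTopicName,
                new ErrorRecord<>(erroredRecord.key(), erroredRecord.value(), ex));
    }
}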
This BiConsumerErrorHandlingSupplier is a Supplier of a BiConsumer. Why a Supplier? This is a common pattern in
Spring to allow for lazy initialization and dependency injection. In this case, it makes it easy to inject
the StreamBridge and configure the errorTopicName. The BiConsumer itself takes two arguments: the Record that
caused the error and the Exception itself. Inside the BiConsumer, we’re using StreamBridge (a handy tool in Spring
Cloud Stream for sending messages programmatically) to send an ErrorRecord (you’d need to define this class to hold
the relevant error information) to our errorTopicName.
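The same Supplier shape also makes it easy to react differently per exception type. Below is a small plain-Java sketch of that idea. The names are all assumptions for illustration: TransientServiceException, RoutingErrorHandlerSupplier, and the "payment-retry" binding are made up, and the sender callback stands in for StreamBridge’s send(bindingName, payload) so the example runs without Spring.

```java
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical exception for "the external service is temporarily unavailable".
final class TransientServiceException extends RuntimeException {
    TransientServiceException(String message) {
        super(message);
    }
}

// Hypothetical supplier that picks a target binding based on the exception type.
final class RoutingErrorHandlerSupplier<T> implements Supplier<BiConsumer<T, Exception>> {
    private final BiConsumer<String, Object> sender; // stand-in for a send(bindingName, payload) call
    private final String retryBinding;
    private final String errorBinding;

    RoutingErrorHandlerSupplier(BiConsumer<String, Object> sender, String retryBinding, String errorBinding) {
        this.sender = sender;
        this.retryBinding = retryBinding;
        this.errorBinding = errorBinding;
    }

    @Override
    public BiConsumer<T, Exception> get() {
        return (failed, ex) -> {
            // Transient failures go to a retry destination; everything else is parked.
            String target = (ex instanceof TransientServiceException) ? retryBinding : errorBinding;
            sender.accept(target, failed);
        };
    }
}
```

In a real application you would swap the sender for your StreamBridge and pass Record<K, V> as the type parameter, but the branching logic would look much the same.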
Now, let’s look at our processor, which we’ll call PaymentsProcessor.
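import java.util.function.Function;
import lombok.AllArgsConstructor;
import org.apache.kafka.streams.processor.api.Record;
import org.springframework.cloud.stream.binder.kafka.streams.RecordRecoverableProcessor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class PaymentsProcessor implements Function<Record<String, PaymentEvent>, Record<String, Object>> {
    private final StreamBridge streamBridge;

    @Override
    public Record<String, Object> apply(Record<String, PaymentEvent> stringPaymentEventRecord) {
        // ... your core processing logic here that might throw an exception ...
        throw new RuntimeException("Error retrieving payment data!");
    }

    // Wraps this Function together with the error-handling BiConsumer.
    public RecordRecoverableProcessor<String, PaymentEvent, String, Object> get() {
        return new RecordRecoverableProcessor<>(this,
                new BiConsumerErrorHandlingSupplier<String, PaymentEvent>(streamBridge, "error-handler").get());
    }
}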
Notice that PaymentsProcessor itself is still a regular Function. The magic happens in the get()
method. This method returns a RecordRecoverableProcessor. We create a new instance of RecordRecoverableProcessor,
passing two things:
- this: The instance of our PaymentsProcessor (which is the Function that contains our core processing logic).
- new BiConsumerErrorHandlingSupplier<String, PaymentEvent>(streamBridge, "error-handler").get(): This is how we provide our custom error handling BiConsumer. We’re using our BiConsumerErrorHandlingSupplier to create and configure the BiConsumer that will be executed when errors occur in the apply method.
Finally, let’s see how to wire this up in our Spring Cloud Stream configuration:
@Bean
public Consumer<KStream<String, PaymentEvent>> paymentConsumer(PaymentsProcessor paymentsProcessor) {
    return input -> input.process(() -> paymentsProcessor.get());
    // .process(paymentsProcessor::get) also works!
}
Key things to note in this @Bean definition:
- We’re using a Consumer<KStream<String, PaymentEvent>> because we’re processing a stream of PaymentEvent messages.
- .process(() -> paymentsProcessor.get()): This is crucial! Instead of just using .process(paymentsProcessor), we are using .process(() -> paymentsProcessor.get()). This is how we tell Spring Cloud Stream to use our RecordRecoverableProcessor as the actual processor. We’re calling the get() method to get the RecordRecoverableProcessor instance.
Understanding Function and BiConsumer in this Context
Let’s quickly recap the roles of Function and BiConsumer in RecordRecoverableProcessor:
- Function<Record<K, V>, Record<KR, VR>> (or Consumer<Record<K, V>>, BiConsumer<Record<K, V>, Record<KR, VR>>, etc.): This is your core processing logic. It’s what you want to do with each message when everything goes right. It’s the heart of your stream processing application. In our example, it’s the PaymentsProcessor.
- BiConsumer<Record<K, V>, Exception>: This is your error handling logic. It’s executed only when an exception occurs within your Function. It gets access to the record that caused the error and the exception itself. You define what should happen in error scenarios. In our example, it’s the BiConsumer created by BiConsumerErrorHandlingSupplier.
RecordRecoverableProcessor acts as the intermediary, neatly connecting your processing logic (Function) and your
error handling logic (BiConsumer).
Full Example (Putting It All Together)
To make this completely runnable, let’s flesh out a bit more code. You’ll need to define
PaymentEvent and ErrorRecord classes (these can be simple POJOs). You’ll also need to configure your Kafka
bindings and potentially an error topic.
Example PaymentEvent
public class PaymentEvent {
    private String paymentId;
    private String payload;
    // Getters, setters, constructors...
}
Example ErrorRecord
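public class ErrorRecord<K, V> {
    private final K key;
    private final V value;
    private final Exception exception;

    public ErrorRecord(K key, V value, Exception exception) {
        this.key = key;
        this.value = value;
        this.exception = exception;
    }

    // Getters expose the fields to whatever serializes the payload (e.g. JSON).
    public K getKey() { return key; }
    public V getValue() { return value; }
    public Exception getException() { return exception; }
}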
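If carrying a whole Exception object in the payload turns out to be awkward to serialize, one option is to keep just the bits you want to inspect later. Here’s a small sketch of that alternative, assuming Java 17 records are available; ErrorPayload and its from(...) factory are hypothetical names, not part of the published samples.

```java
// Hypothetical, serialization-friendly alternative to ErrorRecord:
// it keeps the exception's class name and message instead of the exception object.
record ErrorPayload<K, V>(K key, V value, String exceptionType, String message) {

    // Builds a payload from the failed record's key/value and the thrown exception.
    static <K, V> ErrorPayload<K, V> from(K key, V value, Exception ex) {
        return new ErrorPayload<>(key, value, ex.getClass().getName(), ex.getMessage());
    }
}
```

Inside the error-handling BiConsumer you could then send ErrorPayload.from(erroredRecord.key(), erroredRecord.value(), ex) in place of the ErrorRecord, at the cost of losing the stack trace.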
Processor
import java.util.function.Function;
import org.apache.kafka.streams.processor.api.Record;
import org.springframework.cloud.stream.binder.kafka.streams.RecordRecoverableProcessor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

@Component
public class PaymentsProcessor implements Function<Record<String, PaymentEvent>, Record<String, Object>> {
    private final StreamBridge streamBridge;

    // You can inject the error topic binding or topic name into the processor instead of hardcoding
    public PaymentsProcessor(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Override
    public Record<String, Object> apply(Record<String, PaymentEvent> paymentEventRecord) {
        // your business logic!
        return null;
    }

    public RecordRecoverableProcessor<String, PaymentEvent, String, Object> get() {
        return new RecordRecoverableProcessor<>(this,
                new BiConsumerErrorHandlingSupplier<String, PaymentEvent>(streamBridge, "error-handler").get());
    }
}
Spring Boot Application
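import java.util.function.Consumer;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RecordRecoverableProcessorExampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(RecordRecoverableProcessorExampleApplication.class, args);
    }

    @Bean
    public Consumer<KStream<String, PaymentEvent>> paymentConsumer(PaymentsProcessor paymentsProcessor) {
        return input -> input
                .process(() -> paymentsProcessor.get());
    }
}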
application.yml (or application.properties) - Example Kafka Bindings:
spring:
  cloud:
    function:
      definition: paymentConsumer
    stream:
      bindings:
        paymentConsumer-in-0:
          destination: payment-events-topic
          group: payment-group
      kafka:
        binder:
          brokers: localhost:9092
End
If you’ve been struggling with error handling in Spring Cloud Stream, or if you’re looking for a more elegant
alternative to just relying on DLQs, definitely explore RecordRecoverableProcessor.