I have about 10K messages sitting in the topic/subscription.
Here is the code I am using.
Properties set for concurrency:
spring.cloud.gcp.pubsub.subscriber.executor-threads=20
spring.cloud.gcp.pubsub.subscriber.parallel-pull-count=4
@Bean
public PubSubInboundChannelAdapter messageChannelAdapterTry(
        @Qualifier("tryInputChannel") MessageChannel inputChannel,
        PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(
            pubSubTemplate, tryConfig.getSubscription());
    adapter.setOutputChannel(inputChannel);
    adapter.setPayloadType(tryClass.class);
    return adapter;
}
@ServiceActivator(inputChannel = "tryInputChannel")
public void messageReceiver(
        tryClass tryMessage,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message,
        @Headers Map<String, Object> headerMap) {
    log.info("About to go async");
    asyncProcessor(tryMessage, message);
    log.info("Async processor is working");
}
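private void asyncProcessor(tryClass tryMessage, BasicAcknowledgeablePubsubMessage message) {
    // No executor is passed, so the task runs on CompletableFuture's default pool
    CompletableFuture.supplyAsync(() -> {
        log.info("Process in Async Fashion");
        // Do heavy lifting here that can take 20ish seconds to consume tryMessage
        return null; // placeholder; the real result is omitted here
    });
}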
The processing is so slow that the log line `Process in Async Fashion` appears only once every several seconds. The gap between consecutive log lines varies and can be up to 30 seconds. I have never seen two of them back to back.
What is causing the slowness?
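A minimal sketch for checking where the async work actually runs. When no executor is passed, `CompletableFuture.supplyAsync` uses `ForkJoinPool.commonPool()`, whose parallelism is derived from the number of CPU cores rather than from the Pub/Sub subscriber properties. The class and method names below (`CommonPoolCheck`, `asyncThreadName`, `commonPoolParallelism`) are hypothetical and only for illustration; whether this pool is the actual bottleneck here is an assumption that the printed values may help confirm or rule out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class CommonPoolCheck {

    // Returns the name of the thread that executes a supplyAsync task
    // when no executor argument is given.
    static String asyncThreadName() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    // Returns the target parallelism of the JVM-wide common pool.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonPoolParallelism());
        System.out.println("async task ran on:       " + asyncThreadName());
    }
}
```

If the reported parallelism is small, tasks that block for about 20 seconds each would occupy those few worker threads and later tasks would wait their turn, which would be consistent with widely spaced log lines.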
Another worry I have is that my handler just calls `asyncProcessor`, so the future might run on some other executor and won't be limited by the above two properties, which restrict concurrency to `20x4=80` based on my config. I am afraid this can eventually cause an out-of-memory error. Or will the limit of 80 be honored until my 80 `asyncProcessor` calls finish? I know I can set
`spring.cloud.gcp.pubsub.subscription.[subscription-name].flow-control.max-outstanding-element-count=<count>`
Do I need that?
But my primary worry is the slowness.
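On the unbounded-work concern, one option is to give the future an explicitly bounded executor. The sketch below is plain Java with no Spring or Pub/Sub dependencies, and the names `BoundedAsyncSketch`, `newBoundedExecutor`, and `runAndMeasurePeak` are hypothetical. It assumes a fixed-size `ThreadPoolExecutor` with a bounded queue and `CallerRunsPolicy`, so that once the queue is full the submitting thread runs the task itself instead of queueing more work. The sleep stands in for the heavy lifting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedAsyncSketch {

    // Fixed number of worker threads plus a bounded queue. When both are full,
    // CallerRunsPolicy makes the submitting thread execute the task itself,
    // which slows down submission instead of letting the backlog grow.
    static ThreadPoolExecutor newBoundedExecutor(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits `tasks` simulated jobs and returns the highest number observed
    // running at the same time.
    static int runAndMeasurePeak(int tasks, ExecutorService executor) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            futures.add(CompletableFuture.runAsync(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(50); // stand-in for the slow processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            }, executor));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return peak.get();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = newBoundedExecutor(4, 8);
        int peak = runAndMeasurePeak(40, executor);
        executor.shutdown();
        System.out.println("peak concurrent tasks: " + peak);
    }
}
```

With this policy the peak concurrency is at most the pool size plus the one submitting thread, and the number of waiting tasks never exceeds the queue capacity. Whether the same back-pressure interacts well with the subscriber's own threads and flow-control settings is something I have not verified.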