I’m new to Kafka Streams and I’m trying to cobble together my first application.
I would like to add up the amounts of my BankTransactions.
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
JsonSerde<BankTransaction> jsonSerde = new JsonSerde<>(BankTransaction.class);
jsonSerde.configure(Map.of("spring.json.type.mapping", "BankTransaction:nl.sourcelabs.kafkasolo.streamprocessing.BankTransaction"), false);
return kStreamBuilder.stream("transactions", Consumed.with(Serdes.String(), jsonSerde))
.mapValues((readOnlyKey, value) -> value.amount)
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.aggregate(() -> 0L, (key, value, aggregate) -> aggregate + value,
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("transactionAmountCount")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
.toStream();
}
The event is read from the topic and deserialised correctly.
But the app crashes with the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=transactions, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:30)
at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:24)
at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:58)
at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:31)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:427)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$5(MeteredKeyValueStore.java:318)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:887)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:318)
at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:92)
at org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.get(KeyValueStoreWrapper.java:76)
at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:107)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)
I don’t understand how the LongDeserializer is getting a single byte KeyValueStore when only Longs have been put in it.
Any help would be greatly appreciated.
I’ve tried debugging the code but all I can see is that the changelog of transactionAmountCount
is being read and that that’s where everything blows up.
You need to sign in to view this answers