I’m new to Kafka Streams and I’m trying to cobble together my first application.
I would like to add up the amounts of my BankTransactions.
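BankTransaction is just a plain POJO; trimmed down here to the only field the topology uses (amount has to be a Long for the Grouped.with(..., Serdes.Long()) below to compile):

package nl.sourcelabs.kafkasolo.streamprocessing;

// Simplified for this question - the real class has more fields,
// but only amount matters for the aggregation.
public class BankTransaction {
    public Long amount;
}

The stream definition: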
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.JsonSerde;

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    JsonSerde<BankTransaction> jsonSerde = new JsonSerde<>(BankTransaction.class);
    jsonSerde.configure(Map.of("spring.json.type.mapping", "BankTransaction:nl.sourcelabs.kafkasolo.streamprocessing.BankTransaction"), false);
    return kStreamBuilder.stream("transactions", Consumed.with(Serdes.String(), jsonSerde))
            // keep only the amount, then sum it per key into the "transactionAmountCount" store
            .mapValues((readOnlyKey, value) -> value.amount)
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .aggregate(() -> 0L, (key, value, aggregate) -> aggregate + value,
                    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("transactionAmountCount")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(Serdes.Long()))
            .toStream();
}
The event is read from the topic and deserialised correctly, but the app then crashes with the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=transactions, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:30)
at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:24)
at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:58)
at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:31)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:427)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$5(MeteredKeyValueStore.java:318)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:887)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:318)
at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:92)
at org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.get(KeyValueStoreWrapper.java:76)
at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:107)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)
I don’t understand how an IntegerDeserializer is reading from the KeyValueStore at all, or how it can be handed a single byte, when the only thing ever put into that store is Longs via Serdes.Long().
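To double-check my serde assumptions I ran a quick throwaway snippet (not part of the app) confirming the payload sizes involved:

import org.apache.kafka.common.serialization.Serdes;

public class SerdeSizeCheck {
    public static void main(String[] args) {
        // LongSerializer always writes 8 bytes...
        byte[] longBytes = Serdes.Long().serializer().serialize("any-topic", 42L);
        System.out.println("Long payload: " + longBytes.length + " bytes");    // 8
        // ...while IntegerSerializer writes exactly 4, the size
        // IntegerDeserializer also insists on - hence the "is not 4" above.
        byte[] intBytes = Serdes.Integer().serializer().serialize("any-topic", 42);
        System.out.println("Integer payload: " + intBytes.length + " bytes");  // 4
    }
}

So even if the store contained exactly the Longs I put into it, an IntegerDeserializer could never read them back.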
Any help would be greatly appreciated.
I’ve tried debugging the code, but all I can see is that the changelog of transactionAmountCount is being read, and that is where everything blows up.
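For reference, this is roughly how I inspected the changelog (the topic name follows Kafka Streams’ <application.id>-<store name>-changelog convention; "streamprocessing" is a stand-in for my actual application id):

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ChangelogDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "changelog-debug");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Read raw bytes so no deserializer can blow up while we inspect the data.
        try (KafkaConsumer<byte[], byte[]> consumer =
                new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.subscribe(List.of("streamprocessing-transactionAmountCount-changelog"));
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                // A serialized Long should be 8 bytes; print what is actually there.
                System.out.printf("key=%s valueBytes=%d%n",
                        new String(record.key(), StandardCharsets.UTF_8),
                        record.value() == null ? 0 : record.value().length);
            }
        }
    }
}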