October 26, 2024
Chicago 12, Melborne City, USA
java

Kafka Streams – Why can't I aggregate and sum up my Longs?


I’m new to Kafka Streams and I’m trying to cobble together my first application.

I would like to add up the amounts of my BankTransactions.

@Bean
    public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
        JsonSerde<BankTransaction> jsonSerde = new JsonSerde<>(BankTransaction.class);
        jsonSerde.configure(Map.of("spring.json.type.mapping", "BankTransaction:nl.sourcelabs.kafkasolo.streamprocessing.BankTransaction"), false);
        return kStreamBuilder.stream("transactions", Consumed.with(Serdes.String(), jsonSerde))
        .mapValues((readOnlyKey, value) -> value.amount)
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .aggregate(() -> 0L, (key, value, aggregate) -> aggregate + value,
                        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("transactionAmountCount")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()))
                .toStream();

    }

The event is read from the topic and deserialised correctly.

But the app crashes with the following error:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=transactions, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
    at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:30)
    at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:24)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:58)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:31)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:427)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$5(MeteredKeyValueStore.java:318)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:887)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:318)
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:92)
    at org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.get(KeyValueStoreWrapper.java:76)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:107)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)

I don’t understand how the LongDeserializer is getting a single byte KeyValueStore when only Longs have been put in it.

Any help would be greatly appreciated.

I’ve tried debugging the code but all I can see is that the changelog of transactionAmountCount is being read and that that’s where everything blows up.



You need to sign in to view this answers

Leave feedback about this

  • Quality
  • Price
  • Service

PROS

+
Add Field

CONS

+
Add Field
Choose Image
Choose Video