OiO.lk Blog java Kafka Streams – Why can't I aggregate and sum up my Longs?
java

Kafka Streams – Why can't I aggregate and sum up my Longs?


I’m new to Kafka Streams and I’m trying to cobble together my first application.

I would like to add up the amounts of my BankTransactions.

@Bean
    public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
        JsonSerde<BankTransaction> jsonSerde = new JsonSerde<>(BankTransaction.class);
        jsonSerde.configure(Map.of("spring.json.type.mapping", "BankTransaction:nl.sourcelabs.kafkasolo.streamprocessing.BankTransaction"), false);
        return kStreamBuilder.stream("transactions", Consumed.with(Serdes.String(), jsonSerde))
        .mapValues((readOnlyKey, value) -> value.amount)
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .aggregate(() -> 0L, (key, value, aggregate) -> aggregate + value,
                        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("transactionAmountCount")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()))
                .toStream();

    }

The event is read from the topic and deserialised correctly.

But the app crashes with the following error:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=transactions, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
    at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:30)
    at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:24)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:58)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:31)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:427)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$5(MeteredKeyValueStore.java:318)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:887)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:318)
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:92)
    at org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.get(KeyValueStoreWrapper.java:76)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:107)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)

I don’t understand how the LongDeserializer is getting a single byte KeyValueStore when only Longs have been put in it.

Any help would be greatly appreciated.

I’ve tried debugging the code but all I can see is that the changelog of transactionAmountCount is being read and that that’s where everything blows up.



You need to sign in to view this answers

Exit mobile version