kafka no record information is available

  • springcloud-config-bus error
    • Solution

springcloud-config-bus error

springcloud-config-bus reports the following error:

```
[KafkaConsumerDestination{consumerDestinationName='springCloudBus', partitions=1, dlqName='null'}.container-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
	at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151)
	at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1401)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1116)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:983)
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205)
	at com.sun.proxy.$Proxy257.commitSync(Unknown Source)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2366)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2361)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2347)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2161)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
	... 4 common frames omitted
```

In this case, the exception is raised outside of record processing (after processing has already completed). The offset commit fails because of a rebalance, and since the record is no longer available there is nothing to seek back to. The error handler simply rethrows the exception, and the commit will keep failing and being retried indefinitely.
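To see why the commit fails in the first place: the broker evicts any consumer that does not call poll() within max.poll.interval.ms, and the eviction triggers a rebalance. Below is a minimal sketch of a listener that reproduces this, assuming Spring Kafka; the class name, topic, and sleep duration are illustrative, not from the original post.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical listener that blocks longer than max.poll.interval.ms
// (default 300000 ms = 5 minutes). The broker then assumes the consumer
// is dead, rebalances the group, and kicks the consumer out. The next
// offset commit fails with CommitFailedException, which reaches the
// error handler with no record attached -- exactly the error above.
@Component
public class SlowListener {

    @KafkaListener(topics = "springCloudBus")
    public void onMessage(String message) throws InterruptedException {
        Thread.sleep(600_000); // simulates processing slower than the poll interval
    }
}
```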

Solution

Add the max.poll.interval.ms property to the Kafka consumer configuration, as follows:

```yaml
spring:
  kafka:
    bootstrap-servers: 192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092
    consumer:
      # Unique string identifying the consumer group this consumer belongs to.
      group-id: xxx
      # ID passed to the server with every request; used for server-side logging.
      client-id: xxx
      # What to do when there is no initial offset in Kafka, or the current offset
      # no longer exists on the server. Default is latest; valid values are
      # latest, earliest, none.
      auto-offset-reset: earliest
      # If enable.auto.commit is true, the frequency in milliseconds at which
      # consumer offsets are auto-committed to Kafka. Default is 5000.
      auto-commit-interval: 5000ms
      # If true, the consumer's offsets are periodically committed in the
      # background. Default is true.
      enable-auto-commit: true
      # Maximum time in milliseconds the server blocks before answering a fetch
      # request when there is not enough data to satisfy fetch.min.bytes.
      # Default is 500.
      fetch-max-wait: 500ms
      # Minimum amount of data in bytes the server should return for a fetch
      # request. Default is 1; corresponds to the Kafka property fetch.min.bytes.
      fetch-min-size: 1
      # Expected time in milliseconds between heartbeats to the consumer
      # coordinator. Default is 3000.
      heartbeat-interval: 3000ms
      # Deserializer class for keys; implementations implement the interface
      # org.apache.kafka.common.serialization.Deserializer.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer class for values; implementations implement the interface
      # org.apache.kafka.common.serialization.Deserializer.
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Maximum number of records returned in a single call to poll(). Default is 500.
      max-poll-records: 500
      properties:
        # Maximum delay between calls to poll() before the consumer is considered
        # failed and the group rebalances; maps to max.poll.interval.ms.
        # Default is 300000 (5 minutes); raised here to 10 minutes.
        max:
          poll:
            interval:
              ms: 600000
```
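For applications that build the consumer programmatically instead of through application.yml, the same property can be set with plain kafka-clients configuration. This is a minimal sketch under that assumption; the class name ConsumerFactoryExample is hypothetical, while the broker addresses and group id reuse the placeholders from the config above.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerFactoryExample {

    public static KafkaConsumer<String, String> create() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "xxx");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Give slow listeners 10 minutes per poll cycle before the broker
        // considers the consumer dead and triggers a rebalance.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);
        return new KafkaConsumer<>(props);
    }
}
```

If raising max.poll.interval.ms is not desirable, lowering max-poll-records is the other common lever: smaller batches mean each poll cycle finishes sooner, keeping the consumer inside the interval.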
