Consider revisiting the entries above or defining a bean of type in your configuration.

Developing with spring-kafka

Customizing the ConsumerFactory

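import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    /** Topic named "test"; carries plain strings. */
    public static final String TOPIC_TEST = "test";

    /** Topic named "model"; carries JSON. */
    public static final String TOPIC_MODEL = "model";

    /** Name of the custom listener container factory that accepts JSON. */
    public static final String KAFKA_JSON_LISTENER_CONTAINER_FACTORY = "kafkaJsonListenerContainerFactory";

    @Bean(name = KAFKA_JSON_LISTENER_CONTAINER_FACTORY)
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaJsonListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(10);
        factory.setMessageConverter(new StringJsonMessageConverter());
        factory.getContainerProperties().setIdleEventInterval(60000L * 60);
        factory.getContainerProperties().setPollTimeout(10000);
        return factory;
    }

    /** Builds the consumer properties by hand. */
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>(20);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        // Consumer group id.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
        // Whether the consumer commits offsets automatically; the Kafka default is true.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Maximum number of bytes the broker returns per partition; the default is 1 MB.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "512000");
        // How long the consumer may go without heartbeats before it is considered dead
        // (the Kafka default is 10 s before 3.0 and 45 s since).
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 35000);
        // Maximum time the broker waits before answering a fetch request; the default is 500 ms.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 35000);
        // Request timeout.
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        // Interval between automatic offset commits.
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        // Maximum number of records returned by a single call to poll().
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    // If the spring.kafka properties are not enough, define a custom ConsumerFactory.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }
}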

The configuration file is as follows:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: sboot.kfk
      enable-auto-commit: true
      fetch-max-wait: 35000
      auto-commit-interval: 5000
      max-poll-records: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      concurrency: 4
      poll-timeout: 10000

Startup then failed with the following error:

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.

The following candidates were found but could not be injected:
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans of type 'org.springframework.kafka.core.ConsumerFactory' consumerFactory

Action:

Consider revisiting the entries above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

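The message says that no ConsumerFactory could be found in the configuration, yet I had definitely declared that @Bean. It turned out that the ConsumerFactory in the Spring Boot source is declared like this: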

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}

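Look closely: what is different? The source declares ConsumerFactory<?, ?>, not ConsumerFactory<String, String>. This is a generics mismatch. As the error message shows, the custom bean makes the auto-configured kafkaConsumerFactory back off; somewhere in Spring's code the custom bean is then presumably picked up as a candidate, but the type check finds that its generic parameters do not line up with what is required, hence the error. The same applies when customizing RedisTemplate, for example: mind the difference between RedisTemplate<Object, Object> and RedisTemplate<String, Object>, and check which one the injection point actually asks for.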
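The mismatch can be reproduced without Spring using plain Java reflection. The sketch below is only an illustration: Factory is a hypothetical stand-in for ConsumerFactory, the method names are made up, and the injection point is assumed to ask for <Object, Object> type arguments (which is what the failing parameter appears to declare in the Spring Boot versions that print this message). The exact-equality check is a deliberate simplification, not Spring's actual resolution algorithm.

```java
import java.lang.reflect.Type;

// Hypothetical stand-in for org.springframework.kafka.core.ConsumerFactory<K, V>.
interface Factory<K, V> {}

class GenericMatchDemo {

    // Mirrors the custom @Bean method from the article.
    static Factory<String, String> stringFactory() { return null; }

    // The same provider, declared with the type arguments the injection point asks for.
    static Factory<Object, Object> objectFactory() { return null; }

    // Mirrors the auto-configured method's parameter (assumed to be <Object, Object>).
    static void listenerContainerFactory(Factory<Object, Object> consumerFactory) {}

    // Returns true when the provider method's generic return type equals the
    // generic type of the injection point. Exact equality is a simplification.
    static boolean matches(String providerMethod) {
        try {
            Type provided = GenericMatchDemo.class
                    .getDeclaredMethod(providerMethod)
                    .getGenericReturnType();
            Type required = GenericMatchDemo.class
                    .getDeclaredMethod("listenerContainerFactory", Factory.class)
                    .getGenericParameterTypes()[0];
            return provided.equals(required);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Unknown method: " + providerMethod, e);
        }
    }

    public static void main(String[] args) {
        // Same raw type, different type arguments: not a match.
        System.out.println("Factory<String, String> matches: " + matches("stringFactory")); // false
        // Identical type arguments: a match.
        System.out.println("Factory<Object, Object> matches: " + matches("objectFactory")); // true
    }
}
```

Both providers have the same raw type, which is why a check on the raw type alone (such as the @ConditionalOnMissingBean condition in the error message) sees a ConsumerFactory bean, while the generic-aware injection check still rejects it.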

Conclusion: when defining a custom Spring bean, check that its generic type parameters match what the injection point expects.
