当前位置: 首页 > 知识库问答 >
问题:

Kafka流编程配置不工作

蒋无尘
2023-03-14

我正在做一个Spring Boot应用程序,并试图以编程方式配置kafka,但由于某些原因,我仍然在从应用程序获取属性。yaml而不是我通过编程设置的

@Configuration
public class KafkaConfiguration {
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(30000);

        return factory;
    }

    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "aaa"); // should crash since is not valid
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "app1");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

@Component
public class StreamListener {
    @StreamListener(TestStreams.TEST_STREAM_IN)
    public void testStream(@Payload GenericCustomEvent response, @Headers MessageHeaders headers) throws Exception {
        log.debug("Received generic event {} with headers {}", response, headers);
    }
}

public interface TestStreams {
    String TEST_STREAM_IN = "test-stream-in";

    @Input(TEST_STREAM_IN)
    SubscribableChannel inputTestStream();
}

@EnableBinding({TestStreams.class})
@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication .class, args);
    }
}

共有1个答案

端木志诚
2023-03-14

活页夹不使用KafkaListenerContainerFactory,而是从yaml创建容器本身。

您可以通过添加ListenerContainerCustomizerbean来修改容器。

这里的示例当使用Spring Cloud Stream Kafka 3.0.3时,我可以应用优雅的关闭吗?释放?

 类似资料:
  • 那是我学习Kafka的初期。我正在检查我本地机器中的每一个Kafka属性/概念。 所以我遇到了属性,下面是我的理解。如果我误解了什么,请纠正我。 将消息发送到主题后,必须将消息写入至少关注者数。 还包括引导。 如果可用活动代理的数量(间接地,在同步副本中)少于指定的,则生产者将引发发布消息失败的异常。 以下是我创建上述场景所遵循的步骤 在本地启动了3个代理,代理ID为0、1和2 创建了主题insy

  • 来自jvisualvm的快照

  • 将是什么 线程不足,无法执行工作流。如果此消息始终显示,请选择WorkerOptions。应减小maxConcurrentWorklfowExecutionSize或WorkerOptions。maxWorkflowThreads增加。 处于阻塞状态的工作流在内存中保持活动状态??处于等待状态的工作流是否持续检查条件??更多的 -

  • 我试图使用kafka流库只使用一次kafka的功能。我只将proessing.guarantee配置为exactly_once。与此同时,需要将事务状态存储在内部主题(__transaction_state)中。 我的问题是,如何定制主题的名称?如果kafka集群由多个消费者共享,那么每个消费者是否需要不同的事务管理主题? 谢谢你,墨蒂

  • 我为3节点Kafka集群设置了ACL,并能够通过生产者控制台和消费者控制台发送和接收主题。现在我想用ACL配置Kafka连接。我尝试了SASL_PLAINTEXT组合和连接。日志文件,它显示以下错误。它没有从源表同步到主题,请在我缺少任何配置的地方提供帮助。 错误日志 我的配置如下文件所示。我在jaas中提到过用户。conf文件并设置到环境中。 1: zookeeper.properties 2:

  • 主要内容:发布订阅消息传递的工作流,队列消息/消费者组的工作流,ZooKeeper的角色截至目前,我们已经了解了Kafka的核心概念。 现在让我们来看看Kafka的工作流程。 Kafka只是分成一个或多个分区的主题集合。 Kafka分区是消息的线性排序序列,每个消息由其索引标识(称为偏移量)。 Kafka集群中的所有数据都是不相关的分区联合。 传入消息写在分区的末尾,消费者依次读取消息。 通过将消息复制到不同的经纪人来提供持久性。 Kafka以快速,可靠,持久的容错和零停机方式提供基