当前位置: 首页 > 知识库问答 >
问题:

没有输入主题的拓扑将不会创建流线程和全局线程

焦宁
2023-03-14

我正在编写一个 Kafka Streams 应用程序,我想在此应用程序中包含两个应用程序 ID,但我不断收到错误消息,指出“没有输入主题的拓扑将创建没有流线程和全局线程,必须订阅至少一个源主题或全局表。你能告诉我我在哪里犯了错误吗?非常感谢!

public class KafkaStreamsConfigurations {
    ...
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    @Primary
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        setDefaults(props);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
        return new KafkaStreamsConfiguration(props);
    }

    public void setDefaults(Map<String, Object> props) {...}

    @Bean("snowplowStreamBuilder")
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        Map<String, Object> props = new HashMap<>();
        setDefaults(props);
        ...
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

        Properties properties = new Properties();
        props.forEach(properties::put);
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
        streamsBuilderFactoryBean.setStreamsConfiguration(properties);
        return streamsBuilderFactoryBean;
    }
}

这是我的应用程序类。

public class SnowplowStreamsApp {
    @Bean("snowplowStreamsApp")
    public KStream<String, String> [] startProcessing(
        @Qualifier("snowplowStreamBuilder") StreamsBuilder builder) {
                KStream<String, String>[] branches = builder.stream(inputTopicPubsubSnowplow, Consumed
            .with(Serdes.String(), Serdes.String()))
            .mapValues(snowplowEnrichedGoodDataFormatter::formatEnrichedData)
            .branch(...);
        return branches;
    }
}

共有1个答案

松建本
2023-03-14

将工厂 Bean 命名为 DEFAULT_STREAMS_BUILDER_BEAN_NAME 而不是 snowplowStreamBuilder - 否则,默认工厂 Bean 将在没有定义流的情况下启动。

 类似资料:
  • 给定一个将消息发布到两个不同主题的Kafka流拓扑,是否可以保证在这两个分支中执行各个步骤的顺序,或者这些分支是完全分开并并行执行的? 在本例中,是否会在调用< code>mapTwo或向output-topic-two发布消息之前执行< code>mapOne并发布到output-topic-one?换句话说,能否保证在消息发布到output-topic-two之前完成< code>mapOne

  • 我有一个python脚本,所以我在python中使用线程模块并发执行。 def run(self):db=MySQLdb。connect('localhost','mytable','user','mytable')游标=db。cursor()query=“dig short”str(反向ip)”按键try:output=子进程。检查\u输出(查询,shell=True)输出\u编辑=输出。条带(

  • 我正在尝试使用Eclipse在Linux中运行Storm启动示例。我收到以下错误和函数从未被调用。 错误: 我的拓扑类: 我正在虚拟机环境中工作,所以不知道这是否是由于安装了Zookeeper。有什么想法吗?

  • 我正在尝试使用MyBatis和一个雪花数据库。我的情况和这张海报差不多: 配置mybatis以使用现有连接 本质上,我可以获得java.sql.Connection对象,但无法通过DataSource或Oracle等RDBMS数据库通常执行的其他步骤获得该对象。一个建议的解决方案是这样做: 这些将在多线程环境中。如果有人关闭了SnowflakesSession对象,那么是否关闭了底层java.sq

  • 问题内容: 我正在测试本地计算机上的套接字。我正在尝试使用线程在一个程序中同时运行套接字和服务器。我的服务器是回显服务器,因此它可以将收到的所有消息发回。我的问题是,当我同时在客户端和服务器上启动两个线程时,当它们到达我从输入流读取的部分时,它们将“冻结”。它可以很好地工作到客户端发送消息的那部分。然后,它停止了,因为似乎客户端正在等待消息,服务器也是如此,即使我已经通过写入输出流向服务器发送了消

  • 我们有一个kafka streams Spring Boot应用程序(使用spring-kafka),这个应用程序目前从上游主题读取消息,应用一些转换,并将它们写入下游主题,它不做任何聚合或联接或任何高级kafka streams功能。 代码当前类似于