我正在编写一个 Kafka Streams 应用程序,我想在此应用程序中包含两个应用程序 ID,但我不断收到错误消息,指出“没有输入主题的拓扑将创建没有流线程和全局线程,必须订阅至少一个源主题或全局表。你能告诉我我在哪里犯了错误吗?非常感谢!
public class KafkaStreamsConfigurations {
...
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
@Primary
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
setDefaults(props);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
return new KafkaStreamsConfiguration(props);
}
public void setDefaults(Map<String, Object> props) {...}
@Bean("snowplowStreamBuilder")
public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
Map<String, Object> props = new HashMap<>();
setDefaults(props);
...
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
Properties properties = new Properties();
props.forEach(properties::put);
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
streamsBuilderFactoryBean.setStreamsConfiguration(properties);
return streamsBuilderFactoryBean;
}
}
这是我的应用程序类。
public class SnowplowStreamsApp {
@Bean("snowplowStreamsApp")
public KStream<String, String> [] startProcessing(
@Qualifier("snowplowStreamBuilder") StreamsBuilder builder) {
KStream<String, String>[] branches = builder.stream(inputTopicPubsubSnowplow, Consumed
.with(Serdes.String(), Serdes.String()))
.mapValues(snowplowEnrichedGoodDataFormatter::formatEnrichedData)
.branch(...);
return branches;
}
}
将工厂 Bean 命名为 DEFAULT_STREAMS_BUILDER_BEAN_NAME
而不是 snowplowStreamBuilder
- 否则,默认工厂 Bean 将在没有定义流的情况下启动。
给定一个将消息发布到两个不同主题的Kafka流拓扑,是否可以保证在这两个分支中执行各个步骤的顺序,或者这些分支是完全分开并并行执行的? 在本例中,是否会在调用< code>mapTwo或向output-topic-two发布消息之前执行< code>mapOne并发布到output-topic-one?换句话说,能否保证在消息发布到output-topic-two之前完成< code>mapOne
我有一个python脚本,所以我在python中使用线程模块并发执行。 def run(self):db=MySQLdb。connect('localhost','mytable','user','mytable')游标=db。cursor()query=“dig short”str(反向ip)”按键try:output=子进程。检查\u输出(查询,shell=True)输出\u编辑=输出。条带(
我正在尝试使用Eclipse在Linux中运行Storm启动示例。我收到以下错误和函数从未被调用。 错误: 我的拓扑类: 我正在虚拟机环境中工作,所以不知道这是否是由于安装了Zookeeper。有什么想法吗?
我正在尝试使用MyBatis和一个雪花数据库。我的情况和这张海报差不多: 配置mybatis以使用现有连接 本质上,我可以获得java.sql.Connection对象,但无法通过DataSource或Oracle等RDBMS数据库通常执行的其他步骤获得该对象。一个建议的解决方案是这样做: 这些将在多线程环境中。如果有人关闭了SnowflakesSession对象,那么是否关闭了底层java.sq
问题内容: 我正在测试本地计算机上的套接字。我正在尝试使用线程在一个程序中同时运行套接字和服务器。我的服务器是回显服务器,因此它可以将收到的所有消息发回。我的问题是,当我同时在客户端和服务器上启动两个线程时,当它们到达我从输入流读取的部分时,它们将“冻结”。它可以很好地工作到客户端发送消息的那部分。然后,它停止了,因为似乎客户端正在等待消息,服务器也是如此,即使我已经通过写入输出流向服务器发送了消
我们有一个kafka streams Spring Boot应用程序(使用spring-kafka),这个应用程序目前从上游主题读取消息,应用一些转换,并将它们写入下游主题,它不做任何聚合或联接或任何高级kafka streams功能。 代码当前类似于