当前位置: 首页 > 知识库问答 >
问题:

KTable在Spring Boot应用程序中不返回数据,但可以查询

云欣嘉
2023-03-14

我有一个使用Kafka流的Spring Boot应用程序。我有一个包含一些金融货币报价的KTable,它是这样创建的:

@Bean(name = "indicativeQuotes")
public KTable<String, Quote> quoteKTable(StreamsBuilder streamsBuilder) {
    return streamsBuilder.table(quoteTopicName,
            Materialized.<String,Quote,KeyValueStore<Bytes,byte[]>>as("quoteTable")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(Quote.class)));
}

我@autowire这个bean在另一个组件中,并用以下代码测试它:

@Autowired
private KTable<String, Quote> indicativeQuotes;

@PostConstruct
private void postConstruct() {
    doPrint();
}

public void doPrint() {
        ReadOnlyKeyValueStore<String, Quote> store = streamsBuilderFactoryBean.getKafkaStreams().store("quoteTable", QueryableStoreTypes.keyValueStore());
        store.all().forEachRemaining(keyValue -> log.info("Key: " + keyValue.key + " Value: " + keyValue.value));
        indicativeQuotes.foreach((k,v) -> log.info(k));}

当通过store查询时,代码记录了正确的值,但它在foreach()中没有输出任何内容,就好像like表是空的一样。我还尝试了print()和其他选项--所有选项都不输出任何内容,没有任何例外。

我的用例是,我有一个预定的Quartz作业,它应该在触发时将KTable的当前状态写入Kafka主题,如下所示:

@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    TriggerKey triggerKey = jobExecutionContext.getTrigger().getKey();
    log.info("Job was triggered by: {}", triggerKey.getName());

    indicativeQuotes.filter((key, value) -> key.equals(triggerKey.getName()))
            .mapValues(quoteToCourseFixedMapper)
            .toStream()
            .peek((instrument, course)-> log.info("Sending courses for instrument: {}, {}", instrument, course))
            .to(quoteEventTopicName);
}

但我认为这段代码不起作用,因为它不是处理拓扑的一部分,而且我不能只是按需从Ktable中获取数据。我在这里有点困惑,当然我可以在事件触发时通过store查询数据,但是也许对于这样的用例有更好的模式?基本上,我感兴趣的是,是否有可能合并这个触发的作业事件作为处理管道的一部分。

共有1个答案

越健
2023-03-14

如果只想将更新发布到另一个主题,请将KTable转换为KStream,并使用to()函数。

KTable ktable = ...;
KStream ksteram = ktable.toStream();
kstream.to("topic", Produces.with(keySerde, valueSerde))

主题将包含该表的更改日志。

显然,由于一些生命周期相关的概念,您不能仅仅注入(@autowire)kstream/ktable。您应该尽可能保持与KafkaStreams相关的代码内联。

 类似资料:
  • 问题内容: 我为一个朋友做了一些网络工作,帮助了他。他需要的一部分是在他的站点上更改几段文字的简单方法。与其让他编辑HTML,我决定提供一个带有消息的XML文件,然后我使用jQuery将它们从文件中拉出并插入到页面中。 它的效果非常好…在Firefox和Chrome中,在IE7中效果不佳。我希望你们中的一个能告诉我原因。我做了一个公平的但谷歌搜索,但是找不到我想要的东西。 这是XML: 这是我的j

  • 我已经为Postgresql启用了复制,并且正在使用PGPool进行负载平衡。 我在使用HikariCP甚至Apache DBCP连接到Postgres时遇到了问题。 在SpringBoot应用程序中有没有使用PGPool的方法? 请查找堆栈跟踪: 2018-08-10 10:20:19.124信息37879----[main]com.zaxxer.hikari.hikaridatasource:

  • 我试图创建一个代码,检查一个单元格是否选中(或未选中)复选框,然后将该行相应单元格的数据复制到另一个“摘要”表上。下面没有包含“如果选中,则复制数据”部分。 我遇到的问题是getValues和getBackgound似乎都没有返回数据。 当我运行代码时,记录器显示每个复选框的值都是“false”(即使有些复选框被选中,并且我使用电子表格设置中的数据验证将它们的值设置为1(选中)和0(未选中)。 如

  • 我试图在SpringMVC中运行SpringBoot应用程序,在SpringMVCPOM中添加SpringBoot应用程序依赖项,并扫描SpringBoot包,但我面临以下问题

  • app.py reg_account.html 错误: 我想让的结果在中的复选框未选中时返回False,但我不明白为什么当我勾选了该复选框时,结果可以存储到数据库中?我试着调试了几次,但还是找不到一个可能的解决方案,所以有没有人可以帮忙?

  • 这就是我正在做的。 部署此应用程序在 jboss 中失败,因为它在 tomcat 中部署,并且像 charm 一样工作。 我使用来自eclipse的动态web应用程序作为项目源。 将项目导出到war文件并在jboss服务器中部署也不起作用。 web.xml: Spring.xml: 控制器: 和 JBOSS 中的错误: