我有一个使用Kafka流的Spring Boot应用程序。我有一个包含一些金融货币报价的KTable,它是这样创建的:
@Bean(name = "indicativeQuotes")
public KTable<String, Quote> quoteKTable(StreamsBuilder streamsBuilder) {
return streamsBuilder.table(quoteTopicName,
Materialized.<String,Quote,KeyValueStore<Bytes,byte[]>>as("quoteTable")
.withKeySerde(Serdes.String())
.withValueSerde(new JsonSerde<>(Quote.class)));
}
我@autowire这个bean在另一个组件中,并用以下代码测试它:
@Autowired
private KTable<String, Quote> indicativeQuotes;
@PostConstruct
private void postConstruct() {
doPrint();
}
public void doPrint() {
ReadOnlyKeyValueStore<String, Quote> store = streamsBuilderFactoryBean.getKafkaStreams().store("quoteTable", QueryableStoreTypes.keyValueStore());
store.all().forEachRemaining(keyValue -> log.info("Key: " + keyValue.key + " Value: " + keyValue.value));
indicativeQuotes.foreach((k,v) -> log.info(k));}
当通过store查询时,代码记录了正确的值,但它在foreach()中没有输出任何内容,就好像like表是空的一样。我还尝试了print()和其他选项--所有选项都不输出任何内容,没有任何例外。
我的用例是,我有一个预定的Quartz作业,它应该在触发时将KTable的当前状态写入Kafka主题,如下所示:
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
TriggerKey triggerKey = jobExecutionContext.getTrigger().getKey();
log.info("Job was triggered by: {}", triggerKey.getName());
indicativeQuotes.filter((key, value) -> key.equals(triggerKey.getName()))
.mapValues(quoteToCourseFixedMapper)
.toStream()
.peek((instrument, course)-> log.info("Sending courses for instrument: {}, {}", instrument, course))
.to(quoteEventTopicName);
}
但我认为这段代码不起作用,因为它不是处理拓扑的一部分,而且我不能只是按需从Ktable中获取数据。我在这里有点困惑,当然我可以在事件触发时通过store查询数据,但是也许对于这样的用例有更好的模式?基本上,我感兴趣的是,是否有可能合并这个触发的作业事件作为处理管道的一部分。
如果只想将更新发布到另一个主题,请将KTable转换为KStream,并使用to()函数。
KTable ktable = ...;
KStream ksteram = ktable.toStream();
kstream.to("topic", Produces.with(keySerde, valueSerde))
主题将包含该表的更改日志。
显然,由于一些生命周期相关的概念,您不能仅仅注入(@autowire
)kstream/ktable。您应该尽可能保持与KafkaStreams相关的代码内联。
问题内容: 我为一个朋友做了一些网络工作,帮助了他。他需要的一部分是在他的站点上更改几段文字的简单方法。与其让他编辑HTML,我决定提供一个带有消息的XML文件,然后我使用jQuery将它们从文件中拉出并插入到页面中。 它的效果非常好…在Firefox和Chrome中,在IE7中效果不佳。我希望你们中的一个能告诉我原因。我做了一个公平的但谷歌搜索,但是找不到我想要的东西。 这是XML: 这是我的j
我试图创建一个代码,检查一个单元格是否选中(或未选中)复选框,然后将该行相应单元格的数据复制到另一个“摘要”表上。下面没有包含“如果选中,则复制数据”部分。 我遇到的问题是getValues和getBackgound似乎都没有返回数据。 当我运行代码时,记录器显示每个复选框的值都是“false”(即使有些复选框被选中,并且我使用电子表格设置中的数据验证将它们的值设置为1(选中)和0(未选中)。 如
我已经为Postgresql启用了复制,并且正在使用PGPool进行负载平衡。 我在使用HikariCP甚至Apache DBCP连接到Postgres时遇到了问题。 在SpringBoot应用程序中有没有使用PGPool的方法? 请查找堆栈跟踪: 2018-08-10 10:20:19.124信息37879----[main]com.zaxxer.hikari.hikaridatasource:
我试图在SpringMVC中运行SpringBoot应用程序,在SpringMVCPOM中添加SpringBoot应用程序依赖项,并扫描SpringBoot包,但我面临以下问题
app.py reg_account.html 错误: 我想让的结果在中的复选框未选中时返回False,但我不明白为什么当我勾选了该复选框时,结果可以存储到数据库中?我试着调试了几次,但还是找不到一个可能的解决方案,所以有没有人可以帮忙?
这就是我正在做的。 部署此应用程序在 jboss 中失败,因为它在 tomcat 中部署,并且像 charm 一样工作。 我使用来自eclipse的动态web应用程序作为项目源。 将项目导出到war文件并在jboss服务器中部署也不起作用。 web.xml: Spring.xml: 控制器: 和 JBOSS 中的错误: