此存储区将保存在“Word-Count-Input”主题中找到的任何单词的最新计数。
假设消息成功发送到主题测试
添加:
这里我的主要问题是什么时候可以通过交互式查询或联接请求数据。有没有可能得到过时的数据/有没有滞后的可能?
经过一番调试为第三部分找到了答案:
这里有一个例子:
测试类:
package myapps;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.Assert;
import org.junit.Test;
import java.util.Properties;
public class TopologyTest {
private static final String INPUT_TOPIC = "input-topic";
private static final String OUTPUT_TOPIC = "output-topic";
@Test
public void testStreams() {
Topology topology = createTopology();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
ConsumerRecordFactory<String, Long> factory = new ConsumerRecordFactory<>(
INPUT_TOPIC, new StringSerializer(), new LongSerializer());
testDriver.pipeInput(factory.create(INPUT_TOPIC, "key", 1L));
testDriver.pipeInput(factory.create(INPUT_TOPIC, "key", 2L));
testDriver.pipeInput(factory.create(INPUT_TOPIC, "key", 3L));
ProducerRecord<String, String> pr1 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
ProducerRecord<String, String> pr2 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
ProducerRecord<String, String> pr3 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
Assert.assertEquals("1,1", pr1.value());
Assert.assertEquals("2,3", pr2.value());
Assert.assertEquals("3,6", pr3.value());
}
}
private Topology createTopology() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> inputStream = builder.stream(INPUT_TOPIC);
KTable<String, Long> table = inputStream.groupByKey().aggregate(
() -> 0L,
(key, value, aggregate) -> value + aggregate,
Materialized.as("store")
);
KStream<String, String> joined = inputStream
.join(table, (value, aggregate) -> value + "," + aggregate);
joined.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
return builder.build();
}
}
<dependencies>
<!-- Apache Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>2.3.0</version>
<scope>test</scope>
</dependency>
</dependencies>
问题内容: 有人知道Java 7是否会使用闭包吗? 问题答案: 在Devoxx 2008上,Mark Reinhold明确指出Java 7 中将不 包括闭包。 等待!闭包 将 包含在Java 7中。Mark Reinhold 在Devoxx 2009上宣布了这一逆转。 系住那个!闭包( lambda表达式 )已推迟到Java 8为止。有关更多信息,请遵循Project Lambda(JSR 335
问题内容: 在过去的一年中,我听说有关Scala语言的炒作越来越多。我知道有几个现有项目计划将Scala支持与IDE集成在一起。但是,并不总是很清楚集成到底有多好。 他们当前是否支持Intellisense以及Eclipse和Netbeans是否支持Java语言?他们还支持即时验证吗? 问题答案: 我不能亲自谈谈IntelliJ或NetBeans插件的稳定性(尽管我听过很多好消息),但是用于Ecl
我有一个问题,关于什么是正确的做法,使用SwingU实用程序的调用稍后方法。 所以首先,我想确认我理解正确。 据我所知,对GUI的更改必须在EDT上完成,因为Swing组件不是线程安全的。invokeLater方法将Runnable作为参数,该Runnable中包含的任何内容都将在EDT上运行。因此,对Swing组件的任何调用都被放入一种队列中,在EDT上一次执行一个。 有了这些,我的问题是:使用
问题内容: 我问了一个一般性的Spring问题:自动播发Spring Bean,并让多个人回答说应尽可能避免调用Spring 。这是为什么? 我还应该如何访问配置了Spring创建的Bean? 我在非Web应用程序中使用Spring,并计划按照LiorH的描述访问共享对象。 修正案 我接受下面的答案,但这是Martin Fowler的另一种选择,他讨论了依赖注入与使用(本质上与调用相同)的优点。
根据我的理解,如果子例程不作用于类的实例(其作用仅限于显式输入/输出),则它是;如果子例程作用于类的实例,则它是(它可能会对实例产生副作用,使其不纯)。 关于这个话题,这里有一个很好的讨论。请注意,根据接受答案的定义,静态实际上应该是一个函数,因为实例从不隐式传递,而且它不能访问任何实例的成员。 不过,考虑到这一点,静态实际上不应该是函数吗? 我想确保我使用了正确的措辞。 有人能澄清一下吗?
我有一个父组件,它有许多子组件(子组件是一个带有彩色背景的空div)。 我想改变孩子的颜色,但要确保我正在使它变成一个不同的颜色比它已经是-例如,如果孩子是蓝色的,我想改变它到另一个颜色,不是蓝色。 为此,我使用一个道具来设置初始颜色,并将此道具保存到孩子的状态。 当我点击div更改颜色时,我想访问div的当前颜色,以便我可以将其从选择中排除,当然可以从中进行选择 1)这被认为是可以做的反应吗?2