当前位置: 首页 > 知识库问答 >
问题:

在Kafkastate Stores中,什么被认为是当前和最新的状态?

牟嘉
2023-03-14

此存储区将保存在“Word-Count-Input”主题中找到的任何单词的最新计数。

假设消息成功发送到主题测试

  1. 这是否意味着从T构建的表上的所有交互查询都能立即看到更改?(生成为builder.table(“t”,...))?
  2. 这是否意味着从T聚合的表上的所有交互查询都能立即看到更改?(生成为Builder.Stream(“T”).GroupByKey().Aggregate(...))?
    null

添加:

这里我的主要问题是什么时候可以通过交互式查询或联接请求数据。有没有可能得到过时的数据/有没有滞后的可能?

共有1个答案

方承弼
2023-03-14

经过一番调试为第三部分找到了答案:

这里有一个例子:

测试类:

package myapps;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.Assert;
import org.junit.Test;

import java.util.Properties;

public class TopologyTest {

    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";

    @Test
    public void testStreams() {

        Topology topology = createTopology();

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {

            ConsumerRecordFactory<String, Long> factory = new ConsumerRecordFactory<>(
                    INPUT_TOPIC, new StringSerializer(), new LongSerializer());

            testDriver.pipeInput(factory.create(INPUT_TOPIC, "key", 1L));
            testDriver.pipeInput(factory.create(INPUT_TOPIC, "key", 2L));
            testDriver.pipeInput(factory.create(INPUT_TOPIC, "key", 3L));

            ProducerRecord<String, String> pr1 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
            ProducerRecord<String, String> pr2 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
            ProducerRecord<String, String> pr3 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());

            Assert.assertEquals("1,1", pr1.value());
            Assert.assertEquals("2,3", pr2.value());
            Assert.assertEquals("3,6", pr3.value());
        }
    }

    private Topology createTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> inputStream = builder.stream(INPUT_TOPIC);

        KTable<String, Long> table = inputStream.groupByKey().aggregate(
                () -> 0L,
                (key, value, aggregate) -> value + aggregate,
                Materialized.as("store")
        );

        KStream<String, String> joined = inputStream
                .join(table, (value, aggregate) -> value + "," + aggregate);

        joined.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

}
<dependencies>
    <!-- Apache Kafka dependencies -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.3.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>2.3.0</version>
        <scope>test</scope>
    </dependency>

</dependencies>
 类似资料:
  • 问题内容: 有人知道Java 7是否会使用闭包吗? 问题答案: 在Devoxx 2008上,Mark Reinhold明确指出Java 7 中将不 包括闭包。 等待!闭包 将 包含在Java 7中。Mark Reinhold 在Devoxx 2009上宣布了这一逆转。 系住那个!闭包( lambda表达式 )已推迟到Java 8为止。有关更多信息,请遵循Project Lambda(JSR 335

  • 问题内容: 在过去的一年中,我听说有关Scala语言的炒作越来越多。我知道有几个现有项目计划将Scala支持与IDE集成在一起。但是,并不总是很清楚集成到底有多好。 他们当前是否支持Intellisense以及Eclipse和Netbeans是否支持Java语言?他们还支持即时验证吗? 问题答案: 我不能亲自谈谈IntelliJ或NetBeans插件的稳定性(尽管我听过很多好消息),但是用于Ecl

  • 我有一个问题,关于什么是正确的做法,使用SwingU实用程序的调用稍后方法。 所以首先,我想确认我理解正确。 据我所知,对GUI的更改必须在EDT上完成,因为Swing组件不是线程安全的。invokeLater方法将Runnable作为参数,该Runnable中包含的任何内容都将在EDT上运行。因此,对Swing组件的任何调用都被放入一种队列中,在EDT上一次执行一个。 有了这些,我的问题是:使用

  • 问题内容: 我问了一个一般性的Spring问题:自动播发Spring Bean,并让多个人回答说应尽可能避免调用Spring 。这是为什么? 我还应该如何访问配置了Spring创建的Bean? 我在非Web应用程序中使用Spring,并计划按照LiorH的描述访问共享对象。 修正案 我接受下面的答案,但这是Martin Fowler的另一种选择,他讨论了依赖注入与使用(本质上与调用相同)的优点。

  • 根据我的理解,如果子例程不作用于类的实例(其作用仅限于显式输入/输出),则它是;如果子例程作用于类的实例,则它是(它可能会对实例产生副作用,使其不纯)。 关于这个话题,这里有一个很好的讨论。请注意,根据接受答案的定义,静态实际上应该是一个函数,因为实例从不隐式传递,而且它不能访问任何实例的成员。 不过,考虑到这一点,静态实际上不应该是函数吗? 我想确保我使用了正确的措辞。 有人能澄清一下吗?

  • 我有一个父组件,它有许多子组件(子组件是一个带有彩色背景的空div)。 我想改变孩子的颜色,但要确保我正在使它变成一个不同的颜色比它已经是-例如,如果孩子是蓝色的,我想改变它到另一个颜色,不是蓝色。 为此,我使用一个道具来设置初始颜色,并将此道具保存到孩子的状态。 当我点击div更改颜色时,我想访问div的当前颜色,以便我可以将其从选择中排除,当然可以从中进行选择 1)这被认为是可以做的反应吗?2