I wrote a test class for my Kafka Streams application following https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html. The code is as follows:
import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

import static org.junit.Assert.assertEquals;

public class KafkaStreamsConfigTest {
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, Object> inputTopic;
    private TestOutputTopic<String, Object> outputTopic;

    private Serde<String> stringSerde = new Serdes.StringSerde();
    private EventSerde eventSerde = new EventSerde();

    private String key = "test";
    private Object value = "some value";
    private Object expected_value = "real value";

    String kafkaEventSourceTopic = "raw_events";
    String kafkaEventSinkTopic = "processed_events";
    String kafkaCacheSinkTopic = "cache_objects";
    String applicationId = "my-app";
    String test_dummy = "dummy:1234";

    @Before
    public void setup() {
        // build the topology: one source, a router, several processors, two sinks
        Topology topology = new Topology();
        topology.addSource(kafkaEventSourceTopic, kafkaEventSourceTopic);
        topology.addProcessor(ProcessRouter.class.getSimpleName(), ProcessRouter::new, kafkaEventSourceTopic);
        topology.addProcessor(WorkforceVisit.class.getSimpleName(), WorkforceVisit::new,
                ProcessRouter.class.getSimpleName());
        topology.addProcessor(DefaultProcessor.class.getSimpleName(), DefaultProcessor::new,
                ProcessRouter.class.getSimpleName());
        topology.addProcessor(CacheWorkforceShift.class.getSimpleName(), CacheWorkforceShift::new,
                ProcessRouter.class.getSimpleName());
        topology.addProcessor(DigitalcareShiftassisstantTracking.class.getSimpleName(), DigitalcareShiftassisstantTracking::new,
                ProcessRouter.class.getSimpleName());
        topology.addProcessor(WorkforceLocationUpdate.class.getSimpleName(), WorkforceLocationUpdate::new,
                ProcessRouter.class.getSimpleName());
        topology.addSink(kafkaEventSinkTopic, kafkaEventSinkTopic,
                WorkforceVisit.class.getSimpleName(), DefaultProcessor.class.getSimpleName(),
                CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName(),
                WorkforceLocationUpdate.class.getSimpleName());
        topology.addSink(kafkaCacheSinkTopic, kafkaCacheSinkTopic,
                WorkforceVisit.class.getSimpleName(),
                CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName(),
                WorkforceLocationUpdate.class.getSimpleName());

        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, test_dummy);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, EventSerde.class.getName());

        testDriver = new TopologyTestDriver(topology, properties);

        // set up test topics
        inputTopic = testDriver.createInputTopic(kafkaEventSourceTopic, stringSerde.serializer(), eventSerde.serializer());
        outputTopic = testDriver.createOutputTopic(kafkaEventSinkTopic, stringSerde.deserializer(), eventSerde.deserializer());
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void outputEqualsTrue() {
        inputTopic.pipeInput(key, value);
        Object b = outputTopic.readValue();
        System.out.println(b.toString());
        assertEquals(b, expected_value);
    }
}
Here I use the EventSerde class to serialize and deserialize the values.
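For context, EventSerde is a custom Serde for the event payloads. It is roughly along the lines of the following sketch (a simplified, hypothetical version assuming Jackson-based JSON serialization; the real class may differ):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// hypothetical sketch of a custom value Serde; the actual EventSerde may differ
public class EventSerde implements Serde<Object> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Serializer<Object> serializer() {
        return (topic, data) -> {
            try {
                return data == null ? null : mapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new RuntimeException("failed to serialize event", e);
            }
        };
    }

    @Override
    public Deserializer<Object> deserializer() {
        return (topic, bytes) -> {
            try {
                return bytes == null ? null : mapper.readValue(bytes, Object.class);
            } catch (Exception e) {
                throw new RuntimeException("failed to deserialize event", e);
            }
        };
    }
}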
When I run this code, it throws java.util.NoSuchElementException: Uninitialized topic: processed_events, with the following stack trace:
java.util.NoSuchElementException: Uninitialized topic: processed_events
at org.apache.kafka.streams.TopologyTestDriver.readRecord(TopologyTestDriver.java:715)
at org.apache.kafka.streams.TestOutputTopic.readRecord(TestOutputTopic.java:100)
at org.apache.kafka.streams.TestOutputTopic.readValue(TestOutputTopic.java:80)
at com.uhx.platform.eventprocessor.config.KafkaStreamsConfigTest.outputEqualsTrue(KafkaStreamsConfigTest.java:111)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
As you can see, I have initialized both the input and output topics. I also debugged the code, and the error occurs when I read a value from the output topic:
outputTopic.readValue();
I don't understand what else I need to do to initialize the output topic. Can anyone help me resolve this?
I am using kafka-streams-test-utils 2.4.0 and kafka-streams 2.4.0:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>2.4.0</version>
    <scope>test</scope>
</dependency>
To avoid this exception, you need to check whether the output topic is empty (using JUnit's assertFalse) before attempting to read from it:
@Test
public void outputEqualsTrue() {
    inputTopic.pipeInput(key, value);
    // verify that a record actually reached the sink before reading it
    assertFalse(outputTopic.isEmpty());
    Object b = outputTopic.readValue();
    System.out.println(b.toString());
    assertEquals(expected_value, b);
}
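If that assertion fails, it means the record you piped in never reached the processed_events sink (for example, it was routed only to cache_objects, or dropped by a processor). A quick way to see what did come out is to drain the topic instead of reading a single value; here is a small sketch against the 2.4.0 test-utils API, reusing the fields of the test class above:

@Test
public void inspectSinkOutput() {
    // requires: import java.util.List; and import static org.junit.Assert.assertFalse;
    inputTopic.pipeInput(key, value);
    // readKeyValuesToList() drains whatever reached the sink and simply
    // returns an empty list if the topic is still uninitialized/empty
    List<KeyValue<String, Object>> records = outputTopic.readKeyValuesToList();
    records.forEach(kv -> System.out.println(kv.key + " -> " + kv.value));
    assertFalse("nothing was forwarded to " + kafkaEventSinkTopic, records.isEmpty());
}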