我正在研究我的第一个Kafka流样本:
package com.example;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
class DslExample {
public static void main(String[] args) {
// the builder is used to construct the topology
StreamsBuilder builder = new StreamsBuilder();
// read from the source topic, "users"
KStream<Void, String> stream = builder.stream("users");
// for each record that appears in the source topic,
// print the value
stream.foreach(
(key, value) -> {
System.out.println("(DSL) Hello, " + value);
});
// you can also print using the `print` operator
// stream.print(Printed.<String, String>toSysOut().withLabel("source"));
// set the required properties for running Kafka Streams
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev1");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.24:29092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// build the topology and start streaming
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
// close Kafka Streams when the JVM shuts down (e.g. SIGTERM)
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
当我试图运行它时,会出现以下错误:
Caused by: java.lang.IllegalArgumentException: Data should be null for a VoidDeserializer.
这是来自“用户”主题的示例消息:
价值:
{
"registertime": 1517518703752,
"userid": "User_8",
"regionid": "Region_7",
"gender": "OTHER"
}
标题:
[
{
"key": "task.generation",
"stringValue": "0"
},
{
"key": "task.id",
"stringValue": "0"
},
{
"key": "current.iteration",
"stringValue": "86144"
}
]
关键:
User_8
我应该怎么做才能避免这个问题?
如果密钥实际上包含数据,则不应使用Serdes。Void()
或KStream
我试图用我的gradle项目运行junit测试,但不管是什么测试,它们都会抛出以下堆栈跟踪 我已经看到了一些建议的解决方案,但似乎没有一个奏效。当我在ant中运行它们时,我的测试运行良好。我已经确保xerces使用的是最新版本,我还添加了建议的版本 对于JVM,但似乎什么都不起作用。 任何建议都将不胜感激。我可以根据要求提供更多信息。 **更新** 做了以下更改- 但现在的结果如图所示 **更新2
阅读了一下Java8,我读到了这篇博文,解释了一些关于流和它们的减少,以及什么时候可以短路减少。在底部,它说: 请注意,在个或我们只需要与谓词匹配的第一个值(尽管值不能保证返回第一个值)。但是,如果流没有排序,那么我们希望的行为类似于。所有、和 操作可能根本不会使流短路,因为可能需要计算所有值来确定运算符是还是。因此,使用这些的无限流可能不会终止。 我知道或可能会使还原短路,因为一旦您找到一个元素
问题内容: 我有这个Java代码段。我是Java的菜鸟。 错误: 码: 问题答案: 是的,这是问题所在: 在课程的最高级别,您只能拥有: 实例初始化程序块() 静态初始值设定块() 变量声明 构造函数声明 方法声明 嵌套类型声明 终结器声明 这些都不是。如果你 的意思 来声明一个变量,你应该这样做: 如果这 不是 您想要的,则应解释您的意图。 编辑:修复此问题后,此编译器错误似乎很明显: Conf
我已经红色了多篇文章和讨论,但我仍然有一些不确定性:我不确定是否应该使用或任何其他类型来存储预订-在“在线预订”的意义上(因此来自不同国家/时区的参与者需要在时间线上的同一时刻会面)。我倾向于使用,因为DB和Backend设置为UTC,并且由于传入的“创建预订”json消息包含ISO 8601(带偏移量)start DateTime 让我们采用以下设置:1.数据库(UTC、Oracle、MSSQL
我的问题是关于cucumber特性文件的并行执行。在Selenium Java中,可以通过一个runner类并行运行多个cucumber特性文件吗? 我尝试过不同的方法,但都没有成功。
我如何解决这个问题?我正在从Excel文件中读取数据,输出为字符串。 DataProvider的方法“getData”需要Object[][]返回类型。 错误:(17,16)Java:不兼容类型:java.lang.String无法转换为java.lang.Object[]