我被困在这个问题上,我不知道发生了什么。我试图使用Kafka流来写一个主题的日志。在另一端,我有Kafka连接将每个条目输入MySQL。所以,基本上我需要的是一个Kafka流程序,它将主题读取为字符串,并将其解析为Avro格式,然后将其输入不同的主题。
这是我写的代码:
//Define schema
String userSchema = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"ID\", \"type\":\"int\" },"
+ " { \"name\":\"COL_NAME_1\", \"type\":\"string\" },"
+ " { \"name\":\"COL_NAME_2\", \"type\":\"string\" }"
+ "]}";
String key = "key1";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
//Settings
System.out.println("Kafka Streams Demonstration");
//Settings
Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
// Kafka bootstrap server (broker to talk to); ubuntu is the host name for my VM running Kafka, port 9092 is where the (single) broker listens
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Apache ZooKeeper instance keeping watch over the Kafka cluster; ubuntu is the host name for my VM running Kafka, port 2181 is where the ZooKeeper listens
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
// default serdes for serialzing and deserializing key and value from and to streams in case no specific Serde is specified
settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
settings.put(StreamsConfig.STATE_DIR_CONFIG ,"/tmp");
// to work around exception Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
// at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
// see: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo
// Create an instance of StreamsConfig from the Properties instance
StreamsConfig config = new StreamsConfig(getProperties());
final Serde < String > stringSerde = Serdes.String();
final Serde < Long > longSerde = Serdes.Long();
final Serde<byte[]> byteArraySerde = Serdes.ByteArray();
// building Kafka Streams Model
KStreamBuilder kStreamBuilder = new KStreamBuilder();
// the source of the streaming analysis is the topic with country messages
KStream<byte[], String> instream =
kStreamBuilder.stream(byteArraySerde, stringSerde, "sqlin");
final KStream<byte[], GenericRecord> outstream = instream.mapValues(new ValueMapper<String, GenericRecord>() {
@Override
public GenericRecord apply(final String record) {
System.out.println(record);
GenericRecord avroRecord = new GenericData.Record(schema);
String[] array = record.split(" ", -1);
for (int i = 0; i < array.length; i = i + 1) {
if (i == 0)
avroRecord.put("ID", Integer.parseInt(array[0]));
if (i == 1)
avroRecord.put("COL_NAME_1", array[1]);
if (i == 2)
avroRecord.put("COL_NAME_2", array[2]);
}
System.out.println(avroRecord);
return avroRecord;
}
});
outstream.to("sqlout");
以下是我得到空指针异常后的输出:
java -cp streams-examples-3.2.1-standalone.jar io.confluent.examples.streams.sql
Kafka Streams Demonstration
Start
Now started CountriesStreams Example
5 this is
{"ID": 5, "COL_NAME_1": "this", "COL_NAME_2": "is"}
Exception in thread "StreamThread-1" java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:81)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
主题sqlin包含几个由数字和两个单词组成的消息。注意两行打印:函数得到一条消息,并在捕获空指针之前成功解析它。问题是我对Java、Kafka和Avro不熟悉,所以我不确定哪里出错了。我设置Avro模式正确吗?还是使用kstream错误?非常感谢这里的任何帮助。
我认为问题出在下面这一行:
outstream.to("sqlout");
默认情况下,您的应用程序配置为使用String
serde来记录键和记录值:
settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
由于extream
具有类型KStream
// sth like
outstream.to(Serdes.ByteArray(), yourGenericAvroSerde, "sqlout");
仅供参考:融合平台的下一个版本(预计到达时间:本月=2017年6月)将附带一个现成的通用特定Avro serde,它与融合模式注册表集成。这应该会让你的生活更轻松。
请看我的答案https://stackoverflow.com/a/44433098/1743580了解更多细节。
我得到一个空指针异常在我的code.Please帮我解决它。这是我的代码。 这是我的原木猫。 第138行为点击法-
问题内容: 如何将类型中的字符串指针的引用值设置为空字符串?考虑以下示例: 我尝试了和运算符的所有组合都无济于事: 显然其中一些很愚蠢,但我没有发现尝试的危害。我也尝试使用和: 这会导致编译错误 恐慌:反映:使用不可寻址的值reflect.Value.SetString 我假设这是因为Go中的字符串是不可变的? 问题答案: 字符串文字不可寻址。 取包含空字符串的变量的地址: 或使用新的:
主要内容:到底使用字符数组还是字符串常量C语言中没有特定的字符串类型,我们通常是将字符串放在一个字符数组中,这在《 C语言字符数组和字符串》中已经进行了详细讲解,这里不妨再来演示一下: 运行结果: https://www.xnip.cn https://www.xnip.cn 字符数组归根结底还是一个数组,上节讲到的关于 指针和数组的规则同样也适用于字符数组。更改上面的代码,使用指针的方式来输出字符串: 运行结果: https://ww
我想确保JTextField中始终有一个正整数。例如,在创建GUI时,JTextField当前有一个默认的“1”,我希望它能够在用户决定点击backspace时自动将文本设置为“0”,而不是变成一个空文本字段。这是因为文本字段还有一个监听器,它调用一个方法来根据这个数字更新GUI的另一部分。 我对DocumentFilter还很陌生,所以我甚至不确定自己的方向是否正确,但以下是我目前掌握的信息:
我使用的是spring boot 1.4, 当使用@SpringBootTest注释进行集成测试时,它会给出一个空指针。 对于主类: 然后在我的控制器中: HelloService 但在处理请求时,它会告诉helloService NullPointException。 我错过了什么?
问题内容: 我想将字符串切片转换为指向字符串的指针切片 %!p(string = a)=>字符串 %!p(string = b)=>字符串 %!p(string = c)=>字符串 [0xc42000e1d0 0xc42000e1d0 0xc42000e1d0] 据我了解, 我的变量似乎是一个字符串,而不是指向字符串的指针。 因此应从迭代时复制。 显然我不正确,因为地址仍然相同。如果值不是指针,该