我是Kafka和spark的初学者。我想通过spark streaming对我从Kafka收到的特定主题的数据进行实时处理。我无法使用createStream函数返回的JavaPairReceiverInputStream。
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(
"testwordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf,
Durations.seconds(1));
Map<String, Integer> topics_map = new HashMap<String, Integer>();
topics_map.put("Customtopic", 10);
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
.createStream(jssc, "localhost:2181", "kafkasparkconsumer",
topics_map);
下面的代码给出了一个错误:
JavaPairDStream<String, Integer> wordCounts = kafkaStream.map(
new PairFunction<String, String, Integer>() {
@Override public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
方法图(Function, R
我使用的spark版本是1.2.0。我找不到处理Kafka消息的java api示例。有人能告诉我我需要改变什么吗?
kafkaStream返回一个元组。检查这个
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
.createStream(jssc, "localhost:2181", "kafkasparkconsumer",
topics_map);
JavaDStream<String> lines = kafkaStream
.map(new Function<Tuple2<String, String>, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
您调用了错误的方法。在java中,如果您想获得一对,应该调用MapToPair。请尝试以下代码:
JavaPairDStream<String, Integer> pairs = kafkaStream
.mapToPair(new PairFunction<Tuple2<String, String>, String, Integer>() {
@Override public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
return new Tuple2<String, Integer>(word._2(), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
pairs.print();
jssc.start();
jssc.awaitTermination();
我试图创建Kafka直接流与提供偏移外部在我的火花流模块,但它导致以下编译错误。 下面是创建Kafka直接流的代码 下面是我遇到的编译错误。有什么想法/指针吗?
问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp
我有一个Kafka Streams应用程序版本-0.11,它从很少的主题中获取数据,并将数据连接到另一个主题中。 我在一些jira问题上读到过,清理流可能有助于修复问题。但是每次启动Kafka流应用程序时清理流是正确的解决方案还是补丁?此外,流清理会延迟应用程序的启动,对吗? 注意:每次启动Kafka Streams应用程序时,在调用Streams.start()之前是否需要调用Streams.c
谢谢!
使用的build.sbt文件如下: Scala中以下2行给出了以下异常 线程“main”java.lang.noClassDeffounder异常错误:org/apache/kafka/streams/streamsbuilder at tradesapp$.main(tradesapp.scala:21)at tradesapp.main(tradesapp.scala)at java.base
它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka