当前位置: 首页 > 知识库问答 >
问题:

kafka引发流式java api问题

澹台举
2023-03-14

我是Kafka和spark的初学者。我想通过spark streaming对我从Kafka收到的特定主题的数据进行实时处理。我无法使用createStream函数返回的JavaPairReceiverInputStream。

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(
                "testwordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf,
                Durations.seconds(1));

        Map<String, Integer> topics_map = new HashMap<String, Integer>();

        topics_map.put("Customtopic", 10);

JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
                .createStream(jssc, "localhost:2181", "kafkasparkconsumer",
                        topics_map);

下面的代码给出了一个错误:

 JavaPairDStream<String, Integer> wordCounts = kafkaStream.map(
                 new PairFunction<String, String, Integer>() {
                 @Override public Tuple2<String, Integer> call(String s) {
                 return new Tuple2<String, Integer>(s, 1);
                 }
                 }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                 @Override
                 public Integer call(Integer i1, Integer i2) {
                 return i1 + i2;
                 }
                 });
                 wordCounts.print();

方法图(Function, R

我使用的spark版本是1.2.0。我找不到处理Kafka消息的java api示例。有人能告诉我我需要改变什么吗?

共有2个答案

南宫凯康
2023-03-14

kafkaStream返回一个元组。检查这个

JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
                .createStream(jssc, "localhost:2181", "kafkasparkconsumer",
                        topics_map);
JavaDStream<String> lines = kafkaStream
                .map(new Function<Tuple2<String, String>, String>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });
花阳秋
2023-03-14

您调用了错误的方法。在java中,如果您想获得一对,应该调用MapToPair。请尝试以下代码:

JavaPairDStream<String, Integer> pairs = kafkaStream
            .mapToPair(new PairFunction<Tuple2<String, String>, String, Integer>() {
                @Override public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
                    return new Tuple2<String, Integer>(word._2(), 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });

    pairs.print();

    jssc.start();
    jssc.awaitTermination();
 类似资料:
  • 我试图创建Kafka直接流与提供偏移外部在我的火花流模块,但它导致以下编译错误。 下面是创建Kafka直接流的代码 下面是我遇到的编译错误。有什么想法/指针吗?

  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp

  • 我有一个Kafka Streams应用程序版本-0.11,它从很少的主题中获取数据,并将数据连接到另一个主题中。 我在一些jira问题上读到过,清理流可能有助于修复问题。但是每次启动Kafka流应用程序时清理流是正确的解决方案还是补丁?此外,流清理会延迟应用程序的启动,对吗? 注意:每次启动Kafka Streams应用程序时,在调用Streams.start()之前是否需要调用Streams.c

  • 使用的build.sbt文件如下: Scala中以下2行给出了以下异常 线程“main”java.lang.noClassDeffounder异常错误:org/apache/kafka/streams/streamsbuilder at tradesapp$.main(tradesapp.scala:21)at tradesapp.main(tradesapp.scala)at java.base

  • 它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka