当前位置: 首页 > 知识库问答 >
问题:

火花流未阅读所有Kafka记录

商池暝
2023-03-14

我们从kafka向SparkStreaming发送了15张唱片,但是spark只收到了11张唱片。我用的是spark 2.1.0和kafka_2.12-0.10.2.0。

密码

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaToSparkData {
 public static void main(String[] args) throws InterruptedException {
    int timeDuration = 100;
    int consumerNumberOfThreads = 1;
    String consumerTopic = "InputDataTopic";
    String zookeeperUrl = "localhost:2181";
    String consumerTopicGroup =  "testgroup";
    String producerKafkaUrl = "localhost:9092";
    String producerTopic =  "OutputDataTopic";
    String sparkMasterUrl = "local[2]";

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(consumerTopic, consumerNumberOfThreads);

    SparkSession sparkSession = SparkSession.builder().master(sparkMasterUrl).appName("Kafka-Spark").getOrCreate();

    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(javaSparkContext, new Duration(timeDuration));

    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zookeeperUrl, consumerTopicGroup, topicMap);

    JavaDStream<String> NewRecord = messages.map(new Function<Tuple2<String, String>, String>() {
        private static final long serialVersionUID = 1L;

        public String call(Tuple2<String, String> line) throws Exception {

            String responseToKafka = "";
            System.out.println(" Data IS " + line);

            String ValueData = line._2;
            responseToKafka = ValueData + "|" + "0";

            Properties configProperties = new Properties();
            configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerKafkaUrl);
            configProperties.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
            configProperties.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);

            ProducerRecord<String, String> topicMessage = new ProducerRecord<String, String>(producerTopic,responseToKafka);
            producer.send(topicMessage);
            producer.close();

            return responseToKafka;
        }
    });

    System.out.println(" Printing Record" );
    NewRecord.print();

    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();
    javaStreamingContext.close();

    }
}

bin/Kafka-console-producer . sh-broker-list localhost:9092-topic input data topic # 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18

kafka-console-consumer.sh--引导服务器localhost:9092--主题输出数据主题--从开始#1|0 2|0 3|0 4|0 5|0 6|0 7|0 8|0 9|0 10|0 11|0

有人能帮我吗?

共有1个答案

沈琨
2023-03-14

我们在这里看到的是Spark中懒惰操作的效果。这里,我们使用一个< code>map操作来产生一个副作用,即向Kafka发送一些数据。

然后使用print将流具体化。默认情况下,print将显示流的前10个元素,但需要n1元素,以便显示“…”以指示何时有更多元素。

这个< code>take(11)强制具体化前11个元素,因此它们是从原始流中提取的,并用< code>map函数进行处理。这导致了Kafka的部分出版。

如何解决这个问题?嗯,提示已经在上面:不要在map函数中使用副作用。在这种情况下,消费流并将其发送到Kafka的正确输出操作应该是ForeachRDD

此外,为了避免为每个元素创建 Kafka 生产者实例,我们使用 foreachPartition 处理内部 RDD

该流程的代码框架如下所示:

messages.foreachRDD{rdd => 
  rdd.foreachPartition{partitionIter => 
       producer = // create producer
       partitionIter.foreach{elem =>
           record = createRecord(elem)
           producer.send(record)
       }
       producer.flush()  
       producer.close()
   }
}    

 类似资料:
  • 我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-

  • 我正在使用kafka:Kafka2.12-2.1.0,在客户端使用spring kafka,但遇到了一个问题。 我需要通过阅读Kafka主题中的所有现有消息来加载内存中的映射。为此,我启动了一个新的使用者(具有唯一的使用者组id,并将偏移量设置为)。然后我迭代使用者(poll方法)以获取所有消息,并在使用者记录变为空时停止。 我尝试了很少的其他方法(比如使用偏移量数),但还没有找到任何解决方法,除

  • 为什么以及何时会选择将Spark流媒体与Kafka结合使用? 假设我有一个系统通过Kafka每秒接收数千条消息。我需要对这些消息应用一些实时分析,并将结果存储在数据库中。 我有两个选择: > < li> 创建我自己的worker,该worker从Kafka读取消息,运行分析算法并将结果存储在DB中。在Docker时代,只需使用scale命令就可以轻松地在我的整个集群中扩展这个工作线程。我只需要确保

  • 它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka

  • 我不能用火花流运行Kafka。以下是我迄今为止采取的步骤: > 将此行添加到- Kafka版本:kafka_2.10-0.10.2.2 Jar文件版本:spark-streaming-kafka-0-8-assembly_2.10-2.2.0。罐子 Python代码: 但我仍然得到以下错误: 我做错了什么?

  • 我有一个Kafka分区,和一个parkStreaming应用程序。一个服务器有10个内核。当火花流从Kafka收到一条消息时,后续过程将需要5秒钟(这是我的代码)。所以我发现火花流读取Kafka消息很慢,我猜当火花读出一条消息时,它会等到消息被处理,所以读取和处理是同步的。我想知道我可以异步读取火花吗?这样从Kafka读取的数据就不会被后续处理拖动。然后火花会很快消耗来自Kafka的数据。然后我可