我们从kafka向SparkStreaming发送了15张唱片,但是spark只收到了11张唱片。我用的是spark 2.1.0和kafka_2.12-0.10.2.0。
密码
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class KafkaToSparkData {
public static void main(String[] args) throws InterruptedException {
int timeDuration = 100;
int consumerNumberOfThreads = 1;
String consumerTopic = "InputDataTopic";
String zookeeperUrl = "localhost:2181";
String consumerTopicGroup = "testgroup";
String producerKafkaUrl = "localhost:9092";
String producerTopic = "OutputDataTopic";
String sparkMasterUrl = "local[2]";
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(consumerTopic, consumerNumberOfThreads);
SparkSession sparkSession = SparkSession.builder().master(sparkMasterUrl).appName("Kafka-Spark").getOrCreate();
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(javaSparkContext, new Duration(timeDuration));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zookeeperUrl, consumerTopicGroup, topicMap);
JavaDStream<String> NewRecord = messages.map(new Function<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;
public String call(Tuple2<String, String> line) throws Exception {
String responseToKafka = "";
System.out.println(" Data IS " + line);
String ValueData = line._2;
responseToKafka = ValueData + "|" + "0";
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerKafkaUrl);
configProperties.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
configProperties.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);
ProducerRecord<String, String> topicMessage = new ProducerRecord<String, String>(producerTopic,responseToKafka);
producer.send(topicMessage);
producer.close();
return responseToKafka;
}
});
System.out.println(" Printing Record" );
NewRecord.print();
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
javaStreamingContext.close();
}
}
bin/Kafka-console-producer . sh-broker-list localhost:9092-topic input data topic # 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
kafka-console-consumer.sh--引导服务器localhost:9092--主题输出数据主题--从开始#1|0 2|0 3|0 4|0 5|0 6|0 7|0 8|0 9|0 10|0 11|0
有人能帮我吗?
我们在这里看到的是Spark中懒惰操作的效果。这里,我们使用一个< code>map操作来产生一个副作用,即向Kafka发送一些数据。
然后使用print
将流具体化。默认情况下,print
将显示流的前10个元素,但需要n1
元素,以便显示“…”以指示何时有更多元素。
这个< code>take(11)强制具体化前11个元素,因此它们是从原始流中提取的,并用< code>map函数进行处理。这导致了Kafka的部分出版。
如何解决这个问题?嗯,提示已经在上面:不要在map
函数中使用副作用。在这种情况下,消费流并将其发送到Kafka的正确输出操作应该是ForeachRDD
。
此外,为了避免为每个元素创建 Kafka 生产者实例,我们使用
foreachPartition
处理内部 RDD
。
该流程的代码框架如下所示:
messages.foreachRDD{rdd =>
rdd.foreachPartition{partitionIter =>
producer = // create producer
partitionIter.foreach{elem =>
record = createRecord(elem)
producer.send(record)
}
producer.flush()
producer.close()
}
}
我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-
我正在使用kafka:Kafka2.12-2.1.0,在客户端使用spring kafka,但遇到了一个问题。 我需要通过阅读Kafka主题中的所有现有消息来加载内存中的映射。为此,我启动了一个新的使用者(具有唯一的使用者组id,并将偏移量设置为)。然后我迭代使用者(poll方法)以获取所有消息,并在使用者记录变为空时停止。 我尝试了很少的其他方法(比如使用偏移量数),但还没有找到任何解决方法,除
为什么以及何时会选择将Spark流媒体与Kafka结合使用? 假设我有一个系统通过Kafka每秒接收数千条消息。我需要对这些消息应用一些实时分析,并将结果存储在数据库中。 我有两个选择: > < li> 创建我自己的worker,该worker从Kafka读取消息,运行分析算法并将结果存储在DB中。在Docker时代,只需使用scale命令就可以轻松地在我的整个集群中扩展这个工作线程。我只需要确保
它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka
我不能用火花流运行Kafka。以下是我迄今为止采取的步骤: > 将此行添加到- Kafka版本:kafka_2.10-0.10.2.2 Jar文件版本:spark-streaming-kafka-0-8-assembly_2.10-2.2.0。罐子 Python代码: 但我仍然得到以下错误: 我做错了什么?
我有一个Kafka分区,和一个parkStreaming应用程序。一个服务器有10个内核。当火花流从Kafka收到一条消息时,后续过程将需要5秒钟(这是我的代码)。所以我发现火花流读取Kafka消息很慢,我猜当火花读出一条消息时,它会等到消息被处理,所以读取和处理是同步的。我想知道我可以异步读取火花吗?这样从Kafka读取的数据就不会被后续处理拖动。然后火花会很快消耗来自Kafka的数据。然后我可