为了开始Kafka的项目,我在这里按照Databricks的指示行事:
结构化流媒体Kafka集成指南(Kafka broker版本0.10.0或更高版本)
代码:
# coding: utf-8
import sys
import os,time
sys.path.append("/usr/local/lib/python2.7/dist-packages")
from pyspark.sql import SparkSession,Row
from pyspark import SparkContext,SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
import pyspark.sql.functions
import json
spark = SparkSession.builder.appName("Kakfa-test").getOrCreate()
spark.sparkContext.setLogLevel('WARN')
trainingSchema = StructType([
StructField("code",StringType(),True),
StructField("ean",StringType(),True),
StructField("name",StringType(),True),
StructField("description",StringType(),True),
StructField("category",StringType(),True),
StructField("attributes",StringType(),True)
])
trainingDF = spark.createDataFrame(sc.emptyRDD(),trainingSchema)
broker, topic =
['kafka.partner.stg.some.domain:9092','hybris.products']
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",
"kafka.partner.stg.some.domain:9092") \
.option("subscribe", "hybris.products") \
.option("startingOffsets", "earliest") \
.load()
我的Hadoop版本是2.6,Spark版本是2.3.0
带有spack-提交
的命令行是:
spark提交——jars-jars/spark-sql-kafka-0-10_2.11-2.3.0。jar kafka-test-002。py
错误消息:
Py4JJavaError:在elation.scala:33010调用ource.scala:94时发生错误:org.apache.spark.sql.execution.streaming.NoClassDefFoundError:org/apache/kafka/公共/序列化/ByteArrayDeserializer。KafkaSourceProvider$.(KafkaSourcePorg.apache.spark.sql.streaming.)在eader.load010。KafkaSourceProvider$.(KafkaSourcePeader.scala:170)在sun.reflect.010。KafkaSourcePmpl.invokeStreamOptions(KafkaSourceProvider.scala:360)在org.apache.spark.sql.kafka010。KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:64)在org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:231)在org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)在org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSo48.load.)在java.lang.StreamingRelation$. Application(StreamingRorg.apache.spark.sql.kafka)在rovider.scala:413DataStreamRorg.apache.spark.sql.kafka(DataStreamRrovider.scala)在org.apache.spark.sql.kafkaNativeMethodAccessorIrovider.validate0(本机方法URLClassLoader. findClass(URLClassLoader. java:381)在java. lang。ClassLoader. loadClass(ClassLoader. java:424)在java. lang。ClassLoader. loadClass(ClassLoader. java:357)
正如你可以在我上面提到的网站上看到的,我正在导入的jar文件是完全相同的文件。所以,我不知道为什么会这样。也许还有一个模块没提到?我真的迷路了
提到的JAR不包括kafka客户端的所有依赖项。你更应该使用--packages org。阿帕奇。spark:spark-sql-kafka-0-10_2.11:2.3.0
(如部署部分的文档所述:https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html#deploying)
一、背景 先说一下,为什么要使用 Flume + Kafka? 以实时流处理项目为例,由于采集的数据量可能存在峰值和峰谷,假设是一个电商项目,那么峰值通常出现在秒杀时,这时如果直接将 Flume 聚合后的数据输入到 Storm 等分布式计算框架中,可能就会超过集群的处理能力,这时采用 Kafka 就可以起到削峰的作用。Kafka 天生为大数据场景而设计,具有高吞吐的特性,能很好地抗住峰值数据的冲击
主要内容:Spark是什么?,与Spark整合在本章中,将讨论如何将Apache Kafka与Spark Streaming API集成。 Spark是什么? Spark Streaming API支持实时数据流的可扩展,高吞吐量,容错流处理。 数据可以从Kafka,Flume,Twitter等许多来源获取,并且可以使用复杂算法进行处理,例如:映射,缩小,连接和窗口等高级功能。 最后,处理后的数据可以推送到文件系统,数据库和现场仪表板上。 弹
主要内容:Storm是什么?,与Storm整合,提交到拓扑在本章中,我们将学习如何将Kafka与Apache Storm集成。 Storm是什么? Storm最初是由Nathan Marz和BackType团队创建的。 在很短的时间内,Apache Storm成为分布式实时处理系统的标准,用于处理大数据。 Storm速度非常快,每个节点每秒处理超过一百万个元组的基准时钟。 Apache Storm持续运行,从配置的源(Spouts)中消耗数据并将数据传递
一、版本说明 Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下: spark-streaming-kafka-0-8 spark-streaming-kafka-0-10 Kafka 版本 0.8.2.1 or higher 0.10.0 or higher A
我使用QueryDSL v4.1.4执行此查询 我的代码生成此SQL 但我预计会生成此SQL(此SQL在我的DBMS中运行良好) 尝试执行查询时发生此异常 无法修复的想法 从子句中添加
本文向大家介绍Kafka 缺点?相关面试题,主要包含被问及Kafka 缺点?时的应答技巧和注意事项,需要的朋友参考一下 由于是批量发送,数据并非真正的实时; 对于mqtt协议不支持; 不支持物联网传感数据直接接入; 仅支持统一分区内消息有序,无法实现全局消息有序; 监控不完善,需要安装插件; 依赖zookeeper进行元数据管理;