I have 3 files: kafka producer.py, consumer.py and spark-job.py. I don't know how to launch the Spark file so it processes the stream of data generated from Kafka.
I start the ZooKeeper server in the first terminal:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
then start the Kafka server in a second, separate terminal:
.\bin\windows\kafka-server-start.bat .\config\server.properties
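Before the producer can publish, the transaction topic must exist (unless the broker has topic auto-creation enabled). A sketch of the usual command, assuming the same Windows layout as the commands above; on Kafka 2.2+ replace --zookeeper localhost:2181 with --bootstrap-server localhost:9092:

```shell
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic transaction
```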
Then, in two more separate terminals, I launch producer.py and consumer.py.
The Kafka producer file simply generates dictionaries of the form
{branch, currency, amount}
and sends them to the Kafka cluster every 5 seconds or so.
from json import dumps
from time import sleep

from numpy.random import choice, randint
from kafka import KafkaProducer


def get_random_value():
    new_dict = {}
    branch_list = ["Almaty", "Astana", "Taraz", "Semei"]
    currency_list = ["KZT", "RUB", "GBP", "USD"]

    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(1, 100)
    new_dict['branch'] = choice(branch_list)
    # print(new_dict)
    return new_dict


if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'),
                             compression_type='gzip')
    topic_name = 'transaction'

    while True:
        for _ in range(100):
            data = get_random_value()
            try:
                message = producer.send(topic=topic_name, value=data)
                record_data = message.get(timeout=10)
                print('data: {}, offset: {}'.format(data, record_data.offset))
                # print(data)
            except Exception as e:
                print(e)
            finally:
                producer.flush()
            sleep(5)

    producer.close()
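For reference, the serialized payload the producer sends has this shape; a minimal sketch using only the stdlib random and json modules (no numpy, no broker needed):

```python
import json
import random


def get_random_value():
    # Same shape as the producer's dicts: {branch, currency, amount}
    return {
        'currency': random.choice(["KZT", "RUB", "GBP", "USD"]),
        # stdlib randint is inclusive on both ends, unlike numpy's randint(1, 100)
        'amount': random.randint(1, 100),
        'branch': random.choice(["Almaty", "Astana", "Taraz", "Semei"]),
    }


# Mirrors the producer's value_serializer: dict -> JSON string -> UTF-8 bytes
payload = json.dumps(get_random_value()).encode('utf-8')
print(payload)
```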
The consumer simply prints out the incoming dicts:
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('transaction', bootstrap_servers=['127.0.0.1:9092'])

print("start consuming")
for message in consumer:
    aa = json.loads(message.value.decode())
    print("currency: %s, amount: %d, branch: %s" % (aa['currency'], aa['amount'], aa['branch']))
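The decode step in that loop is plain JSON deserialization, so it can be checked without a broker; the sample bytes below stand in for message.value:

```python
import json

# A raw Kafka message value, as produced by the producer's serializer
raw_value = b'{"currency": "USD", "amount": 42, "branch": "Almaty"}'

# Same two steps as the consumer loop: bytes -> str -> dict
aa = json.loads(raw_value.decode())
print("currency: %s, amount: %d, branch: %s" % (aa['currency'], aa['amount'], aa['branch']))
```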
The producer and consumer both work, printing to their terminals at the same time.
spark-job.py listens on localhost:9092 (where Kafka also runs) and simply writes the incoming data to a database.
import sys
import os
import shutil

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils
import json

outputPath = 'C:/Users/Admin/Downloads/madi_kafka/logs/checkpoints01'


def get_sql_query():
    strSQL = ('select from_unixtime(unix_timestamp()) as curr_time, '
              't.branch as city, t.currency as currency, sum(amount) as amount '
              'from exchanges_stream t '
              'group by t.branch, t.currency')  # group by is required for the aggregate
    return strSQL


# -------------------------------------------------
# Lazily instantiated global instance of SparkSession
# -------------------------------------------------
def getSparkSessionInstance(sparkConf):
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']


# -------------------------------------------------
# What I want to do per each RDD...
# -------------------------------------------------
def process(time, rdd):
    print("===========-----> %s <-----===========" % str(time))
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        rowRdd = rdd.map(lambda w: Row(city=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
        testDataFrame = spark.createDataFrame(rowRdd)
        testDataFrame.createOrReplaceTempView("exchanges_stream")
        sql_query = get_sql_query()
        testResultDataFrame = spark.sql(sql_query)
        testResultDataFrame.show(n=5)

        # Insert into DB
        try:
            testResultDataFrame.write \
                .format("jdbc") \
                .mode("append") \
                .option("driver", 'org.postgresql.Driver') \
                .option("url", "jdbc:postgresql://xxx") \
                .option("dbtable", "transaction_flow") \
                .option("user", "habr") \
                .option("password", "habr12345") \
                .save()
            print('DB write successful!')
        except Exception as e:
            print("--> Error with DB working!", e)
    except Exception as e:
        print("--> Error!", e)


# -------------------------------------------------
# General function
# -------------------------------------------------
def createContext():
    sc = SparkContext(appName="PythonStreamingKafkaTransaction")
    sc.setLogLevel("ERROR")

    ssc = StreamingContext(sc, 10)  # 10-second batch interval

    broker_list, topic = sys.argv[1:]

    try:
        directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                          [topic],
                                                          {"metadata.broker.list": broker_list})
    except Exception:
        raise ConnectionError("Kafka error: Connection refused: "
                              "broker_list={} topic={}".format(broker_list, topic))

    parsed_lines = directKafkaStream.map(lambda v: json.loads(v[1]))

    # RDD handling
    parsed_lines.foreachRDD(process)

    return ssc


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: spark_job.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    print("--> Creating new context")
    if os.path.exists(outputPath):
        shutil.rmtree(outputPath)  # was rmtree('outputPath'): delete the directory, not a literal string

    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
    ssc.start()
    ssc.awaitTermination()
I don't know how to launch spark-job.py.
While the producer keeps generating messages, I try to launch:
spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\
org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction
which gives:
Exception in thread "main" org.apache.spark.SparkException: Cannot load main class from JAR org.postgresql:postgresql:9.4.1207 with URI org.postgresql. Please specify a class through --class.
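That "Cannot load main class from JAR" error suggests spark-submit parsed org.postgresql:postgresql:9.4.1207 as the application JAR instead of part of --packages: the line break after the comma split the list into two arguments. Keeping the whole comma-separated list as a single token (no space or newline after the comma) should avoid it. A sketch, using 2.4.6 for the Kafka package on the assumption it should match the Spark 2.4.6 shown in the logs below:

```shell
spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.6,org.postgresql:postgresql:9.4.1207 \
  spark_job.py localhost:9092 transaction
```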
If I instead try to launch this command:
python.exe .\spark_job.py 127.0.0.1:2181 transaction
it starts and creates a new context, but still fails to find some files:
--> Creating new context
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/07/25 06:12:46 WARN Checkpoint: Checkpoint directory C:/Users/Admin/Downloads/madi_kafka/logs/checkpoints01 does not exist
________________________________________________________________________________________________
Spark Streaming's Kafka libraries not found in class path. Try one of the following.
1. Include the Kafka library and its dependencies with in the
spark-submit command as
$ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.4.6 ...
2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.4.6.
Then, include the jar in the spark-submit command as
$ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...
________________________________________________________________________________________________
Traceback (most recent call last):
File ".\spark_job.py", line 88, in createContext
{"metadata.broker.list": broker_list})
File "C:\python37\lib\site-packages\pyspark\streaming\kafka.py", line 138, in createDirectStream
helper = KafkaUtils._get_helper(ssc._sc)
File "C:\python37\lib\site-packages\pyspark\streaming\kafka.py", line 217, in _get_helper
return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
TypeError: 'JavaPackage' object is not callable
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File ".\spark_job.py", line 114, in <module>
ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
File "C:\python37\lib\site-packages\pyspark\streaming\context.py", line 107, in getOrCreate
ssc = setupFunc()
File ".\spark_job.py", line 114, in <lambda>
ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
File ".\spark_job.py", line 91, in createContext
broker_list={} topic={}".format(broker_list, topic))
ConnectionError: Kafka error: Connection refused: broker_list=127.0.0.1:2181 topic=transaction
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 is correct for the version it assumes (though I would suggest spark-sql-kafka instead)... I want to point out: your error says you have Spark 2.4.6, in which Spark Streaming is deprecated, and the SQL-Kafka package would save you the hassle of converting RDDs to DataFrames.

The first error is related to the missing Postgres class. As mentioned, I'd strongly suggest not using Spark at all when Kafka Connect exists, but to fix it you need to add the Postgres JAR to the packages list (or, more precisely, the Spark classpath).

The second error is because you are now missing --packages, which you can pass through an environment variable named PYSPARK_SUBMIT_ARGS:

os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages <kafka>"
sc = get_context()
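For example, the variable has to be set before pyspark is imported, because pyspark reads it when it spawns the JVM, and by convention the value ends with "pyspark-shell". A sketch, where the package coordinates are an assumption matching the Spark 2.4.6 in your logs and get_context stands for your own context-building code:

```python
import os

# Must run before any pyspark import / SparkContext creation:
# pyspark passes this string to spark-submit when launching the JVM.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.6,"
    "org.postgresql:postgresql:9.4.1207 pyspark-shell"
)

print(os.environ["PYSPARK_SUBMIT_ARGS"])
```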