当前位置: 首页 > 知识库问答 >
问题:

Kafka:Scala到Python的转换

漆雕博
2023-03-14

参考:https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies

这个例子有python版本吗?参考只有java等价物。我在https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html.发现了一些相似之处

我可以将引导数据库服务器与bootstrap_servers,key.序列化程序与key_serializer,value.序列化程序与value_serializer相匹配,但我无法匹配最后3个“group.id”,“自动.offset.reset”和“enable.auto.commit”。

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

共有2个答案

贝财
2023-03-14

我在 https://kafka-python.readthedocs.io 找到了一些相似之处

但这不是Spark代码。Spark需要Java Kafka API的属性。

< code > spark-streaming-Kafka-0-10 在Python中不可用,但如果您想使用Spark,这是0.8 API,其中有Python示例

from pyspark.streaming.kafka import KafkaUtils

# ssc = <get a StreamingContext>
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
漆雕奇逸
2023-03-14

您正在查看的Scala代码是针对消费者的。因此您需要检查消费者设置,而不是生产者的设置。

如果你看看https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html你可以找到他们的等价物:

  • <code>组。id:group_id
  • <code>自动偏移。重置:auto_offset_reset
  • <code>启用自动。提交:enable_auto_commit

另请注意,消费者有反序列化器而不是序列化器,因此:

  • <code>键。反序列化程序:key_deserializer
  • <code>值。反序列化程序:value_deserializer
 类似资料:
  • 问题内容: 我希望将标准JSON对象处理为一个对象,其中每行必须包含一个单独的,自包含的有效JSON对象。查看JSON行 : 我当前的解决方案是将JSON文件读取为文本文件,并从开头和结尾删除。因此,在每行上创建一个有效的JSON对象,而不是包含行的嵌套对象。 我想知道是否有更优雅的解决方案?我怀疑在文件上使用字符串操作可能会出错。 目的是在Spark上将文件读入RDD。查看相关问题-使用Apac

  • 问题内容: 我无法在不丢失数据的情况下将以下Unicode转换为ASCII: 我尝试了,他们不会这样做。 有人有建议吗? 问题答案: Unicode字符,并且没有任何对应的ASCII值。因此,如果您不想丢失数据,则必须以某种有效的ASCII方式对数据进行编码。选项包括: 所有这些都是ASCII字符串,并且包含来自原始Unicode字符串的所有信息(因此可以将它们全部逆转而不会丢失数据),但是对于最

  • 问题内容: 我想转到.txt文件中的第34行并阅读它。您将如何在Python中做到这一点? 问题答案: 使用Python标准库的linecache模块: 应该正是您想要的。您甚至不需要打开文件-一切都为您完成!

  • 我的程序需要一个函数,该函数从csv文件(“all.csv”)读取数据,并在特定日期提取与某个州有关的所有数据(提取其中包含“state name”和“date”的每一行),然后将提取的数据写入另一个名为:state.csv的csv文件 当数据被写入时,每个州在特定日期的病例数和死亡数被统计和合计。然后,该函数以元组形式返回总病例和死亡数(病例,死亡) 例如州=‘加利福尼亚’日期=‘2020-03