当前位置: 首页 > 知识库问答 >
问题:

Flink Python数据流API Kafka消费者

郭鸿信
2023-03-14

我是pyflink的新手。我正在尝试编写一个python程序来从kafka主题读取数据并将数据打印到标准输出。我按照链接Flink Python Datastream API Kafka Producer Sink Serializaion进行了操作。但由于版本不匹配,我一直看到NoSuchMethod odError。我添加了https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.0/flink-sql-connector-kafka_2.11-1.13.0.jar.可用的flink-sql-kafka连接器。有人能帮我提供一个合适的示例来做到这一点吗?以下是我的代码

import json
import os

from pyflink.common import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types


def my_map(obj):
    json_obj = json.loads(json.loads(obj))
    return json.dumps(json_obj["name"])


def kafkaread():
    env = StreamExecutionEnvironment.get_execution_environment()

    env.add_jars("file:///automation/flink/flink-sql-connector-kafka_2.11-1.10.1.jar")

    deserialization_schema = SimpleStringSchema()

    kafkaSource = FlinkKafkaConsumer(
        topics='test',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': '10.234.175.22:9092', 'group.id': 'test'}
    )

    ds = env.add_source(kafkaSource).print()
    env.execute('kafkaread')


if __name__ == '__main__':
    kafkaread()

但是python无法识别jar文件,并抛出以下错误。

Traceback (most recent call last):
  File "flinkKafka.py", line 31, in <module>
    kafkaread()
  File "flinkKafka.py", line 20, in kafkaread
    kafkaSource = FlinkKafkaConsumer(
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/datastream/connectors.py", line 186, in __init__
    j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/datastream/connectors.py", line 336, in _get_kafka_consumer
    j_flink_kafka_consumer = j_consumer_clz(topics,
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 185, in wrapped_call
    raise TypeError(
TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
           

添加 jar 文件的正确位置是什么?

共有3个答案

祁烨
2023-03-14

你应该添加flink-sql-connector-kafka的jar文件,这取决于你的pyflink和scala版本。如果版本是真的,如果jar包在这里,检查add_jars函数中的路径。

胥英奕
2023-03-14

只需要检查到flink-sql-connector jar的路径

沈鸿光
2023-03-14

我看到你下载了flink-sql-connector-kafka_2.11-1.13.0.jar,但是代码加载flink-sql-connector-kafka_2.11-1.10.1.jar.

也许你可以开一张支票

 类似资料:
  • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费

  • 我想制作下面的数据发送架构。 生产商-- 消费者服务器可以关闭,因此我认为应该至少有两个消费者。是这样吗? 当一个数据流有两个使用者时,是否有任何方法可以处理每个使用者一半的数据?正如我所知,这是不可能的。如果每个消费者都使用相同的数据,那就是浪费时间和成本。因为我只为高可用性提供了两个消费者。(用于故障切换) 在web was体系结构中,ELB或L4可以通过负载平衡将一半数据发送到每个was服务

  • 我在Kafka做数据复制。但是,kafka日志文件的大小增长很快。一天内大小达到5 gb。作为这个问题解决方案,我想立即删除处理过的数据。我正在使用AdminClient中的delete record方法删除偏移量。但当我查看日志文件时,对应于该偏移量的数据不会被删除。 我不想要类似(log.retention.hours,log.retention.bytes,log.segment.bytes

  • 数据消费 Talos的高级Consumer为用户解决了数据消费的很多问题,其中一点就是“有记忆”的消费,保证用户在启动Consumer的时候能从“上次消费过”的地方开始消费; Commit Offset:TalosConsumer运行过程中,会不定时的对用户已经消费过的数据进行commit,我们叫做‘commit offset’,含义就是提交到server端记录已经消费的offset,请注意Com

  • 我使用的是运行在AWS中的spark独立集群(spark and spark-streaming-kafka version 1.6.1),并对检查点目录使用S3桶,每个工作节点上没有调度延迟和足够的磁盘空间。 没有更改任何Kafka客户端初始化参数,非常肯定Kafka的结构没有更改: 也不明白为什么当直接使用者描述说时,我仍然需要在创建流上下文时使用检查点目录?