当前位置: 首页 > 知识库问答 >
问题:

无法在AWS EC2上向kafka发送消息,来自用AWS Lambda函数编写的生产者

上官羽
2023-03-14

冲突Kafka 5.0.0已安装在AWS EC2上,它有公共IP,比如54.XX.XX.XX在EC2机器上打开端口9092,0.0.0.0

在 /etc/kafka/server.properties我有advertised.listeners=PLAINTEXT://54.XX.XX.XX:9092和侦听器=PLAINTEXT://0.0.0.0:9092在 /etc/kafka/producer.properties我有bootstrap.servers=0.0.0.0:9092

本地文件iotstatesboto.py如下所述,它具有融合的生产者代码:

from confluent_kafka import Producer
import json

broker = '54.XX.XX.XX'
topic = 'mytopic'

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {}'.format(msg.topic()))

def lambda_handler(event, context): 
    p = Producer({'bootstrap.servers': broker})
    message = json.dumps(event)
    print(message)
    p.produce(topic, message.encode('utf-8'), callback=delivery_report)
    return { 
        'message' : message
    }  

zip包在本地制作,如下所示:

pip install confluent_html" target="_blank">kafka has been done in the same directory
zip -r iotstatesboto.zip iotstatesboto.py confluent*

此zip文件上载到Lambda函数。然后,当“Test”函数发送一条虚拟消息时,出现以下错误

第一:

{
  "errorMessage": "Unable to import module 'iotstatesboto'"
}

还有一个是:

Unable to import module 'iotstatesboto': No module named 'confluent_kafka.cimpl'

我已经将处理程序名设置为“iotstatesboto.lambda _ handler”

要让生产者从lambda函数写入EC2上的kakfa流,这些步骤中可能缺少什么线索?

共有1个答案

景河
2023-03-14

使用AWS Lambda时,您必须手动提供所有库,即将它们添加到您用于lambda函数代码的zip中。如果有的话,您还必须添加所有共享对象库。

在本例中,AWS Lambda为您提供了由python标准库Boto3组成的python环境,因此没有其他库。

 类似资料:
  • 我们使用sping-cloud-stream-binder-kafka(3.0.3.RELEASE)向我们的Kafka集群(2.4.1)发送消息。时不时地,其中一个生产者线程会收到NOT_LEADER_FOR_PARTITION异常,甚至超过重试(当前设置为12,由依赖sping-retry激活)。我们限制了重试,因为我们发送了大约1kmsg/s(每个生产者实例),并且担心缓冲区的大小。这样我们会

  • 我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那

  • 我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送

  • 我正在使用kafka java客户端和kafka服务器。 我的代码: Kafka马纳格 当我的循环长度如果在1000左右(在类)时,我就能成功地向Kafka主题发送数据。 但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。 根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。 但是如

  • 这是我的消费者: 所以当运行我的制作人时,它最终会出错。任何人都知道这意味着什么,如果这可能是错的。

  • 我的代码如下:, 从服务器通知FCM(C#) 我对上述要求的答复 {"multicast_id":5002368547300,"成功": 1,"失败": 0,"canonical_ids": 0,"结果":[{"message_id":"0:14200031 c4rrr5787蛋"}]} 我假设一旦FCM收到新的通知,它将把这些通知推送到各自的Android设备。 但对我来说,它不起作用。