当前位置: 首页 > 知识库问答 >
问题:

MQTT Kafka源连接器:有趣的字节字符

齐乐
2023-03-14

mqttpublisher.py

while v3 < 3:
             data3 = {
                      "time": str(datetime.datetime.now().time()),
                       "val": v3
                      }
             client.publish("sensor/dist", json.dumps(data3), qos=2)

             v3 += 1
             time.sleep(2)

mqttsubscriber.py

def on_message_print(client, userdata, message):
            print(message.topic,message.payload)

subscribe.callback(on_message_print, "sensor/#", hostname="localhost")

py

consumer = KafkaConsumer('mqtt.',
                     bootstrap_servers=['localhost:9092'])

for message in consumer:
   print(message)

ConsumerRecord(Topic='mqtt.',partition=0,offset=225,timestamp=1545117270870,timestamp_type=0,key=b'\x00\x00\x00\x01\x16sensor/dist“,value=b'\x00\x00\x00\x00\x01\x16sensor/dist{”time“:”12:44:0.817462“,”val“:0}”,headers=[('mqtt.message.id“,b'0'),('mqtt.qos‘,b'0')b'false')],checksum=none,serialized_key_size=17,serialized_value_size=43,serialized_header_size=62)

ConsumerRecord(Topic='mqtt.',partition=0,offset=226,timestamp=1545117272821,timestamp_type=0,key=b'\x00\x00\x00\x01\x16sensor/dist“,value=b'\x00\x00\x00\x01\x16sensor/dist{”time“:”12:44:32.820040“,”val“:1}”,headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),'false')],checksum=none,serialized_key_size=17,serialized_value_size=43,serialized_header_size=62)

ConsumerRecord(Topic='mqtt.',partition=0,offset=227,timestamp=1545117274824,timestamp_type=0,key=B'\x00\x00\x00\x01\x16sensor/dist“,value=B'\x00\x00\x00\x00\x01\x16sensor/dist{”time“:”12:44:34.822657“,”val“:2}”,headers=[('mqtt.message.id',B'0'),('mqtt.qos',B'0'),,b'false')],checksum=none,serialized_key_size=17,serialized_value_size=43,serialized_header_size=62)

是什么原因导致了Kafka消费者中额外字节的上述预置?提前谢了。

共有1个答案

鄂伟兆
2023-03-14

作为演示的一部分,您将启动一个模式注册表

启动Kafka连接和依赖项(Kafka、Zookeeper、模式注册表):

合流开始连接

 类似资料:
  • 问题内容: 建立连接并要读取来自连接的字节后,如何读取所有字节?从我尝试过的内容中,我可以读取直到到达分隔符(如换行符)为止。但是如何获得包括换行符在内的所有信息?我试图在Go中创建一个Redis客户端,并且该协议使用\ r \ n分隔结果,因此在这种情况下,Buffer.ReadLine或Buffer.ReadSlice没有帮助。 问题答案: 要从读取器读取所有字节(例如您的TCP连接),可以使

  • 在设置我的连接之后 和我的服务器页面如下所示: const mongoose=require(“mongoose”)const app=new express()

  • 线程“main”java.net.ConnectException:连接超时:在java.net.dualStackplainsockeTimpl.Connect0(本机方法)在java.net.dualStackplainsockeTimpl.socketConnect(DualStackplainsockeTimpl.java:69)在java.net.abstractplainsockeTi

  • 问题内容: 事实证明,这是对python的粗略过渡。这里发生了什么?: 输出文件如下所示: 然后我得到这个错误: 问题答案: 返回一个字节串。 在Python 3中,unicode()对象与对象之间没有隐式转换。如果您知道输出的编码,则可以使用它来获取字符串,也可以将要添加的内容转换为

  • 问题内容: 我有以下XML: 并且正在尝试阅读office:document-meta节点以提取其下面的各种元素(dc:creator,meta:creation-date等)。 如下代码: 给我: 但是如果我尝试使用以下方法读取document-meta元素: 我懂了 我假设SimpleXML试图从$ officeXML中提取一个不存在的节点“文档”,然后减去(不存在的)常量“元”的值,导致强制

  • 问题内容: 我知道read()是一个阻塞调用,除非我使套接字成为非阻塞。因此,我希望请求4K数据的read()调用应返回正值(读取的字节数)或错误时返回-1(客户端可能重置连接等)。我的问题是:在任何情况下read()都可以返回“ 0”吗? 我这样处理read(): 如果read()返回零,则此代码炸弹轰炸,我知道如何修复它。但是read()是否有可能返回零? 问题答案: 当一侧的TCP连接关闭时