为了应用实时语音分析使用大数据技术,我尝试在一开始使用Kafka。因此,首先我使用WAVIO API将.wav文件转换为字节,然后将包含[data(nparray的类型)、rate(整数)和sampwidth(整数)]的消息发送给kafka,然后消费者将使用这些消息,消费者将再次将它们转换为.wav文件。
问题是我如何在一条消息(每条消息代表.wav文件)中发送和接收这些[data,rate,sampwidth]到kafka和从kafka接收这些[data,rate,sampwidth]?
producer = KafkaProducer(bootstrap_servers='localhost:9092')
x = wav2bytes("bush_read") # return tuple containing(data, rate, sampwidth)
#here I'm sending 3 messages
producer.send("TestTopic", key=b'data', value=b'%s' % (x[0])) # data -> nparray
producer.send("TestTopic", key=b'rate', value=b'%d' % (x[1])) # rate -> int
producer.send("TestTopic", key=b'sampwidth', value=b'%d' % (x[2])) #sampwidth -> int
send("TestTopic","bush_read")
for message in consumer:
msg = message # I want somthing like this
file = bytes2wav("name", msg.data, msg.rate, msg.sampwidth )
如果需要,您可以将其作为json(或任何其他序列化)发送,创建一个json,如
{“data”:data,“rate”:rate,“sampwidth”:sampwidth}
您可以在消费者中反序列化它
在阅读了大量的Kafka合流式聊天的文章后,我想尝试一下如何实现一个普通的聊天系统。但是我在做一些结构设计的时候遇到了一些问题。当使用mysql作为数据的数据库时,我可以给每个有意义的消息赋予,比如user表中的user_id,消息表中的message_id。模型表中有了id后,可以方便地进行客户端和服务器端的通信。但是在Kafka流中,我如何在Ktable中给每个有意义的模型一个唯一的id?还是
我只是在探索,目前我正在使用One
在我的Scala(2.11)流应用程序中,我正在使用IBM MQ中的一个队列中的数据,并将其写入一个具有一个分区的Kafka主题。在使用MQ的数据之后,消息负载被拆分为3000个较小的消息,这些消息存储在字符串序列中。然后使用KafkaProducer将这3000条消息中的每一条发送到Kafka(2.x)。 你怎么发那3000条信息? 我不能增加IBM MQ中的队列数(不在我的控制之下),也不能增
我正在使用java API实现apache kafka producer。Apache Kafka安装在localhost上。Zookeeper也在运行,但Producer.send()函数仍然卡在发送消息上,消息没有发布。 我已经创建了“快速消息”主题。
问题内容: 我想从numpy中的2D数组创建“心率监视器”效果,并希望音调能够反映数组中的值。 问题答案: 您可以使用from函数来创建一个wav文件,然后您可以根据需要播放该文件。请注意,数组必须是整数,因此,如果您有浮点数,则可能需要适当地缩放它们: 如果您希望Python实际播放音频,则此页面概述了某些软件包/模块。
我想在生成一条发送给Kafka主题的消息后,获取偏移量和分区信息。我通读了spring cloud stream kafka绑定文档,发现这可以通过fecting RECORD\u元数据kafka头来实现。 来自Spring文档:(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.R