在阅读了大量的Kafka合流式聊天的文章后,我想尝试一下如何实现一个普通的聊天系统。但是我在做一些结构设计的时候遇到了一些问题。当使用mysql作为数据的数据库时,我可以给每个有意义的消息赋予id
,比如user表中的user_id,消息表中的message_id。模型表中有了id后,可以方便地进行客户端和服务器端的通信。但是在Kafka流中,我如何在Ktable中给每个有意义的模型一个唯一的id?还是我真的有必要这么做?
也许我可以自己回答这个问题。
在mysql中,我们可以直接使用sequenceId,因为所有数据都将被分配到一个位置,然后自动分配一个新的ID。但是当表变得太大时,我们还需要将表拆分为几个小表,在这种情况下,我们还需要为每个记录重新生成唯一的id,因为这些表中的自动生成id是从0开始的。
也许在Kafaka身上也是如此。当我们在kafka中只有一个分区时,我们也可以使用kafka生成的id中的id,因为所有的消息将只发到一个地方,所以它们永远不会被包住。但是当我们需要更多的分区时,我们也必须小心这些从不同分区生成的id不是全局唯一的。
[nodeID+threadid+current_time+auto_increaded_number]
为了应用实时语音分析使用大数据技术,我尝试在一开始使用Kafka。因此,首先我使用WAVIO API将.wav文件转换为字节,然后将包含[data(nparray的类型)、rate(整数)和sampwidth(整数)]的消息发送给kafka,然后消费者将使用这些消息,消费者将再次将它们转换为.wav文件。 问题是我如何在一条消息(每条消息代表.wav文件)中发送和接收这些[data,rate,sa
我想在生成一条发送给Kafka主题的消息后,获取偏移量和分区信息。我通读了spring cloud stream kafka绑定文档,发现这可以通过fecting RECORD\u元数据kafka头来实现。 来自Spring文档:(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.R
我正在尝试将KAFKA与Spring集成,我的JAVA应用程序正在与KAFKA服务器通信,当我使用HTTP运行应用程序时,我也会收到消息。 现在我想使用 Spring 在 KAFKA 上添加 SSL,我已经完成了在 SSL KAFKA 和 SPRING KAFKA 上指定的更改 当我使用命令行(使用 SSL)运行生产者和消费者时,通信会正常发生,但是当我更改 Java 应用程序的配置并尝试生成和使
我试图使用ConsumerSeeKaware,阅读kafka主题中可用的最后一条消息。消息类型是Avro对象列表。我能成功地做到这一点。但在反序列化过程中会失败。该消息使用spring-cloud-stream-kafka框架生成。消息具有contentType。 我知道avro消息可以像下面这样反序列化。 但不管用。可能是因为两件事。 > 消息是avro对象的列表。但我正在尝试使用Avro模式创
从示例中,我看到了下面的代码片段,它运行良好。但问题是:我并不总是需要处理输入流并将其生成到接收器。 如果我有一个应用程序,根据某些事件,我必须只发布到kafka主题,以便下游应用程序可以做出某些决定。这意味着,我实际上没有输入流,但我只知道当我的应用程序中发生某些事情时,我需要向kafka的特定主题发布消息。也就是说,我只需要一个接收器。 我查看了示例,但没有找到符合我要求的任何内容。有没有一种
我正在使用以下客户端代码创建远程KafkaProducer 一旦我创建了生产者,我就可以运行下面的行并返回有效的主题信息,假设strTopic是一个现有的主题名称。 当我尝试发送消息时,我会执行以下操作: