我试图连接到我的本地机器上的Kafka(2.1),并在Flink(1.7.2)附带的scalashell中读取。 下面是我正在做的: 之后,最后一条语句我得到了以下错误: 我已经创建了一个名为“topic”的主题,我能够通过另一个客户端正确地生成和读取来自它的消息。我正在使用java版本1.8.0\u 201,并遵循https://ci.apache.org/projects/flink/flin
我正在尝试使用此处提供的确切代码从启用了的发送/接收数据。 https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/dotnet/EventHubsForKafkaSample 我成功地将消息发送到事件中心,但每次尝试初始化接收器时,都会出现此无效会话超时错误。 我唯一指定的超时是,我也尝试过没有它,但错
亲爱的Spring开发者! 我想从官方文档和示例代码中设置spring cloud configserver。它适合我,如果我更改它,我想在运行时刷新我的属性github。 我已经添加了spring-cloud d-config-监视依赖项,我想使用Kafka进行更改事件广播。我在文档中阅读过,这是可能的,但我无法配置配置服务器的spring-cloud d-starter-bus-kafka和客
这不是一个大问题,但我很好奇一些额外的流消费者来自哪里,如果这是一个设置,我可以改变。 我有一个针对本地Kafka经纪人的非常简单的spring cloud stream消费者设置。这是spring配置 以及消费者阶层本身: 但当我运行应用程序时,我可以看到输出中创建了3个消费者。但是,当我在我的本地代理中检查消费者组成员时,它总是只有一个消费者,并且总是创建的第二个消费者(即使用客户id测试组2
我想使用spring cloud stream手动提交偏移量-仅当消息处理成功时。这是我的代码应用程序。yml公司 但我的确认对象为空,因为在标头中,对象“kafka\u确认”本身不存在。 如何获取确认对象
我们有两个Kafka节点,出于本问题范围之外的原因,我们希望设置一个负载平衡器来终止生产者(客户端)的SSL。负载平衡器托管的SSL证书将由客户端本机应信任的受信任/根CA签名。 所以连接看起来像: 这是否可行,或者Kafka是否以某种方式要求直接在Kafka服务器上设置SSL? 谢谢
我有一个拓扑,其中我有2个不同的源主题(2个子拓扑)。其中一个是Avro格式,另一个是JSON格式。有没有办法为不同的处理器使用不同的SERDE?我已经看到商店里有消费。使用()但我看不到处理器有这样的功能。 除了编写自己的序列化程序或反序列化程序来区分主题和相应的序列化/反序列化之外,是否有使用不同Serde的配置?
我在创建用于聚合数据的SerDes时遇到了一些问题,需要通过“”发送到另一个主题。然而,我需要为窗口化数据创建一个SerDes,我不知道该怎么做。
我使用Confluent-3.2.1作为Kafka拖缆。我正在尝试聚合我的KGroupedStream 流媒体代码为 当我运行代码时,我发现了错误:
我第一次尝试Kafka,并使用AWS MSK设置Kafka群集。目标是将数据从MySQL服务器流式传输到Postgresql。我使用debezium MySQL连接器作为源,使用Confluent JDBC连接器作为接收器。 MySQL配置: 注册Mysql连接器后,其状态为“正在运行”,并捕获MySQL表中所做的更改,并以以下格式在消费者控制台中显示结果: 我的第一个问题:在表中“金额”列是“十
我试图创建Kafka直接流与提供偏移外部在我的火花流模块,但它导致以下编译错误。 下面是创建Kafka直接流的代码 下面是我遇到的编译错误。有什么想法/指针吗?
在Spring MVC项目中,我试图通过Spring Websockets将使用过的Kafka数据发送到前端(JavaScript)。 为了建立服务器和客户端之间的通信,我有以下内容。 客户端(app.js) 服务器(KafkaController.java) 要使用来自特定Kafka主题的数据,我使用@KafkaListener注释如下: 我有一个适当的Kafkanconfig类,包含所有必要的
使用Spring kafka模板,我有2个不同的生产者,他们使用相同的键向主题发送不同的消息,始终以相同的形式: 生产者1:发送密钥:1消息:abc分区0 生产者2:发送密钥:2消息:def 我有3个分区,所有消息都根据消息键发送到同一个分区。 现在,我需要确保,根据某些属性,特定消息将发送到特定分区,以便能够管理系统中的某些优先级。 问题是生产者2无法知道生产者1选择了哪个分区。 Kafka确保
我正在使用SpringBoot构建一个Web应用程序后端,我必须使用Kafka发送消息。我想有一个主题,例如“testTopic”,我想在那里生成一些来自不同用户的消息,以便稍后将消息发送到不同的机器。 如果用户A向其机器发送消息,而用户B向其机器发送消息。我如何区分谁发送了哪条消息以及该消息应该到达哪台机器? 我读过关于Kafka主题划分的文章,但我不知道我在代码中是否做得很好。 我在这里建立我
我尝试从Kafka加载数据,这是成功的,但我无法转换为火花RDD, 现在如何读取此流对象???我的意思是将其转换为Spark数据帧并执行一些计算 我尝试转换到dataframe 但是toDf不工作错误:value toDf不是org.apache.spark.rdd.RDD的成员[org.apache.spark.sql.行]