当前位置: 首页 > 知识库问答 >
问题:

KStream在非键值上与GlobalKTable联接

呼延原
2023-03-14

我试图用GlobalKTable连接KStream,连接不完全在键上。

GlobalKTable<String, Employee> employeesDetails = builder.globalTable("EMPLOYEE_TOPIC",..);
KStream<String,String> empIdOverLoginUserId = builder.stream("LOG_TOPIC", ….);

我想通过empIdOverLoginUserId的值通过employeesDetails的键将empIdOverLoginUserId与employeesDetails连接

共有1个答案

柴博
2023-03-14

KStream-GlobalKTable联接的第二个参数是keyValueMapper,用于将KStream记录(key-value)映射到要联接的GlobalKTable的键,在与GlobalKTable联接时,可以使用此参数将empidoverloginuserid的值作为键:

empIdOverLoginUserId.join(
                employeesDetails, 
                (userKey, userValue) -> userValue, 
                (userValue, employeesDetailValue) -> employeesDetailValue)
 类似资料:
  • 我对Kafka的溪流很陌生。我想执行以下KStream-GlobalKTable纯基于DSL的左联接操作,而不使用map操作。 和另一个输入主题,它是 ,其中value: 我要执行左联接操作是一个流,主数据是一个全局表,以实现结果值为 连接条件为 代码:

  • 我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?

  • 我开始阅读Kafka Stream应用程序,在每个教程/示例中,通过比较KStream和GlobalkTable中的键来丰富数据。在我的情况下,我需要将KStream记录的值中的一个项与GlobalKTable中的一个键进行比较。如何实现这一点的任何想法或例子。

  • 我有一个Kafka Streams应用程序,其中我将读取“topic1”的KStream与读取“topic2”的GlobalKTable连接起来,然后再与读取“topic3”的GlobalKTable连接起来。 当我尝试同时推送消息到所有3个主题时,我会得到以下异常- org.apache.kafka.streams.errors.invalidStateStoreException 如果我在这些

  • 问题内容: 我正在使用以下伪代码处理类型断言,但出现错误: 无法键入打开非接口值 有人知道这是什么意思吗? http://play.golang.org/p/Ti4FG0m1mc 问题答案: 类型开关需要自省的接口。如果将已知类型的值传递给它,则它会炸毁。如果您创建一个接受接口作为参数的函数,它将起作用: 请参阅http://play.golang.org/p/QNyf0eG71_上的完整代码,以

  • 我们需要在一个非常大的窗口中执行kstream-kstream联接,在这个窗口中,左侧的一个刻度只会触发与右侧最新记录的联接,反之亦然。 这不是默认窗口的工作方式,因为中的window.fetch返回的是一个可以包含多条记录的迭代器。 特别是,我们注意到有一个属性设置为true,我们希望它设置为false。 我们如何为KStream KStream join定制存储实现?