当前位置: 首页 > 知识库问答 >
问题:

Kafka流处理过程中的外部系统查询

酆意智
2023-03-14
    null

共有1个答案

刁星渊
2023-03-14

关于外部系统的查询模式,您有多种可能:

  • 建议:使用Kafka Connect将数据从外部系统导入到Kafka中,并将这些主题阅读为KTables,以执行KStream-KTable查找联接。
  • 您可以在UDF代码中实现自己的自定义查找联接。根据详细信息,您可以使用kstream方法#mapvalues()#map()或更低级别的方法,如#transform()#process()。因此,您手动打开到外部系统的连接,并为处理的每个记录发出查找查询。
    • 同步查找:如果对外部系统进行同步调用,则无需考虑其他任何问题(例如,可以使用#mapvalues()来实现此操作)
    • async Lookpus:对于外部系统的异步调用,要想正确地调用是比较困难的(您应该非常小心--这不是一种推荐的模式,因为目前没有库支持)。首先,您需要以可靠的方式记住您发出的所有异步调用(即,您需要附加一个状态,并在实际启动之前将您要发出的每个请求写入该状态)。其次,在每次回调时,您需要以某种方式缓冲结果,并在稍后再次调用发出请求的同一个运算符时处理它(不可能在异步回调处理程序中生成下游结果,而只能在UDF代码中生成)。下游发出后,您可以从状态中删除请求。第三,在故障恢复中,您需要检查未完成请求的状态,并再次发出这些请求。还要记住,这种异步处理打破了一些内部流的假设,例如关于记录主题偏移量的保证处理顺序。

    比较一下关于流中的失败处理和偏移量提交的问题:使用Kafka流DSL时如何处理错误和不提交

 类似资料:
  • Git 有一些可以与其他的版本控制系统集成的命令。 git svn git svn 可以使 Git 作为一个客户端来与 Subversion 版本控制系统通信。 这意味着你可以使用 Git 来检出内容,或者提交到 Subversion 服务器。 Git 与 Subversion 一章深入讲解了此命令。 git fast-import 对于其他版本控制系统或者从其他任何的格式导入,你可以使用 git

  • 我正在使用Spring云流Kafka流编写Java应用程序。下面是我正在使用的函数方法片段: fetch_data_from_database()可以抛出异常。 如果fetch\u from\u database()发生异常,如何停止对入站KStream的处理(不应提交偏移量),并使其使用相同的偏移量数据重试处理?

  • 曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法

  • 6.1.4 查看统计信息 Mrds:6379> info 在cli下执行info。 info Replication 只看其中一部分。 config resetstat 重新统计。 # Server redis_version:2.8.19 ###redis版本号 redis_git_sha1:00000000 ###git SHA1 redis_git_dirty:0 ###git dir

  • 1.1 配置依赖 <!-- log4j --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.

  • 我正在Kafka流中的处理器节点上工作。对于一个简单的代码,我编写如下代码只是为了过滤用户ID,这是在kafka流中处理处理器节点的正确方法吗? 但是,下面的代码没有编译,抛出了一个错误: