我是Flink的新手,我需要从Kafka读取数据,通过使用一些API有条件地丰富这些数据(如果记录属于X类),然后写入S3。
我用上面的逻辑制作了一个你好世界Flink应用程序,它就像一个魅力。
但是,我用来充实的API没有100%的正常运行时间SLA,所以我需要设计一些带有重试逻辑的东西。
以下是我找到的选项,
选项1)进行指数重试,直到从API获得响应,但这会阻塞队列,所以我不喜欢这样
选项2)使用另一个主题(称为主题失败),并在API关闭时将其发布到主题失败。这样就不会阻塞实际的主队列。我还需要一个工人来处理队列主题failure中的数据。同样,如果API长时间停机,则必须将此队列用作循环队列。例如,从队列主题失败中读取消息,如果无法将其推送到名为主题失败的同一队列并使用队列主题失败中的下一条消息,请尝试充实。
我更喜欢选项2,但这看起来不是一件容易的事情。是否有任何标准的Flink方法可用于实现选项2?
这是从微服务迁移时出现的一个相当常见的问题。正确的解决方案是将查找数据也放在Kafka或某些DB中,这些DB可以作为附加源集成到同一Flink应用程序中。
如果您无法做到这一点(例如,API是外部的或数据无法轻松映射到数据存储),这两种方法都是可行的,并且它们具有不同的优势。
1) 将允许您保留输入事件的顺序。如果下游应用程序需要有序性,则需要重试。
2) 常见的术语是死信队列(尽管更常用于无效记录)。在Flink中有两种简单的集成方法,要么有一个单独的源,要么使用一个主题模式/列表和一个源。
您的拓扑如下所示:
Kafka Source -\ Async IO /-> Filter good -> S3 sink
+-> Union -> with timeout -+
Kafka Source dead -/ (for API call!) \-> Filter bad -> Kafka sink dead
我正在按照这个教程使用s3和lambda实现即时生成缩略图。我被困在将不存在的请求重定向到lambda上 我的s3存储桶是私有的,我使用带有Cognito idToken的预签名url来访问其内容,它可以工作。 现在,如果我向尚未生成的缩略图(s3中不存在)发出请求,我希望s3将请求重定向到lambda。我不知道如何做这件事。 以下是s3路由角色 > < li> 当我使用预先签名的url访问不存在
问题内容: 我目前正在使用名为s3-upload-stream的node.js插件将非常大的文件流式传输到Amazon S3。它使用了多部分的API,并且在大多数情况下效果很好。 但是,该模块显示了它的年龄,我已经不得不对其进行修改(作者也已弃用了它)。今天,我遇到了另一个与亚马逊有关的问题,我真的很想接受作者的建议,并开始使用官方的aws- sdk完成上传。 但。 官方的SDK似乎不支持管道到。
我有一些。S3上bucket中的zip文件。我需要解压并将其保存回bucket中,无需本地文件系统。 我知道S3是静态存储,但我可以通过给S3提供路径来解压S3本身上的文件。 我有以下问题。 > 我可以通过桶/文件夹的路径到,所以它解压文件直接在那里。 BufferedOutputStream=new BufferedOutputStream(new FileOutputStream(filePa
问题内容: 我正在尝试使用将图片上传到亚马逊s3 ,但出现此错误: TypeError:预期opts.s3为对象node_modules / multer-s3 / index.js:69:20 这是我的服务器代码: 为什么我收到此错误? 问题答案: 完成 并正常工作的Node Cheat | 使用multer-s3上传到s3 。 码: 对于完整的回购 : 克隆node-cheat express
我试图使用API接口将视频上传到S3存储桶,我遵循了预签名的URL过程,下面是我的lambda函数,它返回预签名的URL(它正确地返回了预签名的URL,看起来): 当我尝试上传一个像这样卷曲的mp4视频时,例如: curl-X PUT-F'data=@ch01_00000100055009702.mp4'https://redacted-bucket-instance.s3.amazonaws.c
我目前正在研究在测试套件执行后根据testng XML中的参数将testng报告上传到Amazon S3的能力。考虑以下testng XML: 给定前四个参数(upload-test-reports、aws-provider-type、aws-s3-bucket和target-reports-directory),Java项目将把默认测试输出目录(当前为/build/reports/tests)中