当前位置: 首页 > 知识库问答 >
问题:

Apache Flink浓缩

卞坚成
2023-03-14

我有一个这样的事件来源

class Event {
    String userName;
    String webPage;
}

我需要通过用户过去的网页访问来丰富我的事件流。(我在数据库中拥有信息,我可以将其用作Flink源)

class EventStats {
    String userName;
    Map<String,Integer> webPageCounters; 
}

如何确保在开始处理事件流之前,我已经准备好了扩展数据
我不想从流中进行DB调用。

共有1个答案

韩烈
2023-03-14

使用Flink tbh做到这一点可能很困难。我想到的第一个想法是在作业启动时进行数据库扫描并创建一个单独的流。该流可用于初始化,您可以简单地将其与实际的EventStats流结合起来,但由于这个问题,这目前是不可能的。因此,基本上可以使用两种解决方案。

第一个非常简单,因此如果您手动进行连接,您可以保留来自Event流的元素,这些元素没有匹配的EventStats。如果您收到EventStats,只需检查是否有任何匹配的事件可以发出即可。如果元素不匹配,您可能还应该有一个逻辑,在一段时间后将其从状态中删除。

另一种解决方案有点棘手,但也更优雅。因此,基本上您可以实现自定义操作符,该操作符确实实现了InputSelectable,它首先尝试使用EventStats中的所有内容,然后才开始读取事件流的元素。这里有一些注意事项,您可以参考文档了解更多信息。另外,请注意,Flink 1.9中引入了可选择的输入。

 类似资料:
  • 安装(下载 这是Flink的默认配置。 关于这里发生了什么事,有什么建议吗?

  • 问题内容: 我正在用Python编写脚本,但有一个问题: 如您所见,此代码非常多余。我试着像这样压缩它: 但是,PyQt4期望为类本身而不是实例提供类方法。因为没有为该类定义代码,所以将代码移出该块也不起作用,所以我真的不知道该怎么做。 谁能看到压缩此代码的方法? 问题答案: 有很多方法可以做到:类装饰器,元类和Mixin。 常用助手功能: 类装饰器 用法 如果不支持类装饰器,则可以尝试: 元类

  • 我注意到,每次我运行一个新作业时,它所花费的时间比我再次启动它时长20%左右? 如果一个作业运行多次,flink是否缓存一些结果并重用它们?如果是,我如何控制这一点? 我想测量我的任务运行了多长时间,但每次我重新运行它们时,速度都比以前快。

  • 所以我必须检索存储在HDFS中的文件的内容,并对其进行某些分析。 问题是,我甚至无法读取文件并将其内容写入本地文件系统中的另一个文本文件。(我是Flink的新手,这只是一个测试,以确保我正确读取了文件) HDFS中的文件是纯文本文件。这是我的密码: 在我运行/tmp之后,它没有输出。 这是一个非常简单的代码,我不确定它是否有问题,或者我只是做了一些别的错误。正如我所说,我对Flink完全是新手 此

  • 在我的主要活动中,我有initUi函数,它将触发对webviewActivity的意图,在webviewActivity中,有一个FragWebView,其中加载了url。 以下是来自FragWebView的示例代码: 我从我的主要活动中传递打开webview的意图是: 请让我知道如何解决这个问题。 问候

  • 我正在尝试为ApacheFlink导入ScalaAPI流扩展,如中所述https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html 但是,我的ScalaIDE抱怨以下消息:对象扩展不是包的成员org.apache.flink.streaming.api.scala 我使用的是scala 2