设置:
Flink作业:
>
两个输入kafka主题和一个输出kafka主题
输入1:是一个巨大的主题,每秒有300K到500K条消息。每条消息有600个字段。
Input2:是一个关于每天一次每秒20K条消息的小话题。每条消息有22个字段。
目标是用Input2丰富Input1,输出是一个Kafka主题,其中每条消息都有来自Input1的100个字段和来自Input2的13个字段。
我将input2中的状态保留为MapState
我使用RichCoMapFunction进行映射
这是我连接两个流的代码片段:
stream1.connect(stream2)
.keyBy(_.getKey1,_.getKey2)
.map(new RichCoMapFunction)
我使用setAutoWatermark Interval=300000
当前未使用检查点或保存点
Flink配置:
>
Input1=120的分区数
输入的分区数2=30
输出主题的分区数=120
并行总数=700
输入的并行数1=120
输入的并行数2=30
Join Parallelism:700(连接两个流的并行数。设置如下:
stream1.connect(stream2)
.keyBy(_.getKey1,_.getKey2)
.map(new RichCoMapFunction)
.setParallelism(700)
JobManager内存链接大小:4096m
TaskManager内存链接大小:3072m
taskManagerMemoryManagedSize:1b
ClusterEvenlyspreadoutslot:真
AKKA吞吐量:1500
纱线配置:
>
yarnSlots=4
yarnjobManagerMemory=5120m
yarntaskManagerMemory=4096m
任务槽总数=700
任务经理人数=175
问题:输出主题的延迟大约为30分钟,这对于我们的用例来说是不可接受的。我尝试了许多其他与内存分配和vCore相关的Flink配置,但都没有用。如果您对我们如何扩展以达到更高的吞吐量和更低的延迟有任何建议,那就太好了。
EDIT1:RichCoMapFunction代码:
class Stream1WithStream2CoMapFunction extends RichCoMapFunction[Input1, Input2, Option[Output]] {
private var input2State: MapState[Long, Input2] = _
override def open(parameters: Configuration): Unit = {
val ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.days(3))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build()
val mapStateDescriptor = new MapStateDescriptor[Long, Input2]("input2State", classOf[Long], classOf[Input2])
mapStateDescriptor.enableTimeToLive(ttlConfig)
input2State = getRuntimeContext.getMapState(mapStateDescriptor)
}
override def map1(value: Input1): Option[Output] = {
// Create a new object of type Output (enrich input1 with fields from input2 from the state)
}
override def map2(value: Input2): Option[Output] = {
// Put the value in the input2State
}
}
您可以使用探查器(或添加到Flink 1.13中的火焰图)来尝试诊断为什么运行缓慢。Flink 1.13中添加的背压/繁忙监控也会有所帮助。
但我的猜测是,塞德正在付出巨大的努力。如果您还没有这样做,那么应该在管道中尽早从stream1中删除所有不必要的字段,以便不使用的数据永远不必序列化。对于第一个过程,您可以在链接到源的映射操作符中执行此操作(与源的并行度相同),但自定义序列化程序最终将产生更好的性能。
您没有提到水槽,但在这些情况下,水槽往往是罪魁祸首。我假设它是Kafka(因为您提到了输出主题),并且我假设您没有使用Kafka事务(因为检查点被禁用)。但是接收器是如何配置的?
如果作业没有使用水印,为什么要将自动水印间隔设置为300000?如果您在某处使用水印,这将增加多达5分钟的延迟。如果未使用水印,则此设置没有意义。
为什么要设置akka吞吐量:1500
?这看起来很可疑。我会尝试将其重置为默认值(15)。
是否有其他自定义调整,如网络缓冲?我会质疑所有非默认配置设置(尽管我确信有些设置是合理的,比如内存)。
我还将整个作业的并行度设置为统一的值,例如700。对管道的各个阶段进行微调很少有帮助,而且可能有害。
如何设置maxParallelism?我会将其设置为2800或3500,这样每个插槽至少有4或5个键组。
可能是少数实例正在完成大部分工作吗?您可以检查RichCoMapFunction的各个子任务的指标并寻找偏差。例如,查看numRecordsInPerTwo。
Szenario:我有两个扩展,它们用一些特定字段扩展了。在TYPO3 9之前,我必须使用以下打字脚本配置对新闻扩展的依赖关系进行配置: 模型扩展了基本扩展的模型: 在TYPO3 10中,在(中断:#87623): 只要您只有一个扩展新闻扩展名的扩展名,它就可以工作多久。如果您有第二个扩展并启用TYPO3缓存,您将得到一个错误,即在第一个扩展中添加的字段在新闻扩展的模板中不可用。奇怪的是,这个问题
问题内容: 我正在学习Java,并且希望将自己的课堂变成可观察的课堂。 但是我已经有了它扩展了另一个类。 我该怎么办? 问题答案: 我建议避免完全使用该类,而应 定义事件特定的侦听器和相应的事件定义 。然后在您的类中定义一个侦听器列表,以及添加和删除侦听器以及向其传播事件的方法(请参见下文)。 强制您使用它来表示事件,然后使用来检查事件类型,这是一种丑陋的非OO方法,并使代码更难以理解。如果查看j
我觉得我需要稍微重新表述一下这个问题。 更新问题如下。 我有一个包含: 它包含以下三个面板: 具有固定大小“x”和“y”的J面板 没有固定大小的JPanel 没有固定尺寸和小高度的J面板 第二个JGroup包含一个JTable,因此它会扩展以填充整个高度,并按预期将底部面板一直向下推。 像这样: t=顶部面板,m=中间面板,b=底部面板。 这很有效。但底部面板感觉不到填充整个父对象的宽度,这是一个
你好,有人知道如何发现扩展< code>ArrayBlockingQueue的类吗?例如,我想监视下面的< code>MyBufferQueue类 在属于java库的< code>ArrayBlockingQueue类中,有这样一个方法: 我遇到的问题是,当我监视类< code>MyBufferQueue时,以及在测试期间访问方法< code > ArrayBlockingQueue . put(
我希望能够在我的应用程序中使用此颜色选择器: http://wpftoolkit.codeplex.com/wikipage?title=ColorPicker 我正在使用安装了. NET 4的Visual Studio 2010 Ultimate。我正在用C#和WPF(XAML)编码。 到目前为止我所做的: > 试图使用 谷歌搜索解决方案、教程或示例,但没有取得太大成功。 请解释扩展WPF工具包
问题内容: Typescript总是抱怨调色板中缺少某些属性。如果添加,我的应用程序运行正常,但显然我想避免这种情况。我是Typescript的新手,这是我尝试过的。 这会引发错误, 对我来说这是一个令人困惑的错误,因为类型I不在任何地方使用该类型。 如何在此处适当扩展调色板? 问题答案: 解 参考 如果我们看一下createMuiTheme.d.ts 我们会发现这一点,并发挥不同的作用。 主题: