当前位置: 首页 > 知识库问答 >
问题:

KeyBy没有为不同的密钥创建不同的键控流

赵开诚
2023-03-14

我读取一个简单的JSON字符串作为输入,并基于两个字段aB对流进行键控。但是KeyBy为B的不同值生成相同的键控流,但为aB的特定组合生成相同的键控流。

输入:

{
    "A": "352580084349898",
    "B": "1546559127",
    "C": "A"
}

这是我的Flink代码的核心逻辑:

DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
            .map(new MapFunction<String, GenericDataObject>() {
                @Override
                public GenericDataObject map(String s) throws Exception {
                    JSONObject jsonObject = new JSONObject(s);
                    GenericDataObject genericDataObject = new GenericDataObject();
                    genericDataObject.setA(jsonObject.getString("A"));
                    genericDataObject.setB(jsonObject.getString("B"));
                    genericDataObject.setC(jsonObject.getString("C"));
                    return genericDataObject;
                }
            });
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
            .keyBy("A", "B")
            .map(new MapFunction<GenericDataObject, GenericDataObject>() {
                @Override
                public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
                    return genericDataObject;
                }
            });
testStream.print();
5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}

共有1个答案

唐阳飇
2023-03-14

首先,你没做错什么。

为什么它们在同一个子任务中?

假设您有数千个键,而Apache Flink不可能为每个键创建数千个线程。因此,必须有另一种机制来确保在一个线程中单独处理一组键。

 类似资料:
  • 我正在使用Gradle构建系统在Android Studio上编写一个具有多种风格的应用程序。主应用程序/AndroidManifest文件有自己定义的启动程序活动,我想用Flavor/AndroidManifest文件覆盖它,我在其中定义了其他启动程序活动,这些活动只是Flavor/source代码的一部分。 编辑:以下是文件层次结构: 应用程序名称- src公司- 但当我这样做时,它给出了一个

  • 出于安全原因,我希望在Redis中使用类似的代码,因为目前我可以使用,但我必须将其添加到每个希望连接到Redis的应用程序中。(至少有一种方法可以为使用多个密码吗? 我可以用一个应用程序连接到一个数据库,但是这个应用程序也可以切换到另一个数据库。(我至少可以用某种方法阻止这种切换吗?) 出于性能原因,我希望避免并行运行多个redis实例。

  • 问题内容: 我有一种情况,我需要用一个替代键()来代替组合键(组合在一起的4个字段是唯一的:),以便在其他表中轻松引用它。 为此,我将字段用作主键,并将上面提到的其他4个字段用作唯一键。这在MySQL中是允许的,但在MemSQL中是不允许的。 因此,我将该字段添加为分片键,但没有用。 如何克服MemSQL中的这种情况? 问题答案: 因此,您想要唯一键(id)以及唯一键(project_id,dat

  • 当数据不断传来时,整个堆栈向左移动时,条形图是动态添加的,有没有办法将第一个条形图的颜色设置为不同的颜色?谢谢。 编辑和解决方案:这是我在图表中添加新条目的代码,它每500毫左右动态发生一次。 感谢@Philip Jahoda,我让它正常工作,只需在您的addEntry方法中添加这段代码:

  • 也许很简单,但我被困住了。。。我有一个使用Spring Boot和springframework的应用程序。网状物客户RestTemplate,所以后座上的Jackson会自动执行JSON任务。 我想用JSON反序列化REST输入,如下所示: 请帮我为这个模型创建POJO...如果我创建了以下类(用lombok简化): 反序列化在技术上还可以——我得到了一个serialNo Long值的对象,不幸