当前位置: 首页 > 知识库问答 >
问题:

为什么我的flink程序没有加入两条流?

羊舌承颜
2023-03-14

我想根据id加入Customer和Address对象。这些是我对kafka stream for Customer主题的输入

{"id": 1,"name": "Yogesh"}
{"id": 2,"name": "Swati" }
{"id": 3,"name": "Shruti"}
{"id": 4,"name": "Amol"  }
{"id": 5,"name": "Pooja" }
{"id": 6,"name": "Kiran" }

和以下fro地址

{"id": 1,"address":"Pune" }
{"id": 2,"address":"Pune" }
{"id": 3,"address":"Pune" }
{"id": 4,"address":"Kalyan"}
{"id": 5,"address": "Pimpri"}

我使用了间隔连接以及使用TumblingEventTimeWindows和滑动窗口的JoinFunction,但它没有连接客户和地址流。我不明白我在代码中遗漏了什么。

public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = setupEnvironment();
            final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
            env.getConfig().setGlobalJobParameters(params);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            init(params);

            FlinkKafkaConsumerBase<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(customerTopic,
                    new SimpleStringSchema(), CommonConfig.getConsumerProperties(ip, port, customerTopic))
                            .setStartFromEarliest();

            DataStream<Customer> customerStream = env.addSource(flinkKafkaConsumer)
                    .flatMap(new FlatMapFunction<String, Customer>() {
                        private static final long serialVersionUID = 2142214034515856836L;

                        @Override
                        public void flatMap(String value, Collector<Customer> out) throws Exception {
                            Customer customer = null;
                            try {
                                customer = mapper.readValue(value, Customer.class);
                            } catch (Exception exception) {
                                System.out.println(exception);
                            }
                            if (null != customer) {
                                out.collect(customer);
                            }
                        }
                    });

            customerStream.print();

            DataStream<Address> addressStream = env
                    .addSource(new FlinkKafkaConsumer<>(addressTopic, new SimpleStringSchema(),
                            CommonConfig.getConsumerProperties(ip, port, addressTopic)).setStartFromEarliest())
                    .flatMap(new FlatMapFunction<String, Address>() {
                        private static final long serialVersionUID = 2142214034515856836L;

                        @Override
                        public void flatMap(String value, Collector<Address> out) throws Exception {
                            Address address = null;
                            try {
                                address = mapper.readValue(value, Address.class);
                            } catch (Exception exception) {
                                System.out.println(exception);
                            }
                            if (null != address) {
                                out.collect(address);
                            }
                        }
                    });

            addressStream.print();

            customerStream.keyBy(new IdSelectorCustomer()).intervalJoin(addressStream.keyBy(new IdSelectorAddress()))
                    .between(Time.seconds(-2), Time.seconds(1))
                    .process(new ProcessJoinFunction<Customer, Address, CustomerInfo>() {
                        private static final long serialVersionUID = -3658796606815087434L;
                        @Override
                        public void processElement(Customer customer, Address address,
                                ProcessJoinFunction<Customer, Address, CustomerInfo>.Context ctx,
                                Collector<CustomerInfo> collector) throws Exception {
                            collector.collect(new CustomerInfo(customer.getId(), customer.getName(), address.getAddress()));
                        }
                    }).print();

            DataStream<CustomerInfo> joinResultStream = customerStream.join(addressStream).where(new IdSelectorCustomer())
                    .equalTo(new IdSelectorAddress()).window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .apply(new JoinFunction<Customer, Address, CustomerInfo>() {
                        private static final long serialVersionUID = -8913244745978230585L;
                        @Override
                        public CustomerInfo join(Customer first, Address second) throws Exception {
                            return new CustomerInfo(first.getId(), first.getName(), second.getAddress());
                        }
                    });
            joinResultStream.print();
            env.execute("Execute");
}

    // ===============================================================================
    public class IdSelectorAddress implements KeySelector<Address,Long> {
        private static final long serialVersionUID = 7642739595630647992L;
        @Override
        public Long getKey(Address value) throws Exception {
            return value.getId();
        }
    }

    // ========================================================================
    public class IdSelectorCustomer implements KeySelector<Customer,Long> {
        private static final long serialVersionUID = 7642739595630647992L;
        @Override
        public Long getKey(Customer value) throws Exception {
            return value.getId();
        }
    }

共有2个答案

闾丘鸣
2023-03-14

由于您使用的是事件时间,因此必须在两个流上使用赋值时间戳和水印,以便Flink可以将事件分配给窗口,并知道窗口何时完成并可以触发。

要阅读有关此主题的更多信息,您可以从有关流式分析的本教程开始,或者从文档中的此处开始,了解如何实现时间戳提取器和水印赋值器。

路和悌
2023-03-14

我想你没有看到任何结果。原因是您的窗口从未被启动/评估/关闭。

您的事件没有时间戳。仍然可以设置时间特性。事件时间。由于不分配时间戳和水印,Flink无法判断如何以窗口连接或间隔连接的方式连接事件。

使用DataStream#assignTimestampsAndWatermarksflinkkafaconsumer#assignTimestampsAndWatermarks处理事件时间或将时间特性更改为ProcessingTime

我希望这将引导你朝着正确的方向前进。

 类似资料:
  • 我有一个通过Interface Builder定义的布局约束视图。由于它们无法暂时停用,我决定通过拨打以下电话有选择地删除它们: 但是,之后约束仍然驻留在视图中。约束。此外,我还希望以编程方式添加约束(同样,因为我无法(取消)激活它们): 对我的方法的任何调用都会导致变量wasAdded的值NO。这也反映在用户界面上,它根本没有改变。 最后,我既不能以编程方式添加约束,也不能删除添加到情节提要的约

  • 我以个人身份申请了一个微信小程序,在:基础功能 > 订阅消息 > 公共模板库 > 长期订阅 显示没有可用模板。是什么原因导致我没有长期订阅的可用模板?是企业认证才有的吗?还有个疑问是不是小程序推送的模板消息在用户微信上是显示在“服务通知”中,所有小程序的通知都被集中在这里,有没有方法可以单独显示一个小程序的通知(就像服务号一样)

  • 我在Flink(Java)中创建了一个程序来计算3个不同房间的9个假传感器的平均值。如果我启动jar文件,该程序运行良好。所以我决定启动flink独立集群来检查运行我的作业和相应任务的TaskManager,如这里(https://ci.apache.org/projects/flink/flink-docs-stable/tutorials/local_setup.html)。我正在我的机器上运

  • 返回文件的URL。 starfxdemodoc.fxml为: 而StarfXDemodocController.java是这样的: 有谁能帮我一下吗? 编辑/更新: 根据james_d的评论(谢谢你James,显然我已经盯着这个问题太久了……),我修复了控制器中的明显错误,并将SimNameField设置为TextField,而不是Label(并更新了上面的代码块以反映这一点)。我还将异常处理更改

  • 我有一个java应用程序,它在JTabbedPane上加载并显示一个applet。所以基本上GUI是: 小程序-- 我已经在JFrame类中放入了这一标准行: 下面是我在 JFrame 类中的方法,它应该在窗口关闭时调用: 然后,在我的Tabbed Pane类中,我有一个管理Applets的arrayList: 我循环遍历JPanels的ArrayList,如下所示: JPanel类中的stopG

  • 问题内容: 以下内容无法编译,并给出“非法前向引用”消息: 但是,以下内容会编译: 但是以下内容无法编译,并给出“非法前向引用”消息: 为什么InstanceInitialisation1不能编译StaticInitialisation和InstanceInitialisation2? 问题答案: JLS的第8.3.3节涵盖了这一点: 有时会限制使用其声明在文本后出现的类变量,即使这些类变量在范围