我想根据id加入Customer和Address对象。这些是我对kafka stream for Customer主题的输入
{"id": 1,"name": "Yogesh"}
{"id": 2,"name": "Swati" }
{"id": 3,"name": "Shruti"}
{"id": 4,"name": "Amol" }
{"id": 5,"name": "Pooja" }
{"id": 6,"name": "Kiran" }
和以下fro地址
{"id": 1,"address":"Pune" }
{"id": 2,"address":"Pune" }
{"id": 3,"address":"Pune" }
{"id": 4,"address":"Kalyan"}
{"id": 5,"address": "Pimpri"}
我使用了间隔连接以及使用TumblingEventTimeWindows和滑动窗口的JoinFunction,但它没有连接客户和地址流。我不明白我在代码中遗漏了什么。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = setupEnvironment();
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(params);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
init(params);
FlinkKafkaConsumerBase<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(customerTopic,
new SimpleStringSchema(), CommonConfig.getConsumerProperties(ip, port, customerTopic))
.setStartFromEarliest();
DataStream<Customer> customerStream = env.addSource(flinkKafkaConsumer)
.flatMap(new FlatMapFunction<String, Customer>() {
private static final long serialVersionUID = 2142214034515856836L;
@Override
public void flatMap(String value, Collector<Customer> out) throws Exception {
Customer customer = null;
try {
customer = mapper.readValue(value, Customer.class);
} catch (Exception exception) {
System.out.println(exception);
}
if (null != customer) {
out.collect(customer);
}
}
});
customerStream.print();
DataStream<Address> addressStream = env
.addSource(new FlinkKafkaConsumer<>(addressTopic, new SimpleStringSchema(),
CommonConfig.getConsumerProperties(ip, port, addressTopic)).setStartFromEarliest())
.flatMap(new FlatMapFunction<String, Address>() {
private static final long serialVersionUID = 2142214034515856836L;
@Override
public void flatMap(String value, Collector<Address> out) throws Exception {
Address address = null;
try {
address = mapper.readValue(value, Address.class);
} catch (Exception exception) {
System.out.println(exception);
}
if (null != address) {
out.collect(address);
}
}
});
addressStream.print();
customerStream.keyBy(new IdSelectorCustomer()).intervalJoin(addressStream.keyBy(new IdSelectorAddress()))
.between(Time.seconds(-2), Time.seconds(1))
.process(new ProcessJoinFunction<Customer, Address, CustomerInfo>() {
private static final long serialVersionUID = -3658796606815087434L;
@Override
public void processElement(Customer customer, Address address,
ProcessJoinFunction<Customer, Address, CustomerInfo>.Context ctx,
Collector<CustomerInfo> collector) throws Exception {
collector.collect(new CustomerInfo(customer.getId(), customer.getName(), address.getAddress()));
}
}).print();
DataStream<CustomerInfo> joinResultStream = customerStream.join(addressStream).where(new IdSelectorCustomer())
.equalTo(new IdSelectorAddress()).window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Customer, Address, CustomerInfo>() {
private static final long serialVersionUID = -8913244745978230585L;
@Override
public CustomerInfo join(Customer first, Address second) throws Exception {
return new CustomerInfo(first.getId(), first.getName(), second.getAddress());
}
});
joinResultStream.print();
env.execute("Execute");
}
// ===============================================================================
public class IdSelectorAddress implements KeySelector<Address,Long> {
private static final long serialVersionUID = 7642739595630647992L;
@Override
public Long getKey(Address value) throws Exception {
return value.getId();
}
}
// ========================================================================
public class IdSelectorCustomer implements KeySelector<Customer,Long> {
private static final long serialVersionUID = 7642739595630647992L;
@Override
public Long getKey(Customer value) throws Exception {
return value.getId();
}
}
由于您使用的是事件时间,因此必须在两个流上使用赋值时间戳和水印
,以便Flink可以将事件分配给窗口,并知道窗口何时完成并可以触发。
要阅读有关此主题的更多信息,您可以从有关流式分析的本教程开始,或者从文档中的此处开始,了解如何实现时间戳提取器和水印赋值器。
我想你没有看到任何结果。原因是您的窗口从未被启动/评估/关闭。
您的事件没有时间戳。仍然可以设置时间特性。事件时间。由于不分配时间戳和水印,Flink无法判断如何以窗口连接或间隔连接的方式连接事件。
使用DataStream#assignTimestampsAndWatermarks
或flinkkafaconsumer#assignTimestampsAndWatermarks
处理事件时间或将时间特性更改为ProcessingTime
。
我希望这将引导你朝着正确的方向前进。
我有一个通过Interface Builder定义的布局约束视图。由于它们无法暂时停用,我决定通过拨打以下电话有选择地删除它们: 但是,之后约束仍然驻留在视图中。约束。此外,我还希望以编程方式添加约束(同样,因为我无法(取消)激活它们): 对我的方法的任何调用都会导致变量wasAdded的值NO。这也反映在用户界面上,它根本没有改变。 最后,我既不能以编程方式添加约束,也不能删除添加到情节提要的约
我以个人身份申请了一个微信小程序,在:基础功能 > 订阅消息 > 公共模板库 > 长期订阅 显示没有可用模板。是什么原因导致我没有长期订阅的可用模板?是企业认证才有的吗?还有个疑问是不是小程序推送的模板消息在用户微信上是显示在“服务通知”中,所有小程序的通知都被集中在这里,有没有方法可以单独显示一个小程序的通知(就像服务号一样)
我在Flink(Java)中创建了一个程序来计算3个不同房间的9个假传感器的平均值。如果我启动jar文件,该程序运行良好。所以我决定启动flink独立集群来检查运行我的作业和相应任务的TaskManager,如这里(https://ci.apache.org/projects/flink/flink-docs-stable/tutorials/local_setup.html)。我正在我的机器上运
返回文件的URL。 starfxdemodoc.fxml为: 而StarfXDemodocController.java是这样的: 有谁能帮我一下吗? 编辑/更新: 根据james_d的评论(谢谢你James,显然我已经盯着这个问题太久了……),我修复了控制器中的明显错误,并将SimNameField设置为TextField,而不是Label(并更新了上面的代码块以反映这一点)。我还将异常处理更改
我有一个java应用程序,它在JTabbedPane上加载并显示一个applet。所以基本上GUI是: 小程序-- 我已经在JFrame类中放入了这一标准行: 下面是我在 JFrame 类中的方法,它应该在窗口关闭时调用: 然后,在我的Tabbed Pane类中,我有一个管理Applets的arrayList: 我循环遍历JPanels的ArrayList,如下所示: JPanel类中的stopG
问题内容: 以下内容无法编译,并给出“非法前向引用”消息: 但是,以下内容会编译: 但是以下内容无法编译,并给出“非法前向引用”消息: 为什么InstanceInitialisation1不能编译StaticInitialisation和InstanceInitialisation2? 问题答案: JLS的第8.3.3节涵盖了这一点: 有时会限制使用其声明在文本后出现的类变量,即使这些类变量在范围