我在一个Apache Flink项目中遇到了以下情况。
具有不同对象的3个流,例如
Person->字符串id,字符串firstName,字符串lastName(即101,John,Doe)
PersonDetail->字符串id,字符串地址,字符串城市,字符串电话号码,long personId(即99,Stefansplatz 1,+43066012345678,101)
PersonAddDetail->字符串id、字符串AddDetailType、对象AddDetailValue、长personId(即77,1,Hansi或78,2,1234或80,3,true)
我想将这些流中的对象聚合(不确定这里的措词是否正确)到一个新对象,然后放入一个新流中。聚合应该基于Person id,作为附加的catch,我需要仅用特定的AddDetailType筛选出PersonAddDetailType(假设我只对类型为1和2的对象感兴趣)。
聚合对象看起来应该像
PersonReport->长id、字符串名字、字符串名字、字符串地址、字符串城市、字符串电话号码、ArrayList
现在的问题是,这是否可能,如果是,我该如何完成。欢迎输入。
感谢@Jeremy Grand comment,我自己想出了一个解决方案,我想分享我的想法和代码。我引入了一个名为PersonContainer的新类
public class PersonContainer {
private String id;
private Person person;
private PersonDetail personDetail;
private List<PersonAddDetail> personAddDetailList = new ArrayList<>();
public PersonContainer(Person person) {
this.id = person.getID();
this.person = person;
}
public PersonContainer(PersonDetail personDetail) {
this.id = personDetail.getOTTRID();
this.personDetail = personDetail;
}
public PersonContainer(PersonAddDetail personAddDetail) {
this.id = personAddDetail.getOTTRID();
this.timeStamp = ttrDetailAddEvent.getDATECREATED();
this.personAddDetailList.add(personAddDetail);
}
public PersonContainer merge(PersonContainer other) {
if (other.person != null) {
this.person = other.person;
return this;
}
if (other.personDetail != null) {
this.personDetail = other.personDetail;
return this;
}
if (other.personAddDetailList.size() > 0) {
this.personAddDetailList.addAll(other.personAddDetailList);
return this;
}
return null;
}
public String getId() {
return id;
}
public Person getPerson() {
return person;
}
public PersonDetail getPersonDetail() {
return personDetail;
}
public List<PersonAddDetail> getPersonAddDetailList() {
return PersonAddDetailList;
}
public boolean isComplete() {
return person != null && personDetail != null && personAddDetailList.size() > 1;
}
}
这是很重要的部分,因为我首先要将三个输入流的对象映射到这个公共对象,然后再联合流。
下面是我所做的,我在评论中描述了单个步骤。简而言之,我将三个输入流映射到新引入容器的新流。然后,我对这三个流进行联合,并使用迭代模式对这些对象进行键控,并使用我的自定义merge方法合并它们。最后,我定义了一个自定义的complete方法来区分完全合并的容器,这些容器最终映射到输出,而未完成的容器则反馈到合并过程中。
//Filter PersonAddDetail to have just the types needed
DataStream<PersonContainer> filteredPersonAddDetail = unfilteredPersonAddDetail.filter(new FilterFunction<OboTtrDetailAddEvent>() {
@Override
public boolean filter(PersonAddDetail personAddDetail) throws Exception {
return personAddDetail.getAddDetailType().matches("1|2");
}
});
//map Person stream to common object
DataStream<PersonContainer> mappedPersonStream = personInputStream.map(new MapFunction<Person, PersonContainer>() {
@Override
public PersonContainer map(Person Person) throws Exception {
return new PersonContainer(Person);
}
});
//map PersonDetail stream to common object
DataStream<PersonContainer> mappedPersonDetailStream = personDetailInputStream.map(new MapFunction<PersonDetail, PersonContainer>() {
@Override
public PersonContainer map(PersonDetail PersonDetail) throws Exception {
return new PersonContainer(PersonDetail);
}
});
//map PersonAddDetail stream to common object
DataStream<PersonContainer> mappedPersonAddDetailStream = filteredPersonAddDetail.map(new MapFunction<PersonAddDetail, PersonContainer>() {
@Override
public PersonContainer map(PersonAddDetail PersonAddDetail) throws Exception {
return new PersonContainer(PersonAddDetail);
}
});
//union the three input streams to one single stream
DataStream<PersonContainer> combinedInput = mappedPersonStream.union(mappedPersonDetailStream, mappedPersonAddDetailStream);
// Iteration pattern is in place here and I'm going to recursively try to merge corresponding objects togehter
IterativeStream<PersonContainer> iteration = combinedInput.iterate();
// Group objects by there shared ID and then use reduce to merge them
DataStream<PersonContainer> iterationBody = iteration.keyBy(new KeySelector<PersonContainer, String>() {
@Override
public String getKey(PersonContainer personContainer) throws Exception {
return personContainer.getId();
}
})
.reduce(new ReduceFunction<PersonContainer>() {
@Override
public PersonContainer reduce(PersonContainer personContainer, PersonContainer other) throws Exception {
return personContainer.merge(other);
}
});
// use the containers complete method to check whether the merge is finished or we need to wait for further objects in the stream
DataStream<PersonContainer> containersNotCompleteYet = iterationBody.filter(new FilterFunction<PersonContainer>() {
@Override
public boolean filter(PersonContainer PersonContainer) throws Exception {
return !personContainer.isComplete();
}
});
// partially merged or not merged at all containers are put back on the stream
iteration.closeWith(containersNotCompleteYet);
// fully merged containers are processed further
DataStream<PersonContainer> completeContainers = iterationBody.filter(new FilterFunction<PersonContainer>() {
@Override
public boolean filter(PersonContainer PersonContainer) throws Exception {
return personContainer.isComplete();
}
});
// finally the container is mapped to the correct output object
DataStream<PersonReport> personReport = completeContainers.map(new MapFunction<PersonContainer, PersonReport>() {
@Override
public PersonReport map(PersonContainer personContainer) throws Exception {
// map personContainer to final PersonReport
return personContainer;
}
});
这种方法对我很有效,好的是我可以处理在流中晚到达的对象(比方说PersonAddDetail在其他对象之后几分钟到达),而且我不需要定义某种窗口。谢谢你的投入
您的问题听起来像是join
操作。您可以执行以下操作:
personDataStream.join(personDetailDataStream).where(new KeySelector<Person, Long>() {
...
}).equalTo(new KeySelector<PersonDetail, Long>() {
...
}).window(TumblingEventTimeWindows.of(Time.seconds(2))).apply(new JoinFunction<Person, PersonDetail, PersonWithDetail>() {
...
});
注意,通常情况下,连接操作是不可能在无界(无限)集合上进行的,所以您需要将其绑定到Windows中。
通过聚合和分组将一个对象映射到另一个对象 ToQ的输出应为 我尝试使用分组函数,但它生成了HashMap但不确定如何转换为对象。
问题内容: 我可以使用通用的特定于Javascript / Coffeescript的习惯用法吗?主要出于好奇。 我有两个数组,一个数组由所需的键组成,另一个数组由所需的值组成,我想将其合并到一个对象中。 问题答案: var r = {}, i, keys = [‘one’, ‘two’, ‘three’], values = [‘a’, ‘b’, ‘c’];
我有一个类似这样的数组: 我想要一个如下所示的结果对象: 我如何在Javascript中实现这一点?
首先,我对反应式编程有点陌生。在参加了一些关于reactor和spring 5.0的演讲后,我想自己尝试一下这个编程模型。 我有一个应用程序,它使用WebClient从不同的API构建两个Flux对象。我想将这2个对象组合成一个并将其返回给用户。 代码示例如下所示: 现在在我的handler方法中: 我应该使用反应器API中的什么方法来实现这一点?我找到了一些方法来组合像组合最新的对象,但是在这种
我正处于复杂的聚合查询(具有查找和许多组/展开阶段)的中间,并且遇到来自不同字段的两个不同数组的合并问题。 至于现在,在其中一个阶段之后(在我的大查询的中间或查找之后),我有以下输出: 我想: 因此,每个值,我想根据它的索引添加到内的每个对象,比如第一个内的应该有第一个来自 具有数组索引的架构): 问题是: 我无法在聚合之后或之前执行此操作,因为: > 我没有必要的数据,因为它是从不同集合中查找的