我有一个以本地模式运行的Spark流程序,在该程序中,我从TCP套接字连接接收JSON消息,每个批处理间隔几条。
这些消息中的每一条都有一个ID,我用它来创建一个键/值JavaPairDStream,这样在我的JavaDStream中的RDD的每个分区中,都有一个键/值对,每个分区只有一条消息。
我现在的目标是将具有相同ID的消息分组在同一个分区中,以便我可以并行映射它们,每个分区由不同的核心处理。
以下是我的代码:
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String>streamData1=streamData2.repartition(1);
JavaPairDStream<String,String> streamGiveKey= streamData1.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {
ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();
while (stringIterator.hasNext()){
String c=stringIterator.next();
if(c==null){
return null;
}
JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
String key= retMap.getSid();
Tuple2<String,String> b= new Tuple2<String,String>(key,c);
a.add(b);
}
return a;
}
});
因此,在这段代码的末尾,我有一个带有RDD的数据流,由于重新分区{1},它只有一个分区,其中包含所有键/值对。
我现在应该如何对具有相同密钥的消息进行分组,并将它们放在不同的分区中,以便可以分别映射它们?
根据OP,这个问题是通过同时回答另一个问题来解决的:如何在Spark中按分区对键/值进行分组?
假设我们有一个键-值映射的数据结构,其中键本身也是一个键-值映射。例如: 现在,假设我们要查询此映射中与键的某个键值子集匹配的所有顶级键/值。示例: 我们的查询是“给我所有key值,其中key包含,它将返回第一个和第三个值将返回所有同时具有和的键值,生成第二个值。显然,我们可以在每一个查询的完整地图中进行搜索,但我正在寻找比这更高效的方法。 我四处查看了一下,但是找不到一个高效、易用的C解决方案。
输入: 输出: 输入: 线程“main”java.lang.IllegalStateException中的异常:重复键-1.0 我怎么才能修好这个?
只有当每个键在map1中都有唯一的值时,我才会陷入如何将键值对从map1转移到map2的困境。 假设我有以下地图: 地图1:[1,2][2,4][4,4] 我想算法应该是: 遍历第一张地图中的条目。 向map2添加密钥。 将一个值添加到检查map2的值的集合 如果值是重复的,则不会将该值添加到集合中,并且忽略将其相应的键添加到map2。 代码片段: 我的想法是如何在正确的轨道上完成的吗?这里迷失了
我有我创建的地图的地图 我尝试在第二个映射中迭代键的值。 所以它可以是这样的: key1->keya->value1 -------->键->值2 -------->键->值4 -------->KeyC->value1 我想继续多久就继续多久。 因此,我有,并尝试使用名为的值变量和增量计数器if来循环中的所有值 我不知道如何获得。我试着这样做 但这给了我信笺。 如果我执行,它会给出我想要的正确值
问题内容: 我试图弄清楚如何构建JPA实体bean,以使数据适用于我的设备。该数据库是旧的,一成不变的,所以我不能更改架构。设备模型具有复合主键,其中的一列是设备类型的FK。 我尝试了几种不同的方法。首先是设备具有DeviceModel和DeviceType,但是这给了我一个错误,那就是太多的东西在引用dev_type。因此,然后我尝试让DeviceModel引用DeviceType,但遇到了相同