我正在尝试加入两个Kafka主题的两个数据流。
每个主题都有一个key值对,其中key是整数数据类型,value包含字符串格式的json。来自这两个源的数据类似于下面的示例(key、value):
2232, {"uniqueID":"2164103","ConsumerID":"63357","CategoryID":"8","BrandID":"5","ProductID":"2232","ProductDetails":"[]","Date":"2013-03-28","Flag":"0"}
1795, {"ProductName":"Frost Free","ProductID":"1795","BrandID":"16","BrandName":"ABC","CategoryID":"3"}
现在我正尝试基于ProductID左联接这两个流,因此所有这些记录的键都设置为ProductID。但不幸的是,我在连接的正确流值中不断得到空值。甚至连一条记录都没有正确连接。下面是我加入这两个记录的代码:
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.concurrent.TimeUnit;
import java.util.*;
public class Tester {
public static void main(String[] args){
final Properties streamsConfiguration = new Properties();
final Serde<String> stringSerde = Serdes.String();
final Serde<Integer> intSerde = Serdes.Integer();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-streams");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "joining-Client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, intSerde.getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 9000);
final KStreamBuilder builder = new KStreamBuilder();
KStream<Integer,String> pData = builder.stream(intSerde,stringSerde,"Ptopic");
KStream<Integer,String> streamData = builder.stream(intSerde,stringSerde,"Dtopic");
// Test the data type and value of the key
pbData.selectKey((k,v)->{System.out.println("Table : P, Type : "+k.getClass()+" Value : "+k);return k;});
streamData.selectKey((k,v)->{System.out.println("Table : StreamRecord, Type : "+k.getClass()+" Value : "+k);return k;});
KStream<Integer,String> joined = streamData.leftJoin(pbData,(table1Value,table2Value)->returnJoin(table1Value,table2Value),JoinWindows.of(TimeUnit.SECONDS.toMillis(30)));
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.cleanUp();
streams.start();
// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static HashMap convertToHashMap(String jsonString, String tablename){
try{
HashMap<String,String> map = new Gson().fromJson(jsonString, new TypeToken<HashMap<String, String>>(){}.getType());
return map;
}
catch(Exception x){
//couldn't properly parse json
HashMap<String,String> record = new HashMap<>();
if (tablename.equals("PB")){
List<String> keys = new ArrayList<>(Arrays.asList("ProductName", ", "CategoryID", "ProductID", "BrandID", "BrandName", "ProductCategoryID"));
for(String key : keys){
record.put(key,null);
}
}
else{
List<String> keys = new ArrayList<>(Arrays.asList("UniqueID", "ConsumerID", "CategoryID", "BrandID", "ProductID", "Date","Flag","ProductDetails"));
for(String key : keys){
record.put(key,null);
}
}
return record;
}
}
private static String returnJoin(String map1, String map2){
HashMap h1 = convertToHashMap(map1,"consumer_product");
HashMap h2 = convertToHashMap(map2,"PB");
HashMap map3 = new HashMap<>();
System.out.println("First : " + map1);
System.out.println("Second : " + map2);
//else{System.out.println("Null only");}
for (Object key : h1.keySet()) {
key = key.toString();
if (map3.containsKey(key)) {
continue;
}
map3.put(key, h1.get(key));
}
try {
for (Object key : h2.keySet()) {
key = key.toString();
if (map3.containsKey(key)) {
continue;
}
map3.put(key, h2.get(key));
}
System.out.println("Worked Okay PB!!!\n--------------------------------------------------------------------------------------");
}
catch (NullPointerException ex){
/*System.out.println("Exception\n----------------------------------------------------------------------------");
HashMap fakeC = getHashMap("{","consumer");
for (Object key : fakeC.keySet()) {
key = key.toString();
if (map3.containsKey(key)) {
continue;
}
map3.put(key, fakeC.get(key));
}*/
return "INVALID";
}
//return map3;
return serializeObjectJSON(map3);
}
private static String serializeObjectJSON(Map row){
StringBuilder jsonString = new StringBuilder();
jsonString.append("{");
for ( Object key : row.keySet()){
jsonString.append("\""+key.toString()+"\":");
try {
jsonString.append("\"" + row.get(key).toString() + "\",");
}
catch (NullPointerException Nexp){
jsonString.append("\"" + "null" + "\",");
}
}
jsonString.deleteCharAt(jsonString.length()-1);
jsonString.append("}");
String jsString = jsonString.toString();
////System.out.println("JString :"+jsString);
return jsString;
}
}
我不明白为什么当我尝试以任何一种方式连接这两个流时,我只在左连接的右流中得到null,但是当我尝试用它自己连接同一个流时,连接会起作用。
在检查这两个流的类型和键值时(可以在上面的代码中检查),我已经确保这两个流中的所有记录的键类型都是整数,并且不存在null。而且这两个流都有重叠的键,以便连接发生,因为我认为要么键不会重叠,要么数据类型可能不同,因为那是我们在连接中得到空值的时候。
谁能帮我弄清楚我做错了什么?
更新:
这两个主题中的数据(我将加入其中)来自两个流。其中一个流是type(Integer,recordHashmap)的自定义(Key,value)对的流,另一个流只是(Integer,string)的流。这里,recordHashmap是我定义的一个自定义对象,用于将嵌套的json字符串解析为一个对象。其定义如下:
public class recordHashmap {
private String database;
private String table;
private String type;
private Integer ts;
private Integer xid;
private Map<String,String> data;
public Map getdata(){
return data;
}
public String getdatabase(){return database;}
public String gettable(){return table;}
public String gettype(){return type;}
public Integer getts(){return ts;}
public Integer getxid(){return xid;}
public void setdata(Map<String, String> dta){
data=dta;
}
public void setdatabase(String db){ database=db; }
public void settable(String tble){ table=tble; }
public void settype(String optype){type=optype;}
public void setts(Integer unixTime){ts = unixTime;}
public void setxid(Integer Xid){xid = Xid;}
public String toString() {
return "Database=" + this.database + ", Table=" + this.table+", OperationType="+this.type+", UnixOpTime"+this.ts + ", Data="
+ this.data;
}
}
将密钥设置为产品id的代码如下所示:
KStream<Integer,recordHashmap> rekeyedProductID = inserts.selectKey((k,v)->setTheKey(v.getdata(),"ProductID"));
KStream<Integer,String> consumer_product_Stream = rekeyedProductID.mapValues((v)->serializeObjectJSON(v.getdata()));
并且函数setTheKey定义为
private static Integer setTheKey(Map map, String Key){
try {
//System.out.println("New Key : " + map.get(Key));
return Integer.parseInt(map.get(Key).toString());
}
catch (NumberFormatException nmb){
//fake return a custom value
return -1;
}
}
以下两条语句的控制台日志示例如下所示(注意:整体日志太大,无法添加,但主要是流键都是整数,并且键重叠):
pbData.selectKey((k,v)->{System.out.println("Table : P, Type : "+k.getClass()+" Value : "+k);return k;});
streamData.selectKey((k,v)->{System.out.println("Table : StreamRecord, Type : "+k.getClass()+" Value : "+k);return k;});
控制台日志:
Table : streamRecord, Type:class java.lang.Integer Value:1342
Table : streamRecord, Type:class java.lang.Integer Value:595
Table : streamRecord, Type:class java.lang.Integer Value:1934
Table : streamRecord, Type:class java.lang.Integer Value:2384
Table : streamRecord, Type:class java.lang.Integer Value:1666
Table : streamRecord, Type:class java.lang.Integer Value:665
Table : streamRecord, Type:class java.lang.Integer Value:2671
Table : streamRecord, Type:class java.lang.Integer Value:949
Table : streamRecord, Type:class java.lang.Integer Value:2455
Table : streamRecord, Type:class java.lang.Integer Value:928
Table : streamRecord, Type:class java.lang.Integer Value:1602
Table : streamRecord, Type:class java.lang.Integer Value:74
Table : P, Type:class java.lang.Integer Value:2
Table : streamRecord, Type:class java.lang.Integer Value:1795
Table : P, Type:class java.lang.Integer Value:21
Table : streamRecord, Type:class java.lang.Integer Value:1265
Table : P, Type:class java.lang.Integer Value:22
Table : streamRecord, Type:class java.lang.Integer Value:2420
Table : P, Type:class java.lang.Integer Value:23
Table : streamRecord, Type:class java.lang.Integer Value:1419
Table : P, Type:class java.lang.Integer Value:24
Table : streamRecord, Type:class java.lang.Integer Value:1395
Table : P, Type:class java.lang.Integer Value:26
Table : streamRecord, Type:class java.lang.Integer Value:1783
Table : P, Type:class java.lang.Integer Value:29
Table : streamRecord, Type:class java.lang.Integer Value:1177
Table : P, Type:class java.lang.Integer Value:34
Table : streamRecord, Type:class java.lang.Integer Value:1395
Table : P, Type:class java.lang.Integer Value:35
Table : streamRecord, Type:class java.lang.Integer Value:2551
Table : P, Type:class java.lang.Integer Value:36
Table : P, Type:class java.lang.Integer Value:2551
Table : streamRecord, Type:class java.lang.Integer Value:2530
Table : P, Type:class java.lang.Integer Value:37
Table : streamRecord, Type:class java.lang.Integer Value:541
Table : P, Type:class java.lang.Integer Value:39
Table : streamRecord, Type:class java.lang.Integer Value:787
Table : P, Type:class java.lang.Integer Value:40
Table : streamRecord, Type:class java.lang.Integer Value:2498
Table : P, Type:class java.lang.Integer Value:41
Table : streamRecord, Type:class java.lang.Integer Value:1439
Table : P, Type:class java.lang.Integer Value:44
Table : streamRecord, Type:class java.lang.Integer Value:784
Table : P, Type:class java.lang.Integer Value:284
Table : P, Type:class java.lang.Integer Value:285
Table : P, Type:class java.lang.Integer Value:929
Table : P, Type:class java.lang.Integer Value:286
Table : P, Type:class java.lang.Integer Value:287
Table : P, Type:class java.lang.Integer Value:2225
Table : P, Type:class java.lang.Integer Value:288
Table : P, Type:class java.lang.Integer Value:289
Table : P, Type:class java.lang.Integer Value:290
Table : P, Type:class java.lang.Integer Value:295
Table : P, Type:class java.lang.Integer Value:297
Table : P, Type:class java.lang.Integer Value:300
Table : P, Type:class java.lang.Integer Value:302
Table : P, Type:class java.lang.Integer Value:305
Table : P, Type:class java.lang.Integer Value:306
Table : P, Type:class java.lang.Integer Value:307
Table : P, Type:class java.lang.Integer Value:308
Table : P, Type:class java.lang.Integer Value:309
Table : P, Type:class java.lang.Integer Value:310
Table : streamRecord, Type:class java.lang.Integer Value:929
Table : streamRecord, Type:class java.lang.Integer Value:1509
Table : streamRecord, Type:class java.lang.Integer Value:136
Table : streamRecord, Type:class java.lang.Integer Value:2225
Table : streamRecord, Type:class java.lang.Integer Value:906
Table : streamRecord, Type:class java.lang.Integer Value:1013
Table : streamRecord, Type:class java.lang.Integer Value:1759
Table : streamRecord, Type:class java.lang.Integer Value:1759
Table : streamRecord, Type:class java.lang.Integer Value:885
Table : streamRecord, Type:class java.lang.Integer Value:1165
Table : streamRecord, Type:class java.lang.Integer Value:453
Update-2:这里有趣的是,leftJoin对于同一组键值对的KTables可以很好地工作。但由于某种原因不适合KStreams。但我需要使用KStreams,因为我有许多与密钥相关的记录。通常情况下,这种连接在溪流上的作用就像是一种魅力,但在这种特殊情况下,它的作用很奇怪。我猜这可能与RocksDB或内部缓存有关。
您似乎没有将ProductID设置为键:
java prettyprint-override">pbData.selectKey((k,v)->{System.out.println("Table : P, Type : "+k.getClass()+" Value : "+k);return k;});
streamData.selectKey((k,v)->{System.out.println("Table : StreamRecord, Type : "+k.getClass()+" Value : "+k);return k;});
在这两个语句中,您都返回原始键->return k
;而是从JSON解析productId并返回它。
更新
我仍然不确定我是否可以正确地把所有的片段放在一起,就像在更新中一样,您使用
KStream<Integer,recordHashmap> rekeyedProductID = inserts.selectKey((k,v)->setTheKey(v.getdata(),"ProductID"));
KStream<Integer,String> consumer_product_Stream =
RekeyedProductid.MapValues((v)->SerializeObjectJSON(v.GetData()));
并且不清楚inserts
和rekeyedProductID
是什么(类型是什么?)。总之,我认为这部分是正确的。正如您提到,如果右侧是一个KTable(使用相同的数据),它可以工作,我只是假设您的join窗口不够大,这样两个具有相同键的记录彼此之间的(时间上)距离就超过了您指定的30秒。你能再次检查两个输入流的记录时间戳吗?(参见https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-as-topic-partition-and-offset-information)
问题内容: 假设我有一个,其中将包含一组元素(div),这些元素可能具有不同的高度,但是所有元素的宽度都相同。 目前,我已经通过同位素+砌体实现了这一目标,但是由于某些浏览器已经支持CSS3多列,因此我希望为这些浏览器提供唯一的CSS解决方案,而其余的则使用Javascript。 这是我一直在尝试的CSS: 但是,这使元素的流动从上到下从左到右。我想要的是左右自上而下的流程。这是我想要的示例: 但
我的场景是我使用make很多共享前缀(例如house.door,house.room)的Kafka主题,并使用Kafka stream regex主题模式API消费所有主题。一切看起来都很好,我得到了数据的密钥和信息。 为了处理数据,我需要主题名,这样我就可以根据主题名进行连接,但我不知道如何在Kafka stream DSL中获得主题名。
我需要帮助理解在Kafka2.2中使用max.task.idle.ms时的Kafka流行为。 我有一个KStream-KTable联接,其中KStream已被重新键入: 所有主题都有10个分区,为了测试,我将max.task.idle.ms设置为2分钟。myTimeExtractor只有在消息被标记为“快照”时才更新消息的事件时间:stream1中的每个快照消息都将其事件时间设置为某个常数T,st
我想使用Kafka Streams API对带有KTable的KStream执行左联接,将表的一些字段添加到流中。 使用包含所有相关条目的较小版本的表(大约1300个条目),一切工作都很好。 使用整个表(大约200,000个条目)后,在获得KTable的Avro消息(GenericRecord)的相关字段的行中会得到一个。 当我在KSQL中执行相同的左联接操作时,从表中添加的字段为NULL。相关的
我试着加入两个Kafka的话题。一个是KStream,另一个是Ktable。左联接抱怨处理器的状态存储不存在。我确实查看了kafka、GitHub和其他地方的许多代码示例,其中StateStore不是由KStream客户机代码显式创建的。请告知以下代码中缺少什么。 应用程序流与users表保持连接,以发出app和user一起的记录。应用程序的所有者是用户。 版本:1.1.0
问题内容: 假设我们有下表t1和t2: 我们希望找到以下结果: 这基本上是右连接与左连接的并集。以下代码有效,但感觉很笨拙: 有没有更好的方法来实现这一目标? 问题答案: