当前位置: 首页 > 知识库问答 >
问题:

Kafka Stream和KGlobalTable连接问题

赏高格
2023-03-14

我在加入一个KStream和一个GlobalKTable时遇到了一个问题,希望能得到您的帮助。

给定两个Kafka主题订单客户:

订单

"1"     {"ID":"1","Name":"Myorder1","CustID":"100"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200"}

客户

"100"   {"CustID":"100","CustName":"Customer1"}

"200"   {"CustID":"200","CustName":"Customer2"}

需求是用客户名称丰富订单流

"1"     {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}

我正在尝试以下操作:

    null
java prettyprint-override">KStream<String, EnrichedOrder> enrichedstreams = orders.join(
    customers,
    new KeyValueMapper<String, Order, String>() {            
        @Override
        public String apply(String key, Order value) {
           return value.CustID;
        }
    },
    new ValueJoiner<Order,Customer, EnrichedOrder>() {
        @Override
        public EnrichedOrder apply(Order order, Customer customer) {
            EnrichedOrder eorder = new EnrichedOrder();
            eorder.CustID = order.CustID;
            eorder.CustName = customer.CustName;
            eorder.ID = order.ID;
            eorder.Name = order.Name;           
            return eorder;
        }
    }
);

共有1个答案

袁谭三
2023-03-14

让我们仔细看看你复制粘贴的内容:

客户主题中:

"100"   {"CustID":"100","CustName":"Customer1"}

您可以注意到键是一个字符串,这个字符串包含双引号:“100”。通常,字符串键是不带双引号的。我宁愿看到:

 100    {"CustID":"100","CustName":"Customer1"}
 类似资料:
  • 我们的系统使用“mod\u proxy\u ajp”,与Apache 2.0、httpd-2.2.3-43配合使用。el5,tomcat 7.0。当我们在“httpd”上获得日志时,我们有以下日志: [Sun Oct05 12:14:10 2014][错误](70007)指定的超时已过期:ajp_ilink_receive()无法接收标头 [2014年10月5日星期日12:14:10][错误]aj

  • 我已经安装了Java EE Eclipse Kepler和Apache Tomcat v9.0.2。当我想将tomcat v9.0.2连接到eclipse时,我只在新的服务器窗口中看到Tomcat v3.2 up tp v7.0 Server。如何让Tomcat V.9显示在向导窗口中?

  • 问题内容: 如果我在闲置了一段时间后启动应用程序,那么我曾经遇到以下错误。(我正在使用Spring + Hibernate + MySQL作为DB) 我通过将以下内容添加到我的servlet-context.xml中解决了这个问题。 我在这里问了这个问题,这个问题是解决方案所特有的。我需要知道为什么会遇到这个错误。 我尝试了上面链接中提供的第一个选项(使用autoReconnect = true配

  • 在连接jstatd和visualvm时遇到了一些问题。以下是我设置的详细信息: 杰斯塔德。政策 叫做与 牵引端口 港口又好又开放 正在运行的应用程序是在vmware上运行的,尽管可以毫无问题地访问该应用程序。 如果有人对连接visualvm有任何想法,那就太好了。

  • 我在CentOS7(confluent)上安装了Apache Kafka,正试图以分布式模式运行filestream Kafka connect,但收到以下错误: 现在可以通过更新workers.properties(如http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-conf

  • 我创建了一个测试帐户来开始使用BrowserStack。我关注了以下页面:在Browserstack Automate上运行量角器测试,这真的很有帮助。 现在我得到: 这是什么意思?我没有任何请求。我只是打开一个页面,点击一个元素,就这样了。