当前位置: 首页 > 知识库问答 >
问题:

在kafka流中用另一个json对象替换json对象

孙夕
2023-03-14

我创建了以下主题、流和表:

./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bptcus

./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic address-elasticsearch-sink

为以上创建的主题创建表和流。

CREATE table CUSTOMER_SRC  (customerId VARCHAR,name VARCHAR, age VARCHAR, address VARCHAR) WITH (KAFKA_TOPIC='bptcus', VALUE_FORMAT='JSON', KEY='customerId');

CREATE stream ADDRESS_SRC (addressId VARCHAR, city VARCHAR, state VARCHAR) WITH (KAFKA_TOPIC='address-elasticsearch-sink', VALUE_FORMAT='JSON');

我可以看到以下数据:

select * from customer_src;  
1528743137610 | Parent-1528743137047 | Ron | 31 | [{"addressId":"1","city":"Fremont","state":"CA"},{"addressId":"2","city":"Dallas","state":"TX"}] 

select * from address_src;  
1528743413826 | Parent-1528743137047 | 1 | Detroit | MI
CREATE stream CUST_ADDR_SRC as select c.name , c.age , c.address, a.rowkey, a.addressId , a.city , a.state  from  ADDRESS_SRC a left join CUSTOMER_SRC c  on c.rowkey=a.rowkey;
select * from cust_addr_src;  

1528743413826 | Parent-1528743137047 | Ron | 31 | [{"addressId":"1","city":"Fremont","state":"CA"},{"addressId":"2","city":"Dallas","state":"TX"}] | Parent-1528743137047 | 1 | Detroit | MI  

我的问题:

  1. 现在我要将addressId 1(Fremont)替换为addressId 1(Detroit)。我怎么能那样做?
  2. 我还尝试打印输入到控制台的流,如票证中所述

是否将Kafka流输入打印到控制台?

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cusadd-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.61.125:9092");
    config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.1.61.125:2181");
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source = builder.stream("cust_addr_src");
    source.foreach(new ForeachAction<String, String>() {
        public void apply(String key, String value) {
            System.out.println("Stream key values are: " + key + ": " + value);
        }
     });

只是,我可以看到下面的输出:

12:04:42.145[StreamThread-1]DEBUG org.apache.kafka.clients.consumer.internals.fetcher-将分区cust_addr_src-0的偏移量重置为最新偏移量。12:04:42.145[StreamThread-1]DEBUG org.apache.kafka.clients.NetworkClient-在hsharma-mbp15.local:9092处启动到节点0的连接。12:04:42.145[StreamThread-1]调试org.apache.kafka.common.metrics.metrics-添加名称为node的传感器-发送12:04:42.145[StreamThread-1]调试org.apache.kafka.common.metrics-添加名称为node的传感器-接收12:04:42.145[StreamThread-1]调试org.apache.kafka.common.metrics-添加名称为node的传感器-0延迟12:04:42.145[StreamThread-1]调试org.apache.kafka.common.metrics-添加名称组CusAdd-Application的rtbeat响应。

提前道谢。

共有1个答案

鲁滨海
2023-03-14

我看到两种方法:

  1. 字符串操作:address列当前是一个包含JSON对象的字符串。您可以只使用字符串操作函数来替换所需的位。不过这似乎有点怪。
  2. 结构操作:切换CREATE TABLE语句,使地址为数组 > 类型,而不是字符串。然后,您可以使用数组的元素和结构的字段来构建输出,例如
ARRAY[
  STRUCT(
    addressId := address[0]->addressId,
    city := address_src->city,
    state := address[0]->state
  ),
  ... same for second element
]

上面将创建一个包含两个结构的数组,并设置新的城市。

当然,这只有在数组中始终有两个元素的情况下才起作用。如果有一个变量,那么您需要使用一个长的窗口大小写语句来根据数组的大小执行不同的操作。例如。

CASE 
   WHEN ARRAY_LENGTH(address) = 1 
    THEN ARRAY[STRUCT(addressId := address[0]->addressId, city := address_src->city, state := address[0]->state)]
   WHEN ARRAY_LENGTH(address) = 2
     THEN ARRAY(... with two elements...)
   WHEN ARRAY_LENGTH(address) = 3
     THEN ARRAY(... with three elements...)
END

等。

 类似资料:
  • 问题内容: 我的apicontroller返回了以下JSON对象: 我要替换为 我已经尝试了下面的代码,但它仅代替的第一次出现。如何替换所有条目? 谢谢, 问题答案: 您需要使替换全局: 这样,它将继续替换null直到到达结尾 正则表达式文档: https://developer.mozilla.org/zh- CN/docs/Web/JavaScript/Reference/Global_Obj

  • 问题内容: 我正在ClassA中调用构造函数,如果满足特定条件,则希望使结果对象属于其他类(ClassB)。我试着更换的第一个参数__init ()(在下面的例子中“自我”) 内 __init (),但它似乎并没有做我想做的。 在主要方面: 在ClassA / init.py中: 在ClassB / init.py中: 问题答案: 您需要。(并且您还需要通过子类化使其成为一种新型类,假设您使用的是

  • 请帮忙。 谢谢,哈里

  • 我想Stringify我的json,但我想排除空嵌套数组的对象。 我的json看起来像这样: 我想忽略“博士”,因为它是空的。 如何在typescript/Javascript中实现它? 这里是我尝试过的代码: 谢谢

  • 我正在尝试使用jackson api中的ObjectMapper类将下面的JSON转换为Java对象,但我收到一个错误。我的java类应该是什么样子? 如果我有下面的json,我可以将其转换为java对象。 Json:

  • 问题内容: 这很简单,但是很挣扎。帮助我。 我有一个json数据{“ abc”:“ test”,“ bed”:“ cot”,“ help”:“ me”} 我想将上面的jsonObject转换为JSON ARRAY,例如[{“ abc”:“ test”,“ bed”:“ cot”,“ help”:“ me”}] 我只得到价值观。请帮我解决这个问题。 问题答案: 直接将JsonObject即obj放入