当前位置: 首页 > 知识库问答 >
问题:

如何使Serdes与多步kafka流一起工作

东方俊明
2023-03-14

我是Kafka的新手,我正在构建一个使用Twitter API作为数据源的入门项目。我已经创建了一个生产者,它可以查询Twitter API,并使用字符串序列化器将数据发送到我的kafka主题,以获得键和值。我的Kafka Stream应用程序读取这些数据并进行字数统计,但也按Tweet的日期分组。这一部分是通过一个名为wordCounts的KTable来完成的,以利用它的upsert功能。这个KTable的结构是:

键:{Word:exampleWord,Date:exampleDate},Value:numberOfOccurences

然后尝试将KTable流中的数据重组为平面结构,以便稍后将其发送到数据库。您可以在wordCountsStructured KStream对象中看到这一点。这将重新结构数据,使其看起来像下面的结构。该值最初是一个JsonObject,但我将其转换为一个字符串,以匹配我设置的Serdes。

Key: null, Value: {word: exampleWord, date: exampleDate, Counts: numberOfOccurences}

然而,当我试图将此发送到我的第二个Kafka主题时,我得到了下面的错误。

序列化程序(键:org.apache.kafka.common.serialization.StringSerializer/值:org.apache.kafka.common.serialization.StringSerializer)与实际的键或值类型(键类型:com.google.gson.jsonObject/值类型:com.google.gson.jsonObject)不兼容。更改StreamConfig中的默认SERDE,或者通过方法参数提供正确的Serdes。

我对此感到困惑,因为我发送到主题的KStream是 类型。有人知道我该怎么解决吗?

public class TwitterWordCounter {

private final JsonParser jsonParser = new JsonParser();

public Topology createTopology(){
    StreamsBuilder builder = new StreamsBuilder();


    KStream<String, String> textLines = builder.stream("test-topic2");
    KTable<JsonObject, Long> wordCounts = textLines
            //parse each tweet as a tweet object
            .mapValues(tweetString -> new Gson().fromJson(jsonParser.parse(tweetString).getAsJsonObject(), Tweet.class))
            //map each tweet object to a list of json objects, each of which containing a word from the tweet and the date of the tweet
            .flatMapValues(TwitterWordCounter::tweetWordDateMapper)
            //update the key so it matches the word-date combination so we can do a groupBy and count instances
            .selectKey((key, wordDate) -> wordDate)
            .groupByKey()
            .count(Materialized.as("Counts"));

    /*
        In order to structure the data so that it can be ingested into SQL, the value of each item in the stream must be straightforward: property, value
        so we have to:
         1. take the columns which include the dimensional data and put this into the value of the stream.
         2. lable the count with 'count' as the column name
     */
    KStream<String, String> wordCountsStructured = wordCounts.toStream()
            .map((key, value) -> new KeyValue<>(null, MapValuesToIncludeColumnData(key, value).toString()));

    KStream<String, String> wordCountsPeek = wordCountsStructured.peek(
            (key, value) -> System.out.println("key: " + key + "value:" + value)
    );

    wordCountsStructured.to("test-output2", Produced.with(Serdes.String(), Serdes.String()));

    return builder.build();
}

public static void main(String[] args) {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application1111");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "myIPAddress");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    TwitterWordCounter wordCountApp = new TwitterWordCounter();

    KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config);
    streams.start();

    // shutdown hook to correctly close the streams application
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

}

//this method is used for taking a tweet and transforming it to a representation of the words in it plus the date
public static List<JsonObject> tweetWordDateMapper(Tweet tweet) {
    try{

        List<String> words = Arrays.asList(tweet.tweetText.split("\\W+"));
        List<JsonObject> tweetsJson = new ArrayList<JsonObject>();
        for(String word: words) {
            JsonObject tweetJson = new JsonObject();
            tweetJson.add("date", new JsonPrimitive(tweet.formattedDate().toString()));
            tweetJson.add("word", new JsonPrimitive(word));
            tweetsJson.add(tweetJson);
        }

        return tweetsJson;
    }
    catch (Exception e) {
        System.out.println(e);
        System.out.println(tweet.serialize().toString());
        return new ArrayList<JsonObject>();
    }

}

public JsonObject MapValuesToIncludeColumnData(JsonObject key, Long countOfWord) {
    key.addProperty("count", countOfWord); //new JsonPrimitive(count));
    return key;
}

共有1个答案

宰鸿博
2023-03-14

因为您正在groupBy()之前执行一个键更改操作,所以它将创建一个重分区主题,并且对于该主题,它将依赖于默认的键值serdes,您已经将其设置为String serde。

您可以将groupby()调用修改为groupby(grouped.with(StringSerde,JsonSerde),这将有所帮助。

 类似资料:
  • 我想知道我是否能做这样的事情。假设我有一个数字流1-20。我想利用一个特性,比如drop 3(我想用Java术语来说是限制还是跳过?)并产生一个流,即数字流: 1-20、4-20、7-20等 然后可能平坦地将这些全部映射到一条溪流中。我尝试了使用Stream.iterate的各种组合,主要是从流生成流,但我一直收到一个IllegalStateExcema,说流已经操作或关闭。 例如,人们可能期望这

  • 方法和由于某种原因没有被计算。 如何返回结果, 谢谢你!

  • 我的pom。xml如下所示 我已经尝试了三天,使用REdhat入门指南让这个简单的示例代码与Infinispan一起使用,并下载了快速入门zip来运行它,但仍然不起作用!我一直收到Spring JMS的错误“无法连接到foo: 11222”或“池未打开”,然后是关于混合Uber和Jars版本的警告。我开始使用ehcache,这很难实现,因为只有有限的简单示例展示了如何从rest调用等中存储、检索和

  • 我最近安装了privacy vpn,结果发现启用的openvpn会破坏Docker。 当我尝试运行时,我得到以下错误 禁用vpn可以解决这个问题(不过,我宁愿不禁用它)。有没有办法使这两者和平共处?我使用debian jessie,我的openvpn有以下版本字符串 null

  • 所以我尝试在magnolia中制作一些页面,以便它们在phoneGap中工作,phoneGap是一个包装器,包装HTML5/CSS/JavaScript,并将其部署为移动应用程序。 Magnolia的问题是,它使用诸如jtl或jsp之类的servlet,而as phoneGap只接受这些servlet。html页面,不呈现jtl或jsp页面。 那么,我该如何只制作HTML5页面呢? 如有任何想法/