当前位置: 首页 > 知识库问答 >
问题:

Clickhouse无法从Kafka获取格式为TabSeparated的消息

郭俊人
2023-03-14

我从Spring Boot应用程序向Kafka发送消息

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("uniqTopic123", "testKey", "Test\tTest");
future.addCallback(
        (v) -> System.out.println("SUCCESS: " + v),
        (v) -> System.out.println("FAIL: " + v)
);
kafkaTemplate.flush();

application.properties

spring.kafka.consumer.group-id=app.1
kafka.server=<kafka_host>:9092
kafka.producer.id=kafkaProducerId

配置

@Value("${kafka.server}")
private String kafkaServer;

@Value("${kafka.producer.id}")
private String kafkaProducerId;

@Value("${spring.kafka.consumer.group-id}")
private String kafkaGroupId;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerId);
    return props;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<String, String>(
            new DefaultKafkaProducerFactory<>(producerConfigs()));
}

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);

    return new DefaultKafkaConsumerFactory<>(props);
}

在日志中,我可以看到如下消息:

SUCCESS: SendResult[producerRecords=产品记录(主题=uniqTopic123,分区=null,标头=RecordHeaders(标头 = [], isReadOnly=true),key=testKey,value=Test Test,时间戳=null),recdMetadata=uniqTopic123-0@1]

但我的听众听不到任何信息

@KafkaListener(topics="uniqTopic123")
public void msgListener(ConsumerRecord<String, String> record){
    System.out.println("test ======> " + record.value());
}

ClickHouse的桌子是空的。我的ClickHouse桌子

CREATE TABLE IF NOT EXISTS test (key String, message String)
ENGINE = Kafka('<kafka_host>:9092', 'uniqTopic123', 'app.1', 'TabSeparated');

CREATE TABLE IF NOT EXISTS test_table (key String, message String)
ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO test_table AS
SELECT key, message FROM test;

我的代码出了什么问题?

UPD:错误是在消息"Test\tTest\n"末尾没有换行

共有1个答案

南宫海超
2023-03-14

错误是在消息"Test\tTest\n"末尾没有换行

 类似资料:
  • 我设置了3个节点汇流/Kafka都指向同一个动物园管理员 所有3台服务器都已播发。Listener=带有明文的公共ipv4 当我运行消费者py客户机时,它只是保持打开状态,没有得到任何消息来澄清上面我在网上找到的测试代码,我没有编写它,因为我还在学习Kafka API

  • 我是Storm世界的新手。在我的拓扑中,我使用Kafka的数据,并使用。 通过一些测试,我得到了以下警告消息: 2015-10-01 23:31:51.753 s.k.KafkaUtils[警告]获取了偏移量超出范围的获取请求:[85970]2015-10-01 23:31:51.755 s.k.PartitionManager[警告]使用新偏移量:0 我的\\\\\\\\\\\\\\\\\\\\

  • 问题内容: 我在将字符串格式化为ZonedDateTime时遇到麻烦。 我的客户希望使用ddMMyyyyhhmmss之类的日期作为日期,而没有分隔符或类似的内容。 这是我到目前为止所做的 当它正确生成字符串时,在创建LocalDateTime变量的解析过程中会发生错误 在SO上搜索,我发现对同一问题的一些答案建议使用LocalDateTime类作为中间类,然后解析为ZonedDateTime,但它

  • 如何从动物园管理员那里获得最后一次偏移时间?当使用Storm喷口阅读来自Kafka的消息时。上下文:Kafka 不断获取消息,使用者读取一段时间,然后由于任何原因关闭,然后使用者仅读取最新消息,但不读取上次偏移量读取

  • 我们最近对我们的Kafka集群进行了Kerbertic化,我们开始在阅读来自代理上的主题的消息时遇到问题。 我们使用的是spring kafka 1.1.2版本和kafka client 0.10.0.1。 在研究了Apache Kafka文档中的建议之后,我在项目中做了以下更改。 在使用者属性中添加了security.protocol SASL_PLAINTEXT。 添加了适当的JAAS文件,并

  • 我有一个数据输入流,它包含格式为“yyyy-mm-dd hh:mm:ss z”的日期,其中需要在进一步传播日期时保留时区。下面是测试程序(带有错误堆栈)--我的解决方案有什么问题? 输出(异常消息): 第一种方法中的异常:无法解析文本“2016-12-09 09:30:21 UTC”:无法从TemporalAccessor获取ZonedDateTime:{HourofamPM=9,MinuteOf