当前位置: 首页 > 知识库问答 >
问题:

Flume Kafka HDFS:拆分消息

扈高逸
2023-03-14

我有以下Flume代理配置来读取来自kafka源的消息并将它们写回HDFS接收器

tier1.sources  = source1
tier 1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 192.168.0.100:2181
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.brokerList = 192.168.0.100:9092

tier1.channels.channel1.topic = test
tier1.channels.channel1.zookeeperConnect = 192.168.0.100:2181/kafka
tier1.channels.channel1.parseAsFlumeEvent = false

tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.filePrefix = test-kafka
tier1.sinks.sink1.hdfs.fileSufix = .avro
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%y-%m-%d
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.rollSize=0

如果每个轮询周期只有一条kafka消息到达,则kafka消息内容是avro数据,并且正确地序列化为文件。

当两个kafka消息到达同一批次时,它们被分组在同一个HDFS文件上,因为avro消息包含两个模式数据,结果文件包含模式数据模式数据,导致它是无效的. avro文件。

如何拆分avro事件以将不同的kafka消息拆分为每个消息写入不同的文件

谢谢你

共有1个答案

丁曦
2023-03-14

一种方法:假设您将源kafka传入数据称为“SourceTopic”。您可以将自定义接收器注册到此“SourceTopic”。

<FlumeNodeRole>.sinks.<your-sink>.type =net.my.package.CustomSink

在CustomSink中,您可以编写一个方法来区分传入的消息、拆分它并重新发送到不同的“DestinationTopic”。这个“DestinationTopic”现在可以作为文件序列化的新水槽源。

 类似资料:
  • 我编写了一个基于Netty4的REST服务器。客户端处理程序如下所示。 netty提供的msg中的bytebuffer容量各不相同。当客户端消息大于缓冲区时,消息将被拆分。我发现每个片段都调用channelRead和ChannelReadComplete。我通常看到的是ByteBuf在512左右,message在600左右。对于前512个字节,我得到一个channelRead,然后是一个Chann

  • 使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每

  • 关于SI中的元素,我有几个基本问题。 我知道要形成定制的拆分器逻辑,我们需要扩展并覆盖方法。 然后,这些拆分消息的集合将显示在拆分器的输出通道上(假设在传入消息上没有配置回复通道)。

  • 我有一个Spring Boot应用程序,其中一个Kafka侦听器实现了BatchAcknowledgeingMessageListener 生成记录的代码如下所示: Kafka的配置看起来是这样的(这仍处于使用Testcontainers的集成测试阶段,因此制作人正在制作与消费者正在听的主题相同的内容): 最后,消费者逻辑: 此示例的调试输出为: 正如你所看到的,这条消息正在用逗号分割,我收到的一

  • 我是NFC的新手,我正在从事一个项目,我需要通过NFC将错误日志从燃气表传输到手机。由于NFC正在使用NDEF消息,因此我遇到了一个问题。 目前我正处于研究阶段,我可以看到NFC每次传输的数据量相当低。我还不知道日志的确切大小,但是如果日志对于一条NDEF消息来说太大了,我该怎么办?不,我不是指NDEF记录,我是指NDEf消息。我知道NDEF消息是由NDEF记录组成的,但是在日志对于一个NDEF消

  • 我有一个用例,我的消息被拆分两次,我想聚合所有这些消息。如何才能最好地实现这一点,我应该通过引入不同的序列头来聚合消息两次,还是有办法通过重写消息分组的方法在单个聚合步骤中聚合消息?