当前位置: 首页 > 知识库问答 >
问题:

来自Netcat源的事件不通过Kafka通道

长孙绍辉
2023-03-14

我使用水槽代理通过水槽代理收集外部数据。外部数据批次几乎是每 10 秒 1MB。我按如下方式配置了水槽代理。

# Flume agent configuration as /flume/conf/agent.conf
agent.sources = netcat-source
agent.channels = kafka-channel
agent.sinks = logger-sink

########################################
#   Netcat Source
########################################

agent.sources.netcat-source.type = netcat
agent.sources.netcat-source.bind = 0.0.0.0
agent.sources.netcat-source.port = 4141
agent.sources.netcat-source.max-line-length = 500000
agent.sources.netcat-source.channels = kafka-channel

########################################
#   Kafka Channel
########################################

agent.channels.kafka-channel.type =  org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafka-channel.brokerList = 10.212.136.108:9092,10.212.136.108:9092
agent.channels.kafka-channel.zookeeperConnect = 10.212.136.108:2181,10.212.136.108:2181/kafka
agent.channels.kafka-channel.topic = channel
agent.channels.kafka-channel.groupId = fcd-group


########################################
#   Logger Sink
########################################

agent.sinks.logger-sink.type = logger
agent.sinks.logger-sink.channel = kafka-channel

我按以下方式激活了代理。

flume-ng agent -n agent -c /flume/conf -f /flume/conf/agent.conf 

可惜后来发现netcat source运行良好,channel或者sink出了问题。从Ubuntu的资源监视器,我可以看到以下性能。网络性能。蓝色曲线表示输入,而红色曲线表示在没有其他应用程序运行网络io的情况下的输出,我确信这个图展示了我的Flume代理发生了什么。

当我通过控制台消费者查看主题“频道”中的Kafka内容时,我一无所获。还有,当我检查水槽时。日志,我只得到了Flume的状态输出,没有数据。

我已使用 验证了传入的数据

nc -lk 4141 >> my_data_check_file

我的渠道或水槽有什么问题?

另外,当我使用内存通道、文件通道时,事情也变得同样棘手。

共有1个答案

锺离刚洁
2023-03-14

啊,终于我自己解决了这个问题!

关键点是行分隔符“\n”。

在Flume源代码NetcatSource中。java,我们有如下一行

private int processEvents(CharBuffer buffer, Writer writer) throws IOException {
  int numProcessed = 0;

  boolean foundNewLine = true;
  while (foundNewLine) {
    foundNewLine = false;

    int limit = buffer.limit();
    for (int pos = buffer.position(); pos < limit; pos++) {
      if (buffer.get(pos) == '\n') {  
        // parse event body bytes out of CharBuffer
        buffer.limit(pos); // temporary limit
        ByteBuffer bytes = Charsets.UTF_8.encode(buffer);
        buffer.limit(limit); // restore limit
... ...
... ...

代码强制输入数据以“\n”结尾。否则,频道不会占用任何事件。我们可以根据需要更改此字符,并将自定义源代码放入 $FLUME_HOME/lib 中

 类似资料:
  • 问题内容: 我正在尝试在React JS中创建一个模式 我有一个外部div,它是整个身体,而我有一个内部div。如果要在内部div之外单击它,我想应用该函数以关闭模式。 我的代码如下: 当我单击带有图标的链接时,或者在内部div外部单击时,它都可以正常工作。 但是问题是,如果我在内部div中单击,它也会关闭。 我不想使用jQuery。 有什么建议吗? 更新 而在我的情况是一个接触的形式: 问题答案

  • 不知道如何缩短标题。 我基本上是想了解CQRS的概念(http://en.wikipedia.org/wiki/command-query_deparenation)和相关概念。 > 系统发出一个聚合UpdateQuestionCommand,该命令可以分为两个较小的命令:针对问题聚合根的UpdateQuestion和针对用户聚合根的UpdateUserAction(以计数点数等)。这些是使用点对

  • Tendermint 会发出不同的事件,您可以通过Websocket订阅这些事件。这对于第三方应用程序(如 analysys)或检查状态非常有用。 事件列表 您可以通过 Websocket 调用 subscribe RPC 方法订阅上面的任何事件。 { "jsonrpc": "2.0", "method": "subscribe", "id": "0", "para

  • 本章介绍 UDP 介绍 ChannelHandler, Decoder, 和 Encoder 引导基于 Netty 的应用 前面的章节都是在示例中使用 TCP 协议,这一章,我们将使用UDP。UDP是一种无连接协议,若需要很高的性能和对数据的完成性没有严格要求,那使用 UDP 是一个很好的方法。最著名的基于UDP协议的是用来域名解析的DNS。这一章将给你一个好的理解的无连接协议所以你能够做出明智的

  • 我认为我对Flink窗口的理解可能是错误的,因为它们没有像我期望的那样从文档或Flink书中进行评估。目标是将具有相当静态数据的Kafka主题与具有不断传入数据的Kafka主题连接起来。 返回FlinkKafkaConsumer 是我的键选择器的占位符。 我的关键问题: 这里到底发生了什么?是否在窗口完成处理后发出记录?我希望有一个实时输出到水槽,但这将解释很多。 与此相关的是:我可以用onEle

  • 我在我的代码中为我的机器人配置和一般的东西建立了一个类,我很整洁,所以,客户端不协调。Client()和bot。是我的类在我的bot类中我存储了一些通道ID,然后我注意到了客户端。发送消息(channelid,message)方式不再支持。但是,我将通道id变量编辑为 客户=不和谐。Client()位于我的类之上,它已定义。 我用这个发信息 我期待着那个频道的消息,但现实是: 简而言之,我的cha