当前位置: 首页 > 知识库问答 >
问题:

格式化Apache Flume HDFS串行器

洪宏硕
2023-03-14

我刚刚开始使用水槽,需要插入一些标题到hdfs水槽。

尽管格式错误并且无法控制列,但我仍然可以正常工作。

使用此配置

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = syslogudp
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname

a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sources.r1.interceptors.i2.preserveExisting = false

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/user/vagrant/syslog/%y-%m-%d/
a1.sinks.k1.hdfs.rollInterval = 120
a1.sinks.k1.hdfs.rollCount = 100
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

a1.sinks.k1.serializer = header_and_text
a1.sinks.k1.serializer.columns = timestamp hostname
a1.sinks.k1.serializer.format = CSV
a1.sinks.k1.serializer.appendNewline = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

写入HDFS的日志除了串行化的方面外,主要是正常的:

{timestamp=1415574695138, Severity=6, host=PolkaSpots, Facility=3, hostname=127.0.1.1} hostapd: wlan0-1: STA xx WPA: group key handshake completed (RSN)

如何格式化日志,使其看起来像这样:

1415574695138 127.0.1.1 hostapd: wlan0-1: STA xx WPA: group key handshake completed (RSN)

时间戳首先是主机名,然后是系统日志 msg 正文。

共有1个答案

傅高逸
2023-03-14

原因是您配置的两个拦截器正在将值写入Flume事件头中,这些值由HeaderAndBodyTextEventSerializer序列化到主体中。后者只是这样做:

public void write(Event e) throws IOException {
    out.write((e.getHeaders() + " ").getBytes());
    out.write(e.getBody());
    if (appendNewline) {
      out.write('\n');
    }
  }

委托给e.getHeaders()只会将映射序列化为JSON字符串。

为了解决这个问题,我建议创建自己的序列化程序并重载 write() 方法以将输出格式化为制表符分隔值。在这种情况下,您只需在以下位置指定类的路径:

a1.sinks.k1.serializer = com.mycompany.MySerlizer

并将罐子放到Flume的类路径中。

 类似资料:
  • 问题内容: 我正在尝试格式化字符串,以便所有内容在两者之间对齐。 我正在尝试这样做: 我如何获得要排队的列?我看了文档但是很困惑。我当时以为它将使它成为30个空格,然后它将打印下一个项目,但是似乎从上一个项目结束处开始有30个空格。 谢谢 问题答案: 使您的字段在可用空间内保持对齐。使用对齐方式说明符更改对齐方式: 强制字段在可用空间内左对齐(这是大多数对象的默认设置)。 强制字段在可用空间内右对

  • 主要内容:指定最小输出宽度,指定对齐方式,指定小数精度我们在《 第一个Python程序——在屏幕上输出文本》中讲到过 print() 函数的用法,这只是最简单最初级的形式,print() 还有很多高级的玩法,比如格式化输出,这就是本节要讲解的内容。 熟悉C语言 printf() 函数的读者能够轻而易举学会 Python print() 函数,它们是非常类似的。 print() 函数使用以 开头的转换说明符对各种类型的数据进行格式化输出,具体请看下表。

  • Parameter Position参数位置 Type参数类型 Required必需 Default默认 Description描述 1 string Yes n/a This is what format to use. (sprintf) 使用的格式化方式 This is a way to format strings, such as decimal numbers and such. Us

  • Go对字符串格式化提供了良好的支持。下面我们看些常用的字符串格式化的例子。 package main import "fmt" import "os" type point struct { x, y int } func main() { // Go提供了几种打印格式,用来格式化一般的Go值,例如 // 下面的%v打印了一个point结构体的对象的值 p := p

  • 3.5. 格式化字符串 Python 支持格式化字符串的输出 。尽管这样可能会用到非常复杂的表达式, 但最基本的用法是将一个值插入到一个有字符串格式符 %s 的字符串中。 在 Python 中, 字符串格式化使用与 C 中 sprintf 函数一样的语法。 例 3.21. 字符串的格式化 >>> k = "uid" >>> v = "sa" >>> "%s=%s" % (k, v) 'uid=s

  • 由来 我一直对Slf4j的字符串格式化情有独钟,通过{}这种简单的占位符完成字符串的格式化。于是参考Slf4j的源码,便有了StrFormatter。 StrFormatter.format的快捷使用方式为StrUtil.format,推荐使用后者。 使用 //通常使用 String result1 = StrFormatter.format("this is {} for {}", "a", "