我尝试使用Flume 1.7将数据加载到HDFS中。我创建了以下配置:
# Starting with: /opt/flume/bin/flume-ng agent -n Agent -c conf -f /opt/flume/conf/test.conf
# Naming the components on the current agent
Agent.sources = Netcat
Agent.channels = MemChannel
Agent.sinks = LoggerSink hdfs-sink LocalOut
# Describing/Configuring the source
Agent.sources.Netcat.type = netcat
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565
# Describing/Configuring the sink
Agent.sinks.LoggerSink.type = logger
# Define a sink that outputs to hdfs.
Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>:8020/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 0
Agent.sinks.hdfs-sink.hdfs.rollInterval = 0
Agent.sinks.hdfs-sink.hdfs.idleTimeout = 0
# Schreibt input into local Filesystem
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
Agent.sinks.LocalOut.type = file_roll
Agent.sinks.LocalOut.sink.directory = /tmp/flume
Agent.sinks.LocalOut.sink.rollInterval = 0
# Describing/Configuring the channel
Agent.channels.MemChannel.type = memory
Agent.channels.MemChannel.capacity = 1000
Agent.channels.MemChannel.transactionCapacity = 100
# Bind the source and sink to the channel
Agent.sources.Netcat.channels = MemChannel
Agent.sinks.LoggerSink.channel = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel
Agent.sinks.LocalOut.channel = MemChannel
之后,我使用 netcat 将以下文件发送到源:
cat textfile.csv | nc <IP of flume agent> 56565
该文件包含以下元素:
Name1,1
Name2,2
Name3,3
Name4,4
Name5,5
Name6,6
Name7,7
Name8,8
Name9,9
Name10,10
Name11,11
Name12,12
Name13,13
Name14,14
Name15,15
Name16,16
Name17,17
Name18,18
Name19,19
Name20,20
...
Name490,490
Name491,491
Name492,492
我面临的问题是,没有任何错误,flume正在写入hdfs,但只有一行传输的文件。如果您开始使用nectat多次将文件推送到源文件,那么有时flume会将多个文件写入hdfs,包括多个行。但很少是所有行。
我试图改变hdfs参数roll大小,批量大小和其他,但它并没有真正改变的行为。
也配置了本地文件的接收器工作正常。
有人知道如何配置它以确保所有条目都写入hdfs而不会丢失条目吗?
谢谢你的帮助。
更新1.12.2016
我删除了除HDFS接收器外的所有接收器,并更改了一些参数。在此之后,HDFS接收器按其应有的方式运行。
这里的配置:
# Naming the components on the current agent
Agent.sources = Netcat
Agent.channels = MemChannel
Agent.sinks = hdfs-sink
# Describing/Configuring the source
Agent.sources.Netcat.type = netcat
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565
# Define a sink that outputs to hdfs.
Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 100
# Describing/Configuring the channel
Agent.channels.MemChannel.type = memory
Agent.channels.MemChannel.capacity = 1000
Agent.channels.MemChannel.transactionCapacity = 100
# Bind the source and sink to the channel
Agent.sources.Netcat.channels = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel
有人知道为什么它在这种配置下工作,但在两个或更多的水槽下就不再工作了吗?
我自己找到了解决方案。据我所知,我为两个接收器使用了相同的通道。因此,更快的接收器接管了所有条目,只有部分条目被传递到hdfs接收器。
使用不同的通道并使用参数为源包括扇动后
Agent.sources.Netcat.selector.type = replicating
Flume 按预期写入本地文件和 hdfs。
我需要你的帮助。有一个Vue应用程序,我使用vuex商店和Vue路由器。问题是,我从API调用中获取存储数据,如果我从其他地方导航到页面,用户数据已经存储在存储中,并且在组件中,我可以使用getter接收到的数据。但是,如果我在create或mount方法中重新加载页面,则getter不包含任何数据。但如果在Vuex开发工具中查看,我可以看到fetchedd数据。如何修复此行为?
我正在使用Kafka源和接收器连接器创建一个数据管道。源连接器从SQL数据库消费并发布到主题,而接收器连接器订阅主题并放入其他SQL数据库。表有16 GB的数据。现在的问题是,数据不能从一个数据库传输到另一个数据库。但是,如果表的大小很小,比如1000行,那么数据可以成功传输。 源连接器配置: 源连接器日志: 有人能指导我如何调整Kafka源连接器以传输大数据吗?
我正在使用从Kafka到HDFS的Flink bucketing水槽。Flink的版本是1.4.2 我发现每次重新启动作业时都会有一些数据丢失,即使使用保存点也是如此 我发现如果我设置writer SequenceFile,这个问题可以解决。挤压型。记录而不是序列文件。挤压型。块当Flink试图保存检查点时,有效长度似乎与实际长度不同,实际长度应该包括压缩数据 但如果不能使用压缩类型,则可能会出现
我用C编写了一个服务器应用程序,它从客户机应用程序(我自己没有编写)接收数据包,并将数据打印到控制台。问题是,当我试图一次接收并存储整个包体时,数据存储不正确,但是当使用recv()的多次调用接收并存储包体时,它确实正确存储。 关于endianness,客户端和服务器都运行在一个小端机器上,客户端作为小端发送数据,服务器读取数据而不需要转换。 这是客户端应用程序发送给服务器应用程序的数据包: 下面
本文向大家介绍android使用SharedPreferences进行数据存储,包括了android使用SharedPreferences进行数据存储的使用技巧和注意事项,需要的朋友参考一下 很多时候我们开发的软件需要向用户提供软件参数设置功能,例如我们常用的QQ,用户可以设置是否允许陌生人添加自己为好友。对于软件配置参数的保存,如果是window软件通常我们会采用ini文件进行保存,如果是j2s
求你了,我需要你的帮助。 我正在使用TCP连接在java服务器和android应用程序客户端之间建立TCP连接。假设我将发送一个序列化对象,但是每次在客户端,代码都会在中的Obj=(Person)处被阻塞。readObject;其中in是数据对象InputStream,Person是序列化对象。 然而,如果我发送的是字符串或整数,并且我使用Obj=in,代码就可以工作。readObject;直接地