我正在尝试使用flume将数据从Kafka源接收到hdfs。下面是我的flume配置文件。
flume1.sources = kafka-source-1
flume1.channels = hdfs-channel-1
flume1.sinks = hdfs-sink-1
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.bootstrap.servers = localhost:9092
flume1.sources.kafka-source-1.zookeeperConnect = localhost:2181
flume1.sources.kafka-source-1.topic = MYNEWSFEEDS
flume1.sources.kafka-source-1.batchSize = 100
flume1.sources.kafka-source-1.channels = hdfs-channel-1
flume1.channels.hdfs-channel-1.type = memory
flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = hdfs://quickstart.cloudera:8020/tmp
flume1.sinks.hdfs-sink-1.hdfs.rollCount=100
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0
flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000
我正在使用以下命令运行flume agent:
sudo flume-ng agent --name flume1 --conf-file '/etc/flume-ng/conf/flafka.conf' Dflume.root.logger=TRACE,console
但我得到以下错误:
18/03/12 16:49:18 ERROR节点. AbstractConfigurationProvider: Source kafka-source-1已被删除,由于配置过程中的错误unnable.runConfigurationException: Bootstrap Servers必须在rovider.java:140指定KafkaSjava.util.concurrent.配置(KafkaSdapter.call)在xecutors.java:471BasicSourceSjava.util.concurrent.(BasicSourceSask.run)在ask.java:304AbstractPollableSjava.util.concurrent.(AbstractPollableSource.java:63)在org.apache.flume.conf.Configurables.configure(Configurables.java:41)在org.apache.flume.node.AbstractConfigurationProvider.load源(AbstractConfigurationProvider.java:326)在org.apache.flume.node.AbstractConfigurationProvider.get配置(AbstractConfigurationProvider.java:97)在org.apache.flume.node.PollingProperty tiesFileConfigurationProvider$FileWatcherRorg.apache.flume.conf.(PollingProperty tiesFileConfigurationPorg.apache.flume.source.kafka.)在ource.do执行人$RunnableAource.java:330(Eorg.apache.flume.source.)在emantics.configureFutureTemantics.java:65AndReset(FutureTorg.apache.flume.source.)在ource.configureSchduledThreadPoolExecitor$Schdudu
虽然我已经在conf文件中指定了引导服务器,但它仍然给出了相同的错误。尝试了许多排列和组合,但没有成功。
根据官方的JavaDoc,你应该更换
flume1.sources.kafka-source-1.bootstrap.servers = localhost:9092
具有
flume1.sources.kafka-source-1.kafka.bootstrap.servers = localhost:9092
我正在使用水槽从kafka主题HDFS文件夹加载消息。所以, 我创建了一个主题 TT 我通过Kafka控制台制作人向 TT 发送了消息 我配置了水槽代理 FF 运行 flume agent flume-ng agent -n FF -c conf -f flume.conf - Dflume.root.logger=INFO,console 代码执行停止,没有错误,并且不会向 HDFS 写入任何内
我有一个生产者,它正在为一个主题生成protobuf消息。我有一个消费者应用程序,它反序列化protobuf消息。但hdfs接收器连接器直接从Kafka主题接收消息。中的键和值转换器将设置为什么?做这件事最好的方法是什么?提前道谢!
我一整天都在不断收到这个日志信息。 2016-10-12 21:32:05,696(conf-file-poller-0)[DEBUG-org . Apache . FLUME . node . pollingpropertiesfileconfigurationprovider $ filewatcherrunnable . run(pollingpropertiesfileconfigurat
下面是中的gulpfile.js 我已经通过启动并在之后删除了以前关于的错误,但在上仍然存在此问题
我有以下Flume代理配置来读取来自kafka源的消息并将它们写回HDFS接收器 如果每个轮询周期只有一条kafka消息到达,则kafka消息内容是avro数据,并且正确地序列化为文件。 当两个kafka消息到达同一批次时,它们被分组在同一个HDFS文件上,因为avro消息包含两个模式数据,结果文件包含模式数据模式数据,导致它是无效的. avro文件。 如何拆分avro事件以将不同的kafka消息
当使用“sdo_util.to_wktgeometrige()”函数将数据类型转换为长字符串时,似乎不可能。对于第一个处理器,第二个处理器会出现错误:“sdo_util.to_wktgeometrion()非法字符” 有人有关于通过NIFI从Oracle(12+)迁移几何数据类型的技巧吗?一个线字符串就足够了几何图形对象的信息(就目前而言)