当前位置: 首页 > 知识库问答 >
问题:

火花流集成水槽

臧梓
2023-03-14

我遵循火花流水槽集成的指导。但我最终无法获得任何事件。(https://spark.apache.org/docs/latest/streaming-flume-integration.html)谁能帮我分析一下?在烟雾中,我创建了“avro_flume.conf”的文件,如下所示:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 123.57.54.113
a1.sources.r1.port = 4141
a1.sinks = k1
a1.sinks.k1.type = avro
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 123.57.54.113
a1.sinks.k1.port = 6666
a1.sources = r1
a1.sinks = spark
a1.channels = c1

在文件中,123.57.54.113是本地主机的ip。

flume-ng agent -c . -f conf/avro_spark.conf -n a1 Start Spark-streaming
bin/run-example org.apache.spark.examples.streaming.FlumeEventCount 123.57.54.113 6666
flume-ng avro-client -c . -H 123.57.54.113 -p 4141 -F test/log.01

最后,根本没有任何事件。

怎么了?谢谢!

共有1个答案

陈飞
2023-03-14

我在标题“将源和汇绑定到通道”下看到“a1.sinks=spark”。但是名为“spark”的接收器没有在配置中的其他地方定义。你是在尝试“https://spark.apache.org/docs/latest/streaming-flume-integration.html"?

如果您正在尝试方法1,请尝试删除行“a1.sinks = spark”。

对于方法 2,请使用以下模板:

agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = <hostname of the local machine>
agent.sinks.spark.port = <port to listen on for connection from Spark>
agent.sinks.spark.channel = memoryChannel
 类似资料:
  • 我在Scala/Spark中有一个批处理作业,它根据一些输入动态创建Drools规则,然后评估规则。我还有一个与要插入到规则引擎的事实相对应的输入。 到目前为止,我正在一个接一个地插入事实,然后触发关于这个事实的所有规则。我正在使用执行此操作。 seqOp运算符的定义如下: 以下是生成的规则的示例: 对于同一RDD,该批次花了20分钟来评估3K规则,但花了10小时来评估10K规则! 我想知道根据事

  • 我不能用火花流运行Kafka。以下是我迄今为止采取的步骤: > 将此行添加到- Kafka版本:kafka_2.10-0.10.2.2 Jar文件版本:spark-streaming-kafka-0-8-assembly_2.10-2.2.0。罐子 Python代码: 但我仍然得到以下错误: 我做错了什么?

  • 我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该

  • 如果spark streaming在10秒的批处理间隔中获得50行消息,并且在40.5行消息之后,这10秒就结束了,剩下的时间落入另一个10秒的间隔中,前40.5行的文本是一个RDD被首先处理,在我的用例中,前40行是有意义的,但是下一个。5行没有意义,第二个RDD首先也是这样。5行,我的问题是否有效?。请提供建议如何处理这个问题?。 谢谢比尔。

  • 我正在使用火花流,我从Kafka读取流。阅读此流后,我将其添加到hazelcast地图中。 问题是,我需要在读取Kafka的流之后立即从地图中过滤值。 我正在使用下面的代码来并行化地图值。 但在这个逻辑中,我在另一个逻辑中使用JavaRDD,即JavaInputDStream.foreachRDD,这会导致序列化问题。 第一个问题是,如何通过事件驱动来运行spark作业? 另一方面,我只是想得到一

  • 我正在从一个消息应用程序收集数据,我目前正在使用Flume,它每天发送大约5000万条记录 我希望使用Kafka,使用Spark Streaming从Kafka消费并将其持久化到hadoop并使用impala进行查询 我尝试的每种方法都有问题。。 方法1-将RDD另存为parquet,将外部配置单元parquet表指向parquet目录 问题是finalParquet.saveAsParquetF