在阅读了Apache Flume及其在处理客户端事件方面提供的好处之后,我决定是时候开始更详细地研究这个问题了。另一个很大的好处似乎是它可以处理Apache Avro对象:-)然而,我很难理解Avro模式是如何用来验证收到的Flume事件的。
为了帮助更详细地了解我的问题,我在下面提供了代码片段;
出于这篇文章的目的,我使用了一个示例模式,定义了一个包含2个字段的嵌套Object1
记录。
{
"namespace": "com.example.avro",
"name": "Example",
"type": "record",
"fields": [
{
"name": "object1",
"type": {
"name": "Object1",
"type": "record",
"fields": [
{
"name": "value1",
"type": "string"
},
{
"name": "value2",
"type": "string"
}
]
}
}
]
}
在我的Java项目中,我目前正在使用Apache Flume嵌入式代理,详见下文;
public static void main(String[] args) {
final Event event = EventBuilder.withBody("Test", Charset.forName("UTF-8"));
final Map<String, String> properties = new HashMap<>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "100");
properties.put("sinks", "sink1");
properties.put("sink1.type", "avro");
properties.put("sink1.hostname", "192.168.99.101");
properties.put("sink1.port", "11111");
properties.put("sink1.batch-size", "1");
properties.put("processor.type", "failover");
final EmbeddedAgent embeddedAgent = new EmbeddedAgent("TestAgent");
embeddedAgent.configure(properties);
embeddedAgent.start();
try {
embeddedAgent.put(event);
} catch (EventDeliveryException e) {
e.printStackTrace();
}
}
在上面的例子中,我用“Test”创建了一个新的Flume事件,它被定义为将事件发送到一个运行在VM (192.168.99.101)中的单独的Apache Flume代理的事件体。
如上所述,我已将此代理配置为从嵌入式 Flume 代理接收事件。此代理的 Flume 配置如下所示;
# Name the components on this agent
hello.sources = avroSource
hello.channels = memoryChannel
hello.sinks = loggerSink
# Describe/configure the source
hello.sources.avroSource.type = avro
hello.sources.avroSource.bind = 0.0.0.0
hello.sources.avroSource.port = 11111
hello.sources.avroSource.channels = memoryChannel
# Describe the sink
hello.sinks.loggerSink.type = logger
# Use a channel which buffers events in memory
hello.channels.memoryChannel.type = memory
hello.channels.memoryChannel.capacity = 1000
hello.channels.memoryChannel.transactionCapacity = 1000
# Bind the source and sink to the channel
hello.sources.avroSource.channels = memoryChannel
hello.sinks.loggerSink.channel = memoryChannel
我正在执行以下命令来启动代理;
./bin/flume-ng agent --conf conf --conf-file ../sample-flume.conf --name hello -Dflume.root.logger=TRACE,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
当我执行 Java 项目 main 方法时,我看到“Test”事件通过以下输出传递到我的记录器接收器;
2019-02-18 14:15:09,998 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 54 65 73 74 Test }
但是,我不清楚我应该在哪里配置 Avro 模式以确保 Flume 只接收和处理有效事件。有人可以帮我了解我哪里出错了吗?或者,如果我误解了Flume如何将Flume事件转换为Avro事件的意图?
除此之外,我还尝试在更改Avro模式后使用Avro RPC客户端来指定直接与我的远程Flume代理通信的协议,但当我尝试发送事件时,我看到以下错误;
Exception in thread "main" org.apache.avro.AvroRuntimeException: Not a remote message: test
at org.apache.avro.ipc.Requestor$Response.getResponse(Requestor.java:532)
at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:359)
at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:322)
at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.messageReceived(NettyTransceiver.java:613)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:595)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:786)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:553)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我的目标是能够确保应用程序填充的事件符合生成的Avro模式,以避免发布无效事件。我更希望使用嵌入式Flume代理来实现这一点,但如果这不可能,我会考虑使用AvroRPC方法直接与远程Flume agent对话。
任何帮助/指导都是很大的帮助。提前谢谢。
进一步阅读后,我想知道我是否误解了Apache Flume的目的。我最初认为这可以用来根据数据/模式自动创建 Avro 事件,但现在想知道应用程序是否应该承担生成 Avro 事件的责任,这些事件将根据通道配置存储在 Flume 中,并通过接收器作为批处理发送(在我的例子中是 Spark 流集群)。
如果上述内容正确,那么我想知道 Flume 是否需要了解架构或仅了解最终将处理此数据的 Spark Streaming 集群?如果需要Flume了解架构,那么您能否提供如何实现此架构的详细信息?
先谢谢你。
由于你的目标是使用 Spark 流式处理群集处理数据,因此可以使用 2 种解决方案解决此问题
1)使用flume客户端(用flume-ng-sdk 1.9.0测试)和Spark Streaming(用spark-streaming_2.11 2.4.0和Spark-Streaming-Flume _ 2.11 2 . 3 . 0测试),在网络拓扑之间没有Flume服务器。
客户端类在端口41416发送Flume json事件
public class JSONFlumeClient {
public static void main(String[] args) {
RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 41416);
String jsonData = "{\r\n" + " \"namespace\": \"com.example.avro\",\r\n" + " \"name\": \"Example\",\r\n"
+ " \"type\": \"record\",\r\n" + " \"fields\": [\r\n" + " {\r\n"
+ " \"name\": \"object1\",\r\n" + " \"type\": {\r\n" + " \"name\": \"Object1\",\r\n"
+ " \"type\": \"record\",\r\n" + " \"fields\": [\r\n" + " {\r\n"
+ " \"name\": \"value1\",\r\n" + " \"type\": \"string\"\r\n" + " },\r\n"
+ " {\r\n" + " \"name\": \"value2\",\r\n" + " \"type\": \"string\"\r\n"
+ " }\r\n" + " ]\r\n" + " }\r\n" + " }\r\n" + " ]\r\n" + "}";
Event event = EventBuilder.withBody(jsonData, Charset.forName("UTF-8"));
try {
client.append(event);
} catch (Throwable t) {
System.err.println(t.getMessage());
t.printStackTrace();
} finally {
client.close();
}
}
}
火花流服务器类侦听端口41416
public class SparkStreamingToySample {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local[2]")
.setAppName("SparkStreamingToySample");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils
.createStream(ssc, "localhost", 41416);
lines.map(sfe -> new String(sfe.event().getBody().array(), "UTF-8"))
.foreachRDD((data,time)->
System.out.println("***" + new Date(time.milliseconds()) + "=" + data.collect().toString()));
ssc.start();
ssc.awaitTermination();
}
}
2) 在Spark Streaming(作为Flume Sink)之间使用Flume客户端Flume服务器作为网络拓扑。
对于该选项,代码是相同的,但如果您在本地运行SparkStreaming服务器进行测试,则SparkStreaming必须指定完整的dns限定主机名而不是localhost,以在同一端口41416启动SparkStreaming服务器。Flume客户端将连接到Flume服务器端口41415。现在棘手的部分是如何定义您的Flume拓扑。您需要同时指定源和接收器才能工作。
参见下面的水槽配置
agent1.channels.ch1.type = memory
agent1.sources.avroSource1.channels = ch1
agent1.sources.avroSource1.type = avro
agent1.sources.avroSource1.bind = 0.0.0.0
agent1.sources.avroSource1.port = 41415
agent1.sinks.avroSink.channel = ch1
agent1.sinks.avroSink.type = avro
agent1.sinks.avroSink.hostname = <full dns qualified hostname>
agent1.sinks.avroSink.port = 41416
agent1.channels = ch1
agent1.sources = avroSource1
agent1.sinks = avroSink
您应该使用两种解决方案获得相同的结果,但是回到您的问题,即来自 Json 流的 Spark 流内容是否真的需要 Flume,答案是这取决于,Flume 支持拦截器,因此在这种情况下,它可用于清理或过滤 Spark 项目的无效数据,但由于您要向拓扑添加额外的组件,因此它可能会影响性能并且比没有 Flume 需要更多的资源(CPU/内存)。
我使用子JPanel构造JScrollPane,然后尝试将JScrollPane添加到父JPanel中,希望有一个可滚动的自定义JPanel。 我有一个大的细白线前面的所有我的组件,他们是没有滚动。有什么想法吗?
我已经为android Studio创建了一个webview应用程序。但没有加载web URL。错误为NET::ERR_ACCESS_DENIED。有谁能帮忙吗
我有一个使用Java Version8的Spring和Maven的项目。当我运行maven update时,它适用于Java9或更高版本,当我编译它时,它适用于Java8。我注意到了这一点,因为用Maven更新会将一个特定的类更改为Java9,而我无法导入这个类。 这个类是:javax.annotation.generated(Java8)。 这是要导入的类。但是,当我进行maven更新时,文件的
当应用程序启动EncryptionBootstrapConfiguration无法自动装配我的自定义TextEncryptor-https://github.com/spring-cloud/spring-cloud-commons/blob/cde7c7f3118382490c28776f66e0a56f248141fd/spring-cloud-context/src/main/java/or
我有一个问题,找出我的源代码中的错误。 有什么办法可以解决它吗? 我的代码中抛出了“ArrayIndexOutOfBoundsException”,但我无法找到它的位置。 android studio中的logcat:
RISC-V 与中断相关的寄存器和指令 [info] 回顾:RISC-V 中的机器态(Machine Mode,机器模式,M 模式) 是 RISC-V 中的最高权限模式,一些底层操作的指令只能由机器态进行使用。 是所有标准 RISC-V 处理器都必须实现的模式。 默认所有中断实际上是交给机器态处理的,但是为了实现更多功能,机器态会将某些中断交由内核态处理。这些异常也正是我们编写操作系统所需要实现的
加载EGL入口点时出错。 Java运行时环境检测到一个致命错误: exception_access_visultion(0xC0000005)在pc=0x000000000000,pid=22096,tid=21896 这是错误日志文件中的线程(完整文件太长):------------------------------------------------- 非常感谢你的回答。埃吉尔
java.lang.ClassCastException:com.kk.tutorial.domain.dtos.employeedTo类不能强制转换为com.kk.tutorial.domain.dtos.employeedTo类(com.kk.tutorial.domain.dtos.employeedTo位于加载器'app'的未命名模块中;com.kk.tutorial.domain.dto