我需要一些关于Flink流媒体的帮助。我在下面生成了一个简单的Hello world类型的代码。这将流式传输来自RabbitMQ的Avro消息,并将其持久化到HDFS。我希望有人可以查看代码,也许它可以帮助其他人。
我发现的Flink流媒体的大多数示例都会将结果发送到std out。我实际上想把数据保存到Hadoop中。我读到,理论上,你可以和Flink一起去任何你喜欢的地方。实际上,我还没有找到任何将数据保存到HDFS的示例。但是,基于我找到的示例以及尝试和错误,我提供了以下代码。
这里的数据源是RabbitMQ。我使用客户端应用程序将“MyAvroObject”发送到RabbitMQ。MyAvroObject。java-不包括在内-是从avro IDL生成的…可以是任何avro消息。
下面的代码使用 RabbitMQ 消息,并将其作为 avro 文件保存到 HDFS...好吧,这就是我所希望的。
package com.johanw.flink.stackoverflow;
import java.io.IOException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RMQToHadoop {
public class MyDeserializationSchema implements DeserializationSchema<MyAvroObject> {
private static final long serialVersionUID = 1L;
@Override
public TypeInformation<MyAvroObject> getProducedType() {
return TypeExtractor.getForClass(MyAvroObject.class);
}
@Override
public MyAvroObject deserialize(byte[] array) throws IOException {
SpecificDatumReader<MyAvroObject> reader = new SpecificDatumReader<MyAvroObject>(MyAvroObject.getClassSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(array, null);
MyAvroObject MyAvroObject = reader.read(null, decoder);
return MyAvroObject;
}
@Override
public boolean isEndOfStream(MyAvroObject arg0) {
return false;
}
}
private String hostName;
private String queueName;
public final static String path = "/hdfsroot";
private static Logger logger = LoggerFactory.getLogger(RMQToHadoop.class);
public RMQToHadoop(String hostName, String queueName) {
super();
this.hostName = hostName;
this.queueName = queueName;
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
public void run() {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
logger.info("Running " + RMQToHadoop.class.getName());
DataStream<MyAvroObject> socketStockStream = env.addSource(new RMQSource<>(hostName, queueName, new MyDeserializationSchema()));
Job job;
try {
job = Job.getInstance();
AvroJob.setInputKeySchema(job, MyAvroObject.getClassSchema());
} catch (IOException e1) {
e1.printStackTrace();
}
try {
JobConf jobConf = new JobConf(Job.getInstance().getConfiguration());
jobConf.set("avro.output.schema", MyAvroObject.getClassSchema().toString());
org.apache.avro.mapred.AvroOutputFormat<MyAvroObject> akof = new AvroOutputFormat<MyAvroObject>();
HadoopOutputFormat<AvroWrapper<MyAvroObject>, NullWritable> hof = new HadoopOutputFormat<AvroWrapper<MyAvroObject>, NullWritable>(akof, jobConf);
FileSinkFunctionByMillis<Tuple2<AvroWrapper<MyAvroObject>, NullWritable>> fileSinkFunctionByMillis = new FileSinkFunctionByMillis<Tuple2<AvroWrapper<MyAvroObject>, NullWritable>>(hof, 10000l);
org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, new Path(path));
socketStockStream.map(new MapFunction<MyAvroObject, Tuple2<AvroWrapper<MyAvroObject>, NullWritable>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<AvroWrapper<MyAvroObject>, NullWritable> map(MyAvroObject envelope) throws Exception {
logger.info("map");
AvroKey<MyAvroObject> key = new AvroKey<MyAvroObject>(envelope);
Tuple2<AvroWrapper<MyAvroObject>, NullWritable> tupple = new Tuple2<AvroWrapper<MyAvroObject>, NullWritable>(key, NullWritable.get());
return tupple;
}
}).addSink(fileSinkFunctionByMillis);
try {
env.execute();
} catch (Exception e) {
logger.error("Error while running " + RMQToHadoop.class + ".", e);
}
} catch (IOException e) {
logger.error("Error while running " + RMQToHadoop.class + ".", e);
}
}
public static void main(String[] args) throws IOException {
RMQToHadoop toHadoop = new RMQToHadoop("localhost", "rabbitTestQueue");
toHadoop.run();
}
}
如果您喜欢RabbitMQ之外的其他来源,那么使用其他来源也很好。例如,使用Kafka消费者:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
...
DataStreamSource<MyAvroObject> socketStockStream = env.addSource(new FlinkKafkaConsumer082<MyAvroObject>(topic, new MyDeserializationSchema(), sourceProperties));
问题:
>
请复习。这是将数据保存到HDFS的良好做法吗?
如果流式传输过程导致问题(例如在序列化期间),该怎么办?它生成和异常,代码只是退出。火花流依赖于 Yarn 自动重启应用。使用 Flink 时,这也是很好的做法吗?
我正在使用FileSinkFunctionByMillis。我实际上希望使用类似HdfsSinkFunction的东西,但它不存在。所以FileSinkFunctionByMillis是最接近这个的,这对我来说是有意义的。我找到的留档也没有任何解释,所以我只是猜测。
当我在本地运行它时,我发现一个目录结构,如“C:\hdfsroot_temporary\0_temperative\trust_ 0000_r_000001_0”,即…basare。有什么想法吗?
顺便说一下,当你想把数据保存到Kafka中时,我可以使用...
Properties destProperties = new Properties();
destProperties.setProperty("bootstrap.servers", bootstrapServers);
FlinkKafkaProducer<MyAvroObject> kafkaProducer = new FlinkKafkaProducer<L3Result>("MyKafkaTopic", new MySerializationSchema(), destProperties);
提前多谢!!!!
我认为可以使用FileSinkFunctionByMillis
,但这意味着您的流媒体程序不是容错的。这意味着,如果您的源代码或机器或写入失败,那么您的程序将崩溃而无法恢复。
我建议您考虑使用RollingSink
(https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#hadoop-filesystem)。这可用于创建类似Flum的管道以将数据摄取到HDFS(或其他文件系统)中。滚动接收器是一个可恢复的接收器,这意味着您的程序将是容错的,因为Kafka消费者也是容错的。您还可以指定自定义Writer
以您想要的任何格式写入数据,例如Avro。
问题内容: 我能够使用三个链接来组合一个简化的完整History.js示例,以从整个页面加载内容片段,而无需更新页面和更新浏览器历史记录。 这是相关的代码段- 完整的工作示例在此处http://jsfiddle.net/PT7qx/show 我想知道这是否正确。以前的版本可以使用#url绑定到事件。我没有看到使用此最新版本将事件绑定到url的任何示例,因此我使用了.on()click事件来调用Hi
我是firebase的新手,我正在尝试向表单中添加出生日期字段并将其保存到firebase。 表单中的输入字段如下所示: 跟着林达。com教程中创建的工作寄存器函数如下所示: 我试图简单地添加另一个key:value对在email:user之后结束。但那不起作用。 firebase不会为生日添加任何内容。 如果我使用
摘要表列出了所有要传输到目标数据库的已选择的对象。 点击“开始”按钮来运行数据传输进程。窗口会显示运行进度、运行时间和成功或失败信息。
摘要表列出了所有要传输到目标数据库的已选择的对象。 点击“开始”按钮来运行数据传输进程。窗口会显示运行进度、运行时间和成功或失败信息。
摘要表列出了所有要传输到目标数据库的已选择的对象。 点击“开始”按钮来运行数据传输进程。窗口会显示运行进度、运行时间和成功或失败信息。
我在Ubuntu 14.04上使用Hadoop-1.2.1 我正在尝试使用Flume-1.6.0将数据从twitter流式传输到HDFS。我已经下载了Flume-sources-1.0-SNAPSHOT。jar并将其包含在flume/lib文件夹中。我已经设置了flume-sources-1.0-SNAPSHOT的路径。jar在conf/FLUME环境中显示为FLUME_CLASSPATH。这是我