当前位置: 首页 > 知识库问答 >
问题:

Flink流媒体示例,生成自己的数据

尉迟招
2023-03-14

早些时候,我问了Flink一个简单的hello world示例。这给了我一些很好的例子!

然而,我想问一个更“流”的例子,我们每秒生成一个输入值。这在理想情况下是随机的,但即使每次都是相同的值也可以。

目标是获得一个无需/最少外部接触就能“移动”的流。

因此,我的问题是:

我发现如何显示这与外部生成数据和写入Kafka,或听一个公共源,但是我试图解决它与最小的依赖性(像在Nifi与GenerateFlowFile开始)。

共有1个答案

严正初
2023-03-14

这里有一个例子。这是作为如何使您的源和接收器可插拔的示例构建的。这个想法是,在开发中,您可以使用随机源并打印结果,对于测试,您可以使用硬连线的输入事件列表并将结果收集在列表中,而在生产中,您将使用真正的源和汇。

工作如下:

/*
 * Example showing how to make sources and sinks pluggable in your application code so
 * you can inject special test sources and test sinks in your tests.
 */

public class TestableStreamingJob {
    private SourceFunction<Long> source;
    private SinkFunction<Long> sink;

    public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
        this.source = source;
        this.sink = sink;
    }

    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> LongStream =
                env.addSource(source)
                        .returns(TypeInformation.of(Long.class));

        LongStream
                .map(new IncrementMapFunction())
                .addSink(sink);

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
        job.execute();
    }

    // While it's tempting for something this simple, avoid using anonymous classes or lambdas
    // for any business logic you might want to unit test.
    public class IncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1 ;
        }
    }

}

以下是RandomLongSource

public class RandomLongSource extends RichParallelSourceFunction<Long> {

    private volatile boolean cancelled = false;
    private Random random;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        random = new Random();
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (!cancelled) {
            Long nextLong = random.nextLong();
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(nextLong);
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}
 类似资料:
  • 我正在尝试运行官方示例,该示例展示了如何使用Apache Flink Streaming与Twitter:https://github.com/apache/flink/tree/master/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter 如果我不提供到属性的路径。文件,推特流数

  • 如果是,请把我放在轨道上实现。

  • 我有一个Java应用程序午餐一个flink工作来处理Kafka流。

  • Flink版本:1.2.0 Scala版本:2.11.8 我想使用一个数据流来预测使用scala在flink中的模型。我在flink中有一个使用scala的DataStream[String],其中包含来自kafka源的json格式的数据。我想用这个数据流来预测已经训练过的Flink ml模型。问题是所有flink ml示例都使用DataSet api进行预测。我对flink和scala比较陌生,

  • 我正在使用Google的YouTube API Explorer(备用)来查找属于其他人的任意流媒体广播的信息。 无论我在字段中输入了什么,我都会返回 这似乎很荒谬,考虑到视频显然是流媒体。 我突然想到,我可能误解了字段的说明,所以我尝试了几种不同的可能性。这些包括。。。 频道ID() 用户ID() 视频ID() ...每个都无济于事。 我如何询问一个频道有关其直播流视频的信息?这个问题在过去可以

  • 首先,我是流处理框架的新手。我想对其中一些进行基准测试,所以我从Flink开始。 对于我的用例,我需要将窗口t中的事件与窗口t-1中的事件进行比较,两者的大小都是15分钟,然后进行一些聚合。 以下是我的用例的简化版本: 我们将分析的事件视为形式的元组。在窗口1中,我们有:(A,1),(B,2),(C,3),在窗口2中,我们有:(D,6)和(B,7)。然后,我需要将当前窗口中的事件与前一个窗口中的事