当前位置: 首页 > 知识库问答 >
问题:

在Apache Flink中对两个消息流使用相同的接收器

元叶秋
2023-03-14

我们有两种消息向Flink传来

  1. 控制消息->仅滚动文件
  2. 数据消息->将使用接收器存储在S3中
package com.ranjit.com.flinkdemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;

import org.apache.flink.streaming.connectors.fs.StringWriter;;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<String> ctrl_message_stream = env.socketTextStream("localhost", 8088);

        ctrl_message_stream.broadcast();

        DataStream<String> message_stream = env.socketTextStream("localhost", 8087);

        RollingSink sink = new RollingSink<String>("/base/path");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<String>() );
        sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,

        ctrl_message_stream.broadcast().addSink(sink);
        message_stream.addSink(sink);

        env.execute("stream");
    }

}
package com.gslab.com.dataSets;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<String> controlMessageList = new ArrayList<String>();
        controlMessageList.add("controlMessage1");
        controlMessageList.add("controlMessage2");

        List<String> dataMessageList = new ArrayList<String>();
        dataMessageList.add("Person1");
        dataMessageList.add("Person2");
        dataMessageList.add("Person3");
        dataMessageList.add("Person4");

        DataStream<String> controlMessageStream  = env.fromCollection(controlMessageList);
        DataStream<String> dataMessageStream  = env.fromCollection(dataMessageList);

        DataStream<GenericRecord> controlMessageGenericRecordStream = controlMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                 Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/controlMessageSchema.avsc")));
                 gr.put("TYPE", value);
                 return gr;
            }
        });

        DataStream<GenericRecord> dataMessageGenericRecordStream = dataMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                 Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/dataMessageSchema.avsc")));
                 gr.put("FIRSTNAME", value);
                 gr.put("LASTNAME", value+": lastname");
                 return gr;
            }
        });

        //Displaying Generic records
        dataMessageGenericRecordStream.map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data before union: "+ value);
                return value;
            }
        });

        controlMessageGenericRecordStream.broadcast().union(dataMessageGenericRecordStream).map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data after union: " + value);
                return value;
            }
        });
        env.execute("stream");
    }
}
05/09/2016 13:02:12 Source: Collection Source(1/1) switched to FINISHED 
05/09/2016 13:02:12 Source: Collection Source(1/1) switched to FINISHED 
05/09/2016 13:02:13 Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2: lastname"}
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1: lastname"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1"}
data before union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4: lastname"}
data before union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3: lastname"}
data after union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2"}
data after union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3"}
05/09/2016 13:02:13 Map -> Map(2/2) switched to FINISHED 
data after union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4"}
05/09/2016 13:02:13 Map -> Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
05/09/2016 13:02:13 Job execution switched to status FINISHED.

我们可以看到LASTNAME值不正确,它会被每个记录的FIRSTNAME值替换

共有1个答案

梅修贤
2023-03-14

您的代码基本上用您定义的接收器的副本终止这两个流。你想要的是这样的东西:

java prettyprint-override">final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

DataStream<String> ctrl_message_stream = env.socketTextStream("localhost", 8088);

DataStream<String> message_stream = env.socketTextStream("localhost", 8087);

RollingSink sink = new RollingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
sink.setWriter(new StringWriter<String>() );
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,

ctrl_message_stream.broadcast().union(message_stream).addSink(sink);

env.execute("stream");
 类似资料:
  • 我有一个主应用程序将消息发送到SQS队列,希望4个消费者应用程序使用相同的消息,并按自己的意愿进行处理 我不确定用于此目的的队列体系结构。 我看到标准SQS、SQS FIFO、(SQS SNSTopic)的选项 对于我想要的功能,似乎(SQS SNS主题)或Kenesis将是一条可行的道路。 但是我也有一个关于标准SQS的问题 我想我是混淆之间的所有选项和压倒了所有的信息可用的队列但仍然感到困惑哪

  • 我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新

  • 我正在使用spring kafka来消费来自kafka的消息。消费者监听器如下。 应用程序的单个实例,并发数为6。 该主题有6个分区。 从上面的日志中可以清楚地看到,两个用户在完全相同的时间收到了来自分区和偏移量的相同消息。 每个线程继续处理消息。最后,其中一个消费者失败了,错误如下 我知道上面的错误会在有负载或消息处理需要时间时出现。在这种情况下,处理不到一秒钟,kafka主题中的消息不到10条

  • 早上好在我的时区。 事先表示感谢并致以最良好的问候

  • 概述 为了能够让轻应用订阅号的开发者接收到用户在消息窗口的留言消息,开发者可以在管理后台设置消息服务器并开启接收用户对话消息模式。 设置消息服务器时接需要提供可用的接收消息的回调URL地址,为了让通信更加安全,建议使用https。 设置成功并开启了接收对话消息模式后,用户在轻应用或订阅号窗口里发送的消息会推送给设置的URL,服务器接收到消息后,可以通过异步发送消息接口给用户回复消息。 设置消息服务