Flink DataStream: reading from Doris and writing back to Doris fails.

左丘宜然
2023-12-01

2022/11/08, notes from a beginner.

Scenario: Flink reads table a from Doris and writes it back to table b in Doris.

Background: Flink version 1.14.4, Doris version 1.1.0

Code

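// DorisToDoris.java
// Assumption: JSONObject / JSONAware come from fastjson (com.alibaba.fastjson), and the
// org.apache.doris.flink.* packages are those of flink-doris-connector 1.x; check them against your version.
import com.alibaba.fastjson.JSONAware;
import com.alibaba.fastjson.JSONObject;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.source.DorisSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DorisToDoris {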
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        String host = "192.168.**.**";
        String port = "8030";
        String userName = "******";
        String passWord = "******";
        // Source: read columns user_id, days, city of demo.exm1 (a bounded read of the whole table)
        String tableA = "demo.exm1";
        String fieldA = "user_id,days,city";
        DorisSource<JSONObject> tableASource = DorisUtils.getJsonDorisSource(host + ":" + port, tableA, userName, passWord, fieldA);
        DataStreamSource<JSONObject> tableADS = env.fromSource(tableASource, WatermarkStrategy.noWatermarks(), "tableASource");
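        // Sink: write each record as a JSON string into demo.exm2; the timestamp keeps the label prefix unique across runs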
        String tableB = "exm2";
        DorisSink<String> tableBSink = DorisUtils.getDorisSink(host, port, "demo", tableB, userName, passWord, "tableB" + System.currentTimeMillis());
        tableADS.map(JSONAware::toJSONString)
                .sinkTo(tableBSink);
        
        env.execute();
    }
}
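// DorisUtils.java
import com.alibaba.fastjson.JSONObject;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.doris.flink.source.DorisSource;
import org.apache.doris.flink.source.DorisSourceBuilder;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Properties;

public class DorisUtils {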
    public static DorisSource<JSONObject> getJsonDorisSource(String fe_ip, String tableName, String userName, String passWord, String fields) {
        DorisSource<JSONObject> dorisSource = DorisSourceBuilder.<JSONObject>builder()
                .setDorisOptions(
                        DorisOptions.builder()
                                .setFenodes(fe_ip)
                                .setTableIdentifier(tableName)
                                .setUsername(userName)
                                .setPassword(passWord)
                                .build())
                .setDorisReadOptions(DorisReadOptions.builder().setReadFields(fields).build())
                .setDeserializer(new DorisDeserializationSchema<JSONObject>() {
                    @Override
                    public void deserialize(List<?> record, Collector<JSONObject> out) throws Exception {
                        // record holds the column values in the order of the read fields: pair each with its field name
                        int index = 0;
                        JSONObject result = new JSONObject();
                        for (String key : fields.split(",")) {
                            result.put(key, record.get(index));
                            index++;
                        }
                        out.collect(result);
                    }

                    @Override
                    public TypeInformation<JSONObject> getProducedType() {
                        return TypeInformation.of(new TypeHint<JSONObject>() {
                        });
                    }
                })
                .build();
        return dorisSource;
    }

    public static DorisSink<String> getDorisSink(String host, String port, String db, String tb, String user, String pw, String labelPrefix) {
        // Doris sink: load the incoming strings into db.tb via Stream Load
        DorisSink.Builder<String> builder = DorisSink.builder();
        final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
        // Stream Load properties: JSON format, one JSON object per line, lines separated by "\n"
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        pro.setProperty("line_delimiter", "\n");
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes(host + ":" + port)
                .setTableIdentifier(db + "." + tb)
                .setUsername(user)
                .setPassword(pw);

        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder
                .setStreamLoadProp(pro)
                .setLabelPrefix(labelPrefix);

        DorisSink<String> dorisSink = builder.setDorisReadOptions(readOptionBuilder.build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setSerializer(new SimpleStringSerializer())
                .setDorisOptions(dorisBuilder.build())
                .build();
        return dorisSink;
    }
}
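The deserializer in getJsonDorisSource pairs the i-th field in the read fields with the i-th column value of each row. The sink properties then ask Stream Load for one JSON object per line. The stdlib-only sketch below traces that data path for a couple of made-up rows. It rests on some assumptions. LinkedHashMap stands in for fastjson's JSONObject. The JSON encoding is hand-rolled and minimal. rowToMap and toJsonLines are hypothetical helpers, not connector APIs. Unlike the original deserializer, the sketch also rejects a row whose column count does not match the field list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch (hypothetical helpers, no connector code): Doris row -> field map -> JSON lines.
// LinkedHashMap stands in for fastjson's JSONObject and keeps the field order.
public class RowToJsonLines {

    // Pair the i-th read field with the i-th column value, as the deserializer does.
    static Map<String, Object> rowToMap(String fields, List<?> row) {
        String[] keys = fields.split(",");
        if (keys.length != row.size()) {
            throw new IllegalArgumentException(
                    "expected " + keys.length + " columns but got " + row.size());
        }
        Map<String, Object> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            result.put(keys[i], row.get(i));
        }
        return result;
    }

    // Minimal JSON encoding: null, booleans and (assumed finite) numbers as-is,
    // everything else as an escaped JSON string.
    static String toJsonValue(Object v) {
        if (v == null) return "null";
        if (v instanceof Number || v instanceof Boolean) return v.toString();
        StringBuilder sb = new StringBuilder("\"");
        for (char c : v.toString().toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    static String toJsonObject(Map<String, Object> m) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : m.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append(toJsonValue(e.getKey())).append(':').append(toJsonValue(e.getValue()));
        }
        return sb.append('}').toString();
    }

    // One JSON object per line, joined with "\n" (the line_delimiter set in getDorisSink).
    static String toJsonLines(String fields, List<List<?>> rows) {
        List<String> lines = new ArrayList<>();
        for (List<?> row : rows) {
            lines.add(toJsonObject(rowToMap(fields, row)));
        }
        return String.join("\n", lines);
    }

    public static void main(String[] args) {
        List<List<?>> rows = new ArrayList<>();
        rows.add(Arrays.asList(1, 3, "beijing"));   // made-up sample values
        rows.add(Arrays.asList(2, 5, "shanghai"));
        // prints {"user_id":1,"days":3,"city":"beijing"} and {"user_id":2,"days":5,"city":"shanghai"} on two lines
        System.out.println(toJsonLines("user_id,days,city", rows));
    }
}
```

The column-count check is a defensive choice. The original loop reads `record.get(index)` for every field, so a shorter row would fail with IndexOutOfBoundsException inside the Flink task, which is harder to diagnose.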

Problem: reading √, writing ×, even though I checked that the stream contains data before the write.

Analysis: 1. The stream has data before sinkTo(), so the problem happens during the write.

          2. I tried having Flink consume Kafka and write to Doris table b instead, and that worked.

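          It turned out to be a basic mistake: Kafka data is unbounded, while the data read from Doris is bounded. The Doris read ends after the last row of table a, and then the job finishes. As far as I understand, the Doris sink only commits its Stream Load transactions when a checkpoint completes. In Flink 1.14, by default, no more checkpoints are taken once some tasks have finished. So a short bounded job in streaming mode can end before anything it wrote is committed. The Kafka job never ends, so its checkpoints keep committing data.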

Fix: in the main program, set the env runtime mode to BATCH. The write then succeeds.

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
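Why does the execution mode change the outcome? A toy model helps here. It rests on an assumption I have not checked against the connector source. The assumption is that the Doris sink makes its writes visible only when a checkpoint completes (two-phase commit). It also assumes that in Flink 1.14 a STREAMING job takes no further checkpoints once its bounded source has finished. The classes below are hypothetical and contain no Flink or Doris code. A checkpoint "every N records" stands in for the 5 s interval.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT Flink or Doris code) of a sink whose writes become visible only on commit.
// Assumption: commits happen on completed checkpoints (STREAMING) or at end of input (BATCH).
public class CommitOnCheckpointModel {

    static final class ToySink {
        private final List<String> pending = new ArrayList<>();   // written but not yet visible
        private final List<String> committed = new ArrayList<>(); // visible in table b

        void write(String record) { pending.add(record); }

        // Make everything written so far visible.
        void commit() {
            committed.addAll(pending);
            pending.clear();
        }

        int visibleRows() { return committed.size(); }
    }

    // Push a bounded source of `rows` records through the sink and return how many become visible.
    // STREAMING: a checkpoint completes every `checkpointEvery` records and none after the source ends.
    // BATCH: one commit at end of input.
    static int run(int rows, int checkpointEvery, boolean batchMode) {
        ToySink sink = new ToySink();
        for (int i = 1; i <= rows; i++) {
            sink.write("row-" + i);
            if (!batchMode && i % checkpointEvery == 0) {
                sink.commit(); // a periodic checkpoint completed
            }
        }
        if (batchMode) {
            sink.commit(); // end of input: the job commits what it wrote
        }
        // STREAMING: the job just finishes; anything still pending is never committed.
        return sink.visibleRows();
    }

    public static void main(String[] args) {
        // A small table that is read completely before the first checkpoint fires.
        System.out.println("STREAMING: " + run(3, 1000, false) + " rows visible"); // prints 0
        System.out.println("BATCH:     " + run(3, 1000, true) + " rows visible");  // prints 3
    }
}
```

With the 3-row table, STREAMING leaves 0 rows visible and BATCH leaves 3. A larger table in STREAMING mode loses only the tail written after the last completed checkpoint. An unbounded source such as the Kafka job never reaches the end of the loop, so its periodic checkpoints keep committing. That matches the observation that the Kafka job wrote successfully.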
