2022/11/08 Beginner's notes.
Scenario: Flink reads table a from Doris and writes the rows back to table b in Doris.
Environment: Flink version 1.14.4
Doris version 1.1.0
Code:
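import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.source.DorisSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// JSONObject and JSONAware come from the project's JSON library (not named in these notes).

public class DorisToDoris {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5 seconds with at-least-once semantics.
        env.enableCheckpointing(5000);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // Doris FE connection settings (masked).
        String host = "192.168.**.**";
        String port = "8030";
        String userName = "******";
        String passWord = "******";

        // Source: read three columns from table a.
        String tableA = "demo.exm1";
        String fieldA = "user_id,days,city";
        DorisSource<JSONObject> tableASource = DorisUtils.getJsonDorisSource(host + ":" + port, tableA, userName, passWord, fieldA);
        DataStreamSource<JSONObject> tableADS = env.fromSource(tableASource, WatermarkStrategy.noWatermarks(), "tableASource");

        // Sink: write each row to table b as a JSON string; the label prefix is made unique per run.
        String tableB = "exm2";
        DorisSink<String> tableBSink = DorisUtils.getDorisSink(host, port, "demo", tableB, userName, passWord, "tableB" + System.currentTimeMillis());
        tableADS.map(JSONAware::toJSONString)
                .sinkTo(tableBSink);

        env.execute();
    }
}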
import java.util.List;
import java.util.Properties;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.doris.flink.source.DorisSource;
import org.apache.doris.flink.source.DorisSourceBuilder;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

public class DorisUtils {
    public static DorisSource<JSONObject> getJsonDorisSource(String fe_ip, String tableName, String userName, String passWord, String fields) {
        DorisSource<JSONObject> dorisSource = DorisSourceBuilder.<JSONObject>builder()
                .setDorisOptions(
                        DorisOptions.builder()
                                .setFenodes(fe_ip)
                                .setTableIdentifier(tableName)
                                .setUsername(userName)
                                .setPassword(passWord)
                                .build())
                .setDorisReadOptions(DorisReadOptions.builder().setReadFields(fields).build())
                .setDeserializer(new DorisDeserializationSchema<JSONObject>() {
                    @Override
                    public void deserialize(List<?> record, Collector<JSONObject> out) throws Exception {
                        // Pair each requested field name with the value at the same position in the record.
                        int index = 0;
                        JSONObject result = new JSONObject();
                        for (String key : fields.split(",")) {
                            result.put(key, record.get(index));
                            index++;
                        }
                        out.collect(result);
                    }

                    @Override
                    public TypeInformation<JSONObject> getProducedType() {
                        return TypeInformation.of(new TypeHint<JSONObject>() {
                        });
                    }
                })
                .build();
        return dorisSource;
    }

    public static DorisSink<String> getDorisSink(String host, String port, String db, String tb, String user, String pw, String labelPrefix) {
        // Doris sink
        DorisSink.Builder<String> builder = DorisSink.builder();
        final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();

        // Stream Load properties: send rows as newline-delimited JSON.
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        pro.setProperty("line_delimiter", "\n");

        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes(host + ":" + port)
                .setTableIdentifier(db + "." + tb)
                .setUsername(user)
                .setPassword(pw);

        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder
                .setStreamLoadProp(pro)
                .setLabelPrefix(labelPrefix);

        DorisSink<String> dorisSink = builder.setDorisReadOptions(readOptionBuilder.build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setSerializer(new SimpleStringSerializer())
                .setDorisOptions(dorisBuilder.build())
                .build();
        return dorisSink;
    }
}
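The deserializer above pairs each name in the field list with the value at the same position in the record. A minimal plain-Java sketch of that mapping is shown here, with a size check added so a mismatch between the field list and the record fails loudly. RecordMapper is a hypothetical helper written for illustration, not part of the Doris connector, and it uses a java.util.Map instead of JSONObject so that it runs without extra dependencies.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Doris connector.
final class RecordMapper {
    private final String[] fieldNames;

    RecordMapper(String fields) {
        // Split the comma-separated list once and trim each name,
        // so "user_id, days" is handled the same as "user_id,days".
        this.fieldNames = Arrays.stream(fields.split(","))
                .map(String::trim)
                .toArray(String[]::new);
    }

    // Pairs each field name with the value at the same position in the record.
    Map<String, Object> toMap(List<?> record) {
        if (record.size() != fieldNames.length) {
            throw new IllegalArgumentException(
                    "expected " + fieldNames.length + " values but got " + record.size());
        }
        Map<String, Object> result = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            result.put(fieldNames[i], record.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        RecordMapper mapper = new RecordMapper("user_id,days,city");
        // prints {user_id=1, days=7, city=Beijing}
        System.out.println(mapper.toMap(Arrays.asList(1, 7, "Beijing")));
    }
}
```

Splitting the field list once in the constructor also avoids calling split(",") for every record, which the anonymous deserializer does.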
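Problem: reading works, writing does not. I verified that the stream contained data before the write.
Analysis: 1. The stream has data right before sinkTo(), so the problem is in the write step.
2. I tried a Flink job that consumes Kafka and writes to Doris table b, and that write succeeded.
It turned out to be a basic mistake: Kafka data is unbounded, whereas Doris data is bounded. In the default streaming mode the Doris read finishes almost immediately, most likely before the sink ever reaches a checkpoint at which it would commit the data.
Fix: set the runtime mode of env to batch in the main program. The write then succeeds.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);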
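The difference between bounded and unbounded input can be illustrated with a toy model in plain Java. It is only a sketch, and it rests on one assumption: the sink buffers rows and makes them visible only at a commit point, which is a checkpoint in streaming mode or the end of input in batch mode. BufferingSinkModel and its methods are made-up names, checkpoints are counted in records rather than in milliseconds, and none of this is the connector's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model for illustration only; it does not reproduce the real DorisSink.
final class BufferingSinkModel {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    // Rows are only buffered when written.
    void write(String row) {
        buffer.add(row);
    }

    // A commit point moves everything buffered so far into the table.
    void commit() {
        committed.addAll(buffer);
        buffer.clear();
    }

    /**
     * Feeds a bounded input through the sink and returns the committed rows.
     *
     * @param checkpointEvery    number of rows between two checkpoints (assumed stand-in for time)
     * @param commitAtEndOfInput true models batch mode, false models streaming mode
     */
    static List<String> run(List<String> input, int checkpointEvery, boolean commitAtEndOfInput) {
        BufferingSinkModel sink = new BufferingSinkModel();
        int sinceCheckpoint = 0;
        for (String row : input) {
            sink.write(row);
            sinceCheckpoint++;
            if (sinceCheckpoint == checkpointEvery) {
                sink.commit();
                sinceCheckpoint = 0;
            }
        }
        if (commitAtEndOfInput) {
            sink.commit();
        }
        return sink.committed;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c");
        // The input ends long before the first checkpoint.
        System.out.println("streaming: " + run(rows, 1000, false).size() + " rows committed"); // prints 0
        System.out.println("batch: " + run(rows, 1000, true).size() + " rows committed");      // prints 3
    }
}
```

In this model a Kafka-like source never ends, so a checkpoint eventually arrives and the rows get committed, while a small bounded read is over before the first checkpoint unless the end of input itself triggers a commit.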