Flink 1.11新增支持CDC,包括Debezium、Canal,现修改debezium-json的format格式
1、插入
(true,1,2,3)
2、更新
(false,1,2,3)
(true,1,2,4
3、删除
(false,1,2,3)
(true,1,2,4
1、插入
(true,1,2,3,'c')
备注:‘c’代表新建
2、更新
(true,1,2,4,'u')
备注:'u’代表更新
3、删除
备注:不做任何处理
flink-release-1.11.1/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
try {
GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
GenericRowData payload;
if (schemaInclude) {
payload = (GenericRowData) row.getField(0);
} else {
payload = row;
}
GenericRowData before = (GenericRowData) payload.getField(0);
GenericRowData after = (GenericRowData) payload.getField(1);
int len = after.getArity();
after.setField(len-1,payload.getField(2));
String op = payload.getField(2).toString();
if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
//after.setRowKind(RowKind.INSERT);
after.setField(len-1,payload.getField(2));
out.collect(after);
} else if (OP_UPDATE.equals(op)) {
//before.setRowKind(RowKind.UPDATE_BEFORE);
//after.setRowKind(RowKind.UPDATE_AFTER);
//out.collect(before);
out.collect(after);
} else if (OP_DELETE.equals(op)) {
//before.setRowKind(RowKind.DELETE);
before.setField(len-1,payload.getField(2));
out.collect(before);
} else {
if (!ignoreParseErrors) {
throw new IOException(format(
"Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'", op, new String(message)));
}
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(format(
"Corrupt Debezium JSON message '%s'.", new String(message)), t);
}
}
}