附录: debezium
https://kgithub.com/debezium/debezium
Debezium Connector for Oracle :: Debezium Documentation
oracle logminer
logminer 捕获流程
1、构建数据字典(可跳过)
execute dbms_logmnr_d.build(options=>dbms_logmnr_d.store_in_redo_logs);
2、添加日志文件
EXECUTE DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => '/arch/redo4.log', OPTIONS => DBMS_LOGMNR.NEW);
3、启动logminer
EXECUTE SYS.DBMS_LOGMNR.START_LOGMNR(OPTIONS=>DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.NO_ROWID_IN_STMT);
4、操作数据库blablabla.....
5、查询
SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION from V$LOGMNR_CONTENTS;
debezium 源码相关logminer
debezium-connector-oracle
1、构建数据字典
io.debezium.connector.oracle.logminer.SqlUtils.BUILD_DICTIONARY
/** Anonymous PL/SQL block that calls DBMS_LOGMNR_D.BUILD with the STORE_IN_REDO_LOGS option to build the LogMiner data dictionary. */
static final String BUILD_DICTIONARY = "BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;";
2、添加日志文件
io.debezium.connector.oracle.logminer.SqlUtils.addLogFileStatement(String, String)
/**
 * Produces the anonymous PL/SQL block that registers a log file with LogMiner.
 *
 * @param option value placed verbatim after {@code OPTIONS =>} (for example {@code DBMS_LOGMNR.NEW})
 * @param fileName log file path; it is emitted between single quotes and is not escaped
 * @return the PL/SQL block calling {@code sys.dbms_logmnr.add_logfile}
 */
static String addLogFileStatement(String option, String fileName) {
    final StringBuilder statement = new StringBuilder("BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '");
    statement.append(fileName);
    statement.append("', OPTIONS => ").append(option);
    statement.append(");END;");
    return statement.toString();
}
3、启动logminer
io.debezium.connector.oracle.logminer.SqlUtils.startLogMinerStatement(Scn, Scn, LogMiningStrategy, boolean)
// ***** LogMiner methods ***
/**
 * Builds the anonymous PL/SQL block that starts a LogMiner session over an SCN range.
 *
 * The {@code OPTIONS} argument always ends with {@code DBMS_LOGMNR.NO_ROWID_IN_STMT}; the dictionary
 * source placed in front of it depends on {@code strategy}.
 *
 * TODO (carried over from the previous documentation): handle corruption. STATUS (Double) - a value
 * of 0 indicates it is executable.
 *
 * @param startScn the SCN to mine from
 * @param endScn the SCN to mine till
 * @param strategy the log mining strategy; {@code CATALOG_IN_REDO} selects the dictionary from the redo
 *            logs together with DDL dictionary tracking, any other value selects the online catalog
 * @param isContinuousMining when {@code true}, {@code DBMS_LOGMNR.CONTINUOUS_MINE} is added to the options
 * @return the PL/SQL block calling {@code sys.dbms_logmnr.start_logmnr}
 */
static String startLogMinerStatement(Scn startScn, Scn endScn, OracleConnectorConfig.LogMiningStrategy strategy, boolean isContinuousMining) {
    final boolean dictionaryInRedo = strategy.equals(OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO);

    // Assemble the OPTIONS expression first: dictionary source, optional continuous mining, fixed suffix.
    final StringBuilder options = new StringBuilder(dictionaryInRedo
            ? "DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING "
            : "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG ");
    if (isContinuousMining) {
        options.append(" + DBMS_LOGMNR.CONTINUOUS_MINE ");
    }
    options.append(" + DBMS_LOGMNR.NO_ROWID_IN_STMT);");

    final StringBuilder statement = new StringBuilder("BEGIN sys.dbms_logmnr.start_logmnr(");
    statement.append("startScn => '").append(startScn).append("', ");
    statement.append("endScn => '").append(endScn).append("', ");
    statement.append("OPTIONS => ").append(options);
    statement.append("END;");
    return statement.toString();
}
5、查询解析结果
io.debezium.connector.oracle.logminer.LogMinerQueryBuilder.build(OracleConnectorConfig, OracleDatabaseSchema, String)
debezium 查询v$LOGMNR_CONTENTS时,添加了一些过滤条件,例如表、SCN、OPERATION_CODE等
/**
 * Assembles the SQL used to read change rows from the LogMiner contents view.
 *
 * The query carries two bind parameters that the caller must bind before execution: first the
 * exclusive lower bound of the SCN mining window, then its inclusive upper bound.
 *
 * Columns selected from V$LOGMNR_CONTENTS:
 * <pre>
 * SCN            - system change number at which the change was made
 * SQL_REDO       - reconstructed SQL statement that initiated the change
 * OPERATION      - database operation type name
 * OPERATION_CODE - database operation numeric code
 * TIMESTAMP      - time when the change event occurred
 * XID            - identifier of the transaction the change participated in
 * CSF            - continuation flag; rows to be processed together as a single row, 0=no, 1=yes
 * TABLE_NAME     - name of the table the change is for
 * SEG_OWNER      - name of the schema the change is for
 * USERNAME       - name of the database user that caused the change
 * ROW_ID         - identifier of the changed row; may not always hold a valid value
 * ROLLBACK       - rollback flag, 0 or 1; 1 implies the row was rolled back
 * RS_ID          - rollback segment identifier the change record was recorded from
 * </pre>
 *
 * @param connectorConfig connector configuration, should not be {@code null}
 * @param schema database schema, should not be {@code null}
 * @param userName jdbc connection username
 * @return the SQL string to be used to fetch changes from Oracle LogMiner
 */
public static String build(OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema, String userName) {
    final StringBuilder sql = new StringBuilder(1024);

    // Projection and source view.
    sql.append("SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, ")
            .append("USERNAME, ROW_ID, ROLLBACK, RS_ID ")
            .append("FROM ").append(LOGMNR_CONTENTS_VIEW).append(" ");

    // The two placeholders are bound by the caller when the query is executed.
    sql.append("WHERE SCN > ? AND SCN <= ? ");

    // Limit the rows to the configured PDB, when there is one.
    final String pdbName = connectorConfig.getPdbName();
    final boolean pdbConfigured = !Strings.isNullOrEmpty(pdbName);
    if (pdbConfigured) {
        sql.append("AND ").append("SRC_CON_NAME = '").append(pdbName.toUpperCase()).append("' ");
    }

    sql.append("AND (");

    // START, COMMIT, MISSING_SCN and ROLLBACK operations are always part of the result.
    sql.append("(OPERATION_CODE IN (6,7,34,36)");

    // The second code list is Insert, Update, Delete and, with LOB support enabled, additionally
    // SelectLob, LobWrite, LobTrim and LobErase.
    if (schema.storeOnlyCapturedTables()) {
        // DDL is grouped with the DML codes, so it is subject to the filters appended further down.
        sql.append(connectorConfig.isLobEnabled()
                ? ") OR ((OPERATION_CODE IN (1,2,3,9,10,11,29) "
                : ") OR ((OPERATION_CODE IN (1,2,3) ");
        sql.append("OR ").append(buildDdlPredicate(userName)).append(") ");
    }
    else {
        // DDL is grouped with the always-included codes, so the connector receives it for all tables
        // even if they are not part of the inclusion/exclusion lists.
        sql.append(" OR ").append(buildDdlPredicate(userName)).append(" ");
        sql.append(connectorConfig.isLobEnabled()
                ? ") OR (OPERATION_CODE IN (1,2,3,9,10,11,29) "
                : ") OR (OPERATION_CODE IN (1,2,3) ");
    }

    // The flush table is never of interest.
    sql.append("AND TABLE_NAME != '").append(SqlUtils.LOGMNR_FLUSH_TABLE).append("' ");

    // Mirror the schemas that the runtime filter ignores automatically in the generated SQL.
    if (!OracleConnectorConfig.EXCLUDED_SCHEMAS.isEmpty()) {
        sql.append("AND SEG_OWNER NOT IN (");
        String separator = "";
        for (String excludedSchema : OracleConnectorConfig.EXCLUDED_SCHEMAS) {
            sql.append(separator);
            sql.append("'").append(excludedSchema.toUpperCase()).append("'");
            separator = ",";
        }
        sql.append(") ");
    }

    final String schemaFilter = buildSchemaPredicate(connectorConfig);
    if (!Strings.isNullOrEmpty(schemaFilter)) {
        sql.append("AND ").append(schemaFilter).append(" ");
    }

    final String tableFilter = buildTablePredicate(connectorConfig);
    if (!Strings.isNullOrEmpty(tableFilter)) {
        sql.append("AND ").append(tableFilter).append(" ");
    }

    // Close the DML group and the outer "AND (" group.
    sql.append("))");
    return sql.toString();
}