当前位置: 首页 > 工具软件 > LogMiner > 使用案例 >

ORACLE LOGMINER DEBEZIUM

孟鹤龄
2023-12-01

附录: debezium

https://kgithub.com/debezium/debezium

Debezium Connector for Oracle :: Debezium Documentation

oracle logminer

DBMS_LOGMNR

V$LOGMNR_SESSION

logminer 捕获流程

1、构建数据字典,(可跳过)

execute dbms_logmnr_d.build(options=>dbms_logmnr_d.store_in_redo_logs);

2、添加日志文件

EXECUTE DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => '/arch/redo4.log’, OPTIONS => DBMS_LOGMNR.NEW);

3、启动logminer

ECECUTE SYS.DBMS_LOGMNR_START_LOGMNR(OPTIONS=>DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.NO_ROWID_IN_STMT)

4、操作数据库blablabla.....

5、查询

SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION from V$LOGMNR_CONTENTS;

debezium 源码相关logminer 

debezium-connector-oracle

1、构建数据字典

io.debezium.connector.oracle.logminer.SqlUtils.BUILD_DICTIONARY

static final String BUILD_DICTIONARY = "BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;";

2、添加日志文件

io.debezium.connector.oracle.logminer.SqlUtils.addLogFileStatement(String, String)


    static String addLogFileStatement(String option, String fileName) {
        return "BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '" + fileName + "', OPTIONS => " + option + ");END;";
    }

3、启动logminer

io.debezium.connector.oracle.logminer.SqlUtils.startLogMinerStatement(Scn, Scn, LogMiningStrategy, boolean)


    // ***** LogMiner methods ***
    /**
     * This returns statement to build LogMiner view for online redo log files
     * @param startScn mine from
     * @param endScn mine till
     * @param strategy Log Mining strategy
     * @return statement todo: handle corruption. STATUS (Double) — value of 0 indicates it is executable
     */
    static String startLogMinerStatement(Scn startScn, Scn endScn, OracleConnectorConfig.LogMiningStrategy strategy, boolean isContinuousMining) {
        String miningStrategy;
        if (strategy.equals(OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO)) {
            miningStrategy = "DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING ";
        }
        else {
            miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG ";
        }
        if (isContinuousMining) {
            miningStrategy += " + DBMS_LOGMNR.CONTINUOUS_MINE ";
        }
        return "BEGIN sys.dbms_logmnr.start_logmnr(" +
                "startScn => '" + startScn + "', " +
                "endScn => '" + endScn + "', " +
                "OPTIONS => " + miningStrategy +
                " + DBMS_LOGMNR.NO_ROWID_IN_STMT);" +
                "END;";
    }

5、查询解析结果

io.debezium.connector.oracle.logminer.LogMinerQueryBuilder.build(OracleConnectorConfig, OracleDatabaseSchema, String)

debezium 查询v$LOGMNR_CONTENTS时,添加了一些过滤条件,例如表、SCN、OPERATION_CODE等


    /**
     * Builds the LogMiner contents view query.
     *
     * The returned query will contain 2 bind parameters that the caller is responsible for binding before
     * executing the query.  The first bind parameter is the lower-bounds of the SCN mining window that is
     * not-inclusive while the second is the upper-bounds of the SCN mining window that is inclusive.
     *
     * The built query relies on the following columns from V$LOGMNR_CONTENTS:
     * <pre>
     *     SCN - the system change number at which the change was made
     *     SQL_REDO - the reconstructed SQL statement that initiated the change
     *     OPERATION - the database operation type name
     *     OPERATION_CODE - the database operation numeric code
     *     TIMESTAMP - the time when the change event occurred
     *     XID - the transaction identifier the change participated in
     *     CSF - the continuation flag, identifies rows that should be processed together as single row, 0=no, 1=yes
     *     TABLE_NAME - the name of the table for which the change is for
     *     SEG_OWNER - the name of the schema for which the change is for
     *     USERNAME - the name of the database user that caused the change
     *     ROW_ID - the unique identifier of the row that the change is for, may not always be set with valid value
     *     ROLLBACK - the rollback flag, value of 0 or 1.  1 implies the row was rolled back
     *     RS_ID - the rollback segment idenifier where the change record was record from
     * </pre>
     *
     * @param connectorConfig connector configuration, should not be {@code null}
     * @param schema database schema, should not be {@code null}
     * @param userName jdbc connection username
     * @return the SQL string to be used to fetch changes from Oracle LogMiner
     */
    public static String build(OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema, String userName) {
        final StringBuilder query = new StringBuilder(1024);
        query.append("SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, ");
        query.append("USERNAME, ROW_ID, ROLLBACK, RS_ID ");
        query.append("FROM ").append(LOGMNR_CONTENTS_VIEW).append(" ");
        // These bind parameters will be bound when the query is executed by the caller.
        query.append("WHERE SCN > ? AND SCN <= ? ");
        // Restrict to configured PDB if one is supplied
        final String pdbName = connectorConfig.getPdbName();
        if (!Strings.isNullOrEmpty(pdbName)) {
            query.append("AND ").append("SRC_CON_NAME = '").append(pdbName.toUpperCase()).append("' ");
        }
        query.append("AND (");
        // Always include START, COMMIT, MISSING_SCN, and ROLLBACK operations
        query.append("(OPERATION_CODE IN (6,7,34,36)");
        if (!schema.storeOnlyCapturedTables()) {
            // In this mode, the connector will always be fed DDL operations for all tables even if they
            // are not part of the inclusion/exclusion lists.
            query.append(" OR ").append(buildDdlPredicate(userName)).append(" ");
            // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
            if (connectorConfig.isLobEnabled()) {
                query.append(") OR (OPERATION_CODE IN (1,2,3,9,10,11,29) ");
            }
            else {
                query.append(") OR (OPERATION_CODE IN (1,2,3) ");
            }
        }
        else {
            // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
            if (connectorConfig.isLobEnabled()) {
                query.append(") OR ((OPERATION_CODE IN (1,2,3,9,10,11,29) ");
            }
            else {
                query.append(") OR ((OPERATION_CODE IN (1,2,3) ");
            }
            // In this mode, the connector will filter DDL operations based on the table inclusion/exclusion lists
            query.append("OR ").append(buildDdlPredicate(userName)).append(") ");
        }
        // Always ignore the flush table
        query.append("AND TABLE_NAME != '").append(SqlUtils.LOGMNR_FLUSH_TABLE).append("' ");
        // There are some common schemas that we automatically ignore when building the runtime Filter
        // predicates and we put that same list of schemas here and apply those in the generated SQL.
        if (!OracleConnectorConfig.EXCLUDED_SCHEMAS.isEmpty()) {
            query.append("AND SEG_OWNER NOT IN (");
            for (Iterator<String> i = OracleConnectorConfig.EXCLUDED_SCHEMAS.iterator(); i.hasNext();) {
                String excludedSchema = i.next();
                query.append("'").append(excludedSchema.toUpperCase()).append("'");
                if (i.hasNext()) {
                    query.append(",");
                }
            }
            query.append(") ");
        }

        String schemaPredicate = buildSchemaPredicate(connectorConfig);
        if (!Strings.isNullOrEmpty(schemaPredicate)) {
            query.append("AND ").append(schemaPredicate).append(" ");
        }

        String tablePredicate = buildTablePredicate(connectorConfig);
        if (!Strings.isNullOrEmpty(tablePredicate)) {
            query.append("AND ").append(tablePredicate).append(" ");
        }

        query.append("))");

        return query.toString();
    }

 类似资料:

相关阅读

相关文章

相关问答