当前位置: 首页 > 知识库问答 >
问题:

使用并行化优化批量更新

琴俊人
2023-03-14

我有一个用于将实时数据移动到测试环境中的事务数据置乱的过程。该表包含大约一亿行,分布在50个分区中。每月添加一个新分区。随着音量的增加,过程的执行速度比以前慢。

我正在考虑在我的代码中引入某种程度的并行化。这是一个新领域,我想知道是否有任何最佳实践。也许使用dbms_parallel_execute将更新拆分为块?

任何关于如何优化我的代码的建议都非常感谢!

PROCEDURE Scramble_Transactions
AS
    vSeed              BINARY_INTEGER;

    CURSOR Transactions_cur
    IS
        SELECT T.ID,
               T.MONTH_PARTITION,
               T.TRACE_NUM,
               T.TXTDATA
          FROM TRANSACTIONS T;

    TYPE TBL IS TABLE OF Transactions_cur%ROWTYPE
        INDEX BY PLS_INTEGER;

    Transactions_Rec   TBL;

    vCounter           NUMBER (10);
    vString            VARCHAR2 (300);
    vLen               NUMBER (5);
    vFromRange         VARCHAR2 (25);
    vToRange           VARCHAR2 (25);
BEGIN
    vCounter := 0;

    SELECT SUBSTR (TO_CHAR (SYSDATE, 'ddmmyyyyhhmiss'), 11)
      INTO vSeed
      FROM DUAL;

    DBMS_RANDOM.initialize (vSeed);
    DBMS_RANDOM.SEED (vSeed);
    vFromRange := 0;

    OPEN Transactions_cur;

    LOOP
        FETCH Transactions_cur BULK COLLECT INTO Transactions_Rec LIMIT 10000;

        FOR I IN 1 .. Transactions_Rec.COUNT
        LOOP
            IF Transactions_Rec (i).TRACE_NUM IS NOT NULL
            THEN
                vString := Transactions_Rec (i).TRACE_NUM;
                vLen := LENGTH (TRIM (vString));
                vToRange := POWER (10, vLen) - 1;
                Transactions_Rec (i).TRACE_NUM :=
                    LPAD (TRUNC (DBMS_RANDOM.VALUE (vFromRange, vToRange)),
                          6,
                          '1');
            END IF;

            IF Transactions_Rec (i).TXTDATA IS NOT NULL
            THEN
                vString := Transactions_Rec (i).TXTDATA;
                vLen := LENGTH (TRIM (vString));
                vToRange := POWER (10, vLen) - 1;
                Transactions_Rec (i).TXTDATA :=
                    LPAD (TRUNC (DBMS_RANDOM.VALUE (vFromRange, vToRange)),
                          12,
                          '3');
            END IF;

            vCounter := vCounter + 1;
        END LOOP;

        FORALL rec IN 1 .. Transactions_Rec.COUNT
            UPDATE Transactions
               SET TRACE_NUM = Transactions_Rec (rec).TRACE_NUM,
                   TXTDATA = Transactions_Rec (rec).TXTDATA
             WHERE ID = Transactions_Rec (rec).ID
               AND MONTH_PARTITION = Transactions_Rec (rec).MONTH_PARTITION;

        EXIT WHEN Transactions_cur%NOTFOUND;
    END LOOP;

    DBMS_RANDOM.TERMINATE;

    CLOSE Transactions_cur;

    COMMIT;
END Scramble_Transactions;

编辑,我的解决方案基于以下反馈:重写部分过程,使数据填充作为SQL的一部分而不是PL/SQL完成。该过程现在还将分区自/到作为允许并行处理的参数。

CREATE OR REPLACE PROCEDURE Scramble_Transactions(P_MONTH_PARTITION_FROM VARCHAR2, P_MONTH_PARTITION_FROM VARCHAR2)
AS

CURSOR Transactions_cur (V_MONTH_PARTITION_FROM TRANSACTIONS.MONTH_PARTITION%TYPE, 
V_MONTH_PARTITION_TO TRANSACTIONS.MONTH_PARTITION%TYPE) IS

  SELECT T.ID,
               T.MONTH_PARTITION,
               REGEXP_REPLACE(T.TRACE_NUM,'[0-9]','9') TRACE_NUM,
               REGEXP_REPLACE(T.TXTDATA,'[0-9]','9') TXTDATA
          FROM TRANSACTIONS T WHERE T.MONTH_PARTITION BETWEEN P_MONTH_PARTITION_FROM AND P_MONTH_PARTITION_FROM ;

    TYPE TBL IS TABLE OF Transactions_cur%ROWTYPE
        INDEX BY PLS_INTEGER;

    Transactions_Rec   TBL;

BEGIN
OPEN Transactions_cur(P_MONTH_PARTITION_FROM,P_MONTH_PARTITION_FROM);
LOOP
   FETCH Transactions_cur BULK COLLECT INTO Transactions_Rec LIMIT 10000;

       /*Some additional processing*/

       FORALL rec IN 1 .. Transactions_Rec.COUNT
            UPDATE Transactions
               SET TRACE_NUM = Transactions_Rec (rec).TRACE_NUM,
                   TXTDATA = Transactions_Rec (rec).TXTDATA
             WHERE ID = Transactions_Rec (rec).ID
               AND MONTH_PARTITION = Transactions_Rec (rec).MONTH_PARTITION;

  EXIT WHEN  Transactions_cur%NOTFOUND;
END LOOP;
CLOSE Transactions_cur;
COMMIT;
END;
/

现在通过使用DBMS_PARALLEL_EXECUTE并行执行该过程。查询根据分区键分成块。

DECLARE
  L_TASK_SQL CLOB;
  V_TASKNAME USER_PARALLEL_EXECUTE_TASKS.TASK_NAME%TYPE;
  V_STATUS   USER_PARALLEL_EXECUTE_TASKS.STATUS%TYPE;
  C_TASK_NAME VARCHAR2(50) := 'TRANSACTIONS_TASK';
BEGIN
  L_TASK_SQL := 'SELECT PARTITION_NAME, PARTITION_NAME FROM USER_TAB_PARTITIONS WHERE TABLE_NAME = ''TRANSACTIONS''';
  DBMS_PARALLEL_EXECUTE.CREATE_TASK(C_TASK_NAME);
  DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
        TASK_NAME => 'TRANSACTIONS_TASK',
        SQL_STMT  => L_TASK_SQL,
        BY_ROWID  => FALSE);
  DBMS_PARALLEL_EXECUTE.RUN_TASK(
        TASK_NAME      => C_TASK_NAME,
        SQL_STMT => 'BEGIN SCRAMBLE_TRANSACTIONS( :START_ID, :END_ID ); END;',
        LANGUAGE_FLAG  => DBMS_SQL.NATIVE,
        PARALLEL_LEVEL => 6);

  SELECT TASK_NAME, STATUS INTO V_TASKNAME,V_STATUS FROM USER_PARALLEL_EXECUTE_TASKS WHERE TASK_NAME = C_TASK_NAME; 
  DBMS_OUTPUT.PUT_LINE('TASK:'|| 'V_TASKNAME' ||' , STATUS:'|| V_STATUS);

  DBMS_PARALLEL_EXECUTE.DROP_CHUNKS(TASK_NAME => 'TRANSACTIONS_TASK');
  DBMS_PARALLEL_EXECUTE.DROP_TASK(TASK_NAME  => 'TRANSACTIONS_TASK');
END;
/

总体执行时间从之前的13-14小时减少到30分钟。

共有2个答案

姜彬郁
2023-03-14

我认为如果使用CTAS(创建表...作为选择)或插入/*附加*/...而不是更新,您的性能会好得多。由于您的数据是分区的,因此您可以使用分区交换。这将允许您更有效地使用并行性,以及直接的路径加载操作。

吕自明
2023-03-14

SQL是一个很好的选择,但也许一个非常快速的解决方法是您正在更新正在获取的同一个表。这可能会产生巨大的撤消问题,因为获取必须给出与时间点一致的结果集。因此,每次围绕获取循环,您可能会做越来越多的工作(撤消您刚刚完成的更新)。当然,提交每个循环会导致错误时的可重启性问题。所以也许一次分区,不循环,例如

PROCEDURE Scramble_Transactions(p_parname varchar2) AS
    vSeed              BINARY_INTEGER;


    Transactions_cur sys_refcursor;

    CURSOR Transactions_cur_template
    IS
        SELECT T.ID,
               T.MONTH_PARTITION,
               T.TRACE_NUM,
               T.TXTDATA
          FROM TRANSACTIONS T;

    TYPE TBL IS TABLE OF Transactions_cur_template%ROWTYPE INDEX BY PLS_INTEGER;

    Transactions_Rec   TBL;

    vCounter           NUMBER (10);
    vString            VARCHAR2 (300);
    vLen               NUMBER (5);
    vFromRange         VARCHAR2 (25);
    vToRange           VARCHAR2 (25);
BEGIN
    vCounter := 0;

    SELECT SUBSTR (TO_CHAR (SYSDATE, 'ddmmyyyyhhmiss'), 11)
      INTO vSeed
      FROM DUAL;

    DBMS_RANDOM.initialize (vSeed);
    DBMS_RANDOM.SEED (vSeed);
    vFromRange := 0;

    OPEN Transactions_cur for ' SELECT T.ID,
               T.MONTH_PARTITION,
               T.TRACE_NUM,
               T.TXTDATA
          FROM TRANSACTIONS T partition ('||p_parname||') where TRACE_NUM IS NOT NULL or TXTDATA IS NOT NULL';

        FETCH Transactions_cur BULK COLLECT INTO Transactions_Rec;

        FOR I IN 1 .. Transactions_Rec.COUNT
        LOOP
            IF Transactions_Rec (i).TRACE_NUM IS NOT NULL
            THEN
                vString := Transactions_Rec (i).TRACE_NUM;
                vLen := LENGTH (TRIM (vString));
                vToRange := POWER (10, vLen) - 1;
                Transactions_Rec (i).TRACE_NUM :=
                    LPAD (TRUNC (DBMS_RANDOM.VALUE (vFromRange, vToRange)),
                          6,
                          '1');
            END IF;

            IF Transactions_Rec (i).TXTDATA IS NOT NULL
            THEN
                vString := Transactions_Rec (i).TXTDATA;
                vLen := LENGTH (TRIM (vString));
                vToRange := POWER (10, vLen) - 1;
                Transactions_Rec (i).TXTDATA :=
                    LPAD (TRUNC (DBMS_RANDOM.VALUE (vFromRange, vToRange)),
                          12,
                          '3');
            END IF;

            vCounter := vCounter + 1;
        END LOOP;

        FORALL rec IN 1 .. Transactions_Rec.COUNT
            UPDATE Transactions
               SET TRACE_NUM = Transactions_Rec (rec).TRACE_NUM,
                   TXTDATA = Transactions_Rec (rec).TXTDATA
             WHERE ID = Transactions_Rec (rec).ID
               AND MONTH_PARTITION = Transactions_Rec (rec).MONTH_PARTITION;

    DBMS_RANDOM.TERMINATE;

    CLOSE Transactions_cur;

    COMMIT;
END Scramble_Transactions;

因此,只需更改几行代码,我们就可以

  • 消除了执行大量撤消操作的获取问题
  • 通过将分区名作为参数,使其易于并行运行

然后,您可以为每个分区名称提交一个作业(使用DBMS_SCHEDULER),因为我们现在隔离每个分区,所以我们不会在作业之间获得争用。

不要误解我的意思——在SQL中进行完全重构可能仍然是最好的选择,但是就速赢而言,上面的代码可能会以最少的更改解决您的问题。

 类似资料:
  • 问题内容: 我在将不同的缓冲区大小插入到本地SQLite DB中时发现,当缓冲区大小为10,000时,插入10,000,000行数据需要花费近8分钟的时间。换句话说,它需要1,000次写入来存储所有内容。 8分钟存储10,000,000个似乎太长了(或者是?) 可以优化以下任何一项以提高速度吗?请注意,插入的数据是字符的随机集合。 创建表格后,通过 是否可以进一步优化上述任何一项? 问题答案: 我

  • 问题内容: 我需要通过REST API的Batch端点将大量节点及其之间的关系插入到Neo4j中,大约每秒5k记录(仍在增加)。 这将是24x7连续插入。每条记录可能只需要创建一个节点,而其他记录可能需要两个节点并创建一个关系。 是否可以通过更改程序或修改Neo4j的设置来提高插入件的性能? 到目前为止,我的进度: 1.我已经使用Neo4j进行了一段时间的测试,但无法获得所需的性能 测试服务器盒:

  • 我通过Julia使用GLPK,我需要反复优化同一个GLPK。Prob。每次优化之间的变化是变量的某些组合固定为0 简单的放入伪代码 当我运行这个程序时,看起来CPU1就像一个调度器,保持在9-11%的范围内,CPU3和CPU4上的负载在0和100%之间交替,尽管从来没有同时发生过。。。CPU2上的负载保持在0% 这可能需要一点时间,我想使用所有的核心 然而,使用Julia的并行功能有点麻烦,尤其是

  • 有几种使用SIMD指令优化HOG描述符计算的尝试:OpenCV、Dlib和SIMD。它们都使用标量代码将结果幅值添加到HOG直方图中: 在那里,大小的值取决于实现,但一般意义相同。 我知道使用SIMD计算直方图的问题并没有简单有效的解决方案。但在这种情况下,我们有小尺寸(18)的直方图。它能帮助SIMD优化吗?

  • You might notice after requiring React JS into your project that the time it takes from a save to a finished rebundle of your application takes more time. In development you ideally want from 200-800

  • 问题内容: 我有一堆JSON数组文件(准确地说是AVRO),每个文件都产生多个样本来训练Keras模型。通过使用@GPhilo和@jsimsa的想法,我能够想到这一点来并行化我的输入管道。无法弄清楚如何设计来划分处理文件的工作。代码内部失败,因为该函数需要一个字符串文件路径而不是一个, 这里正确的设计方法是什么 这是使用和设计输入管道的优化方法吗? 问题答案: 在我看来,发电机不必要地使您的生活变