Parallel By Range

Previous posts in this category introduced a way to perform DML in a SELECT using the following technology:

  • Table Functions
  • Autonomous Transactions
  • A REF CURSOR as a parameter
  • Row by row (aka slow by slow) processing of the REF CURSOR
  • Parallel processing to try to overcome the slow by slow processing

This post expands the parallel processing to enable a simple requirement – provide a total of the values processed in one column for each value in another column.

Here is the demo table:

SQL> desc demo
 Name         Null?    Type
 ------------ -------- ------
 COL1                  NUMBER 
 COL2                  NUMBER

We want to see the total of all COL2 values deleted for every COL1 value. For example these two rows would produce a total of 11 for the COL1 value of 1.

SQL> SELECT *
 2     FROM demo;
      COL1       COL2
---------- ----------
         1          7
         1          4

Let’s create some test data in the table.

SQL> BEGIN
  2    FOR counter IN 1..10 LOOP
  3      FOR counter2 IN 1..10 LOOP
  4        IF MOD(counter,2) = 0 THEN
  5          INSERT INTO demo
  6          VALUES(counter,counter2 * counter);
  7        ELSE
  8          INSERT INTO demo
  9          VALUES(counter,counter * counter);
 10       END IF;
 11     END LOOP;
 12   END LOOP;
 13 END;
 14 /
PL/SQL procedure successfully completed.

SQL> COMMIT;
Commit complete.

-- Query the data. We hope to see the same result from the function we'll create later.
SQL> SELECT col1,
  2         SUM(col2)
  3    FROM demo
  4  GROUP BY col1
  5  ORDER BY col1;

COL1       SUM(COL2)
---------- ----------
         1         10
         2        110
         3         90
         4        220
         5        250
         6        330
         7        490
         8        440
         9        810
        10        550
10 rows selected.

Now here is the function – pay close attention to the PARALLEL_ENABLE and ORDER clause in the header.

CREATE OR REPLACE FUNCTION delete_demo ( p_curs demo_curs.v_demo_curs )
                  RETURN v_n_t
                  --
                  -- Rows returned by the cursor will be partitioned across
                  -- parallel processes based on their COL1 value. For example
                  -- all COL1=1 rows will go to the same parallel process, all
                  -- COL1=2 rows will go to the same parallel process, and so on
                  --
                  PARALLEL_ENABLE ( PARTITION p_curs BY RANGE(col1) )
                  --
                  -- Within the parallel process the rows will be ordered by their COL1
                  -- values. For example if COL1=1 and COL1=2 go to the same parallel process
                  -- they wont get mixed up
                  --
                  ORDER p_curs BY ( col1 ) AS

    PRAGMA AUTONOMOUS_TRANSACTION;
    v_ret_val v_n_t := v_n_t();
    v_number NUMBER;

    v_last_col1 NUMBER := NULL; -- last col1 value processed
    v_total NUMBER := 0;        -- running total of col2 for each col1
    v_col1 NUMBER;              -- col1 value queried
    v_col2 NUMBER;              -- col2 value queried

BEGIN

   /*
   ||
   || The processing in this function is pretty straightforward. As the COL1 values
   || arrive we track the SUM Of COL2 values. When the COL1 value changes we output the
   || totals to a log table (demo_log), reset the counters and start again.
   ||
   */
   LOOP

    -- get a row from cursor
    FETCH p_curs INTO v_col1, v_col2;
    EXIT WHEN p_curs%NOTFOUND;

    -- initialize last col1 if this is first record
    -- from cursor
    IF v_last_col1 IS NULL THEN
      v_last_col1 := v_col1;
    END IF;

    -- if same col1 value then add to total
    IF v_last_col1 = v_col1 THEN
      v_total := v_total + v_col2;
    ELSE
      --
      -- not same col1 value so save log and
      -- re-initialize tracking values
      --
      INSERT INTO demo_log
      VALUES(v_last_col1,v_total);
      v_last_col1 := v_col1;
      v_total := v_col2;

    END IF; -- if same col1

    -- delete the record and return the rowcount
    v_ret_val.EXTEND;
    DELETE demo
    WHERE col1 = v_col1;
    v_ret_val(v_ret_val.LAST) := SQL%ROWCOUNT;

  END LOOP;

  -- dont forget to log the last total
  IF v_last_col1 IS NOT NULL THEN
    INSERT INTO demo_log
    VALUES(v_last_col1,v_total);
  END IF;

  COMMIT;

  RETURN(v_ret_val);

END;
/

Now lets execute the function.

SQL> ALTER SESSION FORCE PARALLEL QUERY;
Session altered.

SQL> SELECT COUNT(*)
 2     FROM TABLE(delete_demo(CURSOR(SELECT col1,
 3                                          col2
 4                                     FROM demo
 5                                   ORDER BY col2))) -- ordered by col2 to create randomness
 6 /
  COUNT(*)
----------
       100

SQL> ALTER SESSION DISABLE PARALLEL QUERY;
Session altered.

Now lets see what wound up in the log table.

SQL> SELECT *
  2    FROM demo_log
  3  ORDER BY col1
  4 /
      COL1      TOTAL
---------- ----------
         1         10
         2        110
         3         90
         4        220
         5        250
         6        330
         7        490
         8        440
         9        810
        10        550

That matches the query with a group by we did earlier so the partitioning and ordering of the cursor worked!

I’ll conclude with two key points:

  1. The partitioning and ordering works despite of the ORDER BY clause supplied in SQL of the cursor
  2. The cursor must be strongly typed to be partitioned and ordered. I used this package for that:
      CREATE OR REPLACE PACKAGE demo_curs AS
        TYPE v_demo_curs IS REF CURSOR RETURN demo%ROWTYPE
      END;

In the next post I’ll explain PIPELINING to make the processing even faster.

Thanks for reading!