Power Of Pipelining

Previous posts in this category introduced a way to perform DML in a SELECT using the following technology:

  • Table Functions
  • Autonomous Transactions
  • A REF CURSOR as a parameter
  • Row by row (aka slow by slow) processing of the REF CURSOR
  • Parallel processing to overcome the slow by slow processing

This post adds pipelining to the arsenal. Pipelining returns records from the function as soon as they are assembled rather than waiting for the whole function to finish.

This speeds things up by letting calling programs start processing sooner.
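The idea is the same as a generator in other languages: the producer hands back each row as it is ready instead of building the whole collection first. A minimal Python sketch of the concept (an analogy only, not Oracle code):

```python
def pipelined():
    """Yield each row as soon as it is ready, like PIPE ROW."""
    for n in range(3):
        yield n          # the caller receives this row immediately

def non_pipelined():
    """Assemble the whole collection first, like a regular table function."""
    rows = []
    for n in range(3):
        rows.append(n)
    return rows          # the caller waits for everything

# The consumer of pipelined() can start work after the first row
# is produced, not after the last one.
first = next(iter(pipelined()))
print(first)
```

Both produce the same rows; only the timing of delivery differs, which is exactly the point of PIPELINED.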

The demo table and data are shown in the previous entry at Parallel By Range.

Let’s get right to the code, shall we…

CREATE OR REPLACE FUNCTION delete_demo ( p_curs demo_curs.v_demo_curs )
                  RETURN v_n_t
                  PARALLEL_ENABLE ( PARTITION p_curs BY RANGE(col1) )
                  ORDER p_curs BY ( col1 )
                  PIPELINED AS --> identifies this function as pipelined

    PRAGMA AUTONOMOUS_TRANSACTION;
    v_number NUMBER := 0;

    v_last_col1 NUMBER := NULL; -- last col1 value processed
    v_total NUMBER := 0; -- running total of col2 for each col1
    v_col1 NUMBER; -- col1 value queried
    v_col2 NUMBER; -- col2 value queried
    v_rc NUMBER := 0; -- save the rowcount

BEGIN
  --
  -- Important Points:
  --   1) The code does not change much from the non pipelined version
  --   2) Except the RETURN call does not specify a value because values are sent back by the...
  --   3) ..PIPE ROW calls
  --   4) This still looks like SLOW BY SLOW processing
  --   5) I know and I promised ways to get past that!
  --   6) You mean parallel processing and this funky pipelining thing?
  --   7) Yes!
  --
  LOOP
    -- get a row from cursor
    FETCH p_curs INTO v_col1, v_col2;
    EXIT WHEN p_curs%NOTFOUND;

    -- initialize last col1 if this is first record from cursor
    IF v_last_col1 IS NULL THEN
      v_last_col1 := v_col1;
    END IF;

    -- if same col1 value then add to total
    IF v_last_col1 = v_col1 THEN
      v_total := v_total + v_col2;
    ELSE
      --
      -- not same col1 value so save log and
      -- re-initialize tracking values
      --
      INSERT INTO demo_log
      VALUES(v_last_col1,v_total);
      v_last_col1 := v_col1;
      v_total := v_col2;

    END IF; -- if same col1

    -- delete the record and return the rowcount
    DELETE demo
    WHERE col1 = v_col1;

    v_rc := SQL%ROWCOUNT;
    COMMIT;
    --
    -- return a row now with the PIPE ROW call
    -- no need to wait until all rows have been processed
    -- the calling program (SQL*Plus in this case) can start processing it
    -- now before the function finishes
    PIPE ROW(v_rc);

  END LOOP;

  -- don't forget to log the last total
  IF v_last_col1 IS NOT NULL THEN
    INSERT INTO demo_log
    VALUES(v_last_col1,v_total);
  END IF;

  COMMIT;

  -- note the return is still required but it does not specify a value
  -- because everything was returned by prior PIPE ROW calls
  RETURN;

END;
/

Executing the function does not change.

SQL> ALTER SESSION ENABLE PARALLEL QUERY;
Session altered.

SQL> SELECT COUNT(column_value)
  2    FROM TABLE(delete_demo(CURSOR(SELECT col1,
  3                                         col2
  4                                    FROM demo
  5                                  ORDER BY col2))) -- ordered by col2 to create randomness
  6 /
COUNT(COLUMN_VALUE)
-------------------
                100

SQL> ALTER SESSION DISABLE PARALLEL QUERY;
Session altered.

SQL> --
SQL> -- See what wound up in the demo log table
SQL> --
SQL> SELECT *
  2    FROM demo_log
  3  ORDER BY col1
  4 /

      COL1      TOTAL
---------- ----------
         1         10
         2        110
         3         90
         4        220
         5        250
         6        330
         7        490
         8        440
         9        810
        10        550
10 rows selected.

Those results are the same as from the non-pipelined version of the code at Parallel By Range.

Was the pipelined version faster? I’ll discuss ways to validate that in the next set of posts.

I’ve added a new post showing how to check what PQ server is being used to execute code. Check it out Here.

Thanks for reading.

Which PQ Server

This post shows how to determine which parallel query servers (PQS) were used to execute a parallel table function.

This is done using the v$px_process view, which shows the relationship between Oracle sessions and PQ servers.

First create a table with some demo data.

CREATE TABLE demo
( col1 NUMBER );

BEGIN
  FOR counter IN 1..10 LOOP
    INSERT INTO demo VALUES(counter);
  END LOOP;
  COMMIT;
END;
/

Then create two object types as the return values from the table function.

CREATE TYPE v_o AS OBJECT ( col1 NUMBER,
                            pq   VARCHAR2(100) )
/
CREATE TYPE v_t AS TABLE OF v_o;
/

Then a package for the cursor definition.

CREATE OR REPLACE PACKAGE demo_curs AS
  TYPE v_demo_curs IS REF CURSOR RETURN demo%ROWTYPE;
END;
/

Then the function itself.

CREATE OR REPLACE FUNCTION which_pq ( p_curs demo_curs.v_demo_curs )
                  RETURN v_t
                  PARALLEL_ENABLE ( PARTITION p_curs BY RANGE(col1) )
                  ORDER p_curs BY ( col1 )
                  PIPELINED AS
  v_col1 NUMBER;
  v_pq VARCHAR2(100);
BEGIN
  -- find the name of the PQS for the current session
  -- (done once before the loop; it will not change during the call)
  SELECT server_name
    INTO v_pq
    FROM v$px_process
   WHERE sid = ( SELECT sid
                   FROM v$mystat
                  WHERE rownum = 1 );
  LOOP
    -- get a row from cursor
    FETCH p_curs INTO v_col1;
    EXIT WHEN p_curs%NOTFOUND;
    -- send a row back as soon as it's found!
    PIPE ROW(v_o(v_col1,v_pq));
  END LOOP;
  RETURN;
END;
/

Then execute the function in parallel.

SQL> ALTER SESSION FORCE PARALLEL QUERY;
Session altered.

SQL> SELECT *
  2    FROM TABLE(which_pq(CURSOR(SELECT *
  3                                 FROM demo)))
  4   ORDER BY col1
  5 /

COL1       PQ
---------- ----------
         1 P004
         2 P004
         3 P004
         4 P005
         5 P005
         6 P005
         7 P005
         8 P006
         9 P006
        10 P007
10 rows selected.

SQL> ALTER SESSION DISABLE PARALLEL QUERY;

The function was defined to partition records across PQS based on the values in COL1. In this case it partitioned as:

  • Values 1 through 3 to P004
  • Values 4 through 7 to P005
  • Values 8 and 9 to P006
  • Value 10 to P007

The distribution and PQ processes used will vary based on system load and the amount of data.
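Conceptually, PARTITION ... BY RANGE routes contiguous key ranges to workers, so equal key values always land on the same worker. A toy Python sketch of the routing idea (the boundaries below are made up to match the run above; Oracle chooses its own at runtime):

```python
import bisect

def range_partition(key, boundaries, workers):
    """Route a key to a worker: keys <= boundaries[i] go to workers[i];
    anything above the last boundary goes to the final worker."""
    return workers[bisect.bisect_left(boundaries, key)]

# hypothetical split matching the run above: 1-3, 4-7, 8-9, 10
boundaries = [3, 7, 9]
workers = ["P004", "P005", "P006", "P007"]

assignments = {k: range_partition(k, boundaries, workers) for k in range(1, 11)}
print(assignments[1], assignments[5], assignments[10])  # P004 P005 P007
```

Because the ranges are contiguous, every row with the same key is guaranteed to reach the same worker, which is what makes the per-key totals in these posts safe to compute inside each PQS.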

Thanks for reading!

Parallel By Range

Previous posts in this category introduced a way to perform DML in a SELECT using the following technology:

  • Table Functions
  • Autonomous Transactions
  • A REF CURSOR as a parameter
  • Row by row (aka slow by slow) processing of the REF CURSOR
  • Parallel processing to try to overcome the slow by slow processing

This post expands the parallel processing to enable a simple requirement – provide a total of the values processed in one column for each value in another column.

Here is the demo table:

SQL> desc demo
 Name         Null?    Type
 ------------ -------- ------
 COL1                  NUMBER 
 COL2                  NUMBER

We want to see the total of all COL2 values deleted for every COL1 value. For example, these two rows would produce a total of 11 for the COL1 value of 1.

SQL> SELECT *
 2     FROM demo;
      COL1       COL2
---------- ----------
         1          7
         1          4

Let’s create some test data in the table.

SQL> BEGIN
  2    FOR counter IN 1..10 LOOP
  3      FOR counter2 IN 1..10 LOOP
  4        IF MOD(counter,2) = 0 THEN
  5          INSERT INTO demo
  6          VALUES(counter,counter2 * counter);
  7        ELSE
  8          INSERT INTO demo
  9          VALUES(counter,counter * counter);
 10       END IF;
 11     END LOOP;
 12   END LOOP;
 13 END;
 14 /
PL/SQL procedure successfully completed.

SQL> COMMIT;
Commit complete.

-- Query the data. We hope to see the same result from the function we'll create later.
SQL> SELECT col1,
  2         SUM(col2)
  3    FROM demo
  4  GROUP BY col1
  5  ORDER BY col1;

COL1       SUM(COL2)
---------- ----------
         1         10
         2        110
         3         90
         4        220
         5        250
         6        330
         7        490
         8        440
         9        810
        10        550
10 rows selected.
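The function shown next computes those per-COL1 totals with a classic control-break loop: accumulate COL2 while COL1 stays the same, flush the running total when the value changes, and flush once more after the loop. A minimal Python sketch of that logic (hypothetical row tuples, not the Oracle code):

```python
def control_break_totals(rows):
    """rows: iterable of (col1, col2) tuples ordered by col1.
    Returns [(col1, total_col2), ...], flushing a total each
    time the col1 value changes -- mirroring the PL/SQL loop."""
    totals = []
    last_col1 = None
    total = 0
    for col1, col2 in rows:
        if last_col1 is None:        # first row: initialize tracking value
            last_col1 = col1
        if col1 == last_col1:
            total += col2            # same group: keep accumulating
        else:
            totals.append((last_col1, total))  # group changed: flush
            last_col1 = col1
            total = col2
    if last_col1 is not None:        # don't forget the last group
        totals.append((last_col1, total))
    return totals

# rows for col1 = 1 and 2, following the demo data pattern above
rows = [(1, 1)] * 10 + [(2, 2 * n) for n in range(1, 11)]
print(control_break_totals(rows))  # [(1, 10), (2, 110)]
```

Note the logic only works when each group's rows arrive together and in order, which is exactly what the PARTITION BY RANGE and ORDER clauses on the function guarantee.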

Now here is the function – pay close attention to the PARALLEL_ENABLE and ORDER clauses in the header.

CREATE OR REPLACE FUNCTION delete_demo ( p_curs demo_curs.v_demo_curs )
                  RETURN v_n_t
                  --
                  -- Rows returned by the cursor will be partitioned across
                  -- parallel processes based on their COL1 value. For example
                  -- all COL1=1 rows will go to the same parallel process, all
                  -- COL1=2 rows will go to the same parallel process, and so on
                  --
                  PARALLEL_ENABLE ( PARTITION p_curs BY RANGE(col1) )
                  --
                  -- Within the parallel process the rows will be ordered by their COL1
                  -- values. For example if COL1=1 and COL1=2 go to the same parallel process
                  -- they won't get mixed up
                  --
                  ORDER p_curs BY ( col1 ) AS

    PRAGMA AUTONOMOUS_TRANSACTION;
    v_ret_val v_n_t := v_n_t();
    v_number NUMBER;

    v_last_col1 NUMBER := NULL; -- last col1 value processed
    v_total NUMBER := 0;        -- running total of col2 for each col1
    v_col1 NUMBER;              -- col1 value queried
    v_col2 NUMBER;              -- col2 value queried

BEGIN

   /*
   ||
   || The processing in this function is pretty straightforward. As the COL1 values
   || arrive we track the SUM Of COL2 values. When the COL1 value changes we output the
   || totals to a log table (demo_log), reset the counters and start again.
   ||
   */
   LOOP

    -- get a row from cursor
    FETCH p_curs INTO v_col1, v_col2;
    EXIT WHEN p_curs%NOTFOUND;

    -- initialize last col1 if this is first record
    -- from cursor
    IF v_last_col1 IS NULL THEN
      v_last_col1 := v_col1;
    END IF;

    -- if same col1 value then add to total
    IF v_last_col1 = v_col1 THEN
      v_total := v_total + v_col2;
    ELSE
      --
      -- not same col1 value so save log and
      -- re-initialize tracking values
      --
      INSERT INTO demo_log
      VALUES(v_last_col1,v_total);
      v_last_col1 := v_col1;
      v_total := v_col2;

    END IF; -- if same col1

    -- delete the record and return the rowcount
    v_ret_val.EXTEND;
    DELETE demo
    WHERE col1 = v_col1;
    v_ret_val(v_ret_val.LAST) := SQL%ROWCOUNT;

  END LOOP;

  -- don't forget to log the last total
  IF v_last_col1 IS NOT NULL THEN
    INSERT INTO demo_log
    VALUES(v_last_col1,v_total);
  END IF;

  COMMIT;

  RETURN(v_ret_val);

END;
/

Now let’s execute the function.

SQL> ALTER SESSION FORCE PARALLEL QUERY;
Session altered.

SQL> SELECT COUNT(*)
 2     FROM TABLE(delete_demo(CURSOR(SELECT col1,
 3                                          col2
 4                                     FROM demo
 5                                   ORDER BY col2))) -- ordered by col2 to create randomness
 6 /
  COUNT(*)
----------
       100

SQL> ALTER SESSION DISABLE PARALLEL QUERY;
Session altered.

Now let’s see what wound up in the log table.

SQL> SELECT *
  2    FROM demo_log
  3  ORDER BY col1
  4 /
      COL1      TOTAL
---------- ----------
         1         10
         2        110
         3         90
         4        220
         5        250
         6        330
         7        490
         8        440
         9        810
        10        550

That matches the query with a group by we did earlier, so the partitioning and ordering of the cursor worked!

I’ll conclude with two key points:

  1. The partitioning and ordering work despite the ORDER BY clause supplied in the SQL of the cursor
  2. The cursor must be strongly typed to be partitioned and ordered. I used this package for that:
      CREATE OR REPLACE PACKAGE demo_curs AS
        TYPE v_demo_curs IS REF CURSOR RETURN demo%ROWTYPE;
      END;
      /

In the next post I’ll explain PIPELINING to make the processing even faster.

Thanks for reading!