Validate Parallel Pipelined Functions (Don’t Assume PQ)

When writing code for parallel processing, make sure it can also run in a non-parallel environment.

For example, here is the code from the previous post run with PARALLEL QUERY disabled.

SQL> ALTER SESSION DISABLE PARALLEL QUERY;
Session altered.

SQL> DECLARE
  2    v_counter NUMBER := 0;
  3  BEGIN
  4    FOR x IN ( SELECT col1,
  5                      pq,
  6                      TO_CHAR(ts,'HH24:MI:SS') ts,
  7                      seq
  8                 FROM TABLE(demo_function(CURSOR(SELECT *
  9                                                   FROM demo))) ) LOOP
 10      v_counter := v_counter + 1;
 11      DBMS_OUTPUT.PUT_LINE(v_counter || '|' ||
 12                           x.col1 || '|' ||
 13                           x.pq || '|' ||
 14                           x.ts || '|' ||
 15                           x.seq);
 16    END LOOP;
 17  END;
 18  /
PL/SQL procedure successfully completed.

Yes, I did have SERVEROUTPUT ON

Where did the expected output go? It was never created, because the query shown below raised a NO_DATA_FOUND exception: no PQ server was in use, so v$px_process had no row for the session.

SELECT server_name
  INTO v_pq
  FROM v$px_process
 WHERE sid = ( SELECT sid
                 FROM v$mystat
                WHERE rownum = 1 );

Because the function was called from SQL, the unhandled exception was never reported to the caller. The query simply returned no rows, so the failure was silent. The SELECT needs its own exception handler, like this:

BEGIN
  SELECT server_name
    INTO v_pq
    FROM v$px_process
   WHERE sid = ( SELECT sid
                   FROM v$mystat
                  WHERE rownum = 1 );
EXCEPTION
  WHEN NO_DATA_FOUND THEN
    v_pq := 'N/A';
END;
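An alternative that avoids the exception altogether is to aggregate: MAX over an empty result set still returns one row (containing NULL), so SELECT ... INTO never raises NO_DATA_FOUND. The sketch below is a variation, not the original post's code, and it assumes SYS_CONTEXT('USERENV','SID') as a stand-in for the v$mystat subquery.

```sql
DECLARE
  v_pq VARCHAR2(5);
BEGIN
  -- MAX() over zero rows still returns exactly one row (a NULL),
  -- so this SELECT ... INTO cannot raise NO_DATA_FOUND;
  -- NVL then substitutes the same 'N/A' marker as the handler above
  SELECT NVL(MAX(server_name), 'N/A')
    INTO v_pq
    FROM v$px_process
   WHERE sid = TO_NUMBER(SYS_CONTEXT('USERENV', 'SID'));

  DBMS_OUTPUT.PUT_LINE('PQ server: ' || v_pq);
END;
/
```

Run on its own in a serial session with SERVEROUTPUT ON, it should print "PQ server: N/A". Inside the table function, the same SELECT could replace both the original query and its exception handler.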

Now the same anonymous block returns these results, with N/A in place of a PQ server name:

 1|1|N/A|19:45:20|1
 2|1|N/A|19:45:21|2
 3|1|N/A|19:45:21|1
 4|1|N/A|19:45:22|2
 5|1|N/A|19:45:22|1
 6|1|N/A|19:45:23|2
 7|2|N/A|19:45:23|1
 8|2|N/A|19:45:24|2
 9|2|N/A|19:45:24|1
10|2|N/A|19:45:25|2
11|2|N/A|19:45:25|1
12|2|N/A|19:45:26|2
13|3|N/A|19:45:26|1
14|3|N/A|19:45:27|2
15|3|N/A|19:45:27|1
16|3|N/A|19:45:28|2
17|3|N/A|19:45:28|1
18|3|N/A|19:45:29|2

Two lessons learned here:

  1. PL/SQL functions called from SQL (not just table functions) fail silently when a NO_DATA_FOUND exception goes unhandled
  2. Never assume a function will execute in parallel just because that is what it was designed for
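Lesson 1 is easy to reproduce with a scalar function. The sketch below uses a hypothetical function named ndf_demo whose SELECT can never find a row. Called from SQL, the unhandled NO_DATA_FOUND is swallowed and the function result is simply NULL; called directly from PL/SQL, the same exception propagates as ORA-01403.

```sql
CREATE OR REPLACE FUNCTION ndf_demo RETURN NUMBER AS
  v_result NUMBER;
BEGIN
  -- this query never returns a row, so SELECT INTO raises NO_DATA_FOUND
  SELECT 1
    INTO v_result
    FROM dual
   WHERE 1 = 0;
  RETURN v_result;
END;
/

-- Called from SQL: no error is raised, the result is just NULL
SELECT NVL(TO_CHAR(ndf_demo), 'NULL') AS result
  FROM dual;

-- Called from PL/SQL: the exception propagates (ORA-01403: no data found)
BEGIN
  DBMS_OUTPUT.PUT_LINE(ndf_demo);
END;
/
```

The SQL query should return a single row showing NULL, while the anonymous block should fail with ORA-01403.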

Thanks for reading!

Validate Parallel Pipelined Table Functions

This post shows how to verify that a parallel pipelined function is working as expected by adding timestamps and PQ server names to its output.

First, let’s create a simple two-column table and add some test data.

CREATE TABLE demo
( col1 NUMBER,
  col2 NUMBER );

BEGIN
  FOR counter1 IN 1..3 LOOP
    FOR counter2 IN 1..3 LOOP
      INSERT INTO demo
      VALUES(counter1,counter2);
    END LOOP;
  END LOOP;
  COMMIT;
END;
/

Define object types for return values from the function.

CREATE TYPE return_o AS OBJECT ( col1 NUMBER,
  pq VARCHAR2(5),  -- name of PQ server
  ts DATE,         -- date/time (HH24:MI:SS) the row was piped
  seq NUMBER );    -- sequence of pipe row (1 or 2)
/
CREATE TYPE return_t AS TABLE OF return_o;
/

Define a package to hold the REF CURSOR type.

CREATE OR REPLACE PACKAGE demo_curs AS
  TYPE v_demo_curs IS REF CURSOR RETURN demo%ROWTYPE;
END;
/
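A strongly typed REF CURSOR such as demo_curs.v_demo_curs is needed because PARTITION ... BY RANGE (and BY HASH) must know the cursor's columns. If rows only need to be spread across PQ servers with no grouping, PARTITION ... BY ANY also accepts a weakly typed SYS_REFCURSOR. The minimal sketch below uses a hypothetical function name, demo_function_any, and reuses the return_o and return_t types defined above; the pq column is left NULL to keep it short.

```sql
CREATE OR REPLACE FUNCTION demo_function_any ( p_curs SYS_REFCURSOR )
                  RETURN return_t
                  PARALLEL_ENABLE ( PARTITION p_curs BY ANY )
                  PIPELINED AS
  v_col1 NUMBER;
  v_col2 NUMBER;
BEGIN
  -- with BY ANY, rows may reach any PQ server in any order,
  -- so no COL1 grouping can be assumed here
  LOOP
    FETCH p_curs INTO v_col1, v_col2;
    EXIT WHEN p_curs%NOTFOUND;
    PIPE ROW(return_o(v_col1, NULL, SYSDATE, 1));
  END LOOP;
  RETURN;
END;
/
```

With the 9 demo rows above, SELECT COUNT(*) FROM TABLE(demo_function_any(CURSOR(SELECT * FROM demo))) should return 9. The trade-off is that BY ANY gives up the guarantee that all rows for one COL1 value reach the same PQ server.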

And now for the function itself.

CREATE OR REPLACE FUNCTION demo_function ( p_curs demo_curs.v_demo_curs )
                  RETURN return_t
                  PARALLEL_ENABLE ( PARTITION p_curs BY RANGE(col1) )
                  ORDER p_curs BY ( col1 )
                  PIPELINED AS

  v_col1 NUMBER := -1;
  v_col2 NUMBER;
  v_pq VARCHAR2(5);

BEGIN

  -- determine which PQ Server is being used for the current
  -- Oracle session executing the function
  SELECT server_name
    INTO v_pq
    FROM v$px_process
   WHERE sid = ( SELECT sid
                   FROM v$mystat
                  WHERE rownum = 1 );

  -- fetch each row from the cursor and pipe it twice with a
  -- one second sleep in between, tagging the two piped rows
  -- with a sequence number (1 or 2)
  LOOP

    FETCH p_curs INTO v_col1, v_col2;
    EXIT WHEN p_curs%NOTFOUND;
    PIPE ROW(return_o(v_col1,v_pq,SYSDATE,1));
    DBMS_LOCK.SLEEP(1);
    PIPE ROW(return_o(v_col1,v_pq,SYSDATE,2));

  END LOOP;

  RETURN;

END;
/

And now to call the function using an anonymous PL/SQL block.

ALTER SESSION FORCE PARALLEL QUERY;
DECLARE
  v_counter NUMBER := 0;
BEGIN
  FOR x IN ( SELECT col1,
                    pq,
                    TO_CHAR(ts,'HH24:MI:SS') ts,
                    seq
               FROM TABLE(demo_function(CURSOR(SELECT *
                                                 FROM demo))) ) LOOP
     v_counter := v_counter + 1;
     DBMS_OUTPUT.PUT_LINE(v_counter || '|' ||
                          x.col1    || '|' ||
                          x.pq      || '|' ||
                          x.ts      || '|' ||
                          x.seq);
  END LOOP;
END;
/
ALTER SESSION DISABLE PARALLEL QUERY;

Here is the output.

 1|1|P000|18:50:43|1
 2|1|P000|18:50:44|2
 3|1|P000|18:50:44|1
 4|1|P000|18:50:45|2
 5|1|P000|18:50:45|1
 6|1|P000|18:50:46|2
 7|2|P001|18:50:43|1
 8|2|P001|18:50:44|2
 9|2|P001|18:50:44|1
10|2|P001|18:50:45|2
11|2|P001|18:50:45|1
12|2|P001|18:50:46|2
13|3|P001|18:50:46|1
14|3|P001|18:50:47|2
15|3|P001|18:50:47|1
16|3|P001|18:50:48|2
17|3|P001|18:50:48|1
18|3|P001|18:50:49|2

The columns in the output are as follows:

  1. The incremental row number assigned by the anonymous block
  2. The COL1 value being processed (1, 2 or 3)
  3. The PQ server in use
  4. The time the function piped the row (SYSDATE at the PIPE ROW call)
  5. The sequence (1 or 2) of the row

Key points about the result set are:

  • The first 6 rows show that COL1=1 went to PQ server P000, and sequences 1 and 2 were always 1 second apart.
  • Rows 7 through 12 show that COL1=2 went to PQ server P001, and sequences 1 and 2 were always 1 second apart. These rows start at 18:50:43, the same second as COL1=1 on P000, so the two servers were running concurrently.
  • Rows 13 through 18 show that COL1=3 also went to PQ server P001 (same as COL1=2), and sequences 1 and 2 were always 1 second apart.
  • Rows 12 and 13 were piped by P001 in the same second even though they carry different COL1 values.
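With more test data, reading rows one by one gets tedious. A summary query like the sketch below (a suggested variation, run with parallel query forced as above) condenses the same checks into one row per PQ server and COL1 value, with the number of piped rows and the first and last timestamps.

```sql
ALTER SESSION FORCE PARALLEL QUERY;

-- one row per PQ server / COL1 combination
SELECT pq,
       col1,
       COUNT(*)                       AS piped_rows,
       TO_CHAR(MIN(ts), 'HH24:MI:SS') AS first_ts,
       TO_CHAR(MAX(ts), 'HH24:MI:SS') AS last_ts
  FROM TABLE(demo_function(CURSOR(SELECT * FROM demo)))
 GROUP BY pq, col1
 ORDER BY col1, pq;

ALTER SESSION DISABLE PARALLEL QUERY;
```

Because the cursor is partitioned by range on COL1, each COL1 value should appear against exactly one PQ server with 6 piped rows (3 demo rows, each piped twice). The server names themselves can differ from run to run.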

When writing table functions (especially parallel and pipelined ones), I highly recommend having a simple set of test data and expected results such as the ones shown in this post.

It’s very helpful for verifying that code modifications have not changed the overall processing.

It’s also valuable for verifying that Oracle is behaving correctly.
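One way to act on this is to store the expected results and compare every later run against them. The sketch below assumes a hypothetical table named demo_expected. It compares only COL1, SEQ and row counts, because PQ server names and timestamps legitimately change between runs.

```sql
CREATE TABLE demo_expected
( col1      NUMBER,
  seq       NUMBER,
  row_count NUMBER );

-- 3 COL1 values x 2 sequence numbers; each (col1, seq) pair should be
-- piped 3 times because each COL1 value has 3 rows in demo
INSERT INTO demo_expected
SELECT c.col1, s.seq, 3
  FROM ( SELECT LEVEL col1 FROM dual CONNECT BY LEVEL <= 3 ) c,
       ( SELECT LEVEL seq  FROM dual CONNECT BY LEVEL <= 2 ) s;
COMMIT;

-- any row returned means actual and expected results differ
SELECT 'missing' AS diff, col1, seq, row_count
  FROM ( SELECT col1, seq, row_count FROM demo_expected
         MINUS
         SELECT col1, seq, COUNT(*)
           FROM TABLE(demo_function(CURSOR(SELECT * FROM demo)))
          GROUP BY col1, seq )
UNION ALL
SELECT 'unexpected', col1, seq, row_count
  FROM ( SELECT col1, seq, COUNT(*) row_count
           FROM TABLE(demo_function(CURSOR(SELECT * FROM demo)))
          GROUP BY col1, seq
         MINUS
         SELECT col1, seq, row_count FROM demo_expected );
```

A check like this would also have caught the silent NO_DATA_FOUND failure from the previous post: every expected row would have been reported as missing.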

For more complex functions I recommend adding PQ Server and timestamp columns to the return types. They don’t have to be used by the application but can be very useful for debugging.

Check out the next post on validating table functions.

Thanks for reading!

 

Power Of Pipelining

Previous posts in this category introduced a way to perform DML in a SELECT using the following techniques:

  • Table Functions
  • Autonomous Transactions
  • A REF CURSOR as a parameter
  • Row by row (aka slow by slow) processing of the REF CURSOR
  • Parallel processing to overcome the slow by slow processing

This post adds pipelining to the arsenal. Pipelining returns records from the function as soon as they are assembled rather than waiting for the whole function to finish.

This expedites things by allowing calling programs to start processing sooner.
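The difference is easiest to see side by side. The sketch below uses a hypothetical collection type (num_list_t) and two hypothetical functions: the non-pipelined one must build the whole collection before returning it, while the pipelined one hands each row to the caller with PIPE ROW and ends with a bare RETURN (Oracle may still buffer piped rows internally).

```sql
-- hypothetical collection type for this sketch
CREATE OR REPLACE TYPE num_list_t AS TABLE OF NUMBER;
/

-- Non-pipelined: the caller sees nothing until the full collection is returned
CREATE OR REPLACE FUNCTION numbers_all ( p_n NUMBER ) RETURN num_list_t AS
  v_result num_list_t := num_list_t();
BEGIN
  FOR i IN 1 .. p_n LOOP
    v_result.EXTEND;
    v_result(v_result.LAST) := i;
  END LOOP;
  RETURN v_result;
END;
/

-- Pipelined: each value is made available as soon as it is piped
CREATE OR REPLACE FUNCTION numbers_piped ( p_n NUMBER ) RETURN num_list_t PIPELINED AS
BEGIN
  FOR i IN 1 .. p_n LOOP
    PIPE ROW(i);
  END LOOP;
  RETURN;  -- no value: every row was already piped
END;
/

SELECT COUNT(*) FROM TABLE(numbers_all(5));
SELECT COUNT(*) FROM TABLE(numbers_piped(5));
```

Both queries should return 5; the difference is when rows become available to the caller, not what the rows are.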

The demo table and data are shown in the previous entry, Parallel By Range.

Let’s get right to the code, shall we…
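If you want to compile the function below without that post, the definitions in this sketch are consistent with how the function uses v_n_t, demo_log and demo_curs (the demo_log columns match the output further down). They are assumptions, not the original definitions, and they load no data; skip any object you already have.

```sql
-- assumed: demo has two NUMBER columns, as elsewhere in this series
CREATE TABLE demo
( col1 NUMBER,
  col2 NUMBER );

-- assumed: one row per COL1 value holding the summed COL2
CREATE TABLE demo_log
( col1  NUMBER,
  total NUMBER );

-- assumed: a scalar NUMBER collection, since the function pipes
-- SQL%ROWCOUNT values and the caller reads them via COLUMN_VALUE
CREATE TYPE v_n_t AS TABLE OF NUMBER;
/

-- strongly typed REF CURSOR, same package definition used in this series
CREATE OR REPLACE PACKAGE demo_curs AS
  TYPE v_demo_curs IS REF CURSOR RETURN demo%ROWTYPE;
END;
/
```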

CREATE OR REPLACE FUNCTION delete_demo ( p_curs demo_curs.v_demo_curs )
                  RETURN v_n_t
                  PARALLEL_ENABLE ( PARTITION p_curs BY RANGE(col1) )
                  ORDER p_curs BY ( col1 )
                  PIPELINED AS --> identifies this function as pipelined

    PRAGMA AUTONOMOUS_TRANSACTION;
    v_number NUMBER := 0;

    v_last_col1 NUMBER := NULL; -- last col1 value processed
    v_total NUMBER := 0; -- running total of col2 for each col1
    v_col1 NUMBER; -- col1 value queried
    v_col2 NUMBER; -- col2 value queried
    v_rc NUMBER := 0; -- save the rowcount

BEGIN
  --
  -- Important Points:
  --   1) The code does not change much from the non pipelined version
  --   2) Except the RETURN call does not specify a value because values are sent back by the...
  --   3) ..PIPE ROW calls
  --   4) This still looks like SLOW BY SLOW processing
  --   5) I know and I promised ways to get past that!
  --   6) You mean parallel processing and this funky pipelining thing?
  --   7) Yes!
  --
  LOOP
    -- get a row from cursor
    FETCH p_curs INTO v_col1, v_col2;
    EXIT WHEN p_curs%NOTFOUND;

    -- initialize last col1 if this is first record from cursor
    IF v_last_col1 IS NULL THEN
      v_last_col1 := v_col1;
    END IF;

    -- if same col1 value then add to total
    IF v_last_col1 = v_col1 THEN
      v_total := v_total + v_col2;
    ELSE
      --
      -- not same col1 value so save log and
      -- re-initialize tracking values
      --
      INSERT INTO demo_log
      VALUES(v_last_col1,v_total);
      v_last_col1 := v_col1;
      v_total := v_col2;

    END IF; -- if same col1

    -- delete every demo row for this col1 value and save the rowcount
    DELETE demo
    WHERE col1 = v_col1;

    v_rc := SQL%ROWCOUNT;
    COMMIT;
    --
    -- return a row now with the PIPE ROW call
    -- no need to wait until all rows have been processed
    -- the calling program (SQL*Plus in this case) can start processing it
    -- now before the function finishes
    PIPE ROW(v_rc);

  END LOOP;

  -- dont forget to log the last total
  IF v_last_col1 IS NOT NULL THEN
    INSERT INTO demo_log
    VALUES(v_last_col1,v_total);
  END IF;

  COMMIT;

  -- note the return is still required but it does not specify a value
  -- because everything was returned by prior PIPE ROW calls
  RETURN;

END;
/

Calling the function does not change either.

SQL> ALTER SESSION ENABLE PARALLEL QUERY;
Session altered.

SQL> SELECT COUNT(column_value)
  2    FROM TABLE(delete_demo(CURSOR(SELECT col1,
  3                                         col2
  4                                    FROM demo
  5                                  ORDER BY col2))) -- ordered by col2 to create randomness
  6 /
COUNT(COLUMN_VALUE)
-------------------
                100

SQL> ALTER SESSION DISABLE PARALLEL QUERY;
Session altered.

SQL> --
SQL> -- See what wound up in the demo log table
SQL> --
SQL> SELECT *
  2    FROM demo_log
  3  ORDER BY col1
  4 /

      COL1      TOTAL
---------- ----------
         1         10
         2        110
         3         90
         4        220
         5        250
         6        330
         7        490
         8        440
         9        810
        10        550
10 rows selected.

Those results are the same as those from the non-pipelined version of the code in Parallel By Range.

Was the pipeline version faster? I’ll discuss ways to validate that in the next set of posts.

I’ve added a new post showing how to check which PQ server is executing the code. Check it out here.

Thanks for reading.