Validate Parallel Pipelined Functions (Careful Outside Of Cursor Loop)

All of the previous examples in this set of posts focus on returning rows from a REF Cursor Loop.

This post explains what happens when rows are returned from outside the REF Cursor Loop. As always, let's dive right into the code!

-- a simple demo table
CREATE TABLE demo
( col1 NUMBER,
  col2 NUMBER );

-- object types for return values from the function
CREATE TYPE return_o AS OBJECT ( col1    NUMBER,         -- col1 value from REF Cursor
                                 pq      VARCHAR2(5),    -- PQ Server that processed the row
                                 ts      DATE,           -- timestamp row returned
                                 rowtype VARCHAR2(10) ); -- (In Loop OR After Loop)
/
CREATE TYPE return_t AS TABLE OF return_o;
/

-- a package to define the REF Cursor
CREATE OR REPLACE PACKAGE demo_curs AS
  TYPE v_demo_curs IS REF CURSOR RETURN demo%ROWTYPE;
END;
/

-- Some demo data
BEGIN
  FOR counter1 IN 1..2 LOOP
    FOR counter2 IN 1..2 LOOP
      INSERT INTO demo
      VALUES(counter1,counter2);
    END LOOP;
  END LOOP;
END;
/

-- And last but not least - the function
CREATE OR REPLACE FUNCTION demo_function ( p_curs demo_curs.v_demo_curs )
                           RETURN return_t
                           PARALLEL_ENABLE ( PARTITION p_curs BY RANGE(col1) )
                           ORDER p_curs BY ( col1 )
                           PIPELINED AS
  v_col1 NUMBER := 0;
  v_col2 NUMBER;
  v_pq VARCHAR2(5);
BEGIN
  BEGIN
    SELECT server_name
      INTO v_pq
      FROM v$px_process
     WHERE sid = ( SELECT sid
                     FROM v$mystat
                    WHERE rownum = 1 );
  EXCEPTION
    WHEN NO_DATA_FOUND THEN
      v_pq := 'N/A';
  END;

  LOOP
    FETCH p_curs INTO v_col1, v_col2;
    EXIT WHEN p_curs%NOTFOUND;
    -- return row from inside cursor loop
    PIPE ROW(return_o(v_col1,v_pq,SYSDATE,'In Loop'));
  END LOOP;

  -- return row from outside cursor loop
  PIPE ROW(return_o(v_col1,v_pq,SYSDATE,'After Loop'));

  RETURN;

END;
/

Now to see what is returned in non-parallel mode.

SQL> BEGIN
  2    FOR x IN ( SELECT col1,
  3                      pq,
  4                      rowtype
  5                 FROM TABLE(demo_function(CURSOR(SELECT *
  6                                                   FROM demo))) ) LOOP
  7      DBMS_OUTPUT.PUT_LINE(x.col1 || '|' ||
  8                           x.pq   || '|' ||
  9                           x.rowtype);
 10    END LOOP;
 11  END;
 12  /
1|N/A|In Loop
1|N/A|In Loop
2|N/A|In Loop
2|N/A|In Loop
2|N/A|After Loop

That output is straightforward: two In Loop rows for COL1=1, two In Loop rows for COL1=2 and one After Loop row with COL1=2. Note that the COL1 value of 2 in the After Loop row was left over from the last row fetched in the cursor loop.
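The parallel run needs parallel query switched on for the session. A minimal sketch, assuming the session-level FORCE PARALLEL QUERY setting used elsewhere in this series of posts (the exact statements behind the results are not shown here, so treat this as one plausible way to reproduce them):

```sql
SET SERVEROUTPUT ON

-- assumption: parallel query is forced at session level
ALTER SESSION FORCE PARALLEL QUERY;

BEGIN
  FOR x IN ( SELECT col1,
                    pq,
                    rowtype
               FROM TABLE(demo_function(CURSOR(SELECT *
                                                 FROM demo))) ) LOOP
    DBMS_OUTPUT.PUT_LINE(x.col1 || '|' ||
                         x.pq   || '|' ||
                         x.rowtype);
  END LOOP;
END;
/

-- return the session to the default setting
ALTER SESSION ENABLE PARALLEL QUERY;
```

The number of PQ servers and the order of the rows can differ from run to run.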

Now for the parallel mode results:

0|P002|After Loop
2|P001|In Loop
2|P001|In Loop
2|P001|After Loop
1|P000|In Loop
1|P000|In Loop
1|P000|After Loop
0|P003|After Loop

Allow me to explain with 3 points:

  1. Rows 2 through 4 came from PQ Server P001, which processed COL1=2: two In Loop rows and one After Loop row
  2. Rows 5 through 7 show that PQ Server P000 did the same for COL1=1
  3. Rows 1 and 8 came from PQ Servers (P002 and P003 respectively) that were used but received no rows from the Ref Cursor, so each returned only an After Loop row carrying the initial COL1 value of 0

Points 1 and 2 are logical: each PQ server runs its own copy of the function, so each server that processed rows also created its own After Loop row.

NB: There is no way to predict how many PQ servers will be used. For example, adding rows for COL1=3 would not guarantee that a 3rd PQ Server would be used.
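Since the distribution cannot be predicted, it can at least be observed. One way, sketched here using only the columns of return_t, is to aggregate the function output by PQ server and row type:

```sql
-- how many rows of each type did each PQ server return?
SELECT pq,
       rowtype,
       COUNT(*) AS row_count
  FROM TABLE(demo_function(CURSOR(SELECT *
                                    FROM demo)))
 GROUP BY pq, rowtype
 ORDER BY pq, rowtype;
```

A PQ server that shows an After Loop row but no In Loop rows received nothing from the Ref Cursor.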

Point 3 requires some code changes to deal with. I’ll use a boolean to check if rows were returned by the Ref Cursor.

CREATE OR REPLACE FUNCTION demo_function ( p_curs demo_curs.v_demo_curs )
                  RETURN return_t
                  PARALLEL_ENABLE ( PARTITION p_curs BY RANGE(col1) )
                  ORDER p_curs BY ( col1 )
                  PIPELINED AS
  v_col1 NUMBER := 0;
  v_col2 NUMBER;
  v_pq VARCHAR2(5);
  v_rows_found BOOLEAN := FALSE; -- were rows returned from cursor
BEGIN
  BEGIN
    SELECT server_name
      INTO v_pq
      FROM v$px_process
     WHERE sid = ( SELECT sid
                     FROM v$mystat
                    WHERE rownum = 1 );
  EXCEPTION
    WHEN NO_DATA_FOUND THEN
      v_pq := 'N/A';
  END;

  LOOP
    FETCH p_curs INTO v_col1, v_col2;
    EXIT WHEN p_curs%NOTFOUND;
    v_rows_found := TRUE;
    PIPE ROW(return_o(v_col1,v_pq,SYSDATE,'In Loop'));
  END LOOP;

  -- if rows returned by cursor then return After Loop row
  IF v_rows_found THEN
    PIPE ROW(return_o(v_col1,v_pq,SYSDATE,'After Loop'));
  END IF;

  RETURN;
END;
/

And that produces much better results.

1|P000|In Loop
1|P000|In Loop
1|P000|After Loop
2|P001|In Loop
2|P001|In Loop
2|P001|After Loop

Remember these behaviors when returning rows from outside the cursor loop in parallel table functions.

Thanks for reading!

Validate Parallel Pipelined Functions (Don't Assume PQ)

When writing code for parallel processing, make sure you allow for running in a non-parallel environment as well.

For example, here is the code from the previous post being run with PARALLEL QUERY disabled.

SQL> ALTER SESSION DISABLE PARALLEL QUERY;
Session altered.

SQL> DECLARE
  2    v_counter NUMBER := 0;
  3  BEGIN
  4    FOR x IN ( SELECT col1,
  5                      pq,
  6                      TO_CHAR(ts,'HH24:MI:SS') ts,
  7                      seq
  8                 FROM TABLE(demo_function(CURSOR(SELECT *
  9                                                   FROM demo))) ) LOOP
 10      v_counter := v_counter + 1;
 11      DBMS_OUTPUT.PUT_LINE(v_counter || '|' ||
 12                           x.col1 || '|' ||
 13                           x.pq || '|' ||
 14                           x.ts || '|' ||
 15                           x.seq);
 16    END LOOP;
 17  END;
 18  /
PL/SQL procedure successfully completed.

Yes, I did have SERVEROUTPUT ON.

Where did the expected output go? The truth is it never even got created because the query shown below raised a NO_DATA_FOUND exception because no PQ Server was in use.

SELECT server_name
  INTO v_pq
  FROM v$px_process
 WHERE sid = ( SELECT sid
                 FROM v$mystat
                WHERE rownum = 1 );

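And the exception was not passed out of the function: when a function called from SQL raises NO_DATA_FOUND, the SQL engine treats it as the end of the data rather than as an error, so the query silently returned no rows. The lookup needs an exception handler like this: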

BEGIN
  SELECT server_name
    INTO v_pq
    FROM v$px_process
   WHERE sid = ( SELECT sid
                   FROM v$mystat
                  WHERE rownum = 1 );
EXCEPTION
  WHEN NO_DATA_FOUND THEN
    v_pq := 'N/A';
END;

Now it provides these results when called as in the previous post.

 1|1|N/A|19:45:20|1
 2|1|N/A|19:45:21|2
 3|1|N/A|19:45:21|1
 4|1|N/A|19:45:22|2
 5|1|N/A|19:45:22|1
 6|1|N/A|19:45:23|2
 7|2|N/A|19:45:23|1
 8|2|N/A|19:45:24|2
 9|2|N/A|19:45:24|1
10|2|N/A|19:45:25|2
11|2|N/A|19:45:25|1
12|2|N/A|19:45:26|2
13|3|N/A|19:45:26|1
14|3|N/A|19:45:27|2
15|3|N/A|19:45:27|1
16|3|N/A|19:45:28|2
17|3|N/A|19:45:28|1
18|3|N/A|19:45:29|2

Two lessons learned here:

  1. All PL/SQL functions (not just Table Functions) fail silently when called from SQL if a NO_DATA_FOUND error goes unhandled
  2. Never assume a function will execute in parallel just because that's what it was designed for
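The first lesson is easy to reproduce with an ordinary scalar function. The sketch below uses a hypothetical function name, ndf_demo, that is not part of the demo code in these posts:

```sql
-- hypothetical function whose only job is to raise NO_DATA_FOUND
CREATE OR REPLACE FUNCTION ndf_demo RETURN NUMBER AS
  v_dummy NUMBER;
BEGIN
  SELECT 1
    INTO v_dummy
    FROM dual
   WHERE 1 = 2;   -- no rows, so SELECT INTO raises NO_DATA_FOUND
  RETURN v_dummy;
END;
/

-- called from SQL: no error is reported, the query returns a NULL
SELECT ndf_demo FROM dual;

-- called from PL/SQL: the exception propagates as ORA-01403
BEGIN
  DBMS_OUTPUT.PUT_LINE(ndf_demo);
END;
/
```

The same function behaves differently depending on who calls it, which is why a missing handler can go unnoticed until the function is used in a query.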

Thanks for reading!

Validate Parallel Pipelined Table Functions

This post shows how to verify a pipelined-parallel function is working as expected by adding timestamps and PQ names to the output.

First let’s create a simple two column table and add some test data.

CREATE TABLE demo
( col1 NUMBER,
  col2 NUMBER );

BEGIN
  FOR counter1 IN 1..3 LOOP
    FOR counter2 IN 1..3 LOOP
      INSERT INTO demo
      VALUES(counter1,counter2);
    END LOOP;
  END LOOP;
END;
/

Define object types for return values from the function.

CREATE TYPE return_o AS OBJECT ( col1 NUMBER,
  pq VARCHAR2(5),  -- name of PQ server
  ts DATE,         -- date and time (HH24:MI:SS) of pipe row
  seq NUMBER );    -- sequence of pipe row (1 or 2)
/
CREATE TYPE return_t AS TABLE OF return_o;
/

Define a package for the REF CURSOR definition.

CREATE OR REPLACE PACKAGE demo_curs AS
  TYPE v_demo_curs IS REF CURSOR RETURN demo%ROWTYPE;
END;
/

And now for the function itself.

CREATE OR REPLACE FUNCTION demo_function ( p_curs demo_curs.v_demo_curs )
                  RETURN return_t
                  PARALLEL_ENABLE ( PARTITION p_curs BY RANGE(col1) )
                  ORDER p_curs BY ( col1 )
                  PIPELINED AS

  v_col1 NUMBER := -1;
  v_col2 NUMBER;
  v_pq VARCHAR2(5);

BEGIN

  -- determine which PQ Server is being used for the current
  -- Oracle session executing the function
  SELECT server_name
    INTO v_pq
    FROM v$px_process
   WHERE sid = ( SELECT sid
                   FROM v$mystat
                  WHERE rownum = 1 );

  -- fetch values from the cursor and pipe them twice with a
  -- one second sleep in the middle.
  -- apply a sequence number (1 or 2) so each piped row can be identified
  LOOP

    FETCH p_curs INTO v_col1, v_col2;
    EXIT WHEN p_curs%NOTFOUND;
    PIPE ROW(return_o(v_col1,v_pq,SYSDATE,1));
    DBMS_LOCK.SLEEP(1);
    PIPE ROW(return_o(v_col1,v_pq,SYSDATE,2));

  END LOOP;

  RETURN;

END;
/

And now to call the function using an anonymous PL/SQL block.

ALTER SESSION FORCE PARALLEL QUERY;
DECLARE
  v_counter NUMBER := 0;
BEGIN
  FOR x IN ( SELECT col1,
                    pq,
                    TO_CHAR(ts,'HH24:MI:SS') ts,
                    seq
               FROM TABLE(demo_function(CURSOR(SELECT *
                                                 FROM demo))) ) LOOP
     v_counter := v_counter + 1;
     DBMS_OUTPUT.PUT_LINE(v_counter || '|' ||
                          x.col1    || '|' ||
                          x.pq      || '|' ||
                          x.ts      || '|' ||
                          x.seq);
  END LOOP;
END;
/
ALTER SESSION DISABLE PARALLEL QUERY;

Here is the output.

 1|1|P000|18:50:43|1
 2|1|P000|18:50:44|2
 3|1|P000|18:50:44|1
 4|1|P000|18:50:45|2
 5|1|P000|18:50:45|1
 6|1|P000|18:50:46|2
 7|2|P001|18:50:43|1
 8|2|P001|18:50:44|2
 9|2|P001|18:50:44|1
10|2|P001|18:50:45|2
11|2|P001|18:50:45|1
12|2|P001|18:50:46|2
13|3|P001|18:50:46|1
14|3|P001|18:50:47|2
15|3|P001|18:50:47|1
16|3|P001|18:50:48|2
17|3|P001|18:50:48|1
18|3|P001|18:50:49|2

The columns in the output are as follows:

  1. The incremental row number assigned by the anonymous block
  2. The COL1 value in play (1, 2 or 3)
  3. The PQ Server in use
  4. The time the row was piped by the function
  5. The sequence (1 or 2) of the row

Key points about the result set are:

  • The first 6 rows identify that COL1=1 went to PQ Server P000 and sequence 1 and 2 were always 1 second apart.
  • Rows 7 through 12 identify that COL1=2 went to PQ Server P001 and sequence 1 and 2 were always 1 second apart.
  • Rows 13 through 18 identify that COL1=3 went to PQ Server P001 (same as COL1=2) and sequence 1 and 2 were always 1 second apart.
  • Rows 12 and 13 went through PQ Server P001 in the same second even though they have different COL1 values.

When writing Table Functions (especially parallel and pipelined ones) I highly recommend having a simple set of test data and expected results such as the ones shown in this post.

It’s very helpful for verifying that code modifications have not changed the overall processing.

It’s also valuable for verifying Oracle is behaving correctly.

For more complex functions I recommend adding PQ Server and timestamp columns to the return types. They don’t have to be used by the application but can be very useful for debugging.
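As an illustration of how those extra columns can support a repeatable check, the query below summarises the demo function's output per COL1 value. It is only a sketch built on the demo table and return types from this post:

```sql
-- summarise the piped rows for each COL1 value
SELECT col1,
       COUNT(DISTINCT pq)                       AS pq_servers,
       COUNT(*)                                 AS rows_piped,
       SUM(CASE WHEN seq = 1 THEN 1 ELSE 0 END) AS seq1_rows,
       SUM(CASE WHEN seq = 2 THEN 1 ELSE 0 END) AS seq2_rows
  FROM TABLE(demo_function(CURSOR(SELECT *
                                    FROM demo)))
 GROUP BY col1
 ORDER BY col1;
```

With the test data above (three rows per COL1 value, each piped twice), every COL1 should show 6 rows piped, split evenly between sequence 1 and 2, and a single PQ server, because the cursor is range partitioned on COL1. Any other result after a code change is worth investigating.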

Check out the next post here on validating table functions.

Thanks for reading!