Custom Aggregate and Window Functions in PostgreSQL and Oracle


In this article, we will see how to create custom aggregate and window (in Oracle terminology - analytical) functions in two systems. Despite the differences in syntax and in general in the approach to extensibility, the mechanism of these functions is very similar. But there are also differences.

Admittedly, intrinsic aggregate and window functions are quite rare. Window functions in general, for some reason, are traditionally classified as "advanced" SQL and are considered difficult to understand and master. Here it would be to deal with those functions that are already available in the DBMS!

Why then even delve into this issue? I can name a few reasons:

  • Although window functions are objectively more complex than ordinary aggregate functions, there is nothing transcendent in them; It is an absolutely essential tool for SQL developers. And creating your own window function, even quite simple, allows you to better understand how the standard ones work.
  • Window and aggregate functions are a great way to combine procedural processing with declarative logic. In some situations, it turns out to perform complex actions, remaining within the framework of the paradigm for solving the problem with one SQL query.
  • And just an interesting topic, and even more interesting to compare the two systems.

An example on which we will train is the calculation of the average, an analog of the standard avg function for the type numeric (number in Oracle). We will write such a function and see how it works in aggregate and window modes and whether it can be calculated by several parallel processes. In conclusion, let us look at an example from real life.

Aggregate Functions


We will move from simple to complex, switching between PostgreSQL and Oracle.

First, some general considerations. Any aggregate function is called for each row of the table in turn and ultimately processes them all. Between calls, she needs to maintain an internal state that defines the context for her execution. At the end of the work, she should return the final value.

So, we need four components:

  • State (context),
  • Function to process the next line,
  • The function of issuing the final result,
  • An indication that the previous three paragraphs constitute an aggregate function.

PostgreSQL


To store the state, you need to select the appropriate data type. You can take the standard, but you can define your own. For a function that calculates the average, you need to separately sum the values ​​and separately calculate their number. Therefore, we will create our composite type with two fields:
Now we define a function for processing the next value. In PostgreSQL, it is called a transition function:
The function takes the current state and the next value, and returns a new state: the values ​​are added up, and one is added to the quantity.

In addition, we output (RAISE NOTICE) function parameters - this will allow us to see how the work is done. Good old debugging PRINT, there is nothing better than you.

The next function is to return the final value:

CREATE OR REPLACE FUNCTION average_final (
state average_state
) RETURNS numeric AS $$
BEGIN
RAISE NOTICE '= %(%)', state.accum, state.qty;
RETURN CASE WHEN state.qty > 0 THEN
trim(trailing '0' from ( state.accum/state.qty )::text)::numeric
END;
END;
$$ LANGUAGE plpgsql;

The function takes a state and returns the resulting number. To do this, simply divide the accumulated amount by the amount. But with zero quantity we return NULL (so does avg).

Earthing with the trim function is needed exclusively for accurate output: this way we get rid of insignificant zeros that would otherwise clutter up the screen and interfere with perception. Something like this:
In real life, these tricks, of course, are not needed.

And finally, we determine the aggregate function itself. For this, a special CREATE AGGREGATE command is used:
This command indicates the data type for the state (stype), our two functions (sfunc and finalfunc), and the initial state value (initcond) as a string constant.

You can try. Almost all of the examples in this article will use a simple table with five rows: one, two, three, four, five. We create the table on the fly with the generate_series function, an indispensable assistant for generating test data:
The result is correct, and the output of the functions allows us to track the progress:

  • The state was set to (0,0),
  • The average_transition function is sequentially called five times, gradually changing state,
  • In the end, the average_final function was called, which got 3 = 15/5.

Another check is on an empty set:

SELECT average(g.x) FROM generate_series(1,0) AS g(x);
NOTICE:  = 0(0)
average
---------

(1 row)

Oracle


In Oracle, all extensibility is provided by the Data Cartridge engine. In simple terms, we need to create an object type that implements the interface necessary for aggregation. Context is naturally represented by attributes of this object.

CREATE OR REPLACE TYPE AverageImpl AS OBJECT(
accum number,
qty   number,
STATIC FUNCTION ODCIAggregateInitialize (actx IN OUT AverageImpl)
RETURN number,
MEMBER FUNCTION ODCIAggregateIterate (self IN OUT AverageImpl, val IN number
RETURN number,
MEMBER FUNCTION ODCIAggregateMerge (self IN OUT AverageImpl, ctx2 IN AverageImpl)
RETURN number,
MEMBER FUNCTION ODCIAggregateTerminate (self IN OUT AverageImpl, returnValue OUT number, flags IN number)
RETURN number
);
/

The initial value of the context is determined here not by a constant, but by a separate (static, that is, not tied to a specific instance of the object) function ODCIAggregateInitialize.

The function called for each row is ODCIAggregateIterate.

The result is returned by the ODCIAggregateTerminate function, and, notice, some flags are passed to it, which we will deal with a little later.

The interface includes another required function: ODCIAggregateMerge. We will define it - where to go, but for now we will postpone the conversation about it.

Now we will create an object body with the implementation of the listed methods.

CREATE OR REPLACE TYPE BODY AverageImpl IS
STATIC FUNCTION ODCIAggregateInitialize (actx IN OUT AverageImpl)
RETURN number IS
BEGIN
actx := AverageImpl(0,0);
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateIterate (self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') + '||val);
self.accum := self.accum + val;
self.qty := self.qty + 1;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateMerge (self IN OUT AverageImpl, ctx2 IN AverageImpl)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') & '||ctx2.accum||'('||ctx2.qty||')');
self.accum := self.accum + ctx2.accum;
self.qty := self.qty + ctx2.qty;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateTerminate (self IN OUT AverageImpl, returnValue OUT number, flags IN number)
RETURN number IS
BEGIN
dbms_output.put_line('= '||self.accum||'('||self.qty||') flags:'||flags);
returnValue := CASE WHEN self.qty > 0 THEN self.accum / self.qty END;
RETURN ODCIConst.Success;
END;
END;
/

Реализация, по большей части, повторяет все то, что мы делали для the PostgreSQL , но в немного другом синтаксисе.

Trim-dances around the return value are not needed: Oracle independently cuts insignificant zeros when outputting the value.

Note that all functions return a success indication (ODCIConst.Success value), and semantic values ​​are passed through the OUT and IN OUT parameters (which in PL / SQL are in no way associated with the actual return value, as in PL / pgSQL). In particular, any function, including ODCIAggregateTerminate, can change the attributes of its object, the link to which is passed to it in the first parameter (self).

The definition of the aggregate function is as follows:
Check. To generate the values, we use the idiomatic construction with the recursive query CONNECT BY level:

SELECT average(level) FROM dual CONNECT BY level <= 5;
AVERAGE(LEVEL)
--------------
3
0(0) + 1
1(1) + 2
3(2) + 3
6(3) + 4
10(4) + 5
= 15(5) flags:0

You can pay attention to the fact that message output in PostgreSQL appears before the result, and in Oracle after. This is because RAISE NOTICE works asynchronously, and the dbms_output packet buffers the output.

As we can see, a null flag was passed to the ODCIAggregateTerminate function. This means that the context is no longer required and can be forgotten if desired.

And check on an empty set:

SELECT average(rownum) FROM dual WHERE 1 = 0;
AVERAGE(ROWNUM)
---------------

= 0(0) flags:0

Window Functions: OVER ()


The good news: the aggregate function that we wrote can work without any changes as a window (analytical) one.

The window function differs from the aggregate in that it does not collapse the selection into one (aggregated) row, but is calculated as if separately for each row. Syntactically, a window function call is distinguished by the presence of an OVER construct with an indication of a frame that defines a lot of lines for processing. In the simplest case, it is written like this: OVER (), and this means that the function must process all the lines. The result is as if we counted the usual aggregate function and wrote the result (the same one) opposite each sample row.

In other words, the frame is static and spans all lines:

 1. 2. 3. 4. 5. +---+ +---+ +---+ +---+ +---+ | 1 | | 1 | | 1 | | 1 | | 1 | | 2 | | 2 | | 2 | | 2 | | 2 | | 3 | | 3 | | 3 | | 3 | | 3 | | 4 | | 4 | | 4 | | 4 | | 4 | | 5 | | 5 | | 5 | | 5 | | 5 | +---+ +---+ +---+ +---+ +---+ 

PostgreSQL


Let's try:
According to the NOTICE conclusion, everything happens in exactly the same way as before when calculating the usual aggregate function. After receiving the result from the average_final function, PostgreSQL puts it on each row.

Oracle


SELECT average(level) OVER() average
FROM dual CONNECT BY level <= 5;

LEVEL     AVERAGE
---------- -----------
1           3
2           3
3           3
4           3
5           3
0(0) + 1
1(1) + 2
3(2) + 3
6(3) + 4
10(4) + 5
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:0

Suddenly. Instead of calculating the result once, Oracle calls the ODCIAggregateTerminate function N + 1 times: first for each row with flag 1 (which means the context is still useful) and then one more time at the end. The value obtained from the last call is simply ignored.

The conclusion is this: if the function ODCIAggregateTerminate uses computationally complex logic, you need to think about not doing the same job several times.

Window Functions: OVER (PARTITION BY)


The PARTITION BY clause in a frame definition is similar to the usual GROUP BY aggregate construct. The window function with PARTITION BY is calculated separately for each group of rows, and the result is assigned to each row in the selection.

In this embodiment, the frame is also static, but for each group it is different. For example, if two groups of lines are defined (from the first to the second and from the third to the fifth), then the frame can be imagined as follows:

 1. 2. 3. 4. 5. +---+ +---+ | 1 | | 1 | | 2 | | 2 | +---+ +---+ +---+ +---+ +---+ | 3 | | 3 | | 3 | | 4 | | 4 | | 4 | | 5 | | 5 | | 5 | +---+ +---+ +---+ 

PostgreSQL


SELECT g.x/3 part,
g.x,
average(g.x) OVER (PARTITION BY g.x/3)
FROM generate_series(1,5) as g(x);

NOTICE: 0(0) + 1
NOTICE:  1(1) + 2
NOTICE:  = 3(2)
NOTICE: 0(0) + 3
NOTICE:  3(1) + 4
NOTICE:  7(2) + 5
NOTICE:  = 12(3)
part | x | average
------+---+---------
0 | 1 |     1.5
0 | 2 |     1.5
1 | 3 |       4
1 | 4 |       4
1 | 5 |       4
(5 rows)

The calculation again occurs sequentially, but now, when moving to another group of lines, the state is reset to the initial value (initcond).

Oracle


SELECT trunc(level/3) part,
level,
average(level) OVER(PARTITION BY trunc(level/3)) average
FROM dual CONNECT BY level <= 5;

PART      LEVEL    AVERAGE
---------- ---------- ----------
0          2        1.5
0          1        1.5
1          4          4
1          5          4
1          3          4
0(0) + 2
2(1) + 1
= 3(2) flags:1
= 3(2) flags:1
0(0) + 4
4(1) + 5
9(2) + 3
= 12(3) flags:1
= 12(3) flags:1
= 12(3) flags:1
= 12(3) flags:0

Interestingly, Oracle decided to swap lines. This may say something about the details of the implementation, but in any case it has a right.

Window Functions: OVER (ORDER BY)


If the ORDER BY clause is added to the definition of the frame, indicating the sort order, the function will start working in ascending mode (for the sum function, we would say so - in cumulative total ).

For the first line, the frame will consist of this line alone; for the second - from the first and second; for the third - from the first, second and third, and so on. In other words, the frame will include lines from the first to the current.

In fact, it can be written exactly like that: OVER (ORDER BY ... ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), but since this verbosity is implied by default, it is usually omitted.

So, the frame ceases to be static: its head moves down, and the tail remains in place:

 1. 2. 3. 4. 5. +---+ +---+ +---+ +---+ +---+ | 1 | | 1 | | 1 | | 1 | | 1 | +---+ | 2 | | 2 | | 2 | | 2 | +---+ | 3 | | 3 | | 3 | +---+ | 4 | | 4 | +---+ | 5 | +---+ 

PostgreSQL


SELECT g.x, average(g.x) OVER (ORDER BY g.x)
FROM generate_series(1,5) as g(x);

NOTICE:  0(0) + 1
NOTICE:  = 1(1)
NOTICE:  1(1) + 2
NOTICE:  = 3(2)
NOTICE:  3(2) + 3
NOTICE:  = 6(3)
NOTICE:  6(3) + 4
NOTICE:  = 10(4)
NOTICE:  10(4) + 5
NOTICE:  = 15(5)
x | average
---+---------
1 |       1
2 |     1.5
3 |       2
4 |     2.5
5 |       3
(5 rows)

As you can see, the rows are still added to the context one at a time, but now the average_final function is called after each addition, giving an intermediate result.

Oracle


SELECT level, average(level) OVER(ORDER BY level) average
FROM dual CONNECT BY level <= 5;

LEVEL    AVERAGE
---------- ----------
1          1
2        1.5
3          2
4        2.5
5          3
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
3(2) + 3
= 6(3) flags:1
6(3) + 4
= 10(4) flags:1
10(4) + 5
= 15(5) flags:1
= 15(5) flags:0

This time, both systems work the same.

Window Functions: OVER (PARTITION BY ORDER BY)


PARTITION BY and ORDER BY clauses can be combined. Then, within each group of lines, the function will work in an ascending mode, and when moving from group to group, the state will be reset to the initial one.

 1. 2. 3. 4. 5. +---+ +---+ | 1 | | 1 | +---+ | 2 | +---+ +---+ +---+ +---+ | 3 | | 3 | | 3 | +---+ | 4 | | 4 | +---+ | 5 | +---+ 

PostgreSQL


SELECT g.x/3 part,
g.x,
average(g.x) OVER (PARTITION BY g.x/3 ORDER BY g.x)
FROM generate_series(1,5) as g(x);

NOTICE: 0(0) + 1
NOTICE:  = 1(1)
NOTICE:  1(1) + 2
NOTICE:  = 3(2)
NOTICE: 0(0) + 3
NOTICE:  = 3(1)
NOTICE:  3(1) + 4
NOTICE:  = 7(2)
NOTICE:  7(2) + 5
NOTICE:  = 12(3)
part | x | average
------+---+---------
0 | 1 |       1
0 | 2 |     1.5
1 | 3 |       3
1 | 4 |     3.5
1 | 5 |       4
(5 rows)

Oracle


SELECT trunc(level/3) part,
level,
average(level) OVER(PARTITION BY trunc(level/3) ORDER BY level) average
FROM dual CONNECT BY level <= 5;

PART    LEVEL     AVERAGE
---------- ---------- ----------
0        1           1
0        2         1.5
1        3           3
1        4         3.5
1        5           4
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
0(0) + 3
= 3(1) flags:1
3(1) + 4
= 7(2) flags:1
7(2) + 5
= 12(3) flags:1
= 12(3) flags:0

Window functions with a sliding frame


In all the examples that we looked at, the frame was either static, or only its head moved (when using the ORDER BY clause). This gave us the opportunity to calculate the state sequentially, adding line by line to the context.

But the frame of the window function can also be set in such a way that its tail will also shift. In our example, this will correspond to the concept of a moving average. For example, specifying OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) means that the current and two previous values ​​will be averaged for each row of the result.

 1. 2. 3. 4. 5. +---+ | | +---+ | | | | +---+ | 1 | | 1 | | 1 | +---+ +---+ | 2 | | 2 | | 2 | +---+ +---+ | 3 | | 3 | | 3 | +---+ | 4 | | 4 | +---+ | 5 | +---+ 

Can the window function be calculated in this case? It turns out he can, though ineffectively. But by writing some more code, you can improve the situation.

PostgreSQL


Let's see:
Up to the third line, everything goes well, because the tail does not actually move: we just add the next value to the existing context. But, since we do not know how to remove the value from the context, for the fourth and fifth lines everything has to be recounted completely, each time returning to the initial state.

So, it would be great to have not only the function of adding the next value, but also the function of removing the value from the state. Indeed, such a function can be created:
For the window function to be able to use it, you need to recreate the aggregate as follows:
Check:

SELECT g.x,
average(g.x) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM generate_series(1,5) as g(x);

NOTICE:  0(0) + 1
NOTICE:  = 1(1)
NOTICE:  1(1) + 2
NOTICE:  = 3(2)
NOTICE:  3(2) + 3
NOTICE:  = 6(3)
NOTICE: 6(3) - 1
NOTICE:  5(2) + 4
NOTICE:  = 9(3)
NOTICE: 9(3) - 2
NOTICE:  7(2) + 5
NOTICE:  = 12(3)
x | average
---+---------
1 |       1
2 |     1.5
3 |       2
4 |       3
5 |       4
(5 rows)

Now everything is all right: for the fourth and fifth rows, we remove the tail value from the state and add a new one.

Oracle


Here the situation is similar. The created version of the analytical function works, but is inefficient:
The function to remove a value from the context is defined as follows:

MEMBER FUNCTION ODCIAggregateDelete (self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') - '||val);
self.accum := self.accum - val;
self.qty := self.qty - 1;
RETURN ODCIConst.Success;
END;

Full code for copy-paste
CREATE OR REPLACE TYPE AverageImpl AS OBJECT(
accum number,
qty   number,
STATIC FUNCTION ODCIAggregateInitialize (actx IN OUT AverageImpl) RETURN number,
MEMBER FUNCTION ODCIAggregateIterate (self IN OUT AverageImpl, val IN number) RETURN number,
MEMBER FUNCTION ODCIAggregateMerge (self IN OUT AverageImpl, ctx2 IN AverageImpl) RETURN number,
MEMBER FUNCTION ODCIAggregateTerminate (self IN OUT AverageImpl, returnValue OUT number, flags IN number) RETURN number,
MEMBER FUNCTION ODCIAggregateDelete (self IN OUT AverageImpl, val IN number) RETURN number
);
/
CREATE OR REPLACE TYPE BODY AverageImpl IS
STATIC FUNCTION ODCIAggregateInitialize (actx IN OUT AverageImpl)
RETURN number IS
BEGIN
actx := AverageImpl(0,0);
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateIterate (self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') + '||val);
self.accum := self.accum + val;
self.qty := self.qty + 1;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateMerge (self IN OUT AverageImpl, ctx2 IN AverageImpl)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') & '||ctx2.accum||'('||ctx2.qty||')');
self.accum := self.accum + ctx2.accum;
self.qty := self.qty + ctx2.qty;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateTerminate (self IN OUT AverageImpl, returnValue OUT number, flags IN number)
RETURN number IS
BEGIN
dbms_output.put_line('= '||self.accum||'('||self.qty||') flags:'||flags);
returnValue := CASE WHEN self.qty > 0 THEN self.accum / self.qty END;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateDelete (self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') - '||val);
self.accum := self.accum - val;
self.qty := self.qty - 1;
RETURN ODCIConst.Success;
END;
END;
/


It is not necessary to recreate the function itself. Check:

SELECT level,
average(level) OVER(ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) average
FROM dual CONNECT BY level <= 5;

LEVEL    AVERAGE
---------- ----------
1          1
2        1.5
3          2
4          3
5          4
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
3(2) + 3
= 6(3) flags:1
6(3) - 1
5(2) + 4
= 9(3) flags:1
9(3) - 2
7(2) + 5
= 12(3) flags:1
= 12(3) flags:0

Parallelism


И the PostgreSQL , и Oracle (Enterprise Edition) умеют вычислять агрегатные функции в параллельном режиме. Moreover, each of the parallel processes performs its part of the work, forming an intermediate state. Then, the main coordinating process receives these several states and must combine them into one final one.

To do this, we need one more union function , which we will write now. In our case, it simply adds up the sums and the number of values.

PostgreSQL


The function is as follows:
We will also remove our debug output from the average_transition function. In parallel execution, we will not summarize five values, but more, so if this is not done, we will get too much useless information.

Since we remove the output, there is no need to use a procedural language either - we will write a function in pure SQL: It
remains to recreate the aggregate taking into account the new function and indicate that it can be safely used in parallel mode:
Now create a table and fill it with data. Thousands of lines will be enough.

CREATE TABLE t(n) AS SELECT generate_series(1,1000)::numeric;

With default settings, PostgreSQL will not build a parallel plan for such a table - it is too small - but it is not difficult to persuade it:

SET parallel_setup_cost=0;
SET min_parallel_table_scan_size=0;

EXPLAIN(costs off) SELECT average(n) FROM t;
QUERY PLAN
------------------------------------------
Finalize Aggregate
->  Gather
Workers Planned: 2
->  Partial Aggregate
->  Parallel Seq Scan on t

In terms of request, we see:

  • two planned workflows that perform partial aggregation (Partial Aggregate),
  • Gather node collecting information
  • and the final state aggregation (Finalize Aggregate).

Check:
Why is the average_combine function called three times, not two? The fact is that in PostgreSQL, the coordinating process also does some of the work. Therefore, although two work processes were launched, the work was actually carried out in three. One of them managed to process 678 lines, the other 226 and the third - 96 (although these numbers do not mean anything and may differ with a different launch).

Oracle


If you remember, we already wrote the ODCIAggregateMerge function at the very beginning, since it is mandatory in Oracle. The documentation insists that this function is necessary not only for parallel operation, but also for sequential operation - although I find it difficult to understand why (and in practice I did not have to deal with its implementation during sequential processing).

All that remains to be done is to declare the function safe for parallel operation:
Create a table: To
convince Oracle is even easier than PostgreSQL - just write a hint. Here's what the plan is (the output is greatly trimmed for simplicity):
The plan also contains:

  • partial aggregation (4),
  • coordinator receiving partial contexts (2),
  • and the resulting combination of contexts (1).

SELECT /*+ PARALLEL(2) */ average(n) FROM t;
AVERAGE(N)
----------
500.5
0(0) & 216153(657)
216153(657) & 284347(343)
= 500500(1000) flags:0

At Oracle, the coordinator is not involved in partial aggregation. Therefore, only two contexts are combined, and for the same reason, we see only the output of the ODCIAggregateMerge function.

Documentation


It's time to provide links to documentation, including aggregate and window functions, already included in the DBMS. There you can find a lot of interesting things.

PostgreSQL:


Oracle:


An example about rounding cents


And the promised example from life. I came up with this function when I had to write reports for accounting, working under RAS (the rules of Russian accounting).

The simplest task, in which there is a need for rounding, is the allocation of total expenses (say, 100 rubles) to departments (say, 3 pieces) according to some principle (say, equally):
This query shows the problem: the amounts must be rounded, but at the same time a penny is lost. But RAS does not forgive this.

The problem can be solved in different ways, but for my taste the most elegant way is the window function, which works in an increasing mode and takes all the struggle against pennies:

WITH depts(name) AS (
VALUES ('A'), ('B'), ('C')
), report(dept,amount) AS (
SELECT name, 100.00 / count(*) OVER() FROM depts
)
SELECT dept, round2(amount) OVER (ORDER BY dept) FROM report;

dept | round2
------+--------
A    |  33.33
B    |  33.34
C    |  33.33
(3 rows)

The state of such a function includes a rounding error (r_error) and the current rounded value (amount). The function for processing the next value increases the rounding error, and if it already exceeds half a penny, it adds a penny to the rounded amount:
And the function that returns the result simply returns the already prepared state.amount.

I will not give the full code of the function: using the examples already given, writing it is not difficult.

If you come across interesting examples of using your own aggregate or window functions - share them in the comments.