Support min/max aggregates (#18)

In order to re-calculate min/max values for groups where the min
or max value is deleted, we need the view query definition in string
form. However, pg_get_viewdef cannot be used for this purpose because
IMMV's defenition is in pg_ivm_immv but not pg_rewrite.  Therefore,
we have to convert query definition in pg_ivm_immv to query
definition string. We can use pg_get_querydef in PG15, but we cannot
in PG14 or earlier, so we use codes in ruleutil.c copied from PG13
or PG14 depending versions.
This commit is contained in:
Yugo Nagata 2022-07-25 13:11:33 +09:00 committed by GitHub
parent 790b0d2bd6
commit 6faf0b3baa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 16996 additions and 20 deletions

View file

@ -5,7 +5,8 @@ OBJS = \
$(WIN32RES) \
createas.o \
matview.o \
pg_ivm.o
pg_ivm.o \
ruleutils.o
PGFILEDESC = "pg_ivm - incremental view maintenance on PostgreSQL"
EXTENSION = pg_ivm

View file

@ -917,6 +917,51 @@ check_aggregate_supports_ivm(Oid aggfnoid)
case F_AVG_FLOAT8:
case F_AVG_INTERVAL:
/* min */
case F_MIN_ANYARRAY:
case F_MIN_INT8:
case F_MIN_INT4:
case F_MIN_INT2:
case F_MIN_OID:
case F_MIN_FLOAT4:
case F_MIN_FLOAT8:
case F_MIN_DATE:
case F_MIN_TIME:
case F_MIN_TIMETZ:
case F_MIN_MONEY:
case F_MIN_TIMESTAMP:
case F_MIN_TIMESTAMPTZ:
case F_MIN_INTERVAL:
case F_MIN_TEXT:
case F_MIN_NUMERIC:
case F_MIN_BPCHAR:
case F_MIN_TID:
case F_MIN_ANYENUM:
case F_MIN_INET:
case F_MIN_PG_LSN:
/* max */
case F_MAX_ANYARRAY:
case F_MAX_INT8:
case F_MAX_INT4:
case F_MAX_INT2:
case F_MAX_OID:
case F_MAX_FLOAT4:
case F_MAX_FLOAT8:
case F_MAX_DATE:
case F_MAX_TIME:
case F_MAX_TIMETZ:
case F_MAX_MONEY:
case F_MAX_TIMESTAMP:
case F_MAX_TIMESTAMPTZ:
case F_MAX_INTERVAL:
case F_MAX_TEXT:
case F_MAX_NUMERIC:
case F_MAX_BPCHAR:
case F_MAX_TID:
case F_MAX_ANYENUM:
case F_MAX_INET:
case F_MAX_PG_LSN:
return true;
default:
@ -948,6 +993,52 @@ check_aggregate_supports_ivm(Oid aggfnoid)
"avg(float8)",
"avg(interval)",
/* min */
"min(anyarray)",
"min(int8)",
"min(int4)",
"min(int2)",
"min(oid)",
"min(float4)",
"min(float8)",
"min(date)",
"min(time without time zone)",
"min(time with time zone)",
"min(money)",
"min(timestamp without time zone)",
"min(timestamp with time zone)",
"min(interval)",
"min(text)",
"min(numeric)",
"min(character)",
"min(tid)",
"min(anyenum)",
"min(inet)",
"min(pg_lsn)",
/* max */
"max(anyarray)",
"max(int8)",
"max(int4)",
"max(int2)",
"max(oid)",
"max(float4)",
"max(float8)",
"max(date)",
"max(time without time zone)",
"max(time with time zone)",
"max(money)",
"max(timestamp without time zone)",
"max(timestamp with time zone)",
"max(interval)",
"max(text)",
"max(numeric)",
"max(character)",
"max(tid)",
"max(anyenum)",
"max(inet)",
"max(pg_lsn)",
NULL
};

View file

@ -388,11 +388,93 @@ SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3;
(5 rows)
ROLLBACK;
-- not support MIN(), MAX() aggregate functions
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MIN(j) FROM mv_base_a GROUP BY i');
ERROR: aggregate function min(integer) is not supported on incrementally maintainable materialized view
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MAX(j) FROM mv_base_a GROUP BY i');
ERROR: aggregate function max(integer) is not supported on incrementally maintainable materialized view
-- support MIN(), MAX() aggregate functions
BEGIN;
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MIN(j), MAX(j) FROM mv_base_a GROUP BY i');
NOTICE: created index "mv_ivm_min_max_index" on immv "mv_ivm_min_max"
create_immv
-------------
5
(1 row)
SELECT * FROM mv_ivm_min_max ORDER BY 1,2,3;
i | min | max | __ivm_count_min__ | __ivm_count_max__ | __ivm_count__
---+-----+-----+-------------------+-------------------+---------------
1 | 10 | 10 | 1 | 1 | 1
2 | 20 | 20 | 1 | 1 | 1
3 | 30 | 30 | 1 | 1 | 1
4 | 40 | 40 | 1 | 1 | 1
5 | 50 | 50 | 1 | 1 | 1
(5 rows)
INSERT INTO mv_base_a VALUES
(1,11), (1,12),
(2,21), (2,22),
(3,31), (3,32),
(4,41), (4,42),
(5,51), (5,52);
SELECT * FROM mv_ivm_min_max ORDER BY 1,2,3;
i | min | max | __ivm_count_min__ | __ivm_count_max__ | __ivm_count__
---+-----+-----+-------------------+-------------------+---------------
1 | 10 | 12 | 3 | 3 | 3
2 | 20 | 22 | 3 | 3 | 3
3 | 30 | 32 | 3 | 3 | 3
4 | 40 | 42 | 3 | 3 | 3
5 | 50 | 52 | 3 | 3 | 3
(5 rows)
DELETE FROM mv_base_a WHERE (i,j) IN ((1,10), (2,21), (3,32));
SELECT * FROM mv_ivm_min_max ORDER BY 1,2,3;
i | min | max | __ivm_count_min__ | __ivm_count_max__ | __ivm_count__
---+-----+-----+-------------------+-------------------+---------------
1 | 11 | 12 | 2 | 2 | 2
2 | 20 | 22 | 2 | 2 | 2
3 | 30 | 31 | 2 | 2 | 2
4 | 40 | 42 | 3 | 3 | 3
5 | 50 | 52 | 3 | 3 | 3
(5 rows)
ROLLBACK;
-- support MIN(), MAX() aggregate functions without GROUP clause
BEGIN;
SELECT create_immv('mv_ivm_min_max', 'SELECT MIN(j), MAX(j) FROM mv_base_a');
NOTICE: could not create an index on immv "mv_ivm_min_max" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
1
(1 row)
SELECT * FROM mv_ivm_min_max;
min | max | __ivm_count_min__ | __ivm_count_max__ | __ivm_count__
-----+-----+-------------------+-------------------+---------------
10 | 50 | 5 | 5 | 5
(1 row)
INSERT INTO mv_base_a VALUES
(0,0), (6,60), (7,70);
SELECT * FROM mv_ivm_min_max;
min | max | __ivm_count_min__ | __ivm_count_max__ | __ivm_count__
-----+-----+-------------------+-------------------+---------------
0 | 70 | 8 | 8 | 8
(1 row)
DELETE FROM mv_base_a WHERE (i,j) IN ((0,0), (7,70));
SELECT * FROM mv_ivm_min_max;
min | max | __ivm_count_min__ | __ivm_count_max__ | __ivm_count__
-----+-----+-------------------+-------------------+---------------
10 | 60 | 6 | 6 | 6
(1 row)
DELETE FROM mv_base_a;
SELECT * FROM mv_ivm_min_max;
min | max | __ivm_count_min__ | __ivm_count_max__ | __ivm_count__
-----+-----+-------------------+-------------------+---------------
| | 0 | 0 | 0
(1 row)
ROLLBACK;
-- support subquery in FROM clause
BEGIN;
SELECT create_immv('mv_ivm_subquery01', 'SELECT a.i,a.j FROM mv_base_a a, (SELECT * FROM mv_base_b) b WHERE a.i = b.i');
@ -629,6 +711,70 @@ SELECT * FROM mv ORDER BY i;
(2 rows)
ROLLBACK;
BEGIN;
CREATE TABLE base_t (i int, v int);
INSERT INTO base_t VALUES (NULL, 1), (NULL, 2), (1, 10), (1, 20);
SELECT create_immv('mv', 'SELECT i, sum(v) FROM base_t GROUP BY i');
NOTICE: created index "mv_index" on immv "mv"
create_immv
-------------
2
(1 row)
SELECT * FROM mv ORDER BY i;
i | sum | __ivm_count_sum__ | __ivm_count__
---+-----+-------------------+---------------
1 | 30 | 2 | 2
| 3 | 2 | 2
(2 rows)
UPDATE base_t SET v = v * 10;
SELECT * FROM mv ORDER BY i;
i | sum | __ivm_count_sum__ | __ivm_count__
---+-----+-------------------+---------------
1 | 300 | 2 | 2
| 30 | 2 | 2
(2 rows)
ROLLBACK;
BEGIN;
CREATE TABLE base_t (i int, v int);
INSERT INTO base_t VALUES (NULL, 1), (NULL, 2), (NULL, 3), (NULL, 4), (NULL, 5);
SELECT create_immv('mv', 'SELECT i, min(v), max(v) FROM base_t GROUP BY i');
NOTICE: created index "mv_index" on immv "mv"
create_immv
-------------
1
(1 row)
SELECT * FROM mv ORDER BY i;
i | min | max | __ivm_count_min__ | __ivm_count_max__ | __ivm_count__
---+-----+-----+-------------------+-------------------+---------------
| 1 | 5 | 5 | 5 | 5
(1 row)
DELETE FROM base_t WHERE v = 1;
SELECT * FROM mv ORDER BY i;
i | min | max | __ivm_count_min__ | __ivm_count_max__ | __ivm_count__
---+-----+-----+-------------------+-------------------+---------------
| 2 | 5 | 4 | 4 | 4
(1 row)
DELETE FROM base_t WHERE v = 3;
SELECT * FROM mv ORDER BY i;
i | min | max | __ivm_count_min__ | __ivm_count_max__ | __ivm_count__
---+-----+-----+-------------------+-------------------+---------------
| 2 | 5 | 3 | 3 | 3
(1 row)
DELETE FROM base_t WHERE v = 5;
SELECT * FROM mv ORDER BY i;
i | min | max | __ivm_count_min__ | __ivm_count_max__ | __ivm_count__
---+-----+-----+-------------------+-------------------+---------------
| 2 | 4 | 2 | 2 | 2
(1 row)
ROLLBACK;
-- IMMV containing user defined type
BEGIN;

592
matview.c
View file

@ -170,6 +170,9 @@ static void append_set_clause_for_sum(const char *resname, StringInfo buf_old,
static void append_set_clause_for_avg(const char *resname, StringInfo buf_old,
StringInfo buf_new, StringInfo aggs_list,
const char *aggtype);
static void append_set_clause_for_minmax(const char *resname, StringInfo buf_old,
StringInfo buf_new, StringInfo aggs_list,
bool is_min);
static char *get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2,
const char* count_col, const char *castType);
static char *get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
@ -178,17 +181,29 @@ static void apply_old_delta(const char *matviewname, const char *deltaname_old,
List *keys);
static void apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
List *keys, StringInfo aggs_list, StringInfo aggs_set,
const char *count_colname);
List *minmax_list, List *is_min_list,
const char *count_colname,
SPITupleTable **tuptable_recalc, uint64 *num_recalc);
static void apply_new_delta(const char *matviewname, const char *deltaname_new,
StringInfo target_list);
static void apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
List *keys, StringInfo target_list, StringInfo aggs_set,
const char* count_colname);
static char *get_matching_condition_string(List *keys);
static char *get_returning_string(List *minmax_list, List *is_min_list, List *keys);
static char *get_minmax_recalc_condition_string(List *minmax_list, List *is_min_list);
static char *get_select_for_recalc_string(List *keys);
static void recalc_and_set_values(SPITupleTable *tuptable_recalc, int64 num_tuples,
List *namelist, List *keys, Relation matviewRel);
static SPIPlanPtr get_plan_for_recalc(Relation matviewRel, List *namelist, List *keys, Oid *keyTypes);
static SPIPlanPtr get_plan_for_set_values(Relation matviewRel, List *namelist, Oid *valTypes);
static void generate_equal(StringInfo querybuf, Oid opttype,
const char *leftop, const char *rightop);
static void mv_InitHashTables(void);
static SPIPlanPtr mv_FetchPreparedPlan(MV_QueryKey *key);
static void mv_HashPreparedPlan(MV_QueryKey *key, SPIPlanPtr plan);
static void mv_BuildQueryKey(MV_QueryKey *key, Oid matview_id, int32 query_type);
static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry);
static List *get_securityQuals(Oid relId, int rt_index, Query *query);
@ -1551,6 +1566,9 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
ListCell *lc;
int i;
List *keys = NIL;
List *minmax_list = NIL;
List *is_min_list = NIL;
/*
* get names of the materialized view and delta tables
@ -1631,6 +1649,17 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
append_set_clause_for_avg(resname, aggs_set_old, aggs_set_new, aggs_list_buf,
format_type_be(aggref->aggtype));
/* min/max */
else if (!strcmp(aggname, "min") || !strcmp(aggname, "max"))
{
bool is_min = (!strcmp(aggname, "min"));
append_set_clause_for_minmax(resname, aggs_set_old, aggs_set_new, aggs_list_buf, is_min);
/* make a resname list of min and max aggregates */
minmax_list = lappend(minmax_list, resname);
is_min_list = lappend_int(is_min_list, is_min);
}
else
elog(ERROR, "unsupported aggregate function: %s", aggname);
}
@ -1660,6 +1689,8 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
if (old_tuplestores && tuplestore_tuple_count(old_tuplestores) > 0)
{
EphemeralNamedRelation enr = palloc(sizeof(EphemeralNamedRelationData));
SPITupleTable *tuptable_recalc = NULL;
uint64 num_recalc;
int rc;
/* convert tuplestores to ENR, and register for SPI */
@ -1675,12 +1706,21 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
elog(ERROR, "SPI_register failed");
if (use_count)
/* apply old delta */
/* apply old delta and get rows to be recalculated */
apply_old_delta_with_count(matviewname, OLD_DELTA_ENRNAME,
keys, aggs_list_buf, aggs_set_old,
count_colname);
minmax_list, is_min_list,
count_colname, &tuptable_recalc, &num_recalc);
else
apply_old_delta(matviewname, OLD_DELTA_ENRNAME, keys);
/*
* If we have min or max, we might have to recalculate aggregate values from base tables
* on some tuples. TIDs and keys such tuples are returned as a result of the above query.
*/
if (minmax_list && tuptable_recalc)
recalc_and_set_values(tuptable_recalc, num_recalc, minmax_list, keys, matviewRel);
}
/* For tuple insertion */
if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0)
@ -1872,6 +1912,70 @@ append_set_clause_for_avg(const char *resname, StringInfo buf_old,
);
}
/*
* append_set_clause_for_minmax
*
* Append SET clause string for min or max aggregation to given buffers.
* Also, append resnames required for calculating the aggregate value.
* is_min is true if this is min, false if not.
*/
static void
append_set_clause_for_minmax(const char *resname, StringInfo buf_old,
StringInfo buf_new, StringInfo aggs_list,
bool is_min)
{
char *count_col = IVM_colname("count", resname);
/* For tuple deletion */
if (buf_old)
{
/*
* If the new value doesn't became NULL then use the value remaining
* in the view although this will be recomputated afterwords.
*/
appendStringInfo(buf_old,
", %s = CASE WHEN %s THEN NULL ELSE %s END",
quote_qualified_identifier(NULL, resname),
get_null_condition_string(IVM_SUB, "mv", "t", count_col),
quote_qualified_identifier("mv", resname)
);
/* count = mv.count - t.count */
appendStringInfo(buf_old,
", %s = %s",
quote_qualified_identifier(NULL, count_col),
get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL)
);
}
/* For tuple insertion */
if (buf_new)
{
/*
* min = LEAST(mv.min, diff.min)
* max = GREATEST(mv.max, diff.max)
*/
appendStringInfo(buf_new,
", %s = CASE WHEN %s THEN NULL ELSE %s(%s,%s) END",
quote_qualified_identifier(NULL, resname),
get_null_condition_string(IVM_ADD, "mv", "diff", count_col),
is_min ? "LEAST" : "GREATEST",
quote_qualified_identifier("mv", resname),
quote_qualified_identifier("diff", resname)
);
/* count = mv.count + diff.count */
appendStringInfo(buf_new,
", %s = %s",
quote_qualified_identifier(NULL, count_col),
get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL)
);
}
appendStringInfo(aggs_list, ", %s, %s",
quote_qualified_identifier("diff", resname),
quote_qualified_identifier("diff", IVM_colname("count", resname))
);
}
/*
* get_operation_string
*
@ -1975,19 +2079,43 @@ get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
* list to identify a tuple in the view. If the view has aggregates, this
* requires strings representing resnames of aggregates and SET clause for
* updating aggregate values.
*
* If the view has min or max aggregate, this requires a list of resnames of
* min/max aggregates and a list of boolean which represents which entries in
* minmax_list is min. These are necessary to check if we need to recalculate
* min or max aggregate values. In this case, this query returns TID and keys
* of tuples which need to be recalculated. This result and the number of rows
* are stored in tuptables and num_recalc repectedly.
*/
static void
apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
List *keys, StringInfo aggs_list, StringInfo aggs_set,
const char *count_colname)
List *minmax_list, List *is_min_list,
const char *count_colname,
SPITupleTable **tuptable_recalc, uint64 *num_recalc)
{
StringInfoData querybuf;
char *match_cond;
char *updt_returning = "";
char *select_for_recalc = "SELECT";
bool agg_without_groupby = (list_length(keys) == 0);
Assert(tuptable_recalc != NULL);
Assert(num_recalc != NULL);
/* build WHERE condition for searching tuples to be deleted */
match_cond = get_matching_condition_string(keys);
/*
* We need a special RETURNING clause and SELECT statement for min/max to
* check which tuple needs re-calculation from base tables.
*/
if (minmax_list)
{
updt_returning = get_returning_string(minmax_list, is_min_list, keys);
select_for_recalc = get_select_for_recalc_string(keys);
}
/* Search for matching tuples from the view and update or delete if found. */
initStringInfo(&querybuf);
appendStringInfo(&querybuf,
@ -2002,10 +2130,11 @@ apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.-) t.%s "
"%s" /* SET clauses for aggregates */
"FROM t WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND NOT for_dlt "
")"
/* delete a tuple if this is to be deleted */
"DELETE FROM %s AS mv USING t "
"WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND for_dlt",
"%s" /* RETURNING clause for recalc infomation */
"), dlt AS (" /* delete a tuple if this is to be deleted */
"DELETE FROM %s AS mv USING t "
"WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND for_dlt"
") %s", /* SELECT returning which tuples need to be recalculated */
count_colname,
count_colname, count_colname, (agg_without_groupby ? "false" : "true"),
(aggs_list != NULL ? aggs_list->data : ""),
@ -2013,10 +2142,24 @@ apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
match_cond,
matviewname, count_colname, count_colname, count_colname,
(aggs_set != NULL ? aggs_set->data : ""),
matviewname);
updt_returning,
matviewname,
select_for_recalc);
if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
if (SPI_exec(querybuf.data, 0) != SPI_OK_SELECT)
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
/* Return tuples to be recalculated. */
if (minmax_list)
{
*tuptable_recalc = SPI_tuptable;
*num_recalc = SPI_processed;
}
else
{
*tuptable_recalc = NULL;
*num_recalc = 0;
}
}
/*
@ -2198,6 +2341,342 @@ get_matching_condition_string(List *keys)
return match_cond.data;
}
/*
* get_returning_string
*
* Build a string for RETURNING clause of UPDATE used in apply_old_delta_with_count.
* This clause returns ctid and a boolean value that indicates if we need to
* recalculate min or max value, for each updated row.
*/
static char *
get_returning_string(List *minmax_list, List *is_min_list, List *keys)
{
StringInfoData returning;
char *recalc_cond;
ListCell *lc;
Assert(minmax_list != NIL && is_min_list != NIL);
recalc_cond = get_minmax_recalc_condition_string(minmax_list, is_min_list);
initStringInfo(&returning);
appendStringInfo(&returning, "RETURNING mv.ctid AS tid, (%s) AS recalc", recalc_cond);
foreach (lc, keys)
{
Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc);
char *resname = NameStr(attr->attname);
appendStringInfo(&returning, ", %s", quote_qualified_identifier("mv", resname));
}
return returning.data;
}
/*
* get_minmax_recalc_condition_string
*
* Build a predicate string for checking if any min/max aggregate
* value needs to be recalculated.
*/
static char *
get_minmax_recalc_condition_string(List *minmax_list, List *is_min_list)
{
StringInfoData recalc_cond;
ListCell *lc1, *lc2;
initStringInfo(&recalc_cond);
Assert (list_length(minmax_list) == list_length(is_min_list));
forboth (lc1, minmax_list, lc2, is_min_list)
{
char *resname = (char *) lfirst(lc1);
bool is_min = (bool) lfirst_int(lc2);
char *op_str = (is_min ? ">=" : "<=");
appendStringInfo(&recalc_cond, "%s OPERATOR(pg_catalog.%s) %s",
quote_qualified_identifier("mv", resname),
op_str,
quote_qualified_identifier("t", resname)
);
if (lnext(minmax_list, lc1))
appendStringInfo(&recalc_cond, " OR ");
}
return recalc_cond.data;
}
/*
* get_select_for_recalc_string
*
* Build a query to return tid and keys of tuples which need
* recalculation. This is used as the result of the query
* built by apply_old_delta.
*/
static char *
get_select_for_recalc_string(List *keys)
{
StringInfoData qry;
ListCell *lc;
initStringInfo(&qry);
appendStringInfo(&qry, "SELECT tid");
foreach (lc, keys)
{
Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc);
appendStringInfo(&qry, ", %s", NameStr(attr->attname));
}
appendStringInfo(&qry, " FROM updt WHERE recalc");
return qry.data;
}
/*
* recalc_and_set_values
*
* Recalculate tuples in a materialized from base tables and update these.
* The tuples which needs recalculation are specified by keys, and resnames
* of columns to be updated are specified by namelist. TIDs and key values
* are given by tuples in tuptable_recalc. Its first attribute must be TID
* and key values must be following this.
*/
static void
recalc_and_set_values(SPITupleTable *tuptable_recalc, int64 num_tuples,
List *namelist, List *keys, Relation matviewRel)
{
TupleDesc tupdesc_recalc = tuptable_recalc->tupdesc;
Oid *keyTypes = NULL, *types = NULL;
char *keyNulls = NULL, *nulls = NULL;
Datum *keyVals = NULL, *vals = NULL;
int num_vals = list_length(namelist);
int num_keys = list_length(keys);
uint64 i;
/* If we have keys, initialize arrays for them. */
if (keys)
{
keyTypes = palloc(sizeof(Oid) * num_keys);
keyNulls = palloc(sizeof(char) * num_keys);
keyVals = palloc(sizeof(Datum) * num_keys);
/* a tuple contains keys to be recalculated and ctid to be updated*/
Assert(tupdesc_recalc->natts == num_keys + 1);
/* Types of key attributes */
for (i = 0; i < num_keys; i++)
keyTypes[i] = TupleDescAttr(tupdesc_recalc, i + 1)->atttypid;
}
/* allocate memory for all attribute names and tid */
types = palloc(sizeof(Oid) * (num_vals + 1));
nulls = palloc(sizeof(char) * (num_vals + 1));
vals = palloc(sizeof(Datum) * (num_vals + 1));
/* For each tuple which needs recalculation */
for (i = 0; i < num_tuples; i++)
{
int j;
bool isnull;
SPIPlanPtr plan;
SPITupleTable *tuptable_newvals;
TupleDesc tupdesc_newvals;
/* Set group key values as parameters if needed. */
if (keys)
{
for (j = 0; j < num_keys; j++)
{
keyVals[j] = SPI_getbinval(tuptable_recalc->vals[i], tupdesc_recalc, j + 2, &isnull);
if (isnull)
keyNulls[j] = 'n';
else
keyNulls[j] = ' ';
}
}
/*
* Get recalculated values from base tables. The result must be
* only one tuple thich contains the new values for specified keys.
*/
plan = get_plan_for_recalc(matviewRel, namelist, keys, keyTypes);
if (SPI_execute_plan(plan, keyVals, keyNulls, false, 0) != SPI_OK_SELECT)
elog(ERROR, "SPI_execute_plan");
if (SPI_processed != 1)
elog(ERROR, "SPI_execute_plan returned zero or more than one rows");
tuptable_newvals = SPI_tuptable;
tupdesc_newvals = tuptable_newvals->tupdesc;
Assert(tupdesc_newvals->natts == num_vals);
/* Set the new values as parameters */
for (j = 0; j < tupdesc_newvals->natts; j++)
{
if (i == 0)
types[j] = TupleDescAttr(tupdesc_newvals, j)->atttypid;
vals[j] = SPI_getbinval(tuptable_newvals->vals[0], tupdesc_newvals, j + 1, &isnull);
if (isnull)
nulls[j] = 'n';
else
nulls[j] = ' ';
}
/* Set TID of the view tuple to be updated as a parameter */
types[j] = TIDOID;
vals[j] = SPI_getbinval(tuptable_recalc->vals[i], tupdesc_recalc, 1, &isnull);
nulls[j] = ' ';
/* Update the view tuple to the new values */
plan = get_plan_for_set_values(matviewRel, namelist, types);
if (SPI_execute_plan(plan, vals, nulls, false, 0) != SPI_OK_UPDATE)
elog(ERROR, "SPI_execute_plan");
}
}
/*
* get_plan_for_recalc
*
* Create or fetch a plan for recalculating value in the view's target list
* from base tables using the definition query of materialized view specified
* by matviewRel. namelist is a list of resnames of values to be recalculated.
*
* keys is a list of keys to identify tuples to be recalculated if this is not
* empty. KeyTypes is an array of types of keys.
*/
static SPIPlanPtr
get_plan_for_recalc(Relation matviewRel, List *namelist, List *keys, Oid *keyTypes)
{
MV_QueryKey hash_key;
SPIPlanPtr plan;
/* Fetch or prepare a saved plan for the recalculation */
mv_BuildQueryKey(&hash_key, RelationGetRelid(matviewRel), MV_PLAN_RECALC);
if ((plan = mv_FetchPreparedPlan(&hash_key)) == NULL)
{
ListCell *lc;
StringInfoData str;
char *viewdef;
/* get view definition of matview */
viewdef = pg_ivm_get_querydef(get_immv_query(matviewRel), false);
/*
* Build a query string for recalculating values. This is like
*
* SELECT x1, x2, x3, ... FROM ( ... view definition query ...) mv
* WHERE (key1, key2, ...) = ($1, $2, ...);
*/
initStringInfo(&str);
appendStringInfo(&str, "SELECT ");
foreach (lc, namelist)
{
appendStringInfo(&str, "%s", (char *) lfirst(lc));
if (lnext(namelist, lc))
appendStringInfoString(&str, ", ");
}
appendStringInfo(&str, " FROM (%s) mv", viewdef);
if (keys)
{
int i = 1;
char paramname[16];
appendStringInfo(&str, " WHERE (");
foreach (lc, keys)
{
Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc);
char *resname = NameStr(attr->attname);
Oid typid = attr->atttypid;
sprintf(paramname, "$%d", i);
appendStringInfo(&str, "(");
generate_equal(&str, typid, resname, paramname);
appendStringInfo(&str, " OR (%s IS NULL AND %s IS NULL))",
resname, paramname);
if (lnext(keys, lc))
appendStringInfoString(&str, " AND ");
i++;
}
appendStringInfo(&str, ")");
}
else
keyTypes = NULL;
plan = SPI_prepare(str.data, list_length(keys), keyTypes);
if (plan == NULL)
elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), str.data);
SPI_keepplan(plan);
mv_HashPreparedPlan(&hash_key, plan);
}
return plan;
}
/*
* get_plan_for_set_values
*
* Create or fetch a plan for applying new values calculated by
* get_plan_for_recalc to a materialized view specified by matviewRel.
* namelist is a list of resnames of attributes to be updated, and
* valTypes is an array of types of the
* values.
*/
static SPIPlanPtr
get_plan_for_set_values(Relation matviewRel, List *namelist, Oid *valTypes)
{
MV_QueryKey key;
SPIPlanPtr plan;
char *matviewname;
matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
RelationGetRelationName(matviewRel));
/* Fetch or prepare a saved plan for the real check */
mv_BuildQueryKey(&key, RelationGetRelid(matviewRel), MV_PLAN_SET_VALUE);
if ((plan = mv_FetchPreparedPlan(&key)) == NULL)
{
ListCell *lc;
StringInfoData str;
int i;
/*
* Build a query string for applying min/max values. This is like
*
* UPDATE matviewname AS mv
* SET (x1, x2, x3, x4) = ($1, $2, $3, $4)
* WHERE ctid = $5;
*/
initStringInfo(&str);
appendStringInfo(&str, "UPDATE %s AS mv SET (", matviewname);
foreach (lc, namelist)
{
appendStringInfo(&str, "%s", (char *) lfirst(lc));
if (lnext(namelist, lc))
appendStringInfoString(&str, ", ");
}
appendStringInfo(&str, ") = ROW(");
for (i = 1; i <= list_length(namelist); i++)
appendStringInfo(&str, "%s$%d", (i==1 ? "" : ", "), i);
appendStringInfo(&str, ") WHERE ctid OPERATOR(pg_catalog.=) $%d", i);
plan = SPI_prepare(str.data, list_length(namelist) + 1, valTypes);
if (plan == NULL)
elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), str.data);
SPI_keepplan(plan);
mv_HashPreparedPlan(&key, plan);
}
return plan;
}
/*
* generate_equal
*
@ -2246,6 +2725,99 @@ mv_InitHashTables(void)
&ctl, HASH_ELEM | HASH_BLOBS);
}
/*
* mv_FetchPreparedPlan
*/
static SPIPlanPtr
mv_FetchPreparedPlan(MV_QueryKey *key)
{
MV_QueryHashEntry *entry;
SPIPlanPtr plan;
/*
* On the first call initialize the hashtable
*/
if (!mv_query_cache)
mv_InitHashTables();
/*
* Lookup for the key
*/
entry = (MV_QueryHashEntry *) hash_search(mv_query_cache,
(void *) key,
HASH_FIND, NULL);
if (entry == NULL)
return NULL;
/*
* Check whether the plan is still valid. If it isn't, we don't want to
* simply rely on plancache.c to regenerate it; rather we should start
* from scratch and rebuild the query text too. This is to cover cases
* such as table/column renames. We depend on the plancache machinery to
* detect possible invalidations, though.
*
* CAUTION: this check is only trustworthy if the caller has already
* locked both materialized views and base tables.
*/
plan = entry->plan;
if (plan && SPI_plan_is_valid(plan))
return plan;
/*
* Otherwise we might as well flush the cached plan now, to free a little
* memory space before we make a new one.
*/
entry->plan = NULL;
if (plan)
SPI_freeplan(plan);
return NULL;
}
/*
* mv_HashPreparedPlan
*
* Add another plan to our private SPI query plan hashtable.
*/
static void
mv_HashPreparedPlan(MV_QueryKey *key, SPIPlanPtr plan)
{
MV_QueryHashEntry *entry;
bool found;
/*
* On the first call initialize the hashtable
*/
if (!mv_query_cache)
mv_InitHashTables();
/*
* Add the new plan. We might be overwriting an entry previously found
* invalid by mv_FetchPreparedPlan.
*/
entry = (MV_QueryHashEntry *) hash_search(mv_query_cache,
(void *) key,
HASH_ENTER, &found);
Assert(!found || entry->plan == NULL);
entry->plan = plan;
}
/*
* mv_BuildQueryKey
*
* Construct a hashtable key for a prepared SPI plan for IVM.
*/
static void
mv_BuildQueryKey(MV_QueryKey *key, Oid matview_id, int32 query_type)
{
/*
* We assume struct MV_QueryKey contains no padding bytes, else we'd need
* to use memset to clear them.
*/
key->matview_id = matview_id;
key->query_type = query_type;
}
/*
* AtAbort_IVM
*

View file

@ -28,7 +28,6 @@
#include "utils/varlena.h"
#include "pg_ivm.h"
#include "nodes/print.h"
PG_MODULE_MAGIC;

View file

@ -51,4 +51,8 @@ extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS);
extern void AtAbort_IVM(void);
extern bool isIvmName(const char *s);
/* ruleutils.c */
extern char *pg_ivm_get_querydef(Query *query, bool pretty);
#endif

59
ruleutils.c Normal file
View file

@ -0,0 +1,59 @@
/*-------------------------------------------------------------------------
*
* ruleutils.c
* incremental view maintenance extension
* Routines for convert stored expressions/querytrees back to
* source text
*
* Portions Copyright (c) 2022, IVM Development Group
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 150000)
#include "utils/ruleutils.h"
#elif defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 140000)
#include "ruleutils_14.c"
#else
#include "ruleutils_13.c"
#endif
#include "pg_ivm.h"
/* Standard conversion of a "bool pretty" option to detailed flags */
#define GET_PRETTY_FLAGS(pretty) \
((pretty) ? (PRETTYFLAG_PAREN | PRETTYFLAG_INDENT | PRETTYFLAG_SCHEMA) \
: PRETTYFLAG_INDENT)
/* ----------
* pg_get_querydef
*
* Public entry point to deparse one query parsetree.
* The pretty flags are determined by GET_PRETTY_FLAGS(pretty).
*
* The result is a palloc'd C string.
* ----------
*/
char *
pg_ivm_get_querydef(Query *query, bool pretty)
{
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 150000)
return pg_get_querydef(query, pretty);
#else
StringInfoData buf;
int prettyFlags;
prettyFlags = GET_PRETTY_FLAGS(pretty);
initStringInfo(&buf);
get_query_def(query, &buf, NIL, NULL, true,
prettyFlags, WRAP_COLUMN_DEFAULT, 0);
return buf.data;
#endif
}

7844
ruleutils_13.c Normal file

File diff suppressed because it is too large Load diff

8214
ruleutils_14.c Normal file

File diff suppressed because it is too large Load diff

View file

@ -123,9 +123,33 @@ DELETE FROM mv_base_a WHERE (i,j) = (2,30);
SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3;
ROLLBACK;
-- not support MIN(), MAX() aggregate functions
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MIN(j) FROM mv_base_a GROUP BY i');
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MAX(j) FROM mv_base_a GROUP BY i');
-- support MIN(), MAX() aggregate functions
BEGIN;
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MIN(j), MAX(j) FROM mv_base_a GROUP BY i');
SELECT * FROM mv_ivm_min_max ORDER BY 1,2,3;
INSERT INTO mv_base_a VALUES
(1,11), (1,12),
(2,21), (2,22),
(3,31), (3,32),
(4,41), (4,42),
(5,51), (5,52);
SELECT * FROM mv_ivm_min_max ORDER BY 1,2,3;
DELETE FROM mv_base_a WHERE (i,j) IN ((1,10), (2,21), (3,32));
SELECT * FROM mv_ivm_min_max ORDER BY 1,2,3;
ROLLBACK;
-- support MIN(), MAX() aggregate functions without GROUP clause
BEGIN;
SELECT create_immv('mv_ivm_min_max', 'SELECT MIN(j), MAX(j) FROM mv_base_a');
SELECT * FROM mv_ivm_min_max;
INSERT INTO mv_base_a VALUES
(0,0), (6,60), (7,70);
SELECT * FROM mv_ivm_min_max;
DELETE FROM mv_base_a WHERE (i,j) IN ((0,0), (7,70));
SELECT * FROM mv_ivm_min_max;
DELETE FROM mv_base_a;
SELECT * FROM mv_ivm_min_max;
ROLLBACK;
-- support subquery in FROM clause
BEGIN;
@ -207,6 +231,28 @@ INSERT INTO base_t VALUES (1),(NULL);
SELECT * FROM mv ORDER BY i;
ROLLBACK;
BEGIN;
CREATE TABLE base_t (i int, v int);
INSERT INTO base_t VALUES (NULL, 1), (NULL, 2), (1, 10), (1, 20);
SELECT create_immv('mv', 'SELECT i, sum(v) FROM base_t GROUP BY i');
SELECT * FROM mv ORDER BY i;
UPDATE base_t SET v = v * 10;
SELECT * FROM mv ORDER BY i;
ROLLBACK;
BEGIN;
CREATE TABLE base_t (i int, v int);
INSERT INTO base_t VALUES (NULL, 1), (NULL, 2), (NULL, 3), (NULL, 4), (NULL, 5);
SELECT create_immv('mv', 'SELECT i, min(v), max(v) FROM base_t GROUP BY i');
SELECT * FROM mv ORDER BY i;
DELETE FROM base_t WHERE v = 1;
SELECT * FROM mv ORDER BY i;
DELETE FROM base_t WHERE v = 3;
SELECT * FROM mv ORDER BY i;
DELETE FROM base_t WHERE v = 5;
SELECT * FROM mv ORDER BY i;
ROLLBACK;
-- IMMV containing user defined type
BEGIN;