From 57c8bac1a0380774564c7f1838bf839440ba84ec Mon Sep 17 00:00:00 2001 From: Yugo Nagata Date: Tue, 21 Jun 2022 20:50:45 +0900 Subject: [PATCH] Add aggregates support Built-in count, sum, avg are supported. We have not supported min/max yet, but will support in future. When an IMMV with any aggregates are defined, additional columns are created for each aggregate function. Although such columns are "hidden" in pgsql-ivm version, they are always visible for users in the extension version. --- createas.c | 254 +++++++++++++++++++++++-- expected/pg_ivm.out | 194 ++++++++++++++++++-- matview.c | 438 ++++++++++++++++++++++++++++++++++++++++++-- pg_ivm.h | 8 +- sql/pg_ivm.sql | 50 ++++- 5 files changed, 888 insertions(+), 56 deletions(-) diff --git a/createas.c b/createas.c index 5698ca4..581699a 100644 --- a/createas.c +++ b/createas.c @@ -35,12 +35,16 @@ #include "parser/parsetree.h" #include "parser/parse_clause.h" #include "parser/parse_func.h" +#include "parser/parse_type.h" #include "rewrite/rewriteHandler.h" #include "rewrite/rewriteManip.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/regproc.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" #include "pg_ivm.h" @@ -68,6 +72,7 @@ static void CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, static void check_ivm_restriction(Node *node); static bool check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context); static Bitmapset *get_primary_key_attnos_from_query(Query *query, List **constraintList, bool is_create); +static bool check_aggregate_supports_ivm(Oid aggfnoid); static void StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery); @@ -269,6 +274,7 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt, * rewriteQueryForIMMV -- rewrite view definition query for IMMV * * count(*) is added for counting distinct tuples in views. + * Also, additional hidden columns are added for aggregate values. */ Query * rewriteQueryForIMMV(Query *query, List *colNames) @@ -283,14 +289,46 @@ rewriteQueryForIMMV(Query *query, List *colNames) rewritten = copyObject(query); pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET; - /* - * Convert DISTINCT to GROUP BY and add count(*) for counting distinct - * tuples in views. - */ - if (rewritten->distinctClause) + /* group keys must be in targetlist */ + if (rewritten->groupClause) { + ListCell *lc; + foreach(lc, rewritten->groupClause) + { + SortGroupClause *scl = (SortGroupClause *) lfirst(lc); + TargetEntry *tle = get_sortgroupclause_tle(scl, rewritten->targetList); + + if (tle->resjunk) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("GROUP BY expression not appearing in select list is not supported on incrementally maintainable materialized view"))); + } + } + /* Convert DISTINCT to GROUP BY. count(*) will be added afterward. */ + else if (!rewritten->hasAggs && rewritten->distinctClause) rewritten->groupClause = transformDistinctClause(NULL, &rewritten->targetList, rewritten->sortClause, false); + /* Add additional columns for aggregate values */ + if (rewritten->hasAggs) + { + ListCell *lc; + List *aggs = NIL; + AttrNumber next_resno = list_length(rewritten->targetList) + 1; + + foreach(lc, rewritten->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + char *resname = (colNames == NIL ? tle->resname : strVal(list_nth(colNames, tle->resno - 1))); + + if (IsA(tle->expr, Aggref)) + makeIvmAggColumn(pstate, (Aggref *)tle->expr, resname, &next_resno, &aggs); + } + rewritten->targetList = list_concat(rewritten->targetList, aggs); + } + + /* Add count(*) for counting distinct tuples in views */ + if (rewritten->distinctClause || rewritten->hasAggs) + { #if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 140000) fn = makeFuncCall(list_make1(makeString("count")), NIL, COERCE_EXPLICIT_CALL, -1); #else @@ -311,6 +349,91 @@ rewriteQueryForIMMV(Query *query, List *colNames) return rewritten; } +/* + * makeIvmAggColumn -- make additional aggregate columns for IVM + * + * For an aggregate column specified by aggref, additional aggregate columns + * are added, which are used to calculate the new aggregate value in IMMV. + * An additional aggregate columns has a name based on resname + * (ex. ivm_count_resname), and resno specified by next_resno. The created + * columns are returned to aggs, and the resno for the next column is also + * returned to next_resno. + * + * Currently, an additional count() is created for aggref other than count. + * In addition, sum() is created for avg aggregate column. + */ +void +makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *next_resno, List **aggs) +{ + TargetEntry *tle_count; + Node *node; + FuncCall *fn; + Const *dmy_arg = makeConst(INT4OID, + -1, + InvalidOid, + sizeof(int32), + Int32GetDatum(1), + false, + true); /* pass by value */ + const char *aggname = get_func_name(aggref->aggfnoid); + + /* + * For aggregate functions except count, add count() func with the same arg parameters. + * This count result is used for determining if the aggregate value should be NULL or not. + * Also, add sum() func for avg because we need to calculate an average value as sum/count. + * + * XXX: If there are same expressions explicitly in the target list, we can use this instead + * of adding new duplicated one. + */ + if (strcmp(aggname, "count") != 0) + { + fn = makeFuncCall(list_make1(makeString("count")), NIL, COERCE_EXPLICIT_CALL, -1); + + /* Make a Func with a dummy arg, and then override this by the original agg's args. */ + node = ParseFuncOrColumn(pstate, fn->funcname, list_make1(dmy_arg), NULL, fn, false, -1); + ((Aggref *)node)->args = aggref->args; + + tle_count = makeTargetEntry((Expr *) node, + *next_resno, + pstrdup(makeObjectName("__ivm_count",resname, "_")), + false); + *aggs = lappend(*aggs, tle_count); + (*next_resno)++; + } + if (strcmp(aggname, "avg") == 0) + { + List *dmy_args = NIL; + ListCell *lc; + foreach(lc, aggref->aggargtypes) + { + Oid typeid = lfirst_oid(lc); + Type type = typeidType(typeid); + + Const *con = makeConst(typeid, + -1, + typeTypeCollation(type), + typeLen(type), + (Datum) 0, + true, + typeByVal(type)); + dmy_args = lappend(dmy_args, con); + ReleaseSysCache(type); + } + fn = makeFuncCall(list_make1(makeString("sum")), NIL, COERCE_EXPLICIT_CALL, -1); + + /* Make a Func with dummy args, and then override this by the original agg's args. */ + node = ParseFuncOrColumn(pstate, fn->funcname, dmy_args, NULL, fn, false, -1); + ((Aggref *)node)->args = aggref->args; + + tle_count = makeTargetEntry((Expr *) node, + *next_resno, + pstrdup(makeObjectName("__ivm_sum",resname, "_")), + false); + *aggs = lappend(*aggs, tle_count); + (*next_resno)++; + } +} + /* * CreateIvmTriggersOnBaseTables -- create IVM triggers on all base tables */ @@ -338,14 +461,17 @@ CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid, bool is_create) * and upgrading the lock level after this increases probability of * deadlock. * - * XXX: For the current extension version, DISTINCT needs exclusive lock - * to prevent inconsistency that can be avoided by using + * XXX: For the current extension version, DISTINCT and aggregates with GROUP + * need exclusive lock to prevent inconsistency that can be avoided by using * nulls_not_distinct which is available only in PG15 or later. + * XXX: This lock is not necessary if all columns in group keys or distinct + * target list are not nullable. */ rte = list_nth(qry->rtable, first_rtindex - 1); if (list_length(qry->rtable) > first_rtindex || - rte->rtekind != RTE_RELATION || qry->distinctClause) + rte->rtekind != RTE_RELATION || qry->distinctClause || + (qry->hasAggs && qry->groupClause)) ex_lock = true; CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *)qry, matviewOid, &relids, ex_lock); @@ -534,6 +660,10 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CTE is not supported on incrementally maintainable materialized view"))); + if (qry->groupClause != NIL && !qry->hasAggs) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("GROUP BY clause without aggregate is not supported on incrementally maintainable materialized view"))); if (qry->havingQual != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -668,13 +798,35 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context) errmsg("OUTER JOIN is not supported on incrementally maintainable materialized view"))); expression_tree_walker(node, check_ivm_restriction_walker, NULL); + break; } - break; case T_Aggref: - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("aggregate function is not supported on incrementally maintainable materialized view"))); - break; + { + /* Check if this supports IVM */ + Aggref *aggref = (Aggref *) node; + const char *aggname = format_procedure(aggref->aggfnoid); + + if (aggref->aggfilter != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("aggregate function with FILTER clause is not supported on incrementally maintainable materialized view"))); + + if (aggref->aggdistinct != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("aggregate function with DISTINCT arguments is not supported on incrementally maintainable materialized view"))); + + if (aggref->aggorder != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("aggregate function with ORDER clause is not supported on incrementally maintainable materialized view"))); + + if (!check_aggregate_supports_ivm(aggref->aggfnoid)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("aggregate function %s is not supported on incrementally maintainable materialized view", aggname))); + break; + } default: expression_tree_walker(node, check_ivm_restriction_walker, (void *) context); break; @@ -682,6 +834,46 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context) return false; } +/* + * check_aggregate_supports_ivm + * + * Check if the given aggregate function is supporting IVM + */ +static bool +check_aggregate_supports_ivm(Oid aggfnoid) +{ + switch (aggfnoid) + { + /* count */ + case F_COUNT_ANY: + case F_COUNT_: + + /* sum */ + case F_SUM_INT8: + case F_SUM_INT4: + case F_SUM_INT2: + case F_SUM_FLOAT4: + case F_SUM_FLOAT8: + case F_SUM_MONEY: + case F_SUM_INTERVAL: + case F_SUM_NUMERIC: + + /* avg */ + case F_AVG_INT8: + case F_AVG_INT4: + case F_AVG_INT2: + case F_AVG_NUMERIC: + case F_AVG_FLOAT4: + case F_AVG_FLOAT8: + case F_AVG_INTERVAL: + + return true; + + default: + return false; + } +} + /* * CreateIndexOnIMMV * @@ -741,7 +933,29 @@ CreateIndexOnIMMV(Query *query, Relation matviewRel, bool is_create) index->concurrent = false; index->if_not_exists = false; - if (query->distinctClause) + if (query->groupClause) + { + /* create unique constraint on GROUP BY expression columns */ + foreach(lc, query->groupClause) + { + SortGroupClause *scl = (SortGroupClause *) lfirst(lc); + TargetEntry *tle = get_sortgroupclause_tle(scl, query->targetList); + Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1); + IndexElem *iparam; + + iparam = makeNode(IndexElem); + iparam->name = pstrdup(NameStr(attr->attname)); + iparam->expr = NULL; + iparam->indexcolname = NULL; + iparam->collation = NIL; + iparam->opclass = NIL; + iparam->opclassopts = NIL; + iparam->ordering = SORTBY_DEFAULT; + iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; + index->indexParams = lappend(index->indexParams, iparam); + } + } + else if (query->distinctClause) { /* create unique constraint on all columns */ foreach(lc, query->targetList) @@ -799,7 +1013,7 @@ CreateIndexOnIMMV(Query *query, Relation matviewRel, bool is_create) (errmsg("could not create an index on immv \"%s\" automatically", RelationGetRelationName(matviewRel)), errdetail("This target list does not have all the primary key columns, " - "or this view does not contain DISTINCT clause."), + "or this view does not contain GROUP BY or DISTINCT clause."), errhint("Create an index on the immv for efficient incremental maintenance."))); return; } @@ -981,8 +1195,8 @@ StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery) Datum values[Natts_pg_ivm_immv]; bool isNulls[Natts_pg_ivm_immv]; Relation pgIvmImmv; - TupleDesc tupleDescriptor; - HeapTuple heapTuple; + TupleDesc tupleDescriptor; + HeapTuple heapTuple; ObjectAddress address; memset(values, 0, sizeof(values)); @@ -994,10 +1208,10 @@ StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery) pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock); - tupleDescriptor = RelationGetDescr(pgIvmImmv); - heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + tupleDescriptor = RelationGetDescr(pgIvmImmv); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); - CatalogTupleInsert(pgIvmImmv, heapTuple); + CatalogTupleInsert(pgIvmImmv, heapTuple); address.classId = RelationRelationId; address.objectId = viewOid; diff --git a/expected/pg_ivm.out b/expected/pg_ivm.out index f4fb72c..3cd1f9a 100644 --- a/expected/pg_ivm.out +++ b/expected/pg_ivm.out @@ -17,7 +17,7 @@ INSERT INTO mv_base_b VALUES -- CREATE INCREMENTAL MATERIALIZED VIEW mv_ivm_1 AS SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i) WITH NO DATA; SELECT create_immv('mv_ivm_1', 'SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i)'); NOTICE: could not create an index on immv "mv_ivm_1" automatically -DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause. +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. HINT: Create an index on the immv for efficient incremental maintenance. create_immv ------------- @@ -83,7 +83,7 @@ CREATE FUNCTION ivm_func() RETURNS int LANGUAGE 'sql' AS 'SELECT 1' IMMUTABLE; SELECT create_immv('mv_ivm_func', 'SELECT * FROM ivm_func()'); NOTICE: could not create an index on immv "mv_ivm_func" automatically -DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause. +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. HINT: Create an index on the immv for efficient incremental maintenance. create_immv ------------- @@ -92,7 +92,7 @@ HINT: Create an index on the immv for efficient incremental maintenance. SELECT create_immv('mv_ivm_no_tbl', 'SELECT 1'); NOTICE: could not create an index on immv "mv_ivm_no_tbl" automatically -DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause. +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. HINT: Create an index on the immv for efficient incremental maintenance. create_immv ------------- @@ -104,7 +104,7 @@ ROLLBACK; BEGIN; SELECT create_immv('mv_ivm_duplicate', 'SELECT j FROM mv_base_a'); NOTICE: could not create an index on immv "mv_ivm_duplicate" automatically -DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause. +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. HINT: Create an index on the immv for efficient incremental maintenance. create_immv ------------- @@ -162,9 +162,174 @@ SELECT * FROM mv_ivm_distinct ORDER BY 1; (5 rows) ROLLBACK; --- not support SUM(), COUNT() and AVG() aggregate functions +-- support SUM(), COUNT() and AVG() aggregate functions +BEGIN; SELECT create_immv('mv_ivm_agg', 'SELECT i, SUM(j), COUNT(i), AVG(j) FROM mv_base_a GROUP BY i'); -ERROR: aggregate function is not supported on incrementally maintainable materialized view +NOTICE: created index "mv_ivm_agg_index" on immv "mv_ivm_agg" + create_immv +------------- + 5 +(1 row) + +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4; + i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__ +---+-----+-------+---------------------+-------------------+-------------------+-----------------+--------------- + 1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1 + 2 | 20 | 1 | 20.0000000000000000 | 1 | 1 | 20 | 1 + 3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1 + 4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1 + 5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1 +(5 rows) + +INSERT INTO mv_base_a VALUES(2,100); +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4; + i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__ +---+-----+-------+---------------------+-------------------+-------------------+-----------------+--------------- + 1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1 + 2 | 120 | 2 | 60.0000000000000000 | 2 | 2 | 120 | 2 + 3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1 + 4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1 + 5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1 +(5 rows) + +UPDATE mv_base_a SET j = 200 WHERE (i,j) = (2,100); +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4; + i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__ +---+-----+-------+----------------------+-------------------+-------------------+-----------------+--------------- + 1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1 + 2 | 220 | 2 | 110.0000000000000000 | 2 | 2 | 220 | 2 + 3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1 + 4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1 + 5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1 +(5 rows) + +DELETE FROM mv_base_a WHERE (i,j) = (2,200); +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4; + i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__ +---+-----+-------+---------------------+-------------------+-------------------+-----------------+--------------- + 1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1 + 2 | 20 | 1 | 20.0000000000000000 | 1 | 1 | 20 | 1 + 3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1 + 4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1 + 5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1 +(5 rows) + +ROLLBACK; +-- support COUNT(*) aggregate function +BEGIN; +SELECT create_immv('mv_ivm_agg', 'SELECT i, SUM(j), COUNT(*) FROM mv_base_a GROUP BY i'); +NOTICE: created index "mv_ivm_agg_index" on immv "mv_ivm_agg" + create_immv +------------- + 5 +(1 row) + +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3; + i | sum | count | __ivm_count_sum__ | __ivm_count__ +---+-----+-------+-------------------+--------------- + 1 | 10 | 1 | 1 | 1 + 2 | 20 | 1 | 1 | 1 + 3 | 30 | 1 | 1 | 1 + 4 | 40 | 1 | 1 | 1 + 5 | 50 | 1 | 1 | 1 +(5 rows) + +INSERT INTO mv_base_a VALUES(2,100); +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3; + i | sum | count | __ivm_count_sum__ | __ivm_count__ +---+-----+-------+-------------------+--------------- + 1 | 10 | 1 | 1 | 1 + 2 | 120 | 2 | 2 | 2 + 3 | 30 | 1 | 1 | 1 + 4 | 40 | 1 | 1 | 1 + 5 | 50 | 1 | 1 | 1 +(5 rows) + +ROLLBACK; +-- support aggregate functions without GROUP clause +BEGIN; +SELECT create_immv('mv_ivm_group', 'SELECT SUM(j), COUNT(j), AVG(j) FROM mv_base_a'); +NOTICE: could not create an index on immv "mv_ivm_group" automatically +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. +HINT: Create an index on the immv for efficient incremental maintenance. + create_immv +------------- + 1 +(1 row) + +SELECT * FROM mv_ivm_group ORDER BY 1; + sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__ +-----+-------+---------------------+-------------------+-------------------+-----------------+--------------- + 150 | 5 | 30.0000000000000000 | 5 | 5 | 150 | 5 +(1 row) + +INSERT INTO mv_base_a VALUES(6,60); +SELECT * FROM mv_ivm_group ORDER BY 1; + sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__ +-----+-------+---------------------+-------------------+-------------------+-----------------+--------------- + 210 | 6 | 35.0000000000000000 | 6 | 6 | 210 | 6 +(1 row) + +DELETE FROM mv_base_a; +SELECT * FROM mv_ivm_group ORDER BY 1; + sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__ +-----+-------+-----+-------------------+-------------------+-----------------+--------------- + | 0 | | 0 | 0 | | 0 +(1 row) + +ROLLBACK; +-- resolved issue: When use AVG() function and values is indivisible, result of AVG() is incorrect. +BEGIN; +SELECT create_immv('mv_ivm_avg_bug', 'SELECT i, SUM(j), COUNT(j), AVG(j) FROM mv_base_A GROUP BY i'); +NOTICE: created index "mv_ivm_avg_bug_index" on immv "mv_ivm_avg_bug" + create_immv +------------- + 5 +(1 row) + +SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3; + i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__ +---+-----+-------+---------------------+-------------------+-------------------+-----------------+--------------- + 1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1 + 2 | 20 | 1 | 20.0000000000000000 | 1 | 1 | 20 | 1 + 3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1 + 4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1 + 5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1 +(5 rows) + +INSERT INTO mv_base_a VALUES + (1,0), + (1,0), + (2,30), + (2,30); +SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3; + i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__ +---+-----+-------+---------------------+-------------------+-------------------+-----------------+--------------- + 1 | 10 | 3 | 3.3333333333333333 | 3 | 3 | 10 | 3 + 2 | 80 | 3 | 26.6666666666666667 | 3 | 3 | 80 | 3 + 3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1 + 4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1 + 5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1 +(5 rows) + +DELETE FROM mv_base_a WHERE (i,j) = (1,0); +DELETE FROM mv_base_a WHERE (i,j) = (2,30); +SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3; + i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__ +---+-----+-------+---------------------+-------------------+-------------------+-----------------+--------------- + 1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1 + 2 | 20 | 1 | 20.0000000000000000 | 1 | 1 | 20 | 1 + 3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1 + 4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1 + 5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1 +(5 rows) + +ROLLBACK; +-- not support MIN(), MAX() aggregate functions +SELECT create_immv('mv_ivm_min_max', 'SELECT i, MIN(j) FROM mv_base_a GROUP BY i'); +ERROR: aggregate function min(integer) is not supported on incrementally maintainable materialized view +SELECT create_immv('mv_ivm_min_max', 'SELECT i, MAX(j) FROM mv_base_a GROUP BY i'); +ERROR: aggregate function max(integer) is not supported on incrementally maintainable materialized view -- support self join view and multiple change on the same table BEGIN; CREATE TABLE base_t (i int, v int); @@ -172,7 +337,7 @@ INSERT INTO base_t VALUES (1, 10), (2, 20), (3, 30); SELECT create_immv('mv_self(v1, v2)', 'SELECT t1.v, t2.v FROM base_t AS t1 JOIN base_t AS t2 ON t1.i = t2.i'); NOTICE: could not create an index on immv "mv_self" automatically -DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause. +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. HINT: Create an index on the immv for efficient incremental maintenance. create_immv ------------- @@ -227,7 +392,7 @@ INSERT INTO base_r VALUES (1, 10), (2, 20), (3, 30); INSERT INTO base_s VALUES (1, 100), (2, 200), (3, 300); SELECT create_immv('mv(v1, v2)', 'SELECT r.v, s.v FROM base_r AS r JOIN base_s AS s USING(i)');; NOTICE: could not create an index on immv "mv" automatically -DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause. +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. HINT: Create an index on the immv for efficient incremental maintenance. create_immv ------------- @@ -301,7 +466,7 @@ CREATE TABLE base_t (i int, v int); INSERT INTO base_t VALUES (1,10),(2, NULL); SELECT create_immv('mv', 'SELECT * FROM base_t'); NOTICE: could not create an index on immv "mv" automatically -DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause. +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. HINT: Create an index on the immv for efficient incremental maintenance. create_immv ------------- @@ -328,7 +493,7 @@ BEGIN; CREATE TABLE base_t (i int); SELECT create_immv('mv', 'SELECT * FROM base_t'); NOTICE: could not create an index on immv "mv" automatically -DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause. +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. HINT: Create an index on the immv for efficient incremental maintenance. create_immv ------------- @@ -389,7 +554,7 @@ CREATE TABLE t_mytype (x mytype); SELECT create_immv('mv_mytype', 'SELECT * FROM t_mytype'); NOTICE: could not create an index on immv "mv_mytype" automatically -DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause. +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. HINT: Create an index on the immv for efficient incremental maintenance. create_immv ------------- @@ -434,6 +599,9 @@ ERROR: ORDER BY clause is not supported on incrementally maintainable materiali -- contain HAVING SELECT create_immv('mv_ivm08', 'SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i) GROUP BY i,j,k HAVING SUM(i) > 5'); ERROR: HAVING clause is not supported on incrementally maintainable materialized view +-- contain GROUP BY without aggregate +SELECT create_immv('mv_ivm08', 'SELECT i,j FROM mv_base_a GROUP BY i,j'); +ERROR: GROUP BY clause without aggregate is not supported on incrementally maintainable materialized view -- contain view or materialized view CREATE VIEW b_view AS SELECT i,k FROM mv_base_b; CREATE MATERIALIZED VIEW b_mview AS SELECT i,k FROM mv_base_b; @@ -507,7 +675,7 @@ GRANT ALL on num_tbl TO PUBLIC; SET SESSION AUTHORIZATION ivm_user; SELECT create_immv('ivm_rls', 'SELECT * FROM rls_tbl'); NOTICE: could not create an index on immv "ivm_rls" automatically -DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause. +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. HINT: Create an index on the immv for efficient incremental maintenance. create_immv ------------- @@ -532,7 +700,7 @@ SELECT id, data, owner FROM ivm_rls ORDER BY 1,2,3; SELECT create_immv('ivm_rls2', 'SELECT * FROM rls_tbl JOIN num_tbl USING(id)'); NOTICE: could not create an index on immv "ivm_rls2" automatically -DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause. +DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause. HINT: Create an index on the immv for efficient incremental maintenance. create_immv ------------- diff --git a/matview.c b/matview.c index e2be9ef..bfbecab 100644 --- a/matview.c +++ b/matview.c @@ -19,6 +19,7 @@ #include "catalog/heap.h" #include "catalog/pg_trigger.h" #include "commands/cluster.h" +#include "commands/defrem.h" #include "commands/matview.h" #include "commands/tablecmds.h" #include "executor/execdesc.h" @@ -27,6 +28,7 @@ #include "executor/tstoreReceiver.h" #include "miscadmin.h" #include "nodes/makefuncs.h" +#include "optimizer/optimizer.h" #include "parser/analyze.h" #include "parser/parse_clause.h" #include "parser/parse_func.h" @@ -39,6 +41,7 @@ #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/fmgrprotos.h" #include "utils/hsearch.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -50,6 +53,32 @@ #define MV_INIT_QUERYHASHSIZE 16 +/* MV query type codes */ +#define MV_PLAN_RECALC 1 +#define MV_PLAN_SET_VALUE 2 + +/* + * MI_QueryKey + * + * The key identifying a prepared SPI plan in our query hashtable + */ +typedef struct MV_QueryKey +{ + Oid matview_id; /* OID of materialized view */ + int32 query_type; /* query type ID, see MV_PLAN_XXX above */ +} MV_QueryKey; + +/* + * MV_QueryHashEntry + * + * Hash entry for cached plans used to maintain materialized views. + */ +typedef struct MV_QueryHashEntry +{ + MV_QueryKey key; + SPIPlanPtr plan; +} MV_QueryHashEntry; + /* * MV_TriggerHashEntry * @@ -86,8 +115,16 @@ typedef struct MV_TriggerTable RangeTblEntry *original_rte; /* the original RTE saved before rewriting query */ } MV_TriggerTable; +static HTAB *mv_query_cache = NULL; static HTAB *mv_trigger_info = NULL; +/* kind of IVM operation for the view */ +typedef enum +{ + IVM_ADD, + IVM_SUB +} IvmOp; + /* ENR name for materialized view delta */ #define NEW_DELTA_ENRNAME "new_delta" #define OLD_DELTA_ENRNAME "old_delta" @@ -114,7 +151,7 @@ static RangeTblEntry *get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *tabl QueryEnvironment *queryEnv); static RangeTblEntry *union_ENRs(RangeTblEntry *rte, Oid relid, List *enr_rtes, const char *prefix, QueryEnvironment *queryEnv); -static Query *rewrite_query_for_distinct(Query *query, ParseState *pstate); +static Query *rewrite_query_for_distinct_and_aggregates(Query *query, ParseState *pstate); static void calc_delta(MV_TriggerTable *table, int rte_index, Query *query, DestReceiver *dest_old, DestReceiver *dest_new, @@ -125,14 +162,27 @@ static Query *rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable * static void apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores, TupleDesc tupdesc_old, TupleDesc tupdesc_new, Query *query, bool use_count, char *count_colname); +static void append_set_clause_for_count(const char *resname, StringInfo buf_old, + StringInfo buf_new,StringInfo aggs_list); +static void append_set_clause_for_sum(const char *resname, StringInfo buf_old, + StringInfo buf_new, StringInfo aggs_list); +static void append_set_clause_for_avg(const char *resname, StringInfo buf_old, + StringInfo buf_new, StringInfo aggs_list, + const char *aggtype); +static char *get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2, + const char* count_col, const char *castType); +static char *get_null_condition_string(IvmOp op, const char *arg1, const char *arg2, + const char* count_col); static void apply_old_delta(const char *matviewname, const char *deltaname_old, List *keys); static void apply_old_delta_with_count(const char *matviewname, const char *deltaname_old, - List *keys, const char *count_colname); + List *keys, StringInfo aggs_list, StringInfo aggs_set, + const char *count_colname); static void apply_new_delta(const char *matviewname, const char *deltaname_new, StringInfo target_list); static void apply_new_delta_with_count(const char *matviewname, const char* deltaname_new, - List *keys, StringInfo target_list, const char* count_colname); + List *keys, StringInfo target_list, StringInfo aggs_set, + const char* count_colname); static char *get_matching_condition_string(List *keys); static void generate_equal(StringInfo querybuf, Oid opttype, const char *leftop, const char *rightop); @@ -820,8 +870,8 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) rewritten = rewrite_query_for_preupdate_state(rewritten, entry->tables, entry->xid, entry->cid, pstate); - /* Rewrite for DISTINCT clause */ - rewritten = rewrite_query_for_distinct(rewritten, pstate); + /* Rewrite for DISTINCT clause and aggregates functions */ + rewritten = rewrite_query_for_distinct_and_aggregates(rewritten, pstate); /* Create tuplestores to store view deltas */ if (entry->has_old) @@ -885,7 +935,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) count_colname = pstrdup("__ivm_count__"); - if (query->distinctClause) + if (query->hasAggs || query->distinctClause) use_count = true; /* calculate delta tables */ @@ -1256,17 +1306,34 @@ union_ENRs(RangeTblEntry *rte, Oid relid, List *enr_rtes, const char *prefix, } /* - * rewrite_query_for_distinct + * rewrite_query_for_distinct_and_aggregates * - * Rewrite query for counting DISTINCT clause. + * Rewrite query for counting DISTINCT clause and aggregate functions. */ static Query * -rewrite_query_for_distinct(Query *query, ParseState *pstate) +rewrite_query_for_distinct_and_aggregates(Query *query, ParseState *pstate) { TargetEntry *tle_count; FuncCall *fn; Node *node; + /* For aggregate views */ + if (query->hasAggs) + { + ListCell *lc; + List *aggs = NIL; + AttrNumber next_resno = list_length(query->targetList) + 1; + + foreach(lc, query->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + if (IsA(tle->expr, Aggref)) + makeIvmAggColumn(pstate, (Aggref *)tle->expr, tle->resname, &next_resno, &aggs); + } + query->targetList = list_concat(query->targetList, aggs); + } + /* Add count(*) for counting distinct tuples in views */ #if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 140000) fn = makeFuncCall(list_make1(makeString("count")), NIL, COERCE_EXPLICIT_CALL, -1); @@ -1339,6 +1406,8 @@ rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte return query; } +#define IVM_colname(type, col) makeObjectName("__ivm_" type, col, "_") + /* * apply_delta * @@ -1352,13 +1421,15 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n { StringInfoData querybuf; StringInfoData target_list_buf; + StringInfo aggs_list_buf = NULL; + StringInfo aggs_set_old = NULL; + StringInfo aggs_set_new = NULL; Relation matviewRel; char *matviewname; ListCell *lc; int i; List *keys = NIL; - /* * get names of the materialized view and delta tables */ @@ -1374,6 +1445,15 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n initStringInfo(&querybuf); initStringInfo(&target_list_buf); + if (query->hasAggs) + { + if (old_tuplestores && tuplestore_tuple_count(old_tuplestores) > 0) + aggs_set_old = makeStringInfo(); + if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0) + aggs_set_new = makeStringInfo(); + aggs_list_buf = makeStringInfo(); + } + /* build string of target list */ for (i = 0; i < matviewRel->rd_att->natts; i++) { @@ -1390,13 +1470,61 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n { TargetEntry *tle = (TargetEntry *) lfirst(lc); Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i); + char *resname = NameStr(attr->attname); i++; if (tle->resjunk) continue; - keys = lappend(keys, attr); + /* + * For views without aggregates, all attributes are used as keys to identify a + * tuple in a view. + */ + if (!query->hasAggs) + keys = lappend(keys, attr); + + /* For views with aggregates, we need to build SET clause for updating aggregate + * values. */ + if (query->hasAggs && IsA(tle->expr, Aggref)) + { + Aggref *aggref = (Aggref *) tle->expr; + const char *aggname = get_func_name(aggref->aggfnoid); + + /* + * We can use function names here because it is already checked if these + * can be used in IMMV by its OID at the definition time. + */ + + /* count */ + if (!strcmp(aggname, "count")) + append_set_clause_for_count(resname, aggs_set_old, aggs_set_new, aggs_list_buf); + + /* sum */ + else if (!strcmp(aggname, "sum")) + append_set_clause_for_sum(resname, aggs_set_old, aggs_set_new, aggs_list_buf); + + /* avg */ + else if (!strcmp(aggname, "avg")) + append_set_clause_for_avg(resname, aggs_set_old, aggs_set_new, aggs_list_buf, + format_type_be(aggref->aggtype)); + + else + elog(ERROR, "unsupported aggregate function: %s", aggname); + } + } + + /* If we have GROUP BY clause, we use its entries as keys. */ + if (query->hasAggs && query->groupClause) + { + foreach (lc, query->groupClause) + { + SortGroupClause *sgcl = (SortGroupClause *) lfirst(lc); + TargetEntry *tle = get_sortgroupclause_tle(sgcl, query->targetList); + Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1); + + keys = lappend(keys, attr); + } } /* Start maintaining the materialized view. */ @@ -1425,12 +1553,12 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n elog(ERROR, "SPI_register failed"); if (use_count) - /* apply old delta and get rows to be recalculated */ + /* apply old delta */ apply_old_delta_with_count(matviewname, OLD_DELTA_ENRNAME, - keys, count_colname); + keys, aggs_list_buf, aggs_set_old, + count_colname); else apply_old_delta(matviewname, OLD_DELTA_ENRNAME, keys); - } /* For tuple insertion */ if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0) @@ -1453,7 +1581,7 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n /* apply new delta */ if (use_count) apply_new_delta_with_count(matviewname, NEW_DELTA_ENRNAME, - keys, &target_list_buf, count_colname); + keys, aggs_set_new, &target_list_buf, count_colname); else apply_new_delta(matviewname, NEW_DELTA_ENRNAME, &target_list_buf); } @@ -1468,20 +1596,272 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n elog(ERROR, "SPI_finish failed"); } +/* + * append_set_clause_for_count + * + * Append SET clause string for count aggregation to given buffers. + * Also, append resnames required for calculating the aggregate value. + */ +static void +append_set_clause_for_count(const char *resname, StringInfo buf_old, + StringInfo buf_new,StringInfo aggs_list) +{ + /* For tuple deletion */ + if (buf_old) + { + /* resname = mv.resname - t.resname */ + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_SUB, resname, "mv", "t", NULL, NULL)); + } + /* For tuple insertion */ + if (buf_new) + { + /* resname = mv.resname + diff.resname */ + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_ADD, resname, "mv", "diff", NULL, NULL)); + } + + appendStringInfo(aggs_list, ", %s", + quote_qualified_identifier("diff", resname) + ); +} + +/* + * append_set_clause_for_sum + * + * Append SET clause string for sum aggregation to given buffers. + * Also, append resnames required for calculating the aggregate value. + */ +static void +append_set_clause_for_sum(const char *resname, StringInfo buf_old, + StringInfo buf_new, StringInfo aggs_list) +{ + char *count_col = IVM_colname("count", resname); + + /* For tuple deletion */ + if (buf_old) + { + /* sum = mv.sum - t.sum */ + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_SUB, resname, "mv", "t", count_col, NULL) + ); + /* count = mv.count - t.count */ + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) + ); + } + /* For tuple insertion */ + if (buf_new) + { + /* sum = mv.sum + diff.sum */ + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_ADD, resname, "mv", "diff", count_col, NULL) + ); + /* count = mv.count + diff.count */ + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) + ); + } + + appendStringInfo(aggs_list, ", %s, %s", + quote_qualified_identifier("diff", resname), + quote_qualified_identifier("diff", IVM_colname("count", resname)) + ); +} + +/* + * append_set_clause_for_avg + * + * Append SET clause string for avg aggregation to given buffers. + * Also, append resnames required for calculating the aggregate value. + */ +static void +append_set_clause_for_avg(const char *resname, StringInfo buf_old, + StringInfo buf_new, StringInfo aggs_list, + const char *aggtype) +{ + char *sum_col = IVM_colname("sum", resname); + char *count_col = IVM_colname("count", resname); + + /* For tuple deletion */ + if (buf_old) + { + /* avg = (mv.sum - t.sum)::aggtype / (mv.count - t.count) */ + appendStringInfo(buf_old, + ", %s = %s OPERATOR(pg_catalog./) %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, aggtype), + get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) + ); + /* sum = mv.sum - t.sum */ + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, sum_col), + get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, NULL) + ); + /* count = mv.count - t.count */ + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) + ); + + } + /* For tuple insertion */ + if (buf_new) + { + /* avg = (mv.sum + diff.sum)::aggtype / (mv.count + diff.count) */ + appendStringInfo(buf_new, + ", %s = %s OPERATOR(pg_catalog./) %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, aggtype), + get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) + ); + /* sum = mv.sum + diff.sum */ + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, sum_col), + get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, NULL) + ); + /* count = mv.count + diff.count */ + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) + ); + } + + appendStringInfo(aggs_list, ", %s, %s, %s", + quote_qualified_identifier("diff", resname), + quote_qualified_identifier("diff", IVM_colname("sum", resname)), + quote_qualified_identifier("diff", IVM_colname("count", resname)) + ); +} + +/* + * get_operation_string + * + * Build a string to calculate the new aggregate values. + */ +static char * +get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2, + const char* count_col, const char *castType) +{ + StringInfoData buf; + StringInfoData castString; + char *col1 = quote_qualified_identifier(arg1, col); + char *col2 = quote_qualified_identifier(arg2, col); + char op_char = (op == IVM_SUB ? '-' : '+'); + + initStringInfo(&buf); + initStringInfo(&castString); + + if (castType) + appendStringInfo(&castString, "::%s", castType); + + if (!count_col) + { + /* + * If the attributes don't have count columns then calc the result + * by using the operator simply. + */ + appendStringInfo(&buf, "(%s OPERATOR(pg_catalog.%c) %s)%s", + col1, op_char, col2, castString.data); + } + else + { + /* + * If the attributes have count columns then consider the condition + * where the result becomes NULL. + */ + char *null_cond = get_null_condition_string(op, arg1, arg2, count_col); + + appendStringInfo(&buf, + "(CASE WHEN %s THEN NULL " + "WHEN %s IS NULL THEN %s " + "WHEN %s IS NULL THEN %s " + "ELSE (%s OPERATOR(pg_catalog.%c) %s)%s END)", + null_cond, + col1, col2, + col2, col1, + col1, op_char, col2, castString.data + ); + } + + return buf.data; +} + +/* + * get_null_condition_string + * + * Build a predicate string for CASE clause to check if an aggregate value + * will became NULL after the given operation is applied. + */ +static char * +get_null_condition_string(IvmOp op, const char *arg1, const char *arg2, + const char* count_col) +{ + StringInfoData null_cond; + initStringInfo(&null_cond); + + switch (op) + { + case IVM_ADD: + appendStringInfo(&null_cond, + "%s OPERATOR(pg_catalog.=) 0 AND %s OPERATOR(pg_catalog.=) 0", + quote_qualified_identifier(arg1, count_col), + quote_qualified_identifier(arg2, count_col) + ); + break; + case IVM_SUB: + appendStringInfo(&null_cond, + "%s OPERATOR(pg_catalog.=) %s", + quote_qualified_identifier(arg1, count_col), + quote_qualified_identifier(arg2, count_col) + ); + break; + default: + elog(ERROR,"unknown operation"); + } + + return null_cond.data; +} + + /* * apply_old_delta_with_count * * Execute a query for applying a delta table given by deltname_old * which contains tuples to be deleted from to a materialized view given by * matviewname. This is used when counting is required, that is, the view - * has aggregate or distinct. + * has aggregate or distinct. Also, when a table in EXISTS sub queries + * is modified. + * + * If the view desn't have aggregates or has GROUP BY, this requires a keys + * list to identify a tuple in the view. If the view has aggregates, this + * requires strings representing resnames of aggregates and SET clause for + * updating aggregate values. */ static void apply_old_delta_with_count(const char *matviewname, const char *deltaname_old, - List *keys, const char *count_colname) + List *keys, StringInfo aggs_list, StringInfo aggs_set, + const char *count_colname) { StringInfoData querybuf; char *match_cond; + bool agg_without_groupby = (list_length(keys) == 0); /* build WHERE condition for searching tuples to be deleted */ match_cond = get_matching_condition_string(keys); @@ -1491,22 +1871,26 @@ apply_old_delta_with_count(const char *matviewname, const char *deltaname_old, appendStringInfo(&querybuf, "WITH t AS (" /* collecting tid of target tuples in the view */ "SELECT diff.%s, " /* count column */ - "(diff.%s OPERATOR(pg_catalog.=) mv.%s) AS for_dlt, " + "(diff.%s OPERATOR(pg_catalog.=) mv.%s AND %s) AS for_dlt, " "mv.ctid " + "%s " /* aggregate columns */ "FROM %s AS mv, %s AS diff " "WHERE %s" /* tuple matching condition */ "), updt AS (" /* update a tuple if this is not to be deleted */ "UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.-) t.%s " + "%s" /* SET clauses for aggregates */ "FROM t WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND NOT for_dlt " ")" /* delete a tuple if this is to be deleted */ "DELETE FROM %s AS mv USING t " "WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND for_dlt", count_colname, - count_colname, count_colname, + count_colname, count_colname, (agg_without_groupby ? "false" : "true"), + (aggs_list != NULL ? aggs_list->data : ""), matviewname, deltaname_old, match_cond, matviewname, count_colname, count_colname, count_colname, + (aggs_set != NULL ? aggs_set->data : ""), matviewname); if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE) @@ -1570,10 +1954,15 @@ apply_old_delta(const char *matviewname, const char *deltaname_old, * matviewname. This is used when counting is required, that is, the view * has aggregate or distinct. Also, when a table in EXISTS sub queries * is modified. + * + * If the view desn't have aggregates or has GROUP BY, this requires a keys + * list to identify a tuple in the view. If the view has aggregates, this + * requires strings representing SET clause for updating aggregate values. */ static void apply_new_delta_with_count(const char *matviewname, const char* deltaname_new, - List *keys, StringInfo target_list, const char* count_colname) + List *keys, StringInfo aggs_set, StringInfo target_list, + const char* count_colname) { StringInfoData querybuf; StringInfoData returning_keys; @@ -1604,6 +1993,7 @@ apply_new_delta_with_count(const char *matviewname, const char* deltaname_new, appendStringInfo(&querybuf, "WITH updt AS (" /* update a tuple if this exists in the view */ "UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.+) diff.%s " + "%s " /* SET clauses for aggregates */ "FROM %s AS diff " "WHERE %s " /* tuple matching condition */ "RETURNING %s" /* returning keys of updated tuples */ @@ -1611,6 +2001,7 @@ apply_new_delta_with_count(const char *matviewname, const char* deltaname_new, "SELECT %s FROM %s AS diff " "WHERE NOT EXISTS (SELECT 1 FROM updt AS mv WHERE %s);", matviewname, count_colname, count_colname, count_colname, + (aggs_set != NULL ? aggs_set->data : ""), deltaname_new, match_cond, returning_keys.data, @@ -1718,6 +2109,13 @@ mv_InitHashTables(void) { HASHCTL ctl; + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(MV_QueryKey); + ctl.entrysize = sizeof(MV_QueryHashEntry); + mv_query_cache = hash_create("MV query cache", + MV_INIT_QUERYHASHSIZE, + &ctl, HASH_ELEM | HASH_BLOBS); + memset(&ctl, 0, sizeof(ctl)); ctl.keysize = sizeof(Oid); ctl.entrysize = sizeof(MV_TriggerHashEntry); diff --git a/pg_ivm.h b/pg_ivm.h index 9ebac58..53faac9 100644 --- a/pg_ivm.h +++ b/pg_ivm.h @@ -22,8 +22,8 @@ #define Natts_pg_ivm_immv 3 #define Anum_pg_ivm_immv_immvrelid 1 -#define Anum_pg_ivm_immv_ispopulated 2 -#define Anum_pg_ivm_immv_viewdef 3 +#define Anum_pg_ivm_immv_viewdef 2 +#define Anum_pg_ivm_immv_ispopulated 3 /* pg_ivm.c */ @@ -39,6 +39,7 @@ extern ObjectAddress ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt, extern void CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid, bool is_create); extern void CreateIndexOnIMMV(Query *query, Relation matviewRel, bool is_create); extern Query *rewriteQueryForIMMV(Query *query, List *colNames); +extern void makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *next_resno, List **aggs); /* matview.c */ @@ -49,5 +50,8 @@ extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS); extern void AtAbort_IVM(void); extern bool isIvmName(const char *s); +extern Datum pg_get_viewdef(PG_FUNCTION_ARGS); + + #endif diff --git a/sql/pg_ivm.sql b/sql/pg_ivm.sql index 5689d8c..56210a5 100644 --- a/sql/pg_ivm.sql +++ b/sql/pg_ivm.sql @@ -51,8 +51,54 @@ SELECT * FROM mv_ivm_duplicate ORDER BY 1; SELECT * FROM mv_ivm_distinct ORDER BY 1; ROLLBACK; --- not support SUM(), COUNT() and AVG() aggregate functions +-- support SUM(), COUNT() and AVG() aggregate functions +BEGIN; SELECT create_immv('mv_ivm_agg', 'SELECT i, SUM(j), COUNT(i), AVG(j) FROM mv_base_a GROUP BY i'); +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4; +INSERT INTO mv_base_a VALUES(2,100); +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4; +UPDATE mv_base_a SET j = 200 WHERE (i,j) = (2,100); +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4; +DELETE FROM mv_base_a WHERE (i,j) = (2,200); +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4; +ROLLBACK; + +-- support COUNT(*) aggregate function +BEGIN; +SELECT create_immv('mv_ivm_agg', 'SELECT i, SUM(j), COUNT(*) FROM mv_base_a GROUP BY i'); +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3; +INSERT INTO mv_base_a VALUES(2,100); +SELECT * FROM mv_ivm_agg ORDER BY 1,2,3; +ROLLBACK; + +-- support aggregate functions without GROUP clause +BEGIN; +SELECT create_immv('mv_ivm_group', 'SELECT SUM(j), COUNT(j), AVG(j) FROM mv_base_a'); +SELECT * FROM mv_ivm_group ORDER BY 1; +INSERT INTO mv_base_a VALUES(6,60); +SELECT * FROM mv_ivm_group ORDER BY 1; +DELETE FROM mv_base_a; +SELECT * FROM mv_ivm_group ORDER BY 1; +ROLLBACK; + +-- resolved issue: When use AVG() function and values is indivisible, result of AVG() is incorrect. +BEGIN; +SELECT create_immv('mv_ivm_avg_bug', 'SELECT i, SUM(j), COUNT(j), AVG(j) FROM mv_base_A GROUP BY i'); +SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3; +INSERT INTO mv_base_a VALUES + (1,0), + (1,0), + (2,30), + (2,30); +SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3; +DELETE FROM mv_base_a WHERE (i,j) = (1,0); +DELETE FROM mv_base_a WHERE (i,j) = (2,30); +SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3; +ROLLBACK; + +-- not support MIN(), MAX() aggregate functions +SELECT create_immv('mv_ivm_min_max', 'SELECT i, MIN(j) FROM mv_base_a GROUP BY i'); +SELECT create_immv('mv_ivm_min_max', 'SELECT i, MAX(j) FROM mv_base_a GROUP BY i'); -- support self join view and multiple change on the same table BEGIN; @@ -191,6 +237,8 @@ SELECT create_immv('mv_ivm05', 'SELECT i,j, (SELECT k FROM mv_base_b b WHERE a.i SELECT create_immv('mv_ivm07', 'SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i) ORDER BY i,j,k'); -- contain HAVING SELECT create_immv('mv_ivm08', 'SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i) GROUP BY i,j,k HAVING SUM(i) > 5'); +-- contain GROUP BY without aggregate +SELECT create_immv('mv_ivm08', 'SELECT i,j FROM mv_base_a GROUP BY i,j'); -- contain view or materialized view CREATE VIEW b_view AS SELECT i,k FROM mv_base_b;