Add aggregates support

The built-in count, sum, and avg aggregates are supported. min/max are not
supported yet, but will be in the future.

When an IMMV with any aggregates is defined, additional columns
are created for each aggregate function. Although such columns are
"hidden" in the pgsql-ivm version, they are always visible to users
in the extension version.
This commit is contained in:
Yugo Nagata 2022-06-21 20:50:45 +09:00
parent 1e80a34a86
commit 57c8bac1a0
5 changed files with 888 additions and 56 deletions

View file

@ -35,12 +35,16 @@
#include "parser/parsetree.h"
#include "parser/parse_clause.h"
#include "parser/parse_func.h"
#include "parser/parse_type.h"
#include "rewrite/rewriteHandler.h"
#include "rewrite/rewriteManip.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/regproc.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "pg_ivm.h"
@ -68,6 +72,7 @@ static void CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing,
static void check_ivm_restriction(Node *node);
static bool check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context);
static Bitmapset *get_primary_key_attnos_from_query(Query *query, List **constraintList, bool is_create);
static bool check_aggregate_supports_ivm(Oid aggfnoid);
static void StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery);
@ -269,6 +274,7 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt,
* rewriteQueryForIMMV -- rewrite view definition query for IMMV
*
* count(*) is added for counting distinct tuples in views.
* Also, additional hidden columns are added for aggregate values.
*/
Query *
rewriteQueryForIMMV(Query *query, List *colNames)
@ -283,14 +289,46 @@ rewriteQueryForIMMV(Query *query, List *colNames)
rewritten = copyObject(query);
pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET;
/*
* Convert DISTINCT to GROUP BY and add count(*) for counting distinct
* tuples in views.
*/
if (rewritten->distinctClause)
/* group keys must be in targetlist */
if (rewritten->groupClause)
{
ListCell *lc;
foreach(lc, rewritten->groupClause)
{
SortGroupClause *scl = (SortGroupClause *) lfirst(lc);
TargetEntry *tle = get_sortgroupclause_tle(scl, rewritten->targetList);
if (tle->resjunk)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("GROUP BY expression not appearing in select list is not supported on incrementally maintainable materialized view")));
}
}
/* Convert DISTINCT to GROUP BY. count(*) will be added afterward. */
else if (!rewritten->hasAggs && rewritten->distinctClause)
rewritten->groupClause = transformDistinctClause(NULL, &rewritten->targetList, rewritten->sortClause, false);
/* Add additional columns for aggregate values */
if (rewritten->hasAggs)
{
ListCell *lc;
List *aggs = NIL;
AttrNumber next_resno = list_length(rewritten->targetList) + 1;
foreach(lc, rewritten->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
char *resname = (colNames == NIL ? tle->resname : strVal(list_nth(colNames, tle->resno - 1)));
if (IsA(tle->expr, Aggref))
makeIvmAggColumn(pstate, (Aggref *)tle->expr, resname, &next_resno, &aggs);
}
rewritten->targetList = list_concat(rewritten->targetList, aggs);
}
/* Add count(*) for counting distinct tuples in views */
if (rewritten->distinctClause || rewritten->hasAggs)
{
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 140000)
fn = makeFuncCall(list_make1(makeString("count")), NIL, COERCE_EXPLICIT_CALL, -1);
#else
@ -311,6 +349,91 @@ rewriteQueryForIMMV(Query *query, List *colNames)
return rewritten;
}
/*
* makeIvmAggColumn -- make additional aggregate columns for IVM
*
* For an aggregate column specified by aggref, additional aggregate columns
* are added, which are used to calculate the new aggregate value in IMMV.
* An additional aggregate columns has a name based on resname
* (ex. ivm_count_resname), and resno specified by next_resno. The created
* columns are returned to aggs, and the resno for the next column is also
* returned to next_resno.
*
* Currently, an additional count() is created for aggref other than count.
* In addition, sum() is created for avg aggregate column.
*/
void
makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *next_resno, List **aggs)
{
TargetEntry *tle_count;
Node *node;
FuncCall *fn;
Const *dmy_arg = makeConst(INT4OID,
-1,
InvalidOid,
sizeof(int32),
Int32GetDatum(1),
false,
true); /* pass by value */
const char *aggname = get_func_name(aggref->aggfnoid);
/*
* For aggregate functions except count, add count() func with the same arg parameters.
* This count result is used for determining if the aggregate value should be NULL or not.
* Also, add sum() func for avg because we need to calculate an average value as sum/count.
*
* XXX: If there are same expressions explicitly in the target list, we can use this instead
* of adding new duplicated one.
*/
if (strcmp(aggname, "count") != 0)
{
fn = makeFuncCall(list_make1(makeString("count")), NIL, COERCE_EXPLICIT_CALL, -1);
/* Make a Func with a dummy arg, and then override this by the original agg's args. */
node = ParseFuncOrColumn(pstate, fn->funcname, list_make1(dmy_arg), NULL, fn, false, -1);
((Aggref *)node)->args = aggref->args;
tle_count = makeTargetEntry((Expr *) node,
*next_resno,
pstrdup(makeObjectName("__ivm_count",resname, "_")),
false);
*aggs = lappend(*aggs, tle_count);
(*next_resno)++;
}
if (strcmp(aggname, "avg") == 0)
{
List *dmy_args = NIL;
ListCell *lc;
foreach(lc, aggref->aggargtypes)
{
Oid typeid = lfirst_oid(lc);
Type type = typeidType(typeid);
Const *con = makeConst(typeid,
-1,
typeTypeCollation(type),
typeLen(type),
(Datum) 0,
true,
typeByVal(type));
dmy_args = lappend(dmy_args, con);
ReleaseSysCache(type);
}
fn = makeFuncCall(list_make1(makeString("sum")), NIL, COERCE_EXPLICIT_CALL, -1);
/* Make a Func with dummy args, and then override this by the original agg's args. */
node = ParseFuncOrColumn(pstate, fn->funcname, dmy_args, NULL, fn, false, -1);
((Aggref *)node)->args = aggref->args;
tle_count = makeTargetEntry((Expr *) node,
*next_resno,
pstrdup(makeObjectName("__ivm_sum",resname, "_")),
false);
*aggs = lappend(*aggs, tle_count);
(*next_resno)++;
}
}
/*
* CreateIvmTriggersOnBaseTables -- create IVM triggers on all base tables
*/
@ -338,14 +461,17 @@ CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid, bool is_create)
* and upgrading the lock level after this increases probability of
* deadlock.
*
* XXX: For the current extension version, DISTINCT needs exclusive lock
* to prevent inconsistency that can be avoided by using
* XXX: For the current extension version, DISTINCT and aggregates with GROUP
* need exclusive lock to prevent inconsistency that can be avoided by using
* nulls_not_distinct which is available only in PG15 or later.
* XXX: This lock is not necessary if all columns in group keys or distinct
* target list are not nullable.
*/
rte = list_nth(qry->rtable, first_rtindex - 1);
if (list_length(qry->rtable) > first_rtindex ||
rte->rtekind != RTE_RELATION || qry->distinctClause)
rte->rtekind != RTE_RELATION || qry->distinctClause ||
(qry->hasAggs && qry->groupClause))
ex_lock = true;
CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *)qry, matviewOid, &relids, ex_lock);
@ -534,6 +660,10 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CTE is not supported on incrementally maintainable materialized view")));
if (qry->groupClause != NIL && !qry->hasAggs)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("GROUP BY clause without aggregate is not supported on incrementally maintainable materialized view")));
if (qry->havingQual != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -668,13 +798,35 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
errmsg("OUTER JOIN is not supported on incrementally maintainable materialized view")));
expression_tree_walker(node, check_ivm_restriction_walker, NULL);
break;
}
break;
case T_Aggref:
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("aggregate function is not supported on incrementally maintainable materialized view")));
break;
{
/* Check if this supports IVM */
Aggref *aggref = (Aggref *) node;
const char *aggname = format_procedure(aggref->aggfnoid);
if (aggref->aggfilter != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("aggregate function with FILTER clause is not supported on incrementally maintainable materialized view")));
if (aggref->aggdistinct != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("aggregate function with DISTINCT arguments is not supported on incrementally maintainable materialized view")));
if (aggref->aggorder != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("aggregate function with ORDER clause is not supported on incrementally maintainable materialized view")));
if (!check_aggregate_supports_ivm(aggref->aggfnoid))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("aggregate function %s is not supported on incrementally maintainable materialized view", aggname)));
break;
}
default:
expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
break;
@ -682,6 +834,46 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
return false;
}
/*
 * check_aggregate_supports_ivm
 *
 * Return true when aggfnoid is one of the built-in count, sum, or avg
 * aggregates listed in the table below, and false for any other function.
 */
static bool
check_aggregate_supports_ivm(Oid aggfnoid)
{
	/* OIDs of the aggregate functions accepted for IVM */
	static const Oid supported_aggs[] = {
		/* count */
		F_COUNT_ANY,
		F_COUNT_,

		/* sum */
		F_SUM_INT8,
		F_SUM_INT4,
		F_SUM_INT2,
		F_SUM_FLOAT4,
		F_SUM_FLOAT8,
		F_SUM_MONEY,
		F_SUM_INTERVAL,
		F_SUM_NUMERIC,

		/* avg */
		F_AVG_INT8,
		F_AVG_INT4,
		F_AVG_INT2,
		F_AVG_NUMERIC,
		F_AVG_FLOAT4,
		F_AVG_FLOAT8,
		F_AVG_INTERVAL
	};
	const Oid  *cur = supported_aggs;
	const Oid  *end = supported_aggs + sizeof(supported_aggs) / sizeof(supported_aggs[0]);

	/* linear scan; the table is tiny */
	for (; cur != end; cur++)
	{
		if (*cur == aggfnoid)
			return true;
	}

	return false;
}
/*
* CreateIndexOnIMMV
*
@ -741,7 +933,29 @@ CreateIndexOnIMMV(Query *query, Relation matviewRel, bool is_create)
index->concurrent = false;
index->if_not_exists = false;
if (query->distinctClause)
if (query->groupClause)
{
/* create unique constraint on GROUP BY expression columns */
foreach(lc, query->groupClause)
{
SortGroupClause *scl = (SortGroupClause *) lfirst(lc);
TargetEntry *tle = get_sortgroupclause_tle(scl, query->targetList);
Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1);
IndexElem *iparam;
iparam = makeNode(IndexElem);
iparam->name = pstrdup(NameStr(attr->attname));
iparam->expr = NULL;
iparam->indexcolname = NULL;
iparam->collation = NIL;
iparam->opclass = NIL;
iparam->opclassopts = NIL;
iparam->ordering = SORTBY_DEFAULT;
iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
index->indexParams = lappend(index->indexParams, iparam);
}
}
else if (query->distinctClause)
{
/* create unique constraint on all columns */
foreach(lc, query->targetList)
@ -799,7 +1013,7 @@ CreateIndexOnIMMV(Query *query, Relation matviewRel, bool is_create)
(errmsg("could not create an index on immv \"%s\" automatically",
RelationGetRelationName(matviewRel)),
errdetail("This target list does not have all the primary key columns, "
"or this view does not contain DISTINCT clause."),
"or this view does not contain GROUP BY or DISTINCT clause."),
errhint("Create an index on the immv for efficient incremental maintenance.")));
return;
}
@ -981,8 +1195,8 @@ StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery)
Datum values[Natts_pg_ivm_immv];
bool isNulls[Natts_pg_ivm_immv];
Relation pgIvmImmv;
TupleDesc tupleDescriptor;
HeapTuple heapTuple;
TupleDesc tupleDescriptor;
HeapTuple heapTuple;
ObjectAddress address;
memset(values, 0, sizeof(values));
@ -994,10 +1208,10 @@ StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery)
pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgIvmImmv);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
tupleDescriptor = RelationGetDescr(pgIvmImmv);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CatalogTupleInsert(pgIvmImmv, heapTuple);
CatalogTupleInsert(pgIvmImmv, heapTuple);
address.classId = RelationRelationId;
address.objectId = viewOid;

View file

@ -17,7 +17,7 @@ INSERT INTO mv_base_b VALUES
-- CREATE INCREMENTAL MATERIALIZED VIEW mv_ivm_1 AS SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i) WITH NO DATA;
SELECT create_immv('mv_ivm_1', 'SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i)');
NOTICE: could not create an index on immv "mv_ivm_1" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause.
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
@ -83,7 +83,7 @@ CREATE FUNCTION ivm_func() RETURNS int LANGUAGE 'sql'
AS 'SELECT 1' IMMUTABLE;
SELECT create_immv('mv_ivm_func', 'SELECT * FROM ivm_func()');
NOTICE: could not create an index on immv "mv_ivm_func" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause.
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
@ -92,7 +92,7 @@ HINT: Create an index on the immv for efficient incremental maintenance.
SELECT create_immv('mv_ivm_no_tbl', 'SELECT 1');
NOTICE: could not create an index on immv "mv_ivm_no_tbl" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause.
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
@ -104,7 +104,7 @@ ROLLBACK;
BEGIN;
SELECT create_immv('mv_ivm_duplicate', 'SELECT j FROM mv_base_a');
NOTICE: could not create an index on immv "mv_ivm_duplicate" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause.
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
@ -162,9 +162,174 @@ SELECT * FROM mv_ivm_distinct ORDER BY 1;
(5 rows)
ROLLBACK;
-- not support SUM(), COUNT() and AVG() aggregate functions
-- support SUM(), COUNT() and AVG() aggregate functions
BEGIN;
SELECT create_immv('mv_ivm_agg', 'SELECT i, SUM(j), COUNT(i), AVG(j) FROM mv_base_a GROUP BY i');
ERROR: aggregate function is not supported on incrementally maintainable materialized view
NOTICE: created index "mv_ivm_agg_index" on immv "mv_ivm_agg"
create_immv
-------------
5
(1 row)
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4;
i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__
---+-----+-------+---------------------+-------------------+-------------------+-----------------+---------------
1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1
2 | 20 | 1 | 20.0000000000000000 | 1 | 1 | 20 | 1
3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1
4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1
5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1
(5 rows)
INSERT INTO mv_base_a VALUES(2,100);
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4;
i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__
---+-----+-------+---------------------+-------------------+-------------------+-----------------+---------------
1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1
2 | 120 | 2 | 60.0000000000000000 | 2 | 2 | 120 | 2
3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1
4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1
5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1
(5 rows)
UPDATE mv_base_a SET j = 200 WHERE (i,j) = (2,100);
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4;
i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__
---+-----+-------+----------------------+-------------------+-------------------+-----------------+---------------
1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1
2 | 220 | 2 | 110.0000000000000000 | 2 | 2 | 220 | 2
3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1
4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1
5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1
(5 rows)
DELETE FROM mv_base_a WHERE (i,j) = (2,200);
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4;
i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__
---+-----+-------+---------------------+-------------------+-------------------+-----------------+---------------
1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1
2 | 20 | 1 | 20.0000000000000000 | 1 | 1 | 20 | 1
3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1
4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1
5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1
(5 rows)
ROLLBACK;
-- support COUNT(*) aggregate function
BEGIN;
SELECT create_immv('mv_ivm_agg', 'SELECT i, SUM(j), COUNT(*) FROM mv_base_a GROUP BY i');
NOTICE: created index "mv_ivm_agg_index" on immv "mv_ivm_agg"
create_immv
-------------
5
(1 row)
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3;
i | sum | count | __ivm_count_sum__ | __ivm_count__
---+-----+-------+-------------------+---------------
1 | 10 | 1 | 1 | 1
2 | 20 | 1 | 1 | 1
3 | 30 | 1 | 1 | 1
4 | 40 | 1 | 1 | 1
5 | 50 | 1 | 1 | 1
(5 rows)
INSERT INTO mv_base_a VALUES(2,100);
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3;
i | sum | count | __ivm_count_sum__ | __ivm_count__
---+-----+-------+-------------------+---------------
1 | 10 | 1 | 1 | 1
2 | 120 | 2 | 2 | 2
3 | 30 | 1 | 1 | 1
4 | 40 | 1 | 1 | 1
5 | 50 | 1 | 1 | 1
(5 rows)
ROLLBACK;
-- support aggregate functions without GROUP clause
BEGIN;
SELECT create_immv('mv_ivm_group', 'SELECT SUM(j), COUNT(j), AVG(j) FROM mv_base_a');
NOTICE: could not create an index on immv "mv_ivm_group" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
1
(1 row)
SELECT * FROM mv_ivm_group ORDER BY 1;
sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__
-----+-------+---------------------+-------------------+-------------------+-----------------+---------------
150 | 5 | 30.0000000000000000 | 5 | 5 | 150 | 5
(1 row)
INSERT INTO mv_base_a VALUES(6,60);
SELECT * FROM mv_ivm_group ORDER BY 1;
sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__
-----+-------+---------------------+-------------------+-------------------+-----------------+---------------
210 | 6 | 35.0000000000000000 | 6 | 6 | 210 | 6
(1 row)
DELETE FROM mv_base_a;
SELECT * FROM mv_ivm_group ORDER BY 1;
sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__
-----+-------+-----+-------------------+-------------------+-----------------+---------------
| 0 | | 0 | 0 | | 0
(1 row)
ROLLBACK;
-- resolved issue: When use AVG() function and values is indivisible, result of AVG() is incorrect.
BEGIN;
SELECT create_immv('mv_ivm_avg_bug', 'SELECT i, SUM(j), COUNT(j), AVG(j) FROM mv_base_A GROUP BY i');
NOTICE: created index "mv_ivm_avg_bug_index" on immv "mv_ivm_avg_bug"
create_immv
-------------
5
(1 row)
SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3;
i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__
---+-----+-------+---------------------+-------------------+-------------------+-----------------+---------------
1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1
2 | 20 | 1 | 20.0000000000000000 | 1 | 1 | 20 | 1
3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1
4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1
5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1
(5 rows)
INSERT INTO mv_base_a VALUES
(1,0),
(1,0),
(2,30),
(2,30);
SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3;
i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__
---+-----+-------+---------------------+-------------------+-------------------+-----------------+---------------
1 | 10 | 3 | 3.3333333333333333 | 3 | 3 | 10 | 3
2 | 80 | 3 | 26.6666666666666667 | 3 | 3 | 80 | 3
3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1
4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1
5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1
(5 rows)
DELETE FROM mv_base_a WHERE (i,j) = (1,0);
DELETE FROM mv_base_a WHERE (i,j) = (2,30);
SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3;
i | sum | count | avg | __ivm_count_sum__ | __ivm_count_avg__ | __ivm_sum_avg__ | __ivm_count__
---+-----+-------+---------------------+-------------------+-------------------+-----------------+---------------
1 | 10 | 1 | 10.0000000000000000 | 1 | 1 | 10 | 1
2 | 20 | 1 | 20.0000000000000000 | 1 | 1 | 20 | 1
3 | 30 | 1 | 30.0000000000000000 | 1 | 1 | 30 | 1
4 | 40 | 1 | 40.0000000000000000 | 1 | 1 | 40 | 1
5 | 50 | 1 | 50.0000000000000000 | 1 | 1 | 50 | 1
(5 rows)
ROLLBACK;
-- not support MIN(), MAX() aggregate functions
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MIN(j) FROM mv_base_a GROUP BY i');
ERROR: aggregate function min(integer) is not supported on incrementally maintainable materialized view
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MAX(j) FROM mv_base_a GROUP BY i');
ERROR: aggregate function max(integer) is not supported on incrementally maintainable materialized view
-- support self join view and multiple change on the same table
BEGIN;
CREATE TABLE base_t (i int, v int);
@ -172,7 +337,7 @@ INSERT INTO base_t VALUES (1, 10), (2, 20), (3, 30);
SELECT create_immv('mv_self(v1, v2)',
'SELECT t1.v, t2.v FROM base_t AS t1 JOIN base_t AS t2 ON t1.i = t2.i');
NOTICE: could not create an index on immv "mv_self" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause.
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
@ -227,7 +392,7 @@ INSERT INTO base_r VALUES (1, 10), (2, 20), (3, 30);
INSERT INTO base_s VALUES (1, 100), (2, 200), (3, 300);
SELECT create_immv('mv(v1, v2)', 'SELECT r.v, s.v FROM base_r AS r JOIN base_s AS s USING(i)');;
NOTICE: could not create an index on immv "mv" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause.
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
@ -301,7 +466,7 @@ CREATE TABLE base_t (i int, v int);
INSERT INTO base_t VALUES (1,10),(2, NULL);
SELECT create_immv('mv', 'SELECT * FROM base_t');
NOTICE: could not create an index on immv "mv" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause.
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
@ -328,7 +493,7 @@ BEGIN;
CREATE TABLE base_t (i int);
SELECT create_immv('mv', 'SELECT * FROM base_t');
NOTICE: could not create an index on immv "mv" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause.
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
@ -389,7 +554,7 @@ CREATE TABLE t_mytype (x mytype);
SELECT create_immv('mv_mytype',
'SELECT * FROM t_mytype');
NOTICE: could not create an index on immv "mv_mytype" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause.
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
@ -434,6 +599,9 @@ ERROR: ORDER BY clause is not supported on incrementally maintainable materiali
-- contain HAVING
SELECT create_immv('mv_ivm08', 'SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i) GROUP BY i,j,k HAVING SUM(i) > 5');
ERROR: HAVING clause is not supported on incrementally maintainable materialized view
-- contain GROUP BY without aggregate
SELECT create_immv('mv_ivm08', 'SELECT i,j FROM mv_base_a GROUP BY i,j');
ERROR: GROUP BY clause without aggregate is not supported on incrementally maintainable materialized view
-- contain view or materialized view
CREATE VIEW b_view AS SELECT i,k FROM mv_base_b;
CREATE MATERIALIZED VIEW b_mview AS SELECT i,k FROM mv_base_b;
@ -507,7 +675,7 @@ GRANT ALL on num_tbl TO PUBLIC;
SET SESSION AUTHORIZATION ivm_user;
SELECT create_immv('ivm_rls', 'SELECT * FROM rls_tbl');
NOTICE: could not create an index on immv "ivm_rls" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause.
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
@ -532,7 +700,7 @@ SELECT id, data, owner FROM ivm_rls ORDER BY 1,2,3;
SELECT create_immv('ivm_rls2', 'SELECT * FROM rls_tbl JOIN num_tbl USING(id)');
NOTICE: could not create an index on immv "ivm_rls2" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain DISTINCT clause.
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------

438
matview.c
View file

@ -19,6 +19,7 @@
#include "catalog/heap.h"
#include "catalog/pg_trigger.h"
#include "commands/cluster.h"
#include "commands/defrem.h"
#include "commands/matview.h"
#include "commands/tablecmds.h"
#include "executor/execdesc.h"
@ -27,6 +28,7 @@
#include "executor/tstoreReceiver.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/analyze.h"
#include "parser/parse_clause.h"
#include "parser/parse_func.h"
@ -39,6 +41,7 @@
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/fmgrprotos.h"
#include "utils/hsearch.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
@ -50,6 +53,32 @@
#define MV_INIT_QUERYHASHSIZE 16
/* MV query type codes */
#define MV_PLAN_RECALC 1
#define MV_PLAN_SET_VALUE 2
/*
* MV_QueryKey
*
* The key identifying a prepared SPI plan in our query hashtable.
* A plan is looked up by the pair (matview_id, query_type).
*/
typedef struct MV_QueryKey
{
Oid matview_id; /* OID of materialized view */
int32 query_type; /* query type ID, see MV_PLAN_XXX above */
} MV_QueryKey;
/*
* MV_QueryHashEntry
*
* Hash entry for cached plans used to maintain materialized views.
*/
typedef struct MV_QueryHashEntry
{
MV_QueryKey key; /* hash key: view OID plus query type */
SPIPlanPtr plan; /* the cached SPI plan stored under this key */
} MV_QueryHashEntry;
/*
* MV_TriggerHashEntry
*
@ -86,8 +115,16 @@ typedef struct MV_TriggerTable
RangeTblEntry *original_rte; /* the original RTE saved before rewriting query */
} MV_TriggerTable;
static HTAB *mv_query_cache = NULL;
static HTAB *mv_trigger_info = NULL;
/*
* kind of IVM operation for the view
*
* NOTE(review): presumably selects whether a delta value is added to or
* subtracted from the stored aggregate value when building the update SQL
* (see get_operation_string / get_null_condition_string) -- confirm there.
*/
typedef enum
{
IVM_ADD,
IVM_SUB
} IvmOp;
/* ENR name for materialized view delta */
#define NEW_DELTA_ENRNAME "new_delta"
#define OLD_DELTA_ENRNAME "old_delta"
@ -114,7 +151,7 @@ static RangeTblEntry *get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *tabl
QueryEnvironment *queryEnv);
static RangeTblEntry *union_ENRs(RangeTblEntry *rte, Oid relid, List *enr_rtes, const char *prefix,
QueryEnvironment *queryEnv);
static Query *rewrite_query_for_distinct(Query *query, ParseState *pstate);
static Query *rewrite_query_for_distinct_and_aggregates(Query *query, ParseState *pstate);
static void calc_delta(MV_TriggerTable *table, int rte_index, Query *query,
DestReceiver *dest_old, DestReceiver *dest_new,
@ -125,14 +162,27 @@ static Query *rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *
static void apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores,
TupleDesc tupdesc_old, TupleDesc tupdesc_new,
Query *query, bool use_count, char *count_colname);
static void append_set_clause_for_count(const char *resname, StringInfo buf_old,
StringInfo buf_new,StringInfo aggs_list);
static void append_set_clause_for_sum(const char *resname, StringInfo buf_old,
StringInfo buf_new, StringInfo aggs_list);
static void append_set_clause_for_avg(const char *resname, StringInfo buf_old,
StringInfo buf_new, StringInfo aggs_list,
const char *aggtype);
static char *get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2,
const char* count_col, const char *castType);
static char *get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
const char* count_col);
static void apply_old_delta(const char *matviewname, const char *deltaname_old,
List *keys);
static void apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
List *keys, const char *count_colname);
List *keys, StringInfo aggs_list, StringInfo aggs_set,
const char *count_colname);
static void apply_new_delta(const char *matviewname, const char *deltaname_new,
StringInfo target_list);
static void apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
List *keys, StringInfo target_list, const char* count_colname);
List *keys, StringInfo target_list, StringInfo aggs_set,
const char* count_colname);
static char *get_matching_condition_string(List *keys);
static void generate_equal(StringInfo querybuf, Oid opttype,
const char *leftop, const char *rightop);
@ -820,8 +870,8 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
rewritten = rewrite_query_for_preupdate_state(rewritten, entry->tables,
entry->xid, entry->cid,
pstate);
/* Rewrite for DISTINCT clause */
rewritten = rewrite_query_for_distinct(rewritten, pstate);
/* Rewrite for DISTINCT clause and aggregates functions */
rewritten = rewrite_query_for_distinct_and_aggregates(rewritten, pstate);
/* Create tuplestores to store view deltas */
if (entry->has_old)
@ -885,7 +935,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
count_colname = pstrdup("__ivm_count__");
if (query->distinctClause)
if (query->hasAggs || query->distinctClause)
use_count = true;
/* calculate delta tables */
@ -1256,17 +1306,34 @@ union_ENRs(RangeTblEntry *rte, Oid relid, List *enr_rtes, const char *prefix,
}
/*
* rewrite_query_for_distinct
* rewrite_query_for_distinct_and_aggregates
*
* Rewrite query for counting DISTINCT clause.
* Rewrite query for counting DISTINCT clause and aggregate functions.
*/
static Query *
rewrite_query_for_distinct(Query *query, ParseState *pstate)
rewrite_query_for_distinct_and_aggregates(Query *query, ParseState *pstate)
{
TargetEntry *tle_count;
FuncCall *fn;
Node *node;
/* For aggregate views */
if (query->hasAggs)
{
ListCell *lc;
List *aggs = NIL;
AttrNumber next_resno = list_length(query->targetList) + 1;
foreach(lc, query->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
if (IsA(tle->expr, Aggref))
makeIvmAggColumn(pstate, (Aggref *)tle->expr, tle->resname, &next_resno, &aggs);
}
query->targetList = list_concat(query->targetList, aggs);
}
/* Add count(*) for counting distinct tuples in views */
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 140000)
fn = makeFuncCall(list_make1(makeString("count")), NIL, COERCE_EXPLICIT_CALL, -1);
@ -1339,6 +1406,8 @@ rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte
return query;
}
/*
 * IVM_colname
 *
 * Build the name of an auxiliary column that accompanies the aggregate
 * column "col".  "type" must be a string literal (e.g. "count", "sum")
 * because it is pasted onto "__ivm_" by adjacent-literal concatenation.
 * The pieces are handed to makeObjectName(); presumably the result looks
 * like __ivm_<type>_<col>__ -- confirm against makeObjectName().
 */
#define IVM_colname(type, col) makeObjectName("__ivm_" type, col, "_")
/*
* apply_delta
*
@ -1352,13 +1421,15 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
{
StringInfoData querybuf;
StringInfoData target_list_buf;
StringInfo aggs_list_buf = NULL;
StringInfo aggs_set_old = NULL;
StringInfo aggs_set_new = NULL;
Relation matviewRel;
char *matviewname;
ListCell *lc;
int i;
List *keys = NIL;
/*
* get names of the materialized view and delta tables
*/
@ -1374,6 +1445,15 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
initStringInfo(&querybuf);
initStringInfo(&target_list_buf);
if (query->hasAggs)
{
if (old_tuplestores && tuplestore_tuple_count(old_tuplestores) > 0)
aggs_set_old = makeStringInfo();
if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0)
aggs_set_new = makeStringInfo();
aggs_list_buf = makeStringInfo();
}
/* build string of target list */
for (i = 0; i < matviewRel->rd_att->natts; i++)
{
@ -1390,13 +1470,61 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i);
char *resname = NameStr(attr->attname);
i++;
if (tle->resjunk)
continue;
keys = lappend(keys, attr);
/*
* For views without aggregates, all attributes are used as keys to identify a
* tuple in a view.
*/
if (!query->hasAggs)
keys = lappend(keys, attr);
/* For views with aggregates, we need to build SET clause for updating aggregate
* values. */
if (query->hasAggs && IsA(tle->expr, Aggref))
{
Aggref *aggref = (Aggref *) tle->expr;
const char *aggname = get_func_name(aggref->aggfnoid);
/*
* We can use function names here because it is already checked if these
* can be used in IMMV by its OID at the definition time.
*/
/* count */
if (!strcmp(aggname, "count"))
append_set_clause_for_count(resname, aggs_set_old, aggs_set_new, aggs_list_buf);
/* sum */
else if (!strcmp(aggname, "sum"))
append_set_clause_for_sum(resname, aggs_set_old, aggs_set_new, aggs_list_buf);
/* avg */
else if (!strcmp(aggname, "avg"))
append_set_clause_for_avg(resname, aggs_set_old, aggs_set_new, aggs_list_buf,
format_type_be(aggref->aggtype));
else
elog(ERROR, "unsupported aggregate function: %s", aggname);
}
}
/* If we have GROUP BY clause, we use its entries as keys. */
if (query->hasAggs && query->groupClause)
{
foreach (lc, query->groupClause)
{
SortGroupClause *sgcl = (SortGroupClause *) lfirst(lc);
TargetEntry *tle = get_sortgroupclause_tle(sgcl, query->targetList);
Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1);
keys = lappend(keys, attr);
}
}
/* Start maintaining the materialized view. */
@ -1425,12 +1553,12 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
elog(ERROR, "SPI_register failed");
if (use_count)
/* apply old delta and get rows to be recalculated */
/* apply old delta */
apply_old_delta_with_count(matviewname, OLD_DELTA_ENRNAME,
keys, count_colname);
keys, aggs_list_buf, aggs_set_old,
count_colname);
else
apply_old_delta(matviewname, OLD_DELTA_ENRNAME, keys);
}
/* For tuple insertion */
if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0)
@ -1453,7 +1581,7 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
/* apply new delta */
if (use_count)
apply_new_delta_with_count(matviewname, NEW_DELTA_ENRNAME,
keys, &target_list_buf, count_colname);
keys, aggs_set_new, &target_list_buf, count_colname);
else
apply_new_delta(matviewname, NEW_DELTA_ENRNAME, &target_list_buf);
}
@ -1468,20 +1596,272 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
elog(ERROR, "SPI_finish failed");
}
/*
 * append_set_clause_for_count
 *
 * Emit the SET clause fragments used to maintain a count() column named
 * resname, and record the column in the list of aggregate columns.
 *
 * buf_old:   if not NULL, receives ", resname = <mv.resname - t.resname>",
 *            the assignment applied when tuples are deleted.
 * buf_new:   if not NULL, receives ", resname = <mv.resname + diff.resname>",
 *            the assignment applied when tuples are inserted.
 * aggs_list: always receives ", diff.resname".
 */
static void
append_set_clause_for_count(const char *resname, StringInfo buf_old,
							StringInfo buf_new, StringInfo aggs_list)
{
	/* Deletion side: subtract the delta's count from the view's count. */
	if (buf_old != NULL)
	{
		char	   *target = quote_qualified_identifier(NULL, resname);
		char	   *expr = get_operation_string(IVM_SUB, resname, "mv", "t", NULL, NULL);

		appendStringInfo(buf_old, ", %s = %s", target, expr);
	}

	/* Insertion side: add the delta's count to the view's count. */
	if (buf_new != NULL)
	{
		char	   *target = quote_qualified_identifier(NULL, resname);
		char	   *expr = get_operation_string(IVM_ADD, resname, "mv", "diff", NULL, NULL);

		appendStringInfo(buf_new, ", %s = %s", target, expr);
	}

	/* The delta's count column is needed to evaluate either assignment. */
	appendStringInfo(aggs_list, ", %s",
					 quote_qualified_identifier("diff", resname));
}
/*
 * append_set_clause_for_sum
 *
 * Emit the SET clause fragments used to maintain a sum() column named
 * resname together with its auxiliary count column (IVM_colname("count",
 * resname)), and record both columns in the list of aggregate columns.
 *
 * buf_old:   if not NULL, receives the assignments for tuple deletion
 *            (sum = mv.sum - t.sum, count = mv.count - t.count).
 * buf_new:   if not NULL, receives the assignments for tuple insertion
 *            (sum = mv.sum + diff.sum, count = mv.count + diff.count).
 * aggs_list: always receives ", diff.<sum column>, diff.<count column>".
 *
 * The sum expression is built with the count column passed along, so that
 * get_operation_string() can emit the CASE that turns the result into NULL
 * where appropriate.
 */
static void
append_set_clause_for_sum(const char *resname, StringInfo buf_old,
						  StringInfo buf_new, StringInfo aggs_list)
{
	char	   *count_col = IVM_colname("count", resname);

	/* Deletion side */
	if (buf_old != NULL)
	{
		char	   *sum_expr = get_operation_string(IVM_SUB, resname, "mv", "t", count_col, NULL);
		char	   *count_expr = get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL);

		/* sum = mv.sum - t.sum */
		appendStringInfo(buf_old, ", %s = %s",
						 quote_qualified_identifier(NULL, resname), sum_expr);
		/* count = mv.count - t.count */
		appendStringInfo(buf_old, ", %s = %s",
						 quote_qualified_identifier(NULL, count_col), count_expr);
	}

	/* Insertion side */
	if (buf_new != NULL)
	{
		char	   *sum_expr = get_operation_string(IVM_ADD, resname, "mv", "diff", count_col, NULL);
		char	   *count_expr = get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL);

		/* sum = mv.sum + diff.sum */
		appendStringInfo(buf_new, ", %s = %s",
						 quote_qualified_identifier(NULL, resname), sum_expr);
		/* count = mv.count + diff.count */
		appendStringInfo(buf_new, ", %s = %s",
						 quote_qualified_identifier(NULL, count_col), count_expr);
	}

	/* Both the sum and its count must be fetched from the delta. */
	appendStringInfo(aggs_list, ", %s, %s",
					 quote_qualified_identifier("diff", resname),
					 quote_qualified_identifier("diff", count_col));
}
/*
 * append_set_clause_for_avg
 *
 * Emit the SET clause fragments used to maintain an avg() column named
 * resname together with its auxiliary sum and count columns
 * (IVM_colname("sum", resname) and IVM_colname("count", resname)), and
 * record all three columns in the list of aggregate columns.
 *
 * buf_old:   if not NULL, receives the assignments for tuple deletion:
 *              avg   = (mv.sum - t.sum)::aggtype / (mv.count - t.count)
 *              sum   = mv.sum - t.sum
 *              count = mv.count - t.count
 * buf_new:   if not NULL, receives the same three assignments for tuple
 *            insertion, using "+" and the "diff" relation instead.
 * aggs_list: always receives ", diff.<avg>, diff.<sum>, diff.<count>".
 * aggtype:   type name the new sum is cast to before the division.
 */
static void
append_set_clause_for_avg(const char *resname, StringInfo buf_old,
						  StringInfo buf_new, StringInfo aggs_list,
						  const char *aggtype)
{
	char	   *sum_col = IVM_colname("sum", resname);
	char	   *count_col = IVM_colname("count", resname);

	/* Deletion side */
	if (buf_old != NULL)
	{
		char	   *sum_cast_expr = get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, aggtype);
		char	   *sum_expr = get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, NULL);
		char	   *count_expr = get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL);

		/* avg = (mv.sum - t.sum)::aggtype / (mv.count - t.count) */
		appendStringInfo(buf_old, ", %s = %s OPERATOR(pg_catalog./) %s",
						 quote_qualified_identifier(NULL, resname),
						 sum_cast_expr, count_expr);
		/* sum = mv.sum - t.sum */
		appendStringInfo(buf_old, ", %s = %s",
						 quote_qualified_identifier(NULL, sum_col), sum_expr);
		/* count = mv.count - t.count */
		appendStringInfo(buf_old, ", %s = %s",
						 quote_qualified_identifier(NULL, count_col), count_expr);
	}

	/* Insertion side */
	if (buf_new != NULL)
	{
		char	   *sum_cast_expr = get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, aggtype);
		char	   *sum_expr = get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, NULL);
		char	   *count_expr = get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL);

		/* avg = (mv.sum + diff.sum)::aggtype / (mv.count + diff.count) */
		appendStringInfo(buf_new, ", %s = %s OPERATOR(pg_catalog./) %s",
						 quote_qualified_identifier(NULL, resname),
						 sum_cast_expr, count_expr);
		/* sum = mv.sum + diff.sum */
		appendStringInfo(buf_new, ", %s = %s",
						 quote_qualified_identifier(NULL, sum_col), sum_expr);
		/* count = mv.count + diff.count */
		appendStringInfo(buf_new, ", %s = %s",
						 quote_qualified_identifier(NULL, count_col), count_expr);
	}

	/* avg, sum and count must all be fetched from the delta. */
	appendStringInfo(aggs_list, ", %s, %s, %s",
					 quote_qualified_identifier("diff", resname),
					 quote_qualified_identifier("diff", sum_col),
					 quote_qualified_identifier("diff", count_col));
}
/*
 * get_operation_string
 *
 * Return a newly built SQL expression that combines arg1.col and arg2.col
 * with "+" (any op other than IVM_SUB) or "-" (IVM_SUB).
 *
 * count_col: if NULL, the result is simply "(arg1.col <op> arg2.col)".
 *            Otherwise the expression is wrapped in a CASE that yields NULL
 *            when get_null_condition_string() says the aggregate becomes
 *            NULL, yields the other operand when one operand IS NULL, and
 *            applies the operator only in the remaining case.
 * castType:  if not NULL, "::castType" is appended to the parenthesised
 *            operator expression (in the CASE form, only to the ELSE arm).
 */
static char *
get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2,
					 const char* count_col, const char *castType)
{
	StringInfoData result;
	StringInfoData cast_suffix;
	char	   *lhs = quote_qualified_identifier(arg1, col);
	char	   *rhs = quote_qualified_identifier(arg2, col);
	char		sign = '+';

	if (op == IVM_SUB)
		sign = '-';

	initStringInfo(&result);
	initStringInfo(&cast_suffix);

	/* cast_suffix stays empty when no cast was requested */
	if (castType != NULL)
		appendStringInfo(&cast_suffix, "::%s", castType);

	if (count_col == NULL)
	{
		/* No count column to consult: a plain operator expression suffices. */
		appendStringInfo(&result, "(%s OPERATOR(pg_catalog.%c) %s)%s",
						 lhs, sign, rhs, cast_suffix.data);
		return result.data;
	}

	/*
	 * A count column is available, so guard against the cases where the
	 * resulting aggregate value has to be NULL or where an operand is NULL.
	 */
	appendStringInfo(&result,
					 "(CASE WHEN %s THEN NULL "
					 "WHEN %s IS NULL THEN %s "
					 "WHEN %s IS NULL THEN %s "
					 "ELSE (%s OPERATOR(pg_catalog.%c) %s)%s END)",
					 get_null_condition_string(op, arg1, arg2, count_col),
					 lhs, rhs,
					 rhs, lhs,
					 lhs, sign, rhs, cast_suffix.data);

	return result.data;
}
/*
 * get_null_condition_string
 *
 * Return a newly built predicate, for use in a CASE clause, that is true
 * when an aggregate value will become NULL after the given operation.
 *
 * IVM_ADD: both arg1.count_col and arg2.count_col are equal to 0.
 * IVM_SUB: arg1.count_col is equal to arg2.count_col.
 *
 * Any other operation raises an error.
 */
static char *
get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
						  const char* count_col)
{
	StringInfoData cond;

	initStringInfo(&cond);

	if (op == IVM_ADD)
	{
		appendStringInfo(&cond,
						 "%s OPERATOR(pg_catalog.=) 0 AND %s OPERATOR(pg_catalog.=) 0",
						 quote_qualified_identifier(arg1, count_col),
						 quote_qualified_identifier(arg2, count_col));
	}
	else if (op == IVM_SUB)
	{
		appendStringInfo(&cond,
						 "%s OPERATOR(pg_catalog.=) %s",
						 quote_qualified_identifier(arg1, count_col),
						 quote_qualified_identifier(arg2, count_col));
	}
	else
		elog(ERROR,"unknown operation");

	return cond.data;
}
/*
* apply_old_delta_with_count
*
* Execute a query for applying a delta table given by deltaname_old,
* which contains tuples to be deleted, to a materialized view given by
* matviewname. This is used when counting is required, that is, the view
* has aggregate or distinct.
* has aggregate or distinct. Also, when a table in EXISTS sub queries
* is modified.
*
* If the view doesn't have aggregates or has GROUP BY, this requires a keys
* list to identify a tuple in the view. If the view has aggregates, this
* requires strings representing resnames of aggregates and SET clause for
* updating aggregate values.
*/
static void
apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
List *keys, const char *count_colname)
List *keys, StringInfo aggs_list, StringInfo aggs_set,
const char *count_colname)
{
StringInfoData querybuf;
char *match_cond;
bool agg_without_groupby = (list_length(keys) == 0);
/* build WHERE condition for searching tuples to be deleted */
match_cond = get_matching_condition_string(keys);
@ -1491,22 +1871,26 @@ apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
appendStringInfo(&querybuf,
"WITH t AS (" /* collecting tid of target tuples in the view */
"SELECT diff.%s, " /* count column */
"(diff.%s OPERATOR(pg_catalog.=) mv.%s) AS for_dlt, "
"(diff.%s OPERATOR(pg_catalog.=) mv.%s AND %s) AS for_dlt, "
"mv.ctid "
"%s " /* aggregate columns */
"FROM %s AS mv, %s AS diff "
"WHERE %s" /* tuple matching condition */
"), updt AS (" /* update a tuple if this is not to be deleted */
"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.-) t.%s "
"%s" /* SET clauses for aggregates */
"FROM t WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND NOT for_dlt "
")"
/* delete a tuple if this is to be deleted */
"DELETE FROM %s AS mv USING t "
"WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND for_dlt",
count_colname,
count_colname, count_colname,
count_colname, count_colname, (agg_without_groupby ? "false" : "true"),
(aggs_list != NULL ? aggs_list->data : ""),
matviewname, deltaname_old,
match_cond,
matviewname, count_colname, count_colname, count_colname,
(aggs_set != NULL ? aggs_set->data : ""),
matviewname);
if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
@ -1570,10 +1954,15 @@ apply_old_delta(const char *matviewname, const char *deltaname_old,
* matviewname. This is used when counting is required, that is, the view
* has aggregate or distinct. Also, when a table in EXISTS sub queries
* is modified.
*
* If the view doesn't have aggregates or has GROUP BY, this requires a keys
* list to identify a tuple in the view. If the view has aggregates, this
* requires strings representing SET clause for updating aggregate values.
*/
static void
apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
List *keys, StringInfo target_list, const char* count_colname)
List *keys, StringInfo aggs_set, StringInfo target_list,
const char* count_colname)
{
StringInfoData querybuf;
StringInfoData returning_keys;
@ -1604,6 +1993,7 @@ apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
appendStringInfo(&querybuf,
"WITH updt AS (" /* update a tuple if this exists in the view */
"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.+) diff.%s "
"%s " /* SET clauses for aggregates */
"FROM %s AS diff "
"WHERE %s " /* tuple matching condition */
"RETURNING %s" /* returning keys of updated tuples */
@ -1611,6 +2001,7 @@ apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
"SELECT %s FROM %s AS diff "
"WHERE NOT EXISTS (SELECT 1 FROM updt AS mv WHERE %s);",
matviewname, count_colname, count_colname, count_colname,
(aggs_set != NULL ? aggs_set->data : ""),
deltaname_new,
match_cond,
returning_keys.data,
@ -1718,6 +2109,13 @@ mv_InitHashTables(void)
{
HASHCTL ctl;
memset(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(MV_QueryKey);
ctl.entrysize = sizeof(MV_QueryHashEntry);
mv_query_cache = hash_create("MV query cache",
MV_INIT_QUERYHASHSIZE,
&ctl, HASH_ELEM | HASH_BLOBS);
memset(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(Oid);
ctl.entrysize = sizeof(MV_TriggerHashEntry);

View file

@ -22,8 +22,8 @@
#define Natts_pg_ivm_immv 3
#define Anum_pg_ivm_immv_immvrelid 1
#define Anum_pg_ivm_immv_ispopulated 2
#define Anum_pg_ivm_immv_viewdef 3
#define Anum_pg_ivm_immv_viewdef 2
#define Anum_pg_ivm_immv_ispopulated 3
/* pg_ivm.c */
@ -39,6 +39,7 @@ extern ObjectAddress ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt,
extern void CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid, bool is_create);
extern void CreateIndexOnIMMV(Query *query, Relation matviewRel, bool is_create);
extern Query *rewriteQueryForIMMV(Query *query, List *colNames);
extern void makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *next_resno, List **aggs);
/* matview.c */
@ -49,5 +50,8 @@ extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS);
extern void AtAbort_IVM(void);
extern bool isIvmName(const char *s);
extern Datum pg_get_viewdef(PG_FUNCTION_ARGS);
#endif

View file

@ -51,8 +51,54 @@ SELECT * FROM mv_ivm_duplicate ORDER BY 1;
SELECT * FROM mv_ivm_distinct ORDER BY 1;
ROLLBACK;
-- not support SUM(), COUNT() and AVG() aggregate functions
-- support SUM(), COUNT() and AVG() aggregate functions
BEGIN;
SELECT create_immv('mv_ivm_agg', 'SELECT i, SUM(j), COUNT(i), AVG(j) FROM mv_base_a GROUP BY i');
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4;
INSERT INTO mv_base_a VALUES(2,100);
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4;
UPDATE mv_base_a SET j = 200 WHERE (i,j) = (2,100);
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4;
DELETE FROM mv_base_a WHERE (i,j) = (2,200);
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3,4;
ROLLBACK;
-- support COUNT(*) aggregate function
BEGIN;
SELECT create_immv('mv_ivm_agg', 'SELECT i, SUM(j), COUNT(*) FROM mv_base_a GROUP BY i');
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3;
INSERT INTO mv_base_a VALUES(2,100);
SELECT * FROM mv_ivm_agg ORDER BY 1,2,3;
ROLLBACK;
-- support aggregate functions without GROUP clause
BEGIN;
SELECT create_immv('mv_ivm_group', 'SELECT SUM(j), COUNT(j), AVG(j) FROM mv_base_a');
SELECT * FROM mv_ivm_group ORDER BY 1;
INSERT INTO mv_base_a VALUES(6,60);
SELECT * FROM mv_ivm_group ORDER BY 1;
DELETE FROM mv_base_a;
SELECT * FROM mv_ivm_group ORDER BY 1;
ROLLBACK;
-- resolved issue: When use AVG() function and values is indivisible, result of AVG() is incorrect.
BEGIN;
SELECT create_immv('mv_ivm_avg_bug', 'SELECT i, SUM(j), COUNT(j), AVG(j) FROM mv_base_A GROUP BY i');
SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3;
INSERT INTO mv_base_a VALUES
(1,0),
(1,0),
(2,30),
(2,30);
SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3;
DELETE FROM mv_base_a WHERE (i,j) = (1,0);
DELETE FROM mv_base_a WHERE (i,j) = (2,30);
SELECT * FROM mv_ivm_avg_bug ORDER BY 1,2,3;
ROLLBACK;
-- not support MIN(), MAX() aggregate functions
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MIN(j) FROM mv_base_a GROUP BY i');
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MAX(j) FROM mv_base_a GROUP BY i');
-- support self join view and multiple change on the same table
BEGIN;
@ -191,6 +237,8 @@ SELECT create_immv('mv_ivm05', 'SELECT i,j, (SELECT k FROM mv_base_b b WHERE a.i
SELECT create_immv('mv_ivm07', 'SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i) ORDER BY i,j,k');
-- contain HAVING
SELECT create_immv('mv_ivm08', 'SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i) GROUP BY i,j,k HAVING SUM(i) > 5');
-- contain GROUP BY without aggregate
SELECT create_immv('mv_ivm08', 'SELECT i,j FROM mv_base_a GROUP BY i,j');
-- contain view or materialized view
CREATE VIEW b_view AS SELECT i,k FROM mv_base_b;