Support simple subquery (#17)

A simple subquery in FROM clause is supported.
DISTINCT and aggregate functions are not supported in subquery.
This commit is contained in:
thoshiai 2022-07-25 09:24:51 +09:00 committed by GitHub
parent 75ee63b99a
commit 790b0d2bd6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 179 additions and 40 deletions

View file

@ -64,6 +64,8 @@ typedef struct
typedef struct
{
bool has_agg;
bool has_subquery;
int sublevels_up;
} check_ivm_restriction_context;
static void CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
@ -525,6 +527,12 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
*relids = bms_add_member(*relids, rte->relid);
}
else if (rte->rtekind == RTE_SUBQUERY)
{
Query *subquery = rte->subquery;
Assert(rte->subquery != NULL);
CreateIvmTriggersOnBaseTablesRecurse(subquery, (Node *)subquery, matviewOid, relids, ex_lock);
}
}
break;
@ -646,7 +654,7 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock
static void
check_ivm_restriction(Node *node)
{
check_ivm_restriction_context context = {false};
check_ivm_restriction_context context = {false, false};
check_ivm_restriction_walker(node, &context);
}
@ -713,10 +721,11 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("FOR UPDATE/SHARE clause is not supported on incrementally maintainable materialized view")));
if (qry->hasSubLinks)
if (qry->hasSubLinks && context->sublevels_up > 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("subquery is not supported on incrementally maintainable materialized view")));
errmsg("this query is not allowed on incrementally maintainable materialized view"),
errhint("Only simple subquery is supported")));
/* system column restrictions */
vars = pull_vars_of_level((Node *) qry, 0);
@ -732,6 +741,15 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
errmsg("system column is not supported on incrementally maintainable materialized view")));
}
}
/* subquery restrictions */
if (context->sublevels_up > 0 && qry->distinctClause != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("DISTINCT clause in nested query are not supported on incrementally maintainable materialized view")));
if (context->sublevels_up > 0 && qry->hasAggs)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("aggregate functions in nested query are not supported on incrementally maintainable materialized view")));
context->has_agg |= qry->hasAggs;
@ -776,10 +794,13 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VALUES is not supported on incrementally maintainable materialized view")));
if (rte->rtekind == RTE_SUBQUERY)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("subquery is not supported on incrementally maintainable materialized view")));
{
context->has_subquery = true;
context->sublevels_up++;
check_ivm_restriction_walker((Node *)rte->subquery, context);
context->sublevels_up--;
}
}
query_tree_walker(qry, check_ivm_restriction_walker, (void *) context, QTW_IGNORE_RANGE_TABLE);
@ -798,6 +819,12 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("expression containing an aggregate in it is not supported on incrementally maintainable materialized view")));
if (IsA(tle->expr, SubLink))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("this query is not allowed on incrementally maintainable materialized view"),
errhint("subquery is not supported in targetlist")));
expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
break;
}
@ -840,6 +867,15 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
errmsg("aggregate function %s is not supported on incrementally maintainable materialized view", aggname)));
break;
}
case T_SubLink:
{
/* Now, EXISTS clause is supported only */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("this query is not allowed on incrementally maintainable materialized view"),
errhint("Only simple subquery is supported")));
break;
}
default:
expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
break;

View file

@ -393,6 +393,59 @@ SELECT create_immv('mv_ivm_min_max', 'SELECT i, MIN(j) FROM mv_base_a GROUP BY
ERROR: aggregate function min(integer) is not supported on incrementally maintainable materialized view
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MAX(j) FROM mv_base_a GROUP BY i');
ERROR: aggregate function max(integer) is not supported on incrementally maintainable materialized view
-- support subquery in FROM clause
BEGIN;
SELECT create_immv('mv_ivm_subquery01', 'SELECT a.i,a.j FROM mv_base_a a, (SELECT * FROM mv_base_b) b WHERE a.i = b.i');
NOTICE: could not create an index on immv "mv_ivm_subquery01" automatically
DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
HINT: Create an index on the immv for efficient incremental maintenance.
create_immv
-------------
4
(1 row)
SELECT * FROM mv_ivm_subquery01 ORDER BY 1,2;
i | j
---+----
1 | 10
2 | 20
3 | 30
4 | 40
(4 rows)
INSERT INTO mv_base_b VALUES(5,105);
SELECT * FROM mv_ivm_subquery01 ORDER BY 1,2;
i | j
---+----
1 | 10
2 | 20
3 | 30
4 | 40
5 | 50
(5 rows)
UPDATE mv_base_a SET j = 0 WHERE i = 1;
SELECT * FROM mv_ivm_subquery01 ORDER BY 1,2;
i | j
---+----
1 | 0
2 | 20
3 | 30
4 | 40
5 | 50
(5 rows)
DELETE FROM mv_base_b WHERE (i,k) = (5,105);
SELECT * FROM mv_ivm_subquery01 ORDER BY 1,2;
i | j
---+----
1 | 0
2 | 20
3 | 30
4 | 40
(4 rows)
ROLLBACK;
-- support self join view and multiple change on the same table
BEGIN;
CREATE TABLE base_t (i int, v int);
@ -651,11 +704,13 @@ SELECT create_immv('mv_ivm04', 'SELECT i,j,xidsend(xmin) AS x_min FROM mv_base_a
ERROR: system column is not supported on incrementally maintainable materialized view
-- contain subquery
SELECT create_immv('mv_ivm03', 'SELECT i,j FROM mv_base_a WHERE i IN (SELECT i FROM mv_base_b WHERE k < 103 )');
ERROR: subquery is not supported on incrementally maintainable materialized view
SELECT create_immv('mv_ivm04', 'SELECT a.i,a.j FROM mv_base_a a, (SELECT * FROM mv_base_b) b WHERE a.i = b.i');
ERROR: subquery is not supported on incrementally maintainable materialized view
ERROR: this query is not allowed on incrementally maintainable materialized view
HINT: Only simple subquery is supported
SELECT create_immv('mv_ivm04', 'SELECT a.i,a.j FROM mv_base_a a, (SELECT DISTINCT i FROM mv_base_b) b WHERE a.i = b.i');
ERROR: DISTINCT clause in nested query are not supported on incrementally maintainable materialized view
SELECT create_immv('mv_ivm05', 'SELECT i,j, (SELECT k FROM mv_base_b b WHERE a.i = b.i) FROM mv_base_a a');
ERROR: subquery is not supported on incrementally maintainable materialized view
ERROR: this query is not allowed on incrementally maintainable materialized view
HINT: subquery is not supported in targetlist
-- contain ORDER BY
SELECT create_immv('mv_ivm07', 'SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i) ORDER BY i,j,k');
ERROR: ORDER BY clause is not supported on incrementally maintainable materialized view
@ -673,7 +728,7 @@ ERROR: VIEW or MATERIALIZED VIEW is not supported on incrementally maintainable
SELECT create_immv('mv_ivm08', 'SELECT a.i,a.j FROM mv_base_a a,b_mview b WHERE a.i = b.i');
ERROR: VIEW or MATERIALIZED VIEW is not supported on incrementally maintainable materialized view
SELECT create_immv('mv_ivm09', 'SELECT a.i,a.j FROM mv_base_a a, (SELECT i, COUNT(*) FROM mv_base_b GROUP BY i) b WHERE a.i = b.i');
ERROR: subquery is not supported on incrementally maintainable materialized view
ERROR: aggregate functions in nested query are not supported on incrementally maintainable materialized view
-- contain mutable functions
SELECT create_immv('mv_ivm12', 'SELECT i,j FROM mv_base_a WHERE i = random()::int');
ERROR: mutable function is not supported on incrementally maintainable materialized view

View file

@ -111,7 +111,7 @@ typedef struct MV_TriggerTable
List *old_rtes; /* RTEs of ENRs for old_tuplestores*/
List *new_rtes; /* RTEs of ENRs for new_tuplestores */
List *rte_indexes; /* List of RTE index of the modified table */
List *rte_paths; /* List of paths to RTE index of the modified table */
RangeTblEntry *original_rte; /* the original RTE saved before rewriting query */
} MV_TriggerTable;
@ -143,7 +143,7 @@ static Query *get_immv_query(Relation matviewRel);
static Query *rewrite_query_for_preupdate_state(Query *query, List *tables,
TransactionId xid, CommandId cid,
ParseState *pstate);
ParseState *pstate, List *rte_path);
static void register_delta_ENRs(ParseState *pstate, Query *query, List *tables);
static char *make_delta_enr_name(const char *prefix, Oid relid, int count);
static RangeTblEntry *get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table,
@ -153,11 +153,12 @@ static RangeTblEntry *union_ENRs(RangeTblEntry *rte, Oid relid, List *enr_rtes,
QueryEnvironment *queryEnv);
static Query *rewrite_query_for_distinct_and_aggregates(Query *query, ParseState *pstate);
static void calc_delta(MV_TriggerTable *table, int rte_index, Query *query,
static void calc_delta(MV_TriggerTable *table, List *rte_path, Query *query,
DestReceiver *dest_old, DestReceiver *dest_new,
TupleDesc *tupdesc_old, TupleDesc *tupdesc_new,
QueryEnvironment *queryEnv);
static Query *rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte_index);
static Query *rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, List *rte_path);
static ListCell *getRteListCell(Query *query, List *rte_path);
static void apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores,
TupleDesc tupdesc_old, TupleDesc tupdesc_new,
@ -795,7 +796,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
table->new_tuplestores = NIL;
table->old_rtes = NIL;
table->new_rtes = NIL;
table->rte_indexes = NIL;
table->rte_paths = NIL;
entry->tables = lappend(entry->tables, table);
MemoryContextSwitchTo(oldcxt);
@ -955,7 +956,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
/* Set all tables in the query to pre-update state */
rewritten = rewrite_query_for_preupdate_state(rewritten, entry->tables,
entry->xid, entry->cid,
pstate);
pstate, NIL);
/* Rewrite for DISTINCT clause and aggregates functions */
rewritten = rewrite_query_for_distinct_and_aggregates(rewritten, pstate);
@ -1011,9 +1012,9 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
table = (MV_TriggerTable *) lfirst(lc);
/* loop for self-join */
foreach(lc2, table->rte_indexes)
foreach(lc2, table->rte_paths)
{
int rte_index = lfirst_int(lc2);
List *rte_path = lfirst(lc2);
TupleDesc tupdesc_old;
TupleDesc tupdesc_new;
bool use_count = false;
@ -1025,11 +1026,11 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
use_count = true;
/* calculate delta tables */
calc_delta(table, rte_index, rewritten, dest_old, dest_new,
calc_delta(table, rte_path, rewritten, dest_old, dest_new,
&tupdesc_old, &tupdesc_new, queryEnv);
/* Set the table in the query to post-update state */
rewritten = rewrite_query_for_postupdate_state(rewritten, table, rte_index);
rewritten = rewrite_query_for_postupdate_state(rewritten, table, rte_path);
PG_TRY();
{
@ -1091,15 +1092,18 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
static Query*
rewrite_query_for_preupdate_state(Query *query, List *tables,
TransactionId xid, CommandId cid,
ParseState *pstate)
ParseState *pstate, List *rte_path)
{
ListCell *lc;
int num_rte = list_length(query->rtable);
int i;
/* This can recurse, so check for excessive recursion */
check_stack_depth();
/* register delta ENRs */
register_delta_ENRs(pstate, query, tables);
/* register delta ENRs only one at first call */
if (rte_path == NIL)
register_delta_ENRs(pstate, query, tables);
/* XXX: Is necessary? Is this right timing? */
AcquireRewriteLocks(query, true, false);
@ -1109,19 +1113,25 @@ rewrite_query_for_preupdate_state(Query *query, List *tables,
{
RangeTblEntry *r = (RangeTblEntry*) lfirst(lc);
ListCell *lc2;
foreach(lc2, tables)
/* if rte contains subquery, search recursively */
if (r->rtekind == RTE_SUBQUERY)
rewrite_query_for_preupdate_state(r->subquery, tables, xid, cid, pstate, lappend_int(list_copy(rte_path), i));
else
{
MV_TriggerTable *table = (MV_TriggerTable *) lfirst(lc2);
/*
* if the modified table is found then replace the original RTE with
* "pre-state" RTE and append its index to the list.
*/
if (r->relid == table->table_id)
ListCell *lc2;
foreach(lc2, tables)
{
lfirst(lc) = get_prestate_rte(r, table, xid, cid, pstate->p_queryEnv);
table->rte_indexes = lappend_int(table->rte_indexes, i);
break;
MV_TriggerTable *table = (MV_TriggerTable *) lfirst(lc2);
/*
* if the modified table is found then replace the original RTE with
* "pre-state" RTE and append its index to the list.
*/
if (r->relid == table->table_id)
{
lfirst(lc) = get_prestate_rte(r, table, xid, cid, pstate->p_queryEnv);
table->rte_paths = lappend(table->rte_paths, lappend_int(list_copy(rte_path), i));
break;
}
}
}
@ -1449,12 +1459,12 @@ rewrite_query_for_distinct_and_aggregates(Query *query, ParseState *pstate)
* by the RTE index.
*/
static void
calc_delta(MV_TriggerTable *table, int rte_index, Query *query,
calc_delta(MV_TriggerTable *table, List *rte_path, Query *query,
DestReceiver *dest_old, DestReceiver *dest_new,
TupleDesc *tupdesc_old, TupleDesc *tupdesc_new,
QueryEnvironment *queryEnv)
{
ListCell *lc = list_nth_cell(query->rtable, rte_index - 1);
ListCell *lc = getRteListCell(query, rte_path);
RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);
/* Generate old delta */
@ -1482,9 +1492,9 @@ calc_delta(MV_TriggerTable *table, int rte_index, Query *query,
* calculation due to changes on this table finishes.
*/
static Query*
rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte_index)
rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, List *rte_path)
{
ListCell *lc = list_nth_cell(query->rtable, rte_index - 1);
ListCell *lc = getRteListCell(query, rte_path);
/* Retore the original RTE */
lfirst(lc) = table->original_rte;
@ -1492,6 +1502,32 @@ rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte
return query;
}
/*
* getRteListCell
*
* Get ListCell which contains RTE specified by the given path.
*/
static ListCell*
getRteListCell(Query *query, List *rte_path)
{
ListCell *lc;
ListCell *rte_lc = NULL;
Assert(list_length(rte_path) > 0);
foreach (lc, rte_path)
{
int index = lfirst_int(lc);
RangeTblEntry *rte;
rte_lc = list_nth_cell(query->rtable, index - 1);
rte = (RangeTblEntry *) lfirst(rte_lc);
if (rte != NULL && rte->rtekind == RTE_SUBQUERY)
query = rte->subquery;
}
return rte_lc;
}
#define IVM_colname(type, col) makeObjectName("__ivm_" type, col, "_")
/*

View file

@ -127,6 +127,18 @@ ROLLBACK;
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MIN(j) FROM mv_base_a GROUP BY i');
SELECT create_immv('mv_ivm_min_max', 'SELECT i, MAX(j) FROM mv_base_a GROUP BY i');
-- support subquery in FROM clause
BEGIN;
SELECT create_immv('mv_ivm_subquery01', 'SELECT a.i,a.j FROM mv_base_a a, (SELECT * FROM mv_base_b) b WHERE a.i = b.i');
SELECT * FROM mv_ivm_subquery01 ORDER BY 1,2;
INSERT INTO mv_base_b VALUES(5,105);
SELECT * FROM mv_ivm_subquery01 ORDER BY 1,2;
UPDATE mv_base_a SET j = 0 WHERE i = 1;
SELECT * FROM mv_ivm_subquery01 ORDER BY 1,2;
DELETE FROM mv_base_b WHERE (i,k) = (5,105);
SELECT * FROM mv_ivm_subquery01 ORDER BY 1,2;
ROLLBACK;
-- support self join view and multiple change on the same table
BEGIN;
CREATE TABLE base_t (i int, v int);
@ -258,7 +270,7 @@ SELECT create_immv('mv_ivm04', 'SELECT i,j,xidsend(xmin) AS x_min FROM mv_base_a
-- contain subquery
SELECT create_immv('mv_ivm03', 'SELECT i,j FROM mv_base_a WHERE i IN (SELECT i FROM mv_base_b WHERE k < 103 )');
SELECT create_immv('mv_ivm04', 'SELECT a.i,a.j FROM mv_base_a a, (SELECT * FROM mv_base_b) b WHERE a.i = b.i');
SELECT create_immv('mv_ivm04', 'SELECT a.i,a.j FROM mv_base_a a, (SELECT DISTINCT i FROM mv_base_b) b WHERE a.i = b.i');
SELECT create_immv('mv_ivm05', 'SELECT i,j, (SELECT k FROM mv_base_b b WHERE a.i = b.i) FROM mv_base_a a');
-- contain ORDER BY
SELECT create_immv('mv_ivm07', 'SELECT i,j,k FROM mv_base_a a INNER JOIN mv_base_b b USING(i) ORDER BY i,j,k');