This commit is contained in:
Adam Guo 2025-05-20 22:20:22 -04:00 committed by GitHub
commit a00ef7df6c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 629 additions and 72 deletions

View file

@ -9,7 +9,8 @@ OBJS = \
matview.o \
pg_ivm.o \
ruleutils.o \
subselect.o
subselect.o \
event_trigger.o
PGFILEDESC = "pg_ivm - incremental view maintenance on PostgreSQL"
EXTENSION = pg_ivm
@ -18,7 +19,7 @@ DATA = pg_ivm--1.0.sql \
pg_ivm--1.3--1.4.sql pg_ivm--1.4--1.5.sql pg_ivm--1.5--1.6.sql \
pg_ivm--1.6--1.7.sql pg_ivm--1.7--1.8.sql pg_ivm--1.8--1.9.sql \
pg_ivm--1.9--1.10.sql \
pg_ivm--1.10.sql
pg_ivm--1.10.sql pg_ivm--1.11.sql
REGRESS = pg_ivm create_immv refresh_immv

View file

@ -41,6 +41,7 @@
#include "rewrite/rewriteManip.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/regproc.h"
#include "utils/rel.h"
@ -61,8 +62,8 @@ typedef struct
} DR_intorel;
/* utility functions for IMMV definition creation */
static ObjectAddress create_immv_internal(List *attrList, IntoClause *into);
static ObjectAddress create_immv_nodata(List *tlist, IntoClause *into);
static ImmvAddress create_immv_internal(List *attrList, IntoClause *into);
static ImmvAddress create_immv_nodata(List *tlist, IntoClause *into);
typedef struct
{
@ -73,15 +74,15 @@ typedef struct
int sublevels_up; /* (current) nesting depth */
} check_ivm_restriction_context;
static void CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
static void CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, ImmvAddress immv_addr,
Relids *relids, bool ex_lock);
static void CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock);
static void CreateIvmTrigger(Oid relOid, ImmvAddress immv_addr, int16 type, int16 timing, bool ex_lock);
static void check_ivm_restriction(Node *node);
static bool check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context);
static Bitmapset *get_primary_key_attnos_from_query(Query *query, List **constraintList);
static bool check_aggregate_supports_ivm(Oid aggfnoid);
static void StoreImmvQuery(Oid viewOid, Query *viewQuery);
static void StoreImmvQuery(ImmvAddress immv_addr, Query *viewQuery);
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM < 140000)
static bool CreateTableAsRelExists(CreateTableAsStmt *ctas);
@ -95,14 +96,15 @@ static bool CreateTableAsRelExists(CreateTableAsStmt *ctas);
*
* This imitates PostgreSQL's create_ctas_internal().
*/
static ObjectAddress
static ImmvAddress
create_immv_internal(List *attrList, IntoClause *into)
{
CreateStmt *create = makeNode(CreateStmt);
char relkind;
Datum toast_options;
static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
ObjectAddress intoRelationAddr;
ImmvAddress immv_addr;
pg_uuid_t *immv_uuid;
/* This code supports both CREATE TABLE AS and CREATE MATERIALIZED VIEW */
/* relkind of IMMV must be RELKIND_RELATION */
@ -127,7 +129,14 @@ create_immv_internal(List *attrList, IntoClause *into)
* Create the relation. (This will error out if there's an existing view,
* so we don't need more code to complain if "replace" is false.)
*/
intoRelationAddr = DefineRelation(create, relkind, InvalidOid, NULL, NULL);
immv_addr.address = DefineRelation(create, relkind, InvalidOid, NULL, NULL);
/*
* Generate the IMMV UUID.
* TODO: check for hash collision
*/
immv_uuid = DatumGetUUIDP(DirectFunctionCall1(gen_random_uuid, (Datum) NULL));
memcpy(&immv_addr.immv_uuid, immv_uuid, sizeof(*immv_uuid));
pfree(immv_uuid);
/*
* If necessary, create a TOAST table for the target table. Note that
@ -145,13 +154,13 @@ create_immv_internal(List *attrList, IntoClause *into)
(void) heap_reloptions(RELKIND_TOASTVALUE, toast_options, true);
NewRelationCreateToastTable(intoRelationAddr.objectId, toast_options);
NewRelationCreateToastTable(immv_addr.address.objectId, toast_options);
/* Create the "view" part of an IMMV. */
StoreImmvQuery(intoRelationAddr.objectId, (Query *) into->viewQuery);
StoreImmvQuery(immv_addr, (Query *) into->viewQuery);
CommandCounterIncrement();
return intoRelationAddr;
return immv_addr;
}
/*
@ -162,7 +171,7 @@ create_immv_internal(List *attrList, IntoClause *into)
*
* This imitates PostgreSQL's create_ctas_nodata().
*/
static ObjectAddress
static ImmvAddress
create_immv_nodata(List *tlist, IntoClause *into)
{
List *attrList;
@ -239,7 +248,7 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt,
Query *query = castNode(Query, stmt->query);
IntoClause *into = stmt->into;
bool do_refresh = false;
ObjectAddress address;
ImmvAddress immv_addr;
/* Check if the relation exists or not */
if (CreateTableAsRelExists(stmt))
@ -274,7 +283,7 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt,
* similar to CREATE VIEW. This avoids dump/restore problems stemming
* from running the planner before all dependencies are set up.
*/
address = create_immv_nodata(query->targetList, into);
immv_addr = create_immv_nodata(query->targetList, into);
/*
* For materialized views, reuse the REFRESH logic, which locks down
@ -285,18 +294,18 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt,
{
Relation matviewRel;
RefreshImmvByOid(address.objectId, true, false, pstate->p_sourcetext, qc);
RefreshImmvByOid(immv_addr, true, false, pstate->p_sourcetext, qc);
if (qc)
qc->commandTag = CMDTAG_SELECT;
matviewRel = table_open(address.objectId, NoLock);
matviewRel = table_open(immv_addr.address.objectId, NoLock);
/* Create an index on incremental maintainable materialized view, if possible */
CreateIndexOnIMMV(query, matviewRel);
/* Create triggers to prevent IMMV from being changed */
CreateChangePreventTrigger(address.objectId);
CreateChangePreventTrigger(immv_addr.address.objectId);
table_close(matviewRel, NoLock);
@ -308,7 +317,7 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt,
"or execute refresh_immv to make sure the view is consistent.")));
}
return address;
return immv_addr.address;
}
/*
@ -547,7 +556,7 @@ makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *
* CreateIvmTriggersOnBaseTables -- create IVM triggers on all base tables
*/
void
CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid)
CreateIvmTriggersOnBaseTables(Query *qry, ImmvAddress immv_addr)
{
Relids relids = NULL;
bool ex_lock = false;
@ -581,13 +590,13 @@ CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid)
qry->distinctClause || (qry->hasAggs && qry->groupClause))
ex_lock = true;
CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *)qry, matviewOid, &relids, ex_lock);
CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *)qry, immv_addr, &relids, ex_lock);
bms_free(relids);
}
static void
CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, ImmvAddress immv_addr,
Relids *relids, bool ex_lock)
{
if (node == NULL)
@ -603,12 +612,12 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
Query *query = (Query *) node;
ListCell *lc;
CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *) query->jointree, matviewOid, relids, ex_lock);
CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *) query->jointree, immv_addr, relids, ex_lock);
foreach(lc, query->cteList)
{
CommonTableExpr *cte = (CommonTableExpr *) lfirst(lc);
Assert(IsA(cte->ctequery, Query));
CreateIvmTriggersOnBaseTablesRecurse((Query *) cte->ctequery, cte->ctequery, matviewOid, relids, ex_lock);
CreateIvmTriggersOnBaseTablesRecurse((Query *) cte->ctequery, cte->ctequery, immv_addr, relids, ex_lock);
}
}
break;
@ -620,14 +629,14 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
if (rte->rtekind == RTE_RELATION && !bms_is_member(rte->relid, *relids))
{
CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_BEFORE, ex_lock);
CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_BEFORE, ex_lock);
CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_BEFORE, ex_lock);
CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_BEFORE, true);
CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_AFTER, ex_lock);
CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_AFTER, ex_lock);
CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_AFTER, ex_lock);
CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_AFTER, true);
CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_BEFORE, ex_lock);
CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_BEFORE, ex_lock);
CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_BEFORE, ex_lock);
CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_BEFORE, true);
CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_AFTER, ex_lock);
CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_AFTER, ex_lock);
CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_AFTER, ex_lock);
CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_AFTER, true);
*relids = bms_add_member(*relids, rte->relid);
}
@ -635,7 +644,7 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
{
Query *subquery = rte->subquery;
Assert(rte->subquery != NULL);
CreateIvmTriggersOnBaseTablesRecurse(subquery, (Node *) subquery, matviewOid, relids, ex_lock);
CreateIvmTriggersOnBaseTablesRecurse(subquery, (Node *) subquery, immv_addr, relids, ex_lock);
}
}
break;
@ -646,7 +655,7 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
ListCell *l;
foreach(l, f->fromlist)
CreateIvmTriggersOnBaseTablesRecurse(qry, lfirst(l), matviewOid, relids, ex_lock);
CreateIvmTriggersOnBaseTablesRecurse(qry, lfirst(l), immv_addr, relids, ex_lock);
}
break;
@ -654,8 +663,8 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
{
JoinExpr *j = (JoinExpr *) node;
CreateIvmTriggersOnBaseTablesRecurse(qry, j->larg, matviewOid, relids, ex_lock);
CreateIvmTriggersOnBaseTablesRecurse(qry, j->rarg, matviewOid, relids, ex_lock);
CreateIvmTriggersOnBaseTablesRecurse(qry, j->larg, immv_addr, relids, ex_lock);
CreateIvmTriggersOnBaseTablesRecurse(qry, j->rarg, immv_addr, relids, ex_lock);
}
break;
@ -668,7 +677,7 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid,
* CreateIvmTrigger -- create IVM trigger on a base table
*/
static void
CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock)
CreateIvmTrigger(Oid relOid, ImmvAddress immv_addr, int16 type, int16 timing, bool ex_lock)
{
ObjectAddress refaddr;
ObjectAddress address;
@ -678,7 +687,7 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock
Assert(timing == TRIGGER_TYPE_BEFORE || timing == TRIGGER_TYPE_AFTER);
refaddr.classId = RelationRelationId;
refaddr.objectId = viewOid;
refaddr.objectId = immv_addr.address.objectId;
refaddr.objectSubId = 0;
ivm_trigger = makeNode(CreateTrigStmt);
@ -740,6 +749,8 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock
if (type == TRIGGER_TYPE_DELETE || type == TRIGGER_TYPE_UPDATE)
ex_lock = true;
ivm_trigger->trigname = psprintf("%s_%d_%d", ivm_trigger->trigname, relOid,
immv_addr.address.objectId);
ivm_trigger->funcname =
(timing == TRIGGER_TYPE_BEFORE ?
PgIvmFuncName("IVM_immediate_before") : PgIvmFuncName("IVM_immediate_maintenance"));
@ -752,12 +763,12 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock
ivm_trigger->initdeferred = false;
ivm_trigger->constrrel = NULL;
ivm_trigger->args = list_make2(
makeString(DatumGetPointer(DirectFunctionCall1(oidout, ObjectIdGetDatum(viewOid)))),
makeString(DatumGetPointer(DirectFunctionCall1(uuid_out, UUIDPGetDatum(&immv_addr.immv_uuid)))),
makeString(DatumGetPointer(DirectFunctionCall1(boolout, BoolGetDatum(ex_lock))))
);
address = CreateTrigger(ivm_trigger, NULL, relOid, InvalidOid, InvalidOid,
InvalidOid, InvalidOid, InvalidOid, NULL, true, false);
InvalidOid, InvalidOid, InvalidOid, NULL, false, false);
recordDependencyOn(&address, &refaddr, DEPENDENCY_AUTO);
@ -1707,22 +1718,37 @@ get_primary_key_attnos_from_query(Query *query, List **constraintList)
* Store the query for the IMMV to pg_ivm_immv
*/
static void
StoreImmvQuery(Oid viewOid, Query *viewQuery)
StoreImmvQuery(ImmvAddress immv_addr, Query *viewQuery)
{
char *querytree = nodeToString((Node *) viewQuery);
char *querystring;
int save_nestlevel;
Datum values[Natts_pg_ivm_immv];
bool isNulls[Natts_pg_ivm_immv];
Relation matviewRel;
Relation pgIvmImmv;
TupleDesc tupleDescriptor;
HeapTuple heapTuple;
ObjectAddress address;
/*
* Restrict search_path so that pg_ivm_get_viewdef_internal returns a
* fully-qualified query.
*/
save_nestlevel = NewGUCNestLevel();
RestrictSearchPath();
matviewRel = table_open(immv_addr.address.objectId, AccessShareLock);
querystring = pg_ivm_get_viewdef_internal(viewQuery, matviewRel, true);
table_close(matviewRel, NoLock);
/* Roll back the search_path change. */
AtEOXact_GUC(false, save_nestlevel);
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_ivm_immv_immvrelid -1 ] = ObjectIdGetDatum(viewOid);
values[Anum_pg_ivm_immv_immvrelid -1 ] = ObjectIdGetDatum(immv_addr.address.objectId);
values[Anum_pg_ivm_immv_immvuuid -1 ] = UUIDPGetDatum(&immv_addr.immv_uuid);
values[Anum_pg_ivm_immv_querystring - 1] = CStringGetTextDatum(querystring);
values[Anum_pg_ivm_immv_ispopulated -1 ] = BoolGetDatum(false);
values[Anum_pg_ivm_immv_viewdef -1 ] = CStringGetTextDatum(querytree);
pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock);
@ -1732,7 +1758,7 @@ StoreImmvQuery(Oid viewOid, Query *viewQuery)
CatalogTupleInsert(pgIvmImmv, heapTuple);
address.classId = RelationRelationId;
address.objectId = viewOid;
address.objectId = immv_addr.address.objectId;
address.objectSubId = 0;
recordDependencyOnExpr(&address, (Node *) viewQuery, NIL,

278
event_trigger.c Normal file
View file

@ -0,0 +1,278 @@
#include "postgres.h"
#include "pg_ivm.h"
#include "access/genam.h"
#include "access/table.h"
#include "catalog/indexing.h"
#include "commands/event_trigger.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#define IMMV_INIT_QUERYHASHSIZE 16
/*
* This file defines the DDL event triggers that maintain pgivm.pg_ivm_immv.
*
* Every time an ALTER RENAME statement is executed, the save_query_strings
* pre-event trigger will parse each IMMV's query string and store the Query
* nodes in a hash table. Then, the restore_query_strings post-event trigger
* will take each Query node and re-create the query string.
*
* This allows the querystring column to remain up to date when any objects are
* renamed.
*
* TODO: This uses a brute force approach to checking if re-parse is necessary.
* Even if an unrelated object is renamed, these event triggers still fire. Can
* figure out a more precise heuristic.
*/
typedef struct HashEntry
{
Oid immv_relid;
char *query;
} HashEntry;
/* Stores the Query nodes of each IMMV view query as they were pre-DDL. */
static HTAB *immv_query_cache;
/* Active snapshot at the time of the pre-DDL trigger. */
static Snapshot active_snapshot;
static char *retrieve_query(Oid immv_relid);
static void hash_query(Oid immv_relid, char *query);
static void initialize_query_cache(void);
static void restore_query_strings_internal(void);
static void save_query_strings_internal(void);
PG_FUNCTION_INFO_V1(save_query_strings);
PG_FUNCTION_INFO_V1(restore_query_strings);
/*
* Pre-DDL event trigger function.
*/
Datum
save_query_strings(PG_FUNCTION_ARGS)
{
EventTriggerData *trigdata;
if (!CALLED_AS_EVENT_TRIGGER(fcinfo))
ereport(ERROR, errmsg("not fired by event trigger manager"));
trigdata = (EventTriggerData *) fcinfo->context;
if (!IsA(trigdata->parsetree, RenameStmt))
PG_RETURN_NULL();
initialize_query_cache();
active_snapshot = GetActiveSnapshot();
save_query_strings_internal();
PG_RETURN_NULL();
}
/*
* Post-DDL event trigger function.
*/
Datum
restore_query_strings(PG_FUNCTION_ARGS)
{
EventTriggerData *trigdata;
if (!CALLED_AS_EVENT_TRIGGER(fcinfo))
ereport(ERROR, errmsg("not fired by event trigger manager"));
trigdata = (EventTriggerData *) fcinfo->context;
if (!IsA(trigdata->parsetree, RenameStmt))
PG_RETURN_NULL();
restore_query_strings_internal();
PG_RETURN_NULL();
}
/*
* Initialize the hash table.
*/
static void
initialize_query_cache(void)
{
HASHCTL ctl;
memset(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(Oid);
ctl.entrysize = sizeof(HashEntry);
immv_query_cache = hash_create("IMMV query cache", IMMV_INIT_QUERYHASHSIZE,
&ctl, HASH_ELEM | HASH_BLOBS);
}
/*
* Store a query node in the hash table.
*/
static void
hash_query(Oid immv_relid, char *query)
{
HashEntry *entry;
MemoryContext old_cxt;
bool found;
entry = (HashEntry *) hash_search(immv_query_cache,
&immv_relid,
HASH_ENTER,
&found);
Assert(!found);
old_cxt = MemoryContextSwitchTo(TopTransactionContext);
entry->query = pstrdup(query);
MemoryContextSwitchTo(old_cxt);
}
/*
* Retrieve a query node from the hash table.
*/
static char *
retrieve_query(Oid immv_relid)
{
HashEntry *entry;
MemoryContext old_cxt;
bool found;
char *query;
entry = (HashEntry *) hash_search(immv_query_cache,
&immv_relid,
HASH_FIND,
&found);
Assert(found);
old_cxt = MemoryContextSwitchTo(TopTransactionContext);
query = pstrdup(entry->query);
pfree(entry->query);
MemoryContextSwitchTo(old_cxt);
return query;
}
/*
* Iterate through the pg_ivm_immv table and parse the querystring of each
* entry. Store the query nodes in the hash table.
*/
static void
save_query_strings_internal(void)
{
HeapTuple tup;
Relation pg_ivm_immv_rel;
SysScanDesc scan;
TupleDesc tupdesc;
pg_ivm_immv_rel = table_open(PgIvmImmvRelationId(), RowExclusiveLock);
scan = systable_beginscan(pg_ivm_immv_rel, PgIvmImmvPrimaryKeyIndexId(),
true, active_snapshot, 0, NULL);
tupdesc = RelationGetDescr(pg_ivm_immv_rel);
while ((tup = systable_getnext(scan)) != NULL)
{
Oid matview_oid;
ParseState *parse_state;
Query *query;
bool isnull;
char *matview_relname;
char *query_string;
query_string = TextDatumGetCString(heap_getattr(tup,
Anum_pg_ivm_immv_querystring,
tupdesc, &isnull));
Assert(!isnull);
matview_oid = DatumGetObjectId(heap_getattr(tup, Anum_pg_ivm_immv_immvrelid,
tupdesc, &isnull));
Assert(!isnull);
matview_relname = psprintf("%s.%s",
get_namespace_name(get_rel_namespace(matview_oid)),
get_rel_name(matview_oid));
/* Parse the existing IMMV query pre-DDL. Add it to the list. */
parse_immv_query(matview_relname, query_string, &query, &parse_state);
query = (Query *) ((CreateTableAsStmt *) query->utilityStmt)->into->viewQuery;
/* Store entry for this IMMV. */
hash_query(matview_oid, nodeToString(query));
}
systable_endscan(scan);
table_close(pg_ivm_immv_rel, NoLock);
}
/*
* Iterate through the pg_ivm_immv table. For each entry, get its Query node
* from the hash table and reconstruct the query string. Update the row entry
* with the reconstructed query string.
*/
static void
restore_query_strings_internal(void)
{
HeapTuple tup;
Relation pg_ivm_immv_rel;
SysScanDesc scan;
TupleDesc tupdesc;
int save_nestlevel;
pg_ivm_immv_rel = table_open(PgIvmImmvRelationId(), RowExclusiveLock);
scan = systable_beginscan(pg_ivm_immv_rel, PgIvmImmvPrimaryKeyIndexId(),
true, active_snapshot, 0, NULL);
tupdesc = RelationGetDescr(pg_ivm_immv_rel);
/*
* Restrict search_path so that pg_ivm_get_viewdef_internal returns a
* fully-qualified query.
*/
save_nestlevel = NewGUCNestLevel();
RestrictSearchPath();
while ((tup = systable_getnext(scan)) != NULL)
{
Datum values[Natts_pg_ivm_immv];
HeapTuple newtup;
Oid matview_oid;
Query *query;
Relation matview_rel;
bool isnull;
bool nulls[Natts_pg_ivm_immv];
bool replaces[Natts_pg_ivm_immv];
char *new_query_string;
char *serialized_query;
matview_oid = DatumGetObjectId(heap_getattr(tup, Anum_pg_ivm_immv_immvrelid,
tupdesc, &isnull));
Assert(!isnull);
serialized_query = retrieve_query(matview_oid);
query = stringToNode(serialized_query);
matview_rel = table_open(matview_oid, AccessShareLock);
new_query_string = pg_ivm_get_viewdef_internal(query, matview_rel, true);
table_close(matview_rel, NoLock);
memset(values, 0, sizeof(values));
values[Anum_pg_ivm_immv_querystring - 1] = CStringGetTextDatum(new_query_string);
memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(nulls));
replaces[Anum_pg_ivm_immv_querystring - 1] = true;
newtup = heap_modify_tuple(tup, tupdesc, values, nulls, replaces);
CatalogTupleUpdate(pg_ivm_immv_rel, &newtup->t_self, newtup);
heap_freetuple(newtup);
}
systable_endscan(scan);
table_close(pg_ivm_immv_rel, NoLock);
hash_destroy(immv_query_cache);
/* Roll back the search_path change. */
AtEOXact_GUC(false, save_nestlevel);
}

View file

@ -50,6 +50,7 @@
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/typcache.h"
#include "utils/uuid.h"
#include "utils/xid8.h"
#include "pg_ivm.h"
@ -233,7 +234,6 @@ static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort,
static void setLastUpdateXid(Oid immv_oid, FullTransactionId xid);
static FullTransactionId getLastUpdateXid(Oid immv_oid);
/* SQL callable functions */
PG_FUNCTION_INFO_V1(IVM_immediate_before);
PG_FUNCTION_INFO_V1(IVM_immediate_maintenance);
@ -250,6 +250,7 @@ ExecRefreshImmv(const RangeVar *relation, bool skipData,
{
Oid matviewOid;
LOCKMODE lockmode;
ImmvAddress immv_addr;
/* Determine strength of lock needed. */
//concurrent = stmt->concurrent;
@ -271,7 +272,12 @@ ExecRefreshImmv(const RangeVar *relation, bool skipData,
NULL);
#endif
return RefreshImmvByOid(matviewOid, false, skipData, queryString, qc);
immv_addr.address.classId = RelationRelationId;
immv_addr.address.objectId = matviewOid;
immv_addr.address.objectSubId = 0;
// TODO: get uuid
return RefreshImmvByOid(immv_addr, false, skipData, queryString, qc);
}
/*
@ -282,7 +288,7 @@ ExecRefreshImmv(const RangeVar *relation, bool skipData,
* This imitates PostgreSQL's RefreshMatViewByOid().
*/
ObjectAddress
RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData,
RefreshImmvByOid(ImmvAddress immv_addr, bool is_create, bool skipData,
const char *queryString, QueryCompletion *qc)
{
Relation matviewRel;
@ -299,6 +305,7 @@ RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData,
int save_nestlevel;
ObjectAddress address;
bool oldPopulated;
Oid matviewOid = immv_addr.address.objectId;
Relation pgIvmImmv;
TupleDesc tupdesc;
@ -443,7 +450,8 @@ RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData,
tgform = (Form_pg_trigger) GETSTRUCT(tgtup);
/* If trigger is created by IMMV, delete it. */
if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0)
if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0 ||
strncmp(NameStr(tgform->tgname), "IVM_prevent_", 12) == 0)
{
obj.classId = foundDep->classid;
obj.objectId = foundDep->objid;
@ -473,7 +481,7 @@ RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData,
* is created.
*/
if (!skipData && !oldPopulated)
CreateIvmTriggersOnBaseTables(dataQuery, matviewOid);
CreateIvmTriggersOnBaseTables(dataQuery, immv_addr);
/*
* Create the transient table that will receive the regenerated data. Lock
@ -686,6 +694,9 @@ get_immv_query(Relation matviewRel)
bool isnull;
Datum datum;
Query *query;
char *querystring;
char *relname;
ParseState *parse_state;
ScanKeyInit(&key,
Anum_pg_ivm_immv_immvrelid,
@ -703,9 +714,16 @@ get_immv_query(Relation matviewRel)
return NULL;
}
datum = heap_getattr(tup, Anum_pg_ivm_immv_viewdef, tupdesc, &isnull);
datum = heap_getattr(tup, Anum_pg_ivm_immv_querystring, tupdesc, &isnull);
Assert(!isnull);
query = (Query *) stringToNode(TextDatumGetCString(datum));
querystring = TextDatumGetCString(datum);
relname = psprintf("%s.%s",
get_namespace_name(get_rel_namespace(matviewRel->rd_id)),
get_rel_name(matviewRel->rd_id));
parse_immv_query(relname, querystring, &query, &parse_state);
query = (Query *) ((CreateTableAsStmt *) query->utilityStmt)->into->viewQuery;
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
@ -746,15 +764,17 @@ Datum
IVM_immediate_before(PG_FUNCTION_ARGS)
{
TriggerData *trigdata = (TriggerData *) fcinfo->context;
char *matviewOid_text = trigdata->tg_trigger->tgargs[0];
char *immv_uuid_text = trigdata->tg_trigger->tgargs[0];
char *ex_lock_text = trigdata->tg_trigger->tgargs[1];
pg_uuid_t *immv_uuid;
Oid matviewOid;
MV_TriggerHashEntry *entry;
bool found;
bool ex_lock;
matviewOid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(matviewOid_text)));
immv_uuid = DatumGetUUIDP(DirectFunctionCall1(uuid_in, (CStringGetDatum(immv_uuid_text))));
ex_lock = DatumGetBool(DirectFunctionCall1(boolin, CStringGetDatum(ex_lock_text)));
matviewOid = GetImmvRelid(immv_uuid);
/* If the view has more than one tables, we have to use an exclusive lock. */
if (ex_lock)
@ -869,10 +889,11 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
Oid matviewOid;
Query *query;
Query *rewritten = NULL;
char *matviewOid_text = trigdata->tg_trigger->tgargs[0];
char *immv_uuid_text = trigdata->tg_trigger->tgargs[0];
Relation matviewRel;
int old_depth = immv_maintenance_depth;
SubTransactionId subxid;
pg_uuid_t *immv_uuid;
Oid relowner;
Tuplestorestate *old_tuplestore = NULL;
@ -900,7 +921,8 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
rel = trigdata->tg_relation;
relid = rel->rd_id;
matviewOid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(matviewOid_text)));
immv_uuid = DatumGetUUIDP(DirectFunctionCall1(uuid_in, (CStringGetDatum(immv_uuid_text))));
matviewOid = GetImmvRelid(immv_uuid);
/*
* On the first call initialize the hashtable

78
pg_ivm--1.11.sql Normal file
View file

@ -0,0 +1,78 @@
CREATE SCHEMA pgivm;
-- catalog
CREATE TABLE pgivm.pg_ivm_immv(
immvrelid regclass NOT NULL,
immvuuid uuid NOT NULL,
querystring text NOT NULL,
ispopulated bool NOT NULL,
lastivmupdate xid8,
CONSTRAINT pg_ivm_immv_pkey PRIMARY KEY (immvrelid),
CONSTRAINT pg_ivm_immv_uuid UNIQUE (immvuuid)
);
SELECT pg_catalog.pg_extension_config_dump('pgivm.pg_ivm_immv', '');
-- functions
CREATE FUNCTION pgivm.create_immv(text, text)
RETURNS bigint
STRICT
AS 'MODULE_PATHNAME', 'create_immv'
LANGUAGE C;
CREATE FUNCTION pgivm.refresh_immv(text, bool)
RETURNS bigint
STRICT
AS 'MODULE_PATHNAME', 'refresh_immv'
LANGUAGE C;
CREATE FUNCTION pgivm.get_immv_def(IN immvrelid regclass)
RETURNS text
STRICT
AS 'MODULE_PATHNAME', 'get_immv_def'
LANGUAGE C;
CREATE FUNCTION pgivm.ivm_visible_in_prestate(oid, tid, oid)
RETURNS bool
STABLE
AS 'MODULE_PATHNAME', 'ivm_visible_in_prestate'
LANGUAGE C;
-- trigger functions
CREATE FUNCTION pgivm."IVM_immediate_before"()
RETURNS trigger
AS 'MODULE_PATHNAME', 'IVM_immediate_before'
LANGUAGE C;
CREATE FUNCTION pgivm."IVM_immediate_maintenance"()
RETURNS trigger
AS 'MODULE_PATHNAME', 'IVM_immediate_maintenance'
LANGUAGE C;
CREATE FUNCTION pgivm."IVM_prevent_immv_change"()
RETURNS trigger
AS 'MODULE_PATHNAME', 'IVM_prevent_immv_change'
LANGUAGE C;
GRANT SELECT ON TABLE pgivm.pg_ivm_immv TO PUBLIC;
GRANT USAGE ON SCHEMA pgivm TO PUBLIC;
-- event triggers
CREATE FUNCTION pgivm.save_query_strings() RETURNS event_trigger
AS 'MODULE_PATHNAME', 'save_query_strings' LANGUAGE C;
CREATE FUNCTION pgivm.restore_query_strings() RETURNS event_trigger
AS 'MODULE_PATHNAME', 'restore_query_strings' LANGUAGE C;
CREATE EVENT TRIGGER save_query_strings
ON ddl_command_start
EXECUTE FUNCTION pgivm.save_query_strings();
CREATE EVENT TRIGGER restore_query_strings
ON ddl_command_end
EXECUTE FUNCTION pgivm.restore_query_strings();

133
pg_ivm.c
View file

@ -32,6 +32,7 @@
#include "utils/lsyscache.h"
#include "utils/regproc.h"
#include "utils/rel.h"
#include "utils/uuid.h"
#include "utils/varlena.h"
#include "pg_ivm.h"
@ -177,17 +178,39 @@ create_immv(PG_FUNCTION_ARGS)
text *t_sql = PG_GETARG_TEXT_PP(1);
char *relname = text_to_cstring(t_relname);
char *sql = text_to_cstring(t_sql);
Query *query = NULL;
QueryCompletion qc;
ParseState *pstate = NULL;
parse_immv_query(relname, sql, &query, &pstate);
ExecCreateImmv(pstate, (CreateTableAsStmt *) query->utilityStmt, &qc);
PG_RETURN_INT64(qc.nprocessed);
}
/*
* parse_immv_query
*
* Parse an IMMV definition query and return the Query tree and ParseState using
* the supplied pointers.
*/
void
parse_immv_query(const char *relname, const char *sql, Query **query_ret,
ParseState **pstate_ret)
{
List *parsetree_list;
RawStmt *parsetree;
Query *query;
QueryCompletion qc;
List *names = NIL;
List *colNames = NIL;
ParseState *pstate = make_parsestate(NULL);
CreateTableAsStmt *ctas;
StringInfoData command_buf;
Query *query;
ParseState *pstate;
pstate = make_parsestate(NULL);
parseNameAndColumns(relname, &names, &colNames);
initStringInfo(&command_buf);
@ -230,9 +253,8 @@ create_immv(PG_FUNCTION_ARGS)
query = transformStmt(pstate, (Node *)ctas);
Assert(query->commandType == CMD_UTILITY && IsA(query->utilityStmt, CreateTableAsStmt));
ExecCreateImmv(pstate, (CreateTableAsStmt *) query->utilityStmt, &qc);
PG_RETURN_INT64(qc.nprocessed);
*query_ret = query;
*pstate_ret = pstate;
}
/*
@ -298,7 +320,6 @@ CreateChangePreventTrigger(Oid matviewOid)
ivm_trigger->row = false;
ivm_trigger->timing = TRIGGER_TYPE_BEFORE;
ivm_trigger->trigname = "IVM_prevent_immv_change";
ivm_trigger->funcname = PgIvmFuncName("IVM_prevent_immv_change");
ivm_trigger->columns = NIL;
ivm_trigger->transitionRels = NIL;
@ -312,8 +333,10 @@ CreateChangePreventTrigger(Oid matviewOid)
for (i = 0; i < 4; i++)
{
ivm_trigger->events = types[i];
ivm_trigger->trigname = psprintf("IVM_prevent_immv_change_%d_%d",
matviewOid, i + 1);
address = CreateTrigger(ivm_trigger, NULL, matviewOid, InvalidOid, InvalidOid,
InvalidOid, InvalidOid, InvalidOid, NULL, true, false);
InvalidOid, InvalidOid, InvalidOid, NULL, false, false);
recordDependencyOn(&address, &refaddr, DEPENDENCY_AUTO);
}
@ -344,6 +367,17 @@ PgIvmImmvPrimaryKeyIndexId(void)
AccessShareLock, true);
}
/*
* Get relid of pg_ivm_immv's UUID unique key.
*/
Oid
PgIvmImmvUuidIndexId(void)
{
return RangeVarGetRelid(
makeRangeVar("pgivm", "pg_ivm_immv_uuid", -1),
AccessShareLock, true);
}
/*
* Return the SELECT part of an IMMV
*/
@ -458,3 +492,86 @@ PgIvmFuncName(char *name)
{
return list_make2(makeString("pgivm"), makeString(name));
}
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM < 170000)
void
RestrictSearchPath(void)
{
set_config_option("search_path", "pg_catalog, pg_temp", PGC_USERSET,
PGC_S_SESSION, GUC_ACTION_SAVE, true, 0, false);
}
#endif
/*
* GetImmvOid
*
* Look up the immvrelid of an IMMV from its immv_uuid.
*/
Oid
GetImmvRelid(pg_uuid_t *immv_uuid)
{
Datum datum;
HeapTuple tup;
Relation pgIvmImmv = table_open(PgIvmImmvRelationId(), AccessShareLock);
ScanKeyData key;
SysScanDesc scan;
TupleDesc tupdesc = RelationGetDescr(pgIvmImmv);
bool isnull;
ScanKeyInit(&key, Anum_pg_ivm_immv_immvuuid, BTEqualStrategyNumber,
F_UUID_EQ, UUIDPGetDatum(immv_uuid));
scan = systable_beginscan(pgIvmImmv, PgIvmImmvUuidIndexId(), true, NULL, 1,
&key);
tup = systable_getnext(scan);
if (!HeapTupleIsValid(tup))
{
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
return InvalidOid;
}
datum = heap_getattr(tup, Anum_pg_ivm_immv_immvrelid, tupdesc, &isnull);
Assert(!isnull);
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
return DatumGetObjectId(datum);
}
/*
* GetImmvUuid
*
* Look up the immv_uuid of an IMMV from its immvrelid.
*/
pg_uuid_t *
GetImmvUuid(Oid immvrelid)
{
Datum datum;
HeapTuple tup;
Relation pgIvmImmv = table_open(PgIvmImmvRelationId(), AccessShareLock);
ScanKeyData key;
SysScanDesc scan;
TupleDesc tupdesc = RelationGetDescr(pgIvmImmv);
bool isnull;
ScanKeyInit(&key, Anum_pg_ivm_immv_immvrelid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(immvrelid));
scan = systable_beginscan(pgIvmImmv, PgIvmImmvPrimaryKeyIndexId(),
true, NULL, 1, &key);
tup = systable_getnext(scan);
if (!HeapTupleIsValid(tup))
{
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
return NULL;
}
datum = heap_getattr(tup, Anum_pg_ivm_immv_immvuuid, tupdesc, &isnull);
Assert(!isnull);
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
return DatumGetUUIDP(datum);
}

View file

@ -1,6 +1,6 @@
# incremental view maintenance extension_
comment = 'incremental view maintenance on PostgreSQL'
default_version = '1.10'
default_version = '1.11'
module_pathname = '$libdir/pg_ivm'
relocatable = false
schema = pg_catalog

View file

@ -19,27 +19,45 @@
#include "parser/parse_node.h"
#include "tcop/dest.h"
#include "utils/queryenvironment.h"
#include "utils/uuid.h"
#define Natts_pg_ivm_immv 4
#define Natts_pg_ivm_immv 5
#define Anum_pg_ivm_immv_immvrelid 1
#define Anum_pg_ivm_immv_viewdef 2
#define Anum_pg_ivm_immv_ispopulated 3
#define Anum_pg_ivm_immv_lastivmupdate 4
#define Anum_pg_ivm_immv_immvuuid 2
#define Anum_pg_ivm_immv_querystring 3
#define Anum_pg_ivm_immv_ispopulated 4
#define Anum_pg_ivm_immv_lastivmupdate 5
/*
* This struct uniquely identifies in IMMV. It consists of an ObjectAddress
* (OID of the IMMV table and its schema) as well as a pg_uuid_t (which is
* stored in the pg_ivm_immv table).
*/
typedef struct ImmvAddress
{
ObjectAddress address;
pg_uuid_t immv_uuid;
} ImmvAddress;
/* pg_ivm.c */
extern void CreateChangePreventTrigger(Oid matviewOid);
extern Oid PgIvmImmvRelationId(void);
extern Oid PgIvmImmvPrimaryKeyIndexId(void);
extern Oid PgIvmImmvUuidIndexId(void);
extern bool isImmv(Oid immv_oid);
extern List *PgIvmFuncName(char *name);
extern void parse_immv_query(const char *relname, const char *sql,
Query **query_ret, ParseState **pstate_ret);
extern Oid GetImmvRelid(pg_uuid_t *immv_uuid);
extern pg_uuid_t *GetImmvUuid(Oid immvrelid);
/* createas.c */
extern ObjectAddress ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt,
QueryCompletion *qc);
extern void CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid);
extern void CreateIvmTriggersOnBaseTables(Query *qry, ImmvAddress immv_addr);
extern void CreateIndexOnIMMV(Query *query, Relation matviewRel);
extern Query *rewriteQueryForIMMV(Query *query, List *colNames);
extern void makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *next_resno, List **aggs);
@ -49,7 +67,7 @@ extern void makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname,
extern Query *get_immv_query(Relation matviewRel);
extern ObjectAddress ExecRefreshImmv(const RangeVar *relation, bool skipData,
const char *queryString, QueryCompletion *qc);
extern ObjectAddress RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData,
extern ObjectAddress RefreshImmvByOid(ImmvAddress immv_addr, bool is_create, bool skipData,
const char *queryString, QueryCompletion *qc);
extern bool ImmvIncrementalMaintenanceIsEnabled(void);
extern Datum IVM_immediate_before(PG_FUNCTION_ARGS);
@ -64,8 +82,18 @@ extern bool isIvmName(const char *s);
/* ruleutils.c */
extern char *pg_ivm_get_viewdef(Relation immvrel, bool pretty);
extern char *pg_ivm_get_viewdef_internal(Query *query, Relation immvrel, bool pretty);
/* subselect.c */
extern void inline_cte(PlannerInfo *root, CommonTableExpr *cte);
/* event_trigger.c */
extern Datum save_query_string(PG_FUNCTION_ARGS);
extern Datum restore_query_string(PG_FUNCTION_ARGS);
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM < 170000)
extern void RestrictSearchPath(void);
#endif
#endif

View file

@ -43,6 +43,13 @@ char *
pg_ivm_get_viewdef(Relation immvrel, bool pretty)
{
Query *query = get_immv_query(immvrel);
return pg_ivm_get_viewdef_internal(query, immvrel, pretty);
}
char *
pg_ivm_get_viewdef_internal(Query *query, Relation immvrel, bool pretty)
{
TupleDesc resultDesc = RelationGetDescr(immvrel);
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 150000)