diff --git a/Makefile b/Makefile index 9f770f6..47aba28 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,8 @@ OBJS = \ matview.o \ pg_ivm.o \ ruleutils.o \ - subselect.o + subselect.o \ + event_trigger.o PGFILEDESC = "pg_ivm - incremental view maintenance on PostgreSQL" EXTENSION = pg_ivm @@ -18,7 +19,7 @@ DATA = pg_ivm--1.0.sql \ pg_ivm--1.3--1.4.sql pg_ivm--1.4--1.5.sql pg_ivm--1.5--1.6.sql \ pg_ivm--1.6--1.7.sql pg_ivm--1.7--1.8.sql pg_ivm--1.8--1.9.sql \ pg_ivm--1.9--1.10.sql \ - pg_ivm--1.10.sql + pg_ivm--1.10.sql pg_ivm--1.11.sql REGRESS = pg_ivm create_immv refresh_immv diff --git a/createas.c b/createas.c index b41fd1d..6526369 100644 --- a/createas.c +++ b/createas.c @@ -41,6 +41,7 @@ #include "rewrite/rewriteManip.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/regproc.h" #include "utils/rel.h" @@ -61,8 +62,8 @@ typedef struct } DR_intorel; /* utility functions for IMMV definition creation */ -static ObjectAddress create_immv_internal(List *attrList, IntoClause *into); -static ObjectAddress create_immv_nodata(List *tlist, IntoClause *into); +static ImmvAddress create_immv_internal(List *attrList, IntoClause *into); +static ImmvAddress create_immv_nodata(List *tlist, IntoClause *into); typedef struct { @@ -73,15 +74,15 @@ typedef struct int sublevels_up; /* (current) nesting depth */ } check_ivm_restriction_context; -static void CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, +static void CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, ImmvAddress immv_addr, Relids *relids, bool ex_lock); -static void CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock); +static void CreateIvmTrigger(Oid relOid, ImmvAddress immv_addr, int16 type, int16 timing, bool ex_lock); static void check_ivm_restriction(Node *node); static bool check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context); static Bitmapset *get_primary_key_attnos_from_query(Query *query, List **constraintList); static bool check_aggregate_supports_ivm(Oid aggfnoid); -static void StoreImmvQuery(Oid viewOid, Query *viewQuery); +static void StoreImmvQuery(ImmvAddress immv_addr, Query *viewQuery); #if defined(PG_VERSION_NUM) && (PG_VERSION_NUM < 140000) static bool CreateTableAsRelExists(CreateTableAsStmt *ctas); @@ -95,14 +96,15 @@ static bool CreateTableAsRelExists(CreateTableAsStmt *ctas); * * This imitates PostgreSQL's create_ctas_internal(). */ -static ObjectAddress +static ImmvAddress create_immv_internal(List *attrList, IntoClause *into) { CreateStmt *create = makeNode(CreateStmt); char relkind; Datum toast_options; static char *validnsps[] = HEAP_RELOPT_NAMESPACES; - ObjectAddress intoRelationAddr; + ImmvAddress immv_addr; + pg_uuid_t *immv_uuid; /* This code supports both CREATE TABLE AS and CREATE MATERIALIZED VIEW */ /* relkind of IMMV must be RELKIND_RELATION */ @@ -127,7 +129,14 @@ create_immv_internal(List *attrList, IntoClause *into) * Create the relation. (This will error out if there's an existing view, * so we don't need more code to complain if "replace" is false.) */ - intoRelationAddr = DefineRelation(create, relkind, InvalidOid, NULL, NULL); + immv_addr.address = DefineRelation(create, relkind, InvalidOid, NULL, NULL); + /* + * Generate the IMMV UUID. + * TODO: check for hash collision + */ + immv_uuid = DatumGetUUIDP(DirectFunctionCall1(gen_random_uuid, (Datum) NULL)); + memcpy(&immv_addr.immv_uuid, immv_uuid, sizeof(*immv_uuid)); + pfree(immv_uuid); /* * If necessary, create a TOAST table for the target table. Note that @@ -145,13 +154,13 @@ create_immv_internal(List *attrList, IntoClause *into) (void) heap_reloptions(RELKIND_TOASTVALUE, toast_options, true); - NewRelationCreateToastTable(intoRelationAddr.objectId, toast_options); + NewRelationCreateToastTable(immv_addr.address.objectId, toast_options); /* Create the "view" part of an IMMV. */ - StoreImmvQuery(intoRelationAddr.objectId, (Query *) into->viewQuery); + StoreImmvQuery(immv_addr, (Query *) into->viewQuery); CommandCounterIncrement(); - return intoRelationAddr; + return immv_addr; } /* @@ -162,7 +171,7 @@ create_immv_internal(List *attrList, IntoClause *into) * * This imitates PostgreSQL's create_ctas_nodata(). */ -static ObjectAddress +static ImmvAddress create_immv_nodata(List *tlist, IntoClause *into) { List *attrList; @@ -239,7 +248,7 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt, Query *query = castNode(Query, stmt->query); IntoClause *into = stmt->into; bool do_refresh = false; - ObjectAddress address; + ImmvAddress immv_addr; /* Check if the relation exists or not */ if (CreateTableAsRelExists(stmt)) @@ -274,7 +283,7 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt, * similar to CREATE VIEW. This avoids dump/restore problems stemming * from running the planner before all dependencies are set up. */ - address = create_immv_nodata(query->targetList, into); + immv_addr = create_immv_nodata(query->targetList, into); /* * For materialized views, reuse the REFRESH logic, which locks down @@ -285,18 +294,18 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt, { Relation matviewRel; - RefreshImmvByOid(address.objectId, true, false, pstate->p_sourcetext, qc); + RefreshImmvByOid(immv_addr, true, false, pstate->p_sourcetext, qc); if (qc) qc->commandTag = CMDTAG_SELECT; - matviewRel = table_open(address.objectId, NoLock); + matviewRel = table_open(immv_addr.address.objectId, NoLock); /* Create an index on incremental maintainable materialized view, if possible */ CreateIndexOnIMMV(query, matviewRel); /* Create triggers to prevent IMMV from being changed */ - CreateChangePreventTrigger(address.objectId); + CreateChangePreventTrigger(immv_addr.address.objectId); table_close(matviewRel, NoLock); @@ -308,7 +317,7 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt, "or execute refresh_immv to make sure the view is consistent."))); } - return address; + return immv_addr.address; } /* @@ -547,7 +556,7 @@ makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber * * CreateIvmTriggersOnBaseTables -- create IVM triggers on all base tables */ void -CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid) +CreateIvmTriggersOnBaseTables(Query *qry, ImmvAddress immv_addr) { Relids relids = NULL; bool ex_lock = false; @@ -581,13 +590,13 @@ CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid) qry->distinctClause || (qry->hasAggs && qry->groupClause)) ex_lock = true; - CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *)qry, matviewOid, &relids, ex_lock); + CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *)qry, immv_addr, &relids, ex_lock); bms_free(relids); } static void -CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, +CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, ImmvAddress immv_addr, Relids *relids, bool ex_lock) { if (node == NULL) @@ -603,12 +612,12 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, Query *query = (Query *) node; ListCell *lc; - CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *) query->jointree, matviewOid, relids, ex_lock); + CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *) query->jointree, immv_addr, relids, ex_lock); foreach(lc, query->cteList) { CommonTableExpr *cte = (CommonTableExpr *) lfirst(lc); Assert(IsA(cte->ctequery, Query)); - CreateIvmTriggersOnBaseTablesRecurse((Query *) cte->ctequery, cte->ctequery, matviewOid, relids, ex_lock); + CreateIvmTriggersOnBaseTablesRecurse((Query *) cte->ctequery, cte->ctequery, immv_addr, relids, ex_lock); } } break; @@ -620,14 +629,14 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, if (rte->rtekind == RTE_RELATION && !bms_is_member(rte->relid, *relids)) { - CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_BEFORE, ex_lock); - CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_BEFORE, ex_lock); - CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_BEFORE, ex_lock); - CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_BEFORE, true); - CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_AFTER, ex_lock); - CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_AFTER, ex_lock); - CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_AFTER, ex_lock); - CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_AFTER, true); + CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_BEFORE, ex_lock); + CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_BEFORE, ex_lock); + CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_BEFORE, ex_lock); + CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_BEFORE, true); + CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_AFTER, ex_lock); + CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_AFTER, ex_lock); + CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_AFTER, ex_lock); + CreateIvmTrigger(rte->relid, immv_addr, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_AFTER, true); *relids = bms_add_member(*relids, rte->relid); } @@ -635,7 +644,7 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, { Query *subquery = rte->subquery; Assert(rte->subquery != NULL); - CreateIvmTriggersOnBaseTablesRecurse(subquery, (Node *) subquery, matviewOid, relids, ex_lock); + CreateIvmTriggersOnBaseTablesRecurse(subquery, (Node *) subquery, immv_addr, relids, ex_lock); } } break; @@ -646,7 +655,7 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, ListCell *l; foreach(l, f->fromlist) - CreateIvmTriggersOnBaseTablesRecurse(qry, lfirst(l), matviewOid, relids, ex_lock); + CreateIvmTriggersOnBaseTablesRecurse(qry, lfirst(l), immv_addr, relids, ex_lock); } break; @@ -654,8 +663,8 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, { JoinExpr *j = (JoinExpr *) node; - CreateIvmTriggersOnBaseTablesRecurse(qry, j->larg, matviewOid, relids, ex_lock); - CreateIvmTriggersOnBaseTablesRecurse(qry, j->rarg, matviewOid, relids, ex_lock); + CreateIvmTriggersOnBaseTablesRecurse(qry, j->larg, immv_addr, relids, ex_lock); + CreateIvmTriggersOnBaseTablesRecurse(qry, j->rarg, immv_addr, relids, ex_lock); } break; @@ -668,7 +677,7 @@ CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, * CreateIvmTrigger -- create IVM trigger on a base table */ static void -CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock) +CreateIvmTrigger(Oid relOid, ImmvAddress immv_addr, int16 type, int16 timing, bool ex_lock) { ObjectAddress refaddr; ObjectAddress address; @@ -678,7 +687,7 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock Assert(timing == TRIGGER_TYPE_BEFORE || timing == TRIGGER_TYPE_AFTER); refaddr.classId = RelationRelationId; - refaddr.objectId = viewOid; + refaddr.objectId = immv_addr.address.objectId; refaddr.objectSubId = 0; ivm_trigger = makeNode(CreateTrigStmt); @@ -740,6 +749,8 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock if (type == TRIGGER_TYPE_DELETE || type == TRIGGER_TYPE_UPDATE) ex_lock = true; + ivm_trigger->trigname = psprintf("%s_%d_%d", ivm_trigger->trigname, relOid, + immv_addr.address.objectId); ivm_trigger->funcname = (timing == TRIGGER_TYPE_BEFORE ? PgIvmFuncName("IVM_immediate_before") : PgIvmFuncName("IVM_immediate_maintenance")); @@ -752,12 +763,12 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock ivm_trigger->initdeferred = false; ivm_trigger->constrrel = NULL; ivm_trigger->args = list_make2( - makeString(DatumGetPointer(DirectFunctionCall1(oidout, ObjectIdGetDatum(viewOid)))), + makeString(DatumGetPointer(DirectFunctionCall1(uuid_out, UUIDPGetDatum(&immv_addr.immv_uuid)))), makeString(DatumGetPointer(DirectFunctionCall1(boolout, BoolGetDatum(ex_lock)))) ); address = CreateTrigger(ivm_trigger, NULL, relOid, InvalidOid, InvalidOid, - InvalidOid, InvalidOid, InvalidOid, NULL, true, false); + InvalidOid, InvalidOid, InvalidOid, NULL, false, false); recordDependencyOn(&address, &refaddr, DEPENDENCY_AUTO); @@ -1707,22 +1718,37 @@ get_primary_key_attnos_from_query(Query *query, List **constraintList) * Store the query for the IMMV to pg_ivm_immv */ static void -StoreImmvQuery(Oid viewOid, Query *viewQuery) +StoreImmvQuery(ImmvAddress immv_addr, Query *viewQuery) { - char *querytree = nodeToString((Node *) viewQuery); + char *querystring; + int save_nestlevel; Datum values[Natts_pg_ivm_immv]; bool isNulls[Natts_pg_ivm_immv]; + Relation matviewRel; Relation pgIvmImmv; TupleDesc tupleDescriptor; HeapTuple heapTuple; ObjectAddress address; + /* + * Restrict search_path so that pg_ivm_get_viewdef_internal returns a + * fully-qualified query. + */ + save_nestlevel = NewGUCNestLevel(); + RestrictSearchPath(); + matviewRel = table_open(immv_addr.address.objectId, AccessShareLock); + querystring = pg_ivm_get_viewdef_internal(viewQuery, matviewRel, true); + table_close(matviewRel, NoLock); + /* Roll back the search_path change. */ + AtEOXact_GUC(false, save_nestlevel); + memset(values, 0, sizeof(values)); memset(isNulls, false, sizeof(isNulls)); - values[Anum_pg_ivm_immv_immvrelid -1 ] = ObjectIdGetDatum(viewOid); + values[Anum_pg_ivm_immv_immvrelid -1 ] = ObjectIdGetDatum(immv_addr.address.objectId); + values[Anum_pg_ivm_immv_immvuuid -1 ] = UUIDPGetDatum(&immv_addr.immv_uuid); + values[Anum_pg_ivm_immv_querystring - 1] = CStringGetTextDatum(querystring); values[Anum_pg_ivm_immv_ispopulated -1 ] = BoolGetDatum(false); - values[Anum_pg_ivm_immv_viewdef -1 ] = CStringGetTextDatum(querytree); pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock); @@ -1732,7 +1758,7 @@ StoreImmvQuery(Oid viewOid, Query *viewQuery) CatalogTupleInsert(pgIvmImmv, heapTuple); address.classId = RelationRelationId; - address.objectId = viewOid; + address.objectId = immv_addr.address.objectId; address.objectSubId = 0; recordDependencyOnExpr(&address, (Node *) viewQuery, NIL, diff --git a/event_trigger.c b/event_trigger.c new file mode 100644 index 0000000..c20e92e --- /dev/null +++ b/event_trigger.c @@ -0,0 +1,278 @@ +#include "postgres.h" +#include "pg_ivm.h" + +#include "access/genam.h" +#include "access/table.h" +#include "catalog/indexing.h" +#include "commands/event_trigger.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" + +#define IMMV_INIT_QUERYHASHSIZE 16 + +/* + * This file defines the DDL event triggers that maintain pgivm.pg_ivm_immv. + * + * Every time an ALTER RENAME statement is executed, the save_query_strings + * pre-event trigger will parse each IMMV's query string and store the Query + * nodes in a hash table. Then, the restore_query_strings post-event trigger + * will take each Query node and re-create the query string. + * + * This allows the querystring column to remain up to date when any objects are + * renamed. + * + * TODO: This uses a brute force approach to checking if re-parse is necessary. + * Even if an unrelated object is renamed, these event triggers still fire. Can + * figure out a more precise heuristic. + */ + +typedef struct HashEntry +{ + Oid immv_relid; + char *query; +} HashEntry; + +/* Stores the Query nodes of each IMMV view query as they were pre-DDL. */ +static HTAB *immv_query_cache; + +/* Active snapshot at the time of the pre-DDL trigger. */ +static Snapshot active_snapshot; + +static char *retrieve_query(Oid immv_relid); +static void hash_query(Oid immv_relid, char *query); +static void initialize_query_cache(void); +static void restore_query_strings_internal(void); +static void save_query_strings_internal(void); + +PG_FUNCTION_INFO_V1(save_query_strings); +PG_FUNCTION_INFO_V1(restore_query_strings); + +/* + * Pre-DDL event trigger function. + */ +Datum +save_query_strings(PG_FUNCTION_ARGS) +{ + EventTriggerData *trigdata; + + if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) + ereport(ERROR, errmsg("not fired by event trigger manager")); + + trigdata = (EventTriggerData *) fcinfo->context; + + if (!IsA(trigdata->parsetree, RenameStmt)) + PG_RETURN_NULL(); + + initialize_query_cache(); + active_snapshot = GetActiveSnapshot(); + save_query_strings_internal(); + + PG_RETURN_NULL(); +} + +/* + * Post-DDL event trigger function. + */ +Datum +restore_query_strings(PG_FUNCTION_ARGS) +{ + EventTriggerData *trigdata; + + if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) + ereport(ERROR, errmsg("not fired by event trigger manager")); + + trigdata = (EventTriggerData *) fcinfo->context; + + if (!IsA(trigdata->parsetree, RenameStmt)) + PG_RETURN_NULL(); + + restore_query_strings_internal(); + + PG_RETURN_NULL(); +} + +/* + * Initialize the hash table. + */ +static void +initialize_query_cache(void) +{ + HASHCTL ctl; + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(HashEntry); + immv_query_cache = hash_create("IMMV query cache", IMMV_INIT_QUERYHASHSIZE, + &ctl, HASH_ELEM | HASH_BLOBS); +} + +/* + * Store a query node in the hash table. + */ +static void +hash_query(Oid immv_relid, char *query) +{ + HashEntry *entry; + MemoryContext old_cxt; + bool found; + + entry = (HashEntry *) hash_search(immv_query_cache, + &immv_relid, + HASH_ENTER, + &found); + Assert(!found); + + old_cxt = MemoryContextSwitchTo(TopTransactionContext); + entry->query = pstrdup(query); + MemoryContextSwitchTo(old_cxt); +} + +/* + * Retrieve a query node from the hash table. + */ +static char * +retrieve_query(Oid immv_relid) +{ + HashEntry *entry; + MemoryContext old_cxt; + bool found; + char *query; + + entry = (HashEntry *) hash_search(immv_query_cache, + &immv_relid, + HASH_FIND, + &found); + Assert(found); + + old_cxt = MemoryContextSwitchTo(TopTransactionContext); + query = pstrdup(entry->query); + pfree(entry->query); + MemoryContextSwitchTo(old_cxt); + + return query; +} + +/* + * Iterate through the pg_ivm_immv table and parse the querystring of each + * entry. Store the query nodes in the hash table. + */ +static void +save_query_strings_internal(void) +{ + HeapTuple tup; + Relation pg_ivm_immv_rel; + SysScanDesc scan; + TupleDesc tupdesc; + + pg_ivm_immv_rel = table_open(PgIvmImmvRelationId(), RowExclusiveLock); + scan = systable_beginscan(pg_ivm_immv_rel, PgIvmImmvPrimaryKeyIndexId(), + true, active_snapshot, 0, NULL); + tupdesc = RelationGetDescr(pg_ivm_immv_rel); + + while ((tup = systable_getnext(scan)) != NULL) + { + Oid matview_oid; + ParseState *parse_state; + Query *query; + bool isnull; + char *matview_relname; + char *query_string; + + query_string = TextDatumGetCString(heap_getattr(tup, + Anum_pg_ivm_immv_querystring, + tupdesc, &isnull)); + Assert(!isnull); + + matview_oid = DatumGetObjectId(heap_getattr(tup, Anum_pg_ivm_immv_immvrelid, + tupdesc, &isnull)); + Assert(!isnull); + + matview_relname = psprintf("%s.%s", + get_namespace_name(get_rel_namespace(matview_oid)), + get_rel_name(matview_oid)); + + /* Parse the existing IMMV query pre-DDL. Add it to the list. */ + parse_immv_query(matview_relname, query_string, &query, &parse_state); + query = (Query *) ((CreateTableAsStmt *) query->utilityStmt)->into->viewQuery; + + /* Store entry for this IMMV. */ + hash_query(matview_oid, nodeToString(query)); + } + + systable_endscan(scan); + table_close(pg_ivm_immv_rel, NoLock); +} + +/* + * Iterate through the pg_ivm_immv table. For each entry, get its Query node + * from the hash table and reconstruct the query string. Update the row entry + * with the reconstructed query string. + */ +static void +restore_query_strings_internal(void) +{ + HeapTuple tup; + Relation pg_ivm_immv_rel; + SysScanDesc scan; + TupleDesc tupdesc; + int save_nestlevel; + + pg_ivm_immv_rel = table_open(PgIvmImmvRelationId(), RowExclusiveLock); + scan = systable_beginscan(pg_ivm_immv_rel, PgIvmImmvPrimaryKeyIndexId(), + true, active_snapshot, 0, NULL); + tupdesc = RelationGetDescr(pg_ivm_immv_rel); + + /* + * Restrict search_path so that pg_ivm_get_viewdef_internal returns a + * fully-qualified query. + */ + save_nestlevel = NewGUCNestLevel(); + RestrictSearchPath(); + + while ((tup = systable_getnext(scan)) != NULL) + { + Datum values[Natts_pg_ivm_immv]; + HeapTuple newtup; + Oid matview_oid; + Query *query; + Relation matview_rel; + bool isnull; + bool nulls[Natts_pg_ivm_immv]; + bool replaces[Natts_pg_ivm_immv]; + char *new_query_string; + char *serialized_query; + + matview_oid = DatumGetObjectId(heap_getattr(tup, Anum_pg_ivm_immv_immvrelid, + tupdesc, &isnull)); + Assert(!isnull); + + serialized_query = retrieve_query(matview_oid); + query = stringToNode(serialized_query); + + matview_rel = table_open(matview_oid, AccessShareLock); + new_query_string = pg_ivm_get_viewdef_internal(query, matview_rel, true); + table_close(matview_rel, NoLock); + + memset(values, 0, sizeof(values)); + values[Anum_pg_ivm_immv_querystring - 1] = CStringGetTextDatum(new_query_string); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(nulls)); + replaces[Anum_pg_ivm_immv_querystring - 1] = true; + + newtup = heap_modify_tuple(tup, tupdesc, values, nulls, replaces); + + CatalogTupleUpdate(pg_ivm_immv_rel, &newtup->t_self, newtup); + heap_freetuple(newtup); + } + + systable_endscan(scan); + table_close(pg_ivm_immv_rel, NoLock); + hash_destroy(immv_query_cache); + + /* Roll back the search_path change. */ + AtEOXact_GUC(false, save_nestlevel); +} diff --git a/matview.c b/matview.c index 66709b5..8b591e6 100644 --- a/matview.c +++ b/matview.c @@ -50,6 +50,7 @@ #include "utils/rel.h" #include "utils/snapmgr.h" #include "utils/typcache.h" +#include "utils/uuid.h" #include "utils/xid8.h" #include "pg_ivm.h" @@ -233,7 +234,6 @@ static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort, static void setLastUpdateXid(Oid immv_oid, FullTransactionId xid); static FullTransactionId getLastUpdateXid(Oid immv_oid); - /* SQL callable functions */ PG_FUNCTION_INFO_V1(IVM_immediate_before); PG_FUNCTION_INFO_V1(IVM_immediate_maintenance); @@ -250,6 +250,7 @@ ExecRefreshImmv(const RangeVar *relation, bool skipData, { Oid matviewOid; LOCKMODE lockmode; + ImmvAddress immv_addr; /* Determine strength of lock needed. */ //concurrent = stmt->concurrent; @@ -271,7 +272,12 @@ ExecRefreshImmv(const RangeVar *relation, bool skipData, NULL); #endif - return RefreshImmvByOid(matviewOid, false, skipData, queryString, qc); + immv_addr.address.classId = RelationRelationId; + immv_addr.address.objectId = matviewOid; + immv_addr.address.objectSubId = 0; + + // TODO: get uuid + return RefreshImmvByOid(immv_addr, false, skipData, queryString, qc); } /* @@ -282,7 +288,7 @@ ExecRefreshImmv(const RangeVar *relation, bool skipData, * This imitates PostgreSQL's RefreshMatViewByOid(). */ ObjectAddress -RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData, +RefreshImmvByOid(ImmvAddress immv_addr, bool is_create, bool skipData, const char *queryString, QueryCompletion *qc) { Relation matviewRel; @@ -299,6 +305,7 @@ RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData, int save_nestlevel; ObjectAddress address; bool oldPopulated; + Oid matviewOid = immv_addr.address.objectId; Relation pgIvmImmv; TupleDesc tupdesc; @@ -443,7 +450,8 @@ RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData, tgform = (Form_pg_trigger) GETSTRUCT(tgtup); /* If trigger is created by IMMV, delete it. */ - if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0) + if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0 || + strncmp(NameStr(tgform->tgname), "IVM_prevent_", 12) == 0) { obj.classId = foundDep->classid; obj.objectId = foundDep->objid; @@ -473,7 +481,7 @@ RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData, * is created. */ if (!skipData && !oldPopulated) - CreateIvmTriggersOnBaseTables(dataQuery, matviewOid); + CreateIvmTriggersOnBaseTables(dataQuery, immv_addr); /* * Create the transient table that will receive the regenerated data. Lock @@ -686,6 +694,9 @@ get_immv_query(Relation matviewRel) bool isnull; Datum datum; Query *query; + char *querystring; + char *relname; + ParseState *parse_state; ScanKeyInit(&key, Anum_pg_ivm_immv_immvrelid, @@ -703,9 +714,16 @@ get_immv_query(Relation matviewRel) return NULL; } - datum = heap_getattr(tup, Anum_pg_ivm_immv_viewdef, tupdesc, &isnull); + datum = heap_getattr(tup, Anum_pg_ivm_immv_querystring, tupdesc, &isnull); Assert(!isnull); - query = (Query *) stringToNode(TextDatumGetCString(datum)); + querystring = TextDatumGetCString(datum); + + relname = psprintf("%s.%s", + get_namespace_name(get_rel_namespace(matviewRel->rd_id)), + get_rel_name(matviewRel->rd_id)); + + parse_immv_query(relname, querystring, &query, &parse_state); + query = (Query *) ((CreateTableAsStmt *) query->utilityStmt)->into->viewQuery; systable_endscan(scan); table_close(pgIvmImmv, NoLock); @@ -746,15 +764,17 @@ Datum IVM_immediate_before(PG_FUNCTION_ARGS) { TriggerData *trigdata = (TriggerData *) fcinfo->context; - char *matviewOid_text = trigdata->tg_trigger->tgargs[0]; + char *immv_uuid_text = trigdata->tg_trigger->tgargs[0]; char *ex_lock_text = trigdata->tg_trigger->tgargs[1]; + pg_uuid_t *immv_uuid; Oid matviewOid; MV_TriggerHashEntry *entry; bool found; bool ex_lock; - matviewOid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(matviewOid_text))); + immv_uuid = DatumGetUUIDP(DirectFunctionCall1(uuid_in, (CStringGetDatum(immv_uuid_text)))); ex_lock = DatumGetBool(DirectFunctionCall1(boolin, CStringGetDatum(ex_lock_text))); + matviewOid = GetImmvRelid(immv_uuid); /* If the view has more than one tables, we have to use an exclusive lock. */ if (ex_lock) @@ -869,10 +889,11 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) Oid matviewOid; Query *query; Query *rewritten = NULL; - char *matviewOid_text = trigdata->tg_trigger->tgargs[0]; + char *immv_uuid_text = trigdata->tg_trigger->tgargs[0]; Relation matviewRel; int old_depth = immv_maintenance_depth; SubTransactionId subxid; + pg_uuid_t *immv_uuid; Oid relowner; Tuplestorestate *old_tuplestore = NULL; @@ -900,7 +921,8 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) rel = trigdata->tg_relation; relid = rel->rd_id; - matviewOid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(matviewOid_text))); + immv_uuid = DatumGetUUIDP(DirectFunctionCall1(uuid_in, (CStringGetDatum(immv_uuid_text)))); + matviewOid = GetImmvRelid(immv_uuid); /* * On the first call initialize the hashtable diff --git a/pg_ivm--1.11.sql b/pg_ivm--1.11.sql new file mode 100644 index 0000000..8181acc --- /dev/null +++ b/pg_ivm--1.11.sql @@ -0,0 +1,78 @@ +CREATE SCHEMA pgivm; + +-- catalog + +CREATE TABLE pgivm.pg_ivm_immv( + immvrelid regclass NOT NULL, + immvuuid uuid NOT NULL, + querystring text NOT NULL, + ispopulated bool NOT NULL, + lastivmupdate xid8, + + CONSTRAINT pg_ivm_immv_pkey PRIMARY KEY (immvrelid), + CONSTRAINT pg_ivm_immv_uuid UNIQUE (immvuuid) +); + +SELECT pg_catalog.pg_extension_config_dump('pgivm.pg_ivm_immv', ''); + +-- functions + +CREATE FUNCTION pgivm.create_immv(text, text) +RETURNS bigint +STRICT +AS 'MODULE_PATHNAME', 'create_immv' +LANGUAGE C; + +CREATE FUNCTION pgivm.refresh_immv(text, bool) +RETURNS bigint +STRICT +AS 'MODULE_PATHNAME', 'refresh_immv' +LANGUAGE C; + +CREATE FUNCTION pgivm.get_immv_def(IN immvrelid regclass) +RETURNS text +STRICT +AS 'MODULE_PATHNAME', 'get_immv_def' +LANGUAGE C; + +CREATE FUNCTION pgivm.ivm_visible_in_prestate(oid, tid, oid) +RETURNS bool +STABLE +AS 'MODULE_PATHNAME', 'ivm_visible_in_prestate' +LANGUAGE C; + +-- trigger functions + +CREATE FUNCTION pgivm."IVM_immediate_before"() +RETURNS trigger +AS 'MODULE_PATHNAME', 'IVM_immediate_before' +LANGUAGE C; + +CREATE FUNCTION pgivm."IVM_immediate_maintenance"() +RETURNS trigger +AS 'MODULE_PATHNAME', 'IVM_immediate_maintenance' +LANGUAGE C; + +CREATE FUNCTION pgivm."IVM_prevent_immv_change"() +RETURNS trigger +AS 'MODULE_PATHNAME', 'IVM_prevent_immv_change' +LANGUAGE C; + +GRANT SELECT ON TABLE pgivm.pg_ivm_immv TO PUBLIC; +GRANT USAGE ON SCHEMA pgivm TO PUBLIC; + +-- event triggers + +CREATE FUNCTION pgivm.save_query_strings() RETURNS event_trigger +AS 'MODULE_PATHNAME', 'save_query_strings' LANGUAGE C; + +CREATE FUNCTION pgivm.restore_query_strings() RETURNS event_trigger +AS 'MODULE_PATHNAME', 'restore_query_strings' LANGUAGE C; + +CREATE EVENT TRIGGER save_query_strings +ON ddl_command_start +EXECUTE FUNCTION pgivm.save_query_strings(); + +CREATE EVENT TRIGGER restore_query_strings +ON ddl_command_end +EXECUTE FUNCTION pgivm.restore_query_strings(); diff --git a/pg_ivm.c b/pg_ivm.c index 19d0e0e..88c5841 100644 --- a/pg_ivm.c +++ b/pg_ivm.c @@ -32,6 +32,7 @@ #include "utils/lsyscache.h" #include "utils/regproc.h" #include "utils/rel.h" +#include "utils/uuid.h" #include "utils/varlena.h" #include "pg_ivm.h" @@ -177,17 +178,39 @@ create_immv(PG_FUNCTION_ARGS) text *t_sql = PG_GETARG_TEXT_PP(1); char *relname = text_to_cstring(t_relname); char *sql = text_to_cstring(t_sql); + + Query *query = NULL; + QueryCompletion qc; + ParseState *pstate = NULL; + + parse_immv_query(relname, sql, &query, &pstate); + + ExecCreateImmv(pstate, (CreateTableAsStmt *) query->utilityStmt, &qc); + + PG_RETURN_INT64(qc.nprocessed); +} + +/* + * parse_immv_query + * + * Parse an IMMV definition query and return the Query tree and ParseState using + * the supplied pointers. + */ +void +parse_immv_query(const char *relname, const char *sql, Query **query_ret, + ParseState **pstate_ret) +{ List *parsetree_list; RawStmt *parsetree; - Query *query; - QueryCompletion qc; List *names = NIL; List *colNames = NIL; - ParseState *pstate = make_parsestate(NULL); CreateTableAsStmt *ctas; StringInfoData command_buf; + Query *query; + ParseState *pstate; + pstate = make_parsestate(NULL); parseNameAndColumns(relname, &names, &colNames); initStringInfo(&command_buf); @@ -230,9 +253,8 @@ create_immv(PG_FUNCTION_ARGS) query = transformStmt(pstate, (Node *)ctas); Assert(query->commandType == CMD_UTILITY && IsA(query->utilityStmt, CreateTableAsStmt)); - ExecCreateImmv(pstate, (CreateTableAsStmt *) query->utilityStmt, &qc); - - PG_RETURN_INT64(qc.nprocessed); + *query_ret = query; + *pstate_ret = pstate; } /* @@ -298,7 +320,6 @@ CreateChangePreventTrigger(Oid matviewOid) ivm_trigger->row = false; ivm_trigger->timing = TRIGGER_TYPE_BEFORE; - ivm_trigger->trigname = "IVM_prevent_immv_change"; ivm_trigger->funcname = PgIvmFuncName("IVM_prevent_immv_change"); ivm_trigger->columns = NIL; ivm_trigger->transitionRels = NIL; @@ -312,8 +333,10 @@ CreateChangePreventTrigger(Oid matviewOid) for (i = 0; i < 4; i++) { ivm_trigger->events = types[i]; + ivm_trigger->trigname = psprintf("IVM_prevent_immv_change_%d_%d", + matviewOid, i + 1); address = CreateTrigger(ivm_trigger, NULL, matviewOid, InvalidOid, InvalidOid, - InvalidOid, InvalidOid, InvalidOid, NULL, true, false); + InvalidOid, InvalidOid, InvalidOid, NULL, false, false); recordDependencyOn(&address, &refaddr, DEPENDENCY_AUTO); } @@ -344,6 +367,17 @@ PgIvmImmvPrimaryKeyIndexId(void) AccessShareLock, true); } +/* + * Get relid of pg_ivm_immv's UUID unique key. + */ +Oid +PgIvmImmvUuidIndexId(void) +{ + return RangeVarGetRelid( + makeRangeVar("pgivm", "pg_ivm_immv_uuid", -1), + AccessShareLock, true); +} + /* * Return the SELECT part of an IMMV */ @@ -458,3 +492,86 @@ PgIvmFuncName(char *name) { return list_make2(makeString("pgivm"), makeString(name)); } + +#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM < 170000) +void +RestrictSearchPath(void) +{ + set_config_option("search_path", "pg_catalog, pg_temp", PGC_USERSET, + PGC_S_SESSION, GUC_ACTION_SAVE, true, 0, false); +} +#endif + +/* + * GetImmvOid + * + * Look up the immvrelid of an IMMV from its immv_uuid. + */ +Oid +GetImmvRelid(pg_uuid_t *immv_uuid) +{ + Datum datum; + HeapTuple tup; + Relation pgIvmImmv = table_open(PgIvmImmvRelationId(), AccessShareLock); + ScanKeyData key; + SysScanDesc scan; + TupleDesc tupdesc = RelationGetDescr(pgIvmImmv); + bool isnull; + + ScanKeyInit(&key, Anum_pg_ivm_immv_immvuuid, BTEqualStrategyNumber, + F_UUID_EQ, UUIDPGetDatum(immv_uuid)); + scan = systable_beginscan(pgIvmImmv, PgIvmImmvUuidIndexId(), true, NULL, 1, + &key); + tup = systable_getnext(scan); + + if (!HeapTupleIsValid(tup)) + { + systable_endscan(scan); + table_close(pgIvmImmv, NoLock); + return InvalidOid; + } + + datum = heap_getattr(tup, Anum_pg_ivm_immv_immvrelid, tupdesc, &isnull); + Assert(!isnull); + + systable_endscan(scan); + table_close(pgIvmImmv, NoLock); + return DatumGetObjectId(datum); +} + +/* + * GetImmvUuid + * + * Look up the immv_uuid of an IMMV from its immvrelid. + */ +pg_uuid_t * +GetImmvUuid(Oid immvrelid) +{ + Datum datum; + HeapTuple tup; + Relation pgIvmImmv = table_open(PgIvmImmvRelationId(), AccessShareLock); + ScanKeyData key; + SysScanDesc scan; + TupleDesc tupdesc = RelationGetDescr(pgIvmImmv); + bool isnull; + + ScanKeyInit(&key, Anum_pg_ivm_immv_immvrelid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(immvrelid)); + scan = systable_beginscan(pgIvmImmv, PgIvmImmvPrimaryKeyIndexId(), + true, NULL, 1, &key); + tup = systable_getnext(scan); + + if (!HeapTupleIsValid(tup)) + { + systable_endscan(scan); + table_close(pgIvmImmv, NoLock); + return NULL; + } + + datum = heap_getattr(tup, Anum_pg_ivm_immv_immvuuid, tupdesc, &isnull); + Assert(!isnull); + + systable_endscan(scan); + table_close(pgIvmImmv, NoLock); + return DatumGetUUIDP(datum); +} diff --git a/pg_ivm.control b/pg_ivm.control index 27dc5af..fcfe90d 100644 --- a/pg_ivm.control +++ b/pg_ivm.control @@ -1,6 +1,6 @@ # incremental view maintenance extension_ comment = 'incremental view maintenance on PostgreSQL' -default_version = '1.10' +default_version = '1.11' module_pathname = '$libdir/pg_ivm' relocatable = false schema = pg_catalog diff --git a/pg_ivm.h b/pg_ivm.h index 88d7003..b03d92f 100644 --- a/pg_ivm.h +++ b/pg_ivm.h @@ -19,27 +19,45 @@ #include "parser/parse_node.h" #include "tcop/dest.h" #include "utils/queryenvironment.h" +#include "utils/uuid.h" -#define Natts_pg_ivm_immv 4 +#define Natts_pg_ivm_immv 5 #define Anum_pg_ivm_immv_immvrelid 1 -#define Anum_pg_ivm_immv_viewdef 2 -#define Anum_pg_ivm_immv_ispopulated 3 -#define Anum_pg_ivm_immv_lastivmupdate 4 +#define Anum_pg_ivm_immv_immvuuid 2 +#define Anum_pg_ivm_immv_querystring 3 +#define Anum_pg_ivm_immv_ispopulated 4 +#define Anum_pg_ivm_immv_lastivmupdate 5 + +/* + * This struct uniquely identifies in IMMV. It consists of an ObjectAddress + * (OID of the IMMV table and its schema) as well as a pg_uuid_t (which is + * stored in the pg_ivm_immv table). + */ +typedef struct ImmvAddress +{ + ObjectAddress address; + pg_uuid_t immv_uuid; +} ImmvAddress; /* pg_ivm.c */ extern void CreateChangePreventTrigger(Oid matviewOid); extern Oid PgIvmImmvRelationId(void); extern Oid PgIvmImmvPrimaryKeyIndexId(void); +extern Oid PgIvmImmvUuidIndexId(void); extern bool isImmv(Oid immv_oid); extern List *PgIvmFuncName(char *name); +extern void parse_immv_query(const char *relname, const char *sql, + Query **query_ret, ParseState **pstate_ret); +extern Oid GetImmvRelid(pg_uuid_t *immv_uuid); +extern pg_uuid_t *GetImmvUuid(Oid immvrelid); /* createas.c */ extern ObjectAddress ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt, QueryCompletion *qc); -extern void CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid); +extern void CreateIvmTriggersOnBaseTables(Query *qry, ImmvAddress immv_addr); extern void CreateIndexOnIMMV(Query *query, Relation matviewRel); extern Query *rewriteQueryForIMMV(Query *query, List *colNames); extern void makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *next_resno, List **aggs); @@ -49,7 +67,7 @@ extern void makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, extern Query *get_immv_query(Relation matviewRel); extern ObjectAddress ExecRefreshImmv(const RangeVar *relation, bool skipData, const char *queryString, QueryCompletion *qc); -extern ObjectAddress RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData, +extern ObjectAddress RefreshImmvByOid(ImmvAddress immv_addr, bool is_create, bool skipData, const char *queryString, QueryCompletion *qc); extern bool ImmvIncrementalMaintenanceIsEnabled(void); extern Datum IVM_immediate_before(PG_FUNCTION_ARGS); @@ -64,8 +82,18 @@ extern bool isIvmName(const char *s); /* ruleutils.c */ extern char *pg_ivm_get_viewdef(Relation immvrel, bool pretty); +extern char *pg_ivm_get_viewdef_internal(Query *query, Relation immvrel, bool pretty); /* subselect.c */ extern void inline_cte(PlannerInfo *root, CommonTableExpr *cte); +/* event_trigger.c */ + +extern Datum save_query_string(PG_FUNCTION_ARGS); +extern Datum restore_query_string(PG_FUNCTION_ARGS); + +#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM < 170000) +extern void RestrictSearchPath(void); +#endif + #endif diff --git a/ruleutils.c b/ruleutils.c index dfe1fe4..2c7e080 100644 --- a/ruleutils.c +++ b/ruleutils.c @@ -43,6 +43,13 @@ char * pg_ivm_get_viewdef(Relation immvrel, bool pretty) { Query *query = get_immv_query(immvrel); + + return pg_ivm_get_viewdef_internal(query, immvrel, pretty); +} + +char * +pg_ivm_get_viewdef_internal(Query *query, Relation immvrel, bool pretty) +{ TupleDesc resultDesc = RelationGetDescr(immvrel); #if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 150000)