diff --git a/Makefile b/Makefile index fcccc3a..3613f58 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ PGFILEDESC = "pg_ivm - incremental view maintenance on PostgreSQL" EXTENSION = pg_ivm DATA = pg_ivm--1.0.sql -REGRESS = pg_ivm create_immv +REGRESS = pg_ivm create_immv refresh_immv PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/createas.c b/createas.c index 21ca067..5698ca4 100644 --- a/createas.c +++ b/createas.c @@ -69,7 +69,7 @@ static void check_ivm_restriction(Node *node); static bool check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context); static Bitmapset *get_primary_key_attnos_from_query(Query *query, List **constraintList, bool is_create); -static void StoreImmvQuery(Oid viewOid, Query *viewQuery); +static void StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery); #if defined(PG_VERSION_NUM) && (PG_VERSION_NUM < 140000) static bool CreateTableAsRelExists(CreateTableAsStmt *ctas); @@ -232,7 +232,7 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt, } /* Create the "view" part of an IMMV. */ - StoreImmvQuery(address.objectId, viewQuery); + StoreImmvQuery(address.objectId, !into->skipData, viewQuery); if (is_matview) { @@ -975,7 +975,7 @@ get_primary_key_attnos_from_query(Query *query, List **constraintList, bool is_c * Store the query for the IMMV to pg_ivwm_immv */ static void -StoreImmvQuery(Oid viewOid, Query *viewQuery) +StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery) { char *querytree = nodeToString((Node *) viewQuery); Datum values[Natts_pg_ivm_immv]; @@ -989,6 +989,7 @@ StoreImmvQuery(Oid viewOid, Query *viewQuery) memset(isNulls, false, sizeof(isNulls)); values[Anum_pg_ivm_immv_immvrelid -1 ] = ObjectIdGetDatum(viewOid); + values[Anum_pg_ivm_immv_ispopulated -1 ] = BoolGetDatum(ispopulated); values[Anum_pg_ivm_immv_viewdef -1 ] = CStringGetTextDatum(querytree); pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock); diff --git a/expected/create_immv.out b/expected/create_immv.out index a70e96f..085cf17 100644 --- a/expected/create_immv.out +++ b/expected/create_immv.out @@ -41,3 +41,4 @@ SELECT immvrelid FROM pg_ivm_immv ORDER BY 1; ----------- (0 rows) +DROP TABLE t; diff --git a/expected/refresh_immv.out b/expected/refresh_immv.out new file mode 100644 index 0000000..b7add64 --- /dev/null +++ b/expected/refresh_immv.out @@ -0,0 +1,105 @@ +CREATE TABLE t (i int PRIMARY KEY); +INSERT INTO t SELECT generate_series(1, 5); +SELECT create_immv('mv', 'SELECT * FROM t'); +NOTICE: created index "mv_index" on immv "mv" + create_immv +------------- + 5 +(1 row) + +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; + immvrelid | ispopulated +-----------+------------- + mv | t +(1 row) + +-- refresh immv without changing the ispopulated flag +SELECT refresh_immv('mv', true); + refresh_immv +-------------- + 5 +(1 row) + +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; + immvrelid | ispopulated +-----------+------------- + mv | t +(1 row) + +INSERT INTO t VALUES(6); +SELECT i FROM mv ORDER BY 1; + i +--- + 1 + 2 + 3 + 4 + 5 + 6 +(6 rows) + +-- change ispopulated to False +SELECT refresh_immv('mv', false); + refresh_immv +-------------- + 0 +(1 row) + +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; + immvrelid | ispopulated +-----------+------------- + mv | f +(1 row) + +SELECT i FROM mv ORDER BY 1; + i +--- +(0 rows) + +-- immv remains empty +INSERT INTO t VALUES(7); +SELECT i FROM mv ORDER BY 1; + i +--- +(0 rows) + +-- chaneg ispopulated to True, immv is updated +SELECT refresh_immv('mv', true); + refresh_immv +-------------- + 7 +(1 row) + +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; + immvrelid | ispopulated +-----------+------------- + mv | t +(1 row) + +SELECT i FROM mv ORDER BY 1; + i +--- + 1 + 2 + 3 + 4 + 5 + 6 + 7 +(7 rows) + +-- immediate maintenance +INSERT INTO t VALUES(8); +SELECT i FROM mv ORDER BY 1; + i +--- + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 +(8 rows) + diff --git a/matview.c b/matview.c index cf6f4c3..e2be9ef 100644 --- a/matview.c +++ b/matview.c @@ -12,8 +12,14 @@ #include "postgres.h" #include "access/genam.h" +#include "access/multixact.h" #include "access/table.h" #include "access/xact.h" +#include "catalog/pg_depend.h" +#include "catalog/heap.h" +#include "catalog/pg_trigger.h" +#include "commands/cluster.h" +#include "commands/matview.h" #include "commands/tablecmds.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -26,6 +32,7 @@ #include "parser/parse_func.h" #include "parser/parse_relation.h" #include "parser/parser.h" +#include "pgstat.h" #include "rewrite/rewriteHandler.h" #include "rewrite/rowsecurity.h" #include "storage/lmgr.h" @@ -92,6 +99,7 @@ static uint64 refresh_immv_datafill(DestReceiver *dest, Query *query, TupleDesc *resultTupleDesc, const char *queryString); +static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence); static void OpenImmvIncrementalMaintenance(void); static void CloseImmvIncrementalMaintenance(void); static Query *get_immv_query(Relation matviewRel); @@ -138,6 +146,250 @@ static List *get_securityQuals(Oid relId, int rt_index, Query *query); PG_FUNCTION_INFO_V1(IVM_immediate_before); PG_FUNCTION_INFO_V1(IVM_immediate_maintenance); +/* + * ExecRefreshImmv -- execute a refresh_immv() function + * + * This imitates PostgreSQL's ExecRefreshMatView(). + */ +ObjectAddress +ExecRefreshImmv(const char *relname, bool skipData, QueryCompletion *qc) +{ + Oid matviewOid; + Relation matviewRel; + Query *dataQuery; + Query *viewQuery; + Oid tableSpace; + Oid relowner; + Oid OIDNewHeap; + DestReceiver *dest; + uint64 processed = 0; + //bool concurrent; + LOCKMODE lockmode; + char relpersistence; + Oid save_userid; + int save_sec_context; + int save_nestlevel; + ObjectAddress address; + Relation pgIvmImmv; + TupleDesc tupdesc; + ScanKeyData key; + SysScanDesc scan; + HeapTuple tup; + bool isnull; + Datum datum; + bool oldSkipData; + + /* Determine strength of lock needed. */ + //concurrent = stmt->concurrent; + //lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock; + lockmode = AccessExclusiveLock; + + /* + * Get a lock until end of transaction. + */ + matviewOid = RelnameGetRelid(relname); + if (!OidIsValid(matviewOid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_TABLE), + errmsg("relation \"%s\" does not exist", relname))); + + matviewRel = table_open(matviewOid, lockmode); + relowner = matviewRel->rd_rel->relowner; + + /* + * Switch to the owner's userid, so that any functions are run as that + * user. Also lock down security-restricted operations and arrange to + * make GUC variable changes local to this command. + */ + GetUserIdAndSecContext(&save_userid, &save_sec_context); + SetUserIdAndSecContext(relowner, + save_sec_context | SECURITY_RESTRICTED_OPERATION); + save_nestlevel = NewGUCNestLevel(); + + pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock); + tupdesc = RelationGetDescr(pgIvmImmv); + ScanKeyInit(&key, + Anum_pg_ivm_immv_immvrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(matviewRel))); + scan = systable_beginscan(pgIvmImmv, PgIvmImmvPrimaryKeyIndexId(), + true, NULL, 1, &key); + tup = systable_getnext(scan); + if (!HeapTupleIsValid(tup)) + { + elog(ERROR, "could not find tuple for immvrelid %s", relname); + } + + datum = heap_getattr(tup, Anum_pg_ivm_immv_ispopulated, tupdesc, &isnull); + Assert(!isnull); + oldSkipData = !DatumGetBool(datum); + + /* update pg_ivm_immv view */ + if (skipData != oldSkipData) + { + Datum values[Natts_pg_ivm_immv]; + bool nulls[Natts_pg_ivm_immv]; + bool replaces[Natts_pg_ivm_immv]; + HeapTuple newtup = NULL; + + memset(values, 0, sizeof(values)); + values[Anum_pg_ivm_immv_ispopulated -1 ] = BoolGetDatum(!skipData); + MemSet(nulls, false, sizeof(nulls)); + MemSet(replaces, false, sizeof(replaces)); + replaces[Anum_pg_ivm_immv_ispopulated -1 ] = true; + + newtup = heap_modify_tuple(tup, tupdesc, values, nulls, replaces); + + CatalogTupleUpdate(pgIvmImmv, &newtup->t_self, newtup); + heap_freetuple(newtup); + } + + systable_endscan(scan); + table_close(pgIvmImmv, NoLock); + + viewQuery = get_immv_query(matviewRel); + + /* For IMMV, we need to rewrite matview query */ + if (!skipData) + dataQuery = rewriteQueryForIMMV(viewQuery,NIL); + + /* + * Check for active uses of the relation in the current transaction, such + * as open scans. + * + * NB: We count on this to protect us against problems with refreshing the + * data using TABLE_INSERT_FROZEN. + */ + CheckTableNotInUse(matviewRel, "refresh an IMMV"); + + tableSpace = matviewRel->rd_rel->reltablespace; + relpersistence = matviewRel->rd_rel->relpersistence; + + /* delete IMMV triggers. */ + if (skipData) + { + Relation tgRel; + Relation depRel; + ScanKeyData key; + SysScanDesc scan; + HeapTuple tup; + ObjectAddresses *immv_triggers; + + immv_triggers = new_object_addresses(); + + tgRel = table_open(TriggerRelationId, RowExclusiveLock); + depRel = table_open(DependRelationId, RowExclusiveLock); + + /* search triggers that depends on IMMV. */ + ScanKeyInit(&key, + Anum_pg_depend_refobjid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(matviewOid)); + scan = systable_beginscan(depRel, DependReferenceIndexId, true, + NULL, 1, &key); + while ((tup = systable_getnext(scan)) != NULL) + { + ObjectAddress obj; + Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup); + + if (foundDep->classid == TriggerRelationId) + { + HeapTuple tgtup; + ScanKeyData tgkey[1]; + SysScanDesc tgscan; + Form_pg_trigger tgform; + + /* Find the trigger name. */ + ScanKeyInit(&tgkey[0], + Anum_pg_trigger_oid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(foundDep->objid)); + + tgscan = systable_beginscan(tgRel, TriggerOidIndexId, true, + NULL, 1, tgkey); + tgtup = systable_getnext(tgscan); + if (!HeapTupleIsValid(tgtup)) + elog(ERROR, "could not find tuple for immv trigger %u", foundDep->objid); + + tgform = (Form_pg_trigger) GETSTRUCT(tgtup); + + /* If trigger is created by IMMV, delete it. */ + if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0) + { + obj.classId = foundDep->classid; + obj.objectId = foundDep->objid; + obj.objectSubId = foundDep->refobjsubid; + add_exact_object_address(&obj, immv_triggers); + } + systable_endscan(tgscan); + } + } + systable_endscan(scan); + + performMultipleDeletions(immv_triggers, DROP_RESTRICT, PERFORM_DELETION_INTERNAL); + + table_close(depRel, RowExclusiveLock); + table_close(tgRel, RowExclusiveLock); + free_object_addresses(immv_triggers); + } + + /* + * Create the transient table that will receive the regenerated data. Lock + * it against access by any other process until commit (by which time it + * will be gone). + */ + OIDNewHeap = make_new_heap(matviewOid, tableSpace, + relpersistence, ExclusiveLock); + LockRelationOid(OIDNewHeap, AccessExclusiveLock); + dest = CreateTransientRelDestReceiver(OIDNewHeap); + + /* Generate the data, if wanted. */ + if (!skipData) + processed = refresh_immv_datafill(dest, dataQuery, NULL, NULL, ""); + + /* Make the matview match the newly generated data. */ + refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence); + + /* + * Inform cumulative stats system about our activity: basically, we + * truncated the matview and inserted some new data. (The concurrent + * code path above doesn't need to worry about this because the + * inserts and deletes it issues get counted by lower-level code.) + */ + pgstat_count_truncate(matviewRel); + if (!skipData) + pgstat_count_heap_insert(matviewRel, processed); + + if (!skipData && oldSkipData) + { + CreateIvmTriggersOnBaseTables(viewQuery, matviewOid, true); + } + + table_close(matviewRel, NoLock); + + /* Roll back any GUC changes */ + AtEOXact_GUC(false, save_nestlevel); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + + ObjectAddressSet(address, RelationRelationId, matviewOid); + + /* + * Save the rowcount so that pg_stat_statements can track the total number + * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we + * still don't display the rowcount in the command completion tag output, + * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW + * command tag is left false in cmdtaglist.h. Otherwise, the change of + * completion tag output might break applications using it. + */ + if (qc) + SetQueryCompletion(qc, CMDTAG_SELECT, processed); + + return address; +} + + /* * refresh_immv_datafill * @@ -210,6 +462,17 @@ refresh_immv_datafill(DestReceiver *dest, Query *query, return processed; } +/* + * Swap the physical files of the target and transient tables, then rebuild + * the target's indexes and throw away the transient table. Security context + * swapping is handled by the called function, so it is not needed here. + */ +static void +refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence) +{ + finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, + RecentXmin, ReadNextMultiXactId(), relpersistence); +} /* * This should be used to test whether the backend is in a context where it is diff --git a/pg_ivm--1.0.sql b/pg_ivm--1.0.sql index 7b9c6fd..f8ed430 100644 --- a/pg_ivm--1.0.sql +++ b/pg_ivm--1.0.sql @@ -4,6 +4,7 @@ CREATE SCHEMA __pg_ivm__; CREATE TABLE __pg_ivm__.pg_ivm_immv( immvrelid regclass NOT NULL, + ispopulated bool NOT NULL, viewdef text NOT NULL, CONSTRAINT pg_ivm_immv_pkey PRIMARY KEY (immvrelid) @@ -23,6 +24,12 @@ STRICT AS 'MODULE_PATHNAME', 'create_immv' LANGUAGE C; +CREATE FUNCTION refresh_immv(text, bool) +RETURNS bigint +STRICT +AS 'MODULE_PATHNAME', 'refresh_immv' +LANGUAGE C; + -- trigger functions CREATE FUNCTION "IVM_immediate_before"() diff --git a/pg_ivm.c b/pg_ivm.c index ae00655..f38d049 100644 --- a/pg_ivm.c +++ b/pg_ivm.c @@ -44,6 +44,7 @@ static void parseNameAndColumns(const char *string, List **names, List **colName /* SQL callable functions */ PG_FUNCTION_INFO_V1(create_immv); +PG_FUNCTION_INFO_V1(refresh_immv); PG_FUNCTION_INFO_V1(IVM_prevent_immv_change); /* @@ -227,6 +228,21 @@ IVM_prevent_immv_change(PG_FUNCTION_ARGS) return PointerGetDatum(NULL); } +/* + * User inerface for refreshing an IMMV + */ +Datum +refresh_immv(PG_FUNCTION_ARGS) +{ + text *t_relname = PG_GETARG_TEXT_PP(0); + bool ispopulated = PG_GETARG_BOOL(1); + char *relname = text_to_cstring(t_relname); + QueryCompletion qc; + + ExecRefreshImmv( relname, !(ispopulated), &qc); + + PG_RETURN_INT64(qc.nprocessed); +} /* * Create triggers to prevent IMMV from being changed @@ -246,7 +262,6 @@ CreateChangePreventTrigger(Oid matviewOid) refaddr.objectId = matviewOid; refaddr.objectSubId = 0; - ivm_trigger = makeNode(CreateTrigStmt); ivm_trigger->relation = NULL; ivm_trigger->row = false; diff --git a/pg_ivm.h b/pg_ivm.h index 0d3856e..9ebac58 100644 --- a/pg_ivm.h +++ b/pg_ivm.h @@ -19,10 +19,11 @@ #include "tcop/dest.h" #include "utils/queryenvironment.h" -#define Natts_pg_ivm_immv 2 +#define Natts_pg_ivm_immv 3 #define Anum_pg_ivm_immv_immvrelid 1 -#define Anum_pg_ivm_immv_viewdef 2 +#define Anum_pg_ivm_immv_ispopulated 2 +#define Anum_pg_ivm_immv_viewdef 3 /* pg_ivm.c */ @@ -41,6 +42,7 @@ extern Query *rewriteQueryForIMMV(Query *query, List *colNames); /* matview.c */ +extern ObjectAddress ExecRefreshImmv(const char *relname, bool skipData, QueryCompletion *qc); extern bool ImmvIncrementalMaintenanceIsEnabled(void); extern Datum IVM_immediate_before(PG_FUNCTION_ARGS); extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS); diff --git a/sql/create_immv.sql b/sql/create_immv.sql index f5ca551..b76792d 100644 --- a/sql/create_immv.sql +++ b/sql/create_immv.sql @@ -15,3 +15,5 @@ SELECT immvrelid FROM pg_ivm_immv ORDER BY 1; DROP TABLE mv2; SELECT immvrelid FROM pg_ivm_immv ORDER BY 1; + +DROP TABLE t; diff --git a/sql/refresh_immv.sql b/sql/refresh_immv.sql new file mode 100644 index 0000000..59d7179 --- /dev/null +++ b/sql/refresh_immv.sql @@ -0,0 +1,31 @@ +CREATE TABLE t (i int PRIMARY KEY); +INSERT INTO t SELECT generate_series(1, 5); + +SELECT create_immv('mv', 'SELECT * FROM t'); +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; + +-- refresh immv without changing the ispopulated flag +SELECT refresh_immv('mv', true); +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; + +INSERT INTO t VALUES(6); +SELECT i FROM mv ORDER BY 1; + +-- change ispopulated to False +SELECT refresh_immv('mv', false); +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; +SELECT i FROM mv ORDER BY 1; + +-- immv remains empty +INSERT INTO t VALUES(7); +SELECT i FROM mv ORDER BY 1; + +-- chaneg ispopulated to True, immv is updated +SELECT refresh_immv('mv', true); +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; +SELECT i FROM mv ORDER BY 1; + +-- immediate maintenance +INSERT INTO t VALUES(8); +SELECT i FROM mv ORDER BY 1; +