From 51a944b388e4a051be6b54d0b3fe637bcade8bb2 Mon Sep 17 00:00:00 2001 From: thoshiai <45982834+thoshiai@users.noreply.github.com> Date: Mon, 13 Jun 2022 18:22:28 +0900 Subject: [PATCH] Add refresh_immv() function refresh_immv(immv_name, with_data) is a function that refreshes an IMMV, like the REFRESH MATERIALIZED VIEW command. It takes two arguments: immv_name is the name of the incrementally maintainable materialized view, and with_data corresponds to the WITH [NO] DATA option. When with_data is set to false, the IMMV becomes unpopulated. One difference between IMMVs unpopulated by this function and normal materialized views unpopulated by REFRESH ... WITH NO DATA is that such IMMVs can still be referenced by SELECT but return no rows, while unpopulated materialized views are not scannable. This behaviour may be changed in the future to raise an error when an unpopulated IMMV is scanned. --- Makefile | 2 +- createas.c | 7 +- expected/create_immv.out | 1 + expected/refresh_immv.out | 105 +++++++++++++++ matview.c | 263 ++++++++++++++++++++++++++++++++++++++ pg_ivm--1.0.sql | 7 + pg_ivm.c | 17 ++- pg_ivm.h | 6 +- sql/create_immv.sql | 2 + sql/refresh_immv.sql | 31 +++++ 10 files changed, 434 insertions(+), 7 deletions(-) create mode 100644 expected/refresh_immv.out create mode 100644 sql/refresh_immv.sql diff --git a/Makefile b/Makefile index fcccc3a..3613f58 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ PGFILEDESC = "pg_ivm - incremental view maintenance on PostgreSQL" EXTENSION = pg_ivm DATA = pg_ivm--1.0.sql -REGRESS = pg_ivm create_immv +REGRESS = pg_ivm create_immv refresh_immv PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/createas.c b/createas.c index 21ca067..5698ca4 100644 --- a/createas.c +++ b/createas.c @@ -69,7 +69,7 @@ static void check_ivm_restriction(Node *node); static bool check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context); static Bitmapset *get_primary_key_attnos_from_query(Query *query, List
**constraintList, bool is_create); -static void StoreImmvQuery(Oid viewOid, Query *viewQuery); +static void StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery); #if defined(PG_VERSION_NUM) && (PG_VERSION_NUM < 140000) static bool CreateTableAsRelExists(CreateTableAsStmt *ctas); @@ -232,7 +232,7 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt, } /* Create the "view" part of an IMMV. */ - StoreImmvQuery(address.objectId, viewQuery); + StoreImmvQuery(address.objectId, !into->skipData, viewQuery); if (is_matview) { @@ -975,7 +975,7 @@ get_primary_key_attnos_from_query(Query *query, List **constraintList, bool is_c * Store the query for the IMMV to pg_ivwm_immv */ static void -StoreImmvQuery(Oid viewOid, Query *viewQuery) +StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery) { char *querytree = nodeToString((Node *) viewQuery); Datum values[Natts_pg_ivm_immv]; @@ -989,6 +989,7 @@ StoreImmvQuery(Oid viewOid, Query *viewQuery) memset(isNulls, false, sizeof(isNulls)); values[Anum_pg_ivm_immv_immvrelid -1 ] = ObjectIdGetDatum(viewOid); + values[Anum_pg_ivm_immv_ispopulated -1 ] = BoolGetDatum(ispopulated); values[Anum_pg_ivm_immv_viewdef -1 ] = CStringGetTextDatum(querytree); pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock); diff --git a/expected/create_immv.out b/expected/create_immv.out index a70e96f..085cf17 100644 --- a/expected/create_immv.out +++ b/expected/create_immv.out @@ -41,3 +41,4 @@ SELECT immvrelid FROM pg_ivm_immv ORDER BY 1; ----------- (0 rows) +DROP TABLE t; diff --git a/expected/refresh_immv.out b/expected/refresh_immv.out new file mode 100644 index 0000000..b7add64 --- /dev/null +++ b/expected/refresh_immv.out @@ -0,0 +1,105 @@ +CREATE TABLE t (i int PRIMARY KEY); +INSERT INTO t SELECT generate_series(1, 5); +SELECT create_immv('mv', 'SELECT * FROM t'); +NOTICE: created index "mv_index" on immv "mv" + create_immv +------------- + 5 +(1 row) + +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; 
+ immvrelid | ispopulated +-----------+------------- + mv | t +(1 row) + +-- refresh immv without changing the ispopulated flag +SELECT refresh_immv('mv', true); + refresh_immv +-------------- + 5 +(1 row) + +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; + immvrelid | ispopulated +-----------+------------- + mv | t +(1 row) + +INSERT INTO t VALUES(6); +SELECT i FROM mv ORDER BY 1; + i +--- + 1 + 2 + 3 + 4 + 5 + 6 +(6 rows) + +-- change ispopulated to False +SELECT refresh_immv('mv', false); + refresh_immv +-------------- + 0 +(1 row) + +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; + immvrelid | ispopulated +-----------+------------- + mv | f +(1 row) + +SELECT i FROM mv ORDER BY 1; + i +--- +(0 rows) + +-- immv remains empty +INSERT INTO t VALUES(7); +SELECT i FROM mv ORDER BY 1; + i +--- +(0 rows) + +-- chaneg ispopulated to True, immv is updated +SELECT refresh_immv('mv', true); + refresh_immv +-------------- + 7 +(1 row) + +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; + immvrelid | ispopulated +-----------+------------- + mv | t +(1 row) + +SELECT i FROM mv ORDER BY 1; + i +--- + 1 + 2 + 3 + 4 + 5 + 6 + 7 +(7 rows) + +-- immediate maintenance +INSERT INTO t VALUES(8); +SELECT i FROM mv ORDER BY 1; + i +--- + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 +(8 rows) + diff --git a/matview.c b/matview.c index cf6f4c3..e2be9ef 100644 --- a/matview.c +++ b/matview.c @@ -12,8 +12,14 @@ #include "postgres.h" #include "access/genam.h" +#include "access/multixact.h" #include "access/table.h" #include "access/xact.h" +#include "catalog/pg_depend.h" +#include "catalog/heap.h" +#include "catalog/pg_trigger.h" +#include "commands/cluster.h" +#include "commands/matview.h" #include "commands/tablecmds.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -26,6 +32,7 @@ #include "parser/parse_func.h" #include "parser/parse_relation.h" #include "parser/parser.h" +#include "pgstat.h" #include "rewrite/rewriteHandler.h" #include 
"rewrite/rowsecurity.h"
 #include "storage/lmgr.h"
@@ -92,6 +99,7 @@ static uint64 refresh_immv_datafill(DestReceiver *dest, Query *query,
 						 TupleDesc *resultTupleDesc,
 						 const char *queryString);
 
+static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
 static void OpenImmvIncrementalMaintenance(void);
 static void CloseImmvIncrementalMaintenance(void);
 static Query *get_immv_query(Relation matviewRel);
@@ -138,6 +146,250 @@ static List *get_securityQuals(Oid relId, int rt_index, Query *query);
 PG_FUNCTION_INFO_V1(IVM_immediate_before);
 PG_FUNCTION_INFO_V1(IVM_immediate_maintenance);
 
+/*
+ * ExecRefreshImmv -- execute a refresh_immv() function
+ *
+ * This imitates PostgreSQL's ExecRefreshMatView().  The IMMV named by relname
+ * is rebuilt in a transient heap that is then swapped in.  If skipData is
+ * true, no data is generated, the triggers named "IVM_trigger_*" that depend
+ * on the IMMV are dropped and ispopulated is cleared in pg_ivm_immv; a later
+ * refresh with data creates the triggers on the base tables again.  The row
+ * count is reported via qc (if not NULL) and the IMMV's address is returned.
+ */
+ObjectAddress
+ExecRefreshImmv(const char *relname, bool skipData, QueryCompletion *qc)
+{
+	Oid			matviewOid;
+	Relation	matviewRel;
+	Query	   *dataQuery = NULL;
+	Query	   *viewQuery;
+	Oid			tableSpace;
+	Oid			relowner;
+	Oid			OIDNewHeap;
+	DestReceiver *dest;
+	uint64		processed = 0;
+	LOCKMODE	lockmode;
+	char		relpersistence;
+	Oid			save_userid;
+	int			save_sec_context;
+	int			save_nestlevel;
+	ObjectAddress address;
+	Relation	pgIvmImmv;
+	TupleDesc	tupdesc;
+	ScanKeyData key;
+	SysScanDesc scan;
+	HeapTuple	tup;
+	bool		isnull;
+	Datum		datum;
+	bool		oldSkipData;
+
+	/* There is no CONCURRENTLY option, so always take the strongest lock. */
+	lockmode = AccessExclusiveLock;
+
+	/*
+	 * Get a lock until end of transaction.
+	 *
+	 * NOTE(review): relname is looked up with RelnameGetRelid() (no schema
+	 * qualification) and no ownership check is done here -- confirm intended.
+	 */
+	matviewOid = RelnameGetRelid(relname);
+	if (!OidIsValid(matviewOid))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_TABLE),
+				 errmsg("relation \"%s\" does not exist", relname)));
+
+	matviewRel = table_open(matviewOid, lockmode);
+	relowner = matviewRel->rd_rel->relowner;
+
+	/*
+	 * Switch to the owner's userid, so that any functions are run as that
+	 * user.  Also lock down security-restricted operations and arrange to
+	 * make GUC variable changes local to this command.
+	 */
+	GetUserIdAndSecContext(&save_userid, &save_sec_context);
+	SetUserIdAndSecContext(relowner,
+						   save_sec_context | SECURITY_RESTRICTED_OPERATION);
+	save_nestlevel = NewGUCNestLevel();
+
+	/* Look up the entry for this relation in pg_ivm_immv. */
+	pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock);
+	tupdesc = RelationGetDescr(pgIvmImmv);
+	ScanKeyInit(&key,
+				Anum_pg_ivm_immv_immvrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(RelationGetRelid(matviewRel)));
+	scan = systable_beginscan(pgIvmImmv, PgIvmImmvPrimaryKeyIndexId(),
+							  true, NULL, 1, &key);
+	tup = systable_getnext(scan);
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("\"%s\" is not an IMMV", relname)));
+
+	datum = heap_getattr(tup, Anum_pg_ivm_immv_ispopulated, tupdesc, &isnull);
+	Assert(!isnull);
+	oldSkipData = !DatumGetBool(datum);
+
+	/* Update the ispopulated flag in pg_ivm_immv if it changes. */
+	if (skipData != oldSkipData)
+	{
+		Datum		values[Natts_pg_ivm_immv];
+		bool		nulls[Natts_pg_ivm_immv];
+		bool		replaces[Natts_pg_ivm_immv];
+		HeapTuple	newtup = NULL;
+
+		memset(values, 0, sizeof(values));
+		values[Anum_pg_ivm_immv_ispopulated - 1] = BoolGetDatum(!skipData);
+		MemSet(nulls, false, sizeof(nulls));
+		MemSet(replaces, false, sizeof(replaces));
+		replaces[Anum_pg_ivm_immv_ispopulated - 1] = true;
+
+		newtup = heap_modify_tuple(tup, tupdesc, values, nulls, replaces);
+
+		CatalogTupleUpdate(pgIvmImmv, &newtup->t_self, newtup);
+		heap_freetuple(newtup);
+	}
+
+	systable_endscan(scan);
+	table_close(pgIvmImmv, NoLock);
+
+	viewQuery = get_immv_query(matviewRel);
+
+	/* For IMMV, we need to rewrite matview query */
+	if (!skipData)
+		dataQuery = rewriteQueryForIMMV(viewQuery, NIL);
+
+	/*
+	 * Check for active uses of the relation in the current transaction, such
+	 * as open scans.
+	 *
+	 * NB: We count on this to protect us against problems with refreshing the
+	 * data using TABLE_INSERT_FROZEN.
+	 */
+	CheckTableNotInUse(matviewRel, "refresh an IMMV");
+
+	tableSpace = matviewRel->rd_rel->reltablespace;
+	relpersistence = matviewRel->rd_rel->relpersistence;
+
+	/* Drop the IVM triggers when the IMMV is left unpopulated. */
+	if (skipData)
+	{
+		Relation	tgRel;
+		Relation	depRel;
+		ScanKeyData depkey[2];
+		SysScanDesc depscan;
+		HeapTuple	deptup;
+		ObjectAddresses *immv_triggers;
+
+		immv_triggers = new_object_addresses();
+
+		tgRel = table_open(TriggerRelationId, RowExclusiveLock);
+		depRel = table_open(DependRelationId, RowExclusiveLock);
+
+		/* Search for objects that depend on the IMMV (a pg_class entry). */
+		ScanKeyInit(&depkey[0],
+					Anum_pg_depend_refclassid,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(RelationRelationId));
+		ScanKeyInit(&depkey[1],
+					Anum_pg_depend_refobjid,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(matviewOid));
+		depscan = systable_beginscan(depRel, DependReferenceIndexId, true,
+									 NULL, 2, depkey);
+		while ((deptup = systable_getnext(depscan)) != NULL)
+		{
+			ObjectAddress obj;
+			Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(deptup);
+
+			if (foundDep->classid == TriggerRelationId)
+			{
+				HeapTuple	tgtup;
+				ScanKeyData tgkey[1];
+				SysScanDesc tgscan;
+				Form_pg_trigger tgform;
+
+				/* Find the trigger name. */
+				ScanKeyInit(&tgkey[0],
+							Anum_pg_trigger_oid,
+							BTEqualStrategyNumber, F_OIDEQ,
+							ObjectIdGetDatum(foundDep->objid));
+
+				tgscan = systable_beginscan(tgRel, TriggerOidIndexId, true,
+											NULL, 1, tgkey);
+				tgtup = systable_getnext(tgscan);
+				if (!HeapTupleIsValid(tgtup))
+					elog(ERROR, "could not find tuple for immv trigger %u", foundDep->objid);
+
+				tgform = (Form_pg_trigger) GETSTRUCT(tgtup);
+
+				/* If trigger is created by IMMV, delete it. */
+				if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0)
+				{
+					obj.classId = foundDep->classid;
+					obj.objectId = foundDep->objid;
+					obj.objectSubId = foundDep->objsubid;
+					add_exact_object_address(&obj, immv_triggers);
+				}
+				systable_endscan(tgscan);
+			}
+		}
+		systable_endscan(depscan);
+
+		performMultipleDeletions(immv_triggers, DROP_RESTRICT, PERFORM_DELETION_INTERNAL);
+
+		table_close(depRel, RowExclusiveLock);
+		table_close(tgRel, RowExclusiveLock);
+		free_object_addresses(immv_triggers);
+	}
+
+	/*
+	 * Create the transient table that will receive the regenerated data. Lock
+	 * it against access by any other process until commit (by which time it
+	 * will be gone).
+	 */
+	OIDNewHeap = make_new_heap(matviewOid, tableSpace,
+							   relpersistence, ExclusiveLock);
+	LockRelationOid(OIDNewHeap, AccessExclusiveLock);
+	dest = CreateTransientRelDestReceiver(OIDNewHeap);
+
+	/* Generate the data, if wanted. */
+	if (!skipData)
+		processed = refresh_immv_datafill(dest, dataQuery, NULL, NULL, "");
+
+	/* Make the matview match the newly generated data. */
+	refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
+
+	/*
+	 * Inform cumulative stats system about our activity: basically, we
+	 * truncated the IMMV and inserted some new data.
+	 */
+	pgstat_count_truncate(matviewRel);
+	if (!skipData)
+		pgstat_count_heap_insert(matviewRel, processed);
+
+	/* Becoming populated again: create the IVM triggers on the base tables. */
+	if (!skipData && oldSkipData)
+		CreateIvmTriggersOnBaseTables(viewQuery, matviewOid, true);
+
+	table_close(matviewRel, NoLock);
+
+	/* Roll back any GUC changes */
+	AtEOXact_GUC(false, save_nestlevel);
+
+	/* Restore userid and security context */
+	SetUserIdAndSecContext(save_userid, save_sec_context);
+
+	ObjectAddressSet(address, RelationRelationId, matviewOid);
+
+	/* Report the number of rows processed to the caller, if requested. */
+	if (qc)
+		SetQueryCompletion(qc, CMDTAG_SELECT, processed);
+
+	return address;
+}
+
+
 /*
  * refresh_immv_datafill
  *
@@ -210,6 +462,17 @@ refresh_immv_datafill(DestReceiver *dest, Query *query,
 	return processed;
 }
 
+/*
+ * Swap the physical files of the target and transient tables, then rebuild
+ * the target's indexes and throw away the transient table. Security context
+ * swapping is handled by the called function, so it is not needed here.
+ */ +static void +refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence) +{ + finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, + RecentXmin, ReadNextMultiXactId(), relpersistence); +} /* * This should be used to test whether the backend is in a context where it is diff --git a/pg_ivm--1.0.sql b/pg_ivm--1.0.sql index 7b9c6fd..f8ed430 100644 --- a/pg_ivm--1.0.sql +++ b/pg_ivm--1.0.sql @@ -4,6 +4,7 @@ CREATE SCHEMA __pg_ivm__; CREATE TABLE __pg_ivm__.pg_ivm_immv( immvrelid regclass NOT NULL, + ispopulated bool NOT NULL, viewdef text NOT NULL, CONSTRAINT pg_ivm_immv_pkey PRIMARY KEY (immvrelid) @@ -23,6 +24,12 @@ STRICT AS 'MODULE_PATHNAME', 'create_immv' LANGUAGE C; +CREATE FUNCTION refresh_immv(text, bool) +RETURNS bigint +STRICT +AS 'MODULE_PATHNAME', 'refresh_immv' +LANGUAGE C; + -- trigger functions CREATE FUNCTION "IVM_immediate_before"() diff --git a/pg_ivm.c b/pg_ivm.c index ae00655..f38d049 100644 --- a/pg_ivm.c +++ b/pg_ivm.c @@ -44,6 +44,7 @@ static void parseNameAndColumns(const char *string, List **names, List **colName /* SQL callable functions */ PG_FUNCTION_INFO_V1(create_immv); +PG_FUNCTION_INFO_V1(refresh_immv); PG_FUNCTION_INFO_V1(IVM_prevent_immv_change); /* @@ -227,6 +228,21 @@ IVM_prevent_immv_change(PG_FUNCTION_ARGS) return PointerGetDatum(NULL); } +/* + * User interface for refreshing an IMMV: passes the IMMV name (arg 0) and the negated with_data flag (arg 1) to ExecRefreshImmv() and returns the processed row count as int64 + */ +Datum +refresh_immv(PG_FUNCTION_ARGS) +{ + text *t_relname = PG_GETARG_TEXT_PP(0); + bool ispopulated = PG_GETARG_BOOL(1); + char *relname = text_to_cstring(t_relname); + QueryCompletion qc; + + ExecRefreshImmv( relname, !(ispopulated), &qc); + + PG_RETURN_INT64(qc.nprocessed); +} /* * Create triggers to prevent IMMV from being changed @@ -246,7 +262,6 @@ CreateChangePreventTrigger(Oid matviewOid) refaddr.objectId = matviewOid; refaddr.objectSubId = 0; - ivm_trigger = makeNode(CreateTrigStmt); ivm_trigger->relation = NULL; ivm_trigger->row = false; diff --git a/pg_ivm.h b/pg_ivm.h index 0d3856e..9ebac58 100644 ---
a/pg_ivm.h +++ b/pg_ivm.h @@ -19,10 +19,11 @@ #include "tcop/dest.h" #include "utils/queryenvironment.h" -#define Natts_pg_ivm_immv 2 +#define Natts_pg_ivm_immv 3 #define Anum_pg_ivm_immv_immvrelid 1 -#define Anum_pg_ivm_immv_viewdef 2 +#define Anum_pg_ivm_immv_ispopulated 2 +#define Anum_pg_ivm_immv_viewdef 3 /* pg_ivm.c */ @@ -41,6 +42,7 @@ extern Query *rewriteQueryForIMMV(Query *query, List *colNames); /* matview.c */ +extern ObjectAddress ExecRefreshImmv(const char *relname, bool skipData, QueryCompletion *qc); extern bool ImmvIncrementalMaintenanceIsEnabled(void); extern Datum IVM_immediate_before(PG_FUNCTION_ARGS); extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS); diff --git a/sql/create_immv.sql b/sql/create_immv.sql index f5ca551..b76792d 100644 --- a/sql/create_immv.sql +++ b/sql/create_immv.sql @@ -15,3 +15,5 @@ SELECT immvrelid FROM pg_ivm_immv ORDER BY 1; DROP TABLE mv2; SELECT immvrelid FROM pg_ivm_immv ORDER BY 1; + +DROP TABLE t; diff --git a/sql/refresh_immv.sql b/sql/refresh_immv.sql new file mode 100644 index 0000000..59d7179 --- /dev/null +++ b/sql/refresh_immv.sql @@ -0,0 +1,31 @@ +CREATE TABLE t (i int PRIMARY KEY); +INSERT INTO t SELECT generate_series(1, 5); + +SELECT create_immv('mv', 'SELECT * FROM t'); +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; + +-- refresh immv without changing the ispopulated flag +SELECT refresh_immv('mv', true); +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; + +INSERT INTO t VALUES(6); +SELECT i FROM mv ORDER BY 1; + +-- change ispopulated to False +SELECT refresh_immv('mv', false); +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; +SELECT i FROM mv ORDER BY 1; + +-- immv remains empty +INSERT INTO t VALUES(7); +SELECT i FROM mv ORDER BY 1; + +-- chaneg ispopulated to True, immv is updated +SELECT refresh_immv('mv', true); +SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1; +SELECT i FROM mv ORDER BY 1; + +-- immediate maintenance +INSERT INTO t VALUES(8); 
+SELECT i FROM mv ORDER BY 1; +