Add refresh_immv() function

refresh_immv(immv_name, with_data) is a function to refresh IMMV like
 REFRESH MATERIALIZED VIEW command. It has two argument.
immv_name is incrementally maintainable materialized view's name, and
with_data is an option that is corresponding to the WITH [NO] DATA option.
When with_data is set false, the IMMV gets unpopulated.

One of differences between IMMVs unpopulated by this function and
normal materialized views unpopulated by REFRESH ... WITH NO DATA
is that such IMMVs can be referenced by SELECT but return no rows,
while unpopulated materialized views are not scanable.

The behaviour may be changed in future to raise an error when unpopulated
an IMMV is scanned.
This commit is contained in:
thoshiai 2022-06-13 18:22:28 +09:00 committed by Yugo Nagata
parent 7ab42004d2
commit 51a944b388
10 changed files with 434 additions and 7 deletions

View file

@ -11,7 +11,7 @@ PGFILEDESC = "pg_ivm - incremental view maintenance on PostgreSQL"
EXTENSION = pg_ivm
DATA = pg_ivm--1.0.sql
REGRESS = pg_ivm create_immv
REGRESS = pg_ivm create_immv refresh_immv
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)

View file

@ -69,7 +69,7 @@ static void check_ivm_restriction(Node *node);
static bool check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context);
static Bitmapset *get_primary_key_attnos_from_query(Query *query, List **constraintList, bool is_create);
static void StoreImmvQuery(Oid viewOid, Query *viewQuery);
static void StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery);
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM < 140000)
static bool CreateTableAsRelExists(CreateTableAsStmt *ctas);
@ -232,7 +232,7 @@ ExecCreateImmv(ParseState *pstate, CreateTableAsStmt *stmt,
}
/* Create the "view" part of an IMMV. */
StoreImmvQuery(address.objectId, viewQuery);
StoreImmvQuery(address.objectId, !into->skipData, viewQuery);
if (is_matview)
{
@ -975,7 +975,7 @@ get_primary_key_attnos_from_query(Query *query, List **constraintList, bool is_c
* Store the query for the IMMV to pg_ivwm_immv
*/
static void
StoreImmvQuery(Oid viewOid, Query *viewQuery)
StoreImmvQuery(Oid viewOid, bool ispopulated, Query *viewQuery)
{
char *querytree = nodeToString((Node *) viewQuery);
Datum values[Natts_pg_ivm_immv];
@ -989,6 +989,7 @@ StoreImmvQuery(Oid viewOid, Query *viewQuery)
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_ivm_immv_immvrelid -1 ] = ObjectIdGetDatum(viewOid);
values[Anum_pg_ivm_immv_ispopulated -1 ] = BoolGetDatum(ispopulated);
values[Anum_pg_ivm_immv_viewdef -1 ] = CStringGetTextDatum(querytree);
pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock);

View file

@ -41,3 +41,4 @@ SELECT immvrelid FROM pg_ivm_immv ORDER BY 1;
-----------
(0 rows)
DROP TABLE t;

105
expected/refresh_immv.out Normal file
View file

@ -0,0 +1,105 @@
CREATE TABLE t (i int PRIMARY KEY);
INSERT INTO t SELECT generate_series(1, 5);
SELECT create_immv('mv', 'SELECT * FROM t');
NOTICE: created index "mv_index" on immv "mv"
create_immv
-------------
5
(1 row)
SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1;
immvrelid | ispopulated
-----------+-------------
mv | t
(1 row)
-- refresh immv without changing the ispopulated flag
SELECT refresh_immv('mv', true);
refresh_immv
--------------
5
(1 row)
SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1;
immvrelid | ispopulated
-----------+-------------
mv | t
(1 row)
INSERT INTO t VALUES(6);
SELECT i FROM mv ORDER BY 1;
i
---
1
2
3
4
5
6
(6 rows)
-- change ispopulated to False
SELECT refresh_immv('mv', false);
refresh_immv
--------------
0
(1 row)
SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1;
immvrelid | ispopulated
-----------+-------------
mv | f
(1 row)
SELECT i FROM mv ORDER BY 1;
i
---
(0 rows)
-- immv remains empty
INSERT INTO t VALUES(7);
SELECT i FROM mv ORDER BY 1;
i
---
(0 rows)
-- chaneg ispopulated to True, immv is updated
SELECT refresh_immv('mv', true);
refresh_immv
--------------
7
(1 row)
SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1;
immvrelid | ispopulated
-----------+-------------
mv | t
(1 row)
SELECT i FROM mv ORDER BY 1;
i
---
1
2
3
4
5
6
7
(7 rows)
-- immediate maintenance
INSERT INTO t VALUES(8);
SELECT i FROM mv ORDER BY 1;
i
---
1
2
3
4
5
6
7
8
(8 rows)

263
matview.c
View file

@ -12,8 +12,14 @@
#include "postgres.h"
#include "access/genam.h"
#include "access/multixact.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/pg_depend.h"
#include "catalog/heap.h"
#include "catalog/pg_trigger.h"
#include "commands/cluster.h"
#include "commands/matview.h"
#include "commands/tablecmds.h"
#include "executor/execdesc.h"
#include "executor/executor.h"
@ -26,6 +32,7 @@
#include "parser/parse_func.h"
#include "parser/parse_relation.h"
#include "parser/parser.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "rewrite/rowsecurity.h"
#include "storage/lmgr.h"
@ -92,6 +99,7 @@ static uint64 refresh_immv_datafill(DestReceiver *dest, Query *query,
TupleDesc *resultTupleDesc,
const char *queryString);
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
static void OpenImmvIncrementalMaintenance(void);
static void CloseImmvIncrementalMaintenance(void);
static Query *get_immv_query(Relation matviewRel);
@ -138,6 +146,250 @@ static List *get_securityQuals(Oid relId, int rt_index, Query *query);
PG_FUNCTION_INFO_V1(IVM_immediate_before);
PG_FUNCTION_INFO_V1(IVM_immediate_maintenance);
/*
* ExecRefreshImmv -- execute a refresh_immv() function
*
* This imitates PostgreSQL's ExecRefreshMatView().
*/
ObjectAddress
ExecRefreshImmv(const char *relname, bool skipData, QueryCompletion *qc)
{
Oid matviewOid;
Relation matviewRel;
Query *dataQuery;
Query *viewQuery;
Oid tableSpace;
Oid relowner;
Oid OIDNewHeap;
DestReceiver *dest;
uint64 processed = 0;
//bool concurrent;
LOCKMODE lockmode;
char relpersistence;
Oid save_userid;
int save_sec_context;
int save_nestlevel;
ObjectAddress address;
Relation pgIvmImmv;
TupleDesc tupdesc;
ScanKeyData key;
SysScanDesc scan;
HeapTuple tup;
bool isnull;
Datum datum;
bool oldSkipData;
/* Determine strength of lock needed. */
//concurrent = stmt->concurrent;
//lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
lockmode = AccessExclusiveLock;
/*
* Get a lock until end of transaction.
*/
matviewOid = RelnameGetRelid(relname);
if (!OidIsValid(matviewOid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" does not exist", relname)));
matviewRel = table_open(matviewOid, lockmode);
relowner = matviewRel->rd_rel->relowner;
/*
* Switch to the owner's userid, so that any functions are run as that
* user. Also lock down security-restricted operations and arrange to
* make GUC variable changes local to this command.
*/
GetUserIdAndSecContext(&save_userid, &save_sec_context);
SetUserIdAndSecContext(relowner,
save_sec_context | SECURITY_RESTRICTED_OPERATION);
save_nestlevel = NewGUCNestLevel();
pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock);
tupdesc = RelationGetDescr(pgIvmImmv);
ScanKeyInit(&key,
Anum_pg_ivm_immv_immvrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(matviewRel)));
scan = systable_beginscan(pgIvmImmv, PgIvmImmvPrimaryKeyIndexId(),
true, NULL, 1, &key);
tup = systable_getnext(scan);
if (!HeapTupleIsValid(tup))
{
elog(ERROR, "could not find tuple for immvrelid %s", relname);
}
datum = heap_getattr(tup, Anum_pg_ivm_immv_ispopulated, tupdesc, &isnull);
Assert(!isnull);
oldSkipData = !DatumGetBool(datum);
/* update pg_ivm_immv view */
if (skipData != oldSkipData)
{
Datum values[Natts_pg_ivm_immv];
bool nulls[Natts_pg_ivm_immv];
bool replaces[Natts_pg_ivm_immv];
HeapTuple newtup = NULL;
memset(values, 0, sizeof(values));
values[Anum_pg_ivm_immv_ispopulated -1 ] = BoolGetDatum(!skipData);
MemSet(nulls, false, sizeof(nulls));
MemSet(replaces, false, sizeof(replaces));
replaces[Anum_pg_ivm_immv_ispopulated -1 ] = true;
newtup = heap_modify_tuple(tup, tupdesc, values, nulls, replaces);
CatalogTupleUpdate(pgIvmImmv, &newtup->t_self, newtup);
heap_freetuple(newtup);
}
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
viewQuery = get_immv_query(matviewRel);
/* For IMMV, we need to rewrite matview query */
if (!skipData)
dataQuery = rewriteQueryForIMMV(viewQuery,NIL);
/*
* Check for active uses of the relation in the current transaction, such
* as open scans.
*
* NB: We count on this to protect us against problems with refreshing the
* data using TABLE_INSERT_FROZEN.
*/
CheckTableNotInUse(matviewRel, "refresh an IMMV");
tableSpace = matviewRel->rd_rel->reltablespace;
relpersistence = matviewRel->rd_rel->relpersistence;
/* delete IMMV triggers. */
if (skipData)
{
Relation tgRel;
Relation depRel;
ScanKeyData key;
SysScanDesc scan;
HeapTuple tup;
ObjectAddresses *immv_triggers;
immv_triggers = new_object_addresses();
tgRel = table_open(TriggerRelationId, RowExclusiveLock);
depRel = table_open(DependRelationId, RowExclusiveLock);
/* search triggers that depends on IMMV. */
ScanKeyInit(&key,
Anum_pg_depend_refobjid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(matviewOid));
scan = systable_beginscan(depRel, DependReferenceIndexId, true,
NULL, 1, &key);
while ((tup = systable_getnext(scan)) != NULL)
{
ObjectAddress obj;
Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
if (foundDep->classid == TriggerRelationId)
{
HeapTuple tgtup;
ScanKeyData tgkey[1];
SysScanDesc tgscan;
Form_pg_trigger tgform;
/* Find the trigger name. */
ScanKeyInit(&tgkey[0],
Anum_pg_trigger_oid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(foundDep->objid));
tgscan = systable_beginscan(tgRel, TriggerOidIndexId, true,
NULL, 1, tgkey);
tgtup = systable_getnext(tgscan);
if (!HeapTupleIsValid(tgtup))
elog(ERROR, "could not find tuple for immv trigger %u", foundDep->objid);
tgform = (Form_pg_trigger) GETSTRUCT(tgtup);
/* If trigger is created by IMMV, delete it. */
if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0)
{
obj.classId = foundDep->classid;
obj.objectId = foundDep->objid;
obj.objectSubId = foundDep->refobjsubid;
add_exact_object_address(&obj, immv_triggers);
}
systable_endscan(tgscan);
}
}
systable_endscan(scan);
performMultipleDeletions(immv_triggers, DROP_RESTRICT, PERFORM_DELETION_INTERNAL);
table_close(depRel, RowExclusiveLock);
table_close(tgRel, RowExclusiveLock);
free_object_addresses(immv_triggers);
}
/*
* Create the transient table that will receive the regenerated data. Lock
* it against access by any other process until commit (by which time it
* will be gone).
*/
OIDNewHeap = make_new_heap(matviewOid, tableSpace,
relpersistence, ExclusiveLock);
LockRelationOid(OIDNewHeap, AccessExclusiveLock);
dest = CreateTransientRelDestReceiver(OIDNewHeap);
/* Generate the data, if wanted. */
if (!skipData)
processed = refresh_immv_datafill(dest, dataQuery, NULL, NULL, "");
/* Make the matview match the newly generated data. */
refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
/*
* Inform cumulative stats system about our activity: basically, we
* truncated the matview and inserted some new data. (The concurrent
* code path above doesn't need to worry about this because the
* inserts and deletes it issues get counted by lower-level code.)
*/
pgstat_count_truncate(matviewRel);
if (!skipData)
pgstat_count_heap_insert(matviewRel, processed);
if (!skipData && oldSkipData)
{
CreateIvmTriggersOnBaseTables(viewQuery, matviewOid, true);
}
table_close(matviewRel, NoLock);
/* Roll back any GUC changes */
AtEOXact_GUC(false, save_nestlevel);
/* Restore userid and security context */
SetUserIdAndSecContext(save_userid, save_sec_context);
ObjectAddressSet(address, RelationRelationId, matviewOid);
/*
* Save the rowcount so that pg_stat_statements can track the total number
* of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
* still don't display the rowcount in the command completion tag output,
* i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
* command tag is left false in cmdtaglist.h. Otherwise, the change of
* completion tag output might break applications using it.
*/
if (qc)
SetQueryCompletion(qc, CMDTAG_SELECT, processed);
return address;
}
/*
* refresh_immv_datafill
*
@ -210,6 +462,17 @@ refresh_immv_datafill(DestReceiver *dest, Query *query,
return processed;
}
/*
* Swap the physical files of the target and transient tables, then rebuild
* the target's indexes and throw away the transient table. Security context
* swapping is handled by the called function, so it is not needed here.
*/
static void
refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
{
finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
RecentXmin, ReadNextMultiXactId(), relpersistence);
}
/*
* This should be used to test whether the backend is in a context where it is

View file

@ -4,6 +4,7 @@ CREATE SCHEMA __pg_ivm__;
CREATE TABLE __pg_ivm__.pg_ivm_immv(
immvrelid regclass NOT NULL,
ispopulated bool NOT NULL,
viewdef text NOT NULL,
CONSTRAINT pg_ivm_immv_pkey PRIMARY KEY (immvrelid)
@ -23,6 +24,12 @@ STRICT
AS 'MODULE_PATHNAME', 'create_immv'
LANGUAGE C;
CREATE FUNCTION refresh_immv(text, bool)
RETURNS bigint
STRICT
AS 'MODULE_PATHNAME', 'refresh_immv'
LANGUAGE C;
-- trigger functions
CREATE FUNCTION "IVM_immediate_before"()

View file

@ -44,6 +44,7 @@ static void parseNameAndColumns(const char *string, List **names, List **colName
/* SQL callable functions */
PG_FUNCTION_INFO_V1(create_immv);
PG_FUNCTION_INFO_V1(refresh_immv);
PG_FUNCTION_INFO_V1(IVM_prevent_immv_change);
/*
@ -227,6 +228,21 @@ IVM_prevent_immv_change(PG_FUNCTION_ARGS)
return PointerGetDatum(NULL);
}
/*
* User inerface for refreshing an IMMV
*/
Datum
refresh_immv(PG_FUNCTION_ARGS)
{
text *t_relname = PG_GETARG_TEXT_PP(0);
bool ispopulated = PG_GETARG_BOOL(1);
char *relname = text_to_cstring(t_relname);
QueryCompletion qc;
ExecRefreshImmv( relname, !(ispopulated), &qc);
PG_RETURN_INT64(qc.nprocessed);
}
/*
* Create triggers to prevent IMMV from being changed
@ -246,7 +262,6 @@ CreateChangePreventTrigger(Oid matviewOid)
refaddr.objectId = matviewOid;
refaddr.objectSubId = 0;
ivm_trigger = makeNode(CreateTrigStmt);
ivm_trigger->relation = NULL;
ivm_trigger->row = false;

View file

@ -19,10 +19,11 @@
#include "tcop/dest.h"
#include "utils/queryenvironment.h"
#define Natts_pg_ivm_immv 2
#define Natts_pg_ivm_immv 3
#define Anum_pg_ivm_immv_immvrelid 1
#define Anum_pg_ivm_immv_viewdef 2
#define Anum_pg_ivm_immv_ispopulated 2
#define Anum_pg_ivm_immv_viewdef 3
/* pg_ivm.c */
@ -41,6 +42,7 @@ extern Query *rewriteQueryForIMMV(Query *query, List *colNames);
/* matview.c */
extern ObjectAddress ExecRefreshImmv(const char *relname, bool skipData, QueryCompletion *qc);
extern bool ImmvIncrementalMaintenanceIsEnabled(void);
extern Datum IVM_immediate_before(PG_FUNCTION_ARGS);
extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS);

View file

@ -15,3 +15,5 @@ SELECT immvrelid FROM pg_ivm_immv ORDER BY 1;
DROP TABLE mv2;
SELECT immvrelid FROM pg_ivm_immv ORDER BY 1;
DROP TABLE t;

31
sql/refresh_immv.sql Normal file
View file

@ -0,0 +1,31 @@
CREATE TABLE t (i int PRIMARY KEY);
INSERT INTO t SELECT generate_series(1, 5);
SELECT create_immv('mv', 'SELECT * FROM t');
SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1;
-- refresh immv without changing the ispopulated flag
SELECT refresh_immv('mv', true);
SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1;
INSERT INTO t VALUES(6);
SELECT i FROM mv ORDER BY 1;
-- change ispopulated to False
SELECT refresh_immv('mv', false);
SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1;
SELECT i FROM mv ORDER BY 1;
-- immv remains empty
INSERT INTO t VALUES(7);
SELECT i FROM mv ORDER BY 1;
-- chaneg ispopulated to True, immv is updated
SELECT refresh_immv('mv', true);
SELECT immvrelid, ispopulated FROM pg_ivm_immv ORDER BY 1;
SELECT i FROM mv ORDER BY 1;
-- immediate maintenance
INSERT INTO t VALUES(8);
SELECT i FROM mv ORDER BY 1;