This commit is contained in:
Adam Guo 2025-05-19 17:03:59 -04:00 committed by GitHub
commit 1e9024a3b9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 223 additions and 11 deletions

View file

@ -18,7 +18,7 @@ DATA = pg_ivm--1.0.sql \
pg_ivm--1.3--1.4.sql pg_ivm--1.4--1.5.sql pg_ivm--1.5--1.6.sql \
pg_ivm--1.6--1.7.sql pg_ivm--1.7--1.8.sql pg_ivm--1.8--1.9.sql \
pg_ivm--1.9--1.10.sql \
pg_ivm--1.10.sql
pg_ivm--1.10.sql pg_ivm--1.10--1.11.sql
REGRESS = pg_ivm create_immv refresh_immv

View file

@ -41,6 +41,7 @@
#include "rewrite/rewriteManip.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/regproc.h"
#include "utils/rel.h"
@ -1710,19 +1711,35 @@ static void
StoreImmvQuery(Oid viewOid, Query *viewQuery)
{
char *querytree = nodeToString((Node *) viewQuery);
char *querystring;
int save_nestlevel;
Datum values[Natts_pg_ivm_immv];
bool isNulls[Natts_pg_ivm_immv];
Relation matviewRel;
Relation pgIvmImmv;
TupleDesc tupleDescriptor;
HeapTuple heapTuple;
ObjectAddress address;
/*
* Restrict search_path so that pg_ivm_get_viewdef_internal returns a
* fully-qualified query.
*/
save_nestlevel = NewGUCNestLevel();
RestrictSearchPath();
matviewRel = table_open(viewOid, AccessShareLock);
querystring = pg_ivm_get_viewdef_internal(viewQuery, matviewRel, true);
table_close(matviewRel, NoLock);
/* Roll back the search_path change. */
AtEOXact_GUC(false, save_nestlevel);
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_ivm_immv_immvrelid -1 ] = ObjectIdGetDatum(viewOid);
values[Anum_pg_ivm_immv_ispopulated -1 ] = BoolGetDatum(false);
values[Anum_pg_ivm_immv_viewdef -1 ] = CStringGetTextDatum(querytree);
values[Anum_pg_ivm_immv_querystring - 1] = CStringGetTextDatum(querystring);
pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock);

100
matview.c
View file

@ -233,6 +233,7 @@ static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort,
static void setLastUpdateXid(Oid immv_oid, FullTransactionId xid);
static FullTransactionId getLastUpdateXid(Oid immv_oid);
static Query *update_immv_viewdef(Relation matviewRel);
/* SQL callable functions */
PG_FUNCTION_INFO_V1(IVM_immediate_before);
@ -388,7 +389,18 @@ RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData,
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
/*
* Recreate the Query tree from the query string to account for changing
* base table OIDs (e.g. after dump/restore) or changing format of the Query
* node (after pg_upgrade).
*
* No need to create the Query tree a second time if we are creating a new
* IMMV.
*/
if (is_create)
viewQuery = get_immv_query(matviewRel);
else
viewQuery = update_immv_viewdef(matviewRel);
/* For IMMV, we need to rewrite matview query */
if (!skipData)
@ -443,7 +455,8 @@ RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData,
tgform = (Form_pg_trigger) GETSTRUCT(tgtup);
/* If trigger is created by IMMV, delete it. */
if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0)
if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0 ||
strncmp(NameStr(tgform->tgname), "IVM_prevent_", 12) == 0)
{
obj.classId = foundDep->classid;
obj.objectId = foundDep->objid;
@ -473,7 +486,10 @@ RefreshImmvByOid(Oid matviewOid, bool is_create, bool skipData,
* is created.
*/
if (!skipData && !oldPopulated)
{
CreateIvmTriggersOnBaseTables(dataQuery, matviewOid);
CreateChangePreventTrigger(matviewOid);
}
/*
* Create the transient table that will receive the regenerated data. Lock
@ -713,6 +729,88 @@ get_immv_query(Relation matviewRel)
return query;
}
/*
* update_immv_viewdef
*
* Read the query string for this IMMV from pg_ivm_immv. Parse the query string
* and update the viewdef column with the new Query tree. Return the Query
* tree.
*/
static Query *
update_immv_viewdef(Relation matviewRel)
{
CreateTableAsStmt *stmt;
Datum datum;
Datum values[Natts_pg_ivm_immv];
HeapTuple newtup = NULL;
HeapTuple tup;
IntoClause *into;
ParseState *pstate = NULL;
Query *query = NULL;
Relation pgIvmImmv = table_open(PgIvmImmvRelationId(), AccessShareLock);
ScanKeyData key;
SysScanDesc scan;
TupleDesc tupdesc = RelationGetDescr(pgIvmImmv);
bool isnull;
bool nulls[Natts_pg_ivm_immv];
bool replaces[Natts_pg_ivm_immv];
const char *querystring;
const char *querytree;
const char *relname;
/* Scan pg_ivm_immv for the given IMMV entry. */
ScanKeyInit(&key,
Anum_pg_ivm_immv_immvrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(matviewRel)));
scan = systable_beginscan(pgIvmImmv, PgIvmImmvPrimaryKeyIndexId(),
true, NULL, 1, &key);
tup = systable_getnext(scan);
if (!HeapTupleIsValid(tup))
{
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
return NULL;
}
/* Read the query string column. */
datum = heap_getattr(tup, Anum_pg_ivm_immv_querystring, tupdesc, &isnull);
Assert(!isnull);
querystring = TextDatumGetCString(datum);
/* Parse the query string using the same logic as create_immv. */
relname = psprintf("%s.%s",
get_namespace_name(get_rel_namespace(matviewRel->rd_id)),
get_rel_name(matviewRel->rd_id));
parse_immv_query(relname, querystring, &query, &pstate);
stmt = castNode(CreateTableAsStmt, query->utilityStmt);
into = stmt->into;
query = castNode(Query, stmt->query);
query = rewriteQueryForIMMV(query, into->colNames);
querytree = nodeToString((Node *) query);
/* Update the pg_ivm_immv tuple with the new query tree. */
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
memset(replaces, 0, sizeof(replaces));
values[Anum_pg_ivm_immv_viewdef - 1] = CStringGetTextDatum(querytree);
replaces[Anum_pg_ivm_immv_viewdef - 1] = true;
newtup = heap_modify_tuple(tup, tupdesc, values, nulls, replaces);
CatalogTupleUpdate(pgIvmImmv, &newtup->t_self, newtup);
heap_freetuple(newtup);
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
return query;
}
static Tuplestorestate *
tuplestore_copy(Tuplestorestate *tuplestore, Relation rel)
{

52
pg_ivm--1.10--1.11.sql Normal file
View file

@ -0,0 +1,52 @@
ALTER TABLE pgivm.pg_ivm_immv ADD COLUMN querystring text NOT NULL;
CREATE FUNCTION pgivm.recreate_all_immvs() RETURNS VOID LANGUAGE PLPGSQL AS
$$
BEGIN
PERFORM pgivm.refresh_immv(n.nspname || '.' || c.relname, false)
FROM pgivm.pg_ivm_immv as ivm
JOIN pg_catalog.pg_class as c
ON c.oid = ivm.immvrelid
JOIN pg_catalog.pg_namespace as n
ON c.relnamespace = n.oid;
PERFORM pgivm.refresh_immv(n.nspname || '.' || c.relname, true)
FROM pgivm.pg_ivm_immv as ivm
JOIN pg_catalog.pg_class as c
ON c.oid = ivm.immvrelid
JOIN pg_catalog.pg_namespace as n
ON c.relnamespace = n.oid;
END
$$;
CREATE FUNCTION pgivm.refresh_query_strings()
RETURNS event_trigger LANGUAGE plpgsql SECURITY DEFINER AS
$$
DECLARE
old_search_path text;
BEGIN
-- Only need to refresh query strings if an object is renamed.
-- As a rough heuristic, check if this is an ALTER command.
IF tg_tag LIKE 'ALTER %' THEN
-- Empty search path so that get_immv_def returns a fully-qualified query.
SELECT setting INTO old_search_path FROM pg_catalog.pg_settings
WHERE name = 'search_path';
SET search_path = '';
UPDATE pgivm.pg_ivm_immv SET querystring = pgivm.get_immv_def(immvrelid);
-- Reset search path to the original value.
IF old_search_path != '' AND old_search_path != '""' THEN
EXECUTE format('SET search_path = %s', old_search_path);
END IF;
END IF;
EXCEPTION
WHEN internal_error THEN
RAISE WARNING 'pg_ivm could not refresh the pg_ivm_immv query strings.'
USING HINT = 'Please recreate your IMMVs using pgivm.recreate_all_immvs().';
END
$$;
CREATE EVENT TRIGGER refresh_query_strings
ON ddl_command_end
EXECUTE FUNCTION pgivm.refresh_query_strings();

View file

@ -177,17 +177,39 @@ create_immv(PG_FUNCTION_ARGS)
text *t_sql = PG_GETARG_TEXT_PP(1);
char *relname = text_to_cstring(t_relname);
char *sql = text_to_cstring(t_sql);
Query *query = NULL;
QueryCompletion qc;
ParseState *pstate = NULL;
parse_immv_query(relname, sql, &query, &pstate);
ExecCreateImmv(pstate, (CreateTableAsStmt *) query->utilityStmt, &qc);
PG_RETURN_INT64(qc.nprocessed);
}
/*
* parse_immv_query
*
* Parse an IMMV definition query and return the Query tree and ParseState using
* the supplied pointers.
*/
void
parse_immv_query(const char *relname, const char *sql, Query **query_ret,
ParseState **pstate_ret)
{
List *parsetree_list;
RawStmt *parsetree;
Query *query;
QueryCompletion qc;
List *names = NIL;
List *colNames = NIL;
ParseState *pstate = make_parsestate(NULL);
CreateTableAsStmt *ctas;
StringInfoData command_buf;
Query *query;
ParseState *pstate;
pstate = make_parsestate(NULL);
parseNameAndColumns(relname, &names, &colNames);
initStringInfo(&command_buf);
@ -230,9 +252,8 @@ create_immv(PG_FUNCTION_ARGS)
query = transformStmt(pstate, (Node *)ctas);
Assert(query->commandType == CMD_UTILITY && IsA(query->utilityStmt, CreateTableAsStmt));
ExecCreateImmv(pstate, (CreateTableAsStmt *) query->utilityStmt, &qc);
PG_RETURN_INT64(qc.nprocessed);
*query_ret = query;
*pstate_ret = pstate;
}
/*
@ -458,3 +479,12 @@ PgIvmFuncName(char *name)
{
return list_make2(makeString("pgivm"), makeString(name));
}
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM < 170000)
void
RestrictSearchPath(void)
{
set_config_option("search_path", "pg_catalog, pg_temp", PGC_USERSET,
PGC_S_SESSION, GUC_ACTION_SAVE, true, 0, false);
}
#endif

View file

@ -1,6 +1,6 @@
# incremental view maintenance extension_
comment = 'incremental view maintenance on PostgreSQL'
default_version = '1.10'
default_version = '1.11'
module_pathname = '$libdir/pg_ivm'
relocatable = false
schema = pg_catalog

View file

@ -20,12 +20,13 @@
#include "tcop/dest.h"
#include "utils/queryenvironment.h"
#define Natts_pg_ivm_immv 4
#define Natts_pg_ivm_immv 5
#define Anum_pg_ivm_immv_immvrelid 1
#define Anum_pg_ivm_immv_viewdef 2
#define Anum_pg_ivm_immv_ispopulated 3
#define Anum_pg_ivm_immv_lastivmupdate 4
#define Anum_pg_ivm_immv_querystring 5
/* pg_ivm.c */
@ -34,6 +35,8 @@ extern Oid PgIvmImmvRelationId(void);
extern Oid PgIvmImmvPrimaryKeyIndexId(void);
extern bool isImmv(Oid immv_oid);
extern List *PgIvmFuncName(char *name);
extern void parse_immv_query(const char *relname, const char *sql,
Query **query_ret, ParseState **pstate_ret);
/* createas.c */
@ -64,8 +67,13 @@ extern bool isIvmName(const char *s);
/* ruleutils.c */
extern char *pg_ivm_get_viewdef(Relation immvrel, bool pretty);
extern char *pg_ivm_get_viewdef_internal(Query *query, Relation immvrel, bool pretty);
/* subselect.c */
extern void inline_cte(PlannerInfo *root, CommonTableExpr *cte);
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM < 170000)
extern void RestrictSearchPath(void);
#endif
#endif

View file

@ -43,6 +43,13 @@ char *
pg_ivm_get_viewdef(Relation immvrel, bool pretty)
{
Query *query = get_immv_query(immvrel);
return pg_ivm_get_viewdef_internal(query, immvrel, pretty);
}
char *
pg_ivm_get_viewdef_internal(Query *query, Relation immvrel, bool pretty)
{
TupleDesc resultDesc = RelationGetDescr(immvrel);
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 150000)