This commit is contained in:
Adam Guo 2025-05-21 01:34:52 +00:00
parent c13a33d847
commit f219050b33
6 changed files with 64 additions and 148 deletions

View file

@ -19,7 +19,7 @@ DATA = pg_ivm--1.0.sql \
pg_ivm--1.3--1.4.sql pg_ivm--1.4--1.5.sql pg_ivm--1.5--1.6.sql \ pg_ivm--1.3--1.4.sql pg_ivm--1.4--1.5.sql pg_ivm--1.5--1.6.sql \
pg_ivm--1.6--1.7.sql pg_ivm--1.7--1.8.sql pg_ivm--1.8--1.9.sql \ pg_ivm--1.6--1.7.sql pg_ivm--1.7--1.8.sql pg_ivm--1.8--1.9.sql \
pg_ivm--1.9--1.10.sql \ pg_ivm--1.9--1.10.sql \
pg_ivm--1.10.sql pg_ivm--1.10--1.11.sql pg_ivm--1.10.sql pg_ivm--1.11.sql
REGRESS = pg_ivm create_immv refresh_immv REGRESS = pg_ivm create_immv refresh_immv

View file

@ -130,8 +130,10 @@ create_immv_internal(List *attrList, IntoClause *into)
* so we don't need more code to complain if "replace" is false.) * so we don't need more code to complain if "replace" is false.)
*/ */
immv_addr.address = DefineRelation(create, relkind, InvalidOid, NULL, NULL); immv_addr.address = DefineRelation(create, relkind, InvalidOid, NULL, NULL);
/* Generate the IMMV UUID. */ /*
// TODO: check for hash collision * Generate the IMMV UUID.
* TODO: check for hash collision
*/
immv_uuid = DatumGetUUIDP(DirectFunctionCall1(gen_random_uuid, (Datum) NULL)); immv_uuid = DatumGetUUIDP(DirectFunctionCall1(gen_random_uuid, (Datum) NULL));
memcpy(&immv_addr.immv_uuid, immv_uuid, sizeof(*immv_uuid)); memcpy(&immv_addr.immv_uuid, immv_uuid, sizeof(*immv_uuid));
pfree(immv_uuid); pfree(immv_uuid);
@ -1718,7 +1720,6 @@ get_primary_key_attnos_from_query(Query *query, List **constraintList)
static void static void
StoreImmvQuery(ImmvAddress immv_addr, Query *viewQuery) StoreImmvQuery(ImmvAddress immv_addr, Query *viewQuery)
{ {
char *querytree = nodeToString((Node *) viewQuery);
char *querystring; char *querystring;
int save_nestlevel; int save_nestlevel;
Datum values[Natts_pg_ivm_immv]; Datum values[Natts_pg_ivm_immv];
@ -1745,10 +1746,9 @@ StoreImmvQuery(ImmvAddress immv_addr, Query *viewQuery)
memset(isNulls, false, sizeof(isNulls)); memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_ivm_immv_immvrelid -1 ] = ObjectIdGetDatum(immv_addr.address.objectId); values[Anum_pg_ivm_immv_immvrelid -1 ] = ObjectIdGetDatum(immv_addr.address.objectId);
values[Anum_pg_ivm_immv_ispopulated -1 ] = BoolGetDatum(false);
values[Anum_pg_ivm_immv_viewdef -1 ] = CStringGetTextDatum(querytree);
values[Anum_pg_ivm_immv_querystring - 1] = CStringGetTextDatum(querystring);
values[Anum_pg_ivm_immv_immvuuid -1 ] = UUIDPGetDatum(&immv_addr.immv_uuid); values[Anum_pg_ivm_immv_immvuuid -1 ] = UUIDPGetDatum(&immv_addr.immv_uuid);
values[Anum_pg_ivm_immv_querystring - 1] = CStringGetTextDatum(querystring);
values[Anum_pg_ivm_immv_ispopulated -1 ] = BoolGetDatum(false);
pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock); pgIvmImmv = table_open(PgIvmImmvRelationId(), RowExclusiveLock);

View file

@ -14,6 +14,22 @@
#define IMMV_INIT_QUERYHASHSIZE 16 #define IMMV_INIT_QUERYHASHSIZE 16
/*
* This file defines the DDL event triggers that maintain pgivm.pg_ivm_immv.
*
* Every time an ALTER RENAME statement is executed, the save_query_strings
* pre-event trigger will parse each IMMV's query string and store the Query
* nodes in a hash table. Then, the restore_query_strings post-event trigger
* will take each Query node and re-create the query string.
*
* This allows the querystring column to remain up to date when any objects are
* renamed.
*
* TODO: This uses a brute force approach to checking if re-parse is necessary.
* Even if an unrelated object is renamed, these event triggers still fire. Can
* figure out a more precise heuristic.
*/
typedef struct HashEntry typedef struct HashEntry
{ {
Oid immv_relid; Oid immv_relid;
@ -35,6 +51,9 @@ static void save_query_strings_internal(void);
PG_FUNCTION_INFO_V1(save_query_strings); PG_FUNCTION_INFO_V1(save_query_strings);
PG_FUNCTION_INFO_V1(restore_query_strings); PG_FUNCTION_INFO_V1(restore_query_strings);
/*
* Pre-DDL event trigger function.
*/
Datum Datum
save_query_strings(PG_FUNCTION_ARGS) save_query_strings(PG_FUNCTION_ARGS)
{ {
@ -55,6 +74,9 @@ save_query_strings(PG_FUNCTION_ARGS)
PG_RETURN_NULL(); PG_RETURN_NULL();
} }
/*
* Post-DDL event trigger function.
*/
Datum Datum
restore_query_strings(PG_FUNCTION_ARGS) restore_query_strings(PG_FUNCTION_ARGS)
{ {
@ -73,6 +95,9 @@ restore_query_strings(PG_FUNCTION_ARGS)
PG_RETURN_NULL(); PG_RETURN_NULL();
} }
/*
* Initialize the hash table.
*/
static void static void
initialize_query_cache(void) initialize_query_cache(void)
{ {
@ -85,6 +110,9 @@ initialize_query_cache(void)
&ctl, HASH_ELEM | HASH_BLOBS); &ctl, HASH_ELEM | HASH_BLOBS);
} }
/*
* Store a query node in the hash table.
*/
static void static void
hash_query(Oid immv_relid, char *query) hash_query(Oid immv_relid, char *query)
{ {
@ -103,6 +131,9 @@ hash_query(Oid immv_relid, char *query)
MemoryContextSwitchTo(old_cxt); MemoryContextSwitchTo(old_cxt);
} }
/*
* Retrieve a query node from the hash table.
*/
static char * static char *
retrieve_query(Oid immv_relid) retrieve_query(Oid immv_relid)
{ {
@ -125,6 +156,10 @@ retrieve_query(Oid immv_relid)
return query; return query;
} }
/*
* Iterate through the pg_ivm_immv table and parse the querystring of each
* entry. Store the query nodes in the hash table.
*/
static void static void
save_query_strings_internal(void) save_query_strings_internal(void)
{ {
@ -162,6 +197,7 @@ save_query_strings_internal(void)
/* Parse the existing IMMV query pre-DDL. Add it to the list. */ /* Parse the existing IMMV query pre-DDL. Add it to the list. */
parse_immv_query(matview_relname, query_string, &query, &parse_state); parse_immv_query(matview_relname, query_string, &query, &parse_state);
query = (Query *) ((CreateTableAsStmt *) query->utilityStmt)->into->viewQuery;
/* Store entry for this IMMV. */ /* Store entry for this IMMV. */
hash_query(matview_oid, nodeToString(query)); hash_query(matview_oid, nodeToString(query));
@ -171,6 +207,11 @@ save_query_strings_internal(void)
table_close(pg_ivm_immv_rel, NoLock); table_close(pg_ivm_immv_rel, NoLock);
} }
/*
* Iterate through the pg_ivm_immv table. For each entry, get its Query node
* from the hash table and reconstruct the query string. Update the row entry
* with the reconstructed query string.
*/
static void static void
restore_query_strings_internal(void) restore_query_strings_internal(void)
{ {
@ -211,7 +252,6 @@ restore_query_strings_internal(void)
serialized_query = retrieve_query(matview_oid); serialized_query = retrieve_query(matview_oid);
query = stringToNode(serialized_query); query = stringToNode(serialized_query);
query = (Query *) ((CreateTableAsStmt *) query->utilityStmt)->into->viewQuery;
matview_rel = table_open(matview_oid, AccessShareLock); matview_rel = table_open(matview_oid, AccessShareLock);
new_query_string = pg_ivm_get_viewdef_internal(query, matview_rel, true); new_query_string = pg_ivm_get_viewdef_internal(query, matview_rel, true);

View file

@ -234,8 +234,6 @@ static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort,
static void setLastUpdateXid(Oid immv_oid, FullTransactionId xid); static void setLastUpdateXid(Oid immv_oid, FullTransactionId xid);
static FullTransactionId getLastUpdateXid(Oid immv_oid); static FullTransactionId getLastUpdateXid(Oid immv_oid);
static Query *update_immv_viewdef(Relation matviewRel);
/* SQL callable functions */ /* SQL callable functions */
PG_FUNCTION_INFO_V1(IVM_immediate_before); PG_FUNCTION_INFO_V1(IVM_immediate_before);
PG_FUNCTION_INFO_V1(IVM_immediate_maintenance); PG_FUNCTION_INFO_V1(IVM_immediate_maintenance);
@ -397,18 +395,7 @@ RefreshImmvByOid(ImmvAddress immv_addr, bool is_create, bool skipData,
systable_endscan(scan); systable_endscan(scan);
table_close(pgIvmImmv, NoLock); table_close(pgIvmImmv, NoLock);
/* viewQuery = get_immv_query(matviewRel);
* Recreate the Query tree from the query string to account for changing
* base table OIDs (e.g. after dump/restore) or changing format of the Query
* node (after pg_upgrade).
*
* No need to create the Query tree a second time if we are creating a new
* IMMV.
*/
if (is_create)
viewQuery = get_immv_query(matviewRel);
else
viewQuery = update_immv_viewdef(matviewRel);
/* For IMMV, we need to rewrite matview query */ /* For IMMV, we need to rewrite matview query */
if (!skipData) if (!skipData)
@ -707,6 +694,9 @@ get_immv_query(Relation matviewRel)
bool isnull; bool isnull;
Datum datum; Datum datum;
Query *query; Query *query;
char *querystring;
char *relname;
ParseState *parse_state;
ScanKeyInit(&key, ScanKeyInit(&key,
Anum_pg_ivm_immv_immvrelid, Anum_pg_ivm_immv_immvrelid,
@ -724,91 +714,16 @@ get_immv_query(Relation matviewRel)
return NULL; return NULL;
} }
datum = heap_getattr(tup, Anum_pg_ivm_immv_viewdef, tupdesc, &isnull);
Assert(!isnull);
query = (Query *) stringToNode(TextDatumGetCString(datum));
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
return query;
}
/*
* update_immv_viewdef
*
* Read the query string for this IMMV from pg_ivm_immv. Parse the query string
* and update the viewdef column with the new Query tree. Return the Query
* tree.
*/
static Query *
update_immv_viewdef(Relation matviewRel)
{
CreateTableAsStmt *stmt;
Datum datum;
Datum values[Natts_pg_ivm_immv];
HeapTuple newtup = NULL;
HeapTuple tup;
IntoClause *into;
ParseState *pstate = NULL;
Query *query = NULL;
Relation pgIvmImmv = table_open(PgIvmImmvRelationId(), AccessShareLock);
ScanKeyData key;
SysScanDesc scan;
TupleDesc tupdesc = RelationGetDescr(pgIvmImmv);
bool isnull;
bool nulls[Natts_pg_ivm_immv];
bool replaces[Natts_pg_ivm_immv];
const char *querystring;
const char *querytree;
const char *relname;
/* Scan pg_ivm_immv for the given IMMV entry. */
ScanKeyInit(&key,
Anum_pg_ivm_immv_immvrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(matviewRel)));
scan = systable_beginscan(pgIvmImmv, PgIvmImmvPrimaryKeyIndexId(),
true, NULL, 1, &key);
tup = systable_getnext(scan);
if (!HeapTupleIsValid(tup))
{
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
return NULL;
}
/* Read the query string column. */
datum = heap_getattr(tup, Anum_pg_ivm_immv_querystring, tupdesc, &isnull); datum = heap_getattr(tup, Anum_pg_ivm_immv_querystring, tupdesc, &isnull);
Assert(!isnull); Assert(!isnull);
querystring = TextDatumGetCString(datum); querystring = TextDatumGetCString(datum);
/* Parse the query string using the same logic as create_immv. */
relname = psprintf("%s.%s", relname = psprintf("%s.%s",
get_namespace_name(get_rel_namespace(matviewRel->rd_id)), get_namespace_name(get_rel_namespace(matviewRel->rd_id)),
get_rel_name(matviewRel->rd_id)); get_rel_name(matviewRel->rd_id));
parse_immv_query(relname, querystring, &query, &pstate); parse_immv_query(relname, querystring, &query, &parse_state);
stmt = castNode(CreateTableAsStmt, query->utilityStmt); query = (Query *) ((CreateTableAsStmt *) query->utilityStmt)->into->viewQuery;
into = stmt->into;
query = castNode(Query, stmt->query);
query = rewriteQueryForIMMV(query, into->colNames);
querytree = nodeToString((Node *) query);
/* Update the pg_ivm_immv tuple with the new query tree. */
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
memset(replaces, 0, sizeof(replaces));
values[Anum_pg_ivm_immv_viewdef - 1] = CStringGetTextDatum(querytree);
replaces[Anum_pg_ivm_immv_viewdef - 1] = true;
newtup = heap_modify_tuple(tup, tupdesc, values, nulls, replaces);
CatalogTupleUpdate(pgIvmImmv, &newtup->t_self, newtup);
heap_freetuple(newtup);
systable_endscan(scan); systable_endscan(scan);
table_close(pgIvmImmv, NoLock); table_close(pgIvmImmv, NoLock);

View file

@ -1,43 +0,0 @@
ALTER TABLE pgivm.pg_ivm_immv ADD COLUMN querystring text;
ALTER TABLE pgivm.pg_ivm_immv ADD COLUMN immvuuid uuid;
UPDATE pgivm.pg_ivm_immv SET querystring = pgivm.get_immv_def(immvrelid);
UPDATE pgivm.pg_ivm_immv SET immvuuid = gen_random_uuid();
ALTER TABLE pgivm.pg_ivm_immv ADD CONSTRAINT pg_ivm_immv_uuid UNIQUE (immvuuid);
ALTER TABLE pgivm.pg_ivm_immv ALTER COLUMN querystring SET NOT NULL;
ALTER TABLE pgivm.pg_ivm_immv ALTER COLUMN immvuuid SET NOT NULL;
ALTER TABLE pgivm.pg_ivm_immv DROP COLUMN viewdef;
CREATE FUNCTION pgivm.recreate_all_immvs() RETURNS VOID LANGUAGE PLPGSQL AS
$$
BEGIN
PERFORM pgivm.refresh_immv(n.nspname || '.' || c.relname, false)
FROM pgivm.pg_ivm_immv as ivm
JOIN pg_catalog.pg_class as c
ON c.oid = ivm.immvrelid
JOIN pg_catalog.pg_namespace as n
ON c.relnamespace = n.oid;
PERFORM pgivm.refresh_immv(n.nspname || '.' || c.relname, true)
FROM pgivm.pg_ivm_immv as ivm
JOIN pg_catalog.pg_class as c
ON c.oid = ivm.immvrelid
JOIN pg_catalog.pg_namespace as n
ON c.relnamespace = n.oid;
END
$$;
CREATE FUNCTION pgivm.save_query_strings() RETURNS event_trigger
AS 'MODULE_PATHNAME', 'save_query_strings' LANGUAGE C;
CREATE FUNCTION pgivm.restore_query_strings() RETURNS event_trigger
AS 'MODULE_PATHNAME', 'restore_query_strings' LANGUAGE C;
CREATE EVENT TRIGGER save_query_strings
ON ddl_command_start
EXECUTE FUNCTION pgivm.save_query_strings();
CREATE EVENT TRIGGER restore_query_strings
ON ddl_command_end
EXECUTE FUNCTION pgivm.restore_query_strings();

View file

@ -21,15 +21,19 @@
#include "utils/queryenvironment.h" #include "utils/queryenvironment.h"
#include "utils/uuid.h" #include "utils/uuid.h"
#define Natts_pg_ivm_immv 6 #define Natts_pg_ivm_immv 5
#define Anum_pg_ivm_immv_immvrelid 1 #define Anum_pg_ivm_immv_immvrelid 1
#define Anum_pg_ivm_immv_viewdef 2 #define Anum_pg_ivm_immv_immvuuid 2
#define Anum_pg_ivm_immv_ispopulated 3 #define Anum_pg_ivm_immv_querystring 3
#define Anum_pg_ivm_immv_lastivmupdate 4 #define Anum_pg_ivm_immv_ispopulated 4
#define Anum_pg_ivm_immv_querystring 5 #define Anum_pg_ivm_immv_lastivmupdate 5
#define Anum_pg_ivm_immv_immvuuid 6
/*
* This struct uniquely identifies in IMMV. It consists of an ObjectAddress
* (OID of the IMMV table and its schema) as well as a pg_uuid_t (which is
* stored in the pg_ivm_immv table).
*/
typedef struct ImmvAddress typedef struct ImmvAddress
{ {
ObjectAddress address; ObjectAddress address;