This commit is contained in:
Jason Petersen 2025-01-11 16:02:45 +01:00 committed by GitHub
commit 1385af4e21
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 147 additions and 12 deletions

View file

@ -896,10 +896,6 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("TABLESAMPLE clause is not supported on incrementally maintainable materialized view")));
if (rte->relkind == RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("partitioned table is not supported on incrementally maintainable materialized view")));
if (rte->relkind == RELKIND_RELATION && has_superclass(rte->relid))
ereport(ERROR,

View file

@ -116,3 +116,33 @@ ERROR: relation "mv_not_existing" does not exist
-- Try to refresh a normal table -- error
SELECT refresh_immv('t', true);
ERROR: "t" is not an IMMV
-- Create partitioned table
CREATE TABLE foo (id integer) PARTITION BY RANGE(id);
CREATE TABLE foo_default PARTITION OF foo DEFAULT;
INSERT INTO foo VALUES (1), (2), (3);
SELECT create_immv('foo_mv', 'SELECT COUNT(*) as count FROM foo');
create_immv
-------------
1
(1 row)
SELECT count FROM foo_mv;
count
-------
3
(1 row)
ALTER TABLE foo DETACH PARTITION foo_default;
SELECT count FROM foo_mv;
count
-------
0
(1 row)
ALTER TABLE foo ATTACH PARTITION foo_default DEFAULT;
SELECT count FROM foo_mv;
count
-------
3
(1 row)

View file

@ -41,6 +41,7 @@
#include "rewrite/rewriteManip.h"
#include "rewrite/rowsecurity.h"
#include "storage/lmgr.h"
#include "tcop/deparse_utility.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@ -220,6 +221,7 @@ static void mv_BuildQueryKey(MV_QueryKey *key, Oid matview_id, int32 query_type)
static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort);
/* SQL callable functions */
PG_FUNCTION_INFO_V1(changes_partitions);
PG_FUNCTION_INFO_V1(IVM_immediate_before);
PG_FUNCTION_INFO_V1(IVM_immediate_maintenance);
PG_FUNCTION_INFO_V1(ivm_visible_in_prestate);
@ -683,12 +685,59 @@ tuplestore_copy(Tuplestorestate *tuplestore, Relation rel)
*/
/*
* IVM_immediate_before
* changes_partitions
*
* IVM trigger function invoked before base table is modified. If this is
* invoked firstly in the same statement, we save the transaction id and the
* command id at that time.
* Accepts a parsed ALTER TABLE command (from pg_event_trigger_ddl_commands),
* returning whether that ALTER TABLE command contains subcommands related to
* changing partitions (i.e. ATTACH or DETACH PARTITION).
*/
Datum
changes_partitions(PG_FUNCTION_ARGS)
{
CollectedCommand *cmd = (CollectedCommand *) PG_GETARG_POINTER(0);
ListCell *subcmdCell = NULL;
CollectedATSubcmd *collAlterSubcmd = NULL;
AlterTableCmd *alterCmd = NULL;
Oid newPartRelid = InvalidOid;
Oid oldPartRelid = InvalidOid;
/* this function is intended for ALTER TABLE only */
if (cmd->type != SCT_AlterTable)
{
elog(ERROR, "command is not ALTER TABLE");
}
/* expect at least one sub-command */
subcmdCell = list_head(cmd->d.alterTable.subcmds);
if (subcmdCell == NULL)
{
elog(ERROR, "empty alter table subcommand list");
}
/*
* This saves the OIDs of the affected partitions, for later use
* in an incremental approach.
*/
collAlterSubcmd = lfirst(subcmdCell);
alterCmd = castNode(AlterTableCmd, collAlterSubcmd->parsetree);
if (alterCmd->subtype == AT_AttachPartition)
{
newPartRelid = collAlterSubcmd->address.objectId;
}
else if (alterCmd->subtype == AT_DetachPartition)
{
oldPartRelid = collAlterSubcmd->address.objectId;
}
else
{
return BoolGetDatum(false);
}
return BoolGetDatum(true);
}
Datum
IVM_immediate_before(PG_FUNCTION_ARGS)
{

View file

@ -17,8 +17,22 @@ SELECT pg_catalog.pg_extension_config_dump('pg_catalog.pg_ivm_immv', '');
-- functions
CREATE FUNCTION changes_partitions(pg_ddl_command)
RETURNS boolean
IMMUTABLE
STRICT
AS 'MODULE_PATHNAME', 'changes_partitions'
LANGUAGE C;
-- CREATE FUNCTION get_command_type(text, text, pg_ddl_command, text)
-- RETURNS void
-- IMMUTABLE
-- STRICT
-- AS 'MODULE_PATHNAME', 'get_command_type'
-- LANGUAGE C;
CREATE FUNCTION create_immv(text, text)
RETURNS bigint
RETURNS bigint
STRICT
AS 'MODULE_PATHNAME', 'create_immv'
LANGUAGE C;
@ -26,17 +40,17 @@ LANGUAGE C;
-- trigger functions
CREATE FUNCTION "IVM_immediate_before"()
RETURNS trigger
RETURNS trigger
AS 'MODULE_PATHNAME', 'IVM_immediate_before'
LANGUAGE C;
CREATE FUNCTION "IVM_immediate_maintenance"()
RETURNS trigger
RETURNS trigger
AS 'MODULE_PATHNAME', 'IVM_immediate_maintenance'
LANGUAGE C;
CREATE FUNCTION "IVM_prevent_immv_change"()
RETURNS trigger
RETURNS trigger
AS 'MODULE_PATHNAME', 'IVM_prevent_immv_change'
LANGUAGE C;
@ -64,3 +78,33 @@ $$ LANGUAGE plpgsql;
CREATE EVENT TRIGGER pg_ivm_sql_drop_trigger
ON sql_drop
EXECUTE PROCEDURE pg_catalog.pg_ivm_sql_drop_trigger_func();
-- Process ALTER TABLE, specifically ATTACH/DETACH PARTITION
-- TODO: Get incremental update working
CREATE OR REPLACE FUNCTION ivm_immediate_event()
RETURNS event_trigger
LANGUAGE plpgsql
AS $function$
DECLARE
r record;
BEGIN
FOR r IN SELECT * FROM pg_event_trigger_ddl_commands()
LOOP
IF changes_partitions(r.command) THEN
PERFORM refresh_immv(
convert_from(
substring(tgargs FOR (position('\x00'::bytea in tgargs)-1)),
'SQL_ASCII'
)::oid::regclass::text,
true)
FROM pg_trigger
WHERE tgfoid='"IVM_immediate_maintenance"'::regproc AND tgtype=4;
END IF;
END LOOP;
END;
$function$;
-- Run on any ALTER TABLE
CREATE EVENT TRIGGER IVM_trigger_event
ON ddl_command_end WHEN TAG IN ('ALTER TABLE')
EXECUTE PROCEDURE ivm_immediate_event();

View file

@ -51,6 +51,7 @@ extern ObjectAddress RefreshImmvByOid(Oid matviewOid, bool skipData,
const char *queryString, QueryCompletion *qc);
extern bool ImmvIncrementalMaintenanceIsEnabled(void);
extern Query *get_immv_query(Relation matviewRel);
extern Datum changes_partitions(PG_FUNCTION_ARGS);
extern Datum IVM_immediate_before(PG_FUNCTION_ARGS);
extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS);
extern Query* rewrite_query_for_exists_subquery(Query *query);

View file

@ -37,3 +37,18 @@ SELECT refresh_immv('mv_not_existing', true);
-- Try to refresh a normal table -- error
SELECT refresh_immv('t', true);
-- Create partitioned table
CREATE TABLE foo (id integer) PARTITION BY RANGE(id);
CREATE TABLE foo_default PARTITION OF foo DEFAULT;
INSERT INTO foo VALUES (1), (2), (3);
SELECT create_immv('foo_mv', 'SELECT COUNT(*) as count FROM foo');
SELECT count FROM foo_mv;
ALTER TABLE foo DETACH PARTITION foo_default;
SELECT count FROM foo_mv;
ALTER TABLE foo ATTACH PARTITION foo_default DEFAULT;
SELECT count FROM foo_mv;