WIP: Allow partitioned tables in IMMVs

This isn't feature-complete yet, but opening as a place to start a con-
versation around how to fully implement this feature.

At the moment, the prohibition on partitioned tables is removed, and
hooks are added to handle ATTACH PARTITION and DETACH PARTITION.

To do:

  * Add check to prohibit multiple partitioned tables (causes crash)
  * Change ATTACH/DETACH to be incremental (presently does a REFRESH)
  * Figure out issue with multiple tables and correct it
This commit is contained in:
Jason Petersen 2024-09-26 10:22:45 -06:00
parent 85b11c359a
commit fe6a09e5f7
6 changed files with 147 additions and 12 deletions

View file

@ -897,10 +897,6 @@ check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("TABLESAMPLE clause is not supported on incrementally maintainable materialized view"))); errmsg("TABLESAMPLE clause is not supported on incrementally maintainable materialized view")));
if (rte->relkind == RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("partitioned table is not supported on incrementally maintainable materialized view")));
if (rte->relkind == RELKIND_RELATION && has_superclass(rte->relid)) if (rte->relkind == RELKIND_RELATION && has_superclass(rte->relid))
ereport(ERROR, ereport(ERROR,

View file

@ -116,3 +116,33 @@ ERROR: relation "mv_not_existing" does not exist
-- Try to refresh a normal table -- error -- Try to refresh a normal table -- error
SELECT refresh_immv('t', true); SELECT refresh_immv('t', true);
ERROR: "t" is not an IMMV ERROR: "t" is not an IMMV
-- Create partitioned table
CREATE TABLE foo (id integer) PARTITION BY RANGE(id);
CREATE TABLE foo_default PARTITION OF foo DEFAULT;
INSERT INTO foo VALUES (1), (2), (3);
SELECT create_immv('foo_mv', 'SELECT COUNT(*) as count FROM foo');
create_immv
-------------
1
(1 row)
SELECT count FROM foo_mv;
count
-------
3
(1 row)
ALTER TABLE foo DETACH PARTITION foo_default;
SELECT count FROM foo_mv;
count
-------
0
(1 row)
ALTER TABLE foo ATTACH PARTITION foo_default DEFAULT;
SELECT count FROM foo_mv;
count
-------
3
(1 row)

View file

@ -41,6 +41,7 @@
#include "rewrite/rewriteManip.h" #include "rewrite/rewriteManip.h"
#include "rewrite/rowsecurity.h" #include "rewrite/rowsecurity.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "tcop/deparse_utility.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
@ -220,6 +221,7 @@ static void mv_BuildQueryKey(MV_QueryKey *key, Oid matview_id, int32 query_type)
static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort); static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort);
/* SQL callable functions */ /* SQL callable functions */
PG_FUNCTION_INFO_V1(changes_partitions);
PG_FUNCTION_INFO_V1(IVM_immediate_before); PG_FUNCTION_INFO_V1(IVM_immediate_before);
PG_FUNCTION_INFO_V1(IVM_immediate_maintenance); PG_FUNCTION_INFO_V1(IVM_immediate_maintenance);
PG_FUNCTION_INFO_V1(ivm_visible_in_prestate); PG_FUNCTION_INFO_V1(ivm_visible_in_prestate);
@ -683,12 +685,59 @@ tuplestore_copy(Tuplestorestate *tuplestore, Relation rel)
*/ */
/* /*
* IVM_immediate_before * changes_partitions
* *
* IVM trigger function invoked before base table is modified. If this is * Accepts a parsed ALTER TABLE command (from pg_event_trigger_ddl_commands),
* invoked firstly in the same statement, we save the transaction id and the * returning whether that ALTER TABLE command contains subcommands related to
* command id at that time. * changing partitions (i.e. ATTACH or DETACH PARTITION).
*/ */
Datum
changes_partitions(PG_FUNCTION_ARGS)
{
CollectedCommand *cmd = (CollectedCommand *) PG_GETARG_POINTER(0);
ListCell *subcmdCell = NULL;
CollectedATSubcmd *collAlterSubcmd = NULL;
AlterTableCmd *alterCmd = NULL;
Oid newPartRelid = InvalidOid;
Oid oldPartRelid = InvalidOid;
/* this function is intended for ALTER TABLE only */
if (cmd->type != SCT_AlterTable)
{
elog(ERROR, "command is not ALTER TABLE");
}
/* expect at least one sub-command */
subcmdCell = list_head(cmd->d.alterTable.subcmds);
if (subcmdCell == NULL)
{
elog(ERROR, "empty alter table subcommand list");
}
/*
* This saves the OIDs of the affected partitions, for later use
* in an incremental approach.
*/
collAlterSubcmd = lfirst(subcmdCell);
alterCmd = castNode(AlterTableCmd, collAlterSubcmd->parsetree);
if (alterCmd->subtype == AT_AttachPartition)
{
newPartRelid = collAlterSubcmd->address.objectId;
}
else if (alterCmd->subtype == AT_DetachPartition)
{
oldPartRelid = collAlterSubcmd->address.objectId;
}
else
{
return BoolGetDatum(false);
}
return BoolGetDatum(true);
}
Datum Datum
IVM_immediate_before(PG_FUNCTION_ARGS) IVM_immediate_before(PG_FUNCTION_ARGS)
{ {

View file

@ -17,8 +17,22 @@ SELECT pg_catalog.pg_extension_config_dump('pg_catalog.pg_ivm_immv', '');
-- functions -- functions
CREATE FUNCTION changes_partitions(pg_ddl_command)
RETURNS boolean
IMMUTABLE
STRICT
AS 'MODULE_PATHNAME', 'changes_partitions'
LANGUAGE C;
-- CREATE FUNCTION get_command_type(text, text, pg_ddl_command, text)
-- RETURNS void
-- IMMUTABLE
-- STRICT
-- AS 'MODULE_PATHNAME', 'get_command_type'
-- LANGUAGE C;
CREATE FUNCTION create_immv(text, text) CREATE FUNCTION create_immv(text, text)
RETURNS bigint RETURNS bigint
STRICT STRICT
AS 'MODULE_PATHNAME', 'create_immv' AS 'MODULE_PATHNAME', 'create_immv'
LANGUAGE C; LANGUAGE C;
@ -26,17 +40,17 @@ LANGUAGE C;
-- trigger functions -- trigger functions
CREATE FUNCTION "IVM_immediate_before"() CREATE FUNCTION "IVM_immediate_before"()
RETURNS trigger RETURNS trigger
AS 'MODULE_PATHNAME', 'IVM_immediate_before' AS 'MODULE_PATHNAME', 'IVM_immediate_before'
LANGUAGE C; LANGUAGE C;
CREATE FUNCTION "IVM_immediate_maintenance"() CREATE FUNCTION "IVM_immediate_maintenance"()
RETURNS trigger RETURNS trigger
AS 'MODULE_PATHNAME', 'IVM_immediate_maintenance' AS 'MODULE_PATHNAME', 'IVM_immediate_maintenance'
LANGUAGE C; LANGUAGE C;
CREATE FUNCTION "IVM_prevent_immv_change"() CREATE FUNCTION "IVM_prevent_immv_change"()
RETURNS trigger RETURNS trigger
AS 'MODULE_PATHNAME', 'IVM_prevent_immv_change' AS 'MODULE_PATHNAME', 'IVM_prevent_immv_change'
LANGUAGE C; LANGUAGE C;
@ -64,3 +78,33 @@ $$ LANGUAGE plpgsql;
CREATE EVENT TRIGGER pg_ivm_sql_drop_trigger CREATE EVENT TRIGGER pg_ivm_sql_drop_trigger
ON sql_drop ON sql_drop
EXECUTE PROCEDURE pg_catalog.pg_ivm_sql_drop_trigger_func(); EXECUTE PROCEDURE pg_catalog.pg_ivm_sql_drop_trigger_func();
-- Process ALTER TABLE, specifically ATTACH/DETACH PARTITION
-- TODO: Get incremental update working
CREATE OR REPLACE FUNCTION ivm_immediate_event()
RETURNS event_trigger
LANGUAGE plpgsql
AS $function$
DECLARE
r record;
BEGIN
FOR r IN SELECT * FROM pg_event_trigger_ddl_commands()
LOOP
IF changes_partitions(r.command) THEN
PERFORM refresh_immv(
convert_from(
substring(tgargs FOR (position('\x00'::bytea in tgargs)-1)),
'SQL_ASCII'
)::oid::regclass::text,
true)
FROM pg_trigger
WHERE tgfoid='"IVM_immediate_maintenance"'::regproc AND tgtype=4;
END IF;
END LOOP;
END;
$function$;
-- Run on any ALTER TABLE
CREATE EVENT TRIGGER IVM_trigger_event
ON ddl_command_end WHEN TAG IN ('ALTER TABLE')
EXECUTE PROCEDURE ivm_immediate_event();

View file

@ -52,6 +52,7 @@ extern ObjectAddress RefreshImmvByOid(Oid matviewOid, bool skipData,
const char *queryString, QueryCompletion *qc); const char *queryString, QueryCompletion *qc);
extern bool ImmvIncrementalMaintenanceIsEnabled(void); extern bool ImmvIncrementalMaintenanceIsEnabled(void);
extern Query *get_immv_query(Relation matviewRel); extern Query *get_immv_query(Relation matviewRel);
extern Datum changes_partitions(PG_FUNCTION_ARGS);
extern Datum IVM_immediate_before(PG_FUNCTION_ARGS); extern Datum IVM_immediate_before(PG_FUNCTION_ARGS);
extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS); extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS);
extern Query* rewrite_query_for_exists_subquery(Query *query); extern Query* rewrite_query_for_exists_subquery(Query *query);

View file

@ -37,3 +37,18 @@ SELECT refresh_immv('mv_not_existing', true);
-- Try to refresh a normal table -- error -- Try to refresh a normal table -- error
SELECT refresh_immv('t', true); SELECT refresh_immv('t', true);
-- Create partitioned table
CREATE TABLE foo (id integer) PARTITION BY RANGE(id);
CREATE TABLE foo_default PARTITION OF foo DEFAULT;
INSERT INTO foo VALUES (1), (2), (3);
SELECT create_immv('foo_mv', 'SELECT COUNT(*) as count FROM foo');
SELECT count FROM foo_mv;
ALTER TABLE foo DETACH PARTITION foo_default;
SELECT count FROM foo_mv;
ALTER TABLE foo ATTACH PARTITION foo_default DEFAULT;
SELECT count FROM foo_mv;