pg_ivm/pg_ivm.c
Yugo Nagata e31dc21eaa
Use object_access_hook to drop an IMMV entry from pg_ivm_immv (#29)
Previously, we used an event trigger and executed DELETE command,
but it caused a privilege error when non-superuser dropped a table
even if it is irrelevant to IMMV.

To fix it, we now use object_access_hook and an entry is deleted
via CatalogTupleDelete which doesn't need the superuser privilege.

Issue #25
2022-09-30 23:49:51 +09:00

398 lines
9.7 KiB
C

/*-------------------------------------------------------------------------
*
* pg_ivm.c
* incremental view maintenance extension
* Routines for user interfaces and callback functions
*
* Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
* Portions Copyright (c) 2022, IVM Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_namespace_d.h"
#include "catalog/pg_trigger_d.h"
#include "commands/trigger.h"
#include "parser/analyze.h"
#include "parser/parser.h"
#include "parser/scansup.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/regproc.h"
#include "utils/rel.h"
#include "utils/varlena.h"
#include "pg_ivm.h"
PG_MODULE_MAGIC;
static Oid pg_ivm_immv_id = InvalidOid;
static Oid pg_ivm_immv_pkey_id = InvalidOid;
static object_access_hook_type PrevObjectAccessHook = NULL;
void _PG_init(void);
static void IvmXactCallback(XactEvent event, void *arg);
static void IvmSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg);
static void parseNameAndColumns(const char *string, List **names, List **colNames);
static void PgIvmObjectAccessHook(ObjectAccessType access, Oid classId,
Oid objectId, int subId, void *arg);
/* SQL callable functions */
PG_FUNCTION_INFO_V1(create_immv);
PG_FUNCTION_INFO_V1(refresh_immv);
PG_FUNCTION_INFO_V1(IVM_prevent_immv_change);
PG_FUNCTION_INFO_V1(get_immv_def);
/*
* Call back functions for cleaning up
*/
static void
IvmXactCallback(XactEvent event, void *arg)
{
if (event == XACT_EVENT_ABORT)
AtAbort_IVM();
}
static void
IvmSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg)
{
if (event == SUBXACT_EVENT_ABORT_SUB)
AtAbort_IVM();
}
/*
* Module load callback
*/
void
_PG_init(void)
{
RegisterXactCallback(IvmXactCallback, NULL);
RegisterSubXactCallback(IvmSubXactCallback, NULL);
PrevObjectAccessHook = object_access_hook;
object_access_hook = PgIvmObjectAccessHook;
}
/*
* Given a C string, parse it into a qualified relation name
* followed by a optional parenthesized list of column names.
*/
static void
parseNameAndColumns(const char *string, List **names, List **colNames)
{
char *rawname;
char *ptr;
char *ptr2;
bool in_quote;
bool has_colnames = false;
List *cols;
ListCell *lc;
/* We need a modifiable copy of the input string. */
rawname = pstrdup(string);
/* Scan to find the expected left paren; mustn't be quoted */
in_quote = false;
for (ptr = rawname; *ptr; ptr++)
{
if (*ptr == '"')
in_quote = !in_quote;
else if (*ptr == '(' && !in_quote)
{
has_colnames = true;
break;
}
}
/* Separate the name and parse it into a list */
*ptr++ = '\0';
*names = stringToQualifiedNameList(rawname);
if (!has_colnames)
goto end;
/* Check for the trailing right parenthesis and remove it */
ptr2 = ptr + strlen(ptr);
while (--ptr2 > ptr)
{
if (!scanner_isspace(*ptr2))
break;
}
if (*ptr2 != ')')
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("expected a right parenthesis")));
*ptr2 = '\0';
if (!SplitIdentifierString(ptr, ',', &cols))
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("invalid name syntax")));
if (list_length(cols) == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("must specify at least one column name")));
foreach(lc, cols)
{
char *colname = lfirst(lc);
*colNames = lappend(*colNames, makeString(pstrdup(colname)));
}
end:
pfree(rawname);
}
/*
* User interface for creating an IMMV
*/
Datum
create_immv(PG_FUNCTION_ARGS)
{
text *t_relname = PG_GETARG_TEXT_PP(0);
text *t_sql = PG_GETARG_TEXT_PP(1);
char *relname = text_to_cstring(t_relname);
char *sql = text_to_cstring(t_sql);
List *parsetree_list;
RawStmt *parsetree;
Query *query;
QueryCompletion qc;
List *names = NIL;
List *colNames = NIL;
ParseState *pstate = make_parsestate(NULL);
CreateTableAsStmt *ctas;
StringInfoData command_buf;
parseNameAndColumns(relname, &names, &colNames);
initStringInfo(&command_buf);
appendStringInfo(&command_buf, "SELECT create_immv('%s' AS '%s');", relname, sql);
appendStringInfo(&command_buf, "%s;", sql);
pstate->p_sourcetext = command_buf.data;
parsetree_list = pg_parse_query(sql);
/* XXX: should we check t_sql before command_buf? */
if (list_length(parsetree_list) != 1)
elog(ERROR, "invalid view definition");
parsetree = linitial_node(RawStmt, parsetree_list);
ctas = makeNode(CreateTableAsStmt);
ctas->query = parsetree->stmt;
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 140000)
ctas->objtype = OBJECT_MATVIEW;
#else
ctas->relkind = OBJECT_MATVIEW;
#endif
ctas->is_select_into = false;
ctas->into = makeNode(IntoClause);
ctas->into->rel = makeRangeVarFromNameList(names);
ctas->into->colNames = colNames;
ctas->into->accessMethod = NULL;
ctas->into->options = NIL;
ctas->into->onCommit = ONCOMMIT_NOOP;
ctas->into->tableSpaceName = NULL;
ctas->into->viewQuery = parsetree->stmt;
ctas->into->skipData = false;
query = transformStmt(pstate, (Node *)ctas);
Assert(query->commandType == CMD_UTILITY && IsA(query->utilityStmt, CreateTableAsStmt));
ExecCreateImmv(pstate, (CreateTableAsStmt *)query->utilityStmt, NULL, NULL, &qc);
PG_RETURN_INT64(qc.nprocessed);
}
/*
* User interface for refreshing an IMMV
*/
Datum
refresh_immv(PG_FUNCTION_ARGS)
{
text *t_relname = PG_GETARG_TEXT_PP(0);
bool ispopulated = PG_GETARG_BOOL(1);
char *relname = text_to_cstring(t_relname);
QueryCompletion qc;
StringInfoData command_buf;
initStringInfo(&command_buf);
appendStringInfo(&command_buf, "SELECT refresh_immv('%s, %s);",
relname, ispopulated ? "true" : "false");
ExecRefreshImmv(makeRangeVarFromNameList(textToQualifiedNameList(t_relname)),
!ispopulated, command_buf.data, &qc);
PG_RETURN_INT64(qc.nprocessed);
}
/*
* Trigger function to prevent IMMV from being changed
*/
Datum
IVM_prevent_immv_change(PG_FUNCTION_ARGS)
{
TriggerData *trigdata = (TriggerData *) fcinfo->context;
Relation rel = trigdata->tg_relation;
if (!ImmvIncrementalMaintenanceIsEnabled())
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot change materialized view \"%s\"",
RelationGetRelationName(rel))));
return PointerGetDatum(NULL);
}
/*
* Create triggers to prevent IMMV from being changed
*/
void
CreateChangePreventTrigger(Oid matviewOid)
{
ObjectAddress refaddr;
ObjectAddress address;
CreateTrigStmt *ivm_trigger;
int16 types[4] = {TRIGGER_TYPE_INSERT, TRIGGER_TYPE_DELETE,
TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_TRUNCATE};
int i;
refaddr.classId = RelationRelationId;
refaddr.objectId = matviewOid;
refaddr.objectSubId = 0;
ivm_trigger = makeNode(CreateTrigStmt);
ivm_trigger->relation = NULL;
ivm_trigger->row = false;
ivm_trigger->timing = TRIGGER_TYPE_BEFORE;
ivm_trigger->trigname = "IVM_prevent_immv_change";
ivm_trigger->funcname = SystemFuncName("IVM_prevent_immv_change");
ivm_trigger->columns = NIL;
ivm_trigger->transitionRels = NIL;
ivm_trigger->whenClause = NULL;
ivm_trigger->isconstraint = false;
ivm_trigger->deferrable = false;
ivm_trigger->initdeferred = false;
ivm_trigger->constrrel = NULL;
ivm_trigger->args = NIL;
for (i = 0; i < 4; i++)
{
ivm_trigger->events = types[i];
address = CreateTrigger(ivm_trigger, NULL, matviewOid, InvalidOid, InvalidOid,
InvalidOid, InvalidOid, InvalidOid, NULL, true, false);
recordDependencyOn(&address, &refaddr, DEPENDENCY_AUTO);
}
/* Make changes-so-far visible */
CommandCounterIncrement();
}
/*
* Get relid of pg_ivm_immv
*/
Oid
PgIvmImmvRelationId(void)
{
if (!OidIsValid(pg_ivm_immv_id))
pg_ivm_immv_id = get_relname_relid("pg_ivm_immv", PG_CATALOG_NAMESPACE);
return pg_ivm_immv_id;
}
/*
* Get relid of pg_ivm_immv's primary key
*/
Oid
PgIvmImmvPrimaryKeyIndexId(void)
{
if (!OidIsValid(pg_ivm_immv_pkey_id))
pg_ivm_immv_pkey_id = get_relname_relid("pg_ivm_immv_pkey", PG_CATALOG_NAMESPACE);
return pg_ivm_immv_pkey_id;
}
/*
* Return the SELECT part of a IMMV
*/
Datum
get_immv_def(PG_FUNCTION_ARGS)
{
Oid matviewOid = PG_GETARG_OID(0);
Relation matviewRel = NULL;
Query *query = NULL;
char *querystring = NULL;
/* Make sure IMMV is a table. */
if (get_rel_relkind(matviewOid) != RELKIND_RELATION)
PG_RETURN_NULL();
matviewRel = table_open(matviewOid, AccessShareLock);
query = get_immv_query(matviewRel);
if (query == NULL)
{
table_close(matviewRel, NoLock);
PG_RETURN_NULL();
}
querystring = pg_ivm_get_viewdef(matviewRel, false);
table_close(matviewRel, NoLock);
PG_RETURN_TEXT_P(cstring_to_text(querystring));
}
/*
* object_access_hook function for dropping an IMMV
*/
static void
PgIvmObjectAccessHook(ObjectAccessType access, Oid classId,
Oid objectId, int subId, void *arg)
{
if (PrevObjectAccessHook)
PrevObjectAccessHook(access, classId, objectId, subId, arg);
if (access == OAT_DROP && classId == RelationRelationId && !OidIsValid(subId))
{
Relation pgIvmImmv = table_open(PgIvmImmvRelationId(), AccessShareLock);
SysScanDesc scan;
ScanKeyData key;
HeapTuple tup;
ScanKeyInit(&key,
Anum_pg_ivm_immv_immvrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(objectId));
scan = systable_beginscan(pgIvmImmv, PgIvmImmvPrimaryKeyIndexId(),
true, NULL, 1, &key);
tup = systable_getnext(scan);
if (HeapTupleIsValid(tup))
CatalogTupleDelete(pgIvmImmv, &tup->t_self);
systable_endscan(scan);
table_close(pgIvmImmv, NoLock);
}
}