diff --git a/expected/pg_ivm.out b/expected/pg_ivm.out index 1d33a4b..f6d8220 100644 --- a/expected/pg_ivm.out +++ b/expected/pg_ivm.out @@ -518,6 +518,24 @@ SELECT * FROM mv_self ORDER BY v1; 300 | 300 (4 rows) +--- with sub-transactions +SAVEPOINT p1; +INSERT INTO base_t VALUES (7,70); +RELEASE SAVEPOINT p1; +INSERT INTO base_t VALUES (7,77); +SELECT * FROM mv_self ORDER BY v1, v2; + v1 | v2 +-----+----- + 50 | 50 + 60 | 60 + 70 | 70 + 70 | 77 + 77 | 70 + 77 | 77 + 130 | 130 + 300 | 300 +(8 rows) + ROLLBACK; -- support simultaneous table changes BEGIN; diff --git a/matview.c b/matview.c index b5e715c..650d27a 100644 --- a/matview.c +++ b/matview.c @@ -14,6 +14,7 @@ #include "access/genam.h" #include "access/multixact.h" #include "access/table.h" +#include "access/tableam.h" #include "access/xact.h" #include "catalog/pg_depend.h" #include "catalog/heap.h" @@ -90,8 +91,7 @@ typedef struct MV_TriggerHashEntry int before_trig_count; /* count of before triggers invoked */ int after_trig_count; /* count of after triggers invoked */ - TransactionId xid; /* Transaction id before the first table is modified*/ - CommandId cid; /* Command id before the first table is modified */ + Snapshot snapshot; /* Snapshot just before table change */ List *tables; /* List of MV_TriggerTable */ bool has_old; /* tuples are deleted from any table? */ @@ -113,11 +113,16 @@ typedef struct MV_TriggerTable List *rte_paths; /* List of paths to RTE index of the modified table */ RangeTblEntry *original_rte; /* the original RTE saved before rewriting query */ + + Relation rel; /* relation of the modified table */ + TupleTableSlot *slot; /* for checking visibility in the pre-state table */ } MV_TriggerTable; static HTAB *mv_query_cache = NULL; static HTAB *mv_trigger_info = NULL; +static bool in_delta_calculation = false; + /* kind of IVM operation for the view */ typedef enum { @@ -141,13 +146,11 @@ static void OpenImmvIncrementalMaintenance(void); static void CloseImmvIncrementalMaintenance(void); static Query *rewrite_query_for_preupdate_state(Query *query, List *tables, - TransactionId xid, CommandId cid, - ParseState *pstate, List *rte_path); + ParseState *pstate, List *rte_path, Oid matviewid); static void register_delta_ENRs(ParseState *pstate, Query *query, List *tables); static char *make_delta_enr_name(const char *prefix, Oid relid, int count); static RangeTblEntry *get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table, - TransactionId xid, CommandId cid, - QueryEnvironment *queryEnv); + QueryEnvironment *queryEnv, Oid matviewid); static RangeTblEntry *union_ENRs(RangeTblEntry *rte, Oid relid, List *enr_rtes, const char *prefix, QueryEnvironment *queryEnv); static Query *rewrite_query_for_distinct_and_aggregates(Query *query, ParseState *pstate); @@ -203,13 +206,14 @@ static void mv_InitHashTables(void); static SPIPlanPtr mv_FetchPreparedPlan(MV_QueryKey *key); static void mv_HashPreparedPlan(MV_QueryKey *key, SPIPlanPtr plan); static void mv_BuildQueryKey(MV_QueryKey *key, Oid matview_id, int32 query_type); -static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry); +static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort); static List *get_securityQuals(Oid relId, int rt_index, Query *query); /* SQL callable functions */ PG_FUNCTION_INFO_V1(IVM_immediate_before); PG_FUNCTION_INFO_V1(IVM_immediate_maintenance); +PG_FUNCTION_INFO_V1(ivm_visible_in_prestate); /* * ExecRefreshImmv -- execute a refresh_immv() function @@ -710,13 +714,16 @@ IVM_immediate_before(PG_FUNCTION_ARGS) /* On the first BEFORE to update the view, initialize trigger data */ if (!found) { + /* + * Get a snapshot just before the table was modified for checking + * tuple visibility in the pre-update state of the table. + */ Snapshot snapshot = GetActiveSnapshot(); entry->matview_id = matviewOid; entry->before_trig_count = 0; entry->after_trig_count = 0; - entry->xid = GetCurrentTransactionId(); - entry->cid = snapshot->curcid; + entry->snapshot = RegisterSnapshot(snapshot); entry->tables = NIL; entry->has_old = false; entry->has_new = false; @@ -811,6 +818,8 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) table->old_rtes = NIL; table->new_rtes = NIL; table->rte_paths = NIL; + table->slot = MakeSingleTupleTableSlot(RelationGetDescr(rel), table_slot_callbacks(rel)); + table->rel = table_open(RelationGetRelid(rel), NoLock); entry->tables = lappend(entry->tables, table); MemoryContextSwitchTo(oldcxt); @@ -933,7 +942,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) } /* Clean up hash entry and delete tuplestores */ - clean_up_IVM_hash_entry(entry); + clean_up_IVM_hash_entry(entry, false); /* Pop the original snapshot. */ PopActiveSnapshot(); @@ -969,8 +978,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) /* Set all tables in the query to pre-update state */ rewritten = rewrite_query_for_preupdate_state(rewritten, entry->tables, - entry->xid, entry->cid, - pstate, NIL); + pstate, NIL, matviewOid); /* Rewrite for DISTINCT clause and aggregates functions */ rewritten = rewrite_query_for_distinct_and_aggregates(rewritten, pstate); @@ -1069,7 +1077,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) } /* Clean up hash entry and delete tuplestores */ - clean_up_IVM_hash_entry(entry); + clean_up_IVM_hash_entry(entry, false); if (old_tuplestore) { dest_old->rDestroy(dest_old); @@ -1100,13 +1108,11 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) * * Rewrite the query so that base tables' RTEs will represent "pre-update" * state of tables. This is necessary to calculate view delta after multiple - * tables are modified. xid and cid are the transaction id and command id - * before the first table was modified. + * tables are modified. */ static Query* rewrite_query_for_preupdate_state(Query *query, List *tables, - TransactionId xid, CommandId cid, - ParseState *pstate, List *rte_path) + ParseState *pstate, List *rte_path, Oid matviewid) { ListCell *lc; int num_rte = list_length(query->rtable); @@ -1129,7 +1135,7 @@ rewrite_query_for_preupdate_state(Query *query, List *tables, /* if rte contains subquery, search recursively */ if (r->rtekind == RTE_SUBQUERY) - rewrite_query_for_preupdate_state(r->subquery, tables, xid, cid, pstate, lappend_int(list_copy(rte_path), i)); + rewrite_query_for_preupdate_state(r->subquery, tables, pstate, lappend_int(list_copy(rte_path), i), matviewid); else { ListCell *lc2; @@ -1142,7 +1148,7 @@ rewrite_query_for_preupdate_state(Query *query, List *tables, */ if (r->relid == table->table_id) { - lfirst(lc) = get_prestate_rte(r, table, xid, cid, pstate->p_queryEnv); + lfirst(lc) = get_prestate_rte(r, table, pstate->p_queryEnv, matviewid); table->rte_paths = lappend(table->rte_paths, lappend_int(list_copy(rte_path), i)); break; } @@ -1233,6 +1239,51 @@ register_delta_ENRs(ParseState *pstate, Query *query, List *tables) } } +#define DatumGetItemPointer(X) ((ItemPointer) DatumGetPointer(X)) +#define PG_GETARG_ITEMPOINTER(n) DatumGetItemPointer(PG_GETARG_DATUM(n)) + +/* + * ivm_visible_in_prestate + * + * Check visibility of a tuple specified by the tableoid and item pointer + * using the snapshot taken just before the table was modified. + */ +Datum +ivm_visible_in_prestate(PG_FUNCTION_ARGS) +{ + Oid tableoid = PG_GETARG_OID(0); + ItemPointer itemPtr = PG_GETARG_ITEMPOINTER(1); + Oid matviewOid = PG_GETARG_OID(2); + MV_TriggerHashEntry *entry; + MV_TriggerTable *table = NULL; + ListCell *lc; + bool found; + bool result; + + if (!in_delta_calculation) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("ivm_visible_in_prestate can be called only in delta calculation"))); + + entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info, + (void *) &matviewOid, + HASH_FIND, &found); + Assert (found && entry != NULL); + + foreach(lc, entry->tables) + { + table = (MV_TriggerTable *) lfirst(lc); + if (table->table_id == tableoid) + break; + } + + Assert (table != NULL); + + result = table_tuple_fetch_row_version(table->rel, itemPtr, entry->snapshot, table->slot); + + PG_RETURN_BOOL(result); +} + /* * get_prestate_rte * @@ -1241,8 +1292,7 @@ register_delta_ENRs(ParseState *pstate, Query *query, List *tables) */ static RangeTblEntry* get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table, - TransactionId xid, CommandId cid, - QueryEnvironment *queryEnv) + QueryEnvironment *queryEnv, Oid matviewid) { StringInfoData str; RawStmt *raw; @@ -1269,9 +1319,8 @@ get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table, initStringInfo(&str); appendStringInfo(&str, "SELECT t.* FROM %s t" - " WHERE (age(t.xmin) - age(%u::text::xid) > 0) OR" - " (t.xmin = %u AND t.cmin::text::int < %u)", - relname, xid, xid, cid); + " WHERE ivm_visible_in_prestate(t.tableoid, t.ctid ,%d::oid)", + relname, matviewid); for (i = 0; i < list_length(table->old_tuplestores); i++) { @@ -1481,6 +1530,8 @@ calc_delta(MV_TriggerTable *table, List *rte_path, Query *query, ListCell *lc = getRteListCell(query, rte_path); RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc); + in_delta_calculation = true; + /* Generate old delta */ if (list_length(table->old_rtes) > 0) { @@ -1496,6 +1547,8 @@ calc_delta(MV_TriggerTable *table, List *rte_path, Query *query, lfirst(lc) = union_ENRs(rte, table->table_id, table->new_rtes, "new", queryEnv); refresh_immv_datafill(dest_new, query, queryEnv, tupdesc_new, ""); } + + in_delta_calculation = false; } /* @@ -2833,8 +2886,10 @@ AtAbort_IVM() { hash_seq_init(&seq, mv_trigger_info); while ((entry = hash_seq_search(&seq)) != NULL) - clean_up_IVM_hash_entry(entry); + clean_up_IVM_hash_entry(entry, true); } + + in_delta_calculation = false; } /* @@ -2844,7 +2899,7 @@ AtAbort_IVM() * maintenance finished. */ static void -clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry) +clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort) { bool found; ListCell *lc; @@ -2867,9 +2922,18 @@ clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry) list_free(table->old_tuplestores); list_free(table->new_tuplestores); + if (!is_abort) + { + ExecDropSingleTupleTableSlot(table->slot); + table_close(table->rel, NoLock); + } } list_free(entry->tables); + if (!is_abort) + UnregisterSnapshot(entry->snapshot); + + hash_search(mv_trigger_info, (void *) &entry->matview_id, HASH_REMOVE, &found); } diff --git a/pg_ivm--1.2--1.3.sql b/pg_ivm--1.2--1.3.sql index e69de29..040182a 100644 --- a/pg_ivm--1.2--1.3.sql +++ b/pg_ivm--1.2--1.3.sql @@ -0,0 +1,7 @@ +-- functions + +CREATE FUNCTION ivm_visible_in_prestate(oid, tid, oid) +RETURNS bool +STABLE +AS 'MODULE_PATHNAME', 'ivm_visible_in_prestate' +LANGUAGE C; diff --git a/pg_ivm.h b/pg_ivm.h index 25a8a26..de47f1e 100644 --- a/pg_ivm.h +++ b/pg_ivm.h @@ -49,6 +49,7 @@ extern bool ImmvIncrementalMaintenanceIsEnabled(void); extern Query *get_immv_query(Relation matviewRel); extern Datum IVM_immediate_before(PG_FUNCTION_ARGS); extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS); +extern Datum ivm_visible_in_prestate(PG_FUNCTION_ARGS); extern void AtAbort_IVM(void); extern bool isIvmName(const char *s); diff --git a/sql/pg_ivm.sql b/sql/pg_ivm.sql index 1508612..32dfe14 100644 --- a/sql/pg_ivm.sql +++ b/sql/pg_ivm.sql @@ -168,6 +168,14 @@ WITH dlt_t AS (DELETE FROM base_t WHERE i IN (4,5) RETURNING 1) SELECT NULL; SELECT * FROM mv_self ORDER BY v1; + +--- with sub-transactions +SAVEPOINT p1; +INSERT INTO base_t VALUES (7,70); +RELEASE SAVEPOINT p1; +INSERT INTO base_t VALUES (7,77); +SELECT * FROM mv_self ORDER BY v1, v2; + ROLLBACK; -- support simultaneous table changes