Use snapshot to check tuple visibility in pre-update state (#28)
When multiple tables are updated or the view contains a self-join, we need to calculate table states that was before it is modified during incremental view maintenance. For get the pre-update state, tuples inserted in a command must be removed when the table is scanned. Previously, we used xmin and cmin system columns for this purpose, but this way is problematic because after a tuple is frozen, its xmin no longer has any meaning. Actually, we will get inconsistent view results after XID wraparound. Also, we can see similar similar inconsistency when using sub-transaction because xmin values do not always monotonically increasing by command executions. To fix this, we use a snapshot that taken just before the table is modified for checking tuple visibility in pre-state table instead of using xmin and cmin system columns. A new function returning boolean, ivm_visible_in_prestate, is added, and this is called in WHERE clause of sub-queries to calculate pre-state table. This function check if a specified tuple in the post-update table is visible or not using the snapshot and return true if the tuple is visible.
This commit is contained in:
parent
5f584e54c3
commit
b6702f9a3a
5 changed files with 124 additions and 26 deletions
|
|
@ -518,6 +518,24 @@ SELECT * FROM mv_self ORDER BY v1;
|
|||
300 | 300
|
||||
(4 rows)
|
||||
|
||||
--- with sub-transactions
|
||||
SAVEPOINT p1;
|
||||
INSERT INTO base_t VALUES (7,70);
|
||||
RELEASE SAVEPOINT p1;
|
||||
INSERT INTO base_t VALUES (7,77);
|
||||
SELECT * FROM mv_self ORDER BY v1, v2;
|
||||
v1 | v2
|
||||
-----+-----
|
||||
50 | 50
|
||||
60 | 60
|
||||
70 | 70
|
||||
70 | 77
|
||||
77 | 70
|
||||
77 | 77
|
||||
130 | 130
|
||||
300 | 300
|
||||
(8 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- support simultaneous table changes
|
||||
BEGIN;
|
||||
|
|
|
|||
116
matview.c
116
matview.c
|
|
@ -14,6 +14,7 @@
|
|||
#include "access/genam.h"
|
||||
#include "access/multixact.h"
|
||||
#include "access/table.h"
|
||||
#include "access/tableam.h"
|
||||
#include "access/xact.h"
|
||||
#include "catalog/pg_depend.h"
|
||||
#include "catalog/heap.h"
|
||||
|
|
@ -90,8 +91,7 @@ typedef struct MV_TriggerHashEntry
|
|||
int before_trig_count; /* count of before triggers invoked */
|
||||
int after_trig_count; /* count of after triggers invoked */
|
||||
|
||||
TransactionId xid; /* Transaction id before the first table is modified*/
|
||||
CommandId cid; /* Command id before the first table is modified */
|
||||
Snapshot snapshot; /* Snapshot just before table change */
|
||||
|
||||
List *tables; /* List of MV_TriggerTable */
|
||||
bool has_old; /* tuples are deleted from any table? */
|
||||
|
|
@ -113,11 +113,16 @@ typedef struct MV_TriggerTable
|
|||
|
||||
List *rte_paths; /* List of paths to RTE index of the modified table */
|
||||
RangeTblEntry *original_rte; /* the original RTE saved before rewriting query */
|
||||
|
||||
Relation rel; /* relation of the modified table */
|
||||
TupleTableSlot *slot; /* for checking visibility in the pre-state table */
|
||||
} MV_TriggerTable;
|
||||
|
||||
static HTAB *mv_query_cache = NULL;
|
||||
static HTAB *mv_trigger_info = NULL;
|
||||
|
||||
static bool in_delta_calculation = false;
|
||||
|
||||
/* kind of IVM operation for the view */
|
||||
typedef enum
|
||||
{
|
||||
|
|
@ -141,13 +146,11 @@ static void OpenImmvIncrementalMaintenance(void);
|
|||
static void CloseImmvIncrementalMaintenance(void);
|
||||
|
||||
static Query *rewrite_query_for_preupdate_state(Query *query, List *tables,
|
||||
TransactionId xid, CommandId cid,
|
||||
ParseState *pstate, List *rte_path);
|
||||
ParseState *pstate, List *rte_path, Oid matviewid);
|
||||
static void register_delta_ENRs(ParseState *pstate, Query *query, List *tables);
|
||||
static char *make_delta_enr_name(const char *prefix, Oid relid, int count);
|
||||
static RangeTblEntry *get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table,
|
||||
TransactionId xid, CommandId cid,
|
||||
QueryEnvironment *queryEnv);
|
||||
QueryEnvironment *queryEnv, Oid matviewid);
|
||||
static RangeTblEntry *union_ENRs(RangeTblEntry *rte, Oid relid, List *enr_rtes, const char *prefix,
|
||||
QueryEnvironment *queryEnv);
|
||||
static Query *rewrite_query_for_distinct_and_aggregates(Query *query, ParseState *pstate);
|
||||
|
|
@ -203,13 +206,14 @@ static void mv_InitHashTables(void);
|
|||
static SPIPlanPtr mv_FetchPreparedPlan(MV_QueryKey *key);
|
||||
static void mv_HashPreparedPlan(MV_QueryKey *key, SPIPlanPtr plan);
|
||||
static void mv_BuildQueryKey(MV_QueryKey *key, Oid matview_id, int32 query_type);
|
||||
static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry);
|
||||
static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort);
|
||||
|
||||
static List *get_securityQuals(Oid relId, int rt_index, Query *query);
|
||||
|
||||
/* SQL callable functions */
|
||||
PG_FUNCTION_INFO_V1(IVM_immediate_before);
|
||||
PG_FUNCTION_INFO_V1(IVM_immediate_maintenance);
|
||||
PG_FUNCTION_INFO_V1(ivm_visible_in_prestate);
|
||||
|
||||
/*
|
||||
* ExecRefreshImmv -- execute a refresh_immv() function
|
||||
|
|
@ -710,13 +714,16 @@ IVM_immediate_before(PG_FUNCTION_ARGS)
|
|||
/* On the first BEFORE to update the view, initialize trigger data */
|
||||
if (!found)
|
||||
{
|
||||
/*
|
||||
* Get a snapshot just before the table was modified for checking
|
||||
* tuple visibility in the pre-update state of the table.
|
||||
*/
|
||||
Snapshot snapshot = GetActiveSnapshot();
|
||||
|
||||
entry->matview_id = matviewOid;
|
||||
entry->before_trig_count = 0;
|
||||
entry->after_trig_count = 0;
|
||||
entry->xid = GetCurrentTransactionId();
|
||||
entry->cid = snapshot->curcid;
|
||||
entry->snapshot = RegisterSnapshot(snapshot);
|
||||
entry->tables = NIL;
|
||||
entry->has_old = false;
|
||||
entry->has_new = false;
|
||||
|
|
@ -811,6 +818,8 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
|
|||
table->old_rtes = NIL;
|
||||
table->new_rtes = NIL;
|
||||
table->rte_paths = NIL;
|
||||
table->slot = MakeSingleTupleTableSlot(RelationGetDescr(rel), table_slot_callbacks(rel));
|
||||
table->rel = table_open(RelationGetRelid(rel), NoLock);
|
||||
entry->tables = lappend(entry->tables, table);
|
||||
|
||||
MemoryContextSwitchTo(oldcxt);
|
||||
|
|
@ -933,7 +942,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
/* Clean up hash entry and delete tuplestores */
|
||||
clean_up_IVM_hash_entry(entry);
|
||||
clean_up_IVM_hash_entry(entry, false);
|
||||
|
||||
/* Pop the original snapshot. */
|
||||
PopActiveSnapshot();
|
||||
|
|
@ -969,8 +978,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
|
|||
|
||||
/* Set all tables in the query to pre-update state */
|
||||
rewritten = rewrite_query_for_preupdate_state(rewritten, entry->tables,
|
||||
entry->xid, entry->cid,
|
||||
pstate, NIL);
|
||||
pstate, NIL, matviewOid);
|
||||
/* Rewrite for DISTINCT clause and aggregates functions */
|
||||
rewritten = rewrite_query_for_distinct_and_aggregates(rewritten, pstate);
|
||||
|
||||
|
|
@ -1069,7 +1077,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
/* Clean up hash entry and delete tuplestores */
|
||||
clean_up_IVM_hash_entry(entry);
|
||||
clean_up_IVM_hash_entry(entry, false);
|
||||
if (old_tuplestore)
|
||||
{
|
||||
dest_old->rDestroy(dest_old);
|
||||
|
|
@ -1100,13 +1108,11 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
|
|||
*
|
||||
* Rewrite the query so that base tables' RTEs will represent "pre-update"
|
||||
* state of tables. This is necessary to calculate view delta after multiple
|
||||
* tables are modified. xid and cid are the transaction id and command id
|
||||
* before the first table was modified.
|
||||
* tables are modified.
|
||||
*/
|
||||
static Query*
|
||||
rewrite_query_for_preupdate_state(Query *query, List *tables,
|
||||
TransactionId xid, CommandId cid,
|
||||
ParseState *pstate, List *rte_path)
|
||||
ParseState *pstate, List *rte_path, Oid matviewid)
|
||||
{
|
||||
ListCell *lc;
|
||||
int num_rte = list_length(query->rtable);
|
||||
|
|
@ -1129,7 +1135,7 @@ rewrite_query_for_preupdate_state(Query *query, List *tables,
|
|||
|
||||
/* if rte contains subquery, search recursively */
|
||||
if (r->rtekind == RTE_SUBQUERY)
|
||||
rewrite_query_for_preupdate_state(r->subquery, tables, xid, cid, pstate, lappend_int(list_copy(rte_path), i));
|
||||
rewrite_query_for_preupdate_state(r->subquery, tables, pstate, lappend_int(list_copy(rte_path), i), matviewid);
|
||||
else
|
||||
{
|
||||
ListCell *lc2;
|
||||
|
|
@ -1142,7 +1148,7 @@ rewrite_query_for_preupdate_state(Query *query, List *tables,
|
|||
*/
|
||||
if (r->relid == table->table_id)
|
||||
{
|
||||
lfirst(lc) = get_prestate_rte(r, table, xid, cid, pstate->p_queryEnv);
|
||||
lfirst(lc) = get_prestate_rte(r, table, pstate->p_queryEnv, matviewid);
|
||||
table->rte_paths = lappend(table->rte_paths, lappend_int(list_copy(rte_path), i));
|
||||
break;
|
||||
}
|
||||
|
|
@ -1233,6 +1239,51 @@ register_delta_ENRs(ParseState *pstate, Query *query, List *tables)
|
|||
}
|
||||
}
|
||||
|
||||
#define DatumGetItemPointer(X) ((ItemPointer) DatumGetPointer(X))
|
||||
#define PG_GETARG_ITEMPOINTER(n) DatumGetItemPointer(PG_GETARG_DATUM(n))
|
||||
|
||||
/*
|
||||
* ivm_visible_in_prestate
|
||||
*
|
||||
* Check visibility of a tuple specified by the tableoid and item pointer
|
||||
* using the snapshot taken just before the table was modified.
|
||||
*/
|
||||
Datum
|
||||
ivm_visible_in_prestate(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid tableoid = PG_GETARG_OID(0);
|
||||
ItemPointer itemPtr = PG_GETARG_ITEMPOINTER(1);
|
||||
Oid matviewOid = PG_GETARG_OID(2);
|
||||
MV_TriggerHashEntry *entry;
|
||||
MV_TriggerTable *table = NULL;
|
||||
ListCell *lc;
|
||||
bool found;
|
||||
bool result;
|
||||
|
||||
if (!in_delta_calculation)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("ivm_visible_in_prestate can be called only in delta calculation")));
|
||||
|
||||
entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info,
|
||||
(void *) &matviewOid,
|
||||
HASH_FIND, &found);
|
||||
Assert (found && entry != NULL);
|
||||
|
||||
foreach(lc, entry->tables)
|
||||
{
|
||||
table = (MV_TriggerTable *) lfirst(lc);
|
||||
if (table->table_id == tableoid)
|
||||
break;
|
||||
}
|
||||
|
||||
Assert (table != NULL);
|
||||
|
||||
result = table_tuple_fetch_row_version(table->rel, itemPtr, entry->snapshot, table->slot);
|
||||
|
||||
PG_RETURN_BOOL(result);
|
||||
}
|
||||
|
||||
/*
|
||||
* get_prestate_rte
|
||||
*
|
||||
|
|
@ -1241,8 +1292,7 @@ register_delta_ENRs(ParseState *pstate, Query *query, List *tables)
|
|||
*/
|
||||
static RangeTblEntry*
|
||||
get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table,
|
||||
TransactionId xid, CommandId cid,
|
||||
QueryEnvironment *queryEnv)
|
||||
QueryEnvironment *queryEnv, Oid matviewid)
|
||||
{
|
||||
StringInfoData str;
|
||||
RawStmt *raw;
|
||||
|
|
@ -1269,9 +1319,8 @@ get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table,
|
|||
initStringInfo(&str);
|
||||
appendStringInfo(&str,
|
||||
"SELECT t.* FROM %s t"
|
||||
" WHERE (age(t.xmin) - age(%u::text::xid) > 0) OR"
|
||||
" (t.xmin = %u AND t.cmin::text::int < %u)",
|
||||
relname, xid, xid, cid);
|
||||
" WHERE ivm_visible_in_prestate(t.tableoid, t.ctid ,%d::oid)",
|
||||
relname, matviewid);
|
||||
|
||||
for (i = 0; i < list_length(table->old_tuplestores); i++)
|
||||
{
|
||||
|
|
@ -1481,6 +1530,8 @@ calc_delta(MV_TriggerTable *table, List *rte_path, Query *query,
|
|||
ListCell *lc = getRteListCell(query, rte_path);
|
||||
RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);
|
||||
|
||||
in_delta_calculation = true;
|
||||
|
||||
/* Generate old delta */
|
||||
if (list_length(table->old_rtes) > 0)
|
||||
{
|
||||
|
|
@ -1496,6 +1547,8 @@ calc_delta(MV_TriggerTable *table, List *rte_path, Query *query,
|
|||
lfirst(lc) = union_ENRs(rte, table->table_id, table->new_rtes, "new", queryEnv);
|
||||
refresh_immv_datafill(dest_new, query, queryEnv, tupdesc_new, "");
|
||||
}
|
||||
|
||||
in_delta_calculation = false;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -2833,8 +2886,10 @@ AtAbort_IVM()
|
|||
{
|
||||
hash_seq_init(&seq, mv_trigger_info);
|
||||
while ((entry = hash_seq_search(&seq)) != NULL)
|
||||
clean_up_IVM_hash_entry(entry);
|
||||
clean_up_IVM_hash_entry(entry, true);
|
||||
}
|
||||
|
||||
in_delta_calculation = false;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -2844,7 +2899,7 @@ AtAbort_IVM()
|
|||
* maintenance finished.
|
||||
*/
|
||||
static void
|
||||
clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry)
|
||||
clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort)
|
||||
{
|
||||
bool found;
|
||||
ListCell *lc;
|
||||
|
|
@ -2867,9 +2922,18 @@ clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry)
|
|||
|
||||
list_free(table->old_tuplestores);
|
||||
list_free(table->new_tuplestores);
|
||||
if (!is_abort)
|
||||
{
|
||||
ExecDropSingleTupleTableSlot(table->slot);
|
||||
table_close(table->rel, NoLock);
|
||||
}
|
||||
}
|
||||
list_free(entry->tables);
|
||||
|
||||
if (!is_abort)
|
||||
UnregisterSnapshot(entry->snapshot);
|
||||
|
||||
|
||||
hash_search(mv_trigger_info, (void *) &entry->matview_id, HASH_REMOVE, &found);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,7 @@
|
|||
-- functions
|
||||
|
||||
CREATE FUNCTION ivm_visible_in_prestate(oid, tid, oid)
|
||||
RETURNS bool
|
||||
STABLE
|
||||
AS 'MODULE_PATHNAME', 'ivm_visible_in_prestate'
|
||||
LANGUAGE C;
|
||||
1
pg_ivm.h
1
pg_ivm.h
|
|
@ -49,6 +49,7 @@ extern bool ImmvIncrementalMaintenanceIsEnabled(void);
|
|||
extern Query *get_immv_query(Relation matviewRel);
|
||||
extern Datum IVM_immediate_before(PG_FUNCTION_ARGS);
|
||||
extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS);
|
||||
extern Datum ivm_visible_in_prestate(PG_FUNCTION_ARGS);
|
||||
extern void AtAbort_IVM(void);
|
||||
extern bool isIvmName(const char *s);
|
||||
|
||||
|
|
|
|||
|
|
@ -168,6 +168,14 @@ WITH
|
|||
dlt_t AS (DELETE FROM base_t WHERE i IN (4,5) RETURNING 1)
|
||||
SELECT NULL;
|
||||
SELECT * FROM mv_self ORDER BY v1;
|
||||
|
||||
--- with sub-transactions
|
||||
SAVEPOINT p1;
|
||||
INSERT INTO base_t VALUES (7,70);
|
||||
RELEASE SAVEPOINT p1;
|
||||
INSERT INTO base_t VALUES (7,77);
|
||||
SELECT * FROM mv_self ORDER BY v1, v2;
|
||||
|
||||
ROLLBACK;
|
||||
|
||||
-- support simultaneous table changes
|
||||
|
|
|
|||
Loading…
Reference in a new issue