Fix to parse relname and column in create_immv arguments

It enables earlier checks for invalid inputs.
This commit is contained in:
Yugo Nagata 2022-04-28 18:59:48 +09:00
parent ab1bc09d8e
commit f68e04524b

111
pg_ivm.c
View file

@ -13,17 +13,22 @@
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/namespace.h"
#include "catalog/pg_namespace_d.h"
#include "catalog/pg_trigger_d.h"
#include "commands/trigger.h"
#include "parser/analyze.h"
#include "parser/parser.h"
#include "parser/scansup.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/regproc.h"
#include "utils/rel.h"
#include "utils/varlena.h"
#include "pg_ivm.h"
#include "nodes/print.h"
PG_MODULE_MAGIC;
@ -35,6 +40,7 @@ void _PG_init(void);
static void IvmXactCallback(XactEvent event, void *arg);
static void IvmSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg);
static void parseNameAndColumns(const char *string, List **names, List **colNames);
/* SQL callable functions */
PG_FUNCTION_INFO_V1(create_immv);
@ -69,6 +75,77 @@ _PG_init(void)
RegisterSubXactCallback(IvmSubXactCallback, NULL);
}
/*
* Given a C string, parse it into a qualified relation name
* followed by a optional parenthesized list of column names.
*/
static void
parseNameAndColumns(const char *string, List **names, List **colNames)
{
char *rawname;
char *ptr;
char *ptr2;
bool in_quote;
bool has_colnames = false;
List *cols;
ListCell *lc;
/* We need a modifiable copy of the input string. */
rawname = pstrdup(string);
/* Scan to find the expected left paren; mustn't be quoted */
in_quote = false;
for (ptr = rawname; *ptr; ptr++)
{
if (*ptr == '"')
in_quote = !in_quote;
else if (*ptr == '(' && !in_quote)
{
has_colnames = true;
break;
}
}
/* Separate the name and parse it into a list */
*ptr++ = '\0';
*names = stringToQualifiedNameList(rawname);
if (!has_colnames)
goto end;
/* Check for the trailing right parenthesis and remove it */
ptr2 = ptr + strlen(ptr);
while (--ptr2 > ptr)
{
if (!scanner_isspace(*ptr2))
break;
}
if (*ptr2 != ')')
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("expected a right parenthesis")));
*ptr2 = '\0';
if (!SplitIdentifierString(ptr, ',', &cols))
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid syntax name")));
if (list_length(cols) == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("must specify at least one column name")));
foreach(lc, cols)
{
char *colname = lfirst(lc);
*colNames = lappend(*colNames, makeString(pstrdup(colname)));
}
end:
pfree(rawname);
}
/*
* User inerface for creating an IMMV
@ -82,32 +159,48 @@ create_immv(PG_FUNCTION_ARGS)
char *sql = text_to_cstring(t_sql);
List *parsetree_list;
RawStmt *parsetree;
Query *query;
QueryCompletion qc;
List *names = NIL;
List *colNames = NIL;
ParseState *pstate = make_parsestate(NULL);
Query *query;
CreateTableAsStmt *stmt;
CreateTableAsStmt *ctas;
StringInfoData command_buf;
parseNameAndColumns(relname, &names, &colNames);
initStringInfo(&command_buf);
appendStringInfo(&command_buf, "CREATE MATERIALIZED VIEW %s AS %s;", relname, sql);
parsetree_list = pg_parse_query(command_buf.data);
appendStringInfo(&command_buf, "SELECT create_immv('%s' AS '%s');", relname, sql);
appendStringInfo(&command_buf, "%s;", sql);
pstate->p_sourcetext = command_buf.data;
parsetree_list = pg_parse_query(sql);
/* XXX: should we check t_sql before command_buf? */
if (list_length(parsetree_list) != 1)
elog(ERROR, "invalid view definition");
parsetree = linitial_node(RawStmt, parsetree_list);
query = transformStmt(pstate, parsetree->stmt);
ctas = makeNode(CreateTableAsStmt);
ctas->query = parsetree->stmt;
ctas->objtype = OBJECT_MATVIEW;
ctas->is_select_into = false;
ctas->into = makeNode(IntoClause);
ctas->into->rel = makeRangeVarFromNameList(names);
ctas->into->colNames = colNames;
ctas->into->accessMethod = NULL;
ctas->into->options = NIL;
ctas->into->onCommit = ONCOMMIT_NOOP;
ctas->into->tableSpaceName = NULL;
ctas->into->viewQuery = parsetree->stmt;
ctas->into->skipData = false;
query = transformStmt(pstate, (Node *)ctas);
Assert(query->commandType == CMD_UTILITY && IsA(query->utilityStmt, CreateTableAsStmt));
stmt = (CreateTableAsStmt*) query->utilityStmt;
query = castNode(Query, stmt->query);
ExecCreateImmv(pstate, stmt, NULL, NULL, &qc);
ExecCreateImmv(pstate, (CreateTableAsStmt *)query->utilityStmt, NULL, NULL, &qc);
PG_RETURN_INT64(qc.nprocessed);
}