Add support for JSONPaths in Redshift Load

By specifying a `jsonpaths` parameter to an S3 file containing ordinal
attribute position we can store free-form JSON in S3 and then reference
the col order from the COPY command.

http://docs.aws.amazon.com/redshift/latest/dg/r_COPY_command_examples.html
This commit is contained in:
Harlow Ward 2014-11-15 13:44:46 -08:00
parent 06b40e6ed8
commit f921eca908
3 changed files with 16 additions and 13 deletions

View file

@ -6,11 +6,11 @@ package connector
// time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on // time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on
// these limits. // these limits.
type Buffer interface { type Buffer interface {
Add(data Model, sequenceNumber string) Add(data Model, sequenceNumber string)
FirstSequenceNumber() string FirstSequenceNumber() string
Flush() Flush()
LastSequenceNumber() string LastSequenceNumber() string
NumRecordsInBuffer() int NumRecordsInBuffer() int
Records() []Model Records() []Model
ShouldFlush() bool ShouldFlush() bool
} }

View file

@ -4,7 +4,7 @@ package connector
// The Kinesis Connector Library will pass an object implementing this interface to ProcessShard, // The Kinesis Connector Library will pass an object implementing this interface to ProcessShard,
// so they can checkpoint their progress. // so they can checkpoint their progress.
type Checkpoint interface { type Checkpoint interface {
CheckpointExists(shardID string) bool CheckpointExists(shardID string) bool
SequenceNumber() string SequenceNumber() string
SetCheckpoint(shardID string, sequenceNumber string) SetCheckpoint(shardID string, sequenceNumber string)
} }

View file

@ -16,7 +16,7 @@ import (
type RedshiftEmitter struct { type RedshiftEmitter struct {
Delimiter string Delimiter string
Format string Format string
Jsonpath string Jsonpaths string
S3Bucket string S3Bucket string
TableName string TableName string
} }
@ -51,9 +51,12 @@ func (e RedshiftEmitter) copyStatement(s3File string) string {
b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY"))) b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY")))
b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY"))) b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY")))
if e.Format == "json" { switch e.Format {
case "json":
b.WriteString(fmt.Sprintf("json 'auto'")) b.WriteString(fmt.Sprintf("json 'auto'"))
} else { case "jsonpaths":
b.WriteString(fmt.Sprintf("json '%v'", e.Jsonpaths))
default:
b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter)) b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
} }