diff --git a/buffer.go b/buffer.go index cfa6635..ce09ac4 100644 --- a/buffer.go +++ b/buffer.go @@ -6,11 +6,11 @@ package connector // time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on // these limits. type Buffer interface { - Add(data Model, sequenceNumber string) - FirstSequenceNumber() string - Flush() - LastSequenceNumber() string - NumRecordsInBuffer() int - Records() []Model - ShouldFlush() bool + Add(data Model, sequenceNumber string) + FirstSequenceNumber() string + Flush() + LastSequenceNumber() string + NumRecordsInBuffer() int + Records() []Model + ShouldFlush() bool } diff --git a/checkpoint.go b/checkpoint.go index 05ba85f..110b9a0 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -4,7 +4,7 @@ package connector // The Kinesis Connector Library will pass an object implementing this interface to ProcessShard, // so they can checkpoint their progress. type Checkpoint interface { - CheckpointExists(shardID string) bool - SequenceNumber() string - SetCheckpoint(shardID string, sequenceNumber string) + CheckpointExists(shardID string) bool + SequenceNumber() string + SetCheckpoint(shardID string, sequenceNumber string) } diff --git a/redshift_emitter.go b/redshift_emitter.go index beebfad..f6639c8 100644 --- a/redshift_emitter.go +++ b/redshift_emitter.go @@ -16,7 +16,7 @@ import ( type RedshiftEmitter struct { Delimiter string Format string - Jsonpath string + Jsonpaths string S3Bucket string TableName string } @@ -51,9 +51,12 @@ func (e RedshiftEmitter) copyStatement(s3File string) string { b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY"))) b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY"))) - if e.Format == "json" { + switch e.Format { + case "json": b.WriteString(fmt.Sprintf("json 'auto'")) - } else { + case "jsonpaths": + b.WriteString(fmt.Sprintf("json '%v'", e.Jsonpaths)) + default: b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter)) }