Refactor to use handler func

The previous pipeline model required a lot of setup and abstracted away the processing of records. By passing a HandlerFunc to the consumer, the business logic for processing records stays closer to the code that uses the consumer.

* Add refactoring note and SHA to README

Parent 509f68de89 · Commit f0e6461cb6
39 changed files with 559 additions and 698 deletions
### README.md (33 lines changed)

A note about the refactor is added under the title, and the introduction is reworded (previously: "These components are used for extracting streaming event data into S3, Redshift, DynamoDB, and more"):

> __Kinesis connector applications written in Go__
>
> _Note: Repo is undergoing a refactor to use a handler func to process batch data. The previous stable version of the connectors exists at SHA `509f68de89efb74aa8d79a733749208edaf56b4d`._
>
> Inspired by the [Amazon Kinesis Connector Library][1]. This library is used for extracting streaming event data from Kinesis into S3, Redshift, DynamoDB, and more. See the [API Docs][2] for package documentation.

[1]: https://github.com/awslabs/amazon-kinesis-connectors
[2]: http://godoc.org/github.com/harlow/kinesis-connectors

The "Overview" section previously described the connector pipeline interfaces:

* __Pipeline:__ The pipeline implementation itself.
* __Transformer:__ Defines the transformation of records from the Amazon Kinesis stream to suit the user-defined data model. Includes methods for custom serializers/deserializers.
* __Filter:__ Defines a method for excluding irrelevant records from processing.
* __Buffer:__ Defines a system for batching the set of records to be processed. The application can specify three thresholds: number of records, total byte count, and time. When one of these thresholds is crossed, the buffer is flushed and the data is emitted to the destination.
* __Emitter:__ Defines a method that makes client calls to other AWS services and persists the records stored in the buffer. The records can also be sent to another Amazon Kinesis stream.

It now explains that the consumer expects a handler func that will process a buffer of incoming records:

```go
func main() {
	var (
		app    = flag.String("app", "", "The app name")
		stream = flag.String("stream", "", "The stream name")
	)
	flag.Parse()

	c := connector.NewConsumer(*app, *stream)
	c.Start(connector.HandlerFunc(func(b connector.Buffer) {
		fmt.Println(b.GetRecords())
		// process the records
	}))

	select {}
}
```

Under "Installation", the example pipeline links now point at the renamed example directories:

* [S3](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3) (was [S3 Pipeline](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3-pipeline))
* [Redshift](https://github.com/harlow/kinesis-connectors/tree/master/examples/redshift) (was [Redshift Basic Pipeline](https://github.com/harlow/kinesis-connectors/tree/master/examples/redshift-pipeline))

The "Logging" section is unchanged.
### AllPassFilter (deleted, 9 lines)

The no-op filter is removed along with the Filter interface:

```go
package connector

// AllPassFilter is an implementation of the Filter interface that returns true for all records.
type AllPassFilter struct{}

// KeepRecord returns true for all records.
func (b *AllPassFilter) KeepRecord(r interface{}) bool {
	return true
}
```
### isRecoverableError tests

The table-driven test for `isRecoverableError` now uses `github.com/bmizerany/assert`. The import block gains `"github.com/bmizerany/assert"`, and the assertion loop

```go
for idx, tc := range testCases {
	isRecoverable := isRecoverableError(tc.err)

	if isRecoverable != tc.isRecoverable {
		t.Errorf("test case %d: isRecoverable expected %t, actual %t, for error %+v", idx, tc.isRecoverable, isRecoverable, tc.err)
	}
}
```

becomes

```go
for _, tc := range testCases {
	isRecoverable := isRecoverableError(tc.err)
	assert.Equal(t, isRecoverable, tc.isRecoverable)
}
```
### buffer.go (58 lines changed)

The `Buffer` interface (`FirstSequenceNumber`, `Flush`, `LastSequenceNumber`, `NumRecordsInBuffer`, `ProcessRecord`, `Records`, `ShouldFlush`), previously consumed by `Pipeline.ProcessShard`, is replaced by a concrete struct that holds raw `*kinesis.Record` values:

```go
package connector

import "github.com/aws/aws-sdk-go/service/kinesis"

// Buffer holds records and answers questions on when it
// should be periodically flushed.
type Buffer struct {
	records             []*kinesis.Record
	firstSequenceNumber string
	lastSequenceNumber  string

	MaxBufferSize int
}

// AddRecord adds a record to the buffer.
func (b *Buffer) AddRecord(r *kinesis.Record) {
	if len(b.records) == 0 {
		b.firstSequenceNumber = *r.SequenceNumber
	}

	b.records = append(b.records, r)
	b.lastSequenceNumber = *r.SequenceNumber
}

// ShouldFlush determines if the buffer has reached its target size.
func (b *Buffer) ShouldFlush() bool {
	return len(b.records) >= b.MaxBufferSize
}

// Flush empties the buffer and resets the sequence counter.
func (b *Buffer) Flush() {
	b.records = b.records[:0]
}

// GetRecords returns the records in the buffer.
func (b *Buffer) GetRecords() []*kinesis.Record {
	return b.records
}

// FirstSeq returns the sequence number of the first record in the buffer.
func (b *Buffer) FirstSeq() string {
	return b.firstSequenceNumber
}

// LastSeq returns the sequence number of the last record in the buffer.
func (b *Buffer) LastSeq() string {
	return b.lastSequenceNumber
}
```
### buffer_test.go (new file, 47 lines)

```go
package connector

import (
	"testing"

	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/bmizerany/assert"
)

func Test_FirstSeq(t *testing.T) {
	b := Buffer{}
	s1, s2 := "1", "2"
	r1 := &kinesis.Record{SequenceNumber: &s1}
	r2 := &kinesis.Record{SequenceNumber: &s2}

	b.AddRecord(r1)
	assert.Equal(t, b.FirstSeq(), "1")

	b.AddRecord(r2)
	assert.Equal(t, b.FirstSeq(), "1")
}

func Test_LastSeq(t *testing.T) {
	b := Buffer{}
	s1, s2 := "1", "2"
	r1 := &kinesis.Record{SequenceNumber: &s1}
	r2 := &kinesis.Record{SequenceNumber: &s2}

	b.AddRecord(r1)
	assert.Equal(t, b.LastSeq(), "1")

	b.AddRecord(r2)
	assert.Equal(t, b.LastSeq(), "2")
}

func Test_ShouldFlush(t *testing.T) {
	b := Buffer{MaxBufferSize: 2}
	s1, s2 := "1", "2"
	r1 := &kinesis.Record{SequenceNumber: &s1}
	r2 := &kinesis.Record{SequenceNumber: &s2}

	b.AddRecord(r1)
	assert.Equal(t, b.ShouldFlush(), false)

	b.AddRecord(r2)
	assert.Equal(t, b.ShouldFlush(), true)
}
```
### checkpoint.go (interface replaced by a struct)

The `Checkpoint` interface (`CheckpointExists`, `SequenceNumber`, `SetCheckpoint`), which the library previously passed to `Pipeline.ProcessShard`, becomes a Redis-backed struct used directly by the consumer:

```go
package connector

import (
	"fmt"

	"github.com/hoisie/redis"
)

// Checkpoint is a Redis-backed checkpoint store used by the consumer
// to record its progress through each shard.
type Checkpoint struct {
	AppName    string
	StreamName string

	client         redis.Client
	sequenceNumber string
}

// CheckpointExists determines if a checkpoint for a particular Shard exists.
// Typically used to determine whether we should start processing the shard with
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
func (c *Checkpoint) CheckpointExists(shardID string) bool {
	val, _ := c.client.Get(c.key(shardID))

	if val != nil && string(val) != "" {
		c.sequenceNumber = string(val)
		return true
	}

	return false
}

// SequenceNumber returns the current checkpoint stored for the specified shard.
func (c *Checkpoint) SequenceNumber() string {
	return c.sequenceNumber
}

// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
// Upon failover, record processing is resumed from this point.
func (c *Checkpoint) SetCheckpoint(shardID string, sequenceNumber string) {
	c.client.Set(c.key(shardID), []byte(sequenceNumber))
	c.sequenceNumber = sequenceNumber
}

// key generates a unique Redis key for storage of the checkpoint.
func (c *Checkpoint) key(shardID string) string {
	return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID)
}
```
### checkpoint_test.go (new file, 46 lines)

```go
package connector

import (
	"testing"

	"github.com/bmizerany/assert"
	"github.com/hoisie/redis"
)

func Test_key(t *testing.T) {
	c := Checkpoint{
		AppName:    "app",
		StreamName: "stream",
	}

	k := c.key("shard")
	assert.Equal(t, k, "app:checkpoint:stream:shard")
}

func Test_CheckpointExists(t *testing.T) {
	var rc redis.Client
	rc.Set("app:checkpoint:stream:shard", []byte("testSeqNum"))
	c := Checkpoint{
		AppName:    "app",
		StreamName: "stream",
	}

	r := c.CheckpointExists("shard")
	assert.Equal(t, r, true)

	rc.Del("app:checkpoint:stream:shard")
}

func Test_SetCheckpoint(t *testing.T) {
	var rc redis.Client
	c := Checkpoint{
		AppName:    "app",
		StreamName: "stream",
	}

	c.SetCheckpoint("shard", "testSeqNum")
	r, _ := rc.Get("app:checkpoint:stream:shard")
	assert.Equal(t, string(r), "testSeqNum")

	rc.Del("app:checkpoint:stream:shard")
}
```
### consumer.go (new file, 118 lines)

The consumer replaces the old pipeline: it describes the stream, starts a handler loop per shard, buffers records, and invokes the supplied Handler whenever the buffer should flush:

```go
package connector

import (
	"os"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

const maxBufferSize = 1000

func NewConsumer(appName, streamName string) *Consumer {
	svc := kinesis.New(session.New())

	return &Consumer{
		appName:    appName,
		streamName: streamName,
		svc:        svc,
	}
}

type Consumer struct {
	appName    string
	streamName string
	svc        *kinesis.Kinesis
}

func (c *Consumer) Start(handler Handler) {
	params := &kinesis.DescribeStreamInput{
		StreamName: aws.String(c.streamName),
	}

	// describe stream
	resp, err := c.svc.DescribeStream(params)
	if err != nil {
		logger.Log("fatal", "DescribeStream", "msg", err.Error())
		os.Exit(1)
	}

	// handle shards
	for _, shard := range resp.StreamDescription.Shards {
		logger.Log("info", "processing", "stream", c.streamName, "shard", shard.ShardId)
		go c.handlerLoop(*shard.ShardId, handler)
	}
}

func (c *Consumer) handlerLoop(shardID string, handler Handler) {
	params := &kinesis.GetShardIteratorInput{
		ShardId:    aws.String(shardID),
		StreamName: aws.String(c.streamName),
	}

	checkpoint := &Checkpoint{AppName: c.appName, StreamName: c.streamName}
	if checkpoint.CheckpointExists(shardID) {
		params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
		params.StartingSequenceNumber = aws.String(checkpoint.SequenceNumber())
	} else {
		params.ShardIteratorType = aws.String("TRIM_HORIZON")
	}

	resp, err := c.svc.GetShardIterator(params)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			logger.Log("fatal", "getShardIterator", "code", awsErr.Code(), "msg", awsErr.Message(), "origError", awsErr.OrigErr())
			os.Exit(1)
		}
	}

	shardIterator := resp.ShardIterator
	b := &Buffer{MaxBufferSize: maxBufferSize}
	errCount := 0

	for {
		// get records from stream
		resp, err := c.svc.GetRecords(&kinesis.GetRecordsInput{
			ShardIterator: shardIterator,
		})

		// handle recoverable errors, else exit program
		if err != nil {
			awsErr, _ := err.(awserr.Error)

			if isRecoverableError(err) {
				logger.Log("warn", "getRecords", "errorCount", errCount, "code", awsErr.Code())
				handleAwsWaitTimeExp(errCount)
				errCount++
			} else {
				logger.Log("fatal", "getRecords", awsErr.Code())
				os.Exit(1)
			}
		} else {
			errCount = 0
		}

		// process records
		if len(resp.Records) > 0 {
			for _, r := range resp.Records {
				b.AddRecord(r)
			}

			if b.ShouldFlush() {
				handler.HandleRecords(*b)
				checkpoint.SetCheckpoint(shardID, b.LastSeq())
				b.Flush()
			}
		} else if resp.NextShardIterator == aws.String("") || shardIterator == resp.NextShardIterator {
			logger.Log("fatal", "nextShardIterator", "msg", err.Error())
			os.Exit(1)
		} else {
			time.Sleep(1 * time.Second)
		}

		shardIterator = resp.NextShardIterator
	}
}
```
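Since `Start` only launches the per-shard goroutines and returns, callers are expected to block for the lifetime of the process (the README and the S3 example both end `main` with `select {}`). A minimal sketch of a caller, assuming this commit's API and placeholder app/stream names:

```go
package main

import (
	"fmt"

	"github.com/harlow/kinesis-connectors"
)

func main() {
	// NewConsumer wires up the Kinesis client; Start launches one
	// handlerLoop goroutine per shard and returns immediately.
	c := connector.NewConsumer("exampleApp", "exampleStream")

	c.Start(connector.HandlerFunc(func(b connector.Buffer) {
		// The handler receives a flushed Buffer of raw Kinesis records.
		fmt.Printf("%d records (%s .. %s)\n", len(b.GetRecords()), b.FirstSeq(), b.LastSeq())
	}))

	// Block forever so the shard goroutines keep running.
	select {}
}
```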
### emitter.go (deleted, 11 lines)

The Emitter interface is removed; emitters now define their own concrete Emit signatures:

```go
package connector

// Emitter takes a full buffer and processes the stored records. The Emitter is a member of the
// Pipeline that "emits" the objects that have been deserialized by the
// Transformer. The Emit() method is invoked when the buffer is full (possibly to persist the
// records or send them to another Kinesis stream). After emitting the records,
// implementations may choose to fail the entire set of records in the buffer or to fail records
// individually.
type Emitter interface {
	Emit(b Buffer, t Transformer)
}
```
### Redshift basic emitter (renamed)

`RedshiftBasicEmitter` becomes `RedshiftEmitter`. `Emit(b Buffer, t Transformer)` becomes `Emit(b Buffer)`, the embedded S3 emitter is constructed as `S3Emitter{Bucket: e.S3Bucket}` instead of `S3Emitter{S3Bucket: e.S3Bucket}`, the S3 file name is built from `b.FirstSeq()`/`b.LastSeq()` rather than `b.FirstSequenceNumber()`/`b.LastSequenceNumber()`, and the log labels change from "RedshiftBasicEmitter" to "RedshiftEmitter". The ten-attempt copy loop with `isRecoverableError`/`handleAwsWaitTimeExp` and the `copyStatement` builder are otherwise unchanged. See also the rewritten redshift_emitter.go below.
### Example main (stream creation removed)

An example `main()` no longer calls `connector.CreateStream(ksis, cfg.Kinesis.StreamName, cfg.Kinesis.ShardCount)` after building the go-kinesis client (`auth := kinesis.NewAuth(); ksis := kinesis.New(&auth, kinesis.Region{})`); it goes straight to fetching the stream info with `kinesis.NewArgs()` and `args.Add("StreamName", cfg.Kinesis.StreamName)`.
### Old S3 pipeline example (deleted, 83 lines)

The previous S3 pipeline example `main.go` is removed. It loaded a `pipeline.cfg` with `gcfg` into a `Config` struct (pipeline name, Kinesis buffer size/shard count/stream name, S3 bucket), assembled a `connector.Pipeline` from `AllPassFilter`, `RecordBuffer`, `StringToStringTransformer`, `RedisCheckpoint`, and `S3Emitter`, created the stream with `connector.CreateStream`, started `p.ProcessShard(shard.ShardId)` in a goroutine for each shard, and blocked on `<-make(chan int)` to keep the process alive. The new `examples/s3/main.go` below replaces it.
### Example pipeline config (deleted, 8 lines)

The example's `pipeline.cfg` is removed along with the config-driven setup:

```
[pipeline]
name = s3Pipeline
[s3]
bucketName = kinesis.test
[kinesis]
bufferSize = 100
shardCount = 2
streamName = userStream
```
### Example README: running the code

The run command gains the new flags: `$ go run main.go` becomes `$ go run main.go -a appName -s streamName`.
### examples/s3/main.go (new file, 38 lines)

```go
package main

import (
	"bytes"
	"flag"

	"github.com/harlow/kinesis-connectors"
)

var (
	app    = flag.String("a", "", "App name")
	bucket = flag.String("b", "", "Bucket name")
	stream = flag.String("s", "", "Stream name")
)

func handler(b connector.Buffer) {
	body := new(bytes.Buffer)

	// filter or transform data if needed
	for _, r := range b.GetRecords() {
		body.Write(r.Data)
	}

	s3 := &connector.S3Emitter{Bucket: *bucket}
	s3.Emit(
		connector.S3Key("", b.FirstSeq(), b.LastSeq()),
		bytes.NewReader(body.Bytes()),
	)
}

func main() {
	flag.Parse()

	c := connector.NewConsumer(*app, *stream)
	c.Start(connector.HandlerFunc(handler))

	select {} // run forever
}
```
### Seeding README: running the code

The sample-data download now writes to /tmp: `$ curl https://s3.amazonaws.com/kinesis.test/users.txt > users.txt` becomes `$ curl https://s3.amazonaws.com/kinesis.test/users.txt > /tmp/users.txt`; the `$ go run main.go` line is unchanged.
### Stream seeding example (main.go)

The producer that populates the stream from `/tmp/users.txt` now reads the stream name from a `-s` flag and passes a partition key per worker instead of hard-coding both:

```go
var stream = flag.String("s", "", "Stream name")

func putToS3(svc *kinesis.Kinesis, data string, partitionKey string) {
	params := &kinesis.PutRecordInput{
		Data:         []byte(data),
		PartitionKey: aws.String(partitionKey),
		StreamName:   aws.String(*stream),
	}

	_, err := svc.PutRecord(params)

	if err != nil {
		fmt.Println(err.Error())
		return
	} else {
		fmt.Print(".")
	}
}
```

In `main()`, `flag.Parse()` now runs first, the four worker goroutines call `putToS3(svc, data, string(i))`, and opening `/tmp/users.txt` and scanning its lines onto the job channel happens after the workers are booted rather than before. Error and progress output move from the `log` package to `fmt` (`fmt.Println(err.Error())`, `fmt.Print(".")`, `fmt.Println(".")`), while the final "Finished populating stream" message still uses `log.Println`. The imports gain `"flag"` and `"fmt"`.
### filter.go (deleted, 7 lines)

```go
package connector

// Filter is an interface used for determining whether to buffer records.
// Returns false if you don't want to hold on to the record.
type Filter interface {
	KeepRecord(r interface{}) bool
}
```
### handler.go (new file, 18 lines)

```go
package connector

type Handler interface {
	HandleRecords(b Buffer)
}

// HandlerFunc is a convenience type to avoid having to declare a struct
// to implement the Handler interface; it can be used like this:
//
//	c.Start(connector.HandlerFunc(func(b Buffer) {
//		// ...
//	}))
type HandlerFunc func(b Buffer)

// HandleRecords implements the Handler interface.
func (h HandlerFunc) HandleRecords(b Buffer) {
	h(b)
}
```
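Any type with a `HandleRecords(Buffer)` method satisfies `Handler`; `HandlerFunc` just spares callers the type declaration. A hedged sketch of the alternative (the `batchCounter` type is illustrative, not part of this commit):

```go
package main

import (
	"log"

	"github.com/harlow/kinesis-connectors"
)

// batchCounter is a hypothetical stateful handler: because it is a named
// type, it can carry fields that a bare HandlerFunc closure would otherwise
// have to capture.
type batchCounter struct {
	batches int
}

func (h *batchCounter) HandleRecords(b connector.Buffer) {
	h.batches++
	log.Printf("batch %d: %d records", h.batches, len(b.GetRecords()))
}

func main() {
	c := connector.NewConsumer("exampleApp", "exampleStream")
	c.Start(&batchCounter{})

	select {}
}
```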
### kinesis.go (deleted, 54 lines)

The stream-management helpers built on `github.com/sendgridlabs/go-kinesis` are removed:

* `func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int)` — created the stream if it did not already exist (checked via `StreamExists`) and polled `DescribeStream` every 4 seconds until the stream status became `ACTIVE`.
* `func StreamExists(k *kinesis.Kinesis, streamName string) bool` — listed streams and reported whether the name was present, logging any `ListStreams` error.
### Logger default

The package-level default logger switches to logfmt output: `var logger log.Logger = log.NewPrefixLogger(os.Stderr)` becomes `var logger log.Logger = log.NewLogfmtLogger(os.Stderr)`.
### pipeline.go (deleted, 112 lines)

The `Pipeline` type and its `ProcessShard` method are removed. `Pipeline` carried the wired-together components (`Buffer`, `Checkpoint`, `Emitter`, `Filter`, `Kinesis`, `StreamName`, `Transformer`, plus an internal `checkpointSequenceNumber`). `ProcessShard` built a shard iterator (`AFTER_SEQUENCE_NUMBER` when a checkpoint existed, otherwise `TRIM_HORIZON`), looped over `GetRecords` with recoverable-error backoff via `handleAwsWaitTimeExp` and a 50-consecutive-error limit, ran each record through the Transformer and Filter into the Buffer, and on `ShouldFlush` emitted via the Emitter, set the checkpoint, and flushed. The equivalent loop now lives in `Consumer.handlerLoop` (consumer.go above), with the transform/filter/emit steps replaced by the handler func.
### RecordBuffer (deleted, 51 lines)

The `RecordBuffer` implementation of the old `Buffer` interface is removed. It held `NumRecordsToBuffer` plus internal first/last sequence numbers and a `[]interface{}` of records, and exposed `ProcessRecord`, `Records`, `NumRecordsInBuffer`, `Flush`, `ShouldFlush`, `FirstSequenceNumber`, and `LastSequenceNumber`. The new `Buffer` struct in buffer.go replaces it.
### RecordBuffer tests (deleted, 98 lines)

The tests covering `ProcessRecord`, `Flush`, `FirstSequenceNumber`, `LastSequenceNumber`, and `ShouldFlush` on `RecordBuffer` (using a small `TestRecord` fixture with `ToDelimitedString` and `ToJSON` methods) are removed; buffer_test.go above covers the equivalent behaviour of the new `Buffer`.
### RedisCheckpoint (deleted, 48 lines)

`RedisCheckpoint` is removed; its implementation (Redis-backed `CheckpointExists`, `SequenceNumber`, `SetCheckpoint`, and the `app:checkpoint:stream:shard` key scheme) moves essentially unchanged into the `Checkpoint` struct in checkpoint.go above.
### RedisCheckpoint tests (deleted, 48 lines)

`TestKey`, `TestCheckpointExists`, and `TestSetCheckpoint` for `RedisCheckpoint` are removed; checkpoint_test.go above carries the same assertions against the new `Checkpoint` type.
### redshift_emitter.go (new file, 74 lines)

```go
package connector

import (
	"bytes"
	"database/sql"
	"fmt"
	"io"

	// Postgres package is used when sql.Open is called
	_ "github.com/lib/pq"
)

// RedshiftEmitter is an implementation of Emitter that loads buffered batches of records into
// Redshift one by one. It first emits records into S3 and then performs the Redshift COPY command.
// S3 storage of buffered data is achieved using the S3Emitter. A link to jsonpaths must be provided
// when configuring the struct for the jsonpaths format.
type RedshiftEmitter struct {
	AwsAccessKey       string
	AwsSecretAccessKey string
	Delimiter          string
	Format             string
	Jsonpaths          string
	S3Bucket           string
	S3Prefix           string
	TableName          string
	Db                 *sql.DB
}

// Emit is invoked when the buffer is full. This method leverages the S3Emitter and
// then issues a copy command to the Redshift data store.
func (e RedshiftEmitter) Emit(s3Key string, b io.ReadSeeker) {
	// put contents to S3 bucket
	s3 := &S3Emitter{Bucket: e.S3Bucket}
	s3.Emit(s3Key, b)

	for i := 0; i < 10; i++ {
		// execute copy statement
		_, err := e.Db.Exec(e.copyStatement(s3Key))

		// db command succeeded, break from loop
		if err == nil {
			logger.Log("info", "RedshiftEmitter", "file", s3Key)
			break
		}

		// handle recoverable errors, else break from loop
		if isRecoverableError(err) {
			handleAwsWaitTimeExp(i)
		} else {
			logger.Log("error", "RedshiftEmitter", "msg", err.Error())
			break
		}
	}
}

// copyStatement creates the SQL copy statement issued to the Redshift cluster.
func (e RedshiftEmitter) copyStatement(s3Key string) string {
	b := new(bytes.Buffer)
	b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
	b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3Key))
	b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", e.AwsAccessKey))
	b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", e.AwsSecretAccessKey))

	switch e.Format {
	case "json":
		b.WriteString("json 'auto'")
	case "jsonpaths":
		b.WriteString(fmt.Sprintf("json '%v'", e.Jsonpaths))
	default:
		b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
	}
	b.WriteString(";")

	return b.String()
}
```
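For reference, a sketch of what `copyStatement` generates for the default (delimiter) format, mirroring the fixture in `Test_CopyStatement` below; the key and credential values are made up:

```go
package connector

import "fmt"

func ExampleRedshiftEmitter_copyStatement() {
	e := RedshiftEmitter{
		AwsAccessKey:       "AKIAEXAMPLE",
		AwsSecretAccessKey: "secret",
		Delimiter:          ",",
		S3Bucket:           "test_bucket",
		TableName:          "test_table",
	}

	// No Format set, so the default branch emits a DELIMITER clause.
	fmt.Println(e.copyStatement("2015/08/01/a-b"))
	// Output:
	// COPY test_table FROM 's3://test_bucket/2015/08/01/a-b' CREDENTIALS 'aws_access_key_id=AKIAEXAMPLE;aws_secret_access_key=secret' DELIMITER ',';
}
```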
### Redshift emitter test

`TestCopyStatement` is renamed to `Test_CopyStatement`, and its fixture is constructed as `RedshiftEmitter{...}` instead of `RedshiftBasicEmitter{...}` (Delimiter ",", S3Bucket "test_bucket", TableName "test_table" unchanged).
### S3Emitter

The S3 emitter drops the old Emitter-interface signature. The struct loses `S3Prefix` and renames `S3Bucket` to `Bucket`, and `Emit(b Buffer, t Transformer)` becomes `Emit(s3Key string, b io.ReadSeeker)`: instead of running each buffered record through the Transformer and assembling a `bytes.Buffer`, it receives the object key and the body from the caller. The client is created with `s3.New(session.New())` rather than `s3.New(&aws.Config{Region: "us-east-1"})`, the `PutObject` retry via `try.Do` is kept, the error log label changes from "emit" to "s3.PutObject", and a success line is added (`logger.Log("info", "S3Emitter", "msg", "success", "key", s3Key)`). The `S3FileName(firstSeq, lastSeq)` method, which built the date-prefixed key, is removed in favour of the standalone `S3Key` helper below. Imports change accordingly (`io` and `aws/session` added; `bytes`, `fmt`, and `time` removed).
### S3Emitter tests (deleted, 27 lines)

`TestS3FileName`, which checked the date-based file name with and without an `S3Prefix`, is removed along with the `S3FileName` method; key generation is now covered by `Test_S3Key` in s3_key_test.go below.
### s3_key.go (new file, 16 lines)

```go
package connector

import (
	"fmt"
	"time"
)

func S3Key(prefix, firstSeq, lastSeq string) string {
	date := time.Now().UTC().Format("2006/01/02")

	if prefix == "" {
		return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
	} else {
		return fmt.Sprintf("%v/%v/%v-%v", prefix, date, firstSeq, lastSeq)
	}
}
```
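The keys group batches by UTC date, with an optional prefix in front. A quick sketch of the two shapes (the sequence numbers are illustrative):

```go
package main

import (
	"fmt"

	"github.com/harlow/kinesis-connectors"
)

func main() {
	// On 2015-08-01 (UTC) these would print:
	//   2015/08/01/49540528-49540533
	//   firehose/2015/08/01/49540528-49540533
	fmt.Println(connector.S3Key("", "49540528", "49540533"))
	fmt.Println(connector.S3Key("firehose", "49540528", "49540533"))
}
```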
### s3_key_test.go (new file, 19 lines)

```go
package connector

import (
	"fmt"
	"testing"
	"time"

	"github.com/bmizerany/assert"
)

func Test_S3Key(t *testing.T) {
	d := time.Now().UTC().Format("2006/01/02")

	k := S3Key("", "a", "b")
	assert.Equal(t, k, fmt.Sprintf("%v/a-b", d))

	k = S3Key("prefix", "a", "b")
	assert.Equal(t, k, fmt.Sprintf("prefix/%v/a-b", d))
}
```
### S3ManifestEmitter

The manifest emitter is rewritten against the aws-sdk-go client and the new Emit signature. Previously the struct held `S3Bucket` and a `Ksis *kinesis.Kinesis` client (sendgridlabs/go-kinesis), took `(b Buffer, t Transformer)`, derived the file name with `S3FileName`, and put the record using `kinesis.NewArgs()`. Now:

```go
package connector

import (
	"io"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

// An implementation of Emitter that puts event data on an S3 file, and then puts the
// S3 file path onto the output stream for processing by a manifest application.
type S3ManifestEmitter struct {
	OutputStream string
	Bucket       string
	Prefix       string
}

func (e S3ManifestEmitter) Emit(s3Key string, b io.ReadSeeker) {
	// put contents to S3 bucket
	s3 := &S3Emitter{Bucket: e.Bucket}
	s3.Emit(s3Key, b)

	// put file path on Kinesis output stream
	params := &kinesis.PutRecordInput{
		Data:         []byte(s3Key),
		PartitionKey: aws.String(s3Key),
		StreamName:   aws.String(e.OutputStream),
	}

	svc := kinesis.New(session.New())
	_, err := svc.PutRecord(params)

	if err != nil {
		logger.Log("error", "PutRecord", "msg", err)
		os.Exit(1)
	} else {
		logger.Log("info", "S3ManifestEmitter", "stream", e.OutputStream, "key", s3Key)
	}
}
```
### StringToStringTransformer (deleted, 14 lines)

The pass-through transformer is removed together with the Transformer interface. It implemented `ToRecord(data []byte) interface{}` (byte slice to string) and `FromRecord(s interface{}) []byte` (string back to bytes).
### transformer.go (deleted, 8 lines)

```go
package connector

// Transformer is used to transform data (byte array) to a Record for
// processing in the application.
type Transformer interface {
	FromRecord(r interface{}) []byte
	ToRecord(data []byte) interface{}
}
```