diff --git a/.gitignore b/.gitignore index 47a8300..3a49a48 100644 --- a/.gitignore +++ b/.gitignore @@ -38,7 +38,6 @@ tags* # Vendored files vendor/** -!vendor/vendor.json # Benchmark files prof.cpu diff --git a/Gopkg.lock b/Gopkg.lock new file mode 100644 index 0000000..184e7a8 --- /dev/null +++ b/Gopkg.lock @@ -0,0 +1,44 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + name = "github.com/apex/log" + packages = [".","handlers/text"] + revision = "0296d6eb16bb28f8a0c55668affcf4876dc269be" + version = "v1.0.0" + +[[projects]] + name = "github.com/aws/aws-sdk-go" + packages = ["aws","aws/awserr","aws/awsutil","aws/client","aws/client/metadata","aws/corehandlers","aws/credentials","aws/credentials/ec2rolecreds","aws/credentials/endpointcreds","aws/credentials/stscreds","aws/defaults","aws/ec2metadata","aws/endpoints","aws/request","aws/session","aws/signer/v4","internal/shareddefaults","private/protocol","private/protocol/json/jsonutil","private/protocol/jsonrpc","private/protocol/query","private/protocol/query/queryutil","private/protocol/rest","private/protocol/xml/xmlutil","service/kinesis","service/sts"] + revision = "e4f7e38b704e3ed0acc4a7f8196b777696f6f1f3" + version = "v1.12.30" + +[[projects]] + name = "github.com/go-ini/ini" + packages = ["."] + revision = "7e7da451323b6766da368f8a1e8ec9a88a16b4a0" + version = "v1.31.1" + +[[projects]] + name = "github.com/jmespath/go-jmespath" + packages = ["."] + revision = "0b12d6b5" + +[[projects]] + name = "github.com/pkg/errors" + packages = ["."] + revision = "645ef00459ed84a119197bfb8d8205042c6df63d" + version = "v0.8.0" + +[[projects]] + name = "gopkg.in/redis.v5" + packages = [".","internal","internal/consistenthash","internal/hashtag","internal/pool","internal/proto"] + revision = "a16aeec10ff407b1e7be6dd35797ccf5426ef0f0" + version = "v5.2.9" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + inputs-digest = "4c89159f4d450ef21c84d46e59ff85c20283d05f3578413e3872a870022935d2" + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml new file mode 100644 index 0000000..8cc7dcd --- /dev/null +++ b/Gopkg.toml @@ -0,0 +1,50 @@ + +# Gopkg.toml example +# +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" + + +[[constraint]] + name = "github.com/apex/log" + version = "1.0.0" + +[[constraint]] + name = "github.com/aws/aws-sdk-go" + version = "1.12.30" + +[[constraint]] + branch = "master" + name = "github.com/bmizerany/assert" + +[[constraint]] + branch = "master" + name = "github.com/crowdmob/goamz" + +[[constraint]] + branch = "master" + name = "github.com/lib/pq" + +[[constraint]] + branch = "master" + name = "github.com/tj/go-kinesis" + +[[constraint]] + name = "gopkg.in/redis.v5" + version = "5.2.9" diff --git a/README.md b/README.md index 8b82f9e..fdb61ea 100644 --- a/README.md +++ b/README.md @@ -1,49 +1,80 @@ -# Golang Kinesis Connectors +# Golang Kinesis Consumer -__Kinesis connector applications written in Go__ +__Kinesis consumer applications written in Go__ -> With the new release of Kinesis Firehose I'd recommend using the [Lambda Streams to Firehose](https://github.com/awslabs/lambda-streams-to-firehose) project for loading data directly into S3 and Redshift. +> With the new release of Kinesis Firehose I'd recommend using the [kinesis to firehose](http://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html) functionality for writing data directly to S3, Redshift, or Elasticsearch. -Inspired by the [Amazon Kinesis Connector Library](https://github.com/awslabs/amazon-kinesis-connectors). This library is intended to be a lightweight wrapper around the Kinesis API to handle batching records, setting checkpoints, respecting ratelimits, and recovering from network errors. +## Installation -![golang_kinesis_connector](https://cloud.githubusercontent.com/assets/739782/4262283/2ee2550e-3b97-11e4-8cd1-21a5d7ee0964.png) +Get the package source: + + $ go get github.com/harlow/kinesis-consumer ## Overview -The consumer expects a handler func that will process a buffer of incoming records. +The consumer leverages a handler func that accepts a Kinesis record. The `Scan` method will consume all shards concurrently and call the callback func as it receives records from the stream. ```go +import consumer "github.com/harlow/kinesis-consumer" + func main() { - var( - app = flag.String("app", "", "The app name") - stream = flag.String("stream", "", "The stream name") - ) - flag.Parse() + log.SetHandler(text.New(os.Stderr)) + log.SetLevel(log.DebugLevel) - // create new consumer - c := connector.NewConsumer(connector.Config{ - AppName: *app, - MaxRecordCount: 400, - Streamname: *stream, - }) + var ( + app = flag.String("app", "", "App name") // name of consumer group + stream = flag.String("stream", "", "Stream name") + ) + flag.Parse() - // process records from the stream - c.Start(connector.HandlerFunc(func(b connector.Buffer) { - fmt.Println(b.GetRecords()) - })) + c, err := consumer.New(*app, *stream) + if err != nil { + log.Fatalf("new consumer error: %v", err) + } - select {} + c.Scan(context.TODO(), func(r *kinesis.Record) bool { + fmt.Println(string(r.Data)) + + return true // continue scanning + }) } ``` -### Config +Note: If you need to aggregate based on a specific shard the `ScanShard` method should be leverged instead. -The default behavior for checkpointing uses Redis on localhost. To set a custom Redis URL use ENV vars: +### Configuration + +The consumer requires the following config: + +* App Name (used for checkpoints) +* Stream Name (kinesis stream name) + +It also accepts the following optional overrides: + +* Kinesis Client +* Logger +* Checkpoint + +```go +svc := kinesis.New(session.New(aws.NewConfig())) + +c, err := consumer.New( + appName, + streamName, + consumer.WithClient(svc), +) +``` + +### Checkpoint + +The default checkpoint uses Redis on localhost; to set a custom Redis URL use ENV vars: ``` -REDIS_URL=my-custom-redis-server.com:6379 +REDIS_URL=redis.example.com:6379 ``` +* [Add DDB as a checkpoint option](https://github.com/harlow/kinesis-consumer/issues/26) + ### Logging [Apex Log](https://medium.com/@tjholowaychuk/apex-log-e8d9627f4a9a#.5x1uo1767) is used for logging Info. Override the logs format with other [Log Handlers](https://github.com/apex/log/tree/master/_examples). For example using the "json" log handler: @@ -66,34 +97,10 @@ Which will producde the following logs: ``` INFO[0000] processing app=test shard=shardId-000000000000 stream=test - INFO[0008] emitted app=test count=500 shard=shardId-000000000000 stream=test - INFO[0012] emitted app=test count=500 shard=shardId-000000000000 stream=test + INFO[0008] checkpoint app=test shard=shardId-000000000000 stream=test + INFO[0012] checkpoint app=test shard=shardId-000000000000 stream=test ``` -### Installation - -Get the package source: - - $ go get github.com/harlow/kinesis-connectors - -### Fetching Dependencies - -Install `govendor`: - - $ export GO15VENDOREXPERIMENT=1 - $ go get -u github.com/kardianos/govendor - -Install dependencies into `./vendor/`: - - $ govendor sync - -### Examples - -Use the [seed stream](https://github.com/harlow/kinesis-connectors/tree/master/examples/seed) code to put sample data onto the stream. - -* [Firehose](https://github.com/harlow/kinesis-connectors/tree/master/examples/firehose) -* [S3](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3) - ## Contributing Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]! diff --git a/buffer.go b/buffer.go deleted file mode 100644 index 8ec02b9..0000000 --- a/buffer.go +++ /dev/null @@ -1,59 +0,0 @@ -package connector - -import "github.com/aws/aws-sdk-go/service/kinesis" - -// Buffer holds records and answers questions on when it -// should be periodically flushed. -type Buffer struct { - records []*kinesis.Record - firstSequenceNumber string - lastSequenceNumber string - shardID string - - MaxRecordCount int -} - -// AddRecord adds a record to the buffer. -func (b *Buffer) AddRecord(r *kinesis.Record) { - if b.RecordCount() == 0 { - b.firstSequenceNumber = *r.SequenceNumber - } - - b.records = append(b.records, r) - b.lastSequenceNumber = *r.SequenceNumber -} - -// ShouldFlush determines if the buffer has reached its target size. -func (b *Buffer) ShouldFlush() bool { - return b.RecordCount() >= b.MaxRecordCount -} - -// Flush empties the buffer and resets the sequence counter. -func (b *Buffer) Flush() { - b.records = b.records[:0] -} - -// GetRecords returns the records in the buffer. -func (b *Buffer) GetRecords() []*kinesis.Record { - return b.records -} - -// RecordCount returns the number of records in the buffer. -func (b *Buffer) RecordCount() int { - return len(b.records) -} - -// FirstSequenceNumber returns the sequence number of the first record in the buffer. -func (b *Buffer) FirstSeq() string { - return b.firstSequenceNumber -} - -// LastSeq returns the sequence number of the last record in the buffer. -func (b *Buffer) LastSeq() string { - return b.lastSequenceNumber -} - -// ShardID returns the shard ID watched by the consumer -func (b *Buffer) ShardID() string { - return b.shardID -} diff --git a/buffer_test.go b/buffer_test.go deleted file mode 100644 index 6870d21..0000000 --- a/buffer_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package connector - -import ( - "testing" - - "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/bmizerany/assert" -) - -func BenchmarkBufferLifecycle(b *testing.B) { - buf := Buffer{MaxRecordCount: 1000} - seq := "1" - rec := &kinesis.Record{SequenceNumber: &seq} - - for i := 0; i < b.N; i++ { - buf.AddRecord(rec) - - if buf.ShouldFlush() { - buf.Flush() - } - } -} - -func Test_FirstSeq(t *testing.T) { - b := Buffer{} - s1, s2 := "1", "2" - r1 := &kinesis.Record{SequenceNumber: &s1} - r2 := &kinesis.Record{SequenceNumber: &s2} - - b.AddRecord(r1) - assert.Equal(t, b.FirstSeq(), "1") - - b.AddRecord(r2) - assert.Equal(t, b.FirstSeq(), "1") -} - -func Test_LastSeq(t *testing.T) { - b := Buffer{} - s1, s2 := "1", "2" - r1 := &kinesis.Record{SequenceNumber: &s1} - r2 := &kinesis.Record{SequenceNumber: &s2} - - b.AddRecord(r1) - assert.Equal(t, b.LastSeq(), "1") - - b.AddRecord(r2) - assert.Equal(t, b.LastSeq(), "2") -} - -func Test_ShouldFlush(t *testing.T) { - b := Buffer{MaxRecordCount: 2} - s1, s2 := "1", "2" - r1 := &kinesis.Record{SequenceNumber: &s1} - r2 := &kinesis.Record{SequenceNumber: &s2} - - b.AddRecord(r1) - assert.Equal(t, b.ShouldFlush(), false) - - b.AddRecord(r2) - assert.Equal(t, b.ShouldFlush(), true) -} diff --git a/checkpoint.go b/checkpoint/checkpoint.go similarity index 93% rename from checkpoint.go rename to checkpoint/checkpoint.go index 001d337..04b447e 100644 --- a/checkpoint.go +++ b/checkpoint/checkpoint.go @@ -1,4 +1,4 @@ -package connector +package checkpoint // Checkpoint interface for functions that checkpoints need to // implement in order to track consumer progress. diff --git a/redis_checkpoint.go b/checkpoint/redis/redis.go similarity index 57% rename from redis_checkpoint.go rename to checkpoint/redis/redis.go index 7195cc7..e88b963 100644 --- a/redis_checkpoint.go +++ b/checkpoint/redis/redis.go @@ -1,16 +1,41 @@ -package connector +package redis import ( "fmt" "log" + "os" - "gopkg.in/redis.v5" + redis "gopkg.in/redis.v5" ) -// RedisCheckpoint implements the Checkpont interface. +const localhost = "127.0.0.1:6379" + +// NewCheckpoint returns a checkpoint that uses Redis for underlying storage +func NewCheckpoint(appName, streamName string) (*Checkpoint, error) { + addr := os.Getenv("REDIS_URL") + if addr == "" { + addr = localhost + } + + client := redis.NewClient(&redis.Options{Addr: addr}) + + // verify we can ping server + _, err := client.Ping().Result() + if err != nil { + return nil, err + } + + return &Checkpoint{ + AppName: appName, + StreamName: streamName, + client: client, + }, nil +} + +// Checkpoint implements the Checkpont interface. // Used to enable the Pipeline.ProcessShard to checkpoint it's progress // while reading records from Kinesis stream. -type RedisCheckpoint struct { +type Checkpoint struct { AppName string StreamName string @@ -21,7 +46,7 @@ type RedisCheckpoint struct { // CheckpointExists determines if a checkpoint for a particular Shard exists. // Typically used to determine whether we should start processing the shard with // TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). -func (c *RedisCheckpoint) CheckpointExists(shardID string) bool { +func (c *Checkpoint) CheckpointExists(shardID string) bool { val, _ := c.client.Get(c.key(shardID)).Result() if val != "" { @@ -33,13 +58,13 @@ func (c *RedisCheckpoint) CheckpointExists(shardID string) bool { } // SequenceNumber returns the current checkpoint stored for the specified shard. -func (c *RedisCheckpoint) SequenceNumber() string { +func (c *Checkpoint) SequenceNumber() string { return c.sequenceNumber } // SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. -func (c *RedisCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) { +func (c *Checkpoint) SetCheckpoint(shardID string, sequenceNumber string) { err := c.client.Set(c.key(shardID), sequenceNumber, 0).Err() if err != nil { log.Printf("redis checkpoint set error: %v", err) @@ -48,6 +73,6 @@ func (c *RedisCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) { } // key generates a unique Redis key for storage of Checkpoint. -func (c *RedisCheckpoint) key(shardID string) string { +func (c *Checkpoint) key(shardID string) string { return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID) } diff --git a/redis_checkpoint_test.go b/checkpoint/redis/redis_test.go similarity index 93% rename from redis_checkpoint_test.go rename to checkpoint/redis/redis_test.go index b0e1360..0cadec7 100644 --- a/redis_checkpoint_test.go +++ b/checkpoint/redis/redis_test.go @@ -1,4 +1,4 @@ -package connector +package redis import ( "testing" @@ -11,7 +11,7 @@ var defaultAddr = "127.0.0.1:6379" func Test_CheckpointLifecycle(t *testing.T) { client := redis.NewClient(&redis.Options{Addr: defaultAddr}) - c := RedisCheckpoint{ + c := &Checkpoint{ AppName: "app", StreamName: "stream", client: client, @@ -36,7 +36,7 @@ func Test_CheckpointLifecycle(t *testing.T) { func Test_key(t *testing.T) { client := redis.NewClient(&redis.Options{Addr: defaultAddr}) - c := &RedisCheckpoint{ + c := &Checkpoint{ AppName: "app", StreamName: "stream", client: client, diff --git a/config.go b/config.go deleted file mode 100644 index a110cde..0000000 --- a/config.go +++ /dev/null @@ -1,98 +0,0 @@ -package connector - -import ( - "os" - "time" - - redis "gopkg.in/redis.v5" - - "github.com/apex/log" -) - -const ( - defaultBufferSize = 500 - defaultRedisAddr = "127.0.0.1:6379" -) - -// Config vars for the application -type Config struct { - // AppName is the application name and checkpoint namespace. - AppName string - - // StreamName is the Kinesis stream. - StreamName string - - // FlushInterval is a regular interval for flushing the buffer. Defaults to 1s. - FlushInterval time.Duration - - // BufferSize determines the batch request size. Must not exceed 500. Defaults to 500. - BufferSize int - - // Logger is the logger used. Defaults to log.Log. - Logger log.Interface - - // Checkpoint for tracking progress of consumer. - Checkpoint Checkpoint -} - -// defaults for configuration. -func (c *Config) setDefaults() { - if c.Logger == nil { - c.Logger = log.Log - } - - c.Logger = c.Logger.WithFields(log.Fields{ - "package": "kinesis-connectors", - }) - - if c.AppName == "" { - c.Logger.WithField("type", "config").Error("AppName required") - os.Exit(1) - } - - if c.StreamName == "" { - c.Logger.WithField("type", "config").Error("StreamName required") - os.Exit(1) - } - - c.Logger = c.Logger.WithFields(log.Fields{ - "app": c.AppName, - "stream": c.StreamName, - }) - - if c.BufferSize == 0 { - c.BufferSize = defaultBufferSize - } - - if c.FlushInterval == 0 { - c.FlushInterval = time.Second - } - - if c.Checkpoint == nil { - client, err := redisClient() - if err != nil { - c.Logger.WithError(err).Error("Redis connection failed") - os.Exit(1) - } - c.Checkpoint = &RedisCheckpoint{ - AppName: c.AppName, - StreamName: c.StreamName, - client: client, - } - } -} - -func redisClient() (*redis.Client, error) { - redisURL := os.Getenv("REDIS_URL") - if redisURL == "" { - redisURL = defaultRedisAddr - } - client := redis.NewClient(&redis.Options{ - Addr: redisURL, - }) - _, err := client.Ping().Result() - if err != nil { - return nil, err - } - return client, nil -} diff --git a/consumer.go b/consumer.go index 854b96f..27bf269 100644 --- a/consumer.go +++ b/consumer.go @@ -1,119 +1,224 @@ -package connector +package consumer import ( + "context" + "fmt" "os" + "sync" "github.com/apex/log" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/harlow/kinesis-consumer/checkpoint" + "github.com/harlow/kinesis-consumer/checkpoint/redis" ) -// NewConsumer creates a new consumer with initialied kinesis connection -func NewConsumer(config Config) *Consumer { - config.setDefaults() +// Option is used to override defaults when creating a new Consumer +type Option func(*Consumer) error - svc := kinesis.New( - session.New( - aws.NewConfig().WithMaxRetries(10), - ), - ) - - return &Consumer{ - svc: svc, - Config: config, +// WithClient the Kinesis client +func WithClient(client *kinesis.Kinesis) Option { + return func(c *Consumer) error { + c.svc = client + return nil } } +// WithCheckpoint overrides the default checkpoint +func WithCheckpoint(checkpoint checkpoint.Checkpoint) Option { + return func(c *Consumer) error { + c.checkpoint = checkpoint + return nil + } +} + +// WithLogger overrides the default logger +func WithLogger(logger log.Interface) Option { + return func(c *Consumer) error { + c.logger = logger + return nil + } +} + +// New creates a kinesis consumer with default settings. Use Option to override +// any of the optional attributes. +func New(appName, streamName string, opts ...Option) (*Consumer, error) { + if appName == "" { + return nil, fmt.Errorf("must provide app name to consumer") + } + + if streamName == "" { + return nil, fmt.Errorf("must provide stream name to consumer") + } + + c := &Consumer{ + appName: appName, + streamName: streamName, + } + + // set options + for _, opt := range opts { + if err := opt(c); err != nil { + return nil, err + } + } + + // provide default logger + if c.logger == nil { + c.logger = log.Log.WithFields(log.Fields{ + "package": "kinesis-consumer", + "app": appName, + "stream": streamName, + }) + } + + // provide a default kinesis client + if c.svc == nil { + c.svc = kinesis.New(session.New(aws.NewConfig())) + } + + // provide default checkpoint + if c.checkpoint == nil { + ck, err := redis.NewCheckpoint(appName, streamName) + if err != nil { + return nil, err + } + c.checkpoint = ck + } + + return c, nil +} + // Consumer wraps the interaction with the Kinesis stream type Consumer struct { - svc *kinesis.Kinesis - Config + appName string + streamName string + svc *kinesis.Kinesis + logger log.Interface + checkpoint checkpoint.Checkpoint } -// Start takes a handler and then loops over each of the shards -// processing each one with the handler. -func (c *Consumer) Start(handler Handler) { +// Scan scans each of the shards of the stream, calls the callback +// func with each of the kinesis records +func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + resp, err := c.svc.DescribeStream( &kinesis.DescribeStreamInput{ - StreamName: aws.String(c.StreamName), + StreamName: aws.String(c.streamName), }, ) if err != nil { - c.Logger.WithError(err).Error("DescribeStream") + c.logger.WithError(err).Error("DescribeStream") os.Exit(1) } + var wg sync.WaitGroup + wg.Add(len(resp.StreamDescription.Shards)) + + // scan each of the shards for _, shard := range resp.StreamDescription.Shards { - go c.handlerLoop(*shard.ShardId, handler) + go func(shardID string) { + defer wg.Done() + c.ScanShard(ctx, shardID, fn) + cancel() + }(*shard.ShardId) } + + wg.Wait() } -func (c *Consumer) handlerLoop(shardID string, handler Handler) { - buf := &Buffer{ - MaxRecordCount: c.BufferSize, - shardID: shardID, +// ScanShard loops over records on a kinesis shard, call the callback func +// for each record and checkpoints after each page is processed +func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kinesis.Record) bool) { + var ( + logger = c.logger.WithFields(log.Fields{"shard": shardID}) + sequenceNumber string + ) + + shardIterator, err := c.getShardIterator(shardID) + if err != nil { + logger.WithError(err).Error("getShardIterator") + return } - ctx := c.Logger.WithFields(log.Fields{ - "shard": shardID, - }) - shardIterator := c.getShardIterator(shardID) - ctx.Info("processing") + logger.Info("scanning shard") +loop: for { - resp, err := c.svc.GetRecords( - &kinesis.GetRecordsInput{ - ShardIterator: shardIterator, - }, - ) + select { + case <-ctx.Done(): + break loop + default: + resp, err := c.svc.GetRecords( + &kinesis.GetRecordsInput{ + ShardIterator: shardIterator, + }, + ) - if err != nil { - ctx.WithError(err).Error("GetRecords") - shardIterator = c.getShardIterator(shardID) - continue - } - - if len(resp.Records) > 0 { - for _, r := range resp.Records { - buf.AddRecord(r) - - if buf.ShouldFlush() { - handler.HandleRecords(*buf) - ctx.WithField("count", buf.RecordCount()).Info("flushed") - c.Checkpoint.SetCheckpoint(shardID, buf.LastSeq()) - buf.Flush() + if err != nil { + shardIterator, err = c.getShardIterator(shardID) + if err != nil { + logger.WithError(err).Error("getShardIterator") + break loop } + continue + } + + if len(resp.Records) > 0 { + for _, r := range resp.Records { + select { + case <-ctx.Done(): + break loop + default: + sequenceNumber = *r.SequenceNumber + if ok := fn(r); !ok { + break loop + } + } + } + + logger.WithField("records", len(resp.Records)).Info("checkpoint") + c.checkpoint.SetCheckpoint(shardID, sequenceNumber) + } + + if resp.NextShardIterator == nil || shardIterator == resp.NextShardIterator { + shardIterator, err = c.getShardIterator(shardID) + if err != nil { + break loop + } + } else { + shardIterator = resp.NextShardIterator } } + } - if resp.NextShardIterator == nil || shardIterator == resp.NextShardIterator { - shardIterator = c.getShardIterator(shardID) - } else { - shardIterator = resp.NextShardIterator - } + if sequenceNumber != "" { + c.checkpoint.SetCheckpoint(shardID, sequenceNumber) } } -func (c *Consumer) getShardIterator(shardID string) *string { +func (c *Consumer) getShardIterator(shardID string) (*string, error) { params := &kinesis.GetShardIteratorInput{ ShardId: aws.String(shardID), - StreamName: aws.String(c.StreamName), + StreamName: aws.String(c.streamName), } - if c.Checkpoint.CheckpointExists(shardID) { + if c.checkpoint.CheckpointExists(shardID) { params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER") - params.StartingSequenceNumber = aws.String(c.Checkpoint.SequenceNumber()) + params.StartingSequenceNumber = aws.String(c.checkpoint.SequenceNumber()) } else { params.ShardIteratorType = aws.String("TRIM_HORIZON") } resp, err := c.svc.GetShardIterator(params) if err != nil { - c.Logger.WithError(err).Error("GetShardIterator") - os.Exit(1) + c.logger.WithError(err).Error("GetShardIterator") + return nil, err } - return resp.ShardIterator + return resp.ShardIterator, nil } diff --git a/emitter/redshift/emitter.go b/emitter/redshift/emitter.go deleted file mode 100644 index c3c1e0e..0000000 --- a/emitter/redshift/emitter.go +++ /dev/null @@ -1,74 +0,0 @@ -package connector - -import ( - "bytes" - "database/sql" - "fmt" - "io" - - // Postgres package is used when sql.Open is called - _ "github.com/lib/pq" -) - -// RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one. -// It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered -// data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct. -type RedshiftEmitter struct { - AwsAccessKey string - AwsSecretAccessKey string - Delimiter string - Format string - Jsonpaths string - S3Bucket string - S3Prefix string - TableName string - Db *sql.DB -} - -// Emit is invoked when the buffer is full. This method leverages the S3Emitter and -// then issues a copy command to Redshift data store. -func (e RedshiftEmitter) Emit(s3Key string, b io.ReadSeeker) { - // put contents to S3 Bucket - s3 := &Emitter{Bucket: e.S3Bucket} - s3.Emit(s3Key, b) - - for i := 0; i < 10; i++ { - // execute copy statement - _, err := e.Db.Exec(e.copyStatement(s3Key)) - - // db command succeeded, break from loop - if err == nil { - logger.Log("info", "RedshiftEmitter", "file", s3Key) - break - } - - // handle recoverable errors, else break from loop - if isRecoverableError(err) { - handleAwsWaitTimeExp(i) - } else { - logger.Log("error", "RedshiftEmitter", "msg", err.Error()) - break - } - } -} - -// Creates the SQL copy statement issued to Redshift cluster. -func (e RedshiftEmitter) copyStatement(s3Key string) string { - b := new(bytes.Buffer) - b.WriteString(fmt.Sprintf("COPY %v ", e.TableName)) - b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3Key)) - b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", e.AwsAccessKey)) - b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", e.AwsSecretAccessKey)) - - switch e.Format { - case "json": - b.WriteString(fmt.Sprintf("json 'auto'")) - case "jsonpaths": - b.WriteString(fmt.Sprintf("json '%v'", e.Jsonpaths)) - default: - b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter)) - } - b.WriteString(";") - - return b.String() -} diff --git a/emitter/redshift/emitter_test.go b/emitter/redshift/emitter_test.go deleted file mode 100644 index 52f33ef..0000000 --- a/emitter/redshift/emitter_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package connector - -import ( - "testing" -) - -func Test_CopyStatement(t *testing.T) { - e := RedshiftEmitter{ - Delimiter: ",", - S3Bucket: "test_bucket", - TableName: "test_table", - } - f := e.copyStatement("test.txt") - - copyStatement := "COPY test_table FROM 's3://test_bucket/test.txt' CREDENTIALS 'aws_access_key_id=;aws_secret_access_key=' DELIMITER ',';" - - if f != copyStatement { - t.Errorf("copyStatement() = %s want %s", f, copyStatement) - } -} diff --git a/emitter/redshift/manifest.go b/emitter/redshift/manifest.go deleted file mode 100644 index 23e6e62..0000000 --- a/emitter/redshift/manifest.go +++ /dev/null @@ -1,10 +0,0 @@ -package redshift - -type Entry struct { - Url string `json:"url"` - Mandatory bool `json:"mandatory"` -} - -type Manifest struct { - Entries []Entry `json:"entries"` -} diff --git a/emitter/redshift/redshift_manifest_emitter.go b/emitter/redshift/redshift_manifest_emitter.go deleted file mode 100644 index 5768c54..0000000 --- a/emitter/redshift/redshift_manifest_emitter.go +++ /dev/null @@ -1,151 +0,0 @@ -package redshift - -import ( - "bytes" - "database/sql" - "encoding/json" - "fmt" - "os" - "strings" - "time" - - "github.com/crowdmob/goamz/aws" - "github.com/crowdmob/goamz/s3" - _ "github.com/lib/pq" -) - -// An implementation of Emitter that reads S3 file paths from a stream, creates a -// manifest file and batch copies them into Redshift. -type RedshiftManifestEmitter struct { - AccessKey string - CopyMandatory bool - DataTable string - Delimiter string - FileTable string - Format string - Jsonpaths string - S3Bucket string - SecretKey string -} - -// Invoked when the buffer is full. -// Emits a Manifest file to S3 and then performs the Redshift copy command. -func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer, shardID string) { - db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) - - if err != nil { - logger.Log("error", "sql.Open", "msg", err.Error()) - os.Exit(1) - } - - // Aggregate file paths as strings - files := []string{} - for _, r := range b.Records() { - f := t.FromRecord(r) - files = append(files, string(f)) - } - - // Manifest file name - date := time.Now().UTC().Format("2006/01/02") - manifestFileName := e.getManifestName(date, files) - - // Issue manifest COPY to Redshift - e.writeManifestToS3(files, manifestFileName) - c := e.copyStmt(manifestFileName) - _, err = db.Exec(c) - - if err != nil { - logger.Log("error", "db.Exec", "msg", err.Error()) - os.Exit(1) - } - - // Insert file paths into File Names table - i := e.fileInsertStmt(files) - _, err = db.Exec(i) - - if err != nil { - logger.Log("error", "db.Exec", "shard", shardID, "msg", err.Error()) - os.Exit(1) - } - - logger.Log("info", "Redshfit COPY", "shard", shardID, "manifest", manifestFileName) - db.Close() -} - -// Creates the INSERT statement for the file names database table. -func (e RedshiftManifestEmitter) fileInsertStmt(fileNames []string) string { - i := new(bytes.Buffer) - i.WriteString("('") - i.WriteString(strings.Join(fileNames, "'),('")) - i.WriteString("')") - - b := new(bytes.Buffer) - b.WriteString("INSERT INTO ") - b.WriteString(e.FileTable) - b.WriteString(" VALUES ") - b.WriteString(i.String()) - b.WriteString(";") - - return b.String() -} - -// Creates the COPY statment for Redshift insertion. -func (e RedshiftManifestEmitter) copyStmt(filePath string) string { - b := new(bytes.Buffer) - c := fmt.Sprintf( - "CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s' ", - os.Getenv("AWS_ACCESS_KEY"), - os.Getenv("AWS_SECRET_KEY"), - ) - b.WriteString("COPY " + e.DataTable + " ") - b.WriteString("FROM 's3://" + e.S3Bucket + "/" + filePath + "' ") - b.WriteString(c) - switch e.Format { - case "json": - b.WriteString(fmt.Sprintf("json 'auto' ")) - case "jsonpaths": - b.WriteString(fmt.Sprintf("json '%s' ", e.Jsonpaths)) - default: - b.WriteString(fmt.Sprintf("DELIMITER '%s' ", e.Delimiter)) - } - b.WriteString("MANIFEST") - b.WriteString(";") - return b.String() -} - -// Put the Manifest file contents to Redshift -func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileName string) { - auth, _ := aws.EnvAuth() - s3Con := s3.New(auth, aws.USEast) - bucket := s3Con.Bucket(e.S3Bucket) - content := e.generateManifestFile(files) - err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{}) - if err != nil { - logger.Log("error", "writeManifestToS3", "msg", err.Error()) - } -} - -// Manifest file name based on First and Last sequence numbers -func (e RedshiftManifestEmitter) getManifestName(date string, files []string) string { - firstSeq := e.getSeq(files[0]) - lastSeq := e.getSeq(files[len(files)-1]) - return fmt.Sprintf("%v/_manifest/%v_%v", date, firstSeq, lastSeq) -} - -// Trims the date and suffix information from string -func (e RedshiftManifestEmitter) getSeq(file string) string { - matches := strings.Split(file, "/") - return matches[len(matches)-1] -} - -// Manifest file contents in JSON structure -func (e RedshiftManifestEmitter) generateManifestFile(files []string) []byte { - m := &Manifest{} - for _, r := range files { - var url = fmt.Sprintf("s3://%s/%s", e.S3Bucket, r) - var entry = Entry{Url: url, Mandatory: e.CopyMandatory} - m.Entries = append(m.Entries, entry) - } - b, _ := json.Marshal(m) - return b -} diff --git a/emitter/redshift/redshift_manifest_emitter_test.go b/emitter/redshift/redshift_manifest_emitter_test.go deleted file mode 100644 index 9dd2de9..0000000 --- a/emitter/redshift/redshift_manifest_emitter_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package redshiftemitter - -import "testing" - -func TestInsertStmt(t *testing.T) { - e := RedshiftManifestEmitter{FileTable: "funz"} - s := []string{"file1", "file2"} - - expected := "INSERT INTO funz VALUES ('file1'),('file2');" - result := e.fileInsertStmt(s) - - if result != expected { - t.Errorf("fileInsertStmt() = %v want %v", result, expected) - } -} - -func TestManifestName(t *testing.T) { - e := RedshiftManifestEmitter{} - s := []string{"2014/01/01/a-b", "2014/01/01/c-d"} - - expected := "2000/01/01/_manifest/a-b_c-d" - result := e.getManifestName("2000/01/01", s) - - if result != expected { - t.Errorf("getManifestName() = %v want %v", result, expected) - } -} - -func TestGenerateManifestFile(t *testing.T) { - e := RedshiftManifestEmitter{S3Bucket: "bucket_name", CopyMandatory: true} - s := []string{"file1"} - - expected := "{\"entries\":[{\"url\":\"s3://bucket_name/file1\",\"mandatory\":true}]}" - result := string(e.generateManifestFile(s)) - - if result != expected { - t.Errorf("generateManifestFile() = %v want %v", result, expected) - } -} diff --git a/emitter/s3/emitter.go b/emitter/s3/emitter.go deleted file mode 100644 index ae057a0..0000000 --- a/emitter/s3/emitter.go +++ /dev/null @@ -1,43 +0,0 @@ -package s3 - -import ( - "io" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" -) - -// Emitter stores data in S3 bucket. -// -// The use of this struct requires the configuration of an S3 bucket/endpoint. When the buffer is full, this -// struct's Emit method adds the contents of the buffer to S3 as one file. The filename is generated -// from the first and last sequence numbers of the records contained in that file separated by a -// dash. This struct requires the configuration of an S3 bucket and endpoint. -type Emitter struct { - Bucket string - Region string -} - -// Emit is invoked when the buffer is full. This method emits the set of filtered records. -func (e Emitter) Emit(s3Key string, b io.ReadSeeker) error { - svc := s3.New( - session.New(aws.NewConfig().WithMaxRetries(10)), - aws.NewConfig().WithRegion(e.Region), - ) - - params := &s3.PutObjectInput{ - Body: b, - Bucket: aws.String(e.Bucket), - ContentType: aws.String("text/plain"), - Key: aws.String(s3Key), - } - - _, err := svc.PutObject(params) - - if err != nil { - return err - } - - return nil -} diff --git a/emitter/s3/key.go b/emitter/s3/key.go deleted file mode 100644 index a168f81..0000000 --- a/emitter/s3/key.go +++ /dev/null @@ -1,16 +0,0 @@ -package s3 - -import ( - "fmt" - "time" -) - -func Key(prefix, firstSeq, lastSeq string) string { - date := time.Now().UTC().Format("2006/01/02") - - if prefix == "" { - return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq) - } else { - return fmt.Sprintf("%v/%v/%v-%v", prefix, date, firstSeq, lastSeq) - } -} diff --git a/emitter/s3/key_test.go b/emitter/s3/key_test.go deleted file mode 100644 index d2ad154..0000000 --- a/emitter/s3/key_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package s3 - -import ( - "fmt" - "testing" - "time" - - "github.com/bmizerany/assert" -) - -func Test_Key(t *testing.T) { - d := time.Now().UTC().Format("2006/01/02") - - k := Key("", "a", "b") - assert.Equal(t, k, fmt.Sprintf("%v/a-b", d)) - - k = Key("prefix", "a", "b") - assert.Equal(t, k, fmt.Sprintf("prefix/%v/a-b", d)) -} diff --git a/emitter/s3/manifest_emitter.go b/emitter/s3/manifest_emitter.go deleted file mode 100644 index 46c0b06..0000000 --- a/emitter/s3/manifest_emitter.go +++ /dev/null @@ -1,39 +0,0 @@ -package s3 - -import ( - "io" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/kinesis" -) - -// An implementation of Emitter that puts event data on S3 file, and then puts the -// S3 file path onto the output stream for processing by manifest application. -type ManifestEmitter struct { - OutputStream string - Bucket string - Prefix string -} - -func (e ManifestEmitter) Emit(s3Key string, b io.ReadSeeker) error { - // put contents to S3 Bucket - s3 := &Emitter{Bucket: e.Bucket} - s3.Emit(s3Key, b) - - // put file path on Kinesis output stream - params := &kinesis.PutRecordInput{ - Data: []byte(s3Key), - PartitionKey: aws.String(s3Key), - StreamName: aws.String(e.OutputStream), - } - - svc := kinesis.New(session.New()) - _, err := svc.PutRecord(params) - - if err != nil { - return err - } - - return nil -} diff --git a/examples/consumer/README.md b/examples/consumer/README.md new file mode 100644 index 0000000..39de3b9 --- /dev/null +++ b/examples/consumer/README.md @@ -0,0 +1,17 @@ +# Consumer + +Read records from the Kinesis stream + +### Environment Variables + +Export the required environment vars for connecting to the Kinesis stream: + +``` +export AWS_ACCESS_KEY= +export AWS_REGION_NAME= +export AWS_SECRET_KEY= +``` + +### Run the consumer + + $ go run main.go -a appName -s streamName diff --git a/examples/consumer/main.go b/examples/consumer/main.go new file mode 100644 index 0000000..3e3073c --- /dev/null +++ b/examples/consumer/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "context" + "flag" + "os" + + "github.com/apex/log" + "github.com/apex/log/handlers/text" + "github.com/aws/aws-sdk-go/service/kinesis" + consumer "github.com/harlow/kinesis-consumer" +) + +func main() { + log.SetHandler(text.New(os.Stderr)) + log.SetLevel(log.DebugLevel) + + var ( + app = flag.String("app", "", "App name") + stream = flag.String("stream", "", "Stream name") + ) + flag.Parse() + + c, err := consumer.New(*app, *stream) + if err != nil { + log.Fatalf("new consumer error: %v", err) + } + + c.Scan(context.TODO(), func(r *kinesis.Record) bool { + // fmt.Println(string(r.Data)) + + return true // continue scanning + }) +} diff --git a/examples/firehose/main.go b/examples/firehose/main.go deleted file mode 100644 index 27d4566..0000000 --- a/examples/firehose/main.go +++ /dev/null @@ -1,57 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "os" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/firehose" - "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/harlow/kinesis-connectors" -) - -var ( - app = flag.String("a", "", "App name") - stream = flag.String("s", "", "Kinesis stream name") - delivery = flag.String("f", "", "Firehose delivery name") -) - -func convertToFirehoseRecrods(kRecs []*kinesis.Record) []*firehose.Record { - fhRecs := []*firehose.Record{} - for _, kr := range kRecs { - fr := &firehose.Record{Data: kr.Data} - fhRecs = append(fhRecs, fr) - } - return fhRecs -} - -func main() { - flag.Parse() - svc := firehose.New(session.New()) - - cfg := connector.Config{ - MaxRecordCount: 400, - } - - c := connector.NewConsumer(*app, *stream, cfg) - - c.Start(connector.HandlerFunc(func(b connector.Buffer) { - params := &firehose.PutRecordBatchInput{ - DeliveryStreamName: aws.String(*delivery), - Records: convertToFirehoseRecrods(b.GetRecords()), - } - - _, err := svc.PutRecordBatch(params) - - if err != nil { - fmt.Println(err.Error()) - os.Exit(1) - } - - fmt.Println("Put records to Firehose") - })) - - select {} // run forever -} diff --git a/examples/producer/README.md b/examples/producer/README.md index 7612c72..3c8e027 100644 --- a/examples/producer/README.md +++ b/examples/producer/README.md @@ -1,4 +1,4 @@ -# Populate the Stream with data +# Producer A prepopulated file with JSON users is available on S3 for seeing the stream: diff --git a/examples/producer/main.go b/examples/producer/main.go index cdc6687..40d1591 100644 --- a/examples/producer/main.go +++ b/examples/producer/main.go @@ -3,49 +3,56 @@ package main import ( "bufio" "flag" + "fmt" "os" + "time" "github.com/apex/log" "github.com/apex/log/handlers/text" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" - producer "github.com/tj/go-kinesis" ) -// Note: download file with test data -// curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt -var stream = flag.String("s", "", "Stream name") - func main() { - flag.Parse() log.SetHandler(text.New(os.Stderr)) log.SetLevel(log.DebugLevel) - // set up producer - svc := kinesis.New(session.New()) - p := producer.New(producer.Config{ - StreamName: *stream, - BacklogSize: 500, - Client: svc, - }) - p.Start() + var streamName = flag.String("s", "", "Stream name") + flag.Parse() - // open data file + // download file with test data + // curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt f, err := os.Open("/tmp/users.txt") if err != nil { log.Fatal("Cannot open users.txt file") } defer f.Close() + var ( + svc = kinesis.New(session.New()) + records []*kinesis.PutRecordsRequestEntry + ) + // loop over file data b := bufio.NewScanner(f) for b.Scan() { - err := p.Put(b.Bytes(), "site") + records = append(records, &kinesis.PutRecordsRequestEntry{ + Data: b.Bytes(), + PartitionKey: aws.String(time.Now().Format(time.RFC3339Nano)), + }) - if err != nil { - log.WithError(err).Fatal("error producing") + if len(records) > 50 { + _, err = svc.PutRecords(&kinesis.PutRecordsInput{ + StreamName: streamName, + Records: records, + }) + if err != nil { + log.WithError(err).Fatal("error producing") + } + records = nil } - } - p.Stop() + fmt.Print(".") + } } diff --git a/examples/s3/README.md b/examples/s3/README.md deleted file mode 100644 index d50150d..0000000 --- a/examples/s3/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# S3 Pipeline - -The S3 Connector Pipeline performs the following steps: - -1. Pull records from Kinesis and buffer them untill the desired threshold is met. -2. Upload the batch of records to an S3 bucket. -3. Set the current Shard checkpoint in Redis. - -The pipleline config vars are loaded done with [gcfg]. - -[gcfg]: https://code.google.com/p/gcfg/ - -### Environment Variables - -Export the required environment vars for connecting to the Kinesis stream: - -``` -export AWS_ACCESS_KEY= -export AWS_REGION_NAME= -export AWS_SECRET_KEY= -``` - -### Running the code - - $ go run main.go -a appName -s streamName diff --git a/examples/s3/main.go b/examples/s3/main.go deleted file mode 100644 index b5558e3..0000000 --- a/examples/s3/main.go +++ /dev/null @@ -1,55 +0,0 @@ -package main - -import ( - "bytes" - "flag" - "fmt" - "os" - - "github.com/apex/log" - "github.com/apex/log/handlers/text" - "github.com/harlow/kinesis-connectors" - "github.com/harlow/kinesis-connectors/emitter/s3" -) - -func main() { - log.SetHandler(text.New(os.Stderr)) - log.SetLevel(log.DebugLevel) - - var ( - app = flag.String("a", "", "App name") - bucket = flag.String("b", "", "Bucket name") - stream = flag.String("s", "", "Stream name") - ) - flag.Parse() - - e := &s3.Emitter{ - Bucket: *bucket, - Region: "us-west-1", - } - - c := connector.NewConsumer(connector.Config{ - AppName: *app, - StreamName: *stream, - }) - - c.Start(connector.HandlerFunc(func(b connector.Buffer) { - body := new(bytes.Buffer) - - for _, r := range b.GetRecords() { - body.Write(r.Data) - } - - err := e.Emit( - s3.Key("", b.FirstSeq(), b.LastSeq()), - bytes.NewReader(body.Bytes()), - ) - - if err != nil { - fmt.Printf("error %s\n", err) - os.Exit(1) - } - })) - - select {} // run forever -} diff --git a/handler.go b/handler.go deleted file mode 100644 index 624907d..0000000 --- a/handler.go +++ /dev/null @@ -1,18 +0,0 @@ -package connector - -type Handler interface { - HandleRecords(b Buffer) -} - -// HandlerFunc is a convenience type to avoid having to declare a struct -// to implement the Handler interface, it can be used like this: -// -// consumer.AddHandler(connector.HandlerFunc(func(b Buffer) { -// // ... -// })) -type HandlerFunc func(b Buffer) - -// HandleRecords implements the Handler interface -func (h HandlerFunc) HandleRecords(b Buffer) { - h(b) -} diff --git a/vendor/vendor.json b/vendor/vendor.json deleted file mode 100644 index d5b4d31..0000000 --- a/vendor/vendor.json +++ /dev/null @@ -1,309 +0,0 @@ -{ - "comment": "", - "ignore": "test", - "package": [ - { - "checksumSHA1": "Ur88QI//9Ue82g83qvBSakGlzVg=", - "path": "github.com/apex/log", - "revision": "4ea85e918cc8389903d5f12d7ccac5c23ab7d89b", - "revisionTime": "2016-09-05T15:13:04Z" - }, - { - "checksumSHA1": "o5a5xWoaGDKEnNy0W7TikB66lMc=", - "path": "github.com/apex/log/handlers/text", - "revision": "4ea85e918cc8389903d5f12d7ccac5c23ab7d89b", - "revisionTime": "2016-09-05T15:13:04Z" - }, - { - "checksumSHA1": "dSo0vFXJGuTtd6H80q8ZczLszJM=", - "path": "github.com/aws/aws-sdk-go/aws", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "Y9W+4GimK4Fuxq+vyIskVYFRnX4=", - "path": "github.com/aws/aws-sdk-go/aws/awserr", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "dkfyy7aRNZ6BmUZ4ZdLIcMMXiPA=", - "path": "github.com/aws/aws-sdk-go/aws/awsutil", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "H/tMKHZU+Qka6RtYiGB50s2uA0s=", - "path": "github.com/aws/aws-sdk-go/aws/client", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "ieAJ+Cvp/PKv1LpUEnUXpc3OI6E=", - "path": "github.com/aws/aws-sdk-go/aws/client/metadata", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "gNWirlrTfSLbOe421hISBAhTqa4=", - "path": "github.com/aws/aws-sdk-go/aws/corehandlers", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "dNZNaOPfBPnzE2CBnfhXXZ9g9jU=", - "path": "github.com/aws/aws-sdk-go/aws/credentials", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "KQiUK/zr3mqnAXD7x/X55/iNme0=", - "path": "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "NUJUTWlc1sV8b7WjfiYc4JZbXl0=", - "path": "github.com/aws/aws-sdk-go/aws/credentials/endpointcreds", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "4Ipx+5xN0gso+cENC2MHMWmQlR4=", - "path": "github.com/aws/aws-sdk-go/aws/credentials/stscreds", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "nCMd1XKjgV21bEl7J8VZFqTV8PE=", - "path": "github.com/aws/aws-sdk-go/aws/defaults", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "U0SthWum+t9ACanK7SDJOg3dO6M=", - "path": "github.com/aws/aws-sdk-go/aws/ec2metadata", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "NyUg1P8ZS/LHAAQAk/4C5O4X3og=", - "path": "github.com/aws/aws-sdk-go/aws/request", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "tBdFneml1Vn7uvezcktsa+hUsGg=", - "path": "github.com/aws/aws-sdk-go/aws/session", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "7lla+sckQeF18wORAGuU2fFMlp4=", - "path": "github.com/aws/aws-sdk-go/aws/signer/v4", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "Bm6UrYb2QCzpYseLwwgw6aetgRc=", - "path": "github.com/aws/aws-sdk-go/private/endpoints", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "wk7EyvDaHwb5qqoOP/4d3cV0708=", - "path": "github.com/aws/aws-sdk-go/private/protocol", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "L7xWYwx0jNQnzlYHwBS+1q6DcCI=", - "path": "github.com/aws/aws-sdk-go/private/protocol/json/jsonutil", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "H9TymcQkQnXSXSVfjggiiS4bpzM=", - "path": "github.com/aws/aws-sdk-go/private/protocol/jsonrpc", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "isoix7lTx4qIq2zI2xFADtti5SI=", - "path": "github.com/aws/aws-sdk-go/private/protocol/query", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "5xzix1R8prUyWxgLnzUQoxTsfik=", - "path": "github.com/aws/aws-sdk-go/private/protocol/query/queryutil", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "TW/7U+/8ormL7acf6z2rv2hDD+s=", - "path": "github.com/aws/aws-sdk-go/private/protocol/rest", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "Y6Db2GGfGD9LPpcJIPj8vXE8BbQ=", - "path": "github.com/aws/aws-sdk-go/private/protocol/restxml", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "eUEkjyMPAuekKBE4ou+nM9tXEas=", - "path": "github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "Eo9yODN5U99BK0pMzoqnBm7PCrY=", - "path": "github.com/aws/aws-sdk-go/private/waiter", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "TtIAgZ+evpkKB5bBYCB69k0wZoU=", - "path": "github.com/aws/aws-sdk-go/service/firehose", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "2n5/m0ClE4OyQRNdjfLwg+nSY3o=", - "path": "github.com/aws/aws-sdk-go/service/kinesis", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "fgZ1cdh2T0cWRorIZkMGFDADMQw=", - "path": "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "imxJucuPrgaPRMPtAgsu+Y7soB4=", - "path": "github.com/aws/aws-sdk-go/service/s3", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "nH/itbdeFHpl4ysegdtgww9bFSA=", - "path": "github.com/aws/aws-sdk-go/service/sts", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "zUyQXVCSaV97bbiVZbX1qn8UWm4=", - "path": "github.com/bmizerany/assert", - "revision": "e17e99893cb6509f428e1728281c2ad60a6b31e3", - "revisionTime": "2012-07-16T20:56:30Z" - }, - { - "checksumSHA1": "i7BD7wKsIrix92VtlJ4zQRP4G8c=", - "path": "github.com/crowdmob/goamz/aws", - "revision": "3a06871fe9fc0281ca90f3a7d97258d042ed64c0", - "revisionTime": "2015-01-28T19:49:25Z" - }, - { - "checksumSHA1": "qijq0UWIx8EKPT+GbsbuaZMw/gA=", - "path": "github.com/crowdmob/goamz/s3", - "revision": "3a06871fe9fc0281ca90f3a7d97258d042ed64c0", - "revisionTime": "2015-01-28T19:49:25Z" - }, - { - "checksumSHA1": "FCeEm2BWZV/n4oTy+SGd/k0Ab5c=", - "origin": "github.com/aws/aws-sdk-go/vendor/github.com/go-ini/ini", - "path": "github.com/go-ini/ini", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "0ZrwvB6KoGPj2PoDNSEJwxQ6Mog=", - "origin": "github.com/aws/aws-sdk-go/vendor/github.com/jmespath/go-jmespath", - "path": "github.com/jmespath/go-jmespath", - "revision": "f34b74c96bfd27df35643adeb14d8431ca047df5", - "revisionTime": "2016-08-17T18:35:19Z" - }, - { - "checksumSHA1": "IXVypCQsDOlWf8dqyFogbOsRdvM=", - "path": "github.com/jpillora/backoff", - "revision": "0496a6c14df020789376f4d4a261273d5ddb36ec", - "revisionTime": "2016-04-14T05:52:04Z" - }, - { - "checksumSHA1": "1YeGotQXMZMqk+mmm8sbBVJywpw=", - "path": "github.com/kr/pretty", - "revision": "e6ac2fc51e89a3249e82157fa0bb7a18ef9dd5bb", - "revisionTime": "2015-05-20T16:35:14Z" - }, - { - "checksumSHA1": "uulQHQ7IsRKqDudBC8Go9J0gtAc=", - "path": "github.com/kr/text", - "revision": "bb797dc4fb8320488f47bf11de07a733d7233e1f", - "revisionTime": "2015-09-05T22:45:08Z" - }, - { - "checksumSHA1": "Tivm2ueYu71B9YxTEyGxe+8ZWgk=", - "path": "github.com/lib/pq", - "revision": "f59175c2986495ff94109dee3835c504a96c3e81", - "revisionTime": "2016-01-27T22:38:42Z" - }, - { - "checksumSHA1": "xppHi82MLqVx1eyQmbhTesAEjx8=", - "path": "github.com/lib/pq/oid", - "revision": "f59175c2986495ff94109dee3835c504a96c3e81", - "revisionTime": "2016-01-27T22:38:42Z" - }, - { - "checksumSHA1": "QI1tJqI+jMmFrCAKcXs+LefgES4=", - "path": "github.com/tj/go-kinesis", - "revision": "817ff40136c6d4909bcff1021e58fdedf788ba23", - "revisionTime": "2016-06-02T03:00:41Z" - }, - { - "checksumSHA1": "+4r0PnLwwyhO5/jvU5R/TEJb4kA=", - "path": "gopkg.in/bsm/ratelimit.v1", - "revision": "db14e161995a5177acef654cb0dd785e8ee8bc22", - "revisionTime": "2016-02-20T15:49:07Z" - }, - { - "checksumSHA1": "JtXTQXRlxRB///NYmPDuMpEpvNI=", - "path": "gopkg.in/redis.v5", - "revision": "854c88a72c8bb9c09936145aef886b7697d6b995", - "revisionTime": "2016-12-03T15:45:52Z" - }, - { - "checksumSHA1": "vQSE4FOH4EvyzYA72w60XOetmVY=", - "path": "gopkg.in/redis.v5/internal", - "revision": "854c88a72c8bb9c09936145aef886b7697d6b995", - "revisionTime": "2016-12-03T15:45:52Z" - }, - { - "checksumSHA1": "2Ek4SixeRSKOX3mUiBMs3Aw+Guc=", - "path": "gopkg.in/redis.v5/internal/consistenthash", - "revision": "854c88a72c8bb9c09936145aef886b7697d6b995", - "revisionTime": "2016-12-03T15:45:52Z" - }, - { - "checksumSHA1": "rJYVKcBrwYUGl7nuuusmZGrt8mY=", - "path": "gopkg.in/redis.v5/internal/hashtag", - "revision": "854c88a72c8bb9c09936145aef886b7697d6b995", - "revisionTime": "2016-12-03T15:45:52Z" - }, - { - "checksumSHA1": "VnsHRPAMRMuhz7/n/85MZwMrchQ=", - "path": "gopkg.in/redis.v5/internal/pool", - "revision": "854c88a72c8bb9c09936145aef886b7697d6b995", - "revisionTime": "2016-12-03T15:45:52Z" - }, - { - "checksumSHA1": "604uyPTNWLBNAnAyNRMiwYHXknA=", - "path": "gopkg.in/redis.v5/internal/proto", - "revision": "854c88a72c8bb9c09936145aef886b7697d6b995", - "revisionTime": "2016-12-03T15:45:52Z" - } - ], - "rootPath": "github.com/harlow/kinesis-connectors" -}