diff --git a/README.md b/README.md index 1e9ea15..1c25662 100644 --- a/README.md +++ b/README.md @@ -136,7 +136,7 @@ To persist scan progress choose one of the following storage layers: The Redis checkpoint requries App Name, and Stream Name: ```go -import storage "github.com/harlow/kinesis-consumer/checkpoint/redis" +import storage "github.com/harlow/kinesis-consumer/store/redis" // redis checkpoint db, err := storage.New(appName) @@ -150,7 +150,7 @@ if err != nil { The DynamoDB checkpoint requires Table Name, App Name, and Stream Name: ```go -import storage "github.com/harlow/kinesis-consumer/checkpoint/ddb" +import storage "github.com/harlow/kinesis-consumer/store/ddb" // ddb checkpoint db, err := storage.New(appName, tableName) @@ -191,7 +191,7 @@ Sort key: shard_id The Postgres checkpoint requires Table Name, App Name, Stream Name and ConnectionString: ```go -import storage "github.com/harlow/kinesis-consumer/checkpoint/postgres" +import storage "github.com/harlow/kinesis-consumer/store/postgres" // postgres checkpoint db, err := storage.New(app, table, connStr) @@ -219,7 +219,7 @@ The table name has to be the same that you specify when creating the checkpoint. The Mysql checkpoint requires Table Name, App Name, Stream Name and ConnectionString (just like the Postgres checkpoint!): ```go -import storage "github.com/harlow/kinesis-consumer/checkpoint/mysql" +import storage "github.com/harlow/kinesis-consumer/store/mysql" // mysql checkpoint db, err := storage.New(app, table, connStr) diff --git a/allgroup.go b/allgroup.go index 7fa489e..9aee0bd 100644 --- a/allgroup.go +++ b/allgroup.go @@ -11,13 +11,13 @@ import ( // NewAllGroup returns an intitialized AllGroup for consuming // all shards on a stream -func NewAllGroup(ksis kinesisiface.KinesisAPI, db Storage, streamName string, logger Logger) *AllGroup { +func NewAllGroup(ksis kinesisiface.KinesisAPI, store Store, streamName string, logger Logger) *AllGroup { return &AllGroup{ ksis: ksis, shards: make(map[string]*kinesis.Shard), streamName: streamName, logger: logger, - storage: db, + Store: store, } } @@ -27,7 +27,7 @@ type AllGroup struct { ksis kinesisiface.KinesisAPI streamName string logger Logger - storage Storage + Store shardMu sync.Mutex shards map[string]*kinesis.Shard @@ -59,16 +59,6 @@ func (g *AllGroup) Start(ctx context.Context, shardc chan *kinesis.Shard) { } } -// GetCheckpoint returns the current checkpoint for provided stream -func (g *AllGroup) GetCheckpoint(streamName, shardID string) (string, error) { - return g.storage.GetCheckpoint(streamName, shardID) -} - -// SetCheckpoint sets the current checkpoint for provided stream -func (g *AllGroup) SetCheckpoint(streamName, shardID, sequenceNumber string) error { - return g.storage.SetCheckpoint(streamName, shardID, sequenceNumber) -} - // findNewShards pulls the list of shards from the Kinesis API // and uses a local cache to determine if we are already processing // a particular shard. diff --git a/consumer.go b/consumer.go index bc59ac7..86f311c 100644 --- a/consumer.go +++ b/consumer.go @@ -27,7 +27,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { c := &Consumer{ streamName: streamName, initialShardIteratorType: kinesis.ShardIteratorTypeLatest, - storage: &noopStorage{}, + store: &noopStore{}, counter: &noopCounter{}, logger: &noopLogger{ logger: log.New(ioutil.Discard, "", log.LstdFlags), @@ -50,7 +50,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { // default group consumes all shards if c.group == nil { - c.group = NewAllGroup(c.client, c.storage, streamName, c.logger) + c.group = NewAllGroup(c.client, c.store, streamName, c.logger) } return c, nil @@ -64,7 +64,7 @@ type Consumer struct { counter Counter group Group logger Logger - storage Storage + store Store } // ScanFunc is the type of the function called for each message read diff --git a/examples/consumer/cp-dynamo/README.md b/examples/consumer/cp-dynamo/README.md index 0337371..ea7e634 100644 --- a/examples/consumer/cp-dynamo/README.md +++ b/examples/consumer/cp-dynamo/README.md @@ -7,9 +7,8 @@ Read records from the Kinesis stream Export the required environment vars for connecting to the Kinesis stream: ``` -export AWS_ACCESS_KEY= +export AWS_PROFILE= export AWS_REGION= -export AWS_SECRET_KEY= ``` ### Run the consumer diff --git a/examples/consumer/cp-dynamo/main.go b/examples/consumer/cp-dynamo/main.go index afd73ce..d74c816 100644 --- a/examples/consumer/cp-dynamo/main.go +++ b/examples/consumer/cp-dynamo/main.go @@ -19,7 +19,7 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/kinesis" consumer "github.com/harlow/kinesis-consumer" - storage "github.com/harlow/kinesis-consumer/checkpoint/ddb" + storage "github.com/harlow/kinesis-consumer/store/ddb" ) // kick off a server for exposing scan metrics diff --git a/examples/consumer/cp-mysql/README.md b/examples/consumer/cp-mysql/README.md index dd3b317..5a7690c 100644 --- a/examples/consumer/cp-mysql/README.md +++ b/examples/consumer/cp-mysql/README.md @@ -7,9 +7,8 @@ Read records from the Kinesis stream using mysql as checkpoint Export the required environment vars for connecting to the Kinesis stream: ```shell -export AWS_ACCESS_KEY= +export AWS_PROFILE= export AWS_REGION= -export AWS_SECRET_KEY= ``` ## Run the consumer @@ -18,4 +17,4 @@ export AWS_SECRET_KEY= Connection string should look something like - user:password@/dbname \ No newline at end of file + user:password@/dbname diff --git a/examples/consumer/cp-mysql/main.go b/examples/consumer/cp-mysql/main.go index 143d846..349f190 100644 --- a/examples/consumer/cp-mysql/main.go +++ b/examples/consumer/cp-mysql/main.go @@ -10,7 +10,7 @@ import ( "os/signal" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/checkpoint/mysql" + checkpoint "github.com/harlow/kinesis-consumer/store/mysql" ) func main() { diff --git a/examples/consumer/cp-postgres/README.md b/examples/consumer/cp-postgres/README.md index f49e5d0..889fee8 100644 --- a/examples/consumer/cp-postgres/README.md +++ b/examples/consumer/cp-postgres/README.md @@ -7,9 +7,8 @@ Read records from the Kinesis stream using postgres as checkpoint Export the required environment vars for connecting to the Kinesis stream: ```shell -export AWS_ACCESS_KEY= +export AWS_PROFILE= export AWS_REGION= -export AWS_SECRET_KEY= ``` ## Run the consumer diff --git a/examples/consumer/cp-postgres/main.go b/examples/consumer/cp-postgres/main.go index adfeafe..ed83cc5 100644 --- a/examples/consumer/cp-postgres/main.go +++ b/examples/consumer/cp-postgres/main.go @@ -10,7 +10,7 @@ import ( "os/signal" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/checkpoint/postgres" + checkpoint "github.com/harlow/kinesis-consumer/store/postgres" ) func main() { diff --git a/examples/consumer/cp-redis/main.go b/examples/consumer/cp-redis/main.go index 9455132..7200850 100644 --- a/examples/consumer/cp-redis/main.go +++ b/examples/consumer/cp-redis/main.go @@ -9,7 +9,7 @@ import ( "os/signal" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" + checkpoint "github.com/harlow/kinesis-consumer/store/redis" ) // A myLogger provides a minimalistic logger satisfying the Logger interface. diff --git a/go.mod b/go.mod index e97e80f..befba7b 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-sql-driver/mysql v1.4.1 github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 github.com/pkg/errors v0.8.0 + github.com/stretchr/testify v1.3.0 // indirect google.golang.org/appengine v1.6.1 // indirect gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 gopkg.in/redis.v5 v5.2.9 diff --git a/go.sum b/go.sum index 7a1f283..3b797bc 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/apex/log v1.0.0 h1:5UWeZC54mWVtOGSCjtuvDPgY/o0QxmjQgvYZ27pLVGQ= github.com/apex/log v1.0.0/go.mod h1:yA770aXIDQrhVOIGurT/pVdfCpSq1GQV/auzMN5fzvY= github.com/aws/aws-sdk-go v1.15.0 h1:uxi9gcf4jxEX7r8oWYMEkYB4kziKet+1cHPmq52LjC4= github.com/aws/aws-sdk-go v1.15.0/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-ini/ini v1.38.1 h1:hbtfM8emWUVo9GnXSloXYyFbXxZ+tG6sbepSStoe1FY= github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= @@ -14,6 +16,11 @@ github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 h1:it29sI2IM490luSc3RAhp5Wu github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= diff --git a/options.go b/options.go index f8f4440..4740dd4 100644 --- a/options.go +++ b/options.go @@ -6,9 +6,9 @@ import "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" type Option func(*Consumer) // WithStorage overrides the default storage -func WithStorage(storage Storage) Option { +func WithStorage(store Store) Option { return func(c *Consumer) { - c.storage = storage + c.store = store } } diff --git a/storage.go b/storage.go deleted file mode 100644 index 8a926ad..0000000 --- a/storage.go +++ /dev/null @@ -1,13 +0,0 @@ -package consumer - -// Storage interface used to persist scan progress -type Storage interface { - GetCheckpoint(streamName, shardID string) (string, error) - SetCheckpoint(streamName, shardID, sequenceNumber string) error -} - -// noopStorage implements the storage interface with discard -type noopStorage struct{} - -func (n noopStorage) GetCheckpoint(string, string) (string, error) { return "", nil } -func (n noopStorage) SetCheckpoint(string, string, string) error { return nil } diff --git a/store.go b/store.go new file mode 100644 index 0000000..4b4a9d4 --- /dev/null +++ b/store.go @@ -0,0 +1,13 @@ +package consumer + +// Store interface used to persist scan progress +type Store interface { + GetCheckpoint(streamName, shardID string) (string, error) + SetCheckpoint(streamName, shardID, sequenceNumber string) error +} + +// noopStore implements the storage interface with discard +type noopStore struct{} + +func (n noopStore) GetCheckpoint(string, string) (string, error) { return "", nil } +func (n noopStore) SetCheckpoint(string, string, string) error { return nil } diff --git a/checkpoint/ddb/ddb.go b/store/ddb/ddb.go similarity index 100% rename from checkpoint/ddb/ddb.go rename to store/ddb/ddb.go diff --git a/checkpoint/ddb/ddb_test.go b/store/ddb/ddb_test.go similarity index 100% rename from checkpoint/ddb/ddb_test.go rename to store/ddb/ddb_test.go diff --git a/checkpoint/ddb/retryer.go b/store/ddb/retryer.go similarity index 100% rename from checkpoint/ddb/retryer.go rename to store/ddb/retryer.go diff --git a/checkpoint/ddb/retryer_test.go b/store/ddb/retryer_test.go similarity index 100% rename from checkpoint/ddb/retryer_test.go rename to store/ddb/retryer_test.go diff --git a/checkpoint/mysql/mysql.go b/store/mysql/mysql.go similarity index 100% rename from checkpoint/mysql/mysql.go rename to store/mysql/mysql.go diff --git a/checkpoint/mysql/mysql_databaseutils_test.go b/store/mysql/mysql_databaseutils_test.go similarity index 100% rename from checkpoint/mysql/mysql_databaseutils_test.go rename to store/mysql/mysql_databaseutils_test.go diff --git a/checkpoint/mysql/mysql_test.go b/store/mysql/mysql_test.go similarity index 100% rename from checkpoint/mysql/mysql_test.go rename to store/mysql/mysql_test.go diff --git a/checkpoint/postgres/postgres.go b/store/postgres/postgres.go similarity index 100% rename from checkpoint/postgres/postgres.go rename to store/postgres/postgres.go diff --git a/checkpoint/postgres/postgres_databaseutils_test.go b/store/postgres/postgres_databaseutils_test.go similarity index 100% rename from checkpoint/postgres/postgres_databaseutils_test.go rename to store/postgres/postgres_databaseutils_test.go diff --git a/checkpoint/postgres/postgres_test.go b/store/postgres/postgres_test.go similarity index 100% rename from checkpoint/postgres/postgres_test.go rename to store/postgres/postgres_test.go diff --git a/checkpoint/redis/redis.go b/store/redis/redis.go similarity index 100% rename from checkpoint/redis/redis.go rename to store/redis/redis.go diff --git a/checkpoint/redis/redis_test.go b/store/redis/redis_test.go similarity index 100% rename from checkpoint/redis/redis_test.go rename to store/redis/redis_test.go