diff --git a/consumer_test.go b/consumer_test.go index c68cb17..f1f9cca 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -3,7 +3,6 @@ package consumer import ( "context" "fmt" - "github.com/harlow/kinesis-consumer/store/memory" "sync" "testing" @@ -12,6 +11,8 @@ import ( "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" + + "github.com/harlow/kinesis-consumer/store/memory" ) var records = []*kinesis.Record{ @@ -53,7 +54,7 @@ func TestScan(t *testing.T) { }, } var ( - cp = memory.New() + cp = store.New() ctr = &fakeCounter{} ) @@ -115,7 +116,7 @@ func TestScanShard(t *testing.T) { } var ( - cp = memory.New() + cp = store.New() ctr = &fakeCounter{} ) @@ -220,7 +221,7 @@ func TestScanShard_SkipCheckpoint(t *testing.T) { }, } - var cp = memory.New() + var cp = store.New() c, err := New("myStreamName", WithClient(client), WithStore(cp)) if err != nil { diff --git a/store/memory/store.go b/store/memory/store.go index a556113..e111ec4 100644 --- a/store/memory/store.go +++ b/store/memory/store.go @@ -1,22 +1,22 @@ // The memory store provides a store that can be used for testing and single-threaded applications. // DO NOT USE this in a production application where persistence beyond a single application lifecycle is necessary // or when there are multiple consumers. -package memory +package store import ( "fmt" "sync" ) -func New() *Checkpoint{ - return &Checkpoint{} +func New() *Store { + return &Store{} } -type Checkpoint struct { +type Store struct { sync.Map } -func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { +func (c *Store) SetCheckpoint(streamName, shardID, sequenceNumber string) error { if sequenceNumber == "" { return fmt.Errorf("sequence number should not be empty") } @@ -24,7 +24,7 @@ func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) e return nil } -func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) { +func (c *Store) GetCheckpoint(streamName, shardID string) (string, error) { val, ok := c.Load(streamName + ":" + shardID) if !ok { return "", nil diff --git a/store/memory/store_test.go b/store/memory/store_test.go index 6e2041c..e63b31a 100644 --- a/store/memory/store_test.go +++ b/store/memory/store_test.go @@ -1,4 +1,4 @@ -package memory_test +package store_test import ( "testing" @@ -7,7 +7,7 @@ import ( ) func Test_CheckpointLifecycle(t *testing.T) { - c := memory.New() + c := store.New() // set c.SetCheckpoint("streamName", "shardID", "testSeqNum") @@ -23,7 +23,7 @@ func Test_CheckpointLifecycle(t *testing.T) { } func Test_SetEmptySeqNum(t *testing.T) { - c := memory.New() + c := store.New() err := c.SetCheckpoint("streamName", "shardID", "") if err == nil || err.Error() != "sequence number should not be empty" {