Rename memory to store

This commit is contained in:
Andrew S. Brown 2019-09-07 11:58:46 -07:00
parent b3aebf5ab9
commit a74a986cf9
3 changed files with 14 additions and 13 deletions

View file

@ -3,7 +3,6 @@ package consumer
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/harlow/kinesis-consumer/store/memory"
"sync" "sync"
"testing" "testing"
@ -12,6 +11,8 @@ import (
"github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/harlow/kinesis-consumer/store/memory"
) )
var records = []*kinesis.Record{ var records = []*kinesis.Record{
@ -53,7 +54,7 @@ func TestScan(t *testing.T) {
}, },
} }
var ( var (
cp = memory.New() cp = store.New()
ctr = &fakeCounter{} ctr = &fakeCounter{}
) )
@ -115,7 +116,7 @@ func TestScanShard(t *testing.T) {
} }
var ( var (
cp = memory.New() cp = store.New()
ctr = &fakeCounter{} ctr = &fakeCounter{}
) )
@ -220,7 +221,7 @@ func TestScanShard_SkipCheckpoint(t *testing.T) {
}, },
} }
var cp = memory.New() var cp = store.New()
c, err := New("myStreamName", WithClient(client), WithStore(cp)) c, err := New("myStreamName", WithClient(client), WithStore(cp))
if err != nil { if err != nil {

View file

@ -1,22 +1,22 @@
// The memory store provides a store that can be used for testing and single-threaded applications. // The memory store provides a store that can be used for testing and single-threaded applications.
// DO NOT USE this in a production application where persistence beyond a single application lifecycle is necessary // DO NOT USE this in a production application where persistence beyond a single application lifecycle is necessary
// or when there are multiple consumers. // or when there are multiple consumers.
package memory package store
import ( import (
"fmt" "fmt"
"sync" "sync"
) )
func New() *Checkpoint{ func New() *Store {
return &Checkpoint{} return &Store{}
} }
type Checkpoint struct { type Store struct {
sync.Map sync.Map
} }
func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { func (c *Store) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
if sequenceNumber == "" { if sequenceNumber == "" {
return fmt.Errorf("sequence number should not be empty") return fmt.Errorf("sequence number should not be empty")
} }
@ -24,7 +24,7 @@ func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) e
return nil return nil
} }
func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) { func (c *Store) GetCheckpoint(streamName, shardID string) (string, error) {
val, ok := c.Load(streamName + ":" + shardID) val, ok := c.Load(streamName + ":" + shardID)
if !ok { if !ok {
return "", nil return "", nil

View file

@ -1,4 +1,4 @@
package memory_test package store_test
import ( import (
"testing" "testing"
@ -7,7 +7,7 @@ import (
) )
func Test_CheckpointLifecycle(t *testing.T) { func Test_CheckpointLifecycle(t *testing.T) {
c := memory.New() c := store.New()
// set // set
c.SetCheckpoint("streamName", "shardID", "testSeqNum") c.SetCheckpoint("streamName", "shardID", "testSeqNum")
@ -23,7 +23,7 @@ func Test_CheckpointLifecycle(t *testing.T) {
} }
func Test_SetEmptySeqNum(t *testing.T) { func Test_SetEmptySeqNum(t *testing.T) {
c := memory.New() c := store.New()
err := c.SetCheckpoint("streamName", "shardID", "") err := c.SetCheckpoint("streamName", "shardID", "")
if err == nil || err.Error() != "sequence number should not be empty" { if err == nil || err.Error() != "sequence number should not be empty" {