diff --git a/store/memory/store.go b/store/memory/store.go index e111ec4..a68e6c7 100644 --- a/store/memory/store.go +++ b/store/memory/store.go @@ -4,30 +4,30 @@ package store import ( - "fmt" - "sync" + "fmt" + "sync" ) func New() *Store { - return &Store{} + return &Store{} } type Store struct { - sync.Map + sync.Map } func (c *Store) SetCheckpoint(streamName, shardID, sequenceNumber string) error { - if sequenceNumber == "" { - return fmt.Errorf("sequence number should not be empty") - } - c.Store(streamName+":"+shardID, sequenceNumber) - return nil + if sequenceNumber == "" { + return fmt.Errorf("sequence number should not be empty") + } + c.Store(streamName+":"+shardID, sequenceNumber) + return nil } func (c *Store) GetCheckpoint(streamName, shardID string) (string, error) { - val, ok := c.Load(streamName + ":" + shardID) - if !ok { - return "", nil - } - return val.(string), nil + val, ok := c.Load(streamName + ":" + shardID) + if !ok { + return "", nil + } + return val.(string), nil } diff --git a/store/memory/store_test.go b/store/memory/store_test.go index 6b05bc7..5eb553c 100644 --- a/store/memory/store_test.go +++ b/store/memory/store_test.go @@ -1,30 +1,30 @@ package store import ( - "testing" + "testing" ) func Test_CheckpointLifecycle(t *testing.T) { - c := New() + c := New() - // set - c.SetCheckpoint("streamName", "shardID", "testSeqNum") + // set + c.SetCheckpoint("streamName", "shardID", "testSeqNum") - // get - val, err := c.GetCheckpoint("streamName", "shardID") - if err != nil { - t.Fatalf("get checkpoint error: %v", err) - } - if val != "testSeqNum" { - t.Fatalf("checkpoint exists expected %s, got %s", "testSeqNum", val) - } + // get + val, err := c.GetCheckpoint("streamName", "shardID") + if err != nil { + t.Fatalf("get checkpoint error: %v", err) + } + if val != "testSeqNum" { + t.Fatalf("checkpoint exists expected %s, got %s", "testSeqNum", val) + } } func Test_SetEmptySeqNum(t *testing.T) { - c := New() + c := New() - err := c.SetCheckpoint("streamName", "shardID", "") - if err == nil || err.Error() != "sequence number should not be empty" { - t.Fatalf("should not allow empty sequence number") - } + err := c.SetCheckpoint("streamName", "shardID", "") + if err == nil || err.Error() != "sequence number should not be empty" { + t.Fatalf("should not allow empty sequence number") + } }