kinesis-consumer/store/memory/store.go

42 lines
1.3 KiB
Go
Raw Normal View History

2024-04-10 14:45:34 +00:00
// Package store
//
// The memory store provides a store that can be used for testing and single-threaded applications.
// DO NOT USE this in a production application where persistence beyond a single application lifecycle is necessary
// or when there are multiple consumers.
package store
import (
2020-07-22 03:31:38 +00:00
"fmt"
"sync"
)
2024-04-10 14:45:34 +00:00
// New returns a new in memory store to persist the last consumed offset.
func New() *Store {
2020-07-22 03:31:38 +00:00
return &Store{}
}
2024-04-10 14:45:34 +00:00
// Store is the in-memory data structure that holds the offsets per stream
type Store struct {
2020-07-22 03:31:38 +00:00
sync.Map
}
2024-04-10 14:45:34 +00:00
// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
func (c *Store) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
2020-07-22 03:31:38 +00:00
if sequenceNumber == "" {
return fmt.Errorf("sequence number should not be empty")
}
c.Store(streamName+":"+shardID, sequenceNumber)
return nil
}
2024-04-10 14:45:34 +00:00
// GetCheckpoint determines if a checkpoint for a particular Shard exists.
// Typically, this is used to determine whether processing should start with TRIM_HORIZON or AFTER_SEQUENCE_NUMBER
// (if checkpoint exists).
func (c *Store) GetCheckpoint(streamName, shardID string) (string, error) {
2020-07-22 03:31:38 +00:00
val, ok := c.Load(streamName + ":" + shardID)
if !ok {
return "", nil
}
return val.(string), nil
}