kinesis-consumer/store/memory/store.go
2024-04-10 16:45:34 +02:00

41 lines
1.3 KiB
Go

// Package store provides an in-memory checkpoint store.
//
// The memory store is intended for testing and single-threaded applications.
// DO NOT USE it in a production application where persistence beyond a single application lifecycle is necessary
// or when there are multiple consumers.
package store
import (
"fmt"
"sync"
)
// New allocates an empty in-memory Store that can be used to persist the
// last consumed offset.
func New() *Store {
	return new(Store)
}
// Store is the in-memory data structure that holds the offsets per stream.
//
// It embeds sync.Map, so the map's methods are promoted onto Store. Checkpoints
// are kept as string values under keys of the form "<streamName>:<shardID>".
type Store struct {
	sync.Map
}

// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
//
// It returns an error if sequenceNumber is empty. Otherwise the value is saved
// under the stream/shard key, replacing any value already stored there.
func (c *Store) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
	if sequenceNumber == "" {
		return fmt.Errorf("sequence number should not be empty")
	}
	c.Store(streamName+":"+shardID, sequenceNumber)
	return nil
}

// GetCheckpoint determines if a checkpoint for a particular Shard exists.
// Typically, this is used to determine whether processing should start with TRIM_HORIZON or AFTER_SEQUENCE_NUMBER
// (if checkpoint exists).
//
// It returns the stored sequence number, or an empty string with a nil error
// when nothing is stored for the stream/shard pair. An error is returned if the
// value found under the key is not a string.
func (c *Store) GetCheckpoint(streamName, shardID string) (string, error) {
	key := streamName + ":" + shardID
	val, ok := c.Load(key)
	if !ok {
		return "", nil
	}
	// The embedded sync.Map exposes Store/LoadOrStore/Swap, which accept values
	// of any type, so the assertion is checked rather than allowed to panic.
	seq, ok := val.(string)
	if !ok {
		return "", fmt.Errorf("checkpoint %q has unexpected type %T", key, val)
	}
	return seq, nil
}