diff --git a/redis_checkpoint.go b/redis_checkpoint.go index 51517b2..a4819c8 100644 --- a/redis_checkpoint.go +++ b/redis_checkpoint.go @@ -15,6 +15,8 @@ type RedisCheckpoint struct { StreamName string } +// Check whether a checkpoint for a particular Shard exists. Typically used to determine whether we should +// start processing the shard with TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). func (c *RedisCheckpoint) CheckpointExists(shardID string) bool { val, _ := c.client.Get(c.key(shardID)) @@ -26,15 +28,19 @@ func (c *RedisCheckpoint) CheckpointExists(shardID string) bool { } } +// Get the current checkpoint stored for the specified shard. func (c *RedisCheckpoint) SequenceNumber() string { return c.sequenceNumber } +// Record a checkpoint for a shard (e.g. sequence number of last record processed by application). +// Upon failover, record processing is resumed from this point. func (c *RedisCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) { c.client.Set(c.key(shardID), []byte(sequenceNumber)) c.sequenceNumber = sequenceNumber } +// Generate a unique Redis key for storage of Checkpoint. func (c *RedisCheckpoint) key(shardID string) string { return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID) }