From 71556ee7f7f7d980797e5e3d4e440ae3941d2160 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sun, 21 Dec 2014 01:08:44 -0800 Subject: [PATCH] Remove namespace from RedisCheckpoint The interface for checkpoint added another dependence to inject into the Pipeline. With this removed we can use the checkpoint directly from the pipeline. --- .travis.yml | 8 +++ checkpoint.go | 52 ++++++++++++++++--- ...s_checkpoint_test.go => checkpoint_test.go | 6 +-- redis_checkpoint.go | 48 ----------------- 4 files changed, 56 insertions(+), 58 deletions(-) create mode 100644 .travis.yml rename redis_checkpoint_test.go => checkpoint_test.go (80%) delete mode 100644 redis_checkpoint.go diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..f90d3e5 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,8 @@ +language: go +script: go test +notifications: + email: false +branches: + only: + - master + - development \ No newline at end of file diff --git a/checkpoint.go b/checkpoint.go index 62fd931..11b5cde 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -1,10 +1,48 @@ package connector -// Checkpoint is used by Pipeline.ProcessShard when they want to checkpoint their progress. -// The Kinesis Connector Library will pass an object implementing this interface to ProcessShard, -// so they can checkpoint their progress. -type Checkpoint interface { - CheckpointExists(shardID string) bool - SequenceNumber() string - SetCheckpoint(shardID string, sequenceNumber string) +import ( + "fmt" + + "github.com/hoisie/redis" +) + +// Checkpoint implements the Checkpont interface. +// This class is used to enable the Pipeline.ProcessShard to checkpoint their progress. +type Checkpoint struct { + AppName string + StreamName string + + client redis.Client + sequenceNumber string +} + +// CheckpointExists determines if a checkpoint for a particular Shard exists. +// Typically used to determine whether we should start processing the shard with +// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). +func (c *Checkpoint) CheckpointExists(shardID string) bool { + val, _ := c.client.Get(c.key(shardID)) + + if val != nil && string(val) != "" { + c.sequenceNumber = string(val) + return true + } + + return false +} + +// SequenceNumber returns the current checkpoint stored for the specified shard. +func (c *Checkpoint) SequenceNumber() string { + return c.sequenceNumber +} + +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// Upon failover, record processing is resumed from this point. +func (c *Checkpoint) SetCheckpoint(shardID string, sequenceNumber string) { + c.client.Set(c.key(shardID), []byte(sequenceNumber)) + c.sequenceNumber = sequenceNumber +} + +// key generates a unique Redis key for storage of Checkpoint. +func (c *Checkpoint) key(shardID string) string { + return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID) } diff --git a/redis_checkpoint_test.go b/checkpoint_test.go similarity index 80% rename from redis_checkpoint_test.go rename to checkpoint_test.go index 860a36d..40e8188 100644 --- a/redis_checkpoint_test.go +++ b/checkpoint_test.go @@ -8,7 +8,7 @@ import ( func TestKey(t *testing.T) { k := "app:checkpoint:stream:shard" - c := RedisCheckpoint{AppName: "app", StreamName: "stream"} + c := Checkpoint{AppName: "app", StreamName: "stream"} r := c.key("shard") @@ -21,7 +21,7 @@ func TestCheckpointExists(t *testing.T) { var rc redis.Client k := "app:checkpoint:stream:shard" rc.Set(k, []byte("fakeSeqNum")) - c := RedisCheckpoint{AppName: "app", StreamName: "stream"} + c := Checkpoint{AppName: "app", StreamName: "stream"} r := c.CheckpointExists("shard") @@ -35,7 +35,7 @@ func TestCheckpointExists(t *testing.T) { func TestSetCheckpoint(t *testing.T) { k := "app:checkpoint:stream:shard" var rc redis.Client - c := RedisCheckpoint{AppName: "app", StreamName: "stream"} + c := Checkpoint{AppName: "app", StreamName: "stream"} c.SetCheckpoint("shard", "fakeSeqNum") r, _ := rc.Get(k) diff --git a/redis_checkpoint.go b/redis_checkpoint.go deleted file mode 100644 index 2bf0271..0000000 --- a/redis_checkpoint.go +++ /dev/null @@ -1,48 +0,0 @@ -package connector - -import ( - "fmt" - - "github.com/hoisie/redis" -) - -// RedisCheckpoint implements the Checkpont interface. -// This class is used to enable the Pipeline.ProcessShard to checkpoint their progress. -type RedisCheckpoint struct { - AppName string - StreamName string - - client redis.Client - sequenceNumber string -} - -// CheckpointExists determines if a checkpoint for a particular Shard exists. -// Typically used to determine whether we should start processing the shard with -// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). -func (c *RedisCheckpoint) CheckpointExists(shardID string) bool { - val, _ := c.client.Get(c.key(shardID)) - - if val != nil && string(val) != "" { - c.sequenceNumber = string(val) - return true - } - - return false -} - -// SequenceNumber returns the current checkpoint stored for the specified shard. -func (c *RedisCheckpoint) SequenceNumber() string { - return c.sequenceNumber -} - -// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). -// Upon failover, record processing is resumed from this point. -func (c *RedisCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) { - c.client.Set(c.key(shardID), []byte(sequenceNumber)) - c.sequenceNumber = sequenceNumber -} - -// key generates a unique Redis key for storage of Checkpoint. -func (c *RedisCheckpoint) key(shardID string) string { - return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID) -}