From 57dc74bd48c4b9d26e6e93ffedec1aa93587f30a Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sun, 27 Jul 2014 20:31:51 -0700 Subject: [PATCH] Add checkpoint interface * Add first checkpoint implementation of RedisCheckpoint. * Move MsgBuffer into the buffers.go file. --- msg_buffer.go => buffers.go | 0 msg_buffer_test.go => buffers_test.go | 0 checkpoints.go | 45 +++++++++++++++++++++++++ checkpoints_test.go | 47 +++++++++++++++++++++++++++ examples/consumer.go | 3 +- 5 files changed, 93 insertions(+), 2 deletions(-) rename msg_buffer.go => buffers.go (100%) rename msg_buffer_test.go => buffers_test.go (100%) create mode 100644 checkpoints.go create mode 100644 checkpoints_test.go diff --git a/msg_buffer.go b/buffers.go similarity index 100% rename from msg_buffer.go rename to buffers.go diff --git a/msg_buffer_test.go b/buffers_test.go similarity index 100% rename from msg_buffer_test.go rename to buffers_test.go diff --git a/checkpoints.go b/checkpoints.go new file mode 100644 index 0000000..3b4f28a --- /dev/null +++ b/checkpoints.go @@ -0,0 +1,45 @@ +package etl + +import ( + "fmt" + + "github.com/hoisie/redis" +) + +type Checkpoint interface { + CheckpointExists(streamName string, shardID string) bool + SequenceNumber() string + SetCheckpoint(streamName string, shardId string, sequenceNumber string) +} + +type RedisCheckpoint struct { + appName string + client redis.Client + sequenceNumber string +} + +func (c RedisCheckpoint) SequenceNumber() string { + return c.sequenceNumber +} + +func (c *RedisCheckpoint) CheckpointExists(streamName string, shardId string) bool { + key := c.keyGen(streamName, shardId) + val, _ := c.client.Get(key) + + if val != nil { + c.sequenceNumber = string(val) + return true + } else { + return false + } +} + +func (c *RedisCheckpoint) SetCheckpoint(streamName string, shardId string, sequenceNumber string) { + key := c.keyGen(streamName, shardId) + c.client.Set(key, []byte(sequenceNumber)) + c.sequenceNumber = sequenceNumber +} + +func (c RedisCheckpoint) keyGen(streamName string, shardId string) string { + return fmt.Sprintf("%v:checkpoint:%v:%v", c.appName, streamName, shardId) +} diff --git a/checkpoints_test.go b/checkpoints_test.go new file mode 100644 index 0000000..134d7d3 --- /dev/null +++ b/checkpoints_test.go @@ -0,0 +1,47 @@ +package etl + +import ( + "github.com/hoisie/redis" + "testing" +) + +func TestKeyGen(t *testing.T) { + k := "app:checkpoint:stream:shard" + c := RedisCheckpoint{appName: "app"} + + r := c.keyGen("stream", "shard") + + if r != k { + t.Errorf("Key() = %v, want %v", k, r) + } +} + +func TestCheckpointExists(t *testing.T) { + var rc redis.Client + k := "app:checkpoint:stream:shard" + rc.Set(k, []byte("fakeSeqNum")) + c := RedisCheckpoint{appName: "app"} + + r := c.CheckpointExists("stream", "shard") + + if r != true { + t.Errorf("CheckpointExists() = %v, want %v", false, r) + } + + rc.Del(k) +} + +func TestSetCheckpoint(t *testing.T) { + k := "app:checkpoint:stream:shard" + var rc redis.Client + c := RedisCheckpoint{appName: "app"} + c.SetCheckpoint("stream", "shard", "fakeSeqNum") + + r, _ := rc.Get(k) + + if string(r) != "fakeSeqNum" { + t.Errorf("SetCheckpoint() = %v, want %v", "fakeSeqNum", r) + } + + rc.Del(k) +} diff --git a/examples/consumer.go b/examples/consumer.go index 2c019e1..9b84811 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -13,8 +13,7 @@ func main() { k := kinesis.New("", "", kinesis.Region{}) s := "inputStream" - c := etl.RedisCheckpoint{} - c.SetAppName("sampleApp") + c := etl.RedisCheckpoint{appName: "sampleApp"} e := etl.S3Emitter{} e.SetBucketName("bucketName")