Add checkpoint interface

* Add first checkpoint implementation of RedisCheckpoint.
* Move MsgBuffer into the buffers.go file.
This commit is contained in:
Harlow Ward 2014-07-27 20:31:51 -07:00
parent 0ba88d49bc
commit 57dc74bd48
5 changed files with 93 additions and 2 deletions

45
checkpoints.go Normal file
View file

@ -0,0 +1,45 @@
package etl
import (
"fmt"
"github.com/hoisie/redis"
)
type Checkpoint interface {
CheckpointExists(streamName string, shardID string) bool
SequenceNumber() string
SetCheckpoint(streamName string, shardId string, sequenceNumber string)
}
type RedisCheckpoint struct {
appName string
client redis.Client
sequenceNumber string
}
func (c RedisCheckpoint) SequenceNumber() string {
return c.sequenceNumber
}
func (c *RedisCheckpoint) CheckpointExists(streamName string, shardId string) bool {
key := c.keyGen(streamName, shardId)
val, _ := c.client.Get(key)
if val != nil {
c.sequenceNumber = string(val)
return true
} else {
return false
}
}
func (c *RedisCheckpoint) SetCheckpoint(streamName string, shardId string, sequenceNumber string) {
key := c.keyGen(streamName, shardId)
c.client.Set(key, []byte(sequenceNumber))
c.sequenceNumber = sequenceNumber
}
func (c RedisCheckpoint) keyGen(streamName string, shardId string) string {
return fmt.Sprintf("%v:checkpoint:%v:%v", c.appName, streamName, shardId)
}

47
checkpoints_test.go Normal file
View file

@ -0,0 +1,47 @@
package etl
import (
"github.com/hoisie/redis"
"testing"
)
func TestKeyGen(t *testing.T) {
k := "app:checkpoint:stream:shard"
c := RedisCheckpoint{appName: "app"}
r := c.keyGen("stream", "shard")
if r != k {
t.Errorf("Key() = %v, want %v", k, r)
}
}
func TestCheckpointExists(t *testing.T) {
var rc redis.Client
k := "app:checkpoint:stream:shard"
rc.Set(k, []byte("fakeSeqNum"))
c := RedisCheckpoint{appName: "app"}
r := c.CheckpointExists("stream", "shard")
if r != true {
t.Errorf("CheckpointExists() = %v, want %v", false, r)
}
rc.Del(k)
}
func TestSetCheckpoint(t *testing.T) {
k := "app:checkpoint:stream:shard"
var rc redis.Client
c := RedisCheckpoint{appName: "app"}
c.SetCheckpoint("stream", "shard", "fakeSeqNum")
r, _ := rc.Get(k)
if string(r) != "fakeSeqNum" {
t.Errorf("SetCheckpoint() = %v, want %v", "fakeSeqNum", r)
}
rc.Del(k)
}

View file

@ -13,8 +13,7 @@ func main() {
k := kinesis.New("", "", kinesis.Region{})
s := "inputStream"
c := etl.RedisCheckpoint{}
c.SetAppName("sampleApp")
c := etl.RedisCheckpoint{appName: "sampleApp"}
e := etl.S3Emitter{}
e.SetBucketName("bucketName")