add mysql checkpoint to kinesis-connectors

This commit is contained in:
dan 2015-05-06 08:44:14 -07:00
parent e5af96fb54
commit 2a285c52d5
2 changed files with 125 additions and 0 deletions

66
mysql_checkpoint.go Normal file
View file

@ -0,0 +1,66 @@
package connector
import (
"database/sql"
"fmt"
l4g "github.com/ezoic/log4go"
_ "github.com/go-sql-driver/mysql"
)
// MysqlCheckpoint implements the Checkpont interface.
// This class is used to enable the Pipeline.ProcessShard to checkpoint their progress.
type MysqlCheckpoint struct {
AppName string
StreamName string
TableName string
Db *sql.DB
sequenceNumber string
}
// CheckpointExists determines if a checkpoint for a particular Shard exists.
// Typically used to determine whether we should start processing the shard with
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
func (c *MysqlCheckpoint) CheckpointExists(shardID string) bool {
l4g.Finest("SELECT sequence_number FROM " + c.TableName + " WHERE checkpoint_key = ?")
row := c.Db.QueryRow("SELECT sequence_number FROM "+c.TableName+" WHERE checkpoint_key = ?", c.key(shardID))
var val string
err := row.Scan(&val)
if err == nil {
l4g.Finest("sequence:%s", val)
c.sequenceNumber = val
return true
}
if err == sql.ErrNoRows {
return false
}
// something bad happened, better blow up the process
panic(err)
}
// SequenceNumber returns the current checkpoint stored for the specified shard.
func (c *MysqlCheckpoint) SequenceNumber() string {
return c.sequenceNumber
}
// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
// Upon failover, record processing is resumed from this point.
func (c *MysqlCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) {
_, err := c.Db.Exec("INSERT INTO "+c.TableName+" (sequence_number, checkpoint_key) VALUES (?, ?) ON DUPLICATE KEY UPDATE sequence_number = ?", sequenceNumber, c.key(shardID), sequenceNumber)
if err != nil {
panic(err)
}
c.sequenceNumber = sequenceNumber
}
// key generates a unique mysql key for storage of Checkpoint.
func (c *MysqlCheckpoint) key(shardID string) string {
return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID)
}

59
mysql_checkpoint_test.go Normal file
View file

@ -0,0 +1,59 @@
package connector
import (
"database/sql"
"os"
"testing"
)
func Test_MysqlKey(t *testing.T) {
k := "app:checkpoint:stream:shard"
c := MysqlCheckpoint{AppName: "app", StreamName: "stream"}
r := c.key("shard")
if r != k {
t.Errorf("key() = %v, want %v", k, r)
}
}
func Test_MysqlCheckpointExists(t *testing.T) {
rc, _ := sql.Open("mysql", os.Getenv("CHECKPOINT_MYSQL_DSN"))
k := "app:checkpoint:stream:shard"
_, err := rc.Exec("INSERT INTO KinesisConnector.TestCheckpoint (sequence_number, checkpoint_key) VALUES (?, ?) ON DUPLICATE KEY UPDATE sequence_number = ?", "fakeSeqNum", k, "fakeSeqNum")
if err != nil {
t.Fatalf("cannot insert checkpoint into db manually, %s", err)
}
c := MysqlCheckpoint{AppName: "app", StreamName: "stream", TableName: "KinesisConnector.TestCheckpoint", Db: rc}
r := c.CheckpointExists("shard")
if r != true {
t.Errorf("CheckpointExists() = %v, want %v", false, r)
}
rc.Exec("DELETE FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k)
}
func Test_MysqlSetCheckpoint(t *testing.T) {
k := "app:checkpoint:stream:shard"
rc, _ := sql.Open("mysql", os.Getenv("CHECKPOINT_MYSQL_DSN"))
c := MysqlCheckpoint{AppName: "app", StreamName: "stream", TableName: "KinesisConnector.TestCheckpoint", Db: rc}
c.SetCheckpoint("shard", "fakeSeqNum")
rslt := rc.QueryRow("SELECT sequence_number FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k)
var sequenceNumber string
err := rslt.Scan(&sequenceNumber)
if err != nil {
t.Fatalf("cannot scan row for checkpoint key, %s", err)
}
if sequenceNumber != "fakeSeqNum" {
t.Errorf("SetCheckpoint() = %v, want %v", "fakeSeqNum", sequenceNumber)
}
rc.Exec("DELETE FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k)
}