From 2a285c52d52c47e7cef7bd91923b987ee24ecebb Mon Sep 17 00:00:00 2001 From: dan Date: Wed, 6 May 2015 08:44:14 -0700 Subject: [PATCH] add mysql checkpoint to kinesis-connectors --- mysql_checkpoint.go | 66 ++++++++++++++++++++++++++++++++++++++++ mysql_checkpoint_test.go | 59 +++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 mysql_checkpoint.go create mode 100644 mysql_checkpoint_test.go diff --git a/mysql_checkpoint.go b/mysql_checkpoint.go new file mode 100644 index 0000000..4bac5ba --- /dev/null +++ b/mysql_checkpoint.go @@ -0,0 +1,66 @@ +package connector + +import ( + "database/sql" + "fmt" + + l4g "github.com/ezoic/log4go" + _ "github.com/go-sql-driver/mysql" +) + +// MysqlCheckpoint implements the Checkpont interface. +// This class is used to enable the Pipeline.ProcessShard to checkpoint their progress. +type MysqlCheckpoint struct { + AppName string + StreamName string + TableName string + Db *sql.DB + + sequenceNumber string +} + +// CheckpointExists determines if a checkpoint for a particular Shard exists. +// Typically used to determine whether we should start processing the shard with +// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). +func (c *MysqlCheckpoint) CheckpointExists(shardID string) bool { + + l4g.Finest("SELECT sequence_number FROM " + c.TableName + " WHERE checkpoint_key = ?") + + row := c.Db.QueryRow("SELECT sequence_number FROM "+c.TableName+" WHERE checkpoint_key = ?", c.key(shardID)) + var val string + err := row.Scan(&val) + if err == nil { + l4g.Finest("sequence:%s", val) + c.sequenceNumber = val + return true + } + + if err == sql.ErrNoRows { + return false + } + + // something bad happened, better blow up the process + panic(err) +} + +// SequenceNumber returns the current checkpoint stored for the specified shard. +func (c *MysqlCheckpoint) SequenceNumber() string { + return c.sequenceNumber +} + +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// Upon failover, record processing is resumed from this point. +func (c *MysqlCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) { + + _, err := c.Db.Exec("INSERT INTO "+c.TableName+" (sequence_number, checkpoint_key) VALUES (?, ?) ON DUPLICATE KEY UPDATE sequence_number = ?", sequenceNumber, c.key(shardID), sequenceNumber) + if err != nil { + panic(err) + } + c.sequenceNumber = sequenceNumber + +} + +// key generates a unique mysql key for storage of Checkpoint. +func (c *MysqlCheckpoint) key(shardID string) string { + return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID) +} diff --git a/mysql_checkpoint_test.go b/mysql_checkpoint_test.go new file mode 100644 index 0000000..7ef1906 --- /dev/null +++ b/mysql_checkpoint_test.go @@ -0,0 +1,59 @@ +package connector + +import ( + "database/sql" + "os" + "testing" +) + +func Test_MysqlKey(t *testing.T) { + k := "app:checkpoint:stream:shard" + c := MysqlCheckpoint{AppName: "app", StreamName: "stream"} + + r := c.key("shard") + + if r != k { + t.Errorf("key() = %v, want %v", k, r) + } +} + +func Test_MysqlCheckpointExists(t *testing.T) { + rc, _ := sql.Open("mysql", os.Getenv("CHECKPOINT_MYSQL_DSN")) + k := "app:checkpoint:stream:shard" + + _, err := rc.Exec("INSERT INTO KinesisConnector.TestCheckpoint (sequence_number, checkpoint_key) VALUES (?, ?) ON DUPLICATE KEY UPDATE sequence_number = ?", "fakeSeqNum", k, "fakeSeqNum") + if err != nil { + t.Fatalf("cannot insert checkpoint into db manually, %s", err) + } + c := MysqlCheckpoint{AppName: "app", StreamName: "stream", TableName: "KinesisConnector.TestCheckpoint", Db: rc} + + r := c.CheckpointExists("shard") + + if r != true { + t.Errorf("CheckpointExists() = %v, want %v", false, r) + } + + rc.Exec("DELETE FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k) +} + +func Test_MysqlSetCheckpoint(t *testing.T) { + k := "app:checkpoint:stream:shard" + + rc, _ := sql.Open("mysql", os.Getenv("CHECKPOINT_MYSQL_DSN")) + + c := MysqlCheckpoint{AppName: "app", StreamName: "stream", TableName: "KinesisConnector.TestCheckpoint", Db: rc} + c.SetCheckpoint("shard", "fakeSeqNum") + + rslt := rc.QueryRow("SELECT sequence_number FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k) + var sequenceNumber string + err := rslt.Scan(&sequenceNumber) + if err != nil { + t.Fatalf("cannot scan row for checkpoint key, %s", err) + } + + if sequenceNumber != "fakeSeqNum" { + t.Errorf("SetCheckpoint() = %v, want %v", "fakeSeqNum", sequenceNumber) + } + + rc.Exec("DELETE FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k) +}