From b0f769bfa7e03e2971f03858b8c3d2ec1342d342 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Fri, 21 Jul 2017 01:04:34 +0000 Subject: [PATCH] Breaking change to KCL. Created Checkpointer interface to make testing easier. --- cmd/consumer/main.go | 4 ++-- kcl/kcl.go | 22 ++++++++++++++-------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 77a75c9..02281a9 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -10,7 +10,7 @@ import ( ) type sampleRecordProcessor struct { - checkpointer *kcl.Checkpointer + checkpointer kcl.Checkpointer checkpointRetries int checkpointFreq time.Duration largestSeq *big.Int @@ -25,7 +25,7 @@ func newSampleRecordProcessor() *sampleRecordProcessor { } } -func (srp *sampleRecordProcessor) Initialize(shardID string, checkpointer *kcl.Checkpointer) error { +func (srp *sampleRecordProcessor) Initialize(shardID string, checkpointer kcl.Checkpointer) error { srp.lastCheckpoint = time.Now() srp.checkpointer = checkpointer return nil diff --git a/kcl/kcl.go b/kcl/kcl.go index 1ccd333..b47347c 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -12,11 +12,17 @@ import ( ) type RecordProcessor interface { - Initialize(shardID string, checkpointer *Checkpointer) error + Initialize(shardID string, checkpointer Checkpointer) error ProcessRecords(records []Record) error Shutdown(reason string) error } +type Checkpointer interface { + Checkpoint(sequenceNumber *string, subSequenceNumber *int) error + CheckpointWithRetry(sequenceNumber *string, subSequenceNumber *int, retryCount int) error + Shutdown() +} + type CheckpointError struct { e string } @@ -25,13 +31,13 @@ func (ce CheckpointError) Error() string { return ce.e } -type Checkpointer struct { +type checkpointer struct { mux sync.Mutex ioHandler ioHandler } -func (c *Checkpointer) getAction() (interface{}, error) { +func (c *checkpointer) getAction() (interface{}, error) { line, err := c.ioHandler.readLine() if err != nil { return nil, err @@ -43,7 +49,7 @@ func (c *Checkpointer) getAction() (interface{}, error) { return action, nil } -func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { +func (c *checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { c.mux.Lock() defer c.mux.Unlock() @@ -74,7 +80,7 @@ func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int // CheckpointWithRetry tries to save a checkPoint up to `retryCount` + 1 times. // `retryCount` should be >= 0 -func (c *Checkpointer) CheckpointWithRetry( +func (c *checkpointer) CheckpointWithRetry( sequenceNumber *string, subSequenceNumber *int, retryCount int, ) error { sleepDuration := 5 * time.Second @@ -108,7 +114,7 @@ func (c *Checkpointer) CheckpointWithRetry( return nil } -func (c *Checkpointer) Shutdown() { +func (c *checkpointer) Shutdown() { c.CheckpointWithRetry(nil, nil, 5) } @@ -225,7 +231,7 @@ func New(inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor R } return &KCLProcess{ ioHandler: i, - checkpointer: &Checkpointer{ + checkpointer: &checkpointer{ ioHandler: i, }, recordProcessor: recordProcessor, @@ -234,7 +240,7 @@ func New(inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor R type KCLProcess struct { ioHandler ioHandler - checkpointer *Checkpointer + checkpointer Checkpointer recordProcessor RecordProcessor }