Merge pull request #13 from Clever/fix-checkpoint-with-archive-consumer

Fix checkpoint with archive consumer
This commit is contained in:
Xavi 2017-08-20 16:47:49 -07:00 committed by GitHub
commit 7f39898b70
3 changed files with 56 additions and 9 deletions

View file

@ -114,7 +114,7 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) {
} }
func (b *batcherManager) sendCheckpoint( func (b *batcherManager) sendCheckpoint(
tag string, lastIgnoredPair kcl.SequencePair, batchers map[string]*batcher, tag string, lastIgnoredPair, lastProcessedPair kcl.SequencePair, batchers map[string]*batcher,
) { ) {
smallest := lastIgnoredPair smallest := lastIgnoredPair
@ -133,9 +133,10 @@ func (b *batcherManager) sendCheckpoint(
} }
} }
if !smallest.IsNil() { if smallest.IsNil() { // This can occur when all messages in a stream go into one batch
b.chkpntManager.Checkpoint(smallest) smallest = lastProcessedPair
} }
b.chkpntManager.Checkpoint(smallest)
} }
// startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a // startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a
@ -164,7 +165,7 @@ func (b *batcherManager) startMessageHandler(
for tag, batcher := range batchers { for tag, batcher := range batchers {
if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) { if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) {
b.sendBatch(batcher, tag) b.sendBatch(batcher, tag)
b.sendCheckpoint(tag, lastIgnoredPair, batchers) b.sendCheckpoint(tag, lastIgnoredPair, lastProcessedPair, batchers)
batcher.Clear() batcher.Clear()
} }
} }
@ -179,7 +180,7 @@ func (b *batcherManager) startMessageHandler(
err := batcher.AddMessage(tmp.msg, tmp.pair) err := batcher.AddMessage(tmp.msg, tmp.pair)
if err == ErrBatchFull { if err == ErrBatchFull {
b.sendBatch(batcher, tmp.tag) b.sendBatch(batcher, tmp.tag)
b.sendCheckpoint(tmp.tag, lastIgnoredPair, batchers) b.sendCheckpoint(tmp.tag, lastIgnoredPair, lastProcessedPair, batchers)
batcher.AddMessage(tmp.msg, tmp.pair) batcher.AddMessage(tmp.msg, tmp.pair)
} else if err != nil { } else if err != nil {

View file

@ -162,6 +162,53 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) {
assert.Contains(mockcheckpointer.recievedSequences, "4") assert.Contains(mockcheckpointer.recievedSequences, "4")
} }
func TestProcessRecordsSingleBatchBasic(t *testing.T) {
assert := assert.New(t)
mocklog := logger.New("testing")
mockconfig := withDefaults(Config{
BatchCount: 2,
CheckpointFreq: 1, // Don't throttle checks points
})
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
mocksender := NewMsgAsTagSender()
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "1", Data: encode("tag1")},
kcl.Record{SequenceNumber: "2", Data: encode("tag1")},
kcl.Record{SequenceNumber: "3", Data: encode("tag1")},
kcl.Record{SequenceNumber: "4", Data: encode("tag1")},
})
assert.NoError(err)
err = wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "5", Data: encode("tag1")},
kcl.Record{SequenceNumber: "6", Data: encode("tag1")},
kcl.Record{SequenceNumber: "7", Data: encode("tag1")},
kcl.Record{SequenceNumber: "8", Data: encode("tag1")},
})
assert.NoError(err)
err = wrt.Shutdown("TERMINATE")
assert.NoError(err)
err = mockcheckpointer.wait()
assert.NoError(err)
mocksender.Shutdown()
assert.Contains(mocksender.batches, "tag1")
assert.Equal(4, len(mocksender.batches["tag1"]))
assert.Contains(mockcheckpointer.recievedSequences, "2")
assert.Contains(mockcheckpointer.recievedSequences, "4")
assert.Contains(mockcheckpointer.recievedSequences, "6")
assert.Contains(mockcheckpointer.recievedSequences, "8")
}
func TestProcessRecordsMutliBatchBasic(t *testing.T) { func TestProcessRecordsMutliBatchBasic(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)

View file

@ -5,7 +5,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"os"
"sync" "sync"
"time" "time"
) )
@ -181,12 +180,12 @@ func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error {
return fmt.Errorf("Encountered shutdown exception, skipping checkpoint") return fmt.Errorf("Encountered shutdown exception, skipping checkpoint")
case "ThrottlingException": case "ThrottlingException":
sleep := 5 * time.Second sleep := 5 * time.Second
fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s", sleep) kclp.ioHandler.writeError(fmt.Sprintf("Checkpointing throttling, pause for %s", sleep))
time.Sleep(sleep) time.Sleep(sleep)
case "InvalidStateException": case "InvalidStateException":
fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing") kclp.ioHandler.writeError("MultiLangDaemon invalid state while checkpointing")
default: default:
fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", msg) kclp.ioHandler.writeError(fmt.Sprintf("Encountered an error while checkpointing: %s", msg))
} }
seq := action.SequenceNumber seq := action.SequenceNumber