diff --git a/clientlibrary/worker/checkpointer.go b/clientlibrary/worker/checkpointer.go index 3171fc6..7107cd7 100644 --- a/clientlibrary/worker/checkpointer.go +++ b/clientlibrary/worker/checkpointer.go @@ -246,7 +246,7 @@ func (checkpointer *DynamoCheckpoint) doesTableExist() bool { TableName: aws.String(checkpointer.TableName), } _, err := checkpointer.svc.DescribeTable(input) - return (err == nil) + return err == nil } func (checkpointer *DynamoCheckpoint) saveItem(item map[string]*dynamodb.AttributeValue) error { diff --git a/clientlibrary/worker/record-processor-checkpointer.go b/clientlibrary/worker/record-processor-checkpointer.go index 94a090a..7d4cf8f 100644 --- a/clientlibrary/worker/record-processor-checkpointer.go +++ b/clientlibrary/worker/record-processor-checkpointer.go @@ -47,7 +47,7 @@ func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error rc.shard.mux.Lock() // checkpoint the last sequence of a closed shard - if rc.shard.EndingSequenceNumber == aws.StringValue(sequenceNumber) { + if sequenceNumber == nil { rc.shard.Checkpoint = SHARD_END } else { rc.shard.Checkpoint = aws.StringValue(sequenceNumber) diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index a71c3ae..7fce564 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -146,7 +146,7 @@ func (w *Worker) initialize() error { err := w.metricsConfig.Init(w.kclConfig.ApplicationName, w.streamName, w.workerID) if err != nil { - log.Errorf("Failed to start monitoring service: %s", err) + log.Errorf("Failed to start monitoring service: %+v", err) } w.mService = w.metricsConfig.GetMonitoringService() @@ -195,9 +195,8 @@ func (w *Worker) eventLoop() { for { err := w.syncShard() if err != nil { - log.Errorf("Error getting Kinesis shards: %v", err) - // Back-off? - time.Sleep(500 * time.Millisecond) + log.Errorf("Error getting Kinesis shards: %+v", err) + time.Sleep(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond) } log.Infof("Found %d shards", len(w.shardStatus)) @@ -210,17 +209,17 @@ func (w *Worker) eventLoop() { } } - // max number of lease has not been reached + // max number of lease has not been reached yet if counter < w.kclConfig.MaxLeasesForWorker { for _, shard := range w.shardStatus { - // We already own this shard so carry on + // already owner of the shard if shard.getLeaseOwner() == w.workerID { continue } err := w.checkpointer.FetchCheckpoint(shard) if err != nil { - // checkpoint may not existed yet if not an error condition. + // checkpoint may not existed yet is not an error condition. if err != ErrSequenceIDNotFound { log.Error(err) // move on to next shard @@ -249,6 +248,8 @@ func (w *Worker) eventLoop() { sc := w.newShardConsumer(shard) go sc.getRecords(shard) w.waitGroup.Add(1) + // exit from for loop and not to grab more shard for now. + break } } @@ -272,16 +273,18 @@ func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) err args := &kinesis.DescribeStreamInput{ StreamName: aws.String(w.streamName), } + if startShardID != "" { args.ExclusiveStartShardId = aws.String(startShardID) } + streamDesc, err := w.kc.DescribeStream(args) if err != nil { return err } if *streamDesc.StreamDescription.StreamStatus != "ACTIVE" { - return errors.New("Stream not active") + return errors.New("stream not active") } var lastShardID string diff --git a/clientlibrary/worker/worker_test.go b/clientlibrary/worker/worker_test.go index bdabb54..380a5f7 100644 --- a/clientlibrary/worker/worker_test.go +++ b/clientlibrary/worker/worker_test.go @@ -154,4 +154,11 @@ func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) { dd.t.Logf("Shutdown Reason: %v", aws.StringValue(kc.ShutdownReasonMessage(input.ShutdownReason))) + + // When the value of {@link ShutdownInput#getShutdownReason()} is + // {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you + // checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress. + if input.ShutdownReason == kc.TERMINATE { + input.Checkpointer.Checkpoint(nil) + } }