diff --git a/src/clientlibrary/worker/checkpointer.go b/src/clientlibrary/worker/checkpointer.go
index 39584db..4994e63 100644
--- a/src/clientlibrary/worker/checkpointer.go
+++ b/src/clientlibrary/worker/checkpointer.go
@@ -19,6 +19,10 @@ const (
 	LEASE_OWNER_KEY                = "AssignedTo"
 	LEASE_TIMEOUT_KEY              = "LeaseTimeout"
 	CHECKPOINT_SEQUENCE_NUMBER_KEY = "Checkpoint"
+	PARENT_SHARD_ID_KEY            = "ParentShardId"
+
+	// SHARD_END is checkpointed once we've completely processed all records in this shard.
+	SHARD_END = "SHARD_END"
 
 	// ErrLeaseNotAquired is returned when we failed to get a lock on the shard
 	ErrLeaseNotAquired = "Lease is already held by another node"
@@ -124,6 +128,10 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo s
 		},
 	}
 
+	if len(shard.ParentShardId) > 0 {
+		marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: &shard.ParentShardId}
+	}
+
 	if shard.Checkpoint != "" {
 		marshalledCheckpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY] = &dynamodb.AttributeValue{
 			S: &shard.Checkpoint,
@@ -165,6 +173,11 @@ func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *shardStatus) err
 			S: &leaseTimeout,
 		},
 	}
+
+	if len(shard.ParentShardId) > 0 {
+		marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: &shard.ParentShardId}
+	}
+
 	return checkpointer.saveItem(marshalledCheckpoint)
 }
 
diff --git a/src/clientlibrary/worker/record-processor-checkpointer.go b/src/clientlibrary/worker/record-processor-checkpointer.go
index 4f624f2..69a406e 100644
--- a/src/clientlibrary/worker/record-processor-checkpointer.go
+++ b/src/clientlibrary/worker/record-processor-checkpointer.go
@@ -45,7 +45,14 @@ func (pc *PreparedCheckpointer) Checkpoint() error {
 
 func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error {
 	rc.shard.mux.Lock()
-	rc.shard.Checkpoint = aws.StringValue(sequenceNumber)
+
+	// If this is the last sequence number of a closed shard, checkpoint it as SHARD_END.
+	if rc.shard.EndingSequenceNumber == aws.StringValue(sequenceNumber) {
+		rc.shard.Checkpoint = SHARD_END
+	} else {
+		rc.shard.Checkpoint = aws.StringValue(sequenceNumber)
+	}
+
 	rc.shard.mux.Unlock()
 	return rc.checkpoint.CheckpointSequence(rc.shard)
 }
diff --git a/src/clientlibrary/worker/shard-consumer.go b/src/clientlibrary/worker/shard-consumer.go
index 012eff9..9b9f175 100644
--- a/src/clientlibrary/worker/shard-consumer.go
+++ b/src/clientlibrary/worker/shard-consumer.go
@@ -96,6 +96,12 @@ func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) {
 func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
 	defer sc.waitGroup.Done()
 
+	// If this is a child shard, wait until the parent shard has been fully processed.
+	if err := sc.waitOnParentShard(shard); err != nil {
+		log.Errorf("Error waiting for parent shard %v to finish: %+v", shard.ParentShardId, err)
+		return err
+	}
+
 	shardIterator, err := sc.getShardIterator(shard)
 	if err != nil {
 		log.Errorf("Unable to get shard iterator for %s: %v", shard.ID, err)
@@ -208,3 +214,28 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
 		}
 	}
 }
+
+// waitOnParentShard blocks until the parent shard has been fully processed (checkpointed as SHARD_END).
+func (sc *ShardConsumer) waitOnParentShard(shard *shardStatus) error {
+	if len(shard.ParentShardId) == 0 {
+		return nil
+	}
+
+	pshard := &shardStatus{
+		ID:  shard.ParentShardId,
+		mux: &sync.Mutex{},
+	}
+
+	for {
+		if err := sc.checkpointer.FetchCheckpoint(pshard); err != nil {
+			return err
+		}
+
+		// The parent shard has been fully processed.
+		if pshard.Checkpoint == SHARD_END {
+			return nil
+		}
+
+		time.Sleep(time.Duration(sc.kclConfig.ParentShardPollIntervalMillis) * time.Millisecond)
+	}
+}
diff --git a/src/clientlibrary/worker/worker.go b/src/clientlibrary/worker/worker.go
index 7ecb0a4..d8402ec 100644
--- a/src/clientlibrary/worker/worker.go
+++ b/src/clientlibrary/worker/worker.go
@@ -22,11 +22,16 @@ import (
 )
 
 type shardStatus struct {
-	ID           string
-	Checkpoint   string
-	AssignedTo   string
-	mux          *sync.Mutex
-	LeaseTimeout time.Time
+	ID            string
+	ParentShardId string
+	Checkpoint    string
+	AssignedTo    string
+	mux           *sync.Mutex
+	LeaseTimeout  time.Time
+	// Shard sequence number range
+	StartingSequenceNumber string
+	// EndingSequenceNumber is only set once the shard has been closed (e.g. after a reshard)
+	EndingSequenceNumber string
 }
 
 func (ss *shardStatus) getLeaseOwner() string {
@@ -214,19 +219,29 @@ func (w *Worker) eventLoop() {
 
 		err := w.checkpointer.FetchCheckpoint(shard)
 		if err != nil {
+			// The checkpoint may not exist yet; that is not an error condition.
 			if err != ErrSequenceIDNotFound {
-				log.Fatal(err)
+				log.Error(err)
+				// move on to the next shard
+				continue
 			}
 		}
 
+		// The shard is closed and all of its records have been processed.
+		if shard.Checkpoint == SHARD_END {
+			continue
+		}
+
 		err = w.checkpointer.GetLease(shard, w.workerID)
 		if err != nil {
-			if err.Error() == ErrLeaseNotAquired {
-				continue
+			// could not acquire a lease on the shard
+			if err.Error() != ErrLeaseNotAquired {
+				log.Error(err)
 			}
-			log.Fatal(err)
+			continue
 		}
 
+		// record the lease-gained metric
 		w.mService.LeaseGained(shard.ID)
 
 		log.Infof("Start Shard Consumer for shard: %v", shard.ID)
@@ -272,8 +287,11 @@ func (w *Worker) getShardIDs(startShardID string) error {
 		if _, ok := w.shardStatus[*s.ShardId]; !ok {
 			log.Debugf("Found shard with id %s", *s.ShardId)
 			w.shardStatus[*s.ShardId] = &shardStatus{
-				ID:  *s.ShardId,
-				mux: &sync.Mutex{},
+				ID:                     *s.ShardId,
+				ParentShardId:          aws.StringValue(s.ParentShardId),
+				mux:                    &sync.Mutex{},
+				StartingSequenceNumber: aws.StringValue(s.SequenceNumberRange.StartingSequenceNumber),
+				EndingSequenceNumber:   aws.StringValue(s.SequenceNumberRange.EndingSequenceNumber),
 			}
 		}
 		lastShardID = *s.ShardId
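
Note for reviewers: the sketch below is not part of the patch. It is a minimal, self-contained illustration of the ordering guarantee the change introduces, under the assumption that a checkpoint equal to SHARD_END means the shard has been fully drained. memoryStore and waitForParent are hypothetical stand-ins for the DynamoDB-backed checkpointer (FetchCheckpoint) and ShardConsumer.waitOnParentShard, and the hard-coded poll interval stands in for kclConfig.ParentShardPollIntervalMillis.

// Hedged sketch (not part of the patch): the child-shard gating behaviour that
// waitOnParentShard introduces, reduced to a runnable toy.
package main

import (
	"fmt"
	"sync"
	"time"
)

// shardEnd mirrors the SHARD_END sentinel written once a shard is fully processed.
const shardEnd = "SHARD_END"

// memoryStore is a toy checkpoint table keyed by shard ID.
type memoryStore struct {
	mu  sync.Mutex
	cps map[string]string
}

func (m *memoryStore) set(shardID, cp string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.cps[shardID] = cp
}

func (m *memoryStore) get(shardID string) string {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.cps[shardID]
}

// waitForParent blocks until the parent shard is checkpointed as shardEnd,
// mirroring the polling loop in waitOnParentShard above.
func waitForParent(store *memoryStore, parentShardID string, poll time.Duration) {
	if parentShardID == "" {
		return // no parent: the shard was not produced by a reshard
	}
	for store.get(parentShardID) != shardEnd {
		time.Sleep(poll)
	}
}

func main() {
	store := &memoryStore{cps: map[string]string{}}

	// Simulate the parent shard's consumer finishing a little later.
	go func() {
		time.Sleep(50 * time.Millisecond)
		store.set("shardId-000000000000", shardEnd)
	}()

	// The child shard's consumer must not read any records until the parent
	// has been drained, otherwise records would be processed out of order.
	waitForParent(store, "shardId-000000000000", 10*time.Millisecond)
	fmt.Println("parent finished; child shard can now be consumed in order")
}

Polling the checkpoint table keeps the child-shard consumer idle until the parent's consumer, possibly running on a different worker, records SHARD_END, so records are processed in order across a reshard.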