KCL: Add support for handling shard split

Add support for handling parent and child shards. When processing a
child shard, the worker must wait until the parent shard has been
fully processed before it starts consuming the child.

Change-Id: I8bbf104c22ae93409d856be9c6829988c1b2d7eb
Tao Jiang 2018-04-18 20:09:52 -07:00
parent c05bfb7ac8
commit 869a8e4275
4 changed files with 81 additions and 12 deletions
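
For context on what the consumer sees after a split: the parent shard is closed, so its sequence-number range gains an ending sequence number, and each child shard points back at the parent through ParentShardId while having no ending sequence number yet. A minimal sketch using the aws-sdk-go Kinesis types, with made-up shard IDs and sequence numbers:

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
    // Illustrative DescribeStream result after a split: one closed parent, two open children.
    shardsAfterSplit := []*kinesis.Shard{
        {
            ShardId: aws.String("shardId-000000000000"), // parent, now closed
            SequenceNumberRange: &kinesis.SequenceNumberRange{
                StartingSequenceNumber: aws.String("49570000000000000001"),
                EndingSequenceNumber:   aws.String("49570000000000000099"),
            },
        },
        {
            ShardId:       aws.String("shardId-000000000001"), // child, still open
            ParentShardId: aws.String("shardId-000000000000"),
            SequenceNumberRange: &kinesis.SequenceNumberRange{
                StartingSequenceNumber: aws.String("49570000000000000100"),
            },
        },
        {
            ShardId:       aws.String("shardId-000000000002"), // child, still open
            ParentShardId: aws.String("shardId-000000000000"),
            SequenceNumberRange: &kinesis.SequenceNumberRange{
                StartingSequenceNumber: aws.String("49570000000000000100"),
            },
        },
    }

    for _, s := range shardsAfterSplit {
        fmt.Printf("%s parent=%q closed=%v\n",
            aws.StringValue(s.ShardId),
            aws.StringValue(s.ParentShardId),
            s.SequenceNumberRange.EndingSequenceNumber != nil)
    }
}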


@@ -19,6 +19,10 @@ const (
LEASE_OWNER_KEY = "AssignedTo"
LEASE_TIMEOUT_KEY = "LeaseTimeout"
CHECKPOINT_SEQUENCE_NUMBER_KEY = "Checkpoint"
PARENT_SHARD_ID_KEY = "ParentShardId"
// We've completely processed all records in this shard.
SHARD_END = "SHARD_END"
// ErrLeaseNotAquired is returned when we failed to get a lock on the shard
ErrLeaseNotAquired = "Lease is already held by another node"
@@ -124,6 +128,10 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo s
},
}
if len(shard.ParentShardId) > 0 {
marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: &shard.ParentShardId}
}
if shard.Checkpoint != "" {
marshalledCheckpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY] = &dynamodb.AttributeValue{
S: &shard.Checkpoint,
@@ -165,6 +173,11 @@ func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *shardStatus) err
S: &leaseTimeout,
},
}
if len(shard.ParentShardId) > 0 {
marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: &shard.ParentShardId}
}
return checkpointer.saveItem(marshalledCheckpoint)
}
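
For a child shard, the lease/checkpoint item written by GetLease and CheckpointSequence now also records the parent shard ID. A rough sketch of such an item, using the attribute names from the constants above; the worker name, timestamp, sequence number and shard IDs are illustrative, and the item's own shard-ID key attribute is omitted because its constant is outside this hunk:

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
    // Checkpoint item for a child shard: besides the lease owner, lease
    // timeout and checkpoint, it records which shard is its parent.
    item := map[string]*dynamodb.AttributeValue{
        "AssignedTo":    {S: aws.String("worker-42")},
        "LeaseTimeout":  {S: aws.String("2018-04-19 03:10:52 +0000 UTC")},
        "Checkpoint":    {S: aws.String("49570000000000000123")},
        "ParentShardId": {S: aws.String("shardId-000000000000")},
    }

    // Once every record in the shard has been processed, the Checkpoint
    // attribute is overwritten with the SHARD_END sentinel.
    item["Checkpoint"] = &dynamodb.AttributeValue{S: aws.String("SHARD_END")}

    fmt.Println(item)
}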


@@ -45,7 +45,14 @@ func (pc *PreparedCheckpointer) Checkpoint() error {
func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error {
rc.shard.mux.Lock()
-rc.shard.Checkpoint = aws.StringValue(sequenceNumber)
+// Checkpoint the last sequence number of a closed shard as SHARD_END.
+if rc.shard.EndingSequenceNumber == aws.StringValue(sequenceNumber) {
+rc.shard.Checkpoint = SHARD_END
+} else {
+rc.shard.Checkpoint = aws.StringValue(sequenceNumber)
+}
rc.shard.mux.Unlock()
return rc.checkpoint.CheckpointSequence(rc.shard)
}
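
The branch above means that checkpointing the closed shard's last sequence number stores the SHARD_END sentinel instead of the raw number. A self-contained sketch of that rule; resolveCheckpoint is a hypothetical helper, not part of the library, and the shardStatus stub carries only the field needed here:

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
)

const SHARD_END = "SHARD_END"

type shardStatus struct {
    EndingSequenceNumber string // empty for an open shard
}

// resolveCheckpoint mirrors the branch above: the closed shard's last
// sequence number is stored as SHARD_END, anything else is stored verbatim.
func resolveCheckpoint(shard *shardStatus, sequenceNumber *string) string {
    if shard.EndingSequenceNumber == aws.StringValue(sequenceNumber) {
        return SHARD_END
    }
    return aws.StringValue(sequenceNumber)
}

func main() {
    closed := &shardStatus{EndingSequenceNumber: "49570000000000000199"}
    fmt.Println(resolveCheckpoint(closed, aws.String("49570000000000000150"))) // 49570000000000000150
    fmt.Println(resolveCheckpoint(closed, aws.String("49570000000000000199"))) // SHARD_END
}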


@@ -96,6 +96,12 @@ func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) {
func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
defer sc.waitGroup.Done()
// If this is a child shard, wait until the parent shard has been fully processed.
if err := sc.waitOnParentShard(shard); err != nil {
log.Errorf("Error in waiting for parent shard: %v to finish. Error: %+v", shard.ParentShardId, err)
return err
}
shardIterator, err := sc.getShardIterator(shard)
if err != nil {
log.Errorf("Unable to get shard iterator for %s: %v", shard.ID, err)
@@ -208,3 +214,28 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
}
}
}
// waitOnParentShard blocks until the parent shard (if any) has been fully processed.
func (sc *ShardConsumer) waitOnParentShard(shard *shardStatus) error {
if len(shard.ParentShardId) == 0 {
return nil
}
pshard := &shardStatus{
ID: shard.ParentShardId,
mux: &sync.Mutex{},
}
for {
if err := sc.checkpointer.FetchCheckpoint(pshard); err != nil {
return err
}
// Parent shard is finished.
if pshard.Checkpoint == SHARD_END {
return nil
}
time.Sleep(time.Duration(sc.kclConfig.ParentShardPollIntervalMillis) * time.Millisecond)
}
}
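
waitOnParentShard polls the parent's checkpoint every ParentShardPollIntervalMillis until it reads SHARD_END, so a child shard can wait indefinitely. As an illustration only (not the committed code), the same loop can be driven by a context so that a worker shutdown interrupts the wait; fetchParentCheckpoint stands in for checkpointer.FetchCheckpoint:

package main

import (
    "context"
    "time"
)

const SHARD_END = "SHARD_END"

// waitOnParentShardCtx blocks until the parent shard's checkpoint reaches
// SHARD_END, the context is cancelled, or the checkpoint lookup fails.
func waitOnParentShardCtx(ctx context.Context, parentShardID string, pollInterval time.Duration,
    fetchParentCheckpoint func(shardID string) (string, error)) error {

    if parentShardID == "" {
        return nil // not a child shard, nothing to wait for
    }
    ticker := time.NewTicker(pollInterval)
    defer ticker.Stop()
    for {
        checkpoint, err := fetchParentCheckpoint(parentShardID)
        if err != nil {
            return err
        }
        if checkpoint == SHARD_END {
            return nil // parent fully processed; safe to consume the child
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Stub lookup that pretends the parent already finished; a real caller
    // would read the checkpoint table instead.
    fetch := func(shardID string) (string, error) { return SHARD_END, nil }

    if err := waitOnParentShardCtx(ctx, "shardId-000000000000", 100*time.Millisecond, fetch); err != nil {
        panic(err)
    }
}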


@@ -22,11 +22,16 @@ import (
)
type shardStatus struct {
-ID string
-Checkpoint string
-AssignedTo string
-mux *sync.Mutex
-LeaseTimeout time.Time
+ID string
+ParentShardId string
+Checkpoint string
+AssignedTo string
+mux *sync.Mutex
+LeaseTimeout time.Time
+// Shard sequence-number range
+StartingSequenceNumber string
+// An open (child) shard does not have an ending sequence number yet.
+EndingSequenceNumber string
}
func (ss *shardStatus) getLeaseOwner() string {
@@ -214,19 +219,29 @@ func (w *Worker) eventLoop() {
err := w.checkpointer.FetchCheckpoint(shard)
if err != nil {
// A checkpoint may not exist yet; that is not an error condition.
if err != ErrSequenceIDNotFound {
-log.Fatal(err)
+log.Error(err)
+// move on to the next shard
+continue
}
}
+// The shard is closed and we have processed all records.
+if shard.Checkpoint == SHARD_END {
+continue
+}
err = w.checkpointer.GetLease(shard, w.workerID)
if err != nil {
-if err.Error() == ErrLeaseNotAquired {
-continue
+// cannot get a lease on the shard
+if err.Error() != ErrLeaseNotAquired {
+log.Error(err)
}
-log.Fatal(err)
+continue
}
// Record metrics for the newly gained lease.
w.mService.LeaseGained(shard.ID)
log.Infof("Start Shard Consumer for shard: %v", shard.ID)
@@ -272,8 +287,11 @@ func (w *Worker) getShardIDs(startShardID string) error {
if _, ok := w.shardStatus[*s.ShardId]; !ok {
log.Debugf("Found shard with id %s", *s.ShardId)
w.shardStatus[*s.ShardId] = &shardStatus{
-ID: *s.ShardId,
-mux: &sync.Mutex{},
+ID: *s.ShardId,
+ParentShardId: aws.StringValue(s.ParentShardId),
+mux: &sync.Mutex{},
+StartingSequenceNumber: aws.StringValue(s.SequenceNumberRange.StartingSequenceNumber),
+EndingSequenceNumber: aws.StringValue(s.SequenceNumberRange.EndingSequenceNumber),
}
}
lastShardID = *s.ShardId
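
getShardIDs now captures each shard's parent and sequence-number range, which is the information needed to process parents before their children. As an illustration (not part of this change), the same shard list can be grouped into a parent-to-children map:

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

// childrenByParent maps a parent shard ID to the IDs of its child shards.
func childrenByParent(shards []*kinesis.Shard) map[string][]string {
    children := make(map[string][]string)
    for _, s := range shards {
        parent := aws.StringValue(s.ParentShardId)
        if parent == "" {
            continue // not a child shard
        }
        children[parent] = append(children[parent], aws.StringValue(s.ShardId))
    }
    return children
}

func main() {
    // Illustrative shard IDs only.
    shards := []*kinesis.Shard{
        {ShardId: aws.String("shardId-000000000000")}, // closed parent
        {ShardId: aws.String("shardId-000000000001"), ParentShardId: aws.String("shardId-000000000000")},
        {ShardId: aws.String("shardId-000000000002"), ParentShardId: aws.String("shardId-000000000000")},
    }
    // Prints: map[shardId-000000000000:[shardId-000000000001 shardId-000000000002]]
    fmt.Println(childrenByParent(shards))
}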