Merge pull request #70 from ychunxue/ltr-v1.x-bugfix
Bug fix: No longer need full shard sync for shardEnd
This commit is contained in:
commit
4fd63989d3
3 changed files with 7 additions and 7 deletions
|
|
@ -689,10 +689,6 @@ public class Worker implements Runnable {
|
||||||
assignedShards.add(shardInfo);
|
assignedShards.add(shardInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (foundCompletedShard) {
|
|
||||||
shardSyncStrategy.onFoundCompletedShard();
|
|
||||||
}
|
|
||||||
|
|
||||||
// clean up shard consumers for unassigned shards
|
// clean up shard consumers for unassigned shards
|
||||||
cleanupShardConsumers(assignedShards);
|
cleanupShardConsumers(assignedShards);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2184,7 +2184,8 @@ public class WorkerTest {
|
||||||
private List<Shard> createShardListWithOneSplit() {
|
private List<Shard> createShardListWithOneSplit() {
|
||||||
List<Shard> shards = new ArrayList<Shard>();
|
List<Shard> shards = new ArrayList<Shard>();
|
||||||
SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("39428", "987324");
|
SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("39428", "987324");
|
||||||
SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("987325", null);
|
SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("39428", "100000");
|
||||||
|
SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("100001", "987324");
|
||||||
HashKeyRange keyRange =
|
HashKeyRange keyRange =
|
||||||
ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, ShardObjectHelper.MAX_HASH_KEY);
|
ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, ShardObjectHelper.MAX_HASH_KEY);
|
||||||
Shard shard0 = ShardObjectHelper.newShard("shardId-0", null, null, range0, keyRange);
|
Shard shard0 = ShardObjectHelper.newShard("shardId-0", null, null, range0, keyRange);
|
||||||
|
|
@ -2193,6 +2194,9 @@ public class WorkerTest {
|
||||||
Shard shard1 = ShardObjectHelper.newShard("shardId-1", "shardId-0", null, range1, keyRange);
|
Shard shard1 = ShardObjectHelper.newShard("shardId-1", "shardId-0", null, range1, keyRange);
|
||||||
shards.add(shard1);
|
shards.add(shard1);
|
||||||
|
|
||||||
|
Shard shard2 = ShardObjectHelper.newShard("shardId-2", "shardId-0", null, range2, keyRange);
|
||||||
|
shards.add(shard2);
|
||||||
|
|
||||||
return shards;
|
return shards;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -406,12 +406,12 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
|
||||||
parentShards.add(iterator.shardId);
|
parentShards.add(iterator.shardId);
|
||||||
|
|
||||||
ChildShard leftChild = new ChildShard();
|
ChildShard leftChild = new ChildShard();
|
||||||
leftChild.setShardId("ShardId-1");
|
leftChild.setShardId("shardId-1");
|
||||||
leftChild.setParentShards(parentShards);
|
leftChild.setParentShards(parentShards);
|
||||||
childShards.add(leftChild);
|
childShards.add(leftChild);
|
||||||
|
|
||||||
ChildShard rightChild = new ChildShard();
|
ChildShard rightChild = new ChildShard();
|
||||||
rightChild.setShardId("ShardId-2");
|
rightChild.setShardId("shardId-2");
|
||||||
rightChild.setParentShards(parentShards);
|
rightChild.setParentShards(parentShards);
|
||||||
childShards.add(rightChild);
|
childShards.add(rightChild);
|
||||||
return childShards;
|
return childShards;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue