diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index e640243f..0ec40c2c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -60,9 +60,11 @@ class ShardSyncer { static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesOfCompletedShards) + boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false); + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards); } /** diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index b8f6ae56..2736281e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -146,6 +146,39 @@ public class ShardSyncerTest { } } + /** + * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but one of + * the shards was marked as inconsistent. + */ + @Test + public final void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() { + List shards = new ArrayList(); + List currentLeases = new ArrayList(); + SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); + + String shardId0 = "shardId-0"; + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + + String shardId1 = "shardId-1"; + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + + String shardId2 = "shardId-2"; + shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); + + Set inconsistentShardIds = new HashSet(); + inconsistentShardIds.add(shardId2); + + List newLeases = + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); + Assert.assertEquals(2, newLeases.size()); + Set expectedLeaseShardIds = new HashSet(); + expectedLeaseShardIds.add(shardId0); + expectedLeaseShardIds.add(shardId1); + for (KinesisClientLease lease : newLeases) { + Assert.assertTrue(expectedLeaseShardIds.contains(lease.getLeaseKey())); + } + } + /** * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) * @@ -296,6 +329,41 @@ public class ShardSyncerTest { dataFile.delete(); } + /** + * Test checkAndCreateLeasesForNewShards() when a parent is open and children of open parents are being ignored. + */ + @Test + public final void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildren() + throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, + IOException { + List shards = constructShardListForGraphA(); + Shard shard = shards.get(5); + Assert.assertEquals("shardId-5", shard.getShardId()); + SequenceNumberRange range = shard.getSequenceNumberRange(); + // shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5 + // is not closed, those children should be ignored when syncing shards, no leases + // should be obtained for them, and we should obtain a lease on the still-open + // parent. + range.setEndingSequenceNumber(null); + shard.setSequenceNumberRange(range); + File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); + dataFile.deleteOnExit(); + IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); + ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, true); + List newLeases = leaseManager.listLeases(); + Set expectedLeaseShardIds = new HashSet(); + expectedLeaseShardIds.add("shardId-4"); + expectedLeaseShardIds.add("shardId-5"); + expectedLeaseShardIds.add("shardId-8"); + Assert.assertEquals(expectedLeaseShardIds.size(), newLeases.size()); + for (KinesisClientLease lease1 : newLeases) { + Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey())); + Assert.assertEquals(ExtendedSequenceNumber.LATEST, lease1.getCheckpoint()); + } + dataFile.delete(); + } + /** * @throws KinesisClientLibIOException * @throws DependencyException @@ -586,7 +654,8 @@ public class ShardSyncerTest { dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); - ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards); + ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, + false); List newLeases = leaseManager.listLeases(); Assert.assertEquals(2, newLeases.size()); Set expectedLeaseShardIds = new HashSet();