diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index 9121df4b..c0bdc060 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -527,6 +527,7 @@ class ConsumerStates { consumer.getStreamConfig().getStreamProxy(), consumer.getStreamConfig().getInitialPositionInStream(), consumer.isCleanupLeasesOfCompletedShards(), + consumer.isIgnoreUnexpectedChildShards(), consumer.getLeaseManager(), consumer.getTaskBackoffTimeMillis(), consumer.getGetRecordsCache()); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 3fb36754..04d59574 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -200,6 +200,7 @@ public class KinesisClientLibConfiguration { private boolean callProcessRecordsEvenForEmptyRecordList; private long parentShardPollIntervalMillis; private boolean cleanupLeasesUponShardCompletion; + private boolean ignoreUnexpectedChildShards; private ClientConfiguration kinesisClientConfig; private ClientConfiguration dynamoDBClientConfig; private ClientConfiguration cloudWatchClientConfig; @@ -802,6 +803,13 @@ public class KinesisClientLibConfiguration { return cleanupLeasesUponShardCompletion; } + /** + * @return true if we should ignore child shards which have open parents + */ + public boolean shouldIgnoreUnexpectedChildShards() { + return ignoreUnexpectedChildShards; + } + /** * @return true if KCL should validate client provided sequence numbers with a call to Amazon Kinesis before * checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)} @@ -1022,6 +1030,16 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param ignoreUnexpectedChildShards Ignore child shards with open parents. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withIgnoreUnexpectedChildShards( + boolean ignoreUnexpectedChildShards) { + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; + return this; + } + /** * @param clientConfig Common client configuration used by Kinesis/DynamoDB/CloudWatch client * @return KinesisClientLibConfiguration diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 9fb8e8e9..95cc663e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -484,6 +484,10 @@ class ShardConsumer { return cleanupLeasesOfCompletedShards; } + boolean isIgnoreUnexpectedChildShards() { + return config.shouldIgnoreUnexpectedChildShards(); + } + long getTaskBackoffTimeMillis() { return taskBackoffTimeMillis; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java index ddfb8459..5a0c3d5a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java @@ -35,6 +35,7 @@ class ShardSyncTask implements ITask { private final ILeaseManager leaseManager; private InitialPositionInStreamExtended initialPosition; private final boolean cleanupLeasesUponShardCompletion; + private final boolean ignoreUnexpectedChildShards; private final long shardSyncTaskIdleTimeMillis; private final TaskType taskType = TaskType.SHARDSYNC; @@ -49,11 +50,13 @@ class ShardSyncTask implements ITask { ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion, + boolean ignoreUnexpectedChildShards, long shardSyncTaskIdleTimeMillis) { this.kinesisProxy = kinesisProxy; this.leaseManager = leaseManager; this.initialPosition = initialPositionInStream; this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis; } @@ -68,7 +71,8 @@ class ShardSyncTask implements ITask { ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPosition, - cleanupLeasesUponShardCompletion); + cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards); if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index c1bfae76..be62c66b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -44,6 +44,7 @@ class ShardSyncTaskManager { private final ExecutorService executorService; private final InitialPositionInStreamExtended initialPositionInStream; private boolean cleanupLeasesUponShardCompletion; + private boolean ignoreUnexpectedChildShards; private final long shardSyncIdleTimeMillis; @@ -55,6 +56,7 @@ class ShardSyncTaskManager { * @param initialPositionInStream Initial position in stream * @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait * until they expire) + * @param ignoreUnexpectedChildShards Ignore child shards with open parents * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards * @param metricsFactory Metrics factory * @param executorService ExecutorService to execute the shard sync tasks @@ -63,6 +65,7 @@ class ShardSyncTaskManager { final ILeaseManager leaseManager, final InitialPositionInStreamExtended initialPositionInStream, final boolean cleanupLeasesUponShardCompletion, + final boolean ignoreUnexpectedChildShards, final long shardSyncIdleTimeMillis, final IMetricsFactory metricsFactory, ExecutorService executorService) { @@ -70,6 +73,7 @@ class ShardSyncTaskManager { this.leaseManager = leaseManager; this.metricsFactory = metricsFactory; this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; this.executorService = executorService; this.initialPositionInStream = initialPositionInStream; @@ -99,6 +103,7 @@ class ShardSyncTaskManager { leaseManager, initialPositionInStream, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIdleTimeMillis), metricsFactory); future = executorService.submit(currentTask); submittedNewTask = true; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index 52944200..2e309156 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang.StringUtils; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; @@ -60,9 +61,11 @@ class ShardSyncer { static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesOfCompletedShards) + boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards); + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards); } /** @@ -71,21 +74,28 @@ class ShardSyncer { * @param kinesisProxy * @param leaseManager * @param initialPositionInStream - * @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis - * shows this shard to be closed (e.g. parent shard must be closed after a reshard operation). - * If it is open, we assume this is an race condition around a reshard event and throw - * a KinesisClientLibIOException so client can backoff and retry later. + * @param cleanupLeasesOfCompletedShards + * @param ignoreUnexpectedChildShards * @throws DependencyException * @throws InvalidStateException * @throws ProvisionedThroughputException * @throws KinesisClientLibIOException */ + static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, + InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); + } + static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards); + checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false); } /** @@ -93,11 +103,9 @@ class ShardSyncer { * * @param kinesisProxy * @param leaseManager - * @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis - * does not show this shard to be open (e.g. parent shard must be closed after a reshard operation). - * If it is still open, we assume this is a race condition around a reshard event and - * throw a KinesisClientLibIOException so client can backoff and retry later. If the shard doesn't exist in - * Kinesis at all, we assume this is an old/expired shard and continue with the sync operation. + * @param initialPosition + * @param cleanupLeasesOfCompletedShards + * @param ignoreUnexpectedChildShards * @throws DependencyException * @throws InvalidStateException * @throws ProvisionedThroughputException @@ -107,18 +115,23 @@ class ShardSyncer { private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, InitialPositionInStreamExtended initialPosition, - boolean cleanupLeasesOfCompletedShards) + boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { List shards = getShardList(kinesisProxy); LOG.debug("Num shards: " + shards.size()); Map shardIdToShardMap = constructShardIdToShardMap(shards); Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap); - assertAllParentShardsAreClosed(shardIdToChildShardIdsMap, shardIdToShardMap); - + Set inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap); + if (!ignoreUnexpectedChildShards) { + assertAllParentShardsAreClosed(inconsistentShardIds); + } + List currentLeases = leaseManager.listLeases(); - - List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition); + + List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, + inconsistentShardIds); LOG.debug("Num new leases to create: " + newLeasesToCreate.size()); for (KinesisClientLease lease : newLeasesToCreate) { long startTimeMillis = System.currentTimeMillis(); @@ -149,19 +162,37 @@ class ShardSyncer { /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls * and a reshard operation. - * @param shardIdToChildShardIdsMap - * @param shardIdToShardMap + * @param inconsistentShardIds * @throws KinesisClientLibIOException */ - private static void assertAllParentShardsAreClosed(Map> shardIdToChildShardIdsMap, - Map shardIdToShardMap) throws KinesisClientLibIOException { + private static void assertAllParentShardsAreClosed(Set inconsistentShardIds) + throws KinesisClientLibIOException { + if (!inconsistentShardIds.isEmpty()) { + String ids = StringUtils.join(inconsistentShardIds, ' '); + throw new KinesisClientLibIOException(String.format("%d open child shards (%s) are inconsistent. " + + "This can happen due to a race condition between describeStream and a reshard operation.", + inconsistentShardIds.size(), ids)); + } + } + + /** + * Helper method to construct the list of inconsistent shards, which are open shards with non-closed ancestor + * parent(s). + * @param shardIdToChildShardIdsMap + * @param shardIdToShardMap + * @return Set of inconsistent open shard ids for shards having open parents. + */ + private static Set findInconsistentShardIds(Map> shardIdToChildShardIdsMap, + Map shardIdToShardMap) { + Set result = new HashSet(); for (String parentShardId : shardIdToChildShardIdsMap.keySet()) { Shard parentShard = shardIdToShardMap.get(parentShardId); if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) { - throw new KinesisClientLibIOException("Parent shardId " + parentShardId + " is not closed. " - + "This can happen due to a race condition between describeStream and a reshard operation."); + Set childShardIdsMap = shardIdToChildShardIdsMap.get(parentShardId); + result.addAll(childShardIdsMap); } } + return result; } /** @@ -296,8 +327,8 @@ class ShardSyncer { /** * Determine new leases to create and their initial checkpoint. * Note: Package level access only for testing purposes. - * - * For each open (no ending sequence number) shard that doesn't already have a lease, + * + * For each open (no ending sequence number) shard without open parents that doesn't already have a lease, * determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists): * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. * If not, set checkpoint of the shard to the initial position specified by the client. @@ -315,27 +346,35 @@ class ShardSyncer { * * For example: * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5- shards till epoch 102 - * \ / \ / | | - * 6 7 4 5- shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | / \ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) * Current leases: (3, 4, 5) * New leases to create: (2, 6, 7, 8, 9, 10) * * The leases returned are sorted by the starting sequence number - following the same order * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail * before creating all the leases. + * + * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it + * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very + * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only + * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases. + * * * @param shards List of all shards in Kinesis (we'll create new leases based on this set) * @param currentLeases List of current leases * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that * location in the shard (when an application starts up for the first time - and there are no checkpoints). + * @param inconsistentShardIds Set of child shard ids having open parents. * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard */ static List determineNewLeasesToCreate(List shards, List currentLeases, - InitialPositionInStreamExtended initialPosition) { + InitialPositionInStreamExtended initialPosition, + Set inconsistentShardIds) { Map shardIdToNewLeaseMap = new HashMap(); Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); @@ -354,6 +393,8 @@ class ShardSyncer { LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors."); if (shardIdsOfCurrentLeases.contains(shardId)) { LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease"); + } else if (inconsistentShardIds.contains(shardId)) { + LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease"); } else { LOG.debug("Need to create a lease for shardId " + shardId); KinesisClientLease newLease = newKCLLease(shard); @@ -407,6 +448,17 @@ class ShardSyncer { return newLeasesToCreate; } + /** + * Determine new leases to create and their initial checkpoint. + * Note: Package level access only for testing purposes. + */ + static List determineNewLeasesToCreate(List shards, + List currentLeases, + InitialPositionInStreamExtended initialPosition) { + Set inconsistentShardIds = new HashSet(); + return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds); + } + /** * Note: Package level access for testing purposes only. * Check if this shard is a descendant of a shard that is (or will be) processed. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index bd40d686..a407f009 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -44,6 +44,7 @@ class ShutdownTask implements ITask { private final ILeaseManager leaseManager; private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; + private final boolean ignoreUnexpectedChildShards; private final TaskType taskType = TaskType.SHUTDOWN; private final long backoffTimeMillis; private final GetRecordsCache getRecordsCache; @@ -59,6 +60,7 @@ class ShutdownTask implements ITask { IKinesisProxy kinesisProxy, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards, ILeaseManager leaseManager, long backoffTimeMillis, GetRecordsCache getRecordsCache) { @@ -69,6 +71,7 @@ class ShutdownTask implements ITask { this.kinesisProxy = kinesisProxy; this.initialPositionInStream = initialPositionInStream; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.leaseManager = leaseManager; this.backoffTimeMillis = backoffTimeMillis; this.getRecordsCache = getRecordsCache; @@ -127,7 +130,8 @@ class ShutdownTask implements ITask { ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, - cleanupLeasesOfCompletedShards); + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards); LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId()); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index d2ea738d..c0f413be 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -407,8 +407,8 @@ public class Worker implements Runnable { this.leaseCoordinator = leaseCoordinator; this.metricsFactory = metricsFactory; this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), - initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory, - executorService); + initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(), + shardSyncIdleTimeMillis, metricsFactory, executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; this.failoverTimeMillis = failoverTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -499,7 +499,8 @@ public class Worker implements Runnable { || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { LOG.info("Syncing Kinesis shard info"); ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L); + leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, + config.shouldIgnoreUnexpectedChildShards(), 0L); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index 177546db..cccbcb30 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.util.Date; @@ -407,4 +408,14 @@ public class KinesisClientLibConfigurationTest { fail("Should not have thrown"); } } + + @Test + public void testKCLConfigurationWithIgnoreUnexpectedChildShards() { + KinesisClientLibConfiguration config = + new KinesisClientLibConfiguration("TestApplication", "TestStream", null, "TestWorker"); + // By default, unexpected child shards should not be ignored. + assertFalse(config.shouldIgnoreUnexpectedChildShards()); + config = config.withIgnoreUnexpectedChildShards(true); + assertTrue(config.shouldIgnoreUnexpectedChildShards()); + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java index 307596e3..37a72741 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java @@ -124,6 +124,7 @@ public class ShardSyncTaskIntegrationTest { leaseManager, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, + false, 0L); syncTask.call(); List leases = leaseManager.listLeases(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index b8f6ae56..2736281e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -146,6 +146,39 @@ public class ShardSyncerTest { } } + /** + * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but one of + * the shards was marked as inconsistent. + */ + @Test + public final void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() { + List shards = new ArrayList(); + List currentLeases = new ArrayList(); + SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); + + String shardId0 = "shardId-0"; + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + + String shardId1 = "shardId-1"; + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + + String shardId2 = "shardId-2"; + shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); + + Set inconsistentShardIds = new HashSet(); + inconsistentShardIds.add(shardId2); + + List newLeases = + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); + Assert.assertEquals(2, newLeases.size()); + Set expectedLeaseShardIds = new HashSet(); + expectedLeaseShardIds.add(shardId0); + expectedLeaseShardIds.add(shardId1); + for (KinesisClientLease lease : newLeases) { + Assert.assertTrue(expectedLeaseShardIds.contains(lease.getLeaseKey())); + } + } + /** * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) * @@ -296,6 +329,41 @@ public class ShardSyncerTest { dataFile.delete(); } + /** + * Test checkAndCreateLeasesForNewShards() when a parent is open and children of open parents are being ignored. + */ + @Test + public final void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildren() + throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, + IOException { + List shards = constructShardListForGraphA(); + Shard shard = shards.get(5); + Assert.assertEquals("shardId-5", shard.getShardId()); + SequenceNumberRange range = shard.getSequenceNumberRange(); + // shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5 + // is not closed, those children should be ignored when syncing shards, no leases + // should be obtained for them, and we should obtain a lease on the still-open + // parent. + range.setEndingSequenceNumber(null); + shard.setSequenceNumberRange(range); + File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); + dataFile.deleteOnExit(); + IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); + ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, true); + List newLeases = leaseManager.listLeases(); + Set expectedLeaseShardIds = new HashSet(); + expectedLeaseShardIds.add("shardId-4"); + expectedLeaseShardIds.add("shardId-5"); + expectedLeaseShardIds.add("shardId-8"); + Assert.assertEquals(expectedLeaseShardIds.size(), newLeases.size()); + for (KinesisClientLease lease1 : newLeases) { + Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey())); + Assert.assertEquals(ExtendedSequenceNumber.LATEST, lease1.getCheckpoint()); + } + dataFile.delete(); + } + /** * @throws KinesisClientLibIOException * @throws DependencyException @@ -586,7 +654,8 @@ public class ShardSyncerTest { dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); - ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards); + ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, + false); List newLeases = leaseManager.listLeases(); Assert.assertEquals(2, newLeases.size()); Set expectedLeaseShardIds = new HashSet(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 17a53137..ddf07e10 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -100,6 +100,7 @@ public class ShutdownTaskTest { IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); boolean cleanupLeasesOfCompletedShards = false; + boolean ignoreUnexpectedChildShards = false; ShutdownTask task = new ShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, @@ -107,6 +108,7 @@ public class ShutdownTaskTest { kinesisProxy, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, getRecordsCache); @@ -126,6 +128,7 @@ public class ShutdownTaskTest { when(kinesisProxy.getShardList()).thenReturn(null); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); boolean cleanupLeasesOfCompletedShards = false; + boolean ignoreUnexpectedChildShards = false; ShutdownTask task = new ShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, @@ -133,6 +136,7 @@ public class ShutdownTaskTest { kinesisProxy, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, getRecordsCache); @@ -147,7 +151,7 @@ public class ShutdownTaskTest { */ @Test public final void testGetTaskType() { - ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsCache); + ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); }