From 9074864027f2b8ff3acb2c167efb66d5f92b173b Mon Sep 17 00:00:00 2001 From: Mike Watters Date: Thu, 4 Jan 2018 09:29:44 -0700 Subject: [PATCH 1/9] [Issue 210] - Allow unexpected child shards to be ignored (#240) Allow unexpected child shards to be ignored now instead of always throwing an assertion if a child shard has an open parent, consider worker configuration before doing so. if configured to ignore such shards, do not create leases for them during shard sync. this is intended to mitigate failing worker init when processing dynamodb streams with many thousands of shards (which can happen for tables with thousands of partitions). this new behavior can be enabled by adding the following to a configuration/properties file: ``` ignoreUnexpectedChildShards = true ``` --- .../lib/worker/ConsumerStates.java | 1 + .../worker/KinesisClientLibConfiguration.java | 18 +++ .../lib/worker/ShardConsumer.java | 4 + .../lib/worker/ShardSyncTask.java | 6 +- .../lib/worker/ShardSyncTaskManager.java | 5 + .../clientlibrary/lib/worker/ShardSyncer.java | 114 +++++++++++++----- .../lib/worker/ShutdownTask.java | 6 +- .../clientlibrary/lib/worker/Worker.java | 7 +- .../KinesisClientLibConfigurationTest.java | 11 ++ .../worker/ShardSyncTaskIntegrationTest.java | 1 + .../lib/worker/ShardSyncerTest.java | 71 ++++++++++- .../lib/worker/ShutdownTaskTest.java | 6 +- 12 files changed, 212 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index 9121df4b..c0bdc060 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -527,6 +527,7 @@ class ConsumerStates { consumer.getStreamConfig().getStreamProxy(), consumer.getStreamConfig().getInitialPositionInStream(), consumer.isCleanupLeasesOfCompletedShards(), + consumer.isIgnoreUnexpectedChildShards(), consumer.getLeaseManager(), consumer.getTaskBackoffTimeMillis(), consumer.getGetRecordsCache()); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 3fb36754..04d59574 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -200,6 +200,7 @@ public class KinesisClientLibConfiguration { private boolean callProcessRecordsEvenForEmptyRecordList; private long parentShardPollIntervalMillis; private boolean cleanupLeasesUponShardCompletion; + private boolean ignoreUnexpectedChildShards; private ClientConfiguration kinesisClientConfig; private ClientConfiguration dynamoDBClientConfig; private ClientConfiguration cloudWatchClientConfig; @@ -802,6 +803,13 @@ public class KinesisClientLibConfiguration { return cleanupLeasesUponShardCompletion; } + /** + * @return true if we should ignore child shards which have open parents + */ + public boolean shouldIgnoreUnexpectedChildShards() { + return ignoreUnexpectedChildShards; + } + /** * @return true if KCL should validate client provided sequence numbers with a call to Amazon Kinesis before * checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)} @@ -1022,6 +1030,16 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param ignoreUnexpectedChildShards Ignore child shards with open parents. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withIgnoreUnexpectedChildShards( + boolean ignoreUnexpectedChildShards) { + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; + return this; + } + /** * @param clientConfig Common client configuration used by Kinesis/DynamoDB/CloudWatch client * @return KinesisClientLibConfiguration diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 9fb8e8e9..95cc663e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -484,6 +484,10 @@ class ShardConsumer { return cleanupLeasesOfCompletedShards; } + boolean isIgnoreUnexpectedChildShards() { + return config.shouldIgnoreUnexpectedChildShards(); + } + long getTaskBackoffTimeMillis() { return taskBackoffTimeMillis; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java index ddfb8459..5a0c3d5a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java @@ -35,6 +35,7 @@ class ShardSyncTask implements ITask { private final ILeaseManager leaseManager; private InitialPositionInStreamExtended initialPosition; private final boolean cleanupLeasesUponShardCompletion; + private final boolean ignoreUnexpectedChildShards; private final long shardSyncTaskIdleTimeMillis; private final TaskType taskType = TaskType.SHARDSYNC; @@ -49,11 +50,13 @@ class ShardSyncTask implements ITask { ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion, + boolean ignoreUnexpectedChildShards, long shardSyncTaskIdleTimeMillis) { this.kinesisProxy = kinesisProxy; this.leaseManager = leaseManager; this.initialPosition = initialPositionInStream; this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis; } @@ -68,7 +71,8 @@ class ShardSyncTask implements ITask { ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPosition, - cleanupLeasesUponShardCompletion); + cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards); if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index c1bfae76..be62c66b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -44,6 +44,7 @@ class ShardSyncTaskManager { private final ExecutorService executorService; private final InitialPositionInStreamExtended initialPositionInStream; private boolean cleanupLeasesUponShardCompletion; + private boolean ignoreUnexpectedChildShards; private final long shardSyncIdleTimeMillis; @@ -55,6 +56,7 @@ class ShardSyncTaskManager { * @param initialPositionInStream Initial position in stream * @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait * until they expire) + * @param ignoreUnexpectedChildShards Ignore child shards with open parents * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards * @param metricsFactory Metrics factory * @param executorService ExecutorService to execute the shard sync tasks @@ -63,6 +65,7 @@ class ShardSyncTaskManager { final ILeaseManager leaseManager, final InitialPositionInStreamExtended initialPositionInStream, final boolean cleanupLeasesUponShardCompletion, + final boolean ignoreUnexpectedChildShards, final long shardSyncIdleTimeMillis, final IMetricsFactory metricsFactory, ExecutorService executorService) { @@ -70,6 +73,7 @@ class ShardSyncTaskManager { this.leaseManager = leaseManager; this.metricsFactory = metricsFactory; this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; this.executorService = executorService; this.initialPositionInStream = initialPositionInStream; @@ -99,6 +103,7 @@ class ShardSyncTaskManager { leaseManager, initialPositionInStream, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIdleTimeMillis), metricsFactory); future = executorService.submit(currentTask); submittedNewTask = true; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index 52944200..2e309156 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang.StringUtils; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; @@ -60,9 +61,11 @@ class ShardSyncer { static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesOfCompletedShards) + boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards); + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards); } /** @@ -71,21 +74,28 @@ class ShardSyncer { * @param kinesisProxy * @param leaseManager * @param initialPositionInStream - * @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis - * shows this shard to be closed (e.g. parent shard must be closed after a reshard operation). - * If it is open, we assume this is an race condition around a reshard event and throw - * a KinesisClientLibIOException so client can backoff and retry later. + * @param cleanupLeasesOfCompletedShards + * @param ignoreUnexpectedChildShards * @throws DependencyException * @throws InvalidStateException * @throws ProvisionedThroughputException * @throws KinesisClientLibIOException */ + static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, + InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); + } + static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards); + checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false); } /** @@ -93,11 +103,9 @@ class ShardSyncer { * * @param kinesisProxy * @param leaseManager - * @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis - * does not show this shard to be open (e.g. parent shard must be closed after a reshard operation). - * If it is still open, we assume this is a race condition around a reshard event and - * throw a KinesisClientLibIOException so client can backoff and retry later. If the shard doesn't exist in - * Kinesis at all, we assume this is an old/expired shard and continue with the sync operation. + * @param initialPosition + * @param cleanupLeasesOfCompletedShards + * @param ignoreUnexpectedChildShards * @throws DependencyException * @throws InvalidStateException * @throws ProvisionedThroughputException @@ -107,18 +115,23 @@ class ShardSyncer { private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, InitialPositionInStreamExtended initialPosition, - boolean cleanupLeasesOfCompletedShards) + boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { List shards = getShardList(kinesisProxy); LOG.debug("Num shards: " + shards.size()); Map shardIdToShardMap = constructShardIdToShardMap(shards); Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap); - assertAllParentShardsAreClosed(shardIdToChildShardIdsMap, shardIdToShardMap); - + Set inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap); + if (!ignoreUnexpectedChildShards) { + assertAllParentShardsAreClosed(inconsistentShardIds); + } + List currentLeases = leaseManager.listLeases(); - - List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition); + + List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, + inconsistentShardIds); LOG.debug("Num new leases to create: " + newLeasesToCreate.size()); for (KinesisClientLease lease : newLeasesToCreate) { long startTimeMillis = System.currentTimeMillis(); @@ -149,19 +162,37 @@ class ShardSyncer { /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls * and a reshard operation. - * @param shardIdToChildShardIdsMap - * @param shardIdToShardMap + * @param inconsistentShardIds * @throws KinesisClientLibIOException */ - private static void assertAllParentShardsAreClosed(Map> shardIdToChildShardIdsMap, - Map shardIdToShardMap) throws KinesisClientLibIOException { + private static void assertAllParentShardsAreClosed(Set inconsistentShardIds) + throws KinesisClientLibIOException { + if (!inconsistentShardIds.isEmpty()) { + String ids = StringUtils.join(inconsistentShardIds, ' '); + throw new KinesisClientLibIOException(String.format("%d open child shards (%s) are inconsistent. " + + "This can happen due to a race condition between describeStream and a reshard operation.", + inconsistentShardIds.size(), ids)); + } + } + + /** + * Helper method to construct the list of inconsistent shards, which are open shards with non-closed ancestor + * parent(s). + * @param shardIdToChildShardIdsMap + * @param shardIdToShardMap + * @return Set of inconsistent open shard ids for shards having open parents. + */ + private static Set findInconsistentShardIds(Map> shardIdToChildShardIdsMap, + Map shardIdToShardMap) { + Set result = new HashSet(); for (String parentShardId : shardIdToChildShardIdsMap.keySet()) { Shard parentShard = shardIdToShardMap.get(parentShardId); if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) { - throw new KinesisClientLibIOException("Parent shardId " + parentShardId + " is not closed. " - + "This can happen due to a race condition between describeStream and a reshard operation."); + Set childShardIdsMap = shardIdToChildShardIdsMap.get(parentShardId); + result.addAll(childShardIdsMap); } } + return result; } /** @@ -296,8 +327,8 @@ class ShardSyncer { /** * Determine new leases to create and their initial checkpoint. * Note: Package level access only for testing purposes. - * - * For each open (no ending sequence number) shard that doesn't already have a lease, + * + * For each open (no ending sequence number) shard without open parents that doesn't already have a lease, * determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists): * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. * If not, set checkpoint of the shard to the initial position specified by the client. @@ -315,27 +346,35 @@ class ShardSyncer { * * For example: * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5- shards till epoch 102 - * \ / \ / | | - * 6 7 4 5- shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | / \ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) * Current leases: (3, 4, 5) * New leases to create: (2, 6, 7, 8, 9, 10) * * The leases returned are sorted by the starting sequence number - following the same order * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail * before creating all the leases. + * + * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it + * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very + * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only + * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases. + * * * @param shards List of all shards in Kinesis (we'll create new leases based on this set) * @param currentLeases List of current leases * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that * location in the shard (when an application starts up for the first time - and there are no checkpoints). + * @param inconsistentShardIds Set of child shard ids having open parents. * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard */ static List determineNewLeasesToCreate(List shards, List currentLeases, - InitialPositionInStreamExtended initialPosition) { + InitialPositionInStreamExtended initialPosition, + Set inconsistentShardIds) { Map shardIdToNewLeaseMap = new HashMap(); Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); @@ -354,6 +393,8 @@ class ShardSyncer { LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors."); if (shardIdsOfCurrentLeases.contains(shardId)) { LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease"); + } else if (inconsistentShardIds.contains(shardId)) { + LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease"); } else { LOG.debug("Need to create a lease for shardId " + shardId); KinesisClientLease newLease = newKCLLease(shard); @@ -407,6 +448,17 @@ class ShardSyncer { return newLeasesToCreate; } + /** + * Determine new leases to create and their initial checkpoint. + * Note: Package level access only for testing purposes. + */ + static List determineNewLeasesToCreate(List shards, + List currentLeases, + InitialPositionInStreamExtended initialPosition) { + Set inconsistentShardIds = new HashSet(); + return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds); + } + /** * Note: Package level access for testing purposes only. * Check if this shard is a descendant of a shard that is (or will be) processed. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index bd40d686..a407f009 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -44,6 +44,7 @@ class ShutdownTask implements ITask { private final ILeaseManager leaseManager; private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; + private final boolean ignoreUnexpectedChildShards; private final TaskType taskType = TaskType.SHUTDOWN; private final long backoffTimeMillis; private final GetRecordsCache getRecordsCache; @@ -59,6 +60,7 @@ class ShutdownTask implements ITask { IKinesisProxy kinesisProxy, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards, ILeaseManager leaseManager, long backoffTimeMillis, GetRecordsCache getRecordsCache) { @@ -69,6 +71,7 @@ class ShutdownTask implements ITask { this.kinesisProxy = kinesisProxy; this.initialPositionInStream = initialPositionInStream; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.leaseManager = leaseManager; this.backoffTimeMillis = backoffTimeMillis; this.getRecordsCache = getRecordsCache; @@ -127,7 +130,8 @@ class ShutdownTask implements ITask { ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, - cleanupLeasesOfCompletedShards); + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards); LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId()); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index d2ea738d..c0f413be 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -407,8 +407,8 @@ public class Worker implements Runnable { this.leaseCoordinator = leaseCoordinator; this.metricsFactory = metricsFactory; this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), - initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory, - executorService); + initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(), + shardSyncIdleTimeMillis, metricsFactory, executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; this.failoverTimeMillis = failoverTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -499,7 +499,8 @@ public class Worker implements Runnable { || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { LOG.info("Syncing Kinesis shard info"); ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L); + leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, + config.shouldIgnoreUnexpectedChildShards(), 0L); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index 177546db..cccbcb30 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.util.Date; @@ -407,4 +408,14 @@ public class KinesisClientLibConfigurationTest { fail("Should not have thrown"); } } + + @Test + public void testKCLConfigurationWithIgnoreUnexpectedChildShards() { + KinesisClientLibConfiguration config = + new KinesisClientLibConfiguration("TestApplication", "TestStream", null, "TestWorker"); + // By default, unexpected child shards should not be ignored. + assertFalse(config.shouldIgnoreUnexpectedChildShards()); + config = config.withIgnoreUnexpectedChildShards(true); + assertTrue(config.shouldIgnoreUnexpectedChildShards()); + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java index 307596e3..37a72741 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java @@ -124,6 +124,7 @@ public class ShardSyncTaskIntegrationTest { leaseManager, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, + false, 0L); syncTask.call(); List leases = leaseManager.listLeases(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index b8f6ae56..2736281e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -146,6 +146,39 @@ public class ShardSyncerTest { } } + /** + * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but one of + * the shards was marked as inconsistent. + */ + @Test + public final void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() { + List shards = new ArrayList(); + List currentLeases = new ArrayList(); + SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); + + String shardId0 = "shardId-0"; + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + + String shardId1 = "shardId-1"; + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + + String shardId2 = "shardId-2"; + shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); + + Set inconsistentShardIds = new HashSet(); + inconsistentShardIds.add(shardId2); + + List newLeases = + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); + Assert.assertEquals(2, newLeases.size()); + Set expectedLeaseShardIds = new HashSet(); + expectedLeaseShardIds.add(shardId0); + expectedLeaseShardIds.add(shardId1); + for (KinesisClientLease lease : newLeases) { + Assert.assertTrue(expectedLeaseShardIds.contains(lease.getLeaseKey())); + } + } + /** * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) * @@ -296,6 +329,41 @@ public class ShardSyncerTest { dataFile.delete(); } + /** + * Test checkAndCreateLeasesForNewShards() when a parent is open and children of open parents are being ignored. + */ + @Test + public final void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildren() + throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, + IOException { + List shards = constructShardListForGraphA(); + Shard shard = shards.get(5); + Assert.assertEquals("shardId-5", shard.getShardId()); + SequenceNumberRange range = shard.getSequenceNumberRange(); + // shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5 + // is not closed, those children should be ignored when syncing shards, no leases + // should be obtained for them, and we should obtain a lease on the still-open + // parent. + range.setEndingSequenceNumber(null); + shard.setSequenceNumberRange(range); + File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); + dataFile.deleteOnExit(); + IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); + ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, true); + List newLeases = leaseManager.listLeases(); + Set expectedLeaseShardIds = new HashSet(); + expectedLeaseShardIds.add("shardId-4"); + expectedLeaseShardIds.add("shardId-5"); + expectedLeaseShardIds.add("shardId-8"); + Assert.assertEquals(expectedLeaseShardIds.size(), newLeases.size()); + for (KinesisClientLease lease1 : newLeases) { + Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey())); + Assert.assertEquals(ExtendedSequenceNumber.LATEST, lease1.getCheckpoint()); + } + dataFile.delete(); + } + /** * @throws KinesisClientLibIOException * @throws DependencyException @@ -586,7 +654,8 @@ public class ShardSyncerTest { dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); - ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards); + ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, + false); List newLeases = leaseManager.listLeases(); Assert.assertEquals(2, newLeases.size()); Set expectedLeaseShardIds = new HashSet(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 17a53137..ddf07e10 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -100,6 +100,7 @@ public class ShutdownTaskTest { IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); boolean cleanupLeasesOfCompletedShards = false; + boolean ignoreUnexpectedChildShards = false; ShutdownTask task = new ShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, @@ -107,6 +108,7 @@ public class ShutdownTaskTest { kinesisProxy, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, getRecordsCache); @@ -126,6 +128,7 @@ public class ShutdownTaskTest { when(kinesisProxy.getShardList()).thenReturn(null); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); boolean cleanupLeasesOfCompletedShards = false; + boolean ignoreUnexpectedChildShards = false; ShutdownTask task = new ShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, @@ -133,6 +136,7 @@ public class ShutdownTaskTest { kinesisProxy, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, getRecordsCache); @@ -147,7 +151,7 @@ public class ShutdownTaskTest { */ @Test public final void testGetTaskType() { - ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsCache); + ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } From db72cc15f8d708f4833ff957d5fd13d3367f3187 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Thu, 4 Jan 2018 08:32:50 -0800 Subject: [PATCH 2/9] Advance version to 1.8.9-SNAPSHOT --- META-INF/MANIFEST.MF | 2 +- pom.xml | 2 +- .../clientlibrary/lib/worker/KinesisClientLibConfiguration.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index 35e86907..ecc2b4f0 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.8.8 +Bundle-Version: 1.8.9 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.8 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", diff --git a/pom.xml b/pom.xml index 1c521b0e..fea1c5df 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.8.8 + 1.8.9-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 04d59574..f69ae180 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.8"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.9"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls From e999c76b776557ec720193fe2e5dd6973d672009 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Fri, 12 Jan 2018 12:01:18 -0800 Subject: [PATCH 3/9] Upgraded the AWS SDK version to 1.11.261 (#281) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index fea1c5df..44cce92a 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ - 1.11.218 + 1.11.261 1.0.392 libsqlite4java ${project.build.directory}/test-lib From 31fd0b58117962e99b2f793a8ff4c3c17a3d9a81 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Mon, 15 Jan 2018 07:52:28 -0800 Subject: [PATCH 4/9] Release 1.8.9 of the Amazon Kinesis Client for Java (#282) * Allow disabling check for the case where a child shard has an open parent shard. There is a race condition where it's possible for the a parent shard to appear open, while having child shards. This check can now be disabled by setting ignoreUnexpectedChildShards in the KinesisClientLibConfiguration to true. * PR #240 * Issue #210 * Upgraded the AWS SDK for Java to 1.11.261 * PR #281 --- README.md | 9 +++++++++ pom.xml | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 7a89922e..246b1e32 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,15 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes + +### Release 1.8.9 +* Allow disabling check for the case where a child shard has an open parent shard. + There is a race condition where it's possible for the a parent shard to appear open, while having child shards. This check can now be disabled by setting [`ignoreUnexpectedChildShards`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1037) to true. + * [PR #240](https://github.com/awslabs/amazon-kinesis-client/pull/240) + * [Issue #210](https://github.com/awslabs/amazon-kinesis-client/issues/210) +* Upgraded the AWS SDK for Java to 1.11.261 + * [PR #281](https://github.com/awslabs/amazon-kinesis-client/pull/281) + ### Release 1.8.8 * Fixed issues with leases losses due to `ExpiredIteratorException` in `PrefetchGetRecordsCache` and `AsynchronousFetchingStrategy`. PrefetchGetRecordsCache will request for a new iterator and start fetching data again. diff --git a/pom.xml b/pom.xml index 44cce92a..d430b221 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.8.9-SNAPSHOT + 1.8.9 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. From 222bcdaf3f4c5873506d45a2809d5fc88db57f3e Mon Sep 17 00:00:00 2001 From: parijatsinha Date: Mon, 15 Jan 2018 14:31:04 -0800 Subject: [PATCH 5/9] Adding capability to set KinesisProxy using Worker.Builder (#274) * Added IKinesisProxy injector in Worker.Builder to allow injecting custom proxy implementations * Added unit tests for IKinesisProxy injection in Worker Builder * Revert "Added unit tests for IKinesisProxy injection in Worker Builder" This reverts commit aa944c17061b1506c5c55cf3932857b6f6086049. Reverting to undo changes to import ordering. * Added unit tests for IKinesisProxy injection in Worker Builder Re-added unit tests after reverting changes to import ordering. * Revert "Added unit tests for IKinesisProxy injection in Worker Builder" This reverts commit 91e445774beda2097788da7ffc09d04c03314a43. Reverting to refactor unit tests. * Added unit tests for Worker Builder IKinesisProxy injection validation Refactored unit tests as per comments in the pull request. * Added debug logs in KinesisLocalFileDataCreator * Revert "Added debug logs in KinesisLocalFileDataCreator" This reverts commit 1ff00d0b01d6d95a02b7ae67e542977a75b6e307. * Edited JavaDoc for Worker Builder kinesisProxy --- .../clientlibrary/lib/worker/Worker.java | 28 +++++++++++++++++-- .../clientlibrary/lib/worker/WorkerTest.java | 26 +++++++++++++++++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index c0f413be..87d5cdb1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -996,6 +997,11 @@ public class Worker implements Runnable { metricsFactory, execService); } + @VisibleForTesting + StreamConfig getStreamConfig() { + return streamConfig; + } + /** * Given configuration, returns appropriate metrics factory. * @@ -1073,6 +1079,7 @@ public class Worker implements Runnable { private IMetricsFactory metricsFactory; private ExecutorService execService; private ShardPrioritization shardPrioritization; + private IKinesisProxy kinesisProxy; /** * Default constructor. @@ -1192,6 +1199,19 @@ public class Worker implements Runnable { return this; } + /** + * Set KinesisProxy for the worker. + * + * @param kinesisProxy + * Sets an implementation of IKinesisProxy. + * + * @return A reference to this updated object so that method calls can be chained together. + */ + public Builder kinesisProxy(IKinesisProxy kinesisProxy) { + this.kinesisProxy = kinesisProxy; + return this; + } + /** * Build the Worker instance. * @@ -1257,13 +1277,15 @@ public class Worker implements Runnable { if (shardPrioritization == null) { shardPrioritization = new ParentsFirstShardPrioritization(1); } - + if (kinesisProxy == null) { + kinesisProxy = new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) + .getProxy(config.getStreamName()); + } return new Worker(config.getApplicationName(), recordProcessorFactory, config, - new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), - kinesisClient).getProxy(config.getStreamName()), + new StreamConfig(kinesisProxy, config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(), diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index ce406dce..6cc7ef08 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -90,6 +90,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; @@ -1474,6 +1475,31 @@ public class WorkerTest { } + @Test + public void testBuilderWithDefaultKinesisProxy() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .build(); + Assert.assertNotNull(worker.getStreamConfig().getStreamProxy()); + Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisProxy); + } + + @Test + public void testBuilderWhenKinesisProxyIsSet() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + // Create an instance of KinesisLocalFileProxy for injection and validation + IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .kinesisProxy(kinesisProxy) + .build(); + Assert.assertNotNull(worker.getStreamConfig().getStreamProxy()); + Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy); + } + private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, From 71124e40550208cae7e2117415914d601036c8f2 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 16 Jan 2018 15:58:26 -0800 Subject: [PATCH 6/9] Updating version to 1.8.10. (#283) --- pom.xml | 2 +- .../clientlibrary/lib/worker/KinesisClientLibConfiguration.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index d430b221..45327898 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.8.9 + 1.8.10-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index f69ae180..bd33fc90 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.9"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.10"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls From e65e56380b6accd96b96c423162859c20bca04fc Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Mon, 22 Jan 2018 10:37:46 -0800 Subject: [PATCH 7/9] Fixing issue with NullMetrics warning messages (#284) Fixes #48 * Fixing issue with NullMetrics warning messages when trying to checkpoint on a separate thread. * Adding testing to validate the MetricsScope setting during checkpoiniting. --- .../worker/RecordProcessorCheckpointer.java | 47 +++++++--- .../lib/worker/ShardConsumer.java | 3 +- .../kinesis/metrics/impl/MetricsHelper.java | 11 ++- .../RecordProcessorCheckpointerTest.java | 87 ++++++++++++++----- .../lib/worker/ShardConsumerTest.java | 9 +- 5 files changed, 116 insertions(+), 41 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java index 72e18d73..8e3dfd73 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java @@ -14,6 +14,9 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; +import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,6 +53,8 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { private SequenceNumberValidator sequenceNumberValidator; private ExtendedSequenceNumber sequenceNumberAtShardEnd; + + private IMetricsFactory metricsFactory; /** * Only has package level access, since only the Amazon Kinesis Client Library should be creating these. @@ -59,10 +64,12 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { */ RecordProcessorCheckpointer(ShardInfo shardInfo, ICheckpoint checkpoint, - SequenceNumberValidator validator) { + SequenceNumberValidator validator, + IMetricsFactory metricsFactory) { this.shardInfo = shardInfo; this.checkpoint = checkpoint; this.sequenceNumberValidator = validator; + this.metricsFactory = metricsFactory; } /** @@ -283,21 +290,33 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { // just checkpoint at SHARD_END checkpointToRecord = ExtendedSequenceNumber.SHARD_END; } + + boolean unsetMetrics = false; // Don't checkpoint a value we already successfully checkpointed - if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken() - + " checkpoint to " + checkpointToRecord); + try { + if (!MetricsHelper.isMetricsScopePresent()) { + MetricsHelper.setMetricsScope(new ThreadSafeMetricsDelegatingScope(metricsFactory.createMetrics())); + unsetMetrics = true; + } + if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken() + + " checkpoint to " + checkpointToRecord); + } + checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointToRecord, shardInfo.getConcurrencyToken()); + lastCheckpointValue = checkpointToRecord; + } catch (ThrottlingException | ShutdownException | InvalidStateException + | KinesisClientLibDependencyException e) { + throw e; + } catch (KinesisClientLibException e) { + LOG.warn("Caught exception setting checkpoint.", e); + throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e); } - checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointToRecord, shardInfo.getConcurrencyToken()); - lastCheckpointValue = checkpointToRecord; - } catch (ThrottlingException | ShutdownException | InvalidStateException - | KinesisClientLibDependencyException e) { - throw e; - } catch (KinesisClientLibException e) { - LOG.warn("Caught exception setting checkpoint.", e); - throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e); + } + } finally { + if (unsetMetrics) { + MetricsHelper.unsetMetricsScope(); } } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 95cc663e..4a001b9b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -170,7 +170,8 @@ class ShardConsumer { new SequenceNumberValidator( streamConfig.getStreamProxy(), shardInfo.getShardId(), - streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), + metricsFactory), leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, diff --git a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java index 4599fbaa..bf104cff 100644 --- a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java +++ b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java @@ -72,13 +72,22 @@ public class MetricsHelper { * @param scope */ public static void setMetricsScope(IMetricsScope scope) { - if (currentScope.get() != null) { + if (isMetricsScopePresent()) { throw new RuntimeException(String.format( "Metrics scope is already set for the current thread %s", Thread.currentThread().getName())); } currentScope.set(scope); } + /** + * Checks if current metricsscope is present or not. + * + * @return true if metrics scope is present, else returns false + */ + public static boolean isMetricsScopePresent() { + return currentScope.get() != null; + } + /** * Unsets the metrics scope for the current thread. */ diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java index 31a1e184..7e637457 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java @@ -14,6 +14,13 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -23,7 +30,10 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer; @@ -31,15 +41,15 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheck import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; +import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsScope; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.Record; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Matchers.anyString; - /** * */ +@RunWith(MockitoJUnitRunner.class) public class RecordProcessorCheckpointerTest { private String startingSequenceNumber = "13"; private ExtendedSequenceNumber startingExtendedSequenceNumber = new ExtendedSequenceNumber(startingSequenceNumber); @@ -48,6 +58,9 @@ public class RecordProcessorCheckpointerTest { private ShardInfo shardInfo; private SequenceNumberValidator sequenceNumberValidator; private String shardId = "shardId-123"; + + @Mock + IMetricsFactory metricsFactory; /** * @throws java.lang.Exception @@ -78,7 +91,7 @@ public class RecordProcessorCheckpointerTest { public final void testCheckpoint() throws Exception { // First call to checkpoint RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, null); + new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); processingCheckpointer.setLargestPermittedCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.checkpoint(); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); @@ -98,7 +111,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testCheckpointRecord() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025"); Record record = new Record().withSequenceNumber("5025"); @@ -114,7 +127,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testCheckpointSubRecord() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030"); Record record = new Record().withSequenceNumber("5030"); @@ -131,7 +144,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testCheckpointSequenceNumber() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035"); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); @@ -146,7 +159,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testCheckpointExtendedSequenceNumber() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040"); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); @@ -162,7 +175,7 @@ public class RecordProcessorCheckpointerTest { public final void testPrepareCheckpoint() throws Exception { // First call to checkpoint RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber sequenceNumber1 = new ExtendedSequenceNumber("5001"); @@ -193,7 +206,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testPrepareCheckpointRecord() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025"); Record record = new Record().withSequenceNumber("5025"); @@ -218,7 +231,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testPrepareCheckpointSubRecord() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030"); Record record = new Record().withSequenceNumber("5030"); @@ -244,7 +257,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testPrepareCheckpointSequenceNumber() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035"); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); @@ -268,7 +281,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testPrepareCheckpointExtendedSequenceNumber() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040"); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); @@ -291,7 +304,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testMultipleOutstandingCheckpointersHappyCase() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("6040")); @@ -323,7 +336,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testMultipleOutstandingCheckpointersOutOfOrder() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("7040")); @@ -358,7 +371,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testUpdate() throws Exception { - RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null); + RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("10"); checkpointer.setLargestPermittedCheckpointValue(sequenceNumber); @@ -379,7 +392,7 @@ public class RecordProcessorCheckpointerTest { SequenceNumberValidator validator = mock(SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); // Several checkpoints we're gonna hit ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2"); @@ -467,7 +480,7 @@ public class RecordProcessorCheckpointerTest { SequenceNumberValidator validator = mock(SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); // Several checkpoints we're gonna hit ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2"); @@ -595,7 +608,7 @@ public class RecordProcessorCheckpointerTest { for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.CHECKPOINTER); } } @@ -615,7 +628,7 @@ public class RecordProcessorCheckpointerTest { for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARED_CHECKPOINTER); } } @@ -636,7 +649,7 @@ public class RecordProcessorCheckpointerTest { for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARE_THEN_CHECKPOINTER); } } @@ -785,4 +798,34 @@ public class RecordProcessorCheckpointerTest { Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); } } + + @Test + public final void testUnsetMetricsScopeDuringCheckpointing() throws Exception { + // First call to checkpoint + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); + ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019"); + processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber); + processingCheckpointer.checkpoint(); + Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId)); + verify(metricsFactory).createMetrics(); + Assert.assertFalse(MetricsHelper.isMetricsScopePresent()); + } + + @Test + public final void testSetMetricsScopeDuringCheckpointing() throws Exception { + // First call to checkpoint + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); + NullMetricsScope scope = new NullMetricsScope(); + MetricsHelper.setMetricsScope(scope); + ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019"); + processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber); + processingCheckpointer.checkpoint(); + Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId)); + verify(metricsFactory, never()).createMetrics(); + Assert.assertTrue(MetricsHelper.isMetricsScopePresent()); + assertEquals(scope, MetricsHelper.getMetricsScope()); + MetricsHelper.unsetMetricsScope(); + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 8a91c6e6..216d59cd 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -342,7 +342,8 @@ public class ShardConsumerTest { streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() - ) + ), + metricsFactory ); dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); @@ -493,7 +494,8 @@ public class ShardConsumerTest { streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() - ) + ), + metricsFactory ); ShardConsumer consumer = @@ -621,7 +623,8 @@ public class ShardConsumerTest { streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() - ) + ), + metricsFactory ); dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); From 8d6c7692cbcc883591bb6ed89b573f68153d41d5 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 26 Jan 2018 07:16:46 -0800 Subject: [PATCH 8/9] Updating tests to fix build issues. (#286) --- .../RecordProcessorCheckpointerTest.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java index 7e637457..a3153aec 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java @@ -26,6 +26,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map.Entry; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -804,12 +805,20 @@ public class RecordProcessorCheckpointerTest { // First call to checkpoint RecordProcessorCheckpointer processingCheckpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); + IMetricsScope scope = null; + if (MetricsHelper.isMetricsScopePresent()) { + scope = MetricsHelper.getMetricsScope(); + MetricsHelper.unsetMetricsScope(); + } ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019"); processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber); processingCheckpointer.checkpoint(); Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId)); verify(metricsFactory).createMetrics(); Assert.assertFalse(MetricsHelper.isMetricsScopePresent()); + if (scope != null) { + MetricsHelper.setMetricsScope(scope); + } } @Test @@ -817,15 +826,20 @@ public class RecordProcessorCheckpointerTest { // First call to checkpoint RecordProcessorCheckpointer processingCheckpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); - NullMetricsScope scope = new NullMetricsScope(); - MetricsHelper.setMetricsScope(scope); + boolean shouldUnset = false; + if (!MetricsHelper.isMetricsScopePresent()) { + shouldUnset = true; + MetricsHelper.setMetricsScope(new NullMetricsScope()); + } ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019"); processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber); processingCheckpointer.checkpoint(); Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId)); verify(metricsFactory, never()).createMetrics(); Assert.assertTrue(MetricsHelper.isMetricsScopePresent()); - assertEquals(scope, MetricsHelper.getMetricsScope()); - MetricsHelper.unsetMetricsScope(); + assertEquals(NullMetricsScope.class, MetricsHelper.getMetricsScope().getClass()); + if (shouldUnset) { + MetricsHelper.unsetMetricsScope(); + } } } From 4309f2303227b7b23f2f227ca755127512990bb3 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Mon, 29 Jan 2018 11:41:16 -0800 Subject: [PATCH 9/9] Upgraded Java SDK Version 1.11.271 (#287) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 45327898..e035bfe4 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ - 1.11.261 + 1.11.271 1.0.392 libsqlite4java ${project.build.directory}/test-lib