diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index b5cde2bc..5cf55dbf 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -532,8 +532,7 @@ class ConsumerStates { consumer.getTaskBackoffTimeMillis(), consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy(), consumer.getChildShards(), - consumer.getLeaseCleanupManager(), - consumer.getMetricsFactory()); + consumer.getLeaseCleanupManager()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 10600def..29a95ac4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -54,7 +54,6 @@ class ShardConsumer { private final ExecutorService executorService; private final ShardInfo shardInfo; private final KinesisDataFetcher dataFetcher; - @Getter private final IMetricsFactory metricsFactory; private final KinesisClientLibLeaseCoordinator leaseCoordinator; private ICheckpoint checkpoint; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index a9ec8d76..274aaaa1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -22,7 +22,6 @@ import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.leases.impl.UpdateField; -import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; @@ -33,8 +32,6 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; -import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; import java.util.List; @@ -51,10 +48,8 @@ class ShutdownTask implements ITask { private static final Log LOG = LogFactory.getLog(ShutdownTask.class); - private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask"; - private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; @VisibleForTesting - static final int RETRY_RANDOM_MAX_RANGE = 10; + static final int RETRY_RANDOM_MAX_RANGE = 50; private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; @@ -72,7 +67,6 @@ class ShutdownTask implements ITask { private final ShardSyncStrategy shardSyncStrategy; private final List childShards; private final LeaseCleanupManager leaseCleanupManager; - private final IMetricsFactory metricsFactory; /** * Constructor. @@ -90,7 +84,7 @@ class ShutdownTask implements ITask { long backoffTimeMillis, GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, List childShards, - LeaseCleanupManager leaseCleanupManager, IMetricsFactory metricsFactory) { + LeaseCleanupManager leaseCleanupManager) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -106,7 +100,6 @@ class ShutdownTask implements ITask { this.shardSyncStrategy = shardSyncStrategy; this.childShards = childShards; this.leaseCleanupManager = leaseCleanupManager; - this.metricsFactory = metricsFactory; } /* @@ -117,61 +110,55 @@ class ShutdownTask implements ITask { */ @Override public TaskResult call() { - MetricsHelper.startScope(metricsFactory, SHUTDOWN_TASK_OPERATION); Exception exception; + LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " + + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); + try { - LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " - + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); + final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); + final Runnable leaseLostAction = () -> takeLeaseLostAction(); - try { - final long startTime = System.currentTimeMillis(); - final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); - final Runnable leaseLostAction = () -> takeLeaseLostAction(); - - if (reason == ShutdownReason.TERMINATE) { - try { - takeShardEndAction(currentShardLease, startTime); - } catch (InvalidStateException e) { - // If InvalidStateException happens, it indicates we have a non recoverable error in short term. - // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down. - LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " + - "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e); - dropLease(currentShardLease); - throwOnApplicationException(leaseLostAction, startTime); - } - } else { - throwOnApplicationException(leaseLostAction, startTime); - } - - LOG.debug("Shutting down retrieval strategy."); - getRecordsCache.shutdown(); - LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); - return new TaskResult(null); - } catch (Exception e) { - if (e instanceof CustomerApplicationException) { - LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e); - } else { - LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e); - } - - exception = e; - // backoff if we encounter an exception. + if (reason == ShutdownReason.TERMINATE) { try { - Thread.sleep(this.backoffTimeMillis); - } catch (InterruptedException ie) { - LOG.debug("Interrupted sleep", ie); + takeShardEndAction(currentShardLease); + } catch (InvalidStateException e) { + // If InvalidStateException happens, it indicates we have a non recoverable error in short term. + // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down. + LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " + + "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e); + dropLease(currentShardLease); + throwOnApplicationException(leaseLostAction); } + } else { + throwOnApplicationException(leaseLostAction); + } + + LOG.debug("Shutting down retrieval strategy."); + getRecordsCache.shutdown(); + LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); + return new TaskResult(null); + } catch (Exception e) { + if (e instanceof CustomerApplicationException) { + LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e); + } else { + LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e); + } + + exception = e; + // backoff if we encounter an exception. + try { + Thread.sleep(this.backoffTimeMillis); + } catch (InterruptedException ie) { + LOG.debug("Interrupted sleep", ie); } - } finally { - MetricsHelper.endScope(); } return new TaskResult(exception); } // Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup. - private void takeShardEndAction(KinesisClientLease currentShardLease, long startTime) + private void takeShardEndAction(KinesisClientLease currentShardLease) throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException { // Create new lease for the child shards if they don't exist. // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. @@ -194,7 +181,7 @@ class ShutdownTask implements ITask { if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { boolean isSuccess = false; try { - isSuccess = attemptShardEndCheckpointing(startTime); + isSuccess = attemptShardEndCheckpointing(); } finally { // Check if either the shard end ddb persist is successful or // if childshards is empty. When child shards is empty then either it is due to @@ -214,14 +201,14 @@ class ShutdownTask implements ITask { recordProcessor.shutdown(leaseLostShutdownInput); } - private boolean attemptShardEndCheckpointing(long startTime) + private boolean attemptShardEndCheckpointing() throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { final KinesisClientLease leaseFromDdb = Optional.ofNullable(leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId())) .orElseThrow(() -> new InvalidStateException("Lease for shard " + shardInfo.getShardId() + " does not exist.")); if (!leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { // Call the recordProcessor to checkpoint with SHARD_END sequence number. // The recordProcessor.shutdown is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling recordProcessor.shutdown. - throwOnApplicationException(() -> applicationCheckpointAndVerification(), startTime); + throwOnApplicationException(() -> applicationCheckpointAndVerification()); } return true; } @@ -246,13 +233,11 @@ class ShutdownTask implements ITask { } } - private void throwOnApplicationException(Runnable action, long startTime) throws CustomerApplicationException { + private void throwOnApplicationException(Runnable action) throws CustomerApplicationException { try { action.run(); } catch (Exception e) { throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfo.getShardId(), e); - } finally { - MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 966faee4..310edb67 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -121,8 +121,8 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer getDynamoExistantExpectation(final String leaseKey) { - return baseSerializer.getDynamoExistantExpectation(leaseKey); + public Map getDynamoExistentExpectation(final String leaseKey) { + return baseSerializer.getDynamoExistentExpectation(leaseKey); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index 36f56dda..0c70aaa7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -620,7 +620,7 @@ public class LeaseManager implements ILeaseManager { UpdateItemRequest request = new UpdateItemRequest(); request.setTableName(table); request.setKey(serializer.getDynamoHashKey(lease)); - request.setExpected(serializer.getDynamoExistantExpectation(lease.getLeaseKey())); + request.setExpected(serializer.getDynamoExistentExpectation(lease.getLeaseKey())); Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); @@ -628,6 +628,8 @@ public class LeaseManager implements ILeaseManager { try { dynamoDBClient.updateItem(request); + } catch (ConditionalCheckFailedException e) { + LOG.warn("Lease update failed for lease with key " + lease.getLeaseKey() + " because the lease did not exist at the time of the update", e); } catch (AmazonClientException e) { throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e); } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java index 29bf0f9b..85381560 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java @@ -138,7 +138,7 @@ public class LeaseSerializer implements ILeaseSerializer { } @Override - public Map getDynamoExistantExpectation(final String leaseKey) { + public Map getDynamoExistentExpectation(final String leaseKey) { Map result = new HashMap<>(); ExpectedAttributeValue expectedAV = new ExpectedAttributeValue(); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java index 8ff56b7d..2d9ea0c9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java @@ -82,8 +82,8 @@ public interface ILeaseSerializer { /** * @return the attribute value map asserting that a lease does exist. */ - default Map getDynamoExistantExpectation(final String leaseKey) { - throw new UnsupportedOperationException("DynamoExistantExpectation is not implemented"); + default Map getDynamoExistentExpectation(final String leaseKey) { + throw new UnsupportedOperationException("DynamoExistentExpectation is not implemented"); } /** diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 2942dcb7..8b67f5dc 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -161,8 +161,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager, - metricsFactory); + leaseCleanupManager); TaskResult result = task.call(); assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof CustomerApplicationException); @@ -197,8 +196,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager, - metricsFactory); + leaseCleanupManager); TaskResult result = task.call(); verify(getRecordsCache).shutdown(); verify(leaseCoordinator).dropLease(any(KinesisClientLease.class)); @@ -236,8 +234,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager, - metricsFactory)); + leaseCleanupManager)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); @@ -263,8 +260,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager, - metricsFactory)); + leaseCleanupManager)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNull(result.getException()); @@ -302,8 +298,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager, - metricsFactory)); + leaseCleanupManager)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); @@ -328,8 +323,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager, - metricsFactory)); + leaseCleanupManager)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true); TaskResult result = task.call(); assertNull(result.getException()); @@ -360,8 +354,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager, - metricsFactory); + leaseCleanupManager); TaskResult result = task.call(); verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -395,8 +388,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, Collections.emptyList(), - leaseCleanupManager, - metricsFactory); + leaseCleanupManager); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -426,8 +418,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, Collections.emptyList(), - leaseCleanupManager, - metricsFactory); + leaseCleanupManager); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -444,8 +435,7 @@ public class ShutdownTaskTest { ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, - getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), - leaseCleanupManager, metricsFactory); + getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java index d0b0813d..c04ee44f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java @@ -27,6 +27,7 @@ import com.amazonaws.services.dynamodbv2.model.ListTablesResult; import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.amazonaws.services.dynamodbv2.model.TableStatus; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.model.HashKeyRange; import junit.framework.Assert; import org.junit.Test; @@ -124,6 +125,37 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest { Assert.assertFalse(leaseManager.renewLease(leaseCopy)); } + /** + * Tests leaseManager.updateLeaseWithMetaInfo() when the lease is deleted before updating it with meta info + */ + @Test + public void testDeleteLeaseThenUpdateLeaseWithMetaInfo() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager); + KinesisClientLease lease = builder.withLease("1").build().get("1"); + final String leaseKey = lease.getLeaseKey(); + leaseManager.deleteLease(lease); + leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE); + final KinesisClientLease deletedLease = leaseManager.getLease(leaseKey); + Assert.assertNull(deletedLease); + } + + /** + * Tests leaseManager.updateLeaseWithMetaInfo() on hashKeyRange update + */ + @Test + public void testUpdateLeaseWithMetaInfo() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager); + KinesisClientLease lease = builder.withLease("1").build().get("1"); + final String leaseKey = lease.getLeaseKey(); + final HashKeyRangeForLease hashKeyRangeForLease = HashKeyRangeForLease.fromHashKeyRange(new HashKeyRange() + .withStartingHashKey("1") + .withEndingHashKey("2")); + lease.setHashKeyRange(hashKeyRangeForLease); + leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE); + final KinesisClientLease updatedLease = leaseManager.getLease(leaseKey); + Assert.assertEquals(lease, updatedLease); + } + /** * Tests takeLease when the lease is not already owned. */