addressing comments

This commit is contained in:
Chunxue Yang 2020-07-28 17:51:50 -07:00
parent 38cef8963a
commit 22737c4a5b
9 changed files with 93 additions and 86 deletions

View file

@ -532,8 +532,7 @@ class ConsumerStates {
consumer.getTaskBackoffTimeMillis(), consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getGetRecordsCache(), consumer.getShardSyncer(),
consumer.getShardSyncStrategy(), consumer.getChildShards(), consumer.getShardSyncStrategy(), consumer.getChildShards(),
consumer.getLeaseCleanupManager(), consumer.getLeaseCleanupManager());
consumer.getMetricsFactory());
} }
@Override @Override

View file

@ -54,7 +54,6 @@ class ShardConsumer {
private final ExecutorService executorService; private final ExecutorService executorService;
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final KinesisDataFetcher dataFetcher; private final KinesisDataFetcher dataFetcher;
@Getter
private final IMetricsFactory metricsFactory; private final IMetricsFactory metricsFactory;
private final KinesisClientLibLeaseCoordinator leaseCoordinator; private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private ICheckpoint checkpoint; private ICheckpoint checkpoint;

View file

@ -22,7 +22,6 @@ import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.impl.UpdateField; import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils; import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -33,8 +32,6 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.util.List; import java.util.List;
@ -51,10 +48,8 @@ class ShutdownTask implements ITask {
private static final Log LOG = LogFactory.getLog(ShutdownTask.class); private static final Log LOG = LogFactory.getLog(ShutdownTask.class);
private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask";
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
@VisibleForTesting @VisibleForTesting
static final int RETRY_RANDOM_MAX_RANGE = 10; static final int RETRY_RANDOM_MAX_RANGE = 50;
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
@ -72,7 +67,6 @@ class ShutdownTask implements ITask {
private final ShardSyncStrategy shardSyncStrategy; private final ShardSyncStrategy shardSyncStrategy;
private final List<ChildShard> childShards; private final List<ChildShard> childShards;
private final LeaseCleanupManager leaseCleanupManager; private final LeaseCleanupManager leaseCleanupManager;
private final IMetricsFactory metricsFactory;
/** /**
* Constructor. * Constructor.
@ -90,7 +84,7 @@ class ShutdownTask implements ITask {
long backoffTimeMillis, long backoffTimeMillis,
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, GetRecordsCache getRecordsCache, ShardSyncer shardSyncer,
ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards, ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards,
LeaseCleanupManager leaseCleanupManager, IMetricsFactory metricsFactory) { LeaseCleanupManager leaseCleanupManager) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -106,7 +100,6 @@ class ShutdownTask implements ITask {
this.shardSyncStrategy = shardSyncStrategy; this.shardSyncStrategy = shardSyncStrategy;
this.childShards = childShards; this.childShards = childShards;
this.leaseCleanupManager = leaseCleanupManager; this.leaseCleanupManager = leaseCleanupManager;
this.metricsFactory = metricsFactory;
} }
/* /*
@ -117,61 +110,55 @@ class ShutdownTask implements ITask {
*/ */
@Override @Override
public TaskResult call() { public TaskResult call() {
MetricsHelper.startScope(metricsFactory, SHUTDOWN_TASK_OPERATION);
Exception exception; Exception exception;
LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: "
+ shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards);
try { try {
LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
+ shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); final Runnable leaseLostAction = () -> takeLeaseLostAction();
try { if (reason == ShutdownReason.TERMINATE) {
final long startTime = System.currentTimeMillis();
final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
final Runnable leaseLostAction = () -> takeLeaseLostAction();
if (reason == ShutdownReason.TERMINATE) {
try {
takeShardEndAction(currentShardLease, startTime);
} catch (InvalidStateException e) {
// If InvalidStateException happens, it indicates we have a non recoverable error in short term.
// In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down.
LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " +
"Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e);
dropLease(currentShardLease);
throwOnApplicationException(leaseLostAction, startTime);
}
} else {
throwOnApplicationException(leaseLostAction, startTime);
}
LOG.debug("Shutting down retrieval strategy.");
getRecordsCache.shutdown();
LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId());
return new TaskResult(null);
} catch (Exception e) {
if (e instanceof CustomerApplicationException) {
LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e);
} else {
LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e);
}
exception = e;
// backoff if we encounter an exception.
try { try {
Thread.sleep(this.backoffTimeMillis); takeShardEndAction(currentShardLease);
} catch (InterruptedException ie) { } catch (InvalidStateException e) {
LOG.debug("Interrupted sleep", ie); // If InvalidStateException happens, it indicates we have a non recoverable error in short term.
// In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down.
LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " +
"Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e);
dropLease(currentShardLease);
throwOnApplicationException(leaseLostAction);
} }
} else {
throwOnApplicationException(leaseLostAction);
}
LOG.debug("Shutting down retrieval strategy.");
getRecordsCache.shutdown();
LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId());
return new TaskResult(null);
} catch (Exception e) {
if (e instanceof CustomerApplicationException) {
LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e);
} else {
LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e);
}
exception = e;
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
LOG.debug("Interrupted sleep", ie);
} }
} finally {
MetricsHelper.endScope();
} }
return new TaskResult(exception); return new TaskResult(exception);
} }
// Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup. // Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup.
private void takeShardEndAction(KinesisClientLease currentShardLease, long startTime) private void takeShardEndAction(KinesisClientLease currentShardLease)
throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException { throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException {
// Create new lease for the child shards if they don't exist. // Create new lease for the child shards if they don't exist.
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
@ -194,7 +181,7 @@ class ShutdownTask implements ITask {
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
boolean isSuccess = false; boolean isSuccess = false;
try { try {
isSuccess = attemptShardEndCheckpointing(startTime); isSuccess = attemptShardEndCheckpointing();
} finally { } finally {
// Check if either the shard end ddb persist is successful or // Check if either the shard end ddb persist is successful or
// if childshards is empty. When child shards is empty then either it is due to // if childshards is empty. When child shards is empty then either it is due to
@ -214,14 +201,14 @@ class ShutdownTask implements ITask {
recordProcessor.shutdown(leaseLostShutdownInput); recordProcessor.shutdown(leaseLostShutdownInput);
} }
private boolean attemptShardEndCheckpointing(long startTime) private boolean attemptShardEndCheckpointing()
throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException {
final KinesisClientLease leaseFromDdb = Optional.ofNullable(leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId())) final KinesisClientLease leaseFromDdb = Optional.ofNullable(leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId()))
.orElseThrow(() -> new InvalidStateException("Lease for shard " + shardInfo.getShardId() + " does not exist.")); .orElseThrow(() -> new InvalidStateException("Lease for shard " + shardInfo.getShardId() + " does not exist."));
if (!leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { if (!leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
// Call the recordProcessor to checkpoint with SHARD_END sequence number. // Call the recordProcessor to checkpoint with SHARD_END sequence number.
// The recordProcessor.shutdown is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling recordProcessor.shutdown. // The recordProcessor.shutdown is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling recordProcessor.shutdown.
throwOnApplicationException(() -> applicationCheckpointAndVerification(), startTime); throwOnApplicationException(() -> applicationCheckpointAndVerification());
} }
return true; return true;
} }
@ -246,13 +233,11 @@ class ShutdownTask implements ITask {
} }
} }
private void throwOnApplicationException(Runnable action, long startTime) throws CustomerApplicationException { private void throwOnApplicationException(Runnable action) throws CustomerApplicationException {
try { try {
action.run(); action.run();
} catch (Exception e) { } catch (Exception e) {
throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfo.getShardId(), e); throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfo.getShardId(), e);
} finally {
MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
} }
} }

View file

@ -121,8 +121,8 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
} }
@Override @Override
public Map<String, ExpectedAttributeValue> getDynamoExistantExpectation(final String leaseKey) { public Map<String, ExpectedAttributeValue> getDynamoExistentExpectation(final String leaseKey) {
return baseSerializer.getDynamoExistantExpectation(leaseKey); return baseSerializer.getDynamoExistentExpectation(leaseKey);
} }
@Override @Override

View file

@ -620,7 +620,7 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
UpdateItemRequest request = new UpdateItemRequest(); UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table); request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease)); request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoExistantExpectation(lease.getLeaseKey())); request.setExpected(serializer.getDynamoExistentExpectation(lease.getLeaseKey()));
Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField);
updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease));
@ -628,6 +628,8 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
try { try {
dynamoDBClient.updateItem(request); dynamoDBClient.updateItem(request);
} catch (ConditionalCheckFailedException e) {
LOG.warn("Lease update failed for lease with key " + lease.getLeaseKey() + " because the lease did not exist at the time of the update", e);
} catch (AmazonClientException e) { } catch (AmazonClientException e) {
throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e); throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e);
} }

View file

@ -138,7 +138,7 @@ public class LeaseSerializer implements ILeaseSerializer<Lease> {
} }
@Override @Override
public Map<String, ExpectedAttributeValue> getDynamoExistantExpectation(final String leaseKey) { public Map<String, ExpectedAttributeValue> getDynamoExistentExpectation(final String leaseKey) {
Map<String, ExpectedAttributeValue> result = new HashMap<>(); Map<String, ExpectedAttributeValue> result = new HashMap<>();
ExpectedAttributeValue expectedAV = new ExpectedAttributeValue(); ExpectedAttributeValue expectedAV = new ExpectedAttributeValue();

View file

@ -82,8 +82,8 @@ public interface ILeaseSerializer<T extends Lease> {
/** /**
* @return the attribute value map asserting that a lease does exist. * @return the attribute value map asserting that a lease does exist.
*/ */
default Map<String, ExpectedAttributeValue> getDynamoExistantExpectation(final String leaseKey) { default Map<String, ExpectedAttributeValue> getDynamoExistentExpectation(final String leaseKey) {
throw new UnsupportedOperationException("DynamoExistantExpectation is not implemented"); throw new UnsupportedOperationException("DynamoExistentExpectation is not implemented");
} }
/** /**

View file

@ -161,8 +161,7 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructSplitChildShards(), constructSplitChildShards(),
leaseCleanupManager, leaseCleanupManager);
metricsFactory);
TaskResult result = task.call(); TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof CustomerApplicationException); Assert.assertTrue(result.getException() instanceof CustomerApplicationException);
@ -197,8 +196,7 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructSplitChildShards(), constructSplitChildShards(),
leaseCleanupManager, leaseCleanupManager);
metricsFactory);
TaskResult result = task.call(); TaskResult result = task.call();
verify(getRecordsCache).shutdown(); verify(getRecordsCache).shutdown();
verify(leaseCoordinator).dropLease(any(KinesisClientLease.class)); verify(leaseCoordinator).dropLease(any(KinesisClientLease.class));
@ -236,8 +234,7 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructMergeChildShards(), constructMergeChildShards(),
leaseCleanupManager, leaseCleanupManager));
metricsFactory));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call(); TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
@ -263,8 +260,7 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructMergeChildShards(), constructMergeChildShards(),
leaseCleanupManager, leaseCleanupManager));
metricsFactory));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call(); TaskResult result = task.call();
assertNull(result.getException()); assertNull(result.getException());
@ -302,8 +298,7 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructMergeChildShards(), constructMergeChildShards(),
leaseCleanupManager, leaseCleanupManager));
metricsFactory));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call(); TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
@ -328,8 +323,7 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructMergeChildShards(), constructMergeChildShards(),
leaseCleanupManager, leaseCleanupManager));
metricsFactory));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true);
TaskResult result = task.call(); TaskResult result = task.call();
assertNull(result.getException()); assertNull(result.getException());
@ -360,8 +354,7 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructSplitChildShards(), constructSplitChildShards(),
leaseCleanupManager, leaseCleanupManager);
metricsFactory);
TaskResult result = task.call(); TaskResult result = task.call();
verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
@ -395,8 +388,7 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
Collections.emptyList(), Collections.emptyList(),
leaseCleanupManager, leaseCleanupManager);
metricsFactory);
TaskResult result = task.call(); TaskResult result = task.call();
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
@ -426,8 +418,7 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
Collections.emptyList(), Collections.emptyList(),
leaseCleanupManager, leaseCleanupManager);
metricsFactory);
TaskResult result = task.call(); TaskResult result = task.call();
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
@ -444,8 +435,7 @@ public class ShutdownTaskTest {
ShutdownTask task = new ShutdownTask(null, null, null, null, ShutdownTask task = new ShutdownTask(null, null, null, null,
null, null, false, null, null, false,
false, leaseCoordinator, 0, false, leaseCoordinator, 0,
getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager);
leaseCleanupManager, metricsFactory);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
} }

View file

@ -27,6 +27,7 @@ import com.amazonaws.services.dynamodbv2.model.ListTablesResult;
import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.amazonaws.services.dynamodbv2.model.TableStatus; import com.amazonaws.services.dynamodbv2.model.TableStatus;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import junit.framework.Assert; import junit.framework.Assert;
import org.junit.Test; import org.junit.Test;
@ -124,6 +125,37 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest {
Assert.assertFalse(leaseManager.renewLease(leaseCopy)); Assert.assertFalse(leaseManager.renewLease(leaseCopy));
} }
/**
* Tests leaseManager.updateLeaseWithMetaInfo() when the lease is deleted before updating it with meta info
*/
@Test
public void testDeleteLeaseThenUpdateLeaseWithMetaInfo() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager);
KinesisClientLease lease = builder.withLease("1").build().get("1");
final String leaseKey = lease.getLeaseKey();
leaseManager.deleteLease(lease);
leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
final KinesisClientLease deletedLease = leaseManager.getLease(leaseKey);
Assert.assertNull(deletedLease);
}
/**
* Tests leaseManager.updateLeaseWithMetaInfo() on hashKeyRange update
*/
@Test
public void testUpdateLeaseWithMetaInfo() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager);
KinesisClientLease lease = builder.withLease("1").build().get("1");
final String leaseKey = lease.getLeaseKey();
final HashKeyRangeForLease hashKeyRangeForLease = HashKeyRangeForLease.fromHashKeyRange(new HashKeyRange()
.withStartingHashKey("1")
.withEndingHashKey("2"));
lease.setHashKeyRange(hashKeyRangeForLease);
leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
final KinesisClientLease updatedLease = leaseManager.getLease(leaseKey);
Assert.assertEquals(lease, updatedLease);
}
/** /**
* Tests takeLease when the lease is not already owned. * Tests takeLease when the lease is not already owned.
*/ */