Merge pull request #76 from ychunxue/prematureChildShardCreation

fix for premature childShard lease creation
This commit is contained in:
ychunxue 2020-07-28 18:00:18 -07:00 committed by GitHub
commit c5632e38bc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 444 additions and 137 deletions

View file

@ -221,7 +221,6 @@ class ShardConsumer {
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
* @param config Kinesis library configuration * @param config Kinesis library configuration
* @param shardSyncer shardSyncer instance used to check and create new leases * @param shardSyncer shardSyncer instance used to check and create new leases
* @param leaseCleanupManager used to clean up leases in lease table.
*/ */
@Deprecated @Deprecated
ShardConsumer(ShardInfo shardInfo, ShardConsumer(ShardInfo shardInfo,

View file

@ -15,10 +15,13 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion; import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils; import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -29,13 +32,13 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -45,7 +48,8 @@ class ShutdownTask implements ITask {
private static final Log LOG = LogFactory.getLog(ShutdownTask.class); private static final Log LOG = LogFactory.getLog(ShutdownTask.class);
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; @VisibleForTesting
static final int RETRY_RANDOM_MAX_RANGE = 50;
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
@ -107,99 +111,40 @@ class ShutdownTask implements ITask {
@Override @Override
public TaskResult call() { public TaskResult call() {
Exception exception; Exception exception;
boolean applicationException = false;
LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: "
+ shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards);
try { try {
LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
+ shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); final Runnable leaseLostAction = () -> takeLeaseLostAction();
ShutdownReason localReason = reason;
/* if (reason == ShutdownReason.TERMINATE) {
* Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
* If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active
* workers to contend for the lease of this shard.
*/
if(localReason == ShutdownReason.TERMINATE) {
// Create new lease for the child shards if they don't exist.
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
// This would happen when KinesisDataFetcher catches ResourceNotFound exception.
// In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
try { try {
if (!CollectionUtils.isNullOrEmpty(childShards)) { takeShardEndAction(currentShardLease);
createLeasesForChildShardsIfNotExist();
updateCurrentLeaseWithChildShards();
} else {
LOG.warn("Shard " + shardInfo.getShardId()
+ ": Shutting down consumer with SHARD_END reason without creating leases for child shards.");
}
} catch (InvalidStateException e) { } catch (InvalidStateException e) {
// If invalidStateException happens, it indicates we are missing childShard related information. // If InvalidStateException happens, it indicates we have a non recoverable error in short term.
// In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down.
// childShard information in the processTask. LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " +
localReason = ShutdownReason.ZOMBIE; "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e);
dropLease(); dropLease(currentShardLease);
LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " + throwOnApplicationException(leaseLostAction);
"Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e);
} }
} else {
throwOnApplicationException(leaseLostAction);
} }
// If we reached end of the shard, set sequence number to SHARD_END. LOG.debug("Shutting down retrieval strategy.");
if (localReason == ShutdownReason.TERMINATE) { getRecordsCache.shutdown();
recordProcessorCheckpointer.setSequenceNumberAtShardEnd( LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId());
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
}
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(localReason)
.withCheckpointer(recordProcessorCheckpointer);
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try {
recordProcessor.shutdown(shutdownInput);
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END);
if (localReason == ShutdownReason.TERMINATE) {
if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " +
"See IRecordProcessor.shutdown javadocs for more information.");
}
// Check if either the shard end ddb persist is successful or
// if childshards is empty. When child shards is empty then either it is due to
// completed shard being reprocessed or we got RNF from service.
// For these cases enqueue the lease for deletion.
if (successfullyCheckpointedShardEnd || CollectionUtils.isNullOrEmpty(childShards)) {
final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentLease, shardInfo);
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
//TODO: Add shard end checkpointing here.
}
}
LOG.debug("Shutting down retrieval strategy.");
getRecordsCache.shutdown();
LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId());
} catch (Exception e) {
applicationException = true;
throw e;
} finally {
MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, recordProcessorStartTimeMillis,
MetricsLevel.SUMMARY);
}
return new TaskResult(null); return new TaskResult(null);
} catch (Exception e) { } catch (Exception e) {
if (applicationException) { if (e instanceof CustomerApplicationException) {
LOG.error("Application exception. ", e); LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e);
} else { } else {
LOG.error("Caught exception: ", e); LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e);
} }
exception = e; exception = e;
// backoff if we encounter an exception. // backoff if we encounter an exception.
try { try {
@ -212,7 +157,116 @@ class ShutdownTask implements ITask {
return new TaskResult(exception); return new TaskResult(exception);
} }
// Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup.
private void takeShardEndAction(KinesisClientLease currentShardLease)
throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException {
// Create new lease for the child shards if they don't exist.
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
// This would happen when KinesisDataFetcher catches ResourceNotFound exception.
// In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
if (currentShardLease == null) {
throw new InvalidStateException("Shard " + shardInfo.getShardId() + ": Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
}
if (!CollectionUtils.isNullOrEmpty(childShards)) {
// If childShards is not empty, create new leases for the childShards and update the current lease with the childShards lease information.
createLeasesForChildShardsIfNotExist();
updateCurrentLeaseWithChildShards(currentShardLease);
} else {
LOG.warn("Shard " + shardInfo.getShardId()
+ ": Shutting down consumer with SHARD_END reason without creating leases for child shards.");
}
// Checkpoint with SHARD_END sequence number.
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentShardLease, shardInfo);
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
boolean isSuccess = false;
try {
isSuccess = attemptShardEndCheckpointing();
} finally {
// Check if either the shard end ddb persist is successful or
// if childshards is empty. When child shards is empty then either it is due to
// completed shard being reprocessed or we got RNF from service.
// For these cases enqueue the lease for deletion.
if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
}
}
}
private void takeLeaseLostAction() {
final ShutdownInput leaseLostShutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.ZOMBIE)
.withCheckpointer(recordProcessorCheckpointer);
recordProcessor.shutdown(leaseLostShutdownInput);
}
private boolean attemptShardEndCheckpointing()
throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException {
final KinesisClientLease leaseFromDdb = Optional.ofNullable(leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId()))
.orElseThrow(() -> new InvalidStateException("Lease for shard " + shardInfo.getShardId() + " does not exist."));
if (!leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
// Call the recordProcessor to checkpoint with SHARD_END sequence number.
// The recordProcessor.shutdown is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling recordProcessor.shutdown.
throwOnApplicationException(() -> applicationCheckpointAndVerification());
}
return true;
}
private void applicationCheckpointAndVerification() {
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
final ShutdownInput shardEndShutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.TERMINATE)
.withCheckpointer(recordProcessorCheckpointer);
recordProcessor.shutdown(shardEndShutdownInput);
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END);
if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " +
"See IRecordProcessor.shutdown javadocs for more information.");
}
}
private void throwOnApplicationException(Runnable action) throws CustomerApplicationException {
try {
action.run();
} catch (Exception e) {
throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfo.getShardId(), e);
}
}
private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException { private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException {
// For child shard resulted from merge of two parent shards, verify if both the parents are either present or
// not present in the lease table before creating the lease entry.
if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) {
final ChildShard childShard = childShards.get(0);
final List<String> parentLeaseKeys = childShard.getParentShards();
if (parentLeaseKeys.size() != 2) {
throw new InvalidStateException("Shard " + shardInfo.getShardId()+ "'s only child shard " + childShard
+ " does not contain other parent information.");
} else {
boolean isValidLeaseTableState = Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(0))) ==
Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(1)));
if (!isValidLeaseTableState) {
if(!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) {
throw new BlockedOnParentShardException(
"Shard " + shardInfo.getShardId() + "'s only child shard " + childShard
+ " has partial parent information in lease table. Hence deferring lease creation of child shard.");
} else {
throw new InvalidStateException("Shard " + shardInfo.getShardId() + "'s only child shard " + childShard
+ " has partial parent information in lease table.");
}
}
}
}
// Attempt create leases for child shards.
for (ChildShard childShard : childShards) { for (ChildShard childShard : childShards) {
final String leaseKey = childShard.getShardId(); final String leaseKey = childShard.getShardId();
if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) { if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) {
@ -223,18 +277,19 @@ class ShutdownTask implements ITask {
} }
} }
private void updateCurrentLeaseWithChildShards() throws DependencyException, InvalidStateException, ProvisionedThroughputException { /**
final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); * Returns true for 1 in N probability.
if (currentLease == null) { */
throw new InvalidStateException("Failed to retrieve current lease for shard " + shardInfo.getShardId()); @VisibleForTesting
} boolean isOneInNProbability(int n) {
final Set<String> childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet()); Random r = new Random();
return 1 == r.nextInt((n - 1) + 1) + 1;
}
private void updateCurrentLeaseWithChildShards(KinesisClientLease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Set<String> childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet());
currentLease.setChildShardIds(childShardIds); currentLease.setChildShardIds(childShardIds);
final boolean updateResult = leaseCoordinator.updateLease(currentLease, UUID.fromString(shardInfo.getConcurrencyToken())); leaseCoordinator.getLeaseManager().updateLeaseWithMetaInfo(currentLease, UpdateField.CHILD_SHARDS);
if (!updateResult) {
throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.getShardId());
}
LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey()); LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey());
} }
@ -254,13 +309,12 @@ class ShutdownTask implements ITask {
return reason; return reason;
} }
private void dropLease() { private void dropLease(KinesisClientLease currentShardLease) {
KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); if (currentShardLease == null) {
if (lease == null) { LOG.warn("Shard " + shardInfo.getShardId() + ": Unable to find the lease for shard. Will shutdown the shardConsumer directly.");
LOG.warn("Shard " + shardInfo.getShardId() + ": Lease already dropped. Will shutdown the shardConsumer directly.");
return; return;
} }
leaseCoordinator.dropLease(lease); leaseCoordinator.dropLease(currentShardLease);
LOG.warn("Dropped lease for shutting down ShardConsumer: " + lease.getLeaseKey()); LOG.warn("Dropped lease for shutting down ShardConsumer: " + currentShardLease.getLeaseKey());
} }
} }

View file

@ -733,12 +733,7 @@ public class Worker implements Runnable {
} }
} }
if (!leaseCleanupManager.isRunning()) { leaseCleanupManager.start();
LOG.info("Starting LeaseCleanupManager.");
leaseCleanupManager.start();
} else {
LOG.info("LeaseCleanupManager is already running. No need to start it.");
}
// If we reach this point, then we either skipped the lease sync or did not have any exception for the // If we reach this point, then we either skipped the lease sync or did not have any exception for the
// shard sync in the previous attempt. // shard sync in the previous attempt.

View file

@ -0,0 +1,24 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.leases.exceptions;
public class CustomerApplicationException extends Exception {
public CustomerApplicationException(Throwable t) {super(t);}
public CustomerApplicationException(String message, Throwable t) {super(message, t);}
public CustomerApplicationException(String message) {super(message);}
}

View file

@ -120,6 +120,11 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
return baseSerializer.getDynamoNonexistantExpectation(); return baseSerializer.getDynamoNonexistantExpectation();
} }
@Override
public Map<String, ExpectedAttributeValue> getDynamoExistentExpectation(final String leaseKey) {
return baseSerializer.getDynamoExistentExpectation(leaseKey);
}
@Override @Override
public Map<String, AttributeValueUpdate> getDynamoLeaseCounterUpdate(KinesisClientLease lease) { public Map<String, AttributeValueUpdate> getDynamoLeaseCounterUpdate(KinesisClientLease lease) {
return baseSerializer.getDynamoLeaseCounterUpdate(lease); return baseSerializer.getDynamoLeaseCounterUpdate(lease);

View file

@ -114,13 +114,16 @@ public class LeaseCleanupManager {
* {@link LeaseCleanupManager#leaseCleanupIntervalMillis} * {@link LeaseCleanupManager#leaseCleanupIntervalMillis}
*/ */
public void start() { public void start() {
LOG.debug("Starting lease cleanup thread."); if (!isRunning) {
isRunning = true; LOG.info("Starting lease cleanup thread.");
completedLeaseStopwatch.start(); completedLeaseStopwatch.start();
garbageLeaseStopwatch.start(); garbageLeaseStopwatch.start();
deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis,
deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS); isRunning = true;
} else {
LOG.info("Lease cleanup thread already running, no need to start.");
}
} }
/** /**

View file

@ -620,7 +620,7 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
UpdateItemRequest request = new UpdateItemRequest(); UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table); request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease)); request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease)); request.setExpected(serializer.getDynamoExistentExpectation(lease.getLeaseKey()));
Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField);
updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease));
@ -628,6 +628,8 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
try { try {
dynamoDBClient.updateItem(request); dynamoDBClient.updateItem(request);
} catch (ConditionalCheckFailedException e) {
LOG.warn("Lease update failed for lease with key " + lease.getLeaseKey() + " because the lease did not exist at the time of the update", e);
} catch (AmazonClientException e) { } catch (AmazonClientException e) {
throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e); throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e);
} }

View file

@ -137,6 +137,18 @@ public class LeaseSerializer implements ILeaseSerializer<Lease> {
return result; return result;
} }
@Override
public Map<String, ExpectedAttributeValue> getDynamoExistentExpectation(final String leaseKey) {
Map<String, ExpectedAttributeValue> result = new HashMap<>();
ExpectedAttributeValue expectedAV = new ExpectedAttributeValue();
expectedAV.setValue(DynamoUtils.createAttributeValue(leaseKey));
expectedAV.setExists(true);
result.put(LEASE_KEY_KEY, expectedAV);
return result;
}
@Override @Override
public Map<String, AttributeValueUpdate> getDynamoLeaseCounterUpdate(Lease lease) { public Map<String, AttributeValueUpdate> getDynamoLeaseCounterUpdate(Lease lease) {
return getDynamoLeaseCounterUpdate(lease.getLeaseCounter()); return getDynamoLeaseCounterUpdate(lease.getLeaseCounter());

View file

@ -79,6 +79,13 @@ public interface ILeaseSerializer<T extends Lease> {
*/ */
public Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation(); public Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation();
/**
* @return the attribute value map asserting that a lease does exist.
*/
default Map<String, ExpectedAttributeValue> getDynamoExistentExpectation(final String leaseKey) {
throw new UnsupportedOperationException("DynamoExistentExpectation is not implemented");
}
/** /**
* @param lease * @param lease
* @return the attribute value map that increments a lease counter * @return the attribute value map that increments a lease counter

View file

@ -583,7 +583,12 @@ public class ShardConsumerTest {
.thenReturn(getRecordsCache); .thenReturn(getRecordsCache);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease()); List<String> parentShardIds = new ArrayList<>();
parentShardIds.add("parentShardId");
KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer"));
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease);
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo, shardInfo,
@ -705,7 +710,11 @@ public class ShardConsumerTest {
final int idleTimeMS = 0; // keep unit tests fast final int idleTimeMS = 0; // keep unit tests fast
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null); List<String> parentShardIds = new ArrayList<>();
parentShardIds.add("parentShardId");
KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer"));
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet();
@ -758,11 +767,7 @@ public class ShardConsumerTest {
shardSyncer, shardSyncer,
shardSyncStrategy); shardSyncStrategy);
List<String> parentShardIds = new ArrayList<>(); when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease);
parentShardIds.add(shardInfo.getShardId());
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(createLease(shardInfo.getShardId(),
"leaseOwner",
parentShardIds));
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));

View file

@ -14,11 +14,15 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -31,10 +35,15 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@ -56,6 +65,7 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.RETRY_RANDOM_MAX_RANGE;
/** /**
* *
@ -73,8 +83,8 @@ public class ShutdownTaskTest {
defaultConcurrencyToken, defaultConcurrencyToken,
defaultParentShardIds, defaultParentShardIds,
ExtendedSequenceNumber.LATEST); ExtendedSequenceNumber.LATEST);
IRecordProcessor defaultRecordProcessor = new TestStreamlet();
ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator()); ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator());
IMetricsFactory metricsFactory = new NullMetricsFactory();
@Mock @Mock
@ -88,6 +98,8 @@ public class ShutdownTaskTest {
@Mock @Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator; private KinesisClientLibLeaseCoordinator leaseCoordinator;
@Mock @Mock
private IRecordProcessor defaultRecordProcessor;
@Mock
private LeaseCleanupManager leaseCleanupManager; private LeaseCleanupManager leaseCleanupManager;
/** /**
@ -111,9 +123,11 @@ public class ShutdownTaskTest {
public void setUp() throws Exception { public void setUp() throws Exception {
doNothing().when(getRecordsCache).shutdown(); doNothing().when(getRecordsCache).shutdown();
final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
parentLease.setCheckpoint(new ExtendedSequenceNumber("3298"));
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease); when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease);
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); when(leaseManager.getLease(defaultShardId)).thenReturn(parentLease);
} }
/** /**
@ -146,13 +160,12 @@ public class ShutdownTaskTest {
getRecordsCache, getRecordsCache,
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructChildShards(), constructSplitChildShards(),
leaseCleanupManager); leaseCleanupManager);
TaskResult result = task.call(); TaskResult result = task.call();
Assert.assertNotNull(result.getException()); assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException); Assert.assertTrue(result.getException() instanceof CustomerApplicationException);
final String expectedExceptionMessage = "Application didn't checkpoint at end of shard shardId-0. " + final String expectedExceptionMessage = "Customer application throws exception for shard shardId-0";
"Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information.";
Assert.assertEquals(expectedExceptionMessage, result.getException().getMessage()); Assert.assertEquals(expectedExceptionMessage, result.getException().getMessage());
} }
@ -182,7 +195,7 @@ public class ShutdownTaskTest {
getRecordsCache, getRecordsCache,
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructChildShards(), constructSplitChildShards(),
leaseCleanupManager); leaseCleanupManager);
TaskResult result = task.call(); TaskResult result = task.call();
verify(getRecordsCache).shutdown(); verify(getRecordsCache).shutdown();
@ -190,6 +203,136 @@ public class ShutdownTaskTest {
Assert.assertNull(result.getException()); Assert.assertNull(result.getException());
} }
@Test
public final void testCallWhenParentInfoNotPresentInLease() throws Exception {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
currentLease.setCheckpoint(new ExtendedSequenceNumber("3298"));
KinesisClientLease adjacentParentLease = createLease("ShardId-1", "leaseOwner", Collections.emptyList());
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease);
when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease);
when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, adjacentParentLease);
// Make first 5 attempts with partial parent info in lease table
for (int i = 0; i < 5; i++) {
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy,
constructMergeChildShards(),
leaseCleanupManager));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call();
assertNotNull(result.getException());
assertTrue(result.getException() instanceof BlockedOnParentShardException);
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(getRecordsCache, never()).shutdown();
verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class));
}
// Make next attempt with complete parent info in lease table
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy,
constructMergeChildShards(),
leaseCleanupManager));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call();
assertNull(result.getException());
verify(task, never()).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(getRecordsCache).shutdown();
verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class));
verify(leaseCoordinator, never()).dropLease(currentLease);
}
@Test
public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws Exception {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease);
when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease);
when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null);
for (int i = 0; i < 10; i++) {
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy,
constructMergeChildShards(),
leaseCleanupManager));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call();
assertNotNull(result.getException());
assertTrue(result.getException() instanceof BlockedOnParentShardException);
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(getRecordsCache, never()).shutdown();
verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class));
}
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy,
constructMergeChildShards(),
leaseCleanupManager));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true);
TaskResult result = task.call();
assertNull(result.getException());
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(getRecordsCache).shutdown();
verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class));
verify(leaseCoordinator).dropLease(currentLease);
}
@Test @Test
public final void testCallWhenShardEnd() throws Exception { public final void testCallWhenShardEnd() throws Exception {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
@ -210,11 +353,11 @@ public class ShutdownTaskTest {
getRecordsCache, getRecordsCache,
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructChildShards(), constructSplitChildShards(),
leaseCleanupManager); leaseCleanupManager);
TaskResult result = task.call(); TaskResult result = task.call();
verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator).updateLease(any(KinesisClientLease.class), any(UUID.class)); verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
Assert.assertNull(result.getException()); Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown(); verify(getRecordsCache).shutdown();
} }
@ -248,7 +391,7 @@ public class ShutdownTaskTest {
leaseCleanupManager); leaseCleanupManager);
TaskResult result = task.call(); TaskResult result = task.call();
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
Assert.assertNull(result.getException()); Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown(); verify(getRecordsCache).shutdown();
} }
@ -278,7 +421,7 @@ public class ShutdownTaskTest {
leaseCleanupManager); leaseCleanupManager);
TaskResult result = task.call(); TaskResult result = task.call();
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
Assert.assertNull(result.getException()); Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown(); verify(getRecordsCache).shutdown();
} }
@ -296,7 +439,7 @@ public class ShutdownTaskTest {
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
} }
private List<ChildShard> constructChildShards() { private List<ChildShard> constructSplitChildShards() {
List<ChildShard> childShards = new ArrayList<>(); List<ChildShard> childShards = new ArrayList<>();
List<String> parentShards = new ArrayList<>(); List<String> parentShards = new ArrayList<>();
parentShards.add(defaultShardId); parentShards.add(defaultShardId);
@ -315,6 +458,21 @@ public class ShutdownTaskTest {
return childShards; return childShards;
} }
private List<ChildShard> constructMergeChildShards() {
List<ChildShard> childShards = new ArrayList<>();
List<String> parentShards = new ArrayList<>();
parentShards.add(defaultShardId);
parentShards.add("ShardId-1");
ChildShard childShard = new ChildShard();
childShard.setShardId("ShardId-2");
childShard.setParentShards(parentShards);
childShard.setHashKeyRange(ShardObjectHelper.newHashKeyRange("0", "99"));
childShards.add(childShard);
return childShards;
}
private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) { private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
KinesisClientLease lease = new KinesisClientLease(); KinesisClientLease lease = new KinesisClientLease();
lease.setLeaseKey(leaseKey); lease.setLeaseKey(leaseKey);

View file

@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -82,6 +83,16 @@ public class LeaseCleanupManagerTest {
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords); cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
} }
/**
* Tests subsequent calls to start {@link LeaseCleanupManager}.
*/
@Test
public final void testSubsequentStarts() {
leaseCleanupManager.start();
Assert.assertTrue(leaseCleanupManager.isRunning());
leaseCleanupManager.start();
}
/** /**
* Tests that when both child shard leases are present, we are able to delete the parent shard for the completed * Tests that when both child shard leases are present, we are able to delete the parent shard for the completed
* shard case. * shard case.

View file

@ -27,6 +27,7 @@ import com.amazonaws.services.dynamodbv2.model.ListTablesResult;
import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.amazonaws.services.dynamodbv2.model.TableStatus; import com.amazonaws.services.dynamodbv2.model.TableStatus;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import junit.framework.Assert; import junit.framework.Assert;
import org.junit.Test; import org.junit.Test;
@ -124,6 +125,37 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest {
Assert.assertFalse(leaseManager.renewLease(leaseCopy)); Assert.assertFalse(leaseManager.renewLease(leaseCopy));
} }
/**
* Tests leaseManager.updateLeaseWithMetaInfo() when the lease is deleted before updating it with meta info
*/
@Test
public void testDeleteLeaseThenUpdateLeaseWithMetaInfo() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager);
KinesisClientLease lease = builder.withLease("1").build().get("1");
final String leaseKey = lease.getLeaseKey();
leaseManager.deleteLease(lease);
leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
final KinesisClientLease deletedLease = leaseManager.getLease(leaseKey);
Assert.assertNull(deletedLease);
}
/**
* Tests leaseManager.updateLeaseWithMetaInfo() on hashKeyRange update
*/
@Test
public void testUpdateLeaseWithMetaInfo() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager);
KinesisClientLease lease = builder.withLease("1").build().get("1");
final String leaseKey = lease.getLeaseKey();
final HashKeyRangeForLease hashKeyRangeForLease = HashKeyRangeForLease.fromHashKeyRange(new HashKeyRange()
.withStartingHashKey("1")
.withEndingHashKey("2"));
lease.setHashKeyRange(hashKeyRangeForLease);
leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
final KinesisClientLease updatedLease = leaseManager.getLease(leaseKey);
Assert.assertEquals(lease, updatedLease);
}
/** /**
* Tests takeLease when the lease is not already owned. * Tests takeLease when the lease is not already owned.
*/ */