Addressing comments

This commit is contained in:
Chunxue Yang 2020-04-08 16:47:37 -07:00
parent 4b3c717c53
commit 0715903456
6 changed files with 90 additions and 60 deletions

View file

@ -165,19 +165,6 @@ public class HierarchicalShardSyncer {
}
}
public synchronized Lease createLeaseForChildShard(ChildShard childShard) throws InvalidStateException {
Lease newLease = new Lease();
newLease.leaseKey(childShard.shardId());
if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
newLease.parentShardIds(childShard.parentShards());
} else {
throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found.");
}
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
newLease.ownerSwitchesSinceCheckpoint(0L);
return newLease;
}
// CHECKSTYLE:ON CyclomaticComplexity
/** Note: This method has package level access solely for testing purposes.
@ -722,6 +709,41 @@ public class HierarchicalShardSyncer {
}
}
public synchronized Lease createLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException {
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, streamIdentifier);
return multiStreamArgs.isMultiStreamMode() ? newKCLMultiStreamLeaseForChildShard(childShard, streamIdentifier)
: newKCLLeaseForChildShard(childShard);
}
private static Lease newKCLLeaseForChildShard(final ChildShard childShard) throws InvalidStateException {
Lease newLease = new Lease();
newLease.leaseKey(childShard.shardId());
if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
newLease.parentShardIds(childShard.parentShards());
} else {
throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found.");
}
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
newLease.ownerSwitchesSinceCheckpoint(0L);
return newLease;
}
private static Lease newKCLMultiStreamLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException {
MultiStreamLease newLease = new MultiStreamLease();
newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), childShard.shardId()));
if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
newLease.parentShardIds(childShard.parentShards());
} else {
throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found.");
}
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.streamIdentifier(streamIdentifier.serialize());
newLease.shardId(childShard.shardId());
return newLease;
}
/**
* Helper method to create a new Lease POJO for a shard.
* Note: Package level access only for testing purposes

View file

@ -0,0 +1,10 @@
package software.amazon.kinesis.leases.exceptions;
public class CustomerApplicationException extends Exception {
public CustomerApplicationException(Throwable e) { super(e);}
public CustomerApplicationException(String message, Throwable e) { super(message, e);}
public CustomerApplicationException(String message) { super(message);}
}

View file

@ -31,11 +31,11 @@ import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsScope;
@ -100,7 +100,6 @@ public class ShutdownTask implements ConsumerTask {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHUTDOWN_TASK_OPERATION);
Exception exception;
boolean applicationException = false;
try {
try {
@ -117,49 +116,30 @@ public class ShutdownTask implements ConsumerTask {
recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
// Call the shardReocrdsProcessor to checkpoint with SHARD_END sequence number.
// The shardEnded is implemented by customer. We should validate if the Shard_End checkpointing is successful after calling shardEnded.
try {
shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
if (lastCheckpointValue == null
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfoIdProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " +
"See ShardRecordProcessor.shardEnded javadocs for more information.");
}
} catch (Exception e) {
applicationException = true;
throw e;
} finally {
MetricsUtil.addLatency(scope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
}
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
throwOnApplicationException(() -> applicationCheckpointWithShardEnd(), scope, startTime);
} else {
try {
shardRecordProcessor.leaseLost(LeaseLostInput.builder().build());
} catch (Exception e) {
applicationException = true;
throw e;
}
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
}
log.debug("Shutting down retrieval strategy.");
log.debug("Shutting down retrieval strategy for shard {}.", shardInfoIdProvider.apply(shardInfo));
recordsPublisher.shutdown();
log.debug("Record processor completed shutdown() for shard {}", shardInfoIdProvider.apply(shardInfo));
return new TaskResult(null);
} catch (Exception e) {
if (applicationException) {
log.error("Application exception. ", e);
if (e instanceof CustomerApplicationException) {
log.error("Shard {}: Application exception. ", shardInfoIdProvider.apply(shardInfo), e);
} else {
log.error("Caught exception: ", e);
log.error("Shard {}: Caught exception: ", shardInfoIdProvider.apply(shardInfo), e);
}
exception = e;
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
log.debug("Interrupted sleep", ie);
log.debug("Shard{}: Interrupted sleep", shardInfoIdProvider.apply(shardInfo), ie);
}
}
} finally {
@ -169,11 +149,32 @@ public class ShutdownTask implements ConsumerTask {
return new TaskResult(exception);
}
private void applicationCheckpointWithShardEnd() {
shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
if (lastCheckpointValue == null
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfoIdProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " +
"See ShardRecordProcessor.shardEnded javadocs for more information.");
}
}
private void throwOnApplicationException(Runnable action, MetricsScope metricsScope, final long startTime) throws CustomerApplicationException {
try {
action.run();
} catch (Exception e) {
throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfoIdProvider.apply(shardInfo) +": ", e);
} finally {
MetricsUtil.addLatency(metricsScope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
}
}
private void createLeasesForChildShardsIfNotExist()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
for(ChildShard childShard : childShards) {
if(leaseCoordinator.getCurrentlyHeldLease(childShard.shardId()) == null) {
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard);
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);
}
}

View file

@ -121,13 +121,7 @@ public class KinesisDataFetcher {
if (nextIterator != null) {
try {
GetRecordsResponse getRecordsResponse = getRecords(nextIterator);
while (!isValidResponse(getRecordsResponse)) {
log.error("{} : GetRecords response is not valid. nextShardIterator: {}. childShards: {}. Will retry GetRecords with the same nextIterator.",
shardId, getRecordsResponse.nextShardIterator(), getRecordsResponse.childShards());
getRecordsResponse = getRecords(nextIterator);
}
return new AdvancingResult(getRecordsResponse);
return new AdvancingResult(getRecords(nextIterator));
} catch (ResourceNotFoundException e) {
log.info("Caught ResourceNotFoundException when fetching records for shard {}", streamAndShardId);
return TERMINAL_RESULT;
@ -188,11 +182,6 @@ public class KinesisDataFetcher {
}
}
private boolean isValidResponse(GetRecordsResponse response) {
return response.nextShardIterator() == null ? !CollectionUtils.isNullOrEmpty(response.childShards())
: response.childShards() != null && response.childShards().isEmpty();
}
/**
* Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number.
* @param initialCheckpoint Current checkpoint sequence number for this shard.
@ -297,6 +286,11 @@ public class KinesisDataFetcher {
try {
final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request),
maxFutureWait);
if (!isValidResponse(response)) {
throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId
+ ". nextShardIterator: " + response.nextShardIterator()
+ ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator.");
}
success = true;
return response;
} catch (ExecutionException e) {
@ -314,6 +308,11 @@ public class KinesisDataFetcher {
}
}
private boolean isValidResponse(GetRecordsResponse response) {
return response.nextShardIterator() == null ? !CollectionUtils.isNullOrEmpty(response.childShards())
: response.childShards() != null && response.childShards().isEmpty();
}
private AWSExceptionManager createExceptionManager() {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);

View file

@ -35,7 +35,6 @@ import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@ -61,8 +60,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import javax.swing.*;
import javax.swing.text.AsyncBoxView;
@RunWith(MockitoJUnitRunner.class)
public class ConsumerStatesTest {

View file

@ -52,6 +52,7 @@ import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardObjectHelper;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@ -125,7 +126,7 @@ public class ShutdownTaskTest {
final TaskResult result = task.call();
assertNotNull(result.getException());
assertTrue(result.getException() instanceof IllegalArgumentException);
assertTrue(result.getException() instanceof CustomerApplicationException);
}
/**