diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 387d9155..3782d177 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -165,19 +165,6 @@ public class HierarchicalShardSyncer { } } - public synchronized Lease createLeaseForChildShard(ChildShard childShard) throws InvalidStateException { - Lease newLease = new Lease(); - newLease.leaseKey(childShard.shardId()); - if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) { - newLease.parentShardIds(childShard.parentShards()); - } else { - throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found."); - } - newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); - newLease.ownerSwitchesSinceCheckpoint(0L); - return newLease; - } - // CHECKSTYLE:ON CyclomaticComplexity /** Note: This method has package level access solely for testing purposes. @@ -722,6 +709,41 @@ public class HierarchicalShardSyncer { } } + public synchronized Lease createLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException { + final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, streamIdentifier); + + return multiStreamArgs.isMultiStreamMode() ? newKCLMultiStreamLeaseForChildShard(childShard, streamIdentifier) + : newKCLLeaseForChildShard(childShard); + } + + private static Lease newKCLLeaseForChildShard(final ChildShard childShard) throws InvalidStateException { + Lease newLease = new Lease(); + newLease.leaseKey(childShard.shardId()); + if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) { + newLease.parentShardIds(childShard.parentShards()); + } else { + throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found."); + } + newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + newLease.ownerSwitchesSinceCheckpoint(0L); + return newLease; + } + + private static Lease newKCLMultiStreamLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException { + MultiStreamLease newLease = new MultiStreamLease(); + newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), childShard.shardId())); + if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) { + newLease.parentShardIds(childShard.parentShards()); + } else { + throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found."); + } + newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + newLease.ownerSwitchesSinceCheckpoint(0L); + newLease.streamIdentifier(streamIdentifier.serialize()); + newLease.shardId(childShard.shardId()); + return newLease; + } + /** * Helper method to create a new Lease POJO for a shard. * Note: Package level access only for testing purposes diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java new file mode 100644 index 00000000..40121e98 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java @@ -0,0 +1,10 @@ +package software.amazon.kinesis.leases.exceptions; + +public class CustomerApplicationException extends Exception { + + public CustomerApplicationException(Throwable e) { super(e);} + + public CustomerApplicationException(String message, Throwable e) { super(message, e);} + + public CustomerApplicationException(String message) { super(message);} +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 66ed7cbc..95e52068 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -31,11 +31,11 @@ import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.HierarchicalShardSyncer; +import software.amazon.kinesis.leases.exceptions.CustomerApplicationException; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; -import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsScope; @@ -100,7 +100,6 @@ public class ShutdownTask implements ConsumerTask { final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHUTDOWN_TASK_OPERATION); Exception exception; - boolean applicationException = false; try { try { @@ -117,49 +116,30 @@ public class ShutdownTask implements ConsumerTask { recordProcessorCheckpointer .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); - // Call the shardReocrdsProcessor to checkpoint with SHARD_END sequence number. - // The shardEnded is implemented by customer. We should validate if the Shard_End checkpointing is successful after calling shardEnded. - try { - shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); - final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); - if (lastCheckpointValue == null - || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) { - throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + shardInfoIdProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " + - "See ShardRecordProcessor.shardEnded javadocs for more information."); - } - } catch (Exception e) { - applicationException = true; - throw e; - } finally { - MetricsUtil.addLatency(scope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); - } + // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. + // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. + throwOnApplicationException(() -> applicationCheckpointWithShardEnd(), scope, startTime); } else { - try { - shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()); - } catch (Exception e) { - applicationException = true; - throw e; - } + throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); } - log.debug("Shutting down retrieval strategy."); + log.debug("Shutting down retrieval strategy for shard {}.", shardInfoIdProvider.apply(shardInfo)); recordsPublisher.shutdown(); log.debug("Record processor completed shutdown() for shard {}", shardInfoIdProvider.apply(shardInfo)); return new TaskResult(null); } catch (Exception e) { - if (applicationException) { - log.error("Application exception. ", e); + if (e instanceof CustomerApplicationException) { + log.error("Shard {}: Application exception. ", shardInfoIdProvider.apply(shardInfo), e); } else { - log.error("Caught exception: ", e); + log.error("Shard {}: Caught exception: ", shardInfoIdProvider.apply(shardInfo), e); } exception = e; // backoff if we encounter an exception. try { Thread.sleep(this.backoffTimeMillis); } catch (InterruptedException ie) { - log.debug("Interrupted sleep", ie); + log.debug("Shard{}: Interrupted sleep", shardInfoIdProvider.apply(shardInfo), ie); } } } finally { @@ -169,11 +149,32 @@ public class ShutdownTask implements ConsumerTask { return new TaskResult(exception); } + private void applicationCheckpointWithShardEnd() { + shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); + if (lastCheckpointValue == null + || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) { + throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + + shardInfoIdProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " + + "See ShardRecordProcessor.shardEnded javadocs for more information."); + } + } + + private void throwOnApplicationException(Runnable action, MetricsScope metricsScope, final long startTime) throws CustomerApplicationException { + try { + action.run(); + } catch (Exception e) { + throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfoIdProvider.apply(shardInfo) +": ", e); + } finally { + MetricsUtil.addLatency(metricsScope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); + } + } + private void createLeasesForChildShardsIfNotExist() throws DependencyException, InvalidStateException, ProvisionedThroughputException { for(ChildShard childShard : childShards) { if(leaseCoordinator.getCurrentlyHeldLease(childShard.shardId()) == null) { - final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard); + final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier()); leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index dc25b20c..a96e2134 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -121,13 +121,7 @@ public class KinesisDataFetcher { if (nextIterator != null) { try { - GetRecordsResponse getRecordsResponse = getRecords(nextIterator); - while (!isValidResponse(getRecordsResponse)) { - log.error("{} : GetRecords response is not valid. nextShardIterator: {}. childShards: {}. Will retry GetRecords with the same nextIterator.", - shardId, getRecordsResponse.nextShardIterator(), getRecordsResponse.childShards()); - getRecordsResponse = getRecords(nextIterator); - } - return new AdvancingResult(getRecordsResponse); + return new AdvancingResult(getRecords(nextIterator)); } catch (ResourceNotFoundException e) { log.info("Caught ResourceNotFoundException when fetching records for shard {}", streamAndShardId); return TERMINAL_RESULT; @@ -188,11 +182,6 @@ public class KinesisDataFetcher { } } - private boolean isValidResponse(GetRecordsResponse response) { - return response.nextShardIterator() == null ? !CollectionUtils.isNullOrEmpty(response.childShards()) - : response.childShards() != null && response.childShards().isEmpty(); - } - /** * Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number. * @param initialCheckpoint Current checkpoint sequence number for this shard. @@ -297,6 +286,11 @@ public class KinesisDataFetcher { try { final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), maxFutureWait); + if (!isValidResponse(response)) { + throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId + + ". nextShardIterator: " + response.nextShardIterator() + + ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator."); + } success = true; return response; } catch (ExecutionException e) { @@ -314,6 +308,11 @@ public class KinesisDataFetcher { } } + private boolean isValidResponse(GetRecordsResponse response) { + return response.nextShardIterator() == null ? !CollectionUtils.isNullOrEmpty(response.childShards()) + : response.childShards() != null && response.childShards().isEmpty(); + } + private AWSExceptionManager createExceptionManager() { final AWSExceptionManager exceptionManager = new AWSExceptionManager(); exceptionManager.add(ResourceNotFoundException.class, t -> t); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index b4164f90..06e5afc7 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -35,7 +35,6 @@ import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -61,8 +60,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; -import javax.swing.*; -import javax.swing.text.AsyncBoxView; @RunWith(MockitoJUnitRunner.class) public class ConsumerStatesTest { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 8a9024f6..cbb9b834 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -52,6 +52,7 @@ import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardObjectHelper; +import software.amazon.kinesis.leases.exceptions.CustomerApplicationException; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -125,7 +126,7 @@ public class ShutdownTaskTest { final TaskResult result = task.call(); assertNotNull(result.getException()); - assertTrue(result.getException() instanceof IllegalArgumentException); + assertTrue(result.getException() instanceof CustomerApplicationException); } /**