diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java index 40121e98..ba97ab08 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java @@ -1,5 +1,22 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package software.amazon.kinesis.leases.exceptions; +/** + * Exception type for all exceptions thrown by the customer implemented code. + */ public class CustomerApplicationException extends Exception { public CustomerApplicationException(Throwable e) { super(e);} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 95e52068..33eb4497 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -111,6 +111,8 @@ public class ShutdownTask implements ConsumerTask { // Create new lease for the child shards if they don't exist. if (!CollectionUtils.isNullOrEmpty(childShards)) { createLeasesForChildShardsIfNotExist(); + } else { + log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", shardInfoIdProvider.apply(shardInfo)); } recordProcessorCheckpointer @@ -118,7 +120,7 @@ public class ShutdownTask implements ConsumerTask { recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. - throwOnApplicationException(() -> applicationCheckpointWithShardEnd(), scope, startTime); + throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); } else { throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); } @@ -139,7 +141,7 @@ public class ShutdownTask implements ConsumerTask { try { Thread.sleep(this.backoffTimeMillis); } catch (InterruptedException ie) { - log.debug("Shard{}: Interrupted sleep", shardInfoIdProvider.apply(shardInfo), ie); + log.debug("Shard {}: Interrupted sleep", shardInfoIdProvider.apply(shardInfo), ie); } } } finally { @@ -149,7 +151,7 @@ public class ShutdownTask implements ConsumerTask { return new TaskResult(exception); } - private void applicationCheckpointWithShardEnd() { + private void applicationCheckpointAndVerification() { shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); if (lastCheckpointValue == null