Addressing minor comments
This commit is contained in:
parent
0715903456
commit
734d4918d0
2 changed files with 22 additions and 3 deletions
|
|
@ -1,5 +1,22 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2019 Amazon.com, Inc. or its affiliates.
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
package software.amazon.kinesis.leases.exceptions;
|
package software.amazon.kinesis.leases.exceptions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exception type for all exceptions thrown by the customer implemented code.
|
||||||
|
*/
|
||||||
public class CustomerApplicationException extends Exception {
|
public class CustomerApplicationException extends Exception {
|
||||||
|
|
||||||
public CustomerApplicationException(Throwable e) { super(e);}
|
public CustomerApplicationException(Throwable e) { super(e);}
|
||||||
|
|
|
||||||
|
|
@ -111,6 +111,8 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
// Create new lease for the child shards if they don't exist.
|
// Create new lease for the child shards if they don't exist.
|
||||||
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
createLeasesForChildShardsIfNotExist();
|
createLeasesForChildShardsIfNotExist();
|
||||||
|
} else {
|
||||||
|
log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", shardInfoIdProvider.apply(shardInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
recordProcessorCheckpointer
|
recordProcessorCheckpointer
|
||||||
|
|
@ -118,7 +120,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
|
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
|
||||||
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
|
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
|
||||||
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
|
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
|
||||||
throwOnApplicationException(() -> applicationCheckpointWithShardEnd(), scope, startTime);
|
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
|
||||||
} else {
|
} else {
|
||||||
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
|
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
|
||||||
}
|
}
|
||||||
|
|
@ -139,7 +141,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(this.backoffTimeMillis);
|
Thread.sleep(this.backoffTimeMillis);
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
log.debug("Shard{}: Interrupted sleep", shardInfoIdProvider.apply(shardInfo), ie);
|
log.debug("Shard {}: Interrupted sleep", shardInfoIdProvider.apply(shardInfo), ie);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
@ -149,7 +151,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
return new TaskResult(exception);
|
return new TaskResult(exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void applicationCheckpointWithShardEnd() {
|
private void applicationCheckpointAndVerification() {
|
||||||
shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||||
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
|
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
|
||||||
if (lastCheckpointValue == null
|
if (lastCheckpointValue == null
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue