diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index e03046a0..1caf0629 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -205,7 +205,8 @@ public class ShardSyncTaskManager { private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) { if (exception != null || taskResult.getException() != null) { - log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException()); + log.error("Caught exception running {} task: {}", currentTask.taskType(), + exception != null ? exception : taskResult.getException()); } // Acquire lock here. If shardSyncRequestPending is false in this completionStage and // submitShardSyncTask is invoked, before completion stage exits (future completes) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java index f7ec12c5..792555d2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java @@ -12,14 +12,13 @@ import software.amazon.kinesis.metrics.MetricsScope; * Helper class to sync leases with shards of the Kinesis stream. * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). * It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it - * and begun processing it's child shards. + * and begun processing its child shards. * *

NOTE: This class is deprecated and will be removed in a future release.

*/ @Deprecated public class ShardSyncer { private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer(); - private static final boolean garbageCollectLeases = true; /** *

NOTE: This method is deprecated and will be removed in a future release.

diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index e4b38815..9f616b0d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -113,7 +113,7 @@ public class ProcessTask implements ConsumerTask { */ @Override public TaskResult call() { - /** + /* * NOTE: the difference between appScope and shardScope is, appScope doesn't have shardId as a dimension, * therefore all data added to appScope, although from different shard consumer, will be sent to the same metric, * which is the app-level MillsBehindLatest metric. @@ -180,8 +180,6 @@ public class ProcessTask implements ConsumerTask { } } - - private List deaggregateAnyKplRecords(List records) { if (shard == null) { return aggregatorUtil.deaggregate(records); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java index 0c1c4a28..fed58739 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java @@ -15,8 +15,12 @@ package software.amazon.kinesis.retrieval.kpl; import java.math.BigInteger; +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; -//import com.amazonaws.services.kinesis.clientlibrary.lib.worker.String; +import lombok.EqualsAndHashCode; import software.amazon.kinesis.checkpoint.SentinelCheckpoint; /** @@ -28,10 +32,8 @@ import software.amazon.kinesis.checkpoint.SentinelCheckpoint; * user record therefore has an integer sub-sequence number, in addition to the * regular sequence number of the Kinesis record. The sub-sequence number is * used to checkpoint within an aggregated record. - * - * @author daphnliu - * */ +@EqualsAndHashCode public class ExtendedSequenceNumber implements Comparable { private final String sequenceNumber; private final long subSequenceNumber; @@ -65,6 +67,15 @@ public class ExtendedSequenceNumber implements Comparable SENTINEL_VALUES = Collections.unmodifiableSet( + Arrays.stream(SentinelCheckpoint.values()).map(SentinelCheckpoint::name).collect(Collectors.toSet())); + /** * Construct an ExtendedSequenceNumber. The sub-sequence number defaults to * 0. @@ -87,7 +98,7 @@ public class ExtendedSequenceNumber implements Comparable= 0) { - sb.append("SubsequenceNumber: " + subSequenceNumber()); + sb.append("SubsequenceNumber: ").append(subSequenceNumber()); } - sb.append("}"); + sb.append('}'); return sb.toString(); } - @Override - public int hashCode() { - final int prime = 31; - final int shift = 32; - int hashCode = 1; - hashCode = prime * hashCode + ((sequenceNumber == null) ? 0 : sequenceNumber.hashCode()); - hashCode = prime * hashCode + ((subSequenceNumber < 0) - ? 0 - : (int) (subSequenceNumber ^ (subSequenceNumber >>> shift))); - return hashCode; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - - if (!(obj instanceof ExtendedSequenceNumber)) { - return false; - } - ExtendedSequenceNumber other = (ExtendedSequenceNumber) obj; - - if (!sequenceNumber.equals(other.sequenceNumber())) { - return false; - } - return subSequenceNumber == other.subSequenceNumber(); - } - /** * Sequence numbers are converted, sentinels are given a value of -1. Note this method is only used after special * logic associated with SHARD_END and the case of comparing two sentinel values has already passed, so we map @@ -217,30 +195,23 @@ public class ExtendedSequenceNumber implements Comparable