From 1c8bd8e71efabc5ba4577f09c6a16c5cb6e0e033 Mon Sep 17 00:00:00 2001 From: stair <123031771+stair-aws@users.noreply.github.com> Date: Mon, 13 Feb 2023 13:58:02 -0500 Subject: [PATCH] Minor optimizations (e.g., calculate-once, `put` instead of `get+put`) (#1041) and code cleanup (e.g., removed unused imports, updated Javadoc). No functional change. --- .../coordinator/PeriodicShardSyncManager.java | 10 ++--- .../leases/HierarchicalShardSyncer.java | 6 +-- .../software/amazon/kinesis/leases/Lease.java | 30 ++++++--------- .../kinesis/leases/LeaseManagementConfig.java | 8 ---- .../amazon/kinesis/leases/LeaseRefresher.java | 2 - .../leases/dynamodb/DynamoDBLeaseTaker.java | 14 ++----- .../kinesis/lifecycle/InitializeTask.java | 1 - .../kinesis/lifecycle/ShardConsumer.java | 4 +- .../kinesis/retrieval/RecordsPublisher.java | 5 --- .../kinesis/retrieval/RetrievalConfig.java | 4 +- .../retrieval/polling/PollingConfig.java | 5 --- .../polling/PrefetchRecordsPublisher.java | 10 ++--- ...ynchronousGetRecordsRetrievalStrategy.java | 1 - .../dynamodb/DynamoDBLeaseRefresherTest.java | 1 - .../dynamodb/DynamoDBLeaseTakerTest.java | 37 +------------------ .../kinesis/lifecycle/ShutdownTaskTest.java | 4 -- 16 files changed, 30 insertions(+), 112 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index a2d05e6d..a885c4d9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -76,7 +76,7 @@ class PeriodicShardSyncManager { @VisibleForTesting static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); static final String PERIODIC_SHARD_SYNC_MANAGER = "PeriodicShardSyncManager"; - private Map hashRangeHoleTrackerMap = new HashMap<>(); + private final Map hashRangeHoleTrackerMap = new HashMap<>(); private final String workerId; private final LeaderDecider leaderDecider; @@ -142,15 +142,12 @@ class PeriodicShardSyncManager { /** * Runs shardSync once * Does not schedule periodic shardSync - * @return the result of the task */ public synchronized void syncShardsOnce() throws Exception { // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: for already synced streams - for(Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { - final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); - log.info("Syncing Kinesis shard info for " + streamIdentifier); - final StreamConfig streamConfig = streamConfigEntry.getValue(); + for (StreamConfig streamConfig : currentStreamConfigMap.values()) { + log.info("Syncing Kinesis shard info for {}", streamConfig); final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfig); final TaskResult taskResult = shardSyncTaskManager.callShardSyncTask(); if (taskResult.getException() != null) { @@ -283,7 +280,6 @@ class PeriodicShardSyncManager { "Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. Shard sync will be initiated when threshold reaches " + leasesRecoveryAuditorInconsistencyConfidenceThreshold); - } else { // If hole is not present, clear any previous tracking for this stream and return false; hashRangeHoleTrackerMap.remove(streamIdentifier); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 068db578..e44125a5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -62,7 +62,7 @@ import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRan * Helper class to sync leases with shards of the Kinesis stream. * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). * It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it - * and begun processing it's child shards. + * and begun processing its child shards. */ @Slf4j @KinesisClientInternalApi @@ -432,7 +432,7 @@ public class HierarchicalShardSyncer { if (!shardIdsOfCurrentLeases.contains(parentShardId)) { Lease lease = shardIdToLeaseMapOfNewShards.get(parentShardId); - /** + /* * If the lease for the parent shard does not already exist, there are two cases in which we * would want to create it: * - If we have already marked the parentShardId for lease creation in a prior recursive @@ -454,7 +454,7 @@ public class HierarchicalShardSyncer { } } - /** + /* * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will * add a lease just like we do for TRIM_HORIZON. However we will only return back records diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 4074db22..f761a9a7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -42,7 +42,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds", "pendingCheckpointState", "isMarkedForLeaseSteal"}) @ToString public class Lease { - /* + /** * See javadoc for System.nanoTime - summary: * * Sometimes System.nanoTime's return values will wrap due to overflow. When they do, the difference between two @@ -51,62 +51,57 @@ public class Lease { private static final long MAX_ABS_AGE_NANOS = TimeUnit.DAYS.toNanos(365); /** - * @return leaseKey - identifies the unit of work associated with this lease. + * Identifies the unit of work associated with this lease. */ private String leaseKey; /** - * @return current owner of the lease, may be null. + * Current owner of the lease, may be null. */ private String leaseOwner; /** - * @return leaseCounter is incremented periodically by the holder of the lease. Used for optimistic locking. + * LeaseCounter is incremented periodically by the holder of the lease. Used for optimistic locking. */ private Long leaseCounter = 0L; - /* + /** * This field is used to prevent updates to leases that we have lost and re-acquired. It is deliberately not * persisted in DynamoDB and excluded from hashCode and equals. */ private UUID concurrencyToken; - /* + /** * This field is used by LeaseRenewer and LeaseTaker to track the last time a lease counter was incremented. It is * deliberately not persisted in DynamoDB and excluded from hashCode and equals. */ private Long lastCounterIncrementNanos; /** - * @return most recently application-supplied checkpoint value. During fail over, the new worker will pick up after + * Most recently application-supplied checkpoint value. During fail over, the new worker will pick up after * the old worker's last checkpoint. */ private ExtendedSequenceNumber checkpoint; /** - * @return pending checkpoint, possibly null. + * Pending checkpoint, possibly null. */ private ExtendedSequenceNumber pendingCheckpoint; /** - * Last pending application state. Deliberately excluded from hashCode and equals. - * - * @return pending checkpoint state, possibly null. + * Last pending checkpoint state, possibly null. Deliberately excluded from hashCode and equals. */ private byte[] pendingCheckpointState; - /** * Denotes whether the lease is marked for stealing. Deliberately excluded from hashCode and equals and * not persisted in DynamoDB. - * - * @return flag for denoting lease is marked for stealing. */ @Setter private boolean isMarkedForLeaseSteal; /** - * @return count of distinct lease holders between checkpoints. + * Count of distinct lease holders between checkpoints. */ private Long ownerSwitchesSinceCheckpoint = 0L; - private Set parentShardIds = new HashSet<>(); - private Set childShardIds = new HashSet<>(); + private final Set parentShardIds = new HashSet<>(); + private final Set childShardIds = new HashSet<>(); private HashKeyRangeForLease hashKeyRangeForLease; /** @@ -319,5 +314,4 @@ public class Lease { return new Lease(this); } - } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 89e6a3bf..4f2d3a2b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -64,22 +64,16 @@ public class LeaseManagementConfig { /** * Name of the table to use in DynamoDB - * - * @return String */ @NonNull private final String tableName; /** * Client to be used to access DynamoDB service. - * - * @return {@link DynamoDbAsyncClient} */ @NonNull private final DynamoDbAsyncClient dynamoDBClient; /** * Client to be used to access Kinesis Data Streams service. - * - * @return {@link KinesisAsyncClient} */ @NonNull private final KinesisAsyncClient kinesisClient; @@ -90,8 +84,6 @@ public class LeaseManagementConfig { private String streamName; /** * Used to distinguish different workers/processes of a KCL application. - * - * @return String */ @NonNull private final String workerIdentifier; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index 2fca59c7..7ec5b5ec 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -210,8 +210,6 @@ public interface LeaseRefresher { * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing * library such as leaseCounter, leaseOwner, or leaseKey. * - * @return true if update succeeded, false otherwise - * * @throws InvalidStateException if lease table does not exist * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity * @throws DependencyException if DynamoDB update fails in an unexpected way diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 4a4f086f..9fb91f14 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -73,7 +73,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker { private long veryOldLeaseDurationNanosMultiplier = 3; private long lastScanTimeNanos = 0L; - public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis, final MetricsFactory metricsFactory) { this.leaseRefresher = leaseRefresher; @@ -184,7 +183,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker { MetricsUtil.addSuccessAndLatency(scope, "ListLeases", success, startTime, MetricsLevel.DETAILED); } - if (lastException != null) { log.error("Worker {} could not scan leases table, aborting TAKE_LEASES_DIMENSION. Exception caught by" + " last retry:", workerIdentifier, lastException); @@ -319,8 +317,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { for (Lease lease : freshList) { String leaseKey = lease.leaseKey(); - Lease oldLease = allLeases.get(leaseKey); - allLeases.put(leaseKey, lease); + final Lease oldLease = allLeases.put(leaseKey, lease); notUpdated.remove(leaseKey); if (oldLease != null) { @@ -384,7 +381,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker { Set leasesToTake = new HashSet<>(); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION); MetricsUtil.addWorkerIdentifier(scope, workerIdentifier); - List veryOldLeases = new ArrayList<>(); final int numAvailableLeases = expiredLeases.size(); int numLeases = 0; @@ -402,7 +398,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker { return leasesToTake; } - int target; if (numWorkers >= numLeases) { // If we have n leases and n or more workers, each worker can have up to 1 lease, including myself. @@ -435,9 +430,9 @@ public class DynamoDBLeaseTaker implements LeaseTaker { // If there are leases that have been expired for an extended period of // time, take them with priority, disregarding the target (computed // later) but obeying the maximum limit per worker. - veryOldLeases = allLeases.values().stream() - .filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos() - > veryOldLeaseDurationNanosMultiplier * leaseDurationNanos) + final long nanoThreshold = System.nanoTime() - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos); + final List veryOldLeases = allLeases.values().stream() + .filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos()) .collect(Collectors.toList()); if (!veryOldLeases.isEmpty()) { @@ -481,7 +476,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker { workerIdentifier, numLeases, numAvailableLeases, numWorkers, target, myCount, leasesToTake.size()); } - } finally { scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index 4108dd9b..7816c1e1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -21,7 +21,6 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.metrics.MetricsFactory; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index b6e7c068..a575a953 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -16,7 +16,6 @@ package software.amazon.kinesis.lifecycle; import java.time.Duration; import java.time.Instant; -import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -33,7 +32,6 @@ import lombok.Getter; import lombok.NonNull; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; -import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException; import software.amazon.kinesis.leases.ShardInfo; @@ -75,7 +73,7 @@ public class ShardConsumer { private volatile Instant taskDispatchedAt; private volatile boolean taskIsRunning = false; - /* + /** * Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do * much coordination/synchronization to handle concurrent reads/updates. */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java index 5fc029b4..98c0375e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java @@ -21,15 +21,11 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.util.Optional; - /** * Provides a record publisher that will retrieve records from Kinesis for processing */ public interface RecordsPublisher extends Publisher { - - /** * Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will * begin from that sequence number, otherwise it will use the initial position. @@ -47,7 +43,6 @@ public interface RecordsPublisher extends Publisher { */ void restartFrom(RecordsRetrieved recordsRetrieved); - /** * Shutdowns the publisher. Once this method returns the publisher should no longer provide any records. */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index 000b71b7..f45fa80d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -108,7 +108,7 @@ public class RetrievalConfig { * * @deprecated Initial stream position is now handled by {@link StreamTracker}. * @see StreamTracker#orphanedStreamInitialPositionInStream() - * @see StreamTracker#createConfig(StreamIdentifier) + * @see StreamTracker#createStreamConfig(StreamIdentifier) */ @Deprecated private InitialPositionInStreamExtended initialPositionInStreamExtended = InitialPositionInStreamExtended @@ -138,7 +138,7 @@ public class RetrievalConfig { * * @deprecated Initial stream position is now handled by {@link StreamTracker}. * @see StreamTracker#orphanedStreamInitialPositionInStream() - * @see StreamTracker#createConfig(StreamIdentifier) + * @see StreamTracker#createStreamConfig(StreamIdentifier) */ @Deprecated public RetrievalConfig initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java index 181cea76..a37e7121 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java @@ -18,7 +18,6 @@ package software.amazon.kinesis.retrieval.polling; import java.time.Duration; import java.util.Optional; import java.util.function.Function; -import lombok.Data; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; @@ -47,8 +46,6 @@ public class PollingConfig implements RetrievalSpecificConfig { Function dataFetcherProvider; /** * Name of the Kinesis stream. - * - * @return String */ private String streamName; @@ -63,8 +60,6 @@ public class PollingConfig implements RetrievalSpecificConfig { /** * Client used to access to Kinesis service. - * - * @return {@link KinesisAsyncClient} */ @NonNull private final KinesisAsyncClient kinesisClient; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 1a49cdfb..ab406244 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -70,7 +70,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery * i.e. the byte size of the records stored in the cache and maxRecordsCount i.e. the max number of records that should * be present in the cache across multiple GetRecordsResult object. If no data is available in the cache, the call from * the record processor is blocked till records are retrieved from Kinesis. - * + *

* There are three threads namely publisher, demand-notifier and ack-notifier which will contend to drain the events * to the Subscriber (ShardConsumer in KCL). */ @@ -81,9 +81,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { // Since this package is being used by all KCL clients keeping the upper threshold of 60 seconds private static final long DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS = 60_000L; - private int maxPendingProcessRecordsInput; - private int maxByteSize; - private int maxRecordsCount; + private final int maxPendingProcessRecordsInput; + private final int maxByteSize; + private final int maxRecordsCount; private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final ExecutorService executorService; @@ -447,7 +447,6 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { } - private class DefaultGetRecordsCacheDaemon implements Runnable { volatile boolean isShutdown = false; @@ -483,7 +482,6 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation); if (publisherSession.prefetchCounters().shouldGetNewRecords()) { try { - sleepBeforeNextCall(); GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); lastSuccessfulCall = Instant.now(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java index 7f3b54d5..cdf03fac 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java @@ -14,7 +14,6 @@ */ package software.amazon.kinesis.retrieval.polling; -import java.util.Optional; import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index beed73f2..ac814d75 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -22,7 +22,6 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index 193970f6..b6e74a6b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -15,10 +15,8 @@ package software.amazon.kinesis.leases.dynamodb; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -28,19 +26,14 @@ import java.util.concurrent.Callable; import java.util.function.Function; import java.util.stream.Collectors; -import junit.framework.Assert; - -import org.junit.After; -import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; -import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsScope; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -68,34 +61,6 @@ public class DynamoDBLeaseTakerTest { this.dynamoDBLeaseTaker = new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory); } - /** - * @throws java.lang.Exception - */ - @BeforeClass - public static void setUpBeforeClass() throws Exception { - } - - /** - * @throws java.lang.Exception - */ - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - } - - /** - * @throws java.lang.Exception - */ - @After - public void tearDown() throws Exception { - } - /** * Test method for {@link DynamoDBLeaseTaker#stringJoin(java.util.Collection, java.lang.String)}. */ diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 688bd199..6617984d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -17,10 +17,8 @@ package software.amazon.kinesis.lifecycle; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -48,7 +46,6 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException; -import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCleanupManager; @@ -265,7 +262,6 @@ public class ShutdownTaskTest { when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2")); - Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList()); when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); when(leaseCoordinator.getCurrentlyHeldLease("shardId-1")) .thenReturn(null, null, null, null, null, null, null, null, null, null, null);