diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index bbdf78e2..ef750f46 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -43,6 +43,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.LeaseCleanupConfig; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory; +import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; @@ -361,10 +362,18 @@ public class LeaseManagementConfig { */ private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK; + /** + * @deprecated never used and will be removed in future releases + */ + @Deprecated private HierarchicalShardSyncer hierarchicalShardSyncer; private LeaseManagementFactory leaseManagementFactory; + /** + * @deprecated never used and will be removed in future releases + */ + @Deprecated public HierarchicalShardSyncer hierarchicalShardSyncer() { if (hierarchicalShardSyncer == null) { hierarchicalShardSyncer = new HierarchicalShardSyncer(); @@ -419,39 +428,16 @@ public class LeaseManagementConfig { private GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig = GracefulLeaseHandoffConfig.builder().build(); + /** + * @deprecated This is no longer invoked, but {@code leaseManagementFactory(LeaseSerializer, boolean)} + * is invoked instead. Please remove implementation for this method as future + * releases will remove this API. + */ @Deprecated public LeaseManagementFactory leaseManagementFactory() { if (leaseManagementFactory == null) { Validate.notEmpty(streamName(), "Stream name is empty"); - leaseManagementFactory = new DynamoDBLeaseManagementFactory( - kinesisClient(), - streamName(), - dynamoDBClient(), - tableName(), - workerIdentifier(), - executorService(), - initialPositionInStream(), - failoverTimeMillis(), - epsilonMillis(), - maxLeasesForWorker(), - maxLeasesToStealAtOneTime(), - maxLeaseRenewalThreads(), - cleanupLeasesUponShardCompletion(), - ignoreUnexpectedChildShards(), - shardSyncIntervalMillis(), - consistentReads(), - listShardsBackoffTimeInMillis(), - maxListShardsRetryAttempts(), - maxCacheMissesBeforeReload(), - listShardsCacheAllowedAgeInSeconds(), - cacheMissWarningModulus(), - initialLeaseTableReadCapacity(), - initialLeaseTableWriteCapacity(), - hierarchicalShardSyncer(), - tableCreatorCallback(), - dynamoDbRequestTimeout(), - billingMode(), - tags()); + leaseManagementFactory(new DynamoDBLeaseSerializer(), false); } return leaseManagementFactory; } @@ -488,7 +474,6 @@ public class LeaseManagementConfig { cacheMissWarningModulus(), initialLeaseTableReadCapacity(), initialLeaseTableWriteCapacity(), - hierarchicalShardSyncer(), tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java index 6f786bc9..788034d1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java @@ -31,13 +31,25 @@ public interface LeaseManagementFactory { default LeaseCoordinator createLeaseCoordinator( MetricsFactory metricsFactory, ConcurrentMap shardInfoShardConsumerMap) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Not implemented"); } - ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory); + /** + * @deprecated This method is never invoked, please remove implementation of this method + * as it will be removed in future releases. + */ + @Deprecated + default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory) { + throw new UnsupportedOperationException("Deprecated"); + } + /** + * @deprecated This method is never invoked, please remove implementation of this method + * as it will be removed in future releases. + */ + @Deprecated default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Deprecated"); } default ShardSyncTaskManager createShardSyncTaskManager( @@ -49,10 +61,17 @@ public interface LeaseManagementFactory { DynamoDBLeaseRefresher createLeaseRefresher(); - ShardDetector createShardDetector(); + /** + * @deprecated This method is never invoked, please remove implementation of this method + * as it will be removed in future releases. + */ + @Deprecated + default ShardDetector createShardDetector() { + throw new UnsupportedOperationException("Deprecated"); + } default ShardDetector createShardDetector(StreamConfig streamConfig) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Not implemented"); } LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index add8cf4f..9b63883b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -71,7 +71,7 @@ public class ShardSyncTaskManager { /** * Constructor. * - *

NOTE: This constructor is deprecated and will be removed in a future release.

+ * @deprecated This constructor is deprecated and will be removed in a future release. * * @param shardDetector * @param leaseRefresher @@ -92,18 +92,16 @@ public class ShardSyncTaskManager { long shardSyncIdleTimeMillis, ExecutorService executorService, MetricsFactory metricsFactory) { - this.shardDetector = shardDetector; - this.leaseRefresher = leaseRefresher; - this.initialPositionInStream = initialPositionInStream; - this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; - this.garbageCollectLeases = true; - this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; - this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; - this.executorService = executorService; - this.hierarchicalShardSyncer = new HierarchicalShardSyncer(); - this.metricsFactory = metricsFactory; - this.shardSyncRequestPending = new AtomicBoolean(false); - this.lock = new ReentrantLock(); + this( + shardDetector, + leaseRefresher, + initialPositionInStream, + cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, + shardSyncIdleTimeMillis, + executorService, + new HierarchicalShardSyncer(), + metricsFactory); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 412dca32..7d902afd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -26,16 +26,15 @@ import java.util.function.Function; import lombok.Data; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.dynamodb.model.Tag; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.DdbTableConfig; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.LeaseCleanupConfig; import software.amazon.kinesis.common.StreamConfig; -import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.coordinator.DeletedStreamListProvider; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.KinesisShardDetector; @@ -73,9 +72,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @NonNull private final ExecutorService executorService; - @NonNull - private final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer; - @NonNull private final LeaseSerializer leaseSerializer; @@ -113,110 +109,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final LeaseCleanupConfig leaseCleanupConfig; private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig; - /** - * Constructor. - * @deprecated this is used by the deprecated method in LeaseManagementConfig to construct the LeaseManagement factory - * - * @param kinesisClient - * @param streamName - * @param dynamoDBClient - * @param tableName - * @param workerIdentifier - * @param executorService - * @param initialPositionInStream - * @param failoverTimeMillis - * @param epsilonMillis - * @param maxLeasesForWorker - * @param maxLeasesToStealAtOneTime - * @param maxLeaseRenewalThreads - * @param cleanupLeasesUponShardCompletion - * @param ignoreUnexpectedChildShards - * @param shardSyncIntervalMillis - * @param consistentReads - * @param listShardsBackoffTimeMillis - * @param maxListShardsRetryAttempts - * @param maxCacheMissesBeforeReload - * @param listShardsCacheAllowedAgeInSeconds - * @param cacheMissWarningModulus - * @param initialLeaseTableReadCapacity - * @param initialLeaseTableWriteCapacity - * @param hierarchicalShardSyncer - * @param tableCreatorCallback - * @param dynamoDbRequestTimeout - * @param billingMode - * @param tags - */ - @Deprecated - public DynamoDBLeaseManagementFactory( - final KinesisAsyncClient kinesisClient, - final String streamName, - final DynamoDbAsyncClient dynamoDBClient, - final String tableName, - final String workerIdentifier, - final ExecutorService executorService, - final InitialPositionInStreamExtended initialPositionInStream, - final long failoverTimeMillis, - final long epsilonMillis, - final int maxLeasesForWorker, - final int maxLeasesToStealAtOneTime, - final int maxLeaseRenewalThreads, - final boolean cleanupLeasesUponShardCompletion, - final boolean ignoreUnexpectedChildShards, - final long shardSyncIntervalMillis, - final boolean consistentReads, - final long listShardsBackoffTimeMillis, - final int maxListShardsRetryAttempts, - final int maxCacheMissesBeforeReload, - final long listShardsCacheAllowedAgeInSeconds, - final int cacheMissWarningModulus, - final long initialLeaseTableReadCapacity, - final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer hierarchicalShardSyncer, - final TableCreatorCallback tableCreatorCallback, - Duration dynamoDbRequestTimeout, - BillingMode billingMode, - Collection tags) { - - this( - kinesisClient, - dynamoDBClient, - tableName, - workerIdentifier, - executorService, - failoverTimeMillis, - LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT, - epsilonMillis, - maxLeasesForWorker, - maxLeasesToStealAtOneTime, - maxLeaseRenewalThreads, - cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, - shardSyncIntervalMillis, - consistentReads, - listShardsBackoffTimeMillis, - maxListShardsRetryAttempts, - maxCacheMissesBeforeReload, - listShardsCacheAllowedAgeInSeconds, - cacheMissWarningModulus, - initialLeaseTableReadCapacity, - initialLeaseTableWriteCapacity, - hierarchicalShardSyncer, - tableCreatorCallback, - dynamoDbRequestTimeout, - billingMode, - LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED, - LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED, - tags, - new DynamoDBLeaseSerializer(), - null, - false, - LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG, - new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(), - LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build()); - this.streamConfig = - new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream); - } - /** * Constructor. * @param kinesisClient @@ -241,7 +133,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param cacheMissWarningModulus * @param initialLeaseTableReadCapacity * @param initialLeaseTableWriteCapacity - * @param deprecatedHierarchicalShardSyncer * @param tableCreatorCallback * @param dynamoDbRequestTimeout * @param billingMode @@ -255,11 +146,11 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param gracefulLeaseHandoffConfig */ public DynamoDBLeaseManagementFactory( - final KinesisAsyncClient kinesisClient, - final DynamoDbAsyncClient dynamoDBClient, - final String tableName, - final String workerIdentifier, - final ExecutorService executorService, + final @NotNull KinesisAsyncClient kinesisClient, + final @NotNull DynamoDbAsyncClient dynamoDBClient, + final @NotNull String tableName, + final @NotNull String workerIdentifier, + final @NotNull ExecutorService executorService, final long failoverTimeMillis, final boolean enablePriorityLeaseAssignment, final long epsilonMillis, @@ -277,17 +168,16 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, - Duration dynamoDbRequestTimeout, - BillingMode billingMode, + final Duration dynamoDbRequestTimeout, + final BillingMode billingMode, final boolean leaseTableDeletionProtectionEnabled, final boolean leaseTablePitrEnabled, - Collection tags, - LeaseSerializer leaseSerializer, - Function customShardDetectorProvider, + final Collection tags, + final @NotNull LeaseSerializer leaseSerializer, + final Function customShardDetectorProvider, boolean isMultiStreamMode, - LeaseCleanupConfig leaseCleanupConfig, + final LeaseCleanupConfig leaseCleanupConfig, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) { this.kinesisClient = kinesisClient; @@ -312,7 +202,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.cacheMissWarningModulus = cacheMissWarningModulus; this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; - this.deprecatedHierarchicalShardSyncer = deprecatedHierarchicalShardSyncer; this.tableCreatorCallback = tableCreatorCallback; this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; this.billingMode = billingMode; @@ -353,35 +242,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { shardInfoShardConsumerMap); } - /** - * Even though this is deprecated, this is a method part of the public interface in LeaseManagementFactory - */ - @Override - @Deprecated - public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFactory metricsFactory) { - return new ShardSyncTaskManager( - this.createShardDetector(), - this.createLeaseRefresher(), - streamConfig.initialPositionInStreamExtended(), - cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, - shardSyncIntervalMillis, - executorService, - deprecatedHierarchicalShardSyncer, - metricsFactory); - } - - /** - * Create ShardSyncTaskManager from the streamConfig passed - * @param metricsFactory - * @param streamConfig - * @return ShardSyncTaskManager - */ - @Override - public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) { - return createShardSyncTaskManager(metricsFactory, streamConfig, null); - } - /** * Create ShardSyncTaskManager from the streamConfig passed * @@ -427,23 +287,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { tags); } - /** - * Even though this is deprecated, this is a method part of the public interface in LeaseManagementFactory - */ - @Override - @Deprecated - public ShardDetector createShardDetector() { - return new KinesisShardDetector( - kinesisClient, - streamConfig.streamIdentifier(), - listShardsBackoffTimeMillis, - maxListShardsRetryAttempts, - listShardsCacheAllowedAgeInSeconds, - maxCacheMissesBeforeReload, - cacheMissWarningModulus, - dynamoDbRequestTimeout); - } - /** * KinesisShardDetector supports reading from service only using streamName. Support for accountId and * stream creation epoch is yet to be provided. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index afdb346c..4af44b14 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -75,8 +75,7 @@ class DynamoDBLeaseRefresherTest { when(mockDdbClient.updateContinuousBackups(any(UpdateContinuousBackupsRequest.class))) .thenReturn(future); - dynamoDBLeaseRefresherWithPitr.createLeaseTableIfNotExists(); - dynamoDBLeaseRefresherWithPitr.waitUntilLeaseTableExists(1, 30); + setupTable(dynamoDBLeaseRefresherWithPitr); UpdateContinuousBackupsRequest updateContinuousBackupsRequest = UpdateContinuousBackupsRequest.builder() .tableName(TEST_LEASE_TABLE) @@ -106,8 +105,7 @@ class DynamoDBLeaseRefresherTest { @Test void createWorkerIdToLeaseKeyIndexIfNotExists_sanity() throws DependencyException, ProvisionedThroughputException { DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); - leaseRefresher.createLeaseTableIfNotExists(); - leaseRefresher.waitUntilLeaseTableExists(1, 30); + setupTable(leaseRefresher); assertFalse(leaseRefresher.isLeaseOwnerToLeaseKeyIndexActive()); @@ -141,8 +139,7 @@ class DynamoDBLeaseRefresherTest { void waitUntilLeaseOwnerToLeaseKeyIndexExists_noTransitionToActive_assertFalse() throws DependencyException, ProvisionedThroughputException { DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); - leaseRefresher.createLeaseTableIfNotExists(); - leaseRefresher.waitUntilLeaseTableExists(1, 30); + setupTable(leaseRefresher); dynamoDbAsyncClient.deleteTable( DeleteTableRequest.builder().tableName(TEST_LEASE_TABLE).build()); @@ -155,8 +152,7 @@ class DynamoDBLeaseRefresherTest { @Test void isLeaseOwnerGsiIndexActive() throws DependencyException, ProvisionedThroughputException { DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); - leaseRefresher.createLeaseTableIfNotExists(); - leaseRefresher.waitUntilLeaseTableExists(1, 30); + setupTable(leaseRefresher); final DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class, Mockito.RETURNS_MOCKS); final LeaseRefresher leaseRefresherForTest = new DynamoDBLeaseRefresher( @@ -531,11 +527,11 @@ class DynamoDBLeaseRefresherTest { assertFalse(leaseRefresher.assignLease(lease, lease.leaseOwner())); } + @Test void createLeaseTableIfNotExists_billingModeProvisioned_assertCorrectModeAndCapacity() throws Exception { final DynamoDbAsyncClient dbAsyncClient = DynamoDBEmbedded.create().dynamoDbAsyncClient(); final LeaseRefresher leaseRefresher = createLeaseRefresher(createProvisionedTableConfig(), dbAsyncClient); - leaseRefresher.createLeaseTableIfNotExists(); - leaseRefresher.waitUntilLeaseTableExists(1, 1000); + setupTable(leaseRefresher); final DescribeTableResponse describeTableResponse = dbAsyncClient .describeTable(DescribeTableRequest.builder() @@ -550,8 +546,7 @@ class DynamoDBLeaseRefresherTest { void createLeaseTableIfNotExists_billingModeOnDemand_assertCorrectMode() throws Exception { final DynamoDbAsyncClient dbAsyncClient = DynamoDBEmbedded.create().dynamoDbAsyncClient(); final LeaseRefresher leaseRefresher = createLeaseRefresher(createOnDemandTableConfig(), dbAsyncClient); - leaseRefresher.createLeaseTableIfNotExists(); - leaseRefresher.waitUntilLeaseTableExists(1, 1000); + setupTable(leaseRefresher); final DescribeTableResponse describeTableResponse = dbAsyncClient .describeTable(DescribeTableRequest.builder()