Remove deprecated method and code review for Lucien's test changes

Remove few more deprecated methods

Also did minor changes in DynamoDBLeaseRefresherTest
This commit is contained in:
Aravinda Kidambi Srinivasan 2024-11-01 16:28:40 -07:00
parent 67d54045c1
commit 0ac22c750b
5 changed files with 69 additions and 229 deletions

View file

@ -43,6 +43,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.LeaseCleanupConfig;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
@ -361,10 +362,18 @@ public class LeaseManagementConfig {
*/
private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK;
/**
* @deprecated never used and will be removed in future releases
*/
@Deprecated
private HierarchicalShardSyncer hierarchicalShardSyncer;
private LeaseManagementFactory leaseManagementFactory;
/**
* @deprecated never used and will be removed in future releases
*/
@Deprecated
public HierarchicalShardSyncer hierarchicalShardSyncer() {
if (hierarchicalShardSyncer == null) {
hierarchicalShardSyncer = new HierarchicalShardSyncer();
@ -419,39 +428,16 @@ public class LeaseManagementConfig {
private GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig =
GracefulLeaseHandoffConfig.builder().build();
/**
* @deprecated This is no longer invoked, but {@code leaseManagementFactory(LeaseSerializer, boolean)}
* is invoked instead. Please remove implementation for this method as future
* releases will remove this API.
*/
@Deprecated
public LeaseManagementFactory leaseManagementFactory() {
if (leaseManagementFactory == null) {
Validate.notEmpty(streamName(), "Stream name is empty");
leaseManagementFactory = new DynamoDBLeaseManagementFactory(
kinesisClient(),
streamName(),
dynamoDBClient(),
tableName(),
workerIdentifier(),
executorService(),
initialPositionInStream(),
failoverTimeMillis(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
cleanupLeasesUponShardCompletion(),
ignoreUnexpectedChildShards(),
shardSyncIntervalMillis(),
consistentReads(),
listShardsBackoffTimeInMillis(),
maxListShardsRetryAttempts(),
maxCacheMissesBeforeReload(),
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
tags());
leaseManagementFactory(new DynamoDBLeaseSerializer(), false);
}
return leaseManagementFactory;
}
@ -488,7 +474,6 @@ public class LeaseManagementConfig {
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),

View file

@ -31,13 +31,25 @@ public interface LeaseManagementFactory {
default LeaseCoordinator createLeaseCoordinator(
MetricsFactory metricsFactory, ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("Not implemented");
}
ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory);
/**
* @deprecated This method is never invoked, please remove implementation of this method
* as it will be removed in future releases.
*/
@Deprecated
default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory) {
throw new UnsupportedOperationException("Deprecated");
}
/**
* @deprecated This method is never invoked, please remove implementation of this method
* as it will be removed in future releases.
*/
@Deprecated
default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("Deprecated");
}
default ShardSyncTaskManager createShardSyncTaskManager(
@ -49,10 +61,17 @@ public interface LeaseManagementFactory {
DynamoDBLeaseRefresher createLeaseRefresher();
ShardDetector createShardDetector();
/**
* @deprecated This method is never invoked, please remove implementation of this method
* as it will be removed in future releases.
*/
@Deprecated
default ShardDetector createShardDetector() {
throw new UnsupportedOperationException("Deprecated");
}
default ShardDetector createShardDetector(StreamConfig streamConfig) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("Not implemented");
}
LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory);

View file

@ -71,7 +71,7 @@ public class ShardSyncTaskManager {
/**
* Constructor.
*
* <p>NOTE: This constructor is deprecated and will be removed in a future release.</p>
* @deprecated This constructor is deprecated and will be removed in a future release.
*
* @param shardDetector
* @param leaseRefresher
@ -92,18 +92,16 @@ public class ShardSyncTaskManager {
long shardSyncIdleTimeMillis,
ExecutorService executorService,
MetricsFactory metricsFactory) {
this.shardDetector = shardDetector;
this.leaseRefresher = leaseRefresher;
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.garbageCollectLeases = true;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.hierarchicalShardSyncer = new HierarchicalShardSyncer();
this.metricsFactory = metricsFactory;
this.shardSyncRequestPending = new AtomicBoolean(false);
this.lock = new ReentrantLock();
this(
shardDetector,
leaseRefresher,
initialPositionInStream,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
executorService,
new HierarchicalShardSyncer(),
metricsFactory);
}
/**

View file

@ -26,16 +26,15 @@ import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.DdbTableConfig;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.LeaseCleanupConfig;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
@ -73,9 +72,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull
private final ExecutorService executorService;
@NonNull
private final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer;
@NonNull
private final LeaseSerializer leaseSerializer;
@ -113,110 +109,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final LeaseCleanupConfig leaseCleanupConfig;
private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig;
/**
* Constructor.
* @deprecated this is used by the deprecated method in LeaseManagementConfig to construct the LeaseManagement factory
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param initialPositionInStream
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param tags
*/
@Deprecated
public DynamoDBLeaseManagementFactory(
final KinesisAsyncClient kinesisClient,
final String streamName,
final DynamoDbAsyncClient dynamoDBClient,
final String tableName,
final String workerIdentifier,
final ExecutorService executorService,
final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion,
final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis,
final boolean consistentReads,
final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts,
final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds,
final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer,
final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout,
BillingMode billingMode,
Collection<Tag> tags) {
this(
kinesisClient,
dynamoDBClient,
tableName,
workerIdentifier,
executorService,
failoverTimeMillis,
LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
maxLeaseRenewalThreads,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
consistentReads,
listShardsBackoffTimeMillis,
maxListShardsRetryAttempts,
maxCacheMissesBeforeReload,
listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus,
initialLeaseTableReadCapacity,
initialLeaseTableWriteCapacity,
hierarchicalShardSyncer,
tableCreatorCallback,
dynamoDbRequestTimeout,
billingMode,
LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED,
LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED,
tags,
new DynamoDBLeaseSerializer(),
null,
false,
LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG,
new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(),
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build());
this.streamConfig =
new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream);
}
/**
* Constructor.
* @param kinesisClient
@ -241,7 +133,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
@ -255,11 +146,11 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param gracefulLeaseHandoffConfig
*/
public DynamoDBLeaseManagementFactory(
final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient,
final String tableName,
final String workerIdentifier,
final ExecutorService executorService,
final @NotNull KinesisAsyncClient kinesisClient,
final @NotNull DynamoDbAsyncClient dynamoDBClient,
final @NotNull String tableName,
final @NotNull String workerIdentifier,
final @NotNull ExecutorService executorService,
final long failoverTimeMillis,
final boolean enablePriorityLeaseAssignment,
final long epsilonMillis,
@ -277,17 +168,16 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer,
final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout,
BillingMode billingMode,
final Duration dynamoDbRequestTimeout,
final BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
final boolean leaseTablePitrEnabled,
Collection<Tag> tags,
LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider,
final Collection<Tag> tags,
final @NotNull LeaseSerializer leaseSerializer,
final Function<StreamConfig, ShardDetector> customShardDetectorProvider,
boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig,
final LeaseCleanupConfig leaseCleanupConfig,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) {
this.kinesisClient = kinesisClient;
@ -312,7 +202,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.deprecatedHierarchicalShardSyncer = deprecatedHierarchicalShardSyncer;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
@ -353,35 +242,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
shardInfoShardConsumerMap);
}
/**
* Even though this is deprecated, this is a method part of the public interface in LeaseManagementFactory
*/
@Override
@Deprecated
public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFactory metricsFactory) {
return new ShardSyncTaskManager(
this.createShardDetector(),
this.createLeaseRefresher(),
streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
deprecatedHierarchicalShardSyncer,
metricsFactory);
}
/**
* Create ShardSyncTaskManager from the streamConfig passed
* @param metricsFactory
* @param streamConfig
* @return ShardSyncTaskManager
*/
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
return createShardSyncTaskManager(metricsFactory, streamConfig, null);
}
/**
* Create ShardSyncTaskManager from the streamConfig passed
*
@ -427,23 +287,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
tags);
}
/**
* Even though this is deprecated, this is a method part of the public interface in LeaseManagementFactory
*/
@Override
@Deprecated
public ShardDetector createShardDetector() {
return new KinesisShardDetector(
kinesisClient,
streamConfig.streamIdentifier(),
listShardsBackoffTimeMillis,
maxListShardsRetryAttempts,
listShardsCacheAllowedAgeInSeconds,
maxCacheMissesBeforeReload,
cacheMissWarningModulus,
dynamoDbRequestTimeout);
}
/**
* KinesisShardDetector supports reading from service only using streamName. Support for accountId and
* stream creation epoch is yet to be provided.

View file

@ -75,8 +75,7 @@ class DynamoDBLeaseRefresherTest {
when(mockDdbClient.updateContinuousBackups(any(UpdateContinuousBackupsRequest.class)))
.thenReturn(future);
dynamoDBLeaseRefresherWithPitr.createLeaseTableIfNotExists();
dynamoDBLeaseRefresherWithPitr.waitUntilLeaseTableExists(1, 30);
setupTable(dynamoDBLeaseRefresherWithPitr);
UpdateContinuousBackupsRequest updateContinuousBackupsRequest = UpdateContinuousBackupsRequest.builder()
.tableName(TEST_LEASE_TABLE)
@ -106,8 +105,7 @@ class DynamoDBLeaseRefresherTest {
@Test
void createWorkerIdToLeaseKeyIndexIfNotExists_sanity() throws DependencyException, ProvisionedThroughputException {
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
leaseRefresher.createLeaseTableIfNotExists();
leaseRefresher.waitUntilLeaseTableExists(1, 30);
setupTable(leaseRefresher);
assertFalse(leaseRefresher.isLeaseOwnerToLeaseKeyIndexActive());
@ -141,8 +139,7 @@ class DynamoDBLeaseRefresherTest {
void waitUntilLeaseOwnerToLeaseKeyIndexExists_noTransitionToActive_assertFalse()
throws DependencyException, ProvisionedThroughputException {
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
leaseRefresher.createLeaseTableIfNotExists();
leaseRefresher.waitUntilLeaseTableExists(1, 30);
setupTable(leaseRefresher);
dynamoDbAsyncClient.deleteTable(
DeleteTableRequest.builder().tableName(TEST_LEASE_TABLE).build());
@ -155,8 +152,7 @@ class DynamoDBLeaseRefresherTest {
@Test
void isLeaseOwnerGsiIndexActive() throws DependencyException, ProvisionedThroughputException {
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
leaseRefresher.createLeaseTableIfNotExists();
leaseRefresher.waitUntilLeaseTableExists(1, 30);
setupTable(leaseRefresher);
final DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class, Mockito.RETURNS_MOCKS);
final LeaseRefresher leaseRefresherForTest = new DynamoDBLeaseRefresher(
@ -531,11 +527,11 @@ class DynamoDBLeaseRefresherTest {
assertFalse(leaseRefresher.assignLease(lease, lease.leaseOwner()));
}
@Test
void createLeaseTableIfNotExists_billingModeProvisioned_assertCorrectModeAndCapacity() throws Exception {
final DynamoDbAsyncClient dbAsyncClient = DynamoDBEmbedded.create().dynamoDbAsyncClient();
final LeaseRefresher leaseRefresher = createLeaseRefresher(createProvisionedTableConfig(), dbAsyncClient);
leaseRefresher.createLeaseTableIfNotExists();
leaseRefresher.waitUntilLeaseTableExists(1, 1000);
setupTable(leaseRefresher);
final DescribeTableResponse describeTableResponse = dbAsyncClient
.describeTable(DescribeTableRequest.builder()
@ -550,8 +546,7 @@ class DynamoDBLeaseRefresherTest {
void createLeaseTableIfNotExists_billingModeOnDemand_assertCorrectMode() throws Exception {
final DynamoDbAsyncClient dbAsyncClient = DynamoDBEmbedded.create().dynamoDbAsyncClient();
final LeaseRefresher leaseRefresher = createLeaseRefresher(createOnDemandTableConfig(), dbAsyncClient);
leaseRefresher.createLeaseTableIfNotExists();
leaseRefresher.waitUntilLeaseTableExists(1, 1000);
setupTable(leaseRefresher);
final DescribeTableResponse describeTableResponse = dbAsyncClient
.describeTable(DescribeTableRequest.builder()