Introducing dedicated shard syncer for each of the streamconfig

This commit is contained in:
Ashwin Giridharan 2020-06-07 20:35:25 -07:00
parent b5d0301b31
commit b323e7c487
6 changed files with 49 additions and 47 deletions

View file

@ -160,7 +160,7 @@ public class Scheduler implements Runnable {
private final Function<StreamConfig, ShardDetector> shardDetectorProvider; private final Function<StreamConfig, ShardDetector> shardDetectorProvider;
private final boolean ignoreUnexpetedChildShards; private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil; private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer; private final Function<StreamConfig, HierarchicalShardSyncer> hierarchicalShardSyncerProvider;
private final long schedulerInitializationBackoffTimeMillis; private final long schedulerInitializationBackoffTimeMillis;
private final LeaderDecider leaderDecider; private final LeaderDecider leaderDecider;
private final Map<StreamIdentifier, Instant> staleStreamDeletionMap = new HashMap<>(); private final Map<StreamIdentifier, Instant> staleStreamDeletionMap = new HashMap<>();
@ -284,8 +284,7 @@ public class Scheduler implements Runnable {
this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector(); this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
// TODO : LTR : Check if this needs to be per stream. this.hierarchicalShardSyncerProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).hierarchicalShardSyncer();
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode);
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager( this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap, leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
@ -922,7 +921,7 @@ public class Scheduler implements Runnable {
ignoreUnexpetedChildShards, ignoreUnexpetedChildShards,
shardDetectorProvider.apply(streamConfig), shardDetectorProvider.apply(streamConfig),
aggregatorUtil, aggregatorUtil,
hierarchicalShardSyncer, hierarchicalShardSyncerProvider.apply(streamConfig),
metricsFactory); metricsFactory);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());

View file

@ -74,7 +74,7 @@ public class HierarchicalShardSyncer {
private final boolean isMultiStreamMode; private final boolean isMultiStreamMode;
private String streamIdentifier = ""; private final String streamIdentifier;
private static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
@ -84,10 +84,12 @@ public class HierarchicalShardSyncer {
public HierarchicalShardSyncer() { public HierarchicalShardSyncer() {
isMultiStreamMode = false; isMultiStreamMode = false;
streamIdentifier = "SingleStreamMode";
} }
public HierarchicalShardSyncer(final boolean isMultiStreamMode) { public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) {
this.isMultiStreamMode = isMultiStreamMode; this.isMultiStreamMode = isMultiStreamMode;
this.streamIdentifier = streamIdentifier;
} }
private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer = private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
@ -118,7 +120,6 @@ public class HierarchicalShardSyncer {
final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException {
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
final List<Shard> latestShards = isLeaseTableEmpty ? final List<Shard> latestShards = isLeaseTableEmpty ?
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases, checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases,
@ -132,7 +133,6 @@ public class HierarchicalShardSyncer {
final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
//TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191
if (!CollectionUtils.isNullOrEmpty(latestShards)) { if (!CollectionUtils.isNullOrEmpty(latestShards)) {

View file

@ -276,19 +276,6 @@ public class LeaseManagementConfig {
return hierarchicalShardSyncer; return hierarchicalShardSyncer;
} }
/**
* Vends HierarchicalShardSyncer based on MultiStreamingMode. With MultiStreamMode shard syncer creates
* leases to accommodate more than one stream.
* @param isMultiStreamingMode
* @return HierarchicalShardSyncer
*/
public HierarchicalShardSyncer hierarchicalShardSyncer(boolean isMultiStreamingMode) {
if(hierarchicalShardSyncer == null) {
hierarchicalShardSyncer = new HierarchicalShardSyncer(isMultiStreamingMode);
}
return hierarchicalShardSyncer;
}
@Deprecated @Deprecated
public LeaseManagementFactory leaseManagementFactory() { public LeaseManagementFactory leaseManagementFactory() {
if (leaseManagementFactory == null) { if (leaseManagementFactory == null) {
@ -351,12 +338,13 @@ public class LeaseManagementConfig {
cacheMissWarningModulus(), cacheMissWarningModulus(),
initialLeaseTableReadCapacity(), initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(), initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(isMultiStreamingMode), hierarchicalShardSyncer(),
tableCreatorCallback(), tableCreatorCallback(),
dynamoDbRequestTimeout(), dynamoDbRequestTimeout(),
billingMode(), billingMode(),
leaseSerializer, leaseSerializer,
customShardDetectorProvider()); customShardDetectorProvider(),
isMultiStreamingMode);
} }
return leaseManagementFactory; return leaseManagementFactory;
} }

View file

@ -55,7 +55,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull @NonNull
private final ExecutorService executorService; private final ExecutorService executorService;
@NonNull @NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer; private final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer;
@NonNull @NonNull
private final LeaseSerializer leaseSerializer; private final LeaseSerializer leaseSerializer;
@NonNull @NonNull
@ -82,6 +82,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final TableCreatorCallback tableCreatorCallback; private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout; private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode; private final BillingMode billingMode;
private final boolean isMultiStreamMode;
/** /**
* Constructor. * Constructor.
@ -207,7 +208,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus * @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity * @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity * @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer * @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback * @param tableCreatorCallback
*/ */
@Deprecated @Deprecated
@ -221,14 +222,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) { final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); deprecatedHierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
} }
/** /**
@ -257,7 +258,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus * @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity * @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity * @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer * @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback * @param tableCreatorCallback
* @param dynamoDbRequestTimeout * @param dynamoDbRequestTimeout
*/ */
@ -272,7 +273,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout) { Duration dynamoDbRequestTimeout) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
@ -280,7 +281,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
} }
/** /**
@ -309,7 +310,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus * @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity * @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity * @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer * @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback * @param tableCreatorCallback
* @param dynamoDbRequestTimeout * @param dynamoDbRequestTimeout
* @param billingMode * @param billingMode
@ -325,7 +326,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode) { Duration dynamoDbRequestTimeout, BillingMode billingMode) {
this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName, this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName,
@ -334,7 +335,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer()); deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
} }
/** /**
@ -362,7 +363,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus * @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity * @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity * @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer * @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback * @param tableCreatorCallback
* @param dynamoDbRequestTimeout * @param dynamoDbRequestTimeout
* @param billingMode * @param billingMode
@ -376,7 +377,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) { Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) {
this(kinesisClient, dynamoDBClient, tableName, this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
@ -384,8 +385,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer, deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
null); null, false);
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
} }
@ -412,11 +413,13 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus * @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity * @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity * @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer * @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback * @param tableCreatorCallback
* @param dynamoDbRequestTimeout * @param dynamoDbRequestTimeout
* @param billingMode * @param billingMode
* @param leaseSerializer * @param leaseSerializer
* @param customShardDetectorProvider
* @param isMultiStreamMode
*/ */
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
@ -427,9 +430,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer, Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider) { Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode) {
this.kinesisClient = kinesisClient; this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient; this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName; this.tableName = tableName;
@ -451,12 +454,13 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.cacheMissWarningModulus = cacheMissWarningModulus; this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.hierarchicalShardSyncer = hierarchicalShardSyncer; this.deprecatedHierarchicalShardSyncer = deprecatedHierarchicalShardSyncer;
this.tableCreatorCallback = tableCreatorCallback; this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode; this.billingMode = billingMode;
this.leaseSerializer = leaseSerializer; this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider; this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
} }
@Override @Override
@ -481,8 +485,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
cleanupLeasesUponShardCompletion, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, ignoreUnexpectedChildShards,
shardSyncIntervalMillis, shardSyncIntervalMillis,
executorService, executorService, deprecatedHierarchicalShardSyncer,
hierarchicalShardSyncer,
metricsFactory); metricsFactory);
} }
@ -501,7 +504,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, ignoreUnexpectedChildShards,
shardSyncIntervalMillis, shardSyncIntervalMillis,
executorService, executorService,
hierarchicalShardSyncer, new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString()),
metricsFactory); metricsFactory);
} }

View file

@ -73,6 +73,7 @@ import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.KinesisClientLibException; import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException; import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseManagementFactory; import software.amazon.kinesis.leases.LeaseManagementFactory;
@ -153,11 +154,13 @@ public class SchedulerTest {
@Mock @Mock
private MultiStreamTracker multiStreamTracker; private MultiStreamTracker multiStreamTracker;
private Map<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerMap = new HashMap<>(); private Map<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerMap;
private Map<StreamIdentifier, ShardDetector> shardDetectorMap = new HashMap<>(); private Map<StreamIdentifier, ShardDetector> shardDetectorMap;
@Before @Before
public void setup() { public void setup() {
shardSyncTaskManagerMap = new HashMap<>();
shardDetectorMap = new HashMap<>();
shardRecordProcessorFactory = new TestShardRecordProcessorFactory(); shardRecordProcessorFactory = new TestShardRecordProcessorFactory();
checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory()); checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory());
@ -190,6 +193,7 @@ public class SchedulerTest {
}); });
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(new HierarchicalShardSyncer());
when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null)); when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null));
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class)); when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class));
@ -334,6 +338,8 @@ public class SchedulerTest {
scheduler.initialize(); scheduler.initialize();
shardDetectorMap.values().stream() shardDetectorMap.values().stream()
.forEach(shardDetector -> verify(shardDetector, times(1)).listShards()); .forEach(shardDetector -> verify(shardDetector, times(1)).listShards());
shardSyncTaskManagerMap.values().stream()
.forEach(shardSyncTM -> verify(shardSyncTM, times(1)).hierarchicalShardSyncer());
} }
@Test @Test
@ -352,6 +358,10 @@ public class SchedulerTest {
.forEach(shardDetector -> verify(shardDetector, atLeast(2)).listShards()); .forEach(shardDetector -> verify(shardDetector, atLeast(2)).listShards());
shardDetectorMap.values().stream() shardDetectorMap.values().stream()
.forEach(shardDetector -> verify(shardDetector, atMost(5)).listShards()); .forEach(shardDetector -> verify(shardDetector, atMost(5)).listShards());
shardSyncTaskManagerMap.values().stream()
.forEach(shardSyncTM -> verify(shardSyncTM, atLeast(2)).hierarchicalShardSyncer());
shardSyncTaskManagerMap.values().stream()
.forEach(shardSyncTM -> verify(shardSyncTM, atMost(5)).hierarchicalShardSyncer());
} }
@ -1035,6 +1045,8 @@ public class SchedulerTest {
shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager); shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager);
shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
final HierarchicalShardSyncer hierarchicalShardSyncer = new HierarchicalShardSyncer();
when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(hierarchicalShardSyncer);
when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier()); when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier());
when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null)); when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null));
if(shardSyncFirstAttemptFailure) { if(shardSyncFirstAttemptFailure) {

View file

@ -112,7 +112,7 @@ public class HierarchicalShardSyncerTest {
} }
private void setupMultiStream() { private void setupMultiStream() {
hierarchicalShardSyncer = new HierarchicalShardSyncer(true); hierarchicalShardSyncer = new HierarchicalShardSyncer(true, STREAM_IDENTIFIER);
when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)); when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER));
} }