Merge pull request #48 from ashwing/ltr_1_hsyncperstreamconfig

Hierachical stream syncer per stream config changes
This commit is contained in:
ashwing 2020-06-10 10:35:24 -07:00 committed by GitHub
commit 61aeae3ea1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 49 additions and 47 deletions

View file

@ -160,7 +160,7 @@ public class Scheduler implements Runnable {
private final Function<StreamConfig, ShardDetector> shardDetectorProvider;
private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;
private final Function<StreamConfig, HierarchicalShardSyncer> hierarchicalShardSyncerProvider;
private final long schedulerInitializationBackoffTimeMillis;
private final LeaderDecider leaderDecider;
private final Map<StreamIdentifier, Instant> staleStreamDeletionMap = new HashMap<>();
@ -284,8 +284,7 @@ public class Scheduler implements Runnable {
this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
// TODO : LTR : Check if this needs to be per stream.
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode);
this.hierarchicalShardSyncerProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).hierarchicalShardSyncer();
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
@ -922,7 +921,7 @@ public class Scheduler implements Runnable {
ignoreUnexpetedChildShards,
shardDetectorProvider.apply(streamConfig),
aggregatorUtil,
hierarchicalShardSyncer,
hierarchicalShardSyncerProvider.apply(streamConfig),
metricsFactory);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());

View file

@ -74,7 +74,7 @@ public class HierarchicalShardSyncer {
private final boolean isMultiStreamMode;
private String streamIdentifier = "";
private final String streamIdentifier;
private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
@ -84,10 +84,12 @@ public class HierarchicalShardSyncer {
public HierarchicalShardSyncer() {
isMultiStreamMode = false;
streamIdentifier = "SingleStreamMode";
}
public HierarchicalShardSyncer(final boolean isMultiStreamMode) {
public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) {
this.isMultiStreamMode = isMultiStreamMode;
this.streamIdentifier = streamIdentifier;
}
private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
@ -118,7 +120,6 @@ public class HierarchicalShardSyncer {
final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException {
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
final List<Shard> latestShards = isLeaseTableEmpty ?
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases,
@ -132,7 +133,6 @@ public class HierarchicalShardSyncer {
final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
//TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191
if (!CollectionUtils.isNullOrEmpty(latestShards)) {

View file

@ -276,19 +276,6 @@ public class LeaseManagementConfig {
return hierarchicalShardSyncer;
}
/**
* Vends HierarchicalShardSyncer based on MultiStreamingMode. With MultiStreamMode shard syncer creates
* leases to accommodate more than one stream.
* @param isMultiStreamingMode
* @return HierarchicalShardSyncer
*/
public HierarchicalShardSyncer hierarchicalShardSyncer(boolean isMultiStreamingMode) {
if(hierarchicalShardSyncer == null) {
hierarchicalShardSyncer = new HierarchicalShardSyncer(isMultiStreamingMode);
}
return hierarchicalShardSyncer;
}
@Deprecated
public LeaseManagementFactory leaseManagementFactory() {
if (leaseManagementFactory == null) {
@ -351,12 +338,13 @@ public class LeaseManagementConfig {
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(isMultiStreamingMode),
hierarchicalShardSyncer(),
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
leaseSerializer,
customShardDetectorProvider());
customShardDetectorProvider(),
isMultiStreamingMode);
}
return leaseManagementFactory;
}

View file

@ -55,7 +55,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull
private final ExecutorService executorService;
@NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;
private final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer;
@NonNull
private final LeaseSerializer leaseSerializer;
@NonNull
@ -82,6 +82,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final boolean isMultiStreamMode;
/**
* Constructor.
@ -207,7 +208,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
*/
@Deprecated
@ -221,14 +222,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
deprecatedHierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
}
/**
@ -257,7 +258,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
*/
@ -272,7 +273,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
@ -280,7 +281,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
}
/**
@ -309,7 +310,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
@ -325,7 +326,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode) {
this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName,
@ -334,7 +335,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
}
/**
@ -362,7 +363,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
@ -376,7 +377,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
@ -384,8 +385,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
null);
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
null, false);
this.streamConfig = streamConfig;
}
@ -412,11 +413,13 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param leaseSerializer
* @param customShardDetectorProvider
* @param isMultiStreamMode
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
@ -427,9 +430,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider) {
Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
@ -451,12 +454,13 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
this.deprecatedHierarchicalShardSyncer = deprecatedHierarchicalShardSyncer;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
}
@Override
@ -481,8 +485,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
hierarchicalShardSyncer,
executorService, deprecatedHierarchicalShardSyncer,
metricsFactory);
}
@ -501,7 +504,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
hierarchicalShardSyncer,
new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString()),
metricsFactory);
}

View file

@ -73,6 +73,7 @@ import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseManagementFactory;
@ -153,11 +154,13 @@ public class SchedulerTest {
@Mock
private MultiStreamTracker multiStreamTracker;
private Map<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerMap = new HashMap<>();
private Map<StreamIdentifier, ShardDetector> shardDetectorMap = new HashMap<>();
private Map<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerMap;
private Map<StreamIdentifier, ShardDetector> shardDetectorMap;
@Before
public void setup() {
shardSyncTaskManagerMap = new HashMap<>();
shardDetectorMap = new HashMap<>();
shardRecordProcessorFactory = new TestShardRecordProcessorFactory();
checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory());
@ -190,6 +193,7 @@ public class SchedulerTest {
});
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(new HierarchicalShardSyncer());
when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null));
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class));
@ -334,6 +338,8 @@ public class SchedulerTest {
scheduler.initialize();
shardDetectorMap.values().stream()
.forEach(shardDetector -> verify(shardDetector, times(1)).listShards());
shardSyncTaskManagerMap.values().stream()
.forEach(shardSyncTM -> verify(shardSyncTM, times(1)).hierarchicalShardSyncer());
}
@Test
@ -352,6 +358,10 @@ public class SchedulerTest {
.forEach(shardDetector -> verify(shardDetector, atLeast(2)).listShards());
shardDetectorMap.values().stream()
.forEach(shardDetector -> verify(shardDetector, atMost(5)).listShards());
shardSyncTaskManagerMap.values().stream()
.forEach(shardSyncTM -> verify(shardSyncTM, atLeast(2)).hierarchicalShardSyncer());
shardSyncTaskManagerMap.values().stream()
.forEach(shardSyncTM -> verify(shardSyncTM, atMost(5)).hierarchicalShardSyncer());
}
@ -1035,6 +1045,8 @@ public class SchedulerTest {
shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager);
shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
final HierarchicalShardSyncer hierarchicalShardSyncer = new HierarchicalShardSyncer();
when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(hierarchicalShardSyncer);
when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier());
when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null));
if(shardSyncFirstAttemptFailure) {

View file

@ -112,7 +112,7 @@ public class HierarchicalShardSyncerTest {
}
private void setupMultiStream() {
hierarchicalShardSyncer = new HierarchicalShardSyncer(true);
hierarchicalShardSyncer = new HierarchicalShardSyncer(true, STREAM_IDENTIFIER);
when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER));
}