Moving literals to variable for readability
This commit is contained in:
parent
6a0c17745a
commit
f2911f1f0d
4 changed files with 13 additions and 8 deletions
|
|
@ -107,15 +107,14 @@ public class HierarchicalShardSyncer {
|
||||||
final List<Shard> latestShards = isLeaseTableEmpty ?
|
final List<Shard> latestShards = isLeaseTableEmpty ?
|
||||||
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
|
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
|
||||||
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, garbageCollectLeases,
|
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, garbageCollectLeases,
|
||||||
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, isLeaseTableEmpty, latestShards);
|
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards, isLeaseTableEmpty);
|
||||||
}
|
}
|
||||||
|
|
||||||
//Provide a pre-collcted list of shards to avoid calling ListShards API
|
//Provide a pre-collcted list of shards to avoid calling ListShards API
|
||||||
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
|
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
|
||||||
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
|
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
|
||||||
final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards,
|
final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards,
|
||||||
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty,
|
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> latestShards, final boolean isLeaseTableEmpty)
|
||||||
List<Shard> latestShards)
|
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
|
|
||||||
//TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191
|
//TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191
|
||||||
|
|
|
||||||
|
|
@ -48,6 +48,7 @@ public class ShardSyncTaskManager {
|
||||||
@NonNull
|
@NonNull
|
||||||
private final InitialPositionInStreamExtended initialPositionInStream;
|
private final InitialPositionInStreamExtended initialPositionInStream;
|
||||||
private final boolean cleanupLeasesUponShardCompletion;
|
private final boolean cleanupLeasesUponShardCompletion;
|
||||||
|
private final boolean garbageCollectLeases;
|
||||||
private final boolean ignoreUnexpectedChildShards;
|
private final boolean ignoreUnexpectedChildShards;
|
||||||
private final long shardSyncIdleTimeMillis;
|
private final long shardSyncIdleTimeMillis;
|
||||||
@NonNull
|
@NonNull
|
||||||
|
|
@ -84,6 +85,7 @@ public class ShardSyncTaskManager {
|
||||||
this.leaseRefresher = leaseRefresher;
|
this.leaseRefresher = leaseRefresher;
|
||||||
this.initialPositionInStream = initialPositionInStream;
|
this.initialPositionInStream = initialPositionInStream;
|
||||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||||
|
this.garbageCollectLeases = true;
|
||||||
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
|
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
|
||||||
this.executorService = executorService;
|
this.executorService = executorService;
|
||||||
|
|
@ -114,6 +116,7 @@ public class ShardSyncTaskManager {
|
||||||
this.leaseRefresher = leaseRefresher;
|
this.leaseRefresher = leaseRefresher;
|
||||||
this.initialPositionInStream = initialPositionInStream;
|
this.initialPositionInStream = initialPositionInStream;
|
||||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||||
|
this.garbageCollectLeases = true;
|
||||||
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
|
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
|
||||||
this.executorService = executorService;
|
this.executorService = executorService;
|
||||||
|
|
@ -128,7 +131,7 @@ public class ShardSyncTaskManager {
|
||||||
leaseRefresher,
|
leaseRefresher,
|
||||||
initialPositionInStream,
|
initialPositionInStream,
|
||||||
cleanupLeasesUponShardCompletion,
|
cleanupLeasesUponShardCompletion,
|
||||||
true,
|
garbageCollectLeases,
|
||||||
ignoreUnexpectedChildShards,
|
ignoreUnexpectedChildShards,
|
||||||
shardSyncIdleTimeMillis,
|
shardSyncIdleTimeMillis,
|
||||||
hierarchicalShardSyncer,
|
hierarchicalShardSyncer,
|
||||||
|
|
@ -167,7 +170,7 @@ public class ShardSyncTaskManager {
|
||||||
leaseRefresher,
|
leaseRefresher,
|
||||||
initialPositionInStream,
|
initialPositionInStream,
|
||||||
cleanupLeasesUponShardCompletion,
|
cleanupLeasesUponShardCompletion,
|
||||||
true,
|
garbageCollectLeases,
|
||||||
ignoreUnexpectedChildShards,
|
ignoreUnexpectedChildShards,
|
||||||
shardSyncIdleTimeMillis,
|
shardSyncIdleTimeMillis,
|
||||||
hierarchicalShardSyncer,
|
hierarchicalShardSyncer,
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ import software.amazon.kinesis.metrics.MetricsScope;
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public class ShardSyncer {
|
public class ShardSyncer {
|
||||||
private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer();
|
private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer();
|
||||||
|
private static final boolean garbageCollectLeases = true;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>NOTE: This method is deprecated and will be removed in a future release.</p>
|
* <p>NOTE: This method is deprecated and will be removed in a future release.</p>
|
||||||
|
|
@ -41,6 +42,6 @@ public class ShardSyncer {
|
||||||
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
|
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
|
||||||
KinesisClientLibIOException {
|
KinesisClientLibIOException {
|
||||||
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
|
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
|
||||||
true, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, leaseRefresher.isLeaseTableEmpty());
|
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, leaseRefresher.isLeaseTableEmpty());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -69,6 +69,8 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
@NonNull
|
@NonNull
|
||||||
private final InitialPositionInStreamExtended initialPositionInStream;
|
private final InitialPositionInStreamExtended initialPositionInStream;
|
||||||
private final boolean cleanupLeasesOfCompletedShards;
|
private final boolean cleanupLeasesOfCompletedShards;
|
||||||
|
private final boolean garbageCollectLeases = false;
|
||||||
|
private final boolean isLeaseTableEmpty= false;
|
||||||
private final boolean ignoreUnexpectedChildShards;
|
private final boolean ignoreUnexpectedChildShards;
|
||||||
@NonNull
|
@NonNull
|
||||||
private final LeaseCoordinator leaseCoordinator;
|
private final LeaseCoordinator leaseCoordinator;
|
||||||
|
|
@ -155,8 +157,8 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
|
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
|
||||||
// create leases for the child shards
|
// create leases for the child shards
|
||||||
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
|
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
|
||||||
initialPositionInStream, false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
|
initialPositionInStream, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
|
||||||
scope, false, latestShards);
|
scope, latestShards, isLeaseTableEmpty);
|
||||||
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
|
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue