Adding support for metrics in PeriodicShardSyncer (#592)
* Changes to support injection of ShardSyncer, LeaseTaker, and LeaseRenewer into KCL Worker * Additional checks around injection of LeaseRenewer and LeaseRenewerThreadPool * Changed accessor on InitialPositionInStreamExtended to public to allow ShardSyncer injection * Changed ShardSyncer to a public interface. Renamed implementation to KinesisShardSyncer. * Removed wild card imports introduced in previous commit * Minor refactoring in Worker Builder * Added license info to ShardSyncer interface. Minor refactoring * Changes to chain constructor in LeaseCoordinator * Changed accessor on InitialPositionInStreamExtended factory methods. Minor changes in Worker builder. * Changes to support periodic shard sync * Patching changes left out in merge * Overriding shard-sync idle time to 0 for periodic shard-sync * Addressed PR feedback * Addresed PR #579 review comments * Modified constructor for DeterministicShuffleShardSyncLeaderDecider * Addressed PR comments * Fixed failing test * Removed redundant member varible * Re-enable metrics for shard-sync * Removed unused method from ShardSyncTaskManager
This commit is contained in:
parent
8f58bafacc
commit
ba984fe279
4 changed files with 9 additions and 25 deletions
|
|
@ -18,6 +18,7 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
|
|
@ -37,21 +38,21 @@ class PeriodicShardSyncManager {
|
|||
|
||||
private final String workerId;
|
||||
private final LeaderDecider leaderDecider;
|
||||
private final ShardSyncTask shardSyncTask;
|
||||
private final ITask metricsEmittingShardSyncTask;
|
||||
private final ScheduledExecutorService shardSyncThreadPool;
|
||||
private boolean isRunning;
|
||||
|
||||
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask) {
|
||||
this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor());
|
||||
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, IMetricsFactory metricsFactory) {
|
||||
this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory);
|
||||
}
|
||||
|
||||
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool) {
|
||||
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool, IMetricsFactory metricsFactory) {
|
||||
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
|
||||
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
|
||||
Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager.");
|
||||
this.workerId = workerId;
|
||||
this.leaderDecider = leaderDecider;
|
||||
this.shardSyncTask = shardSyncTask;
|
||||
this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory);
|
||||
this.shardSyncThreadPool = shardSyncThreadPool;
|
||||
}
|
||||
|
||||
|
|
@ -79,7 +80,7 @@ class PeriodicShardSyncManager {
|
|||
try {
|
||||
if (leaderDecider.isLeader(workerId)) {
|
||||
LOG.debug(String.format("WorkerId %s is a leader, running the shard sync task", workerId));
|
||||
shardSyncTask.call();
|
||||
metricsEmittingShardSyncTask.call();
|
||||
} else {
|
||||
LOG.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy {
|
|||
|
||||
@Override
|
||||
public TaskResult onShardConsumerShutDown() {
|
||||
return shardSyncTaskManager.runShardSyncer();
|
||||
return onFoundCompletedShard();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -124,21 +124,4 @@ class ShardSyncTaskManager {
|
|||
}
|
||||
return submittedTaskFuture;
|
||||
}
|
||||
|
||||
synchronized TaskResult runShardSyncer() {
|
||||
Exception exception = null;
|
||||
|
||||
try {
|
||||
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
|
||||
leaseManager,
|
||||
initialPositionInStream,
|
||||
cleanupLeasesUponShardCompletion,
|
||||
ignoreUnexpectedChildShards);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Caught exception while sync'ing Kinesis shards and leases", e);
|
||||
exception = e;
|
||||
}
|
||||
|
||||
return new TaskResult(exception);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1167,7 +1167,7 @@ public class Worker implements Runnable {
|
|||
new ShardSyncTask(kinesisProxy, leaseManager, config.getInitialPositionInStreamExtended(),
|
||||
config.shouldCleanupLeasesUponShardCompletion(),
|
||||
config.shouldIgnoreUnexpectedChildShards(), SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
|
||||
shardSyncer)));
|
||||
shardSyncer), metricsFactory));
|
||||
}
|
||||
|
||||
private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue