From ba984fe279219ae7a2023d9cfae53f8abf656fac Mon Sep 17 00:00:00 2001 From: Parijat Sinha Date: Tue, 13 Aug 2019 16:37:57 -0700 Subject: [PATCH] Adding support for metrics in PeriodicShardSyncer (#592) * Changes to support injection of ShardSyncer, LeaseTaker, and LeaseRenewer into KCL Worker * Additional checks around injection of LeaseRenewer and LeaseRenewerThreadPool * Changed accessor on InitialPositionInStreamExtended to public to allow ShardSyncer injection * Changed ShardSyncer to a public interface. Renamed implementation to KinesisShardSyncer. * Removed wild card imports introduced in previous commit * Minor refactoring in Worker Builder * Added license info to ShardSyncer interface. Minor refactoring * Changes to chain constructor in LeaseCoordinator * Changed accessor on InitialPositionInStreamExtended factory methods. Minor changes in Worker builder. * Changes to support periodic shard sync * Patching changes left out in merge * Overriding shard-sync idle time to 0 for periodic shard-sync * Addressed PR feedback * Addresed PR #579 review comments * Modified constructor for DeterministicShuffleShardSyncLeaderDecider * Addressed PR comments * Fixed failing test * Removed redundant member varible * Re-enable metrics for shard-sync * Removed unused method from ShardSyncTaskManager --- .../lib/worker/PeriodicShardSyncManager.java | 13 +++++++------ .../lib/worker/ShardEndShardSyncStrategy.java | 2 +- .../lib/worker/ShardSyncTaskManager.java | 17 ----------------- .../clientlibrary/lib/worker/Worker.java | 2 +- 4 files changed, 9 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java index f2fa165e..d129944f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java @@ -18,6 +18,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import lombok.EqualsAndHashCode; import lombok.Getter; import org.apache.commons.lang3.Validate; @@ -37,21 +38,21 @@ class PeriodicShardSyncManager { private final String workerId; private final LeaderDecider leaderDecider; - private final ShardSyncTask shardSyncTask; + private final ITask metricsEmittingShardSyncTask; private final ScheduledExecutorService shardSyncThreadPool; private boolean isRunning; - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask) { - this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor()); + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, IMetricsFactory metricsFactory) { + this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory); } - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool) { + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool, IMetricsFactory metricsFactory) { Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager."); this.workerId = workerId; this.leaderDecider = leaderDecider; - this.shardSyncTask = shardSyncTask; + this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory); this.shardSyncThreadPool = shardSyncThreadPool; } @@ -79,7 +80,7 @@ class PeriodicShardSyncManager { try { if (leaderDecider.isLeader(workerId)) { LOG.debug(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); - shardSyncTask.call(); + metricsEmittingShardSyncTask.call(); } else { LOG.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java index 49adbf8b..dc620aec 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java @@ -52,7 +52,7 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy { @Override public TaskResult onShardConsumerShutDown() { - return shardSyncTaskManager.runShardSyncer(); + return onFoundCompletedShard(); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index fb994f50..9601cf64 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -124,21 +124,4 @@ class ShardSyncTaskManager { } return submittedTaskFuture; } - - synchronized TaskResult runShardSyncer() { - Exception exception = null; - - try { - shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, - leaseManager, - initialPositionInStream, - cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards); - } catch (Exception e) { - LOG.error("Caught exception while sync'ing Kinesis shards and leases", e); - exception = e; - } - - return new TaskResult(exception); - } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 64179b3c..22d74ba8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -1167,7 +1167,7 @@ public class Worker implements Runnable { new ShardSyncTask(kinesisProxy, leaseManager, config.getInitialPositionInStreamExtended(), config.shouldCleanupLeasesUponShardCompletion(), config.shouldIgnoreUnexpectedChildShards(), SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, - shardSyncer))); + shardSyncer), metricsFactory)); } private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) {