diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java index 51c2cc26..2d8c86d2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java @@ -15,12 +15,16 @@ package software.amazon.kinesis.checkpoint; +import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.processor.Checkpointer; /** * */ public interface CheckpointFactory { - Checkpointer createCheckpoint(KinesisClientLibLeaseCoordinator leaseCoordinator); + Checkpointer createCheckpointer(LeaseCoordinator leaseCoordinator, + ILeaseManager leaseManager); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index cbe3a336..cf11395e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -143,14 +143,15 @@ public class Scheduler implements Runnable { this.retrievalConfig = retrievalConfig; this.applicationName = this.coordinatorConfig.applicationName(); - this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory().createKinesisClientLibLeaseCoordinator(); + this.leaseManager = this.leaseCoordinator.leaseManager(); // // TODO: Figure out what to do with lease manage <=> checkpoint relationship // - this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpoint(this.leaseCoordinator); + this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpointer(this.leaseCoordinator, + this.leaseManager); this.idleTimeInMilliseconds = this.retrievalConfig.idleTimeBetweenReadsInMillis(); this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis(); @@ -174,7 +175,6 @@ public class Scheduler implements Runnable { this.streamName = this.retrievalConfig.streamName(); this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis(); this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts(); - this.leaseManager = this.leaseCoordinator.leaseManager(); this.leaseManagerProxy = this.shardSyncTaskManager.leaseManagerProxy(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 0d2e4b1d..83b1b48b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -50,6 +50,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibN import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; +import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator; @@ -458,7 +459,8 @@ public class SchedulerTest { private class TestKinesisCheckpointFactory implements CheckpointFactory { @Override - public Checkpointer createCheckpoint() { + public Checkpointer createCheckpointer(final LeaseCoordinator leaseCoordinator, + final ILeaseManager leaseManager) { return checkpoint; } }