Fixing conflicts.

This commit is contained in:
Sahil Palvia 2018-04-18 16:43:04 -07:00 committed by Justin Pfifer
parent 0104a91828
commit fb531b1b13
3 changed files with 11 additions and 5 deletions

View file

@ -15,12 +15,16 @@
package software.amazon.kinesis.checkpoint; package software.amazon.kinesis.checkpoint;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
/** /**
* *
*/ */
public interface CheckpointFactory { public interface CheckpointFactory {
Checkpointer createCheckpoint(KinesisClientLibLeaseCoordinator leaseCoordinator); Checkpointer createCheckpointer(LeaseCoordinator<KinesisClientLease> leaseCoordinator,
ILeaseManager<KinesisClientLease> leaseManager);
} }

View file

@ -143,14 +143,15 @@ public class Scheduler implements Runnable {
this.retrievalConfig = retrievalConfig; this.retrievalConfig = retrievalConfig;
this.applicationName = this.coordinatorConfig.applicationName(); this.applicationName = this.coordinatorConfig.applicationName();
this.leaseCoordinator = this.leaseCoordinator =
this.leaseManagementConfig.leaseManagementFactory().createKinesisClientLibLeaseCoordinator(); this.leaseManagementConfig.leaseManagementFactory().createKinesisClientLibLeaseCoordinator();
this.leaseManager = this.leaseCoordinator.leaseManager();
// //
// TODO: Figure out what to do with lease manage <=> checkpoint relationship // TODO: Figure out what to do with lease manage <=> checkpoint relationship
// //
this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpoint(this.leaseCoordinator); this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpointer(this.leaseCoordinator,
this.leaseManager);
this.idleTimeInMilliseconds = this.retrievalConfig.idleTimeBetweenReadsInMillis(); this.idleTimeInMilliseconds = this.retrievalConfig.idleTimeBetweenReadsInMillis();
this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis(); this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis();
@ -174,7 +175,6 @@ public class Scheduler implements Runnable {
this.streamName = this.retrievalConfig.streamName(); this.streamName = this.retrievalConfig.streamName();
this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis(); this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts(); this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
this.leaseManager = this.leaseCoordinator.leaseManager();
this.leaseManagerProxy = this.shardSyncTaskManager.leaseManagerProxy(); this.leaseManagerProxy = this.shardSyncTaskManager.leaseManagerProxy();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
} }

View file

@ -50,6 +50,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibN
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory; import software.amazon.kinesis.checkpoint.CheckpointFactory;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
@ -458,7 +459,8 @@ public class SchedulerTest {
private class TestKinesisCheckpointFactory implements CheckpointFactory { private class TestKinesisCheckpointFactory implements CheckpointFactory {
@Override @Override
public Checkpointer createCheckpoint() { public Checkpointer createCheckpointer(final LeaseCoordinator<KinesisClientLease> leaseCoordinator,
final ILeaseManager<KinesisClientLease> leaseManager) {
return checkpoint; return checkpoint;
} }
} }