Fixing conflicts.
This commit is contained in:
parent
2ea0f070e0
commit
d4a36f2b28
3 changed files with 11 additions and 5 deletions
|
|
@ -15,12 +15,16 @@
|
|||
|
||||
package software.amazon.kinesis.checkpoint;
|
||||
|
||||
import software.amazon.kinesis.leases.ILeaseManager;
|
||||
import software.amazon.kinesis.leases.KinesisClientLease;
|
||||
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
|
||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||
import software.amazon.kinesis.processor.Checkpointer;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface CheckpointFactory {
|
||||
Checkpointer createCheckpoint(KinesisClientLibLeaseCoordinator leaseCoordinator);
|
||||
Checkpointer createCheckpointer(LeaseCoordinator<KinesisClientLease> leaseCoordinator,
|
||||
ILeaseManager<KinesisClientLease> leaseManager);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -143,14 +143,15 @@ public class Scheduler implements Runnable {
|
|||
this.retrievalConfig = retrievalConfig;
|
||||
|
||||
this.applicationName = this.coordinatorConfig.applicationName();
|
||||
|
||||
this.leaseCoordinator =
|
||||
this.leaseManagementConfig.leaseManagementFactory().createKinesisClientLibLeaseCoordinator();
|
||||
this.leaseManager = this.leaseCoordinator.leaseManager();
|
||||
|
||||
//
|
||||
// TODO: Figure out what to do with lease manage <=> checkpoint relationship
|
||||
//
|
||||
this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpoint(this.leaseCoordinator);
|
||||
this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpointer(this.leaseCoordinator,
|
||||
this.leaseManager);
|
||||
|
||||
this.idleTimeInMilliseconds = this.retrievalConfig.idleTimeBetweenReadsInMillis();
|
||||
this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis();
|
||||
|
|
@ -174,7 +175,6 @@ public class Scheduler implements Runnable {
|
|||
this.streamName = this.retrievalConfig.streamName();
|
||||
this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
|
||||
this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
|
||||
this.leaseManager = this.leaseCoordinator.leaseManager();
|
||||
this.leaseManagerProxy = this.shardSyncTaskManager.leaseManagerProxy();
|
||||
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibN
|
|||
import software.amazon.kinesis.checkpoint.Checkpoint;
|
||||
import software.amazon.kinesis.checkpoint.CheckpointConfig;
|
||||
import software.amazon.kinesis.checkpoint.CheckpointFactory;
|
||||
import software.amazon.kinesis.leases.ILeaseManager;
|
||||
import software.amazon.kinesis.leases.KinesisClientLease;
|
||||
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
|
||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||
|
|
@ -458,7 +459,8 @@ public class SchedulerTest {
|
|||
|
||||
private class TestKinesisCheckpointFactory implements CheckpointFactory {
|
||||
@Override
|
||||
public Checkpointer createCheckpoint() {
|
||||
public Checkpointer createCheckpointer(final LeaseCoordinator<KinesisClientLease> leaseCoordinator,
|
||||
final ILeaseManager<KinesisClientLease> leaseManager) {
|
||||
return checkpoint;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue