Get consumer running again

Temporary fix for lease management <=> checkpoint issue

Initialize the get records cache with the shard position, and pass
that configuration to the data fetcher.
This commit is contained in:
Pfifer, Justin 2018-04-18 11:08:21 -07:00
parent 4abbbfbaa7
commit a6f4aa9651
17 changed files with 86 additions and 47 deletions

View file

@@ -15,12 +15,12 @@
package software.amazon.kinesis.checkpoint;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.processor.ICheckpoint;
/**
*
*/
public interface CheckpointFactory {
ICheckpoint createCheckpoint();
ICheckpoint createCheckpoint(KinesisClientLibLeaseCoordinator leaseCoordinator);
}

View file

@@ -40,14 +40,7 @@ public class DynamoDBCheckpointFactory implements CheckpointFactory {
private final IMetricsFactory metricsFactory;
@Override
public ICheckpoint createCheckpoint() {
return new KinesisClientLibLeaseCoordinator(leaseManager,
workerIdentifier,
failoverTimeMillis,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
maxLeaseRenewalThreads,
metricsFactory);
public ICheckpoint createCheckpoint(KinesisClientLibLeaseCoordinator leaseCoordinator) {
return leaseCoordinator;
}
}

View file

@@ -37,7 +37,6 @@ import lombok.Getter;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease;
@@ -144,12 +143,19 @@ public class Scheduler implements Runnable {
this.retrievalConfig = retrievalConfig;
this.applicationName = this.coordinatorConfig.applicationName();
this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpoint();
this.leaseCoordinator =
this.leaseManagementConfig.leaseManagementFactory().createKinesisClientLibLeaseCoordinator();
//
// TODO: Figure out what to do with lease manage <=> checkpoint relationship
//
this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpoint(this.leaseCoordinator);
this.idleTimeInMilliseconds = this.retrievalConfig.idleTimeBetweenReadsInMillis();
this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis();
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
this.leaseCoordinator =
this.leaseManagementConfig.leaseManagementFactory().createKinesisClientLibLeaseCoordinator();
this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory().createShardSyncTaskManager();
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
@@ -527,7 +533,7 @@ public class Scheduler implements Runnable {
streamName,
leaseManager,
executorService,
retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo),
retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory),
processorFactory.createRecordProcessor(),
checkpoint,
coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo,

View file

@@ -168,7 +168,8 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
// Don't renew expired lease during regular renewals. getCopyOfHeldLease may have returned null
// triggering the application processing to treat this as a lost lease (fail checkpoint with
// ShutdownException).
if (renewEvenIfExpired || (!lease.isExpired(leaseDurationNanos, System.nanoTime()))) {
boolean isLeaseExpired = lease.isExpired(leaseDurationNanos, System.nanoTime());
if (renewEvenIfExpired || !isLeaseExpired) {
renewedLease = leaseManager.renewLease(lease);
}
if (renewedLease) {

View file

@@ -257,7 +257,8 @@ class ConsumerStates {
return new InitializeTask(consumer.shardInfo(),
consumer.recordProcessor(),
consumer.checkpoint(),
consumer.recordProcessorCheckpointer(),
consumer.recordProcessorCheckpointer(), consumer.initialPositionInStream(),
consumer.getRecordsCache(),
consumer.taskBackoffTimeMillis());
}

View file

@@ -14,6 +14,7 @@
*/
package software.amazon.kinesis.lifecycle;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
@@ -21,6 +22,7 @@ import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.metrics.MetricsHelper;
import software.amazon.kinesis.metrics.MetricsLevel;
@@ -43,6 +45,11 @@ public class InitializeTask implements ITask {
private final ICheckpoint checkpoint;
@NonNull
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
@NonNull
private final InitialPositionInStreamExtended initialPositionInStream;
@NonNull
private final GetRecordsCache cache;
// Back off for this interval if we encounter a problem (exception)
private final long backoffTimeMillis;
@@ -64,6 +71,8 @@ public class InitializeTask implements ITask {
Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.shardId());
ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint();
cache.start(initialCheckpoint, initialPositionInStream);
recordProcessorCheckpointer.largestPermittedCheckpointValue(initialCheckpoint);
recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint);

View file

@@ -15,9 +15,12 @@
package software.amazon.kinesis.retrieval;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
* This is the BlockingGetRecordsCache class. This class blocks any calls to the getRecords on the
* GetRecordsRetrievalStrategy class.
@@ -33,7 +36,8 @@ public class BlockingGetRecordsCache implements GetRecordsCache {
}
@Override
public void start() {
public void start(ExtendedSequenceNumber extendedSequenceNumber,
InitialPositionInStreamExtended initialPositionInStreamExtended) {
//
// Nothing to do here
//

View file

@@ -15,7 +15,9 @@
package software.amazon.kinesis.retrieval;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
* This class is used as a cache for Prefetching data from Kinesis.
@@ -24,7 +26,7 @@ public interface GetRecordsCache {
/**
* This method calls the start behavior on the cache, if available.
*/
void start();
void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended);
/**
* This method returns the next set of records from the Cache if present, or blocks the request till it gets the

View file

@@ -20,6 +20,7 @@ import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import org.apache.commons.lang.Validate;
import com.amazonaws.SdkClientException;
@@ -34,6 +35,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
* This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the
@@ -105,11 +107,13 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
}
@Override
public void start() {
public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) {
if (executorService.isShutdown()) {
throw new IllegalStateException("ExecutorService has been shutdown.");
}
dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);
if (!started) {
log.info("Starting prefetching thread.");
executorService.execute(defaultGetRecordsCacheDaemon);
@@ -150,7 +154,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
@Override
public void addDataArrivedListener(DataArrivedListener dataArrivedListener) {
if (dataArrivedListener != null) {
if (this.dataArrivedListener != null) {
log.warn("Attempting to reset the data arrived listener for {}. This shouldn't happen", shardId);
}
this.dataArrivedListener = dataArrivedListener;

View file

@@ -19,8 +19,8 @@ import java.util.Optional;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
@@ -119,6 +119,7 @@ public class RetrievalConfig {
public RetrievalFactory retrievalFactory() {
if (retrievalFactory == null) {
retrievalFactory = new SynchronousBlockingRetrievalFactory(streamName(), amazonKinesis(),
recordsFetcherFactory,
listShardsBackoffTimeInMillis(), maxListShardsRetryAttempts(), maxRecords());
}
return retrievalFactory;

View file

@@ -16,6 +16,7 @@
package software.amazon.kinesis.retrieval;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.IMetricsFactory;
/**
*
@@ -23,5 +24,5 @@ import software.amazon.kinesis.leases.ShardInfo;
public interface RetrievalFactory {
GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo);
GetRecordsCache createGetRecordsCache(ShardInfo shardInfo);
GetRecordsCache createGetRecordsCache(ShardInfo shardInfo, IMetricsFactory metricsFactory);
}

View file

@@ -19,6 +19,7 @@ import com.amazonaws.services.kinesis.AmazonKinesis;
import lombok.Data;
import lombok.NonNull;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.IMetricsFactory;
/**
*
@@ -34,6 +35,8 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
private final String streamName;
@NonNull
private final AmazonKinesis amazonKinesis;
private final RecordsFetcherFactory recordsFetcherFactory;
private final long listShardsBackoffTimeInMillis;
private final int maxListShardsRetryAttempts;
private final int maxRecords;
@@ -45,7 +48,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
}
@Override
public GetRecordsCache createGetRecordsCache(@NonNull final ShardInfo shardInfo) {
throw new UnsupportedOperationException();
public GetRecordsCache createGetRecordsCache(@NonNull final ShardInfo shardInfo, IMetricsFactory metricsFactory) {
return recordsFetcherFactory.createRecordsFetcher(createGetRecordsRetrievalStrategy(shardInfo), shardInfo.shardId(), metricsFactory, maxRecords);
}
}

View file

@@ -65,6 +65,7 @@ import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShutdownInput;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.metrics.IMetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessor;
@@ -131,7 +132,7 @@ public class SchedulerTest {
when(leaseCoordinator.leaseManager()).thenReturn(leaseManager);
when(shardSyncTaskManager.leaseManagerProxy()).thenReturn(leaseManagerProxy);
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class))).thenReturn(getRecordsCache);
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(IMetricsFactory.class))).thenReturn(getRecordsCache);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
@@ -457,7 +458,7 @@ public class SchedulerTest {
private class TestKinesisCheckpointFactory implements CheckpointFactory {
@Override
public ICheckpoint createCheckpoint() {
public ICheckpoint createCheckpoint(KinesisClientLibLeaseCoordinator leaseCoordinator) {
return checkpoint;
}
}

View file

@@ -308,7 +308,7 @@ public class ShardConsumerTest {
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
verify(getRecordsCache).start();
verify(getRecordsCache).start(any(ExtendedSequenceNumber.class), any(InitialPositionInStreamExtended.class));
// We expect to process all records in numRecs calls
for (int i = 0; i < numRecs;) {
@@ -410,7 +410,7 @@ public class ShardConsumerTest {
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
verify(getRecordsCache).start();
verify(getRecordsCache).start(any(ExtendedSequenceNumber.class), any(InitialPositionInStreamExtended.class));
// We expect to process all records in numRecs calls
for (int i = 0; i < numRecs;) {

View file

@@ -32,6 +32,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -50,6 +51,7 @@ import com.amazonaws.services.kinesis.model.Record;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
* These are the integration tests for the PrefetchGetRecordsCache class.
@@ -74,6 +76,10 @@ public class PrefetchGetRecordsCacheIntegrationTest {
@Mock
private AmazonKinesis amazonKinesis;
@Mock
private ExtendedSequenceNumber extendedSequenceNumber;
@Mock
private InitialPositionInStreamExtended initialPosition;
@Before
public void setup() {
@@ -95,7 +101,7 @@ public class PrefetchGetRecordsCacheIntegrationTest {
@Test
public void testRollingCache() {
getRecordsCache.start();
getRecordsCache.start(extendedSequenceNumber, initialPosition);
sleep(IDLE_MILLIS_BETWEEN_CALLS);
ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult();
@@ -111,7 +117,7 @@ public class PrefetchGetRecordsCacheIntegrationTest {
@Test
public void testFullCache() {
getRecordsCache.start();
getRecordsCache.start(extendedSequenceNumber, initialPosition);
sleep(MAX_SIZE * IDLE_MILLIS_BETWEEN_CALLS);
assertEquals(getRecordsCache.getRecordsResultQueue.size(), MAX_SIZE);
@@ -141,7 +147,7 @@ public class PrefetchGetRecordsCacheIntegrationTest {
operation,
"test-shard-2");
getRecordsCache.start();
getRecordsCache.start(extendedSequenceNumber, initialPosition);
sleep(IDLE_MILLIS_BETWEEN_CALLS);
final Record record = mock(Record.class);
@@ -152,7 +158,7 @@ public class PrefetchGetRecordsCacheIntegrationTest {
records.add(record);
records.add(record);
records.add(record);
getRecordsCache2.start();
getRecordsCache2.start(extendedSequenceNumber, initialPosition);
sleep(IDLE_MILLIS_BETWEEN_CALLS);
@@ -181,7 +187,7 @@ public class PrefetchGetRecordsCacheIntegrationTest {
}).thenCallRealMethod();
doNothing().when(dataFetcher).restartIterator();
getRecordsCache.start();
getRecordsCache.start(extendedSequenceNumber, initialPosition);
sleep(IDLE_MILLIS_BETWEEN_CALLS);
ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult();

View file

@@ -37,6 +37,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -52,6 +53,7 @@ import com.amazonaws.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisDataFetcher;
import software.amazon.kinesis.retrieval.PrefetchGetRecordsCache;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
* Test class for the PrefetchGetRecordsCache class.
@@ -73,6 +75,10 @@ public class PrefetchGetRecordsCacheTest {
private Record record;
@Mock
private KinesisDataFetcher dataFetcher;
@Mock
private InitialPositionInStreamExtended initialPosition;
@Mock
private ExtendedSequenceNumber sequenceNumber;
private List<Record> records;
private ExecutorService executorService;
@@ -80,6 +86,7 @@ public class PrefetchGetRecordsCacheTest {
private PrefetchGetRecordsCache getRecordsCache;
private String operation = "ProcessTask";
@Before
public void setup() {
when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher);
@@ -114,7 +121,7 @@ public class PrefetchGetRecordsCacheTest {
records.add(record);
records.add(record);
getRecordsCache.start();
getRecordsCache.start(sequenceNumber, initialPosition);
ProcessRecordsInput result = getRecordsCache.getNextResult();
assertEquals(result.getRecords(), records);
@@ -130,7 +137,7 @@ public class PrefetchGetRecordsCacheTest {
records.add(record);
getRecordsCache.start();
getRecordsCache.start(sequenceNumber, initialPosition);
// Sleep for a few seconds for the cache to fill up.
sleep(2000);
@@ -144,7 +151,7 @@ public class PrefetchGetRecordsCacheTest {
int recordsSize = 4500;
when(records.size()).thenReturn(recordsSize);
getRecordsCache.start();
getRecordsCache.start(sequenceNumber, initialPosition);
sleep(2000);
@@ -159,7 +166,7 @@ public class PrefetchGetRecordsCacheTest {
int recordsSize = 200;
when(records.size()).thenReturn(recordsSize);
getRecordsCache.start();
getRecordsCache.start(sequenceNumber, initialPosition);
// Sleep for a few seconds for the cache to fill up.
sleep(2000);
@@ -175,7 +182,7 @@ public class PrefetchGetRecordsCacheTest {
IntStream.range(0, recordsSize).forEach(i -> records.add(record));
getRecordsCache.start();
getRecordsCache.start(sequenceNumber, initialPosition);
ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult();
verify(executorService).execute(any());
@@ -207,7 +214,7 @@ public class PrefetchGetRecordsCacheTest {
@Test
public void testExpiredIteratorException() {
getRecordsCache.start();
getRecordsCache.start(sequenceNumber, initialPosition);
when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class).thenReturn(getRecordsResult);
doNothing().when(dataFetcher).restartIterator();