Shard Syncer and Logging changes

This commit is contained in:
Ashwin Giridharan 2020-03-23 13:13:49 -07:00
parent c00c943a79
commit a6f767bf96
7 changed files with 30 additions and 19 deletions

View file

@ -135,7 +135,7 @@ public class Scheduler implements Runnable {
private final boolean isMultiStreamMode;
// TODO : halo : make sure we generate streamConfig if entry not present.
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private final MultiStreamTracker multiStreamTracker;
private MultiStreamTracker multiStreamTracker;
private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts;
private final LeaseRefresher leaseRefresher;
@ -201,14 +201,13 @@ public class Scheduler implements Runnable {
this.isMultiStreamMode = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker -> true, streamConfig -> false);
this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker ->
multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)),
multiStreamTracker -> {
this.multiStreamTracker = multiStreamTracker;
return multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
},
streamConfig ->
Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig));
this.multiStreamTracker = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker -> multiStreamTracker,
streamConfig -> null);
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory();
// Determine leaseSerializer based on availability of MultiStreamTracker.

View file

@ -45,6 +45,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
/**
* Task for invoking the ShardRecordProcessor shutdown() callback.
@ -82,8 +83,8 @@ public class ShutdownTask implements ConsumerTask {
private final TaskType taskType = TaskType.SHUTDOWN;
private String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId())
.orElse(shardInfo.shardId());
private static final Function<ShardInfo, String> shardInfoIdProvider = shardInfo -> shardInfo
.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()).orElse(shardInfo.shardId());
/*
* Invokes ShardRecordProcessor shutdown() API.
* (non-Javadoc)
@ -114,7 +115,7 @@ public class ShutdownTask implements ConsumerTask {
if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) {
localReason = ShutdownReason.LEASE_LOST;
dropLease();
log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfoId);
log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfoIdProvider.apply(shardInfo));
}
}
@ -126,7 +127,7 @@ public class ShutdownTask implements ConsumerTask {
}
log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}",
shardInfoId, shardInfo.concurrencyToken(), localReason);
shardInfoIdProvider.apply(shardInfo), shardInfo.concurrencyToken(), localReason);
final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason)
.checkpointer(recordProcessorCheckpointer).build();
final long startTime = System.currentTimeMillis();
@ -137,7 +138,7 @@ public class ShutdownTask implements ConsumerTask {
if (lastCheckpointValue == null
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfoId + ". Application must checkpoint upon shard end. " +
+ shardInfoIdProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " +
"See ShardRecordProcessor.shardEnded javadocs for more information.");
}
} else {
@ -145,7 +146,7 @@ public class ShutdownTask implements ConsumerTask {
}
log.debug("Shutting down retrieval strategy.");
recordsPublisher.shutdown();
log.debug("Record processor completed shutdown() for shard {}", shardInfoId);
log.debug("Record processor completed shutdown() for shard {}", shardInfoIdProvider.apply(shardInfo));
} catch (Exception e) {
applicationException = true;
throw e;
@ -154,11 +155,11 @@ public class ShutdownTask implements ConsumerTask {
}
if (localReason == ShutdownReason.SHARD_END) {
log.debug("Looking for child shards of shard {}", shardInfoId);
log.debug("Looking for child shards of shard {}", shardInfoIdProvider.apply(shardInfo));
// create leases for the child shards
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards);
log.debug("Finished checking for child shards of shard {}", shardInfoId);
log.debug("Finished checking for child shards of shard {}", shardInfoIdProvider.apply(shardInfo));
}
return new TaskResult(null);

View file

@ -57,6 +57,7 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory;
@ -173,6 +174,7 @@ public class SchedulerTest {
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
@ -641,6 +643,7 @@ public class SchedulerTest {
shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager);
shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier());
if(shardSyncFirstAttemptFailure) {
when(shardDetector.listShards())
.thenThrow(new RuntimeException("Service Exception"))

View file

@ -98,6 +98,7 @@ public class HierarchicalShardSyncerTest {
@Before
public void setup() {
hierarchicalShardSyncer = new HierarchicalShardSyncer();
when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream"));
}
private void setupMultiStream() {
@ -1155,7 +1156,7 @@ public class HierarchicalShardSyncerTest {
// * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
// * Current leases: (4, 5, 7)
// */
@Test
// @Test
public void understandLeaseBehavior() {
final List<Shard> shards = constructShardListForGraphA();
// final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),

View file

@ -119,10 +119,10 @@ public class ConsumerStatesTest {
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory);
when(shardInfo.shardId()).thenReturn("shardId-000000000000");
when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize()));
consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis,
argument, taskExecutionListener, 0));
when(shardInfo.shardId()).thenReturn("shardId-000000000000");
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
}

View file

@ -76,6 +76,7 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.NullMetricsFactory;
@ -120,7 +121,7 @@ public class PrefetchRecordsPublisherTest {
@Before
public void setup() {
when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher);
when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("testStream"));
executorService = spy(Executors.newFixedThreadPool(1));
getRecordsCache = new PrefetchRecordsPublisher(
MAX_SIZE,

View file

@ -16,6 +16,7 @@ package software.amazon.kinesis.retrieval.polling;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
import org.junit.Before;
import org.junit.Ignore;
@ -23,6 +24,7 @@ import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.DataFetchingStrategy;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@ -37,11 +39,15 @@ public class RecordsFetcherFactoryTest {
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Mock
private MetricsFactory metricsFactory;
@Mock
private KinesisDataFetcher kinesisDataFetcher;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
recordsFetcherFactory = new SimpleRecordsFetcherFactory();
when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(kinesisDataFetcher);
when(kinesisDataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream"));
}
@Test