Adding account and stream epoch support. Checkpoint 1

This commit is contained in:
Ashwin Giridharan 2020-03-03 16:07:07 -08:00
parent fedd02c2a5
commit 2b507342d8
15 changed files with 115 additions and 60 deletions

View file

@ -171,6 +171,14 @@ public class MultiLangDaemonConfigurationTest {
utilsBean.setProperty(configuration, "retrievalMode", "invalid"); utilsBean.setProperty(configuration, "retrievalMode", "invalid");
} }
// @Test
// TODO : Enable this test once https://github.com/awslabs/amazon-kinesis-client/issues/692 is resolved
public void testmetricsEnabledDimensions() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setMetricsEnabledDimensions(new String[]{"Operation"});
configuration.resolvedConfiguration(shardRecordProcessorFactory);
}
@Test @Test
public void testFanoutConfigSetConsumerName() { public void testFanoutConfigSetConsumerName() {
String consumerArn = "test-consumer"; String consumerArn = "test-consumer";

View file

@ -9,6 +9,8 @@ import lombok.experimental.FieldDefaults;
@Accessors(fluent = true) @Accessors(fluent = true)
@FieldDefaults(makeFinal=true, level= AccessLevel.PRIVATE) @FieldDefaults(makeFinal=true, level= AccessLevel.PRIVATE)
public class StreamConfig { public class StreamConfig {
String streamName; StreamIdentifier streamIdentifier;
InitialPositionInStreamExtended initialPositionInStreamExtended; InitialPositionInStreamExtended initialPositionInStreamExtended;
} }

View file

@ -0,0 +1,33 @@
package software.amazon.kinesis.common;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import software.amazon.awssdk.utils.Validate;
@RequiredArgsConstructor
@EqualsAndHashCode
@Getter
public class StreamIdentifier {
private final String accountName;
private final String streamName;
private final String streamCreationEpoch;
private static final String DEFAULT = "default";
@Override
public String toString(){
return accountName + ":" + streamName + ":" + streamCreationEpoch;
}
public static StreamIdentifier fromString(String streamIdentifier) {
final String[] idTokens = streamIdentifier.split(":");
Validate.isTrue(idTokens.length == 3, "Unable to deserialize StreamIdentifier from " + streamIdentifier);
return new StreamIdentifier(idTokens[0], idTokens[1], idTokens[2]);
}
public static StreamIdentifier fromStreamName(String streamName) {
Validate.notEmpty(streamName, "StreamName should not be empty");
return new StreamIdentifier(DEFAULT, streamName, DEFAULT);
}
}

View file

@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -45,6 +46,7 @@ import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -110,8 +112,8 @@ public class Scheduler implements Runnable {
private final DiagnosticEventHandler diagnosticEventHandler; private final DiagnosticEventHandler diagnosticEventHandler;
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final LeaseCoordinator leaseCoordinator; private final LeaseCoordinator leaseCoordinator;
private final Function<String, ShardSyncTaskManager> shardSyncTaskManagerProvider; private final Function<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final Map<String, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>(); private final Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>();
private final ShardPrioritization shardPrioritization; private final ShardPrioritization shardPrioritization;
private final boolean cleanupLeasesUponShardCompletion; private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -121,11 +123,11 @@ public class Scheduler implements Runnable {
private final long failoverTimeMillis; private final long failoverTimeMillis;
private final long taskBackoffTimeMillis; private final long taskBackoffTimeMillis;
private final Either<MultiStreamTracker, StreamConfig> appStreamTracker; private final Either<MultiStreamTracker, StreamConfig> appStreamTracker;
private final Map<String, StreamConfig> currentStreamConfigMap; private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private final long listShardsBackoffTimeMillis; private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts; private final int maxListShardsRetryAttempts;
private final LeaseRefresher leaseRefresher; private final LeaseRefresher leaseRefresher;
private final Function<String, ShardDetector> shardDetectorProvider; private final Function<StreamIdentifier, ShardDetector> shardDetectorProvider;
private final boolean ignoreUnexpetedChildShards; private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil; private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer; private final HierarchicalShardSyncer hierarchicalShardSyncer;
@ -183,15 +185,16 @@ public class Scheduler implements Runnable {
this.applicationName = this.coordinatorConfig.applicationName(); this.applicationName = this.coordinatorConfig.applicationName();
final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker(); final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker();
if(multiStreamTracker == null) { if(multiStreamTracker == null) {
final StreamConfig streamConfig = new StreamConfig(this.retrievalConfig.streamName(), final StreamConfig streamConfig = new StreamConfig(StreamIdentifier.fromStreamName(this.retrievalConfig.streamName()),
this.retrievalConfig.initialPositionInStreamExtended()); this.retrievalConfig.initialPositionInStreamExtended());
this.appStreamTracker = Either.right(streamConfig); this.appStreamTracker = Either.right(streamConfig);
this.currentStreamConfigMap = new HashMap<String, StreamConfig>() {{ this.currentStreamConfigMap = new HashMap<StreamIdentifier, StreamConfig>() {{
put(streamConfig.streamName(), streamConfig); put(streamConfig.streamIdentifier(), streamConfig);
}}; }};
} else { } else {
this.appStreamTracker = Either.left(multiStreamTracker); this.appStreamTracker = Either.left(multiStreamTracker);
this.currentStreamConfigMap = multiStreamTracker.streamConfigMap(); this.currentStreamConfigMap = multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
} }
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory(); this.metricsFactory = this.metricsConfig.metricsFactory();
@ -218,11 +221,11 @@ public class Scheduler implements Runnable {
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
this.diagnosticEventFactory = diagnosticEventFactory; this.diagnosticEventFactory = diagnosticEventFactory;
this.diagnosticEventHandler = new DiagnosticEventLogger(); this.diagnosticEventHandler = new DiagnosticEventLogger();
// TODO : Halo : Handle case of no StreamConfig present in streamConfigMap() for the supplied streamName. // TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName.
// TODO : Pass the immutable map here instead of using mst.streamConfigMap() // TODO : Pass the immutable map here instead of using mst.streamConfigList()
this.shardSyncTaskManagerProvider = streamName -> this.leaseManagementConfig this.shardSyncTaskManagerProvider = streamIdentifier -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false)) .leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false))
.createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamName)); .createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamIdentifier));
this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist = this.skipShardSyncAtWorkerInitializationIfLeasesExist =
@ -245,7 +248,7 @@ public class Scheduler implements Runnable {
// this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool(); // this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool();
this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis(); this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts(); this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
this.shardDetectorProvider = streamName -> createOrGetShardSyncTaskManager(streamName).shardDetector(); this.shardDetectorProvider = streamIdentifier -> createOrGetShardSyncTaskManager(streamIdentifier).shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
// TODO : Halo : Check if this needs to be per stream. // TODO : Halo : Check if this needs to be per stream.
@ -298,17 +301,17 @@ public class Scheduler implements Runnable {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams // TODO: for already synced streams
for(String streamName : currentStreamConfigMap.keySet().stream().collect(Collectors.toList())) { for(StreamIdentifier streamIdentifier : currentStreamConfigMap.keySet().stream().collect(Collectors.toList())) {
log.info("Syncing Kinesis shard info"); log.info("Syncing Kinesis shard info");
final StreamConfig streamConfig = currentStreamConfigMap.get(streamName); final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamName), ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier),
leaseRefresher, streamConfig.initialPositionInStreamExtended(), leaseRefresher, streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
hierarchicalShardSyncer, metricsFactory); hierarchicalShardSyncer, metricsFactory);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
// Throwing the exception, to prevent further syncs for other stream. // Throwing the exception, to prevent further syncs for other stream.
if (result.getException() != null) { if (result.getException() != null) {
log.error("Caught exception when sync'ing info for " + streamName, result.getException()); log.error("Caught exception when sync'ing info for " + streamIdentifier, result.getException());
throw result.getException(); throw result.getException();
} }
} }
@ -366,10 +369,8 @@ public class Scheduler implements Runnable {
} }
for (ShardInfo completedShard : completedShards) { for (ShardInfo completedShard : completedShards) {
final String streamName = completedShard.streamName() final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifier());
.orElseGet(() -> appStreamTracker.map(mst -> "", sc -> sc.streamName())); if (createOrGetShardSyncTaskManager(streamIdentifier).syncShardAndLeaseInfo()) {
Validate.notEmpty(streamName, "Stream name should not be empty");
if (createOrGetShardSyncTaskManager(streamName).syncShardAndLeaseInfo()) {
log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString()); log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString());
} }
} }
@ -629,8 +630,8 @@ public class Scheduler implements Runnable {
return consumer; return consumer;
} }
private ShardSyncTaskManager createOrGetShardSyncTaskManager(String streamName) { private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamIdentifier streamIdentifier) {
return streamToShardSyncTaskManagerMap.computeIfAbsent(streamName, s -> shardSyncTaskManagerProvider.apply(s)); return streamToShardSyncTaskManagerMap.computeIfAbsent(streamIdentifier, s -> shardSyncTaskManagerProvider.apply(s));
} }
protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
@ -640,18 +641,17 @@ public class Scheduler implements Runnable {
checkpoint); checkpoint);
// The only case where streamName is not available will be when multistreamtracker not set. In this case, // The only case where streamName is not available will be when multistreamtracker not set. In this case,
// get the default stream name for the single stream application. // get the default stream name for the single stream application.
final String streamName = shardInfo.streamName().orElseGet(() -> appStreamTracker.map(mst -> "", sc -> sc.streamName())); final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifier());
Validate.notEmpty(streamName, "StreamName should not be empty");
// Irrespective of single stream app or multi stream app, streamConfig should always be available. // Irrespective of single stream app or multi stream app, streamConfig should always be available.
// TODO: Halo : if not available, construct a default config ? // TODO: Halo : if not available, construct a default config ?
final StreamConfig streamConfig = currentStreamConfigMap.get(streamName); final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
Validate.notNull(streamConfig, "StreamConfig should not be empty"); Validate.notNull(streamConfig, "StreamConfig should not be empty");
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
streamConfig.streamName(), streamConfig.streamIdentifier(),
leaseCoordinator, leaseCoordinator,
executorService, executorService,
cache, cache,
shardRecordProcessorFactory.shardRecordProcessor(streamName), shardRecordProcessorFactory.shardRecordProcessor(streamIdentifier),
checkpoint, checkpoint,
checkpointer, checkpointer,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
@ -664,7 +664,7 @@ public class Scheduler implements Runnable {
streamConfig.initialPositionInStreamExtended(), streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion, cleanupLeasesUponShardCompletion,
ignoreUnexpetedChildShards, ignoreUnexpetedChildShards,
shardDetectorProvider.apply(streamConfig.streamName()), shardDetectorProvider.apply(streamConfig.streamIdentifier()),
aggregatorUtil, aggregatorUtil,
hierarchicalShardSyncer, hierarchicalShardSyncer,
metricsFactory); metricsFactory);
@ -716,6 +716,17 @@ public class Scheduler implements Runnable {
executorStateEvent.accept(diagnosticEventHandler); executorStateEvent.accept(diagnosticEventHandler);
} }
private StreamIdentifier getStreamIdentifier(Optional<String> streamIdentifierString) {
final StreamIdentifier streamIdentifier;
if(streamIdentifierString.isPresent()) {
streamIdentifier = StreamIdentifier.fromString(streamIdentifierString.get());
} else {
streamIdentifier = appStreamTracker.map(mst -> null, sc -> sc.streamIdentifier());
}
Validate.notNull(streamIdentifier, "Stream identifier should not be empty");
return streamIdentifier;
}
/** /**
* Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
* INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on

View file

@ -29,7 +29,6 @@ import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -39,7 +38,6 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Pair;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -163,7 +161,7 @@ public class HierarchicalShardSyncer {
throws DependencyException, ProvisionedThroughputException, InvalidStateException { throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<Lease> streamLeases = new ArrayList<>(); List<Lease> streamLeases = new ArrayList<>();
for (Lease lease : leaseRefresher.listLeases()) { for (Lease lease : leaseRefresher.listLeases()) {
if (streamName.equals(((MultiStreamLease)lease).streamName())) { if (streamName.equals(((MultiStreamLease)lease).streamIdentifier())) {
streamLeases.add(lease); streamLeases.add(lease);
} }
} }
@ -772,7 +770,7 @@ public class HierarchicalShardSyncer {
} }
newLease.parentShardIds(parentShardIds); newLease.parentShardIds(parentShardIds);
newLease.ownerSwitchesSinceCheckpoint(0L); newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.streamName(streamName); newLease.streamIdentifier(streamName);
newLease.shardId(shard.shardId()); newLease.shardId(shard.shardId());
return newLease; return newLease;
} }

View file

@ -17,13 +17,13 @@ import static com.google.common.base.Verify.verifyNotNull;
@Accessors(fluent = true) @Accessors(fluent = true)
public class MultiStreamLease extends Lease { public class MultiStreamLease extends Lease {
@NonNull private String streamName; @NonNull private String streamIdentifier;
@NonNull private String shardId; @NonNull private String shardId;
public MultiStreamLease(Lease other) { public MultiStreamLease(Lease other) {
super(other); super(other);
MultiStreamLease casted = validateAndCast(other); MultiStreamLease casted = validateAndCast(other);
streamName(casted.streamName); streamIdentifier(casted.streamIdentifier);
shardId(casted.shardId); shardId(casted.shardId);
} }
@ -31,7 +31,7 @@ public class MultiStreamLease extends Lease {
public void update(Lease other) { public void update(Lease other) {
MultiStreamLease casted = validateAndCast(other); MultiStreamLease casted = validateAndCast(other);
super.update(casted); super.update(casted);
streamName(casted.streamName); streamIdentifier(casted.streamIdentifier);
shardId(casted.shardId); shardId(casted.shardId);
} }
@ -43,7 +43,7 @@ public class MultiStreamLease extends Lease {
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(super.hashCode(), streamName); return Objects.hash(super.hashCode(), streamIdentifier);
} }
@Override @Override
@ -58,11 +58,11 @@ public class MultiStreamLease extends Lease {
return false; return false;
} }
MultiStreamLease other = (MultiStreamLease) obj; MultiStreamLease other = (MultiStreamLease) obj;
if (streamName == null) { if (streamIdentifier == null) {
if (other.streamName != null) { if (other.streamIdentifier != null) {
return false; return false;
} }
} else if (!streamName.equals(other.streamName)) { } else if (!streamIdentifier.equals(other.streamIdentifier)) {
return false; return false;
} }
return true; return true;

View file

@ -37,7 +37,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ToString @ToString
public class ShardInfo { public class ShardInfo {
private final Optional<String> streamName; private final Optional<String> streamIdentifier;
private final String shardId; private final String shardId;
private final String concurrencyToken; private final String concurrencyToken;
// Sorted list of parent shardIds. // Sorted list of parent shardIds.
@ -67,7 +67,7 @@ public class ShardInfo {
final String concurrencyToken, final String concurrencyToken,
final Collection<String> parentShardIds, final Collection<String> parentShardIds,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber checkpoint,
final String streamName) { final String streamIdentifier) {
this.shardId = shardId; this.shardId = shardId;
this.concurrencyToken = concurrencyToken; this.concurrencyToken = concurrencyToken;
this.parentShardIds = new LinkedList<>(); this.parentShardIds = new LinkedList<>();
@ -78,7 +78,7 @@ public class ShardInfo {
// This makes it easy to check for equality in ShardInfo.equals method. // This makes it easy to check for equality in ShardInfo.equals method.
Collections.sort(this.parentShardIds); Collections.sort(this.parentShardIds);
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
this.streamName = Optional.ofNullable(streamName); this.streamIdentifier = Optional.ofNullable(streamIdentifier);
} }
/** /**
@ -105,7 +105,7 @@ public class ShardInfo {
@Override @Override
public int hashCode() { public int hashCode() {
return new HashCodeBuilder() return new HashCodeBuilder()
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamName.orElse("")).toHashCode(); .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifier.orElse("")).toHashCode();
} }
/** /**
@ -130,7 +130,7 @@ public class ShardInfo {
ShardInfo other = (ShardInfo) obj; ShardInfo other = (ShardInfo) obj;
return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken) return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken)
.append(parentShardIds, other.parentShardIds).append(shardId, other.shardId) .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId)
.append(streamName.orElse(""), other.streamName.orElse("")).isEquals(); .append(streamIdentifier.orElse(""), other.streamIdentifier.orElse("")).isEquals();
} }
@ -140,8 +140,8 @@ public class ShardInfo {
* @return * @return
*/ */
public static String getLeaseKey(ShardInfo shardInfo) { public static String getLeaseKey(ShardInfo shardInfo) {
return shardInfo.streamName().isPresent() ? return shardInfo.streamIdentifier().isPresent() ?
MultiStreamLease.getLeaseKey(shardInfo.streamName().get(), shardInfo.shardId()) : MultiStreamLease.getLeaseKey(shardInfo.streamIdentifier().get(), shardInfo.shardId()) :
shardInfo.shardId(); shardInfo.shardId();
} }

View file

@ -382,7 +382,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
public static ShardInfo convertLeaseToAssignment(final Lease lease) { public static ShardInfo convertLeaseToAssignment(final Lease lease) {
if (lease instanceof MultiStreamLease) { if (lease instanceof MultiStreamLease) {
return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(), return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(),
lease.checkpoint(), ((MultiStreamLease) lease).streamName()); lease.checkpoint(), ((MultiStreamLease) lease).streamIdentifier());
} else { } else {
return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(), return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(),
lease.checkpoint()); lease.checkpoint());

View file

@ -21,7 +21,7 @@ public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer
public Map<String, AttributeValue> toDynamoRecord(Lease lease) { public Map<String, AttributeValue> toDynamoRecord(Lease lease) {
final MultiStreamLease multiStreamLease = validateAndCast(lease); final MultiStreamLease multiStreamLease = validateAndCast(lease);
final Map<String, AttributeValue> result = super.toDynamoRecord(multiStreamLease); final Map<String, AttributeValue> result = super.toDynamoRecord(multiStreamLease);
result.put(STREAM_NAME_KEY, DynamoUtils.createAttributeValue(multiStreamLease.streamName())); result.put(STREAM_NAME_KEY, DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier()));
result.put(SHARD_ID_KEY, DynamoUtils.createAttributeValue(multiStreamLease.shardId())); result.put(SHARD_ID_KEY, DynamoUtils.createAttributeValue(multiStreamLease.shardId()));
return result; return result;
} }
@ -30,7 +30,7 @@ public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer
public MultiStreamLease fromDynamoRecord(Map<String, AttributeValue> dynamoRecord) { public MultiStreamLease fromDynamoRecord(Map<String, AttributeValue> dynamoRecord) {
final MultiStreamLease multiStreamLease = (MultiStreamLease) super final MultiStreamLease multiStreamLease = (MultiStreamLease) super
.fromDynamoRecord(dynamoRecord, new MultiStreamLease()); .fromDynamoRecord(dynamoRecord, new MultiStreamLease());
multiStreamLease.streamName(DynamoUtils.safeGetString(dynamoRecord, STREAM_NAME_KEY)); multiStreamLease.streamIdentifier(DynamoUtils.safeGetString(dynamoRecord, STREAM_NAME_KEY));
multiStreamLease.shardId(DynamoUtils.safeGetString(dynamoRecord, SHARD_ID_KEY)); multiStreamLease.shardId(DynamoUtils.safeGetString(dynamoRecord, SHARD_ID_KEY));
return multiStreamLease; return multiStreamLease;
} }
@ -40,7 +40,7 @@ public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease) { public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease) {
final MultiStreamLease multiStreamLease = validateAndCast(lease); final MultiStreamLease multiStreamLease = validateAndCast(lease);
final Map<String, AttributeValueUpdate> result = super.getDynamoUpdateLeaseUpdate(multiStreamLease); final Map<String, AttributeValueUpdate> result = super.getDynamoUpdateLeaseUpdate(multiStreamLease);
result.put(STREAM_NAME_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.streamName()))); result.put(STREAM_NAME_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier())));
result.put(SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.shardId()))); result.put(SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.shardId())));
return result; return result;
} }

View file

@ -21,8 +21,8 @@ import lombok.experimental.Accessors;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.HierarchicalShardSyncer;
@ -41,7 +41,7 @@ public class ShardConsumerArgument {
@NonNull @NonNull
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
@NonNull @NonNull
private final String streamName; private final StreamIdentifier streamIdentifier;
@NonNull @NonNull
private final LeaseCoordinator leaseCoordinator; private final LeaseCoordinator leaseCoordinator;
@NonNull @NonNull

View file

@ -2,6 +2,7 @@ package software.amazon.kinesis.processor;
import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamConfig;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
@ -15,5 +16,5 @@ public interface MultiStreamTracker {
* *
* @return List of stream names * @return List of stream names
*/ */
Map<String, StreamConfig> streamConfigMap(); List<StreamConfig> streamConfigList();
} }

View file

@ -15,6 +15,8 @@
package software.amazon.kinesis.processor; package software.amazon.kinesis.processor;
import software.amazon.kinesis.common.StreamIdentifier;
/** /**
* *
*/ */
@ -28,10 +30,10 @@ public interface ShardRecordProcessorFactory {
/** /**
* Returns a new instance of the ShardRecordProcessor for a stream * Returns a new instance of the ShardRecordProcessor for a stream
* @param streamName * @param streamIdentifier
* @return ShardRecordProcessor * @return ShardRecordProcessor
*/ */
default ShardRecordProcessor shardRecordProcessor(String streamName) { default ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) {
return shardRecordProcessor(); return shardRecordProcessor();
} }
} }

View file

@ -47,7 +47,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
@Override @Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
final MetricsFactory metricsFactory) { final MetricsFactory metricsFactory) {
final String streamName = shardInfo.streamName().orElse(defaultStreamName); final String streamName = shardInfo.streamIdentifier().orElse(defaultStreamName);
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
streamToconsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); streamToconsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply));
} }

View file

@ -63,7 +63,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) { @NonNull final MetricsFactory metricsFactory) {
return new SynchronousGetRecordsRetrievalStrategy( return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, shardInfo.streamName().orElse(streamName), shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); new KinesisDataFetcher(kinesisClient, shardInfo.streamIdentifier().orElse(streamName), shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout));
} }
@Override @Override

View file

@ -68,7 +68,7 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
@Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) { @NonNull final MetricsFactory metricsFactory) {
return new SynchronousGetRecordsRetrievalStrategy( return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, shardInfo.streamName().orElse(streamName), shardInfo.shardId(), new KinesisDataFetcher(kinesisClient, shardInfo.streamIdentifier().orElse(streamName), shardInfo.shardId(),
maxRecords, metricsFactory, maxFutureWait)); maxRecords, metricsFactory, maxFutureWait));
} }