From 67d54045c1971926609acfdec446c4653db4a76e Mon Sep 17 00:00:00 2001
From: Furqaan Ali
Date: Fri, 1 Nov 2024 14:40:41 -0700
Subject: [PATCH] Merge from multilang-kclv3 branch: Add multi-lang support for
new configs, refactor enums, and update documentation
This is a squash of commits a144dfaac117415c400e8786b98060f5660d4276 through 4185f6e72520744e8a18cd04c550bc57a1bfd298.
---
amazon-kinesis-client-multilang/pom.xml | 13 +
.../kinesis/multilang/MultiLangDaemon.java | 6 +-
.../config/CoordinatorStateConfigBean.java | 56 ++++
.../GracefulLeaseHandoffConfigBean.java | 41 +++
.../config/MultiLangDaemonConfiguration.java | 59 +++++
.../config/WorkerMetricsTableConfigBean.java | 56 ++++
...rUtilizationAwareAssignmentConfigBean.java | 106 ++++++++
.../config/converter/DurationConverter.java | 37 +++
.../multilang/MultiLangDaemonTest.java | 8 +-
.../ConfigurationSettableUtilsTest.java | 17 ++
.../KinesisClientLibConfiguratorTest.java | 151 +++++++++++
.../MultiLangDaemonConfigurationTest.java | 248 ++++++++++++++++-
.../config/PropertiesMappingE2ETest.java | 249 ++++++++++++++++++
...lizationAwareAssignmentConfigBeanTest.java | 68 +++++
.../src/test/resources/multilang.properties | 70 +++++
.../src/test/resources/multilangv3.properties | 167 ++++++++++++
.../coordinator/CoordinatorConfig.java | 6 +-
...DynamicMigrationComponentsInitializer.java | 26 +-
.../coordinator/migration/ClientVersion.java | 18 +-
.../migration/ClientVersionChangeMonitor.java | 6 +-
.../MigrationClientVersion2xState.java | 20 +-
.../MigrationClientVersion3xState.java | 4 +-
...ationClientVersion3xWithRollbackState.java | 24 +-
...igrationClientVersionStateInitializer.java | 52 ++--
...rationClientVersionUpgradeFrom2xState.java | 34 +--
.../migration/MigrationStateMachineImpl.java | 12 +-
.../kinesis/leases/LeaseManagementConfig.java | 12 +-
.../coordinator/CoordinatorStateDAOTest.java | 14 +-
...micMigrationComponentsInitializerTest.java | 32 +--
.../ClientVersionChangeMonitorTest.java | 16 +-
.../migration/MigrationStateMachineTest.java | 44 ++--
31 files changed, 1503 insertions(+), 169 deletions(-)
create mode 100644 amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/CoordinatorStateConfigBean.java
create mode 100644 amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/GracefulLeaseHandoffConfigBean.java
create mode 100644 amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerMetricsTableConfigBean.java
create mode 100644 amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBean.java
create mode 100644 amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/converter/DurationConverter.java
create mode 100644 amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java
create mode 100644 amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBeanTest.java
create mode 100644 amazon-kinesis-client-multilang/src/test/resources/multilangv3.properties
diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml
index f5f21d96..ea3acfe1 100644
--- a/amazon-kinesis-client-multilang/pom.xml
+++ b/amazon-kinesis-client-multilang/pom.xml
@@ -104,6 +104,12 @@
+
+ org.junit.jupiter
+ junit-jupiter-api
+ 5.11.3
+ test
+
junit
junit
@@ -122,6 +128,13 @@
1.3
test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ 3.12.4
+ test
+
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemon.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemon.java
index 4588b246..94bdfd10 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemon.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemon.java
@@ -141,7 +141,7 @@ public class MultiLangDaemon {
}
}
- String propertiesFile(final MultiLangDaemonArguments arguments) {
+ String validateAndGetPropertiesFileName(final MultiLangDaemonArguments arguments) {
String propertiesFile = "";
if (CollectionUtils.isNotEmpty(arguments.parameters)) {
@@ -216,9 +216,9 @@ public class MultiLangDaemon {
MultiLangDaemonArguments arguments = new MultiLangDaemonArguments();
JCommander jCommander = daemon.buildJCommanderAndParseArgs(arguments, args);
try {
- String propertiesFile = daemon.propertiesFile(arguments);
+ String propertiesFileName = daemon.validateAndGetPropertiesFileName(arguments);
daemon.configureLogging(arguments.logConfiguration);
- MultiLangDaemonConfig config = daemon.buildMultiLangDaemonConfig(propertiesFile);
+ MultiLangDaemonConfig config = daemon.buildMultiLangDaemonConfig(propertiesFileName);
Scheduler scheduler = daemon.buildScheduler(config);
MultiLangRunner runner = new MultiLangRunner(scheduler);
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/CoordinatorStateConfigBean.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/CoordinatorStateConfigBean.java
new file mode 100644
index 00000000..eaf93f49
--- /dev/null
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/CoordinatorStateConfigBean.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2024 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.multilang.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.kinesis.coordinator.CoordinatorConfig.CoordinatorStateTableConfig;
+
+@Getter
+@Setter
+public class CoordinatorStateConfigBean {
+
+ interface CoordinatorStateConfigBeanDelegate {
+ String getCoordinatorStateTableName();
+
+ void setCoordinatorStateTableName(String value);
+
+ BillingMode getCoordinatorStateBillingMode();
+
+ void setCoordinatorStateBillingMode(BillingMode value);
+
+ long getCoordinatorStateReadCapacity();
+
+ void setCoordinatorStateReadCapacity(long value);
+
+ long getCoordinatorStateWriteCapacity();
+
+ void setCoordinatorStateWriteCapacity(long value);
+ }
+
+ @ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "tableName")
+ private String coordinatorStateTableName;
+
+ @ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "billingMode")
+ private BillingMode coordinatorStateBillingMode;
+
+ @ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "readCapacity")
+ private long coordinatorStateReadCapacity;
+
+ @ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "writeCapacity")
+ private long coordinatorStateWriteCapacity;
+}
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/GracefulLeaseHandoffConfigBean.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/GracefulLeaseHandoffConfigBean.java
new file mode 100644
index 00000000..97327962
--- /dev/null
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/GracefulLeaseHandoffConfigBean.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2024 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.multilang.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import software.amazon.kinesis.leases.LeaseManagementConfig;
+
+@Getter
+@Setter
+public class GracefulLeaseHandoffConfigBean {
+
+ interface GracefulLeaseHandoffConfigBeanDelegate {
+ Long getGracefulLeaseHandoffTimeoutMillis();
+
+ void setGracefulLeaseHandoffTimeoutMillis(Long value);
+
+ Boolean getIsGracefulLeaseHandoffEnabled();
+
+ void setIsGracefulLeaseHandoffEnabled(Boolean value);
+ }
+
+ @ConfigurationSettable(configurationClass = LeaseManagementConfig.GracefulLeaseHandoffConfig.class)
+ private Long gracefulLeaseHandoffTimeoutMillis;
+
+ @ConfigurationSettable(configurationClass = LeaseManagementConfig.GracefulLeaseHandoffConfig.class)
+ private Boolean isGracefulLeaseHandoffEnabled;
+}
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
index 3336be88..5c7a11b2 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
@@ -17,6 +17,7 @@ package software.amazon.kinesis.multilang.config;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
@@ -41,6 +42,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
@@ -55,6 +57,7 @@ import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsLevel;
+import software.amazon.kinesis.multilang.config.converter.DurationConverter;
import software.amazon.kinesis.multilang.config.credentials.V2CredentialWrapper;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
@@ -156,6 +159,9 @@ public class MultiLangDaemonConfiguration {
@ConfigurationSettable(configurationClass = CoordinatorConfig.class)
private long schedulerInitializationBackoffTimeMillis;
+ @ConfigurationSettable(configurationClass = CoordinatorConfig.class)
+ private CoordinatorConfig.ClientVersionConfig clientVersionConfig;
+
@ConfigurationSettable(configurationClass = LifecycleConfig.class)
private long taskBackoffTimeMillis;
@@ -189,6 +195,20 @@ public class MultiLangDaemonConfiguration {
@Delegate(types = PollingConfigBean.PollingConfigBeanDelegate.class)
private final PollingConfigBean pollingConfig = new PollingConfigBean();
+ @Delegate(types = GracefulLeaseHandoffConfigBean.GracefulLeaseHandoffConfigBeanDelegate.class)
+ private final GracefulLeaseHandoffConfigBean gracefulLeaseHandoffConfigBean = new GracefulLeaseHandoffConfigBean();
+
+ @Delegate(
+ types = WorkerUtilizationAwareAssignmentConfigBean.WorkerUtilizationAwareAssignmentConfigBeanDelegate.class)
+ private final WorkerUtilizationAwareAssignmentConfigBean workerUtilizationAwareAssignmentConfigBean =
+ new WorkerUtilizationAwareAssignmentConfigBean();
+
+ @Delegate(types = WorkerMetricsTableConfigBean.WorkerMetricsTableConfigBeanDelegate.class)
+ private final WorkerMetricsTableConfigBean workerMetricsTableConfigBean = new WorkerMetricsTableConfigBean();
+
+ @Delegate(types = CoordinatorStateConfigBean.CoordinatorStateConfigBeanDelegate.class)
+ private final CoordinatorStateConfigBean coordinatorStateConfigBean = new CoordinatorStateConfigBean();
+
private boolean validateSequenceNumberBeforeCheckpointing;
private long shutdownGraceMillis;
@@ -252,6 +272,25 @@ public class MultiLangDaemonConfiguration {
},
InitialPositionInStream.class);
+ convertUtilsBean.register(
+ new Converter() {
+ @Override
+ public T convert(Class type, Object value) {
+ return type.cast(CoordinatorConfig.ClientVersionConfig.valueOf(
+ value.toString().toUpperCase()));
+ }
+ },
+ CoordinatorConfig.ClientVersionConfig.class);
+
+ convertUtilsBean.register(
+ new Converter() {
+ @Override
+ public T convert(Class type, Object value) {
+ return type.cast(BillingMode.valueOf(value.toString().toUpperCase()));
+ }
+ },
+ BillingMode.class);
+
convertUtilsBean.register(
new Converter() {
@Override
@@ -279,6 +318,8 @@ public class MultiLangDaemonConfiguration {
},
Region.class);
+ convertUtilsBean.register(new DurationConverter(), Duration.class);
+
ArrayConverter arrayConverter = new ArrayConverter(String[].class, new StringConverter());
arrayConverter.setDelimiter(',');
convertUtilsBean.register(arrayConverter, String[].class);
@@ -370,6 +411,22 @@ public class MultiLangDaemonConfiguration {
retrievalMode.builder(this).build(configsBuilder.kinesisClient(), this));
}
+ private void handleCoordinatorConfig(CoordinatorConfig coordinatorConfig) {
+ ConfigurationSettableUtils.resolveFields(
+ this.coordinatorStateConfigBean, coordinatorConfig.coordinatorStateConfig());
+ }
+
+ private void handleLeaseManagementConfig(LeaseManagementConfig leaseManagementConfig) {
+ ConfigurationSettableUtils.resolveFields(
+ this.gracefulLeaseHandoffConfigBean, leaseManagementConfig.gracefulLeaseHandoffConfig());
+ ConfigurationSettableUtils.resolveFields(
+ this.workerUtilizationAwareAssignmentConfigBean,
+ leaseManagementConfig.workerUtilizationAwareAssignmentConfig());
+ ConfigurationSettableUtils.resolveFields(
+ this.workerMetricsTableConfigBean,
+ leaseManagementConfig.workerUtilizationAwareAssignmentConfig().workerMetricsTableConfig());
+ }
+
private Object adjustKinesisHttpConfiguration(Object builderObj) {
if (builderObj instanceof KinesisAsyncClientBuilder) {
KinesisAsyncClientBuilder builder = (KinesisAsyncClientBuilder) builderObj;
@@ -448,6 +505,8 @@ public class MultiLangDaemonConfiguration {
processorConfig,
retrievalConfig);
+ handleCoordinatorConfig(coordinatorConfig);
+ handleLeaseManagementConfig(leaseManagementConfig);
handleRetrievalConfig(retrievalConfig, configsBuilder);
resolveFields(configObjects, null, new HashSet<>(Arrays.asList(ConfigsBuilder.class, PollingConfig.class)));
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerMetricsTableConfigBean.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerMetricsTableConfigBean.java
new file mode 100644
index 00000000..5cb9a6ec
--- /dev/null
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerMetricsTableConfigBean.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2024 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.multilang.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.kinesis.leases.LeaseManagementConfig.WorkerMetricsTableConfig;
+
+@Getter
+@Setter
+public class WorkerMetricsTableConfigBean {
+
+ interface WorkerMetricsTableConfigBeanDelegate {
+ String getWorkerMetricsTableName();
+
+ void setWorkerMetricsTableName(String value);
+
+ BillingMode getWorkerMetricsBillingMode();
+
+ void setWorkerMetricsBillingMode(BillingMode value);
+
+ long getWorkerMetricsReadCapacity();
+
+ void setWorkerMetricsReadCapacity(long value);
+
+ long getWorkerMetricsWriteCapacity();
+
+ void setWorkerMetricsWriteCapacity(long value);
+ }
+
+ @ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "tableName")
+ private String workerMetricsTableName;
+
+ @ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "billingMode")
+ private BillingMode workerMetricsBillingMode;
+
+ @ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "readCapacity")
+ private long workerMetricsReadCapacity;
+
+ @ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "writeCapacity")
+ private long workerMetricsWriteCapacity;
+}
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBean.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBean.java
new file mode 100644
index 00000000..fc335283
--- /dev/null
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBean.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2024 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.multilang.config;
+
+import java.time.Duration;
+
+import lombok.Getter;
+import lombok.Setter;
+import software.amazon.kinesis.leases.LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig;
+
+@Getter
+@Setter
+public class WorkerUtilizationAwareAssignmentConfigBean {
+
+ interface WorkerUtilizationAwareAssignmentConfigBeanDelegate {
+ long getInMemoryWorkerMetricsCaptureFrequencyMillis();
+
+ void setInMemoryWorkerMetricsCaptureFrequencyMillis(long value);
+
+ long getWorkerMetricsReporterFreqInMillis();
+
+ void setWorkerMetricsReporterFreqInMillis(long value);
+
+ int getNoOfPersistedMetricsPerWorkerMetrics();
+
+ void setNoOfPersistedMetricsPerWorkerMetrics(int value);
+
+ Boolean getDisableWorkerMetrics();
+
+ void setDisableWorkerMetrics(Boolean value);
+
+ double getMaxThroughputPerHostKBps();
+
+ void setMaxThroughputPerHostKBps(double value);
+
+ int getDampeningPercentage();
+
+ void setDampeningPercentage(int value);
+
+ int getReBalanceThresholdPercentage();
+
+ void setReBalanceThresholdPercentage(int value);
+
+ Boolean getAllowThroughputOvershoot();
+
+ void setAllowThroughputOvershoot(Boolean value);
+
+ int getVarianceBalancingFrequency();
+
+ void setVarianceBalancingFrequency(int value);
+
+ double getWorkerMetricsEMAAlpha();
+
+ void setWorkerMetricsEMAAlpha(double value);
+
+ void setStaleWorkerMetricsEntryCleanupDuration(Duration value);
+
+ Duration getStaleWorkerMetricsEntryCleanupDuration();
+ }
+
+ @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
+ private long inMemoryWorkerMetricsCaptureFrequencyMillis;
+
+ @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
+ private long workerMetricsReporterFreqInMillis;
+
+ @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
+ private int noOfPersistedMetricsPerWorkerMetrics;
+
+ @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
+ private Boolean disableWorkerMetrics;
+
+ @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
+ private double maxThroughputPerHostKBps;
+
+ @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
+ private int dampeningPercentage;
+
+ @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
+ private int reBalanceThresholdPercentage;
+
+ @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
+ private Boolean allowThroughputOvershoot;
+
+ @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
+ private int varianceBalancingFrequency;
+
+ @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
+ private double workerMetricsEMAAlpha;
+
+ @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
+ private Duration staleWorkerMetricsEntryCleanupDuration;
+}
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/converter/DurationConverter.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/converter/DurationConverter.java
new file mode 100644
index 00000000..3c07f1f2
--- /dev/null
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/converter/DurationConverter.java
@@ -0,0 +1,37 @@
+package software.amazon.kinesis.multilang.config.converter;
+
+import java.time.Duration;
+
+import org.apache.commons.beanutils.Converter;
+
+/**
+ * Converter that converts Duration text representation to a Duration object.
+ * Refer to {@code Duration.parse} javadocs for the exact text representation.
+ */
+public class DurationConverter implements Converter {
+
+ @Override
+ public T convert(Class type, Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (type != Duration.class) {
+ throw new ConversionException("Can only convert to Duration");
+ }
+
+ String durationString = value.toString().trim();
+ final Duration duration = Duration.parse(durationString);
+ if (duration.isNegative()) {
+ throw new ConversionException("Negative values are not permitted for duration: " + durationString);
+ }
+
+ return type.cast(duration);
+ }
+
+ public static class ConversionException extends RuntimeException {
+ public ConversionException(String message) {
+ super(message);
+ }
+ }
+}
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java
index 3e689437..453f81aa 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java
@@ -157,7 +157,7 @@ public class MultiLangDaemonTest {
MultiLangDaemon.MultiLangDaemonArguments arguments = new MultiLangDaemon.MultiLangDaemonArguments();
- daemon.propertiesFile(arguments);
+ daemon.validateAndGetPropertiesFileName(arguments);
}
@Test
@@ -166,7 +166,7 @@ public class MultiLangDaemonTest {
MultiLangDaemon.MultiLangDaemonArguments arguments = new MultiLangDaemon.MultiLangDaemonArguments();
arguments.parameters = Collections.singletonList(expectedPropertiesFile);
- String propertiesFile = daemon.propertiesFile(arguments);
+ String propertiesFile = daemon.validateAndGetPropertiesFileName(arguments);
assertThat(propertiesFile, equalTo(expectedPropertiesFile));
}
@@ -180,7 +180,7 @@ public class MultiLangDaemonTest {
arguments.parameters = Collections.singletonList(propertiesArgument);
arguments.propertiesFile = propertiesOptions;
- String propertiesFile = daemon.propertiesFile(arguments);
+ String propertiesFile = daemon.validateAndGetPropertiesFileName(arguments);
assertThat(propertiesFile, equalTo(propertiesOptions));
}
@@ -193,7 +193,7 @@ public class MultiLangDaemonTest {
MultiLangDaemon.MultiLangDaemonArguments arguments = new MultiLangDaemon.MultiLangDaemonArguments();
arguments.parameters = Arrays.asList("parameter1", "parameter2");
- daemon.propertiesFile(arguments);
+ daemon.validateAndGetPropertiesFileName(arguments);
}
@Test
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/ConfigurationSettableUtilsTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/ConfigurationSettableUtilsTest.java
index 5e0db340..cee3cad2 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/ConfigurationSettableUtilsTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/ConfigurationSettableUtilsTest.java
@@ -52,6 +52,16 @@ public class ConfigurationSettableUtilsTest {
assertThat(actual, equalTo(expected));
}
+ @Test
+ public void testBoolean() {
+ ConfigResult expected = ConfigResult.builder().bool(false).build();
+
+ ConfigObject configObject = ConfigObject.builder().bool(expected.bool).build();
+ ConfigResult actual = resolve(configObject);
+
+ assertThat(actual, equalTo(expected));
+ }
+
@Test
public void testHeapValuesSet() {
ConfigResult expected =
@@ -147,6 +157,9 @@ public class ConfigurationSettableUtilsTest {
private Long boxedLong;
private ComplexValue complexValue;
+ @Builder.Default
+ private Boolean bool = true;
+
private Optional optionalString;
private Optional optionalInteger;
private Optional optionalLong;
@@ -175,6 +188,10 @@ public class ConfigurationSettableUtilsTest {
@ConfigurationSettable(configurationClass = ConfigResult.class)
private int rawInt;
+ @ConfigurationSettable(configurationClass = ConfigResult.class)
+ @Builder.Default
+ private Boolean bool = true;
+
@ConfigurationSettable(configurationClass = ConfigResult.class)
private Integer boxedInt;
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java
index b0e3b870..9b38bc30 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java
@@ -20,6 +20,7 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
+import java.util.NoSuchElementException;
import java.util.Set;
import com.amazonaws.auth.AWSCredentials;
@@ -32,7 +33,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.metrics.MetricsLevel;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -40,6 +43,7 @@ import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -69,6 +73,8 @@ public class KinesisClientLibConfiguratorTest {
assertEquals(config.getWorkerIdentifier(), "123");
assertThat(config.getMaxGetRecordsThreadPool(), nullValue());
assertThat(config.getRetryGetRecordsInSeconds(), nullValue());
+ assertNull(config.getGracefulLeaseHandoffTimeoutMillis());
+ assertNull(config.getIsGracefulLeaseHandoffEnabled());
}
@Test
@@ -147,6 +153,151 @@ public class KinesisClientLibConfiguratorTest {
}
}
+ @Test
+ public void testGracefulLeaseHandoffConfig() {
+ final Long testGracefulLeaseHandoffTimeoutMillis = 12345L;
+ final boolean testGracefulLeaseHandoffEnabled = true;
+
+ final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
+ new String[] {
+ "applicationName = dummyApplicationName",
+ "streamName = dummyStreamName",
+ "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
+ "gracefulLeaseHandoffTimeoutMillis = " + testGracefulLeaseHandoffTimeoutMillis,
+ "isGracefulLeaseHandoffEnabled = " + testGracefulLeaseHandoffEnabled
+ },
+ '\n'));
+
+ assertEquals(testGracefulLeaseHandoffTimeoutMillis, config.getGracefulLeaseHandoffTimeoutMillis());
+ assertEquals(testGracefulLeaseHandoffEnabled, config.getIsGracefulLeaseHandoffEnabled());
+ }
+
+ @Test
+ public void testClientVersionConfig() {
+ final CoordinatorConfig.ClientVersionConfig testClientVersionConfig = Arrays.stream(
+ CoordinatorConfig.ClientVersionConfig.values())
+ .findAny()
+ .orElseThrow(NoSuchElementException::new);
+
+ final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
+ new String[] {
+ "applicationName = dummyApplicationName",
+ "streamName = dummyStreamName",
+ "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
+ "clientVersionConfig = " + testClientVersionConfig.name()
+ },
+ '\n'));
+
+ assertEquals(testClientVersionConfig, config.getClientVersionConfig());
+ }
+
+ @Test
+ public void testCoordinatorStateConfig() {
+ final String testCoordinatorStateTableName = "CoordState";
+ final BillingMode testCoordinatorStateBillingMode = BillingMode.PAY_PER_REQUEST;
+ final long testCoordinatorStateReadCapacity = 123;
+ final long testCoordinatorStateWriteCapacity = 123;
+
+ final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
+ new String[] {
+ "applicationName = dummyApplicationName",
+ "streamName = dummyStreamName",
+ "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
+ "coordinatorStateTableName = " + testCoordinatorStateTableName,
+ "coordinatorStateBillingMode = " + testCoordinatorStateBillingMode.name(),
+ "coordinatorStateReadCapacity = " + testCoordinatorStateReadCapacity,
+ "coordinatorStateWriteCapacity = " + testCoordinatorStateWriteCapacity
+ },
+ '\n'));
+
+ assertEquals(testCoordinatorStateTableName, config.getCoordinatorStateTableName());
+ assertEquals(testCoordinatorStateBillingMode, config.getCoordinatorStateBillingMode());
+ assertEquals(testCoordinatorStateReadCapacity, config.getCoordinatorStateReadCapacity());
+ assertEquals(testCoordinatorStateWriteCapacity, config.getCoordinatorStateWriteCapacity());
+ }
+
+ @Test
+ public void testWorkerUtilizationAwareAssignmentConfig() {
+ final long testInMemoryWorkerMetricsCaptureFrequencyMillis = 123;
+ final long testWorkerMetricsReporterFreqInMillis = 123;
+ final long testNoOfPersistedMetricsPerWorkerMetrics = 123;
+ final Boolean testDisableWorkerMetrics = true;
+ final double testMaxThroughputPerHostKBps = 123;
+ final long testDampeningPercentage = 12;
+ final long testReBalanceThresholdPercentage = 12;
+ final Boolean testAllowThroughputOvershoot = false;
+ final long testVarianceBalancingFrequency = 12;
+ final double testWorkerMetricsEMAAlpha = .123;
+
+ final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
+ new String[] {
+ "applicationName = dummyApplicationName",
+ "streamName = dummyStreamName",
+ "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
+ "inMemoryWorkerMetricsCaptureFrequencyMillis = " + testInMemoryWorkerMetricsCaptureFrequencyMillis,
+ "workerMetricsReporterFreqInMillis = " + testWorkerMetricsReporterFreqInMillis,
+ "noOfPersistedMetricsPerWorkerMetrics = " + testNoOfPersistedMetricsPerWorkerMetrics,
+ "disableWorkerMetrics = " + testDisableWorkerMetrics,
+ "maxThroughputPerHostKBps = " + testMaxThroughputPerHostKBps,
+ "dampeningPercentage = " + testDampeningPercentage,
+ "reBalanceThresholdPercentage = " + testReBalanceThresholdPercentage,
+ "allowThroughputOvershoot = " + testAllowThroughputOvershoot,
+ "varianceBalancingFrequency = " + testVarianceBalancingFrequency,
+ "workerMetricsEMAAlpha = " + testWorkerMetricsEMAAlpha
+ },
+ '\n'));
+
+ assertEquals(
+ testInMemoryWorkerMetricsCaptureFrequencyMillis,
+ config.getInMemoryWorkerMetricsCaptureFrequencyMillis());
+ assertEquals(testWorkerMetricsReporterFreqInMillis, config.getWorkerMetricsReporterFreqInMillis());
+ assertEquals(testNoOfPersistedMetricsPerWorkerMetrics, config.getNoOfPersistedMetricsPerWorkerMetrics());
+ assertEquals(testDisableWorkerMetrics, config.getDisableWorkerMetrics());
+ assertEquals(testMaxThroughputPerHostKBps, config.getMaxThroughputPerHostKBps(), 0.0001);
+ assertEquals(testDampeningPercentage, config.getDampeningPercentage());
+ assertEquals(testReBalanceThresholdPercentage, config.getReBalanceThresholdPercentage());
+ assertEquals(testAllowThroughputOvershoot, config.getAllowThroughputOvershoot());
+ assertEquals(testVarianceBalancingFrequency, config.getVarianceBalancingFrequency());
+ assertEquals(testWorkerMetricsEMAAlpha, config.getWorkerMetricsEMAAlpha(), 0.0001);
+ }
+
+ @Test
+ public void testWorkerMetricsConfig() {
+ final String testWorkerMetricsTableName = "CoordState";
+ final BillingMode testWorkerMetricsBillingMode = BillingMode.PROVISIONED;
+ final long testWorkerMetricsReadCapacity = 123;
+ final long testWorkerMetricsWriteCapacity = 123;
+
+ final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
+ new String[] {
+ "applicationName = dummyApplicationName",
+ "streamName = dummyStreamName",
+ "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
+ "workerMetricsTableName = " + testWorkerMetricsTableName,
+ "workerMetricsBillingMode = " + testWorkerMetricsBillingMode.name(),
+ "workerMetricsReadCapacity = " + testWorkerMetricsReadCapacity,
+ "workerMetricsWriteCapacity = " + testWorkerMetricsWriteCapacity
+ },
+ '\n'));
+
+ assertEquals(testWorkerMetricsTableName, config.getWorkerMetricsTableName());
+ assertEquals(testWorkerMetricsBillingMode, config.getWorkerMetricsBillingMode());
+ assertEquals(testWorkerMetricsReadCapacity, config.getWorkerMetricsReadCapacity());
+ assertEquals(testWorkerMetricsWriteCapacity, config.getWorkerMetricsWriteCapacity());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidClientVersionConfig() {
+ getConfiguration(StringUtils.join(
+ new String[] {
+ "applicationName = dummyApplicationName",
+ "streamName = dummyStreamName",
+ "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
+ "clientVersionConfig = " + "invalid_client_version_config"
+ },
+ '\n'));
+ }
+
@Test
public void testWithUnsupportedClientConfigurationVariables() {
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
index 1c45eb6e..c451ef81 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
@@ -15,6 +15,9 @@
package software.amazon.kinesis.multilang.config;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean;
import org.junit.After;
@@ -24,8 +27,16 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.kinesis.common.ConfigsBuilder;
+import software.amazon.kinesis.coordinator.CoordinatorConfig;
+import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
@@ -34,6 +45,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -41,6 +53,8 @@ import static org.junit.Assert.assertTrue;
public class MultiLangDaemonConfigurationTest {
private static final String AWS_REGION_PROPERTY_NAME = "aws.region";
+ private static final String DUMMY_APPLICATION_NAME = "dummyApplicationName";
+ private static final String DUMMY_STREAM_NAME = "dummyStreamName";
private BeanUtilsBean utilsBean;
private ConvertUtilsBean convertUtilsBean;
@@ -71,8 +85,8 @@ public class MultiLangDaemonConfigurationTest {
public MultiLangDaemonConfiguration baseConfiguration() {
MultiLangDaemonConfiguration configuration = new MultiLangDaemonConfiguration(utilsBean, convertUtilsBean);
- configuration.setApplicationName("Test");
- configuration.setStreamName("Test");
+ configuration.setApplicationName(DUMMY_APPLICATION_NAME);
+ configuration.setStreamName(DUMMY_STREAM_NAME);
configuration.getKinesisCredentialsProvider().set("class", DefaultCredentialsProvider.class.getName());
return configuration;
@@ -111,6 +125,197 @@ public class MultiLangDaemonConfigurationTest {
assertTrue(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled());
}
+ @Test
+ public void testGracefulLeaseHandoffConfig() {
+ final LeaseManagementConfig.GracefulLeaseHandoffConfig defaultGracefulLeaseHandoffConfig =
+ getTestConfigsBuilder().leaseManagementConfig().gracefulLeaseHandoffConfig();
+
+ final long testGracefulLeaseHandoffTimeoutMillis =
+ defaultGracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis() + 12345;
+ final boolean testGracefulLeaseHandoffEnabled =
+ !defaultGracefulLeaseHandoffConfig.isGracefulLeaseHandoffEnabled();
+
+ final MultiLangDaemonConfiguration configuration = baseConfiguration();
+ configuration.setGracefulLeaseHandoffTimeoutMillis(testGracefulLeaseHandoffTimeoutMillis);
+ configuration.setIsGracefulLeaseHandoffEnabled(testGracefulLeaseHandoffEnabled);
+
+ final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+
+ final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig =
+ resolvedConfiguration.leaseManagementConfig.gracefulLeaseHandoffConfig();
+
+ assertEquals(
+ testGracefulLeaseHandoffTimeoutMillis, gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis());
+ assertEquals(testGracefulLeaseHandoffEnabled, gracefulLeaseHandoffConfig.isGracefulLeaseHandoffEnabled());
+ }
+
+ @Test
+ public void testGracefulLeaseHandoffUsesDefaults() {
+ final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ baseConfiguration().resolvedConfiguration(shardRecordProcessorFactory);
+
+ final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig =
+ resolvedConfiguration.leaseManagementConfig.gracefulLeaseHandoffConfig();
+
+ final LeaseManagementConfig.GracefulLeaseHandoffConfig defaultGracefulLeaseHandoffConfig =
+ getTestConfigsBuilder().leaseManagementConfig().gracefulLeaseHandoffConfig();
+
+ assertEquals(defaultGracefulLeaseHandoffConfig, gracefulLeaseHandoffConfig);
+ }
+
+ @Test
+ public void testWorkerUtilizationAwareAssignmentConfig() {
+ MultiLangDaemonConfiguration configuration = baseConfiguration();
+
+ configuration.setInMemoryWorkerMetricsCaptureFrequencyMillis(123);
+ configuration.setWorkerMetricsReporterFreqInMillis(123);
+ configuration.setNoOfPersistedMetricsPerWorkerMetrics(123);
+ configuration.setDisableWorkerMetrics(true);
+ configuration.setMaxThroughputPerHostKBps(.123);
+ configuration.setDampeningPercentage(12);
+ configuration.setReBalanceThresholdPercentage(12);
+ configuration.setAllowThroughputOvershoot(false);
+ configuration.setVarianceBalancingFrequency(12);
+ configuration.setWorkerMetricsEMAAlpha(.123);
+
+ MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+ LeaseManagementConfig leaseManagementConfig = resolvedConfiguration.leaseManagementConfig;
+ LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config =
+ leaseManagementConfig.workerUtilizationAwareAssignmentConfig();
+
+ assertEquals(config.inMemoryWorkerMetricsCaptureFrequencyMillis(), 123);
+ assertEquals(config.workerMetricsReporterFreqInMillis(), 123);
+ assertEquals(config.noOfPersistedMetricsPerWorkerMetrics(), 123);
+ assertTrue(config.disableWorkerMetrics());
+ assertEquals(config.maxThroughputPerHostKBps(), .123, .25);
+ assertEquals(config.dampeningPercentage(), 12);
+ assertEquals(config.reBalanceThresholdPercentage(), 12);
+ assertFalse(config.allowThroughputOvershoot());
+ assertEquals(config.varianceBalancingFrequency(), 12);
+ assertEquals(config.workerMetricsEMAAlpha(), .123, .25);
+ }
+
+ @Test
+ public void testWorkerUtilizationAwareAssignmentConfigUsesDefaults() {
+ final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig defaultWorkerUtilAwareAssignmentConfig =
+ getTestConfigsBuilder().leaseManagementConfig().workerUtilizationAwareAssignmentConfig();
+
+ final MultiLangDaemonConfiguration configuration = baseConfiguration();
+ configuration.setVarianceBalancingFrequency(
+ defaultWorkerUtilAwareAssignmentConfig.varianceBalancingFrequency() + 12345);
+
+ final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+
+ final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig resolvedWorkerUtilAwareAssignmentConfig =
+ resolvedConfiguration.leaseManagementConfig.workerUtilizationAwareAssignmentConfig();
+
+ assertNotEquals(defaultWorkerUtilAwareAssignmentConfig, resolvedWorkerUtilAwareAssignmentConfig);
+
+ // apart from the single updated configuration, all other config values should be equal to the default
+ resolvedWorkerUtilAwareAssignmentConfig.varianceBalancingFrequency(
+ defaultWorkerUtilAwareAssignmentConfig.varianceBalancingFrequency());
+ assertEquals(defaultWorkerUtilAwareAssignmentConfig, resolvedWorkerUtilAwareAssignmentConfig);
+ }
+
+ @Test
+ public void testWorkerMetricsTableConfigBean() {
+ final BillingMode testWorkerMetricsTableBillingMode = BillingMode.PROVISIONED;
+
+ MultiLangDaemonConfiguration configuration = baseConfiguration();
+
+ configuration.setWorkerMetricsTableName("testTable");
+ configuration.setWorkerMetricsBillingMode(testWorkerMetricsTableBillingMode);
+ configuration.setWorkerMetricsReadCapacity(123);
+ configuration.setWorkerMetricsWriteCapacity(123);
+
+ MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+ LeaseManagementConfig leaseManagementConfig = resolvedConfiguration.leaseManagementConfig;
+ LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationConfig =
+ leaseManagementConfig.workerUtilizationAwareAssignmentConfig();
+ LeaseManagementConfig.WorkerMetricsTableConfig workerMetricsConfig =
+ workerUtilizationConfig.workerMetricsTableConfig();
+
+ assertEquals(workerMetricsConfig.tableName(), "testTable");
+ assertEquals(workerMetricsConfig.billingMode(), testWorkerMetricsTableBillingMode);
+ assertEquals(workerMetricsConfig.readCapacity(), 123);
+ assertEquals(workerMetricsConfig.writeCapacity(), 123);
+ }
+
+ @Test
+ public void testWorkerMetricsTableConfigUsesDefaults() {
+ final LeaseManagementConfig.WorkerMetricsTableConfig defaultWorkerMetricsTableConfig = getTestConfigsBuilder()
+ .leaseManagementConfig()
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsTableConfig();
+
+ final MultiLangDaemonConfiguration configuration = baseConfiguration();
+ configuration.setWorkerMetricsBillingMode(Arrays.stream(BillingMode.values())
+ .filter(billingMode -> billingMode != defaultWorkerMetricsTableConfig.billingMode())
+ .findFirst()
+ .orElseThrow(NoSuchElementException::new));
+
+ final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+
+ final LeaseManagementConfig.WorkerMetricsTableConfig resolvedWorkerMetricsTableConfig = resolvedConfiguration
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsTableConfig();
+
+ assertNotEquals(defaultWorkerMetricsTableConfig, resolvedWorkerMetricsTableConfig);
+
+ // apart from the single updated configuration, all other config values should be equal to the default
+ resolvedWorkerMetricsTableConfig.billingMode(defaultWorkerMetricsTableConfig.billingMode());
+ assertEquals(defaultWorkerMetricsTableConfig, resolvedWorkerMetricsTableConfig);
+ }
+
+ @Test
+ public void testCoordinatorStateTableConfigBean() {
+ final BillingMode testWorkerMetricsTableBillingMode = BillingMode.PAY_PER_REQUEST;
+
+ MultiLangDaemonConfiguration configuration = baseConfiguration();
+
+ configuration.setCoordinatorStateTableName("testTable");
+ configuration.setCoordinatorStateBillingMode(testWorkerMetricsTableBillingMode);
+ configuration.setCoordinatorStateReadCapacity(123);
+ configuration.setCoordinatorStateWriteCapacity(123);
+
+ MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+ CoordinatorConfig coordinatorConfig = resolvedConfiguration.getCoordinatorConfig();
+ CoordinatorConfig.CoordinatorStateTableConfig coordinatorStateConfig =
+ coordinatorConfig.coordinatorStateConfig();
+ assertEquals(coordinatorStateConfig.tableName(), "testTable");
+ assertEquals(coordinatorStateConfig.billingMode(), testWorkerMetricsTableBillingMode);
+ assertEquals(coordinatorStateConfig.readCapacity(), 123);
+ assertEquals(coordinatorStateConfig.writeCapacity(), 123);
+ }
+
+ @Test
+ public void testCoordinatorStateTableConfigUsesDefaults() {
+ final CoordinatorConfig.CoordinatorStateTableConfig defaultCoordinatorStateTableConfig =
+ getTestConfigsBuilder().coordinatorConfig().coordinatorStateConfig();
+
+ final MultiLangDaemonConfiguration configuration = baseConfiguration();
+ configuration.setCoordinatorStateWriteCapacity(defaultCoordinatorStateTableConfig.writeCapacity() + 12345);
+
+ final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+
+ final CoordinatorConfig.CoordinatorStateTableConfig resolvedCoordinatorStateTableConfig =
+ resolvedConfiguration.coordinatorConfig.coordinatorStateConfig();
+
+ assertNotEquals(defaultCoordinatorStateTableConfig, resolvedCoordinatorStateTableConfig);
+
+ // apart from the single updated configuration, all other config values should be equal to the default
+ resolvedCoordinatorStateTableConfig.writeCapacity(defaultCoordinatorStateTableConfig.writeCapacity());
+ assertEquals(defaultCoordinatorStateTableConfig, resolvedCoordinatorStateTableConfig);
+ }
+
@Test
public void testSetLeaseTablePitrEnabledToTrue() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
@@ -266,4 +471,43 @@ public class MultiLangDaemonConfigurationTest {
assertThat(fanOutConfig.consumerArn(), equalTo(consumerArn));
}
+
+ @Test
+ public void testClientVersionConfig() {
+ final CoordinatorConfig.ClientVersionConfig testClientVersionConfig =
+ CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X;
+
+ final MultiLangDaemonConfiguration configuration = baseConfiguration();
+ configuration.setClientVersionConfig(testClientVersionConfig);
+
+ final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+
+ final CoordinatorConfig coordinatorConfig = resolvedConfiguration.coordinatorConfig;
+
+ assertEquals(testClientVersionConfig, coordinatorConfig.clientVersionConfig());
+ }
+
+ @Test
+ public void testClientVersionConfigUsesDefault() {
+ final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ baseConfiguration().resolvedConfiguration(shardRecordProcessorFactory);
+
+ final CoordinatorConfig coordinatorConfig = resolvedConfiguration.coordinatorConfig;
+
+ assertEquals(
+ getTestConfigsBuilder().coordinatorConfig().clientVersionConfig(),
+ coordinatorConfig.clientVersionConfig());
+ }
+
+ private ConfigsBuilder getTestConfigsBuilder() {
+ return new ConfigsBuilder(
+ DUMMY_STREAM_NAME,
+ DUMMY_APPLICATION_NAME,
+ Mockito.mock(KinesisAsyncClient.class),
+ Mockito.mock(DynamoDbAsyncClient.class),
+ Mockito.mock(CloudWatchAsyncClient.class),
+ "dummyWorkerIdentifier",
+ shardRecordProcessorFactory);
+ }
}
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java
new file mode 100644
index 00000000..cf52a5ff
--- /dev/null
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java
@@ -0,0 +1,249 @@
+package software.amazon.kinesis.multilang.config;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.kinesis.coordinator.CoordinatorConfig.ClientVersionConfig;
+import software.amazon.kinesis.multilang.MultiLangDaemonConfig;
+import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration.ResolvedConfiguration;
+import software.amazon.kinesis.processor.ShardRecordProcessor;
+import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PropertiesMappingE2ETest {
+ private static final String PROPERTIES_FILE = "multilang.properties";
+ private static final String PROPERTIES_FILE_V3 = "multilangv3.properties";
+
+ @Test
+ public void testKclV3PropertiesMapping() throws IOException {
+ final MultiLangDaemonConfig config = new MultiLangDaemonConfig(PROPERTIES_FILE);
+
+ final ResolvedConfiguration kclV3Config =
+ config.getMultiLangDaemonConfiguration().resolvedConfiguration(new TestRecordProcessorFactory());
+
+ assertEquals(
+ ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X,
+ kclV3Config.coordinatorConfig.clientVersionConfig());
+
+ assertEquals(
+ "MultiLangTest-CoordinatorState-CustomName",
+ kclV3Config.coordinatorConfig.coordinatorStateConfig().tableName());
+ assertEquals(
+ BillingMode.PROVISIONED,
+ kclV3Config.coordinatorConfig.coordinatorStateConfig().billingMode());
+ assertEquals(
+ 1000, kclV3Config.coordinatorConfig.coordinatorStateConfig().readCapacity());
+ assertEquals(500, kclV3Config.coordinatorConfig.coordinatorStateConfig().writeCapacity());
+
+ assertEquals(
+ 10000L,
+ kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().gracefulLeaseHandoffTimeoutMillis());
+ assertFalse(
+ kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().isGracefulLeaseHandoffEnabled());
+
+ assertEquals(
+ 5000L,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .inMemoryWorkerMetricsCaptureFrequencyMillis());
+ assertEquals(
+ 60000L,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsReporterFreqInMillis());
+ assertEquals(
+ 50,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .noOfPersistedMetricsPerWorkerMetrics());
+ assertTrue(kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .disableWorkerMetrics());
+ assertEquals(
+ 10000,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .maxThroughputPerHostKBps());
+ assertEquals(
+ 90,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .dampeningPercentage());
+ assertEquals(
+ 5,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .reBalanceThresholdPercentage());
+ assertFalse(kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .allowThroughputOvershoot());
+ assertEquals(
+ Duration.ofHours(12),
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .staleWorkerMetricsEntryCleanupDuration());
+ assertEquals(
+ 5,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .varianceBalancingFrequency());
+ assertEquals(
+ 0.18D,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsEMAAlpha());
+
+ assertEquals(
+ "MultiLangTest-WorkerMetrics-CustomName",
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsTableConfig()
+ .tableName());
+ assertEquals(
+ BillingMode.PROVISIONED,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsTableConfig()
+ .billingMode());
+ assertEquals(
+ 250,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsTableConfig()
+ .readCapacity());
+ assertEquals(
+ 90,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsTableConfig()
+ .writeCapacity());
+ }
+
+ @Test
+ public void testKclV3PropertiesMappingForDefaultValues() throws IOException {
+ final MultiLangDaemonConfig config = new MultiLangDaemonConfig(PROPERTIES_FILE_V3);
+
+ final ResolvedConfiguration kclV3Config =
+ config.getMultiLangDaemonConfiguration().resolvedConfiguration(new TestRecordProcessorFactory());
+
+ assertEquals(ClientVersionConfig.CLIENT_VERSION_CONFIG_3X, kclV3Config.coordinatorConfig.clientVersionConfig());
+
+ assertEquals(
+ "MultiLangTest-CoordinatorState",
+ kclV3Config.coordinatorConfig.coordinatorStateConfig().tableName());
+ assertEquals(
+ BillingMode.PAY_PER_REQUEST,
+ kclV3Config.coordinatorConfig.coordinatorStateConfig().billingMode());
+
+ assertEquals(
+ 30_000L,
+ kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().gracefulLeaseHandoffTimeoutMillis());
+ assertTrue(
+ kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().isGracefulLeaseHandoffEnabled());
+
+ assertEquals(
+ 1000L,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .inMemoryWorkerMetricsCaptureFrequencyMillis());
+ assertEquals(
+ 30000L,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsReporterFreqInMillis());
+ assertEquals(
+ 10,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .noOfPersistedMetricsPerWorkerMetrics());
+ assertFalse(kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .disableWorkerMetrics());
+ assertEquals(
+ Double.MAX_VALUE,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .maxThroughputPerHostKBps());
+ assertEquals(
+ 60,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .dampeningPercentage());
+ assertEquals(
+ 10,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .reBalanceThresholdPercentage());
+ assertTrue(kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .allowThroughputOvershoot());
+ assertEquals(
+ Duration.ofDays(1),
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .staleWorkerMetricsEntryCleanupDuration());
+ assertEquals(
+ 3,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .varianceBalancingFrequency());
+ assertEquals(
+ 0.5D,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsEMAAlpha());
+
+ assertEquals(
+ "MultiLangTest-WorkerMetricStats",
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsTableConfig()
+ .tableName());
+ assertEquals(
+ BillingMode.PAY_PER_REQUEST,
+ kclV3Config
+ .leaseManagementConfig
+ .workerUtilizationAwareAssignmentConfig()
+ .workerMetricsTableConfig()
+ .billingMode());
+ }
+
+ private static class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
+ @Override
+ public ShardRecordProcessor shardRecordProcessor() {
+ return null;
+ }
+ }
+}
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBeanTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBeanTest.java
new file mode 100644
index 00000000..71ada01f
--- /dev/null
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBeanTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2024 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.multilang.config;
+
+import java.util.Optional;
+
+import org.apache.commons.beanutils.BeanUtilsBean;
+import org.apache.commons.beanutils.ConvertUtilsBean;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.kinesis.retrieval.polling.PollingConfig;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class WorkerUtilizationAwareAssignmentConfigBeanTest {
+
+ @Mock
+ private KinesisAsyncClient kinesisAsyncClient;
+
+ @Test
+ public void testAllPropertiesTransit() {
+ PollingConfigBean pollingConfigBean = new PollingConfigBean();
+ pollingConfigBean.setIdleTimeBetweenReadsInMillis(1000);
+ pollingConfigBean.setMaxGetRecordsThreadPool(20);
+ pollingConfigBean.setMaxRecords(5000);
+ pollingConfigBean.setRetryGetRecordsInSeconds(30);
+
+ ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean();
+ BeanUtilsBean utilsBean = new BeanUtilsBean(convertUtilsBean);
+
+ MultiLangDaemonConfiguration multiLangDaemonConfiguration =
+ new MultiLangDaemonConfiguration(utilsBean, convertUtilsBean);
+ multiLangDaemonConfiguration.setStreamName("test-stream");
+
+ PollingConfig pollingConfig = pollingConfigBean.build(kinesisAsyncClient, multiLangDaemonConfiguration);
+
+ assertThat(pollingConfig.kinesisClient(), equalTo(kinesisAsyncClient));
+ assertThat(pollingConfig.streamName(), equalTo(multiLangDaemonConfiguration.getStreamName()));
+ assertThat(
+ pollingConfig.idleTimeBetweenReadsInMillis(),
+ equalTo(pollingConfigBean.getIdleTimeBetweenReadsInMillis()));
+ assertThat(
+ pollingConfig.maxGetRecordsThreadPool(),
+ equalTo(Optional.of(pollingConfigBean.getMaxGetRecordsThreadPool())));
+ assertThat(pollingConfig.maxRecords(), equalTo(pollingConfigBean.getMaxRecords()));
+ assertThat(
+ pollingConfig.retryGetRecordsInSeconds(),
+ equalTo(Optional.of(pollingConfigBean.getRetryGetRecordsInSeconds())));
+ }
+}
diff --git a/amazon-kinesis-client-multilang/src/test/resources/multilang.properties b/amazon-kinesis-client-multilang/src/test/resources/multilang.properties
index 34cb0c1a..3611fa0a 100644
--- a/amazon-kinesis-client-multilang/src/test/resources/multilang.properties
+++ b/amazon-kinesis-client-multilang/src/test/resources/multilang.properties
@@ -91,3 +91,73 @@ validateSequenceNumberBeforeCheckpointing = true
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
maxActiveThreads = -1
+
+################### KclV3 configurations ###################
+# Coordinator config
+# Version the KCL needs to operate in. For more details check the KCLv3 migration
+# documentation. Default is CLIENT_VERSION_CONFIG_3X
+clientVersionConfig = CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x
+# TODO: include table deletion protection and pitr config once its added
+# Configurations to control how the CoordinatorState DDB table is created
+# Default name is applicationName-CoordinatorState in PAY_PER_REQUEST
+coordinatorStateTableName = MultiLangTest-CoordinatorState-CustomName
+coordinatorStateBillingMode = PROVISIONED
+coordinatorStateReadCapacity = 1000
+coordinatorStateWriteCapacity = 500
+
+# Graceful handoff config - tuning of the shutdown behavior during lease transfers
+# default values are 30000 and true respectively
+gracefulLeaseHandoffTimeoutMillis = 10000
+isGracefulLeaseHandoffEnabled = false
+
+# WorkerMetricStats table config - control how the DDB table is created
+## Default name is applicationName-WorkerMetricStats in PAY_PER_REQUEST
+# TODO: include table deletion protection and pitr config once its added
+workerMetricsTableName = MultiLangTest-WorkerMetrics-CustomName
+workerMetricsBillingMode = PROVISIONED
+workerMetricsReadCapacity = 250
+workerMetricsWriteCapacity = 90
+
+# WorkerUtilizationAwareAssignment config - tune the new KCLv3 Lease balancing algorithm
+#
+# frequency of capturing worker metrics in memory. Default is 1s
+inMemoryWorkerMetricsCaptureFrequencyMillis = 5000
+# frequency of reporting worker metric stats to storage. Default is 30s
+workerMetricsReporterFreqInMillis = 60000
+# No. of metricStats that are persisted in WorkerMetricStats ddb table, default is 10
+noOfPersistedMetricsPerWorkerMetrics = 50
+# Disable use of worker metrics to balance lease, default is false.
+# If it is true, the algorithm balances lease based on worker's processing throughput.
+disableWorkerMetrics = true
+# Max throughput per host 10 MBps, to limit processing to the given value
+# Default is unlimited.
+maxThroughputPerHostKBps = 10000
+# Dampen the load that is rebalanced during lease re-balancing, default is 60%
+dampeningPercentage = 90
+# Configures the allowed variance range for worker utilization. The upper
+# limit is calculated as average * (1 + reBalanceThresholdPercentage/100).
+# The lower limit is average * (1 - reBalanceThresholdPercentage/100). If
+# any worker's utilization falls outside this range, lease re-balancing is
+# triggered. The re-balancing algorithm aims to bring variance within the
+# specified range. It also avoids thrashing by ensuring the utilization of
+# the worker receiving the load after re-balancing doesn't exceed the fleet
+# average. This might cause no re-balancing action even the utilization is
+# out of the variance range. The default value is 10, representing +/-10%
+# variance from the average value.
+reBalanceThresholdPercentage = 5
+# Whether at-least one lease must be taken from a high utilization worker
+# during re-balancing when there is no lease assigned to that worker which has
+# throughput is less than or equal to the minimum throughput that needs to be
+# moved away from that worker to bring the worker back into the allowed variance.
+# Default is true.
+allowThroughputOvershoot = false
+# Lease assignment is performed every failoverTimeMillis but re-balance will
+# be attempted only once in 5 times based on the below config. Default is 3.
+varianceBalancingFrequency = 5
+# Alpha value used for calculating exponential moving average of worker's metricStats.
+workerMetricsEMAAlpha = 0.18
+# Duration after which workerMetricStats entry from WorkerMetricStats table will
+# be cleaned up.
+# Duration format examples: PT15M (15 mins) PT10H (10 hours) P2D (2 days)
+# Refer to Duration.parse javadocs for more details
+staleWorkerMetricsEntryCleanupDuration = PT12H
diff --git a/amazon-kinesis-client-multilang/src/test/resources/multilangv3.properties b/amazon-kinesis-client-multilang/src/test/resources/multilangv3.properties
new file mode 100644
index 00000000..d92c8016
--- /dev/null
+++ b/amazon-kinesis-client-multilang/src/test/resources/multilangv3.properties
@@ -0,0 +1,167 @@
+# The script that abides by the multi-language protocol. This script will
+# be executed by the MultiLangDaemon, which will communicate with this script
+# over STDIN and STDOUT according to the multi-language protocol.
+executableName = sample_kclpy_app.py
+
+# The Stream arn: arn:aws:kinesis:::stream/
+# Important: streamArn takes precedence over streamName if both are set
+streamArn = arn:aws:kinesis:us-east-5:000000000000:stream/kclpysample
+
+# The name of an Amazon Kinesis stream to process.
+# Important: streamArn takes precedence over streamName if both are set
+streamName = kclpysample
+
+# Used by the KCL as the name of this application. Will be used as the name
+# of an Amazon DynamoDB table which will store the lease and checkpoint
+# information for workers with this application name
+applicationName = MultiLangTest
+
+# Users can change the credentials provider the KCL will use to retrieve credentials.
+# The DefaultAWSCredentialsProviderChain checks several other providers, which is
+# described here:
+# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
+AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
+
+# Appended to the user agent of the KCL. Does not impact the functionality of the
+# KCL in any other way.
+processingLanguage = python/3.8
+
+# Valid options at TRIM_HORIZON or LATEST.
+# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
+initialPositionInStream = TRIM_HORIZON
+
+# To specify an initial timestamp from which to start processing records, please specify timestamp value for 'initiatPositionInStreamExtended',
+# and uncomment below line with right timestamp value.
+# See more from 'Timestamp' under http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
+#initialPositionInStreamExtended = 1636609142
+
+# The following properties are also available for configuring the KCL Worker that is created
+# by the MultiLangDaemon.
+
+# The KCL defaults to us-east-1
+regionName = us-east-1
+
+# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
+# will be regarded as having problems and it's shards will be assigned to other workers.
+# For applications that have a large number of shards, this msy be set to a higher number to reduce
+# the number of DynamoDB IOPS required for tracking leases
+failoverTimeMillis = 10000
+
+# A worker id that uniquely identifies this worker among all workers using the same applicationName
+# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
+workerId = "workerId"
+
+# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
+shardSyncIntervalMillis = 60000
+
+# Max records to fetch from Kinesis in a single GetRecords call.
+maxRecords = 10000
+
+# Idle time between record reads in milliseconds.
+idleTimeBetweenReadsInMillis = 1000
+
+# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
+callProcessRecordsEvenForEmptyRecordList = false
+
+# Interval in milliseconds between polling to check for parent shard completion.
+# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
+# completion of parent shards).
+parentShardPollIntervalMillis = 10000
+
+# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
+# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
+# to delete the ones we don't need any longer.
+cleanupLeasesUponShardCompletion = true
+
+# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
+taskBackoffTimeMillis = 500
+
+# Buffer metrics for at most this long before publishing to CloudWatch.
+metricsBufferTimeMillis = 10000
+
+# Buffer at most this many metrics before publishing to CloudWatch.
+metricsMaxQueueSize = 10000
+
+# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
+# to RecordProcessorCheckpointer#checkpoint(String) by default.
+validateSequenceNumberBeforeCheckpointing = true
+
+# The maximum number of active threads for the MultiLangDaemon to permit.
+# If a value is provided then a FixedThreadPool is used with the maximum
+# active threads set to the provided value. If a non-positive integer or no
+# value is provided a CachedThreadPool is used.
+maxActiveThreads = -1
+
+################### KclV3 configurations ###################
+# Coordinator config
+clientVersionConfig = CLIENT_VERSION_CONFIG_3x
+
+## Let all other config be defaults
+## TODO: include table deletion protection and pitr config once its added
+## Configurations to control how the CoordinatorState DDB table is created
+## Default name is applicationName-CoordinatorState in PAY_PER_REQUEST
+#coordinatorStateTableName = MultiLangTest-CoordinatorState-CustomName
+#coordinatorStateBillingMode = PROVISIONED
+#coordinatorStateReadCapacity = 1000
+#coordinatorStateWriteCapacity = 500
+#
+## Graceful handoff config - tuning of the shutdown behavior during lease transfers
+## default values are 30000 and true respectively
+#gracefulLeaseHandoffTimeoutMillis = 10000
+#isGracefulLeaseHandoffEnabled = false
+#
+## WorkerMetricStats table config - control how the DDB table is created
+### Default name is applicationName-WorkerMetricStats in PAY_PER_REQUEST
+## TODO: include table deletion protection and pitr config once its added
+#workerMetricsTableName = MultiLangTest-WorkerMetrics-CustomName
+#workerMetricsBillingMode = PROVISIONED
+#workerMetricsReadCapacity = 250
+#workerMetricsWriteCapacity = 90
+#
+## WorkerUtilizationAwareAssignment config - tune the new KCLv3 Lease balancing algorithm
+##
+## frequency of capturing worker metrics in memory. Default is 1s
+#inMemoryWorkerMetricsCaptureFrequencyMillis = 5000
+## frequency of reporting worker metric stats to storage. Default is 30s
+#workerMetricsReporterFreqInMillis = 60000
+## No. of metricStats that are persisted in WorkerMetricStats ddb table, default is 10.
+## This provides historic values that are used to compute the workers current
+## utilization using an exponential-moving-average.
+#noOfPersistedMetricsPerWorkerMetrics = 50
+## Disable use of worker metrics to balance lease, default is false.
+## If it is true, the algorithm balances lease based on worker's processing throughput.
+#disableWorkerMetrics = true
+## Max throughput per host 10 MBps, to limit processing to the given value
+## Default is unlimited.
+#maxThroughputPerHostKBps = 10000
+## Dampen the load that is rebalanced during lease re-balancing, default is 60%
+#dampeningPercentage = 90
+## Configures the allowed variance range for worker utilization. The upper
+## limit is calculated as average * (1 + reBalanceThresholdPercentage/100).
+## The lower limit is average * (1 - reBalanceThresholdPercentage/100). If
+## any worker's utilization falls outside this range, lease re-balancing is
+## triggered. The re-balancing algorithm aims to bring variance within the
+## specified range. It also avoids thrashing by ensuring the utilization of
+## the worker receiving the load after re-balancing doesn't exceed the fleet
+## average. This might cause no re-balancing action even the utilization is
+## out of the variance range. The default value is 10, representing +/-10%
+## variance from the average value.
+#reBalanceThresholdPercentage = 5
+## Whether at-least one lease must be taken from a high utilization worker
+## during re-balancing when there is no lease assigned to that worker which has
+## throughput is less than or equal to the minimum throughput that needs to be
+## moved away from that worker to bring the worker back into the allowed variance.
+## Default is true.
+#allowThroughputOvershoot = false
+## Lease assignment is performed every failoverTimeMillis but re-balance will
+## be attempted only once in 5 times based on the below config. Default is 3.
+#varianceBalancingFrequency = 5
+## Alpha value used for calculating exponential moving average of worker's metricStats.
+## Default is 0.5, a higher alpha value will make re-balancing more sensitive
+## to recent metricStats.
+#workerMetricsEMAAlpha = 0.18
+## Duration after which workerMetricStats entry from WorkerMetricStats table will
+## be cleaned up. Default is 1 day.
+## Duration format examples: PT15M (15 mins) PT10H (10 hours) P2D (2 days)
+## Refer to Duration.parse javadocs for more details
+#staleWorkerMetricsEntryCleanupDuration = PT12H
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java
index 238f4f37..d646aab9 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java
@@ -123,7 +123,7 @@ public class CoordinatorConfig {
* This version also allows rolling back to the compatible mode from the
* auto-toggled 3.x mode.
*/
- CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x,
+ CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X,
/**
* A new application operating with KCLv3.x will use this value. Also, an application
* that has successfully upgraded to 3.x version and no longer needs the ability
@@ -131,14 +131,14 @@ public class CoordinatorConfig {
* KCL will operate with new algorithms introduced in 3.x which is not compatible
* with prior versions. And once in this version, rollback to 2.x is not supported.
*/
- CLIENT_VERSION_CONFIG_3x,
+ CLIENT_VERSION_CONFIG_3X,
}
/**
* Client version KCL must operate in, by default it operates in 3.x version which is not
* compatible with prior versions.
*/
- private ClientVersionConfig clientVersionConfig = ClientVersionConfig.CLIENT_VERSION_CONFIG_3x;
+ private ClientVersionConfig clientVersionConfig = ClientVersionConfig.CLIENT_VERSION_CONFIG_3X;
public static class CoordinatorStateTableConfig extends DdbTableConfig {
private CoordinatorStateTableConfig(final String applicationName) {
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java
index 8d1e78a1..c4aecdda 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java
@@ -159,7 +159,7 @@ public final class DynamicMigrationComponentsInitializer {
// always collect metrics so that when we flip to start reporting we will have accurate historical data.
log.info("Start collection of WorkerMetricStats");
workerMetricsManager.startManager();
- if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3x) {
+ if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3X) {
initializeComponentsFor3x();
} else {
initializeComponentsForMigration(migrationStateMachineStartingClientVersion);
@@ -187,7 +187,7 @@ public final class DynamicMigrationComponentsInitializer {
log.info("Initializing for migration to 3x");
dualMode = true;
final LeaderDecider initialLeaderDecider;
- if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK) {
+ if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK) {
currentAssignmentMode = WORKER_UTILIZATION_AWARE_ASSIGNMENT;
initialLeaderDecider = ddbLockBasedLeaderDeciderCreator.get();
} else {
@@ -292,8 +292,8 @@ public final class DynamicMigrationComponentsInitializer {
/**
* Initialize KCL with components and configuration to support upgrade from 2x. This can happen
- * at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x.
- * Or Dynamically during roll-forward from ClientVersion.CLIENT_VERSION_2x.
+ * at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X.
+ * Or Dynamically during roll-forward from ClientVersion.CLIENT_VERSION_2X.
*/
public synchronized void initializeClientVersionForUpgradeFrom2x(final ClientVersion fromClientVersion)
throws DependencyException {
@@ -306,8 +306,8 @@ public final class DynamicMigrationComponentsInitializer {
/**
* Initialize KCL with components and configuration to run vanilla 3x functionality. This can happen
- * at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3x, or dynamically
- * during a new deployment when existing worker are in ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK
+ * at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3X, or dynamically
+ * during a new deployment when existing worker are in ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK
*/
public synchronized void initializeClientVersionFor3x(final ClientVersion fromClientVersion)
throws DependencyException {
@@ -322,14 +322,14 @@ public final class DynamicMigrationComponentsInitializer {
log.info("Starting LAM");
leaseAssignmentManager.start();
}
- // nothing to do when transitioning from CLIENT_VERSION_3x_WITH_ROLLBACK.
+ // nothing to do when transitioning from CLIENT_VERSION_3X_WITH_ROLLBACK.
}
/**
* Initialize KCL with components and configuration to run 2x compatible functionality
* while allowing roll-forward. This can happen at KCL Worker startup when MigrationStateMachine
- * starts in ClientVersion.CLIENT_VERSION_2x (after a rollback)
- * Or Dynamically during rollback from CLIENT_VERSION_UPGRADE_FROM_2x or CLIENT_VERSION_3x_WITH_ROLLBACK.
+ * starts in ClientVersion.CLIENT_VERSION_2X (after a rollback)
+ * Or Dynamically during rollback from CLIENT_VERSION_UPGRADE_FROM_2X or CLIENT_VERSION_3X_WITH_ROLLBACK.
*/
public synchronized void initializeClientVersionFor2x(final ClientVersion fromClientVersion) {
log.info("Initializing KCL components for rollback to 2x from {}", fromClientVersion);
@@ -341,7 +341,7 @@ public final class DynamicMigrationComponentsInitializer {
// and WorkerMetricStats table
}
- if (fromClientVersion == ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK) {
+ if (fromClientVersion == ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK) {
// we are rolling back after flip
currentAssignmentMode = DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT;
notifyLeaseAssignmentModeChange();
@@ -361,14 +361,14 @@ public final class DynamicMigrationComponentsInitializer {
/**
* Initialize KCL with components and configuration to run vanilla 3x functionality
* while allowing roll-back to 2x functionality. This can happen at KCL Worker startup
- * when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK (after the flip)
- * Or Dynamically during flip from CLIENT_VERSION_UPGRADE_FROM_2x.
+ * when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK (after the flip)
+ * Or Dynamically during flip from CLIENT_VERSION_UPGRADE_FROM_2X.
*/
public synchronized void initializeClientVersionFor3xWithRollback(final ClientVersion fromClientVersion)
throws DependencyException {
log.info("Initializing KCL components for 3x with rollback from {}", fromClientVersion);
- if (fromClientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x) {
+ if (fromClientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X) {
// dynamic flip
currentAssignmentMode = WORKER_UTILIZATION_AWARE_ASSIGNMENT;
notifyLeaseAssignmentModeChange();
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersion.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersion.java
index 2ea07965..ccbd9085 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersion.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersion.java
@@ -31,28 +31,28 @@ public enum ClientVersion {
* KCL workers will emit WorkerMetricStats and run KCLv2.x algorithms for leader election and lease
* assignment. KCL will also monitor for upgrade to KCLv3.x readiness of the worker fleet.
*/
- CLIENT_VERSION_UPGRADE_FROM_2x,
+ CLIENT_VERSION_UPGRADE_FROM_2X,
/**
- * This version is used during rollback from CLIENT_VERSION_UPGRADE_FROM_2x or CLIENT_VERSION_3x_WITH_ROLLBACK,
+ * This version is used during rollback from CLIENT_VERSION_UPGRADE_FROM_2X or CLIENT_VERSION_3X_WITH_ROLLBACK,
* which can only be initiated using a KCL migration tool, when customer wants to revert to KCLv2.x functionality.
* In this version, KCL will not emit WorkerMetricStats and run KCLv2.x algorithms for leader election
* and lease assignment. In this version, KCL will monitor for roll-forward scenario where
- * client version is updated to CLIENT_VERSION_UPGRADE_FROM_2x using the migration tool.
+ * client version is updated to CLIENT_VERSION_UPGRADE_FROM_2X using the migration tool.
*/
- CLIENT_VERSION_2x,
+ CLIENT_VERSION_2X,
/**
- * When workers are operating in CLIENT_VERSION_UPGRADE_FROM_2x and when worker fleet is determined to be
+ * When workers are operating in CLIENT_VERSION_UPGRADE_FROM_2X and when worker fleet is determined to be
* KCLv3.x ready (when lease table GSI is active and worker-metrics are being emitted by all lease owners)
* then the leader will initiate the switch to KCLv3.x algorithms for leader election and lease assignment,
* by using this version and persisting it in the {@link MigrationState} that allows all worker hosts
* to also flip to KCLv3.x functionality. In this KCL will also monitor for rollback to detect when the
- * customer updates version to CLIENT_VERSION_2x using migration tool, so that it instantly flips back
- * to CLIENT_VERSION_2x.
+ * customer updates version to CLIENT_VERSION_2X using migration tool, so that it instantly flips back
+ * to CLIENT_VERSION_2X.
*/
- CLIENT_VERSION_3x_WITH_ROLLBACK,
+ CLIENT_VERSION_3X_WITH_ROLLBACK,
/**
* A new application starting KCLv3.x or an upgraded application from KCLv2.x after upgrade is successful
* can use this version to default all KCLv3.x algorithms without any monitor to rollback.
*/
- CLIENT_VERSION_3x;
+ CLIENT_VERSION_3X;
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitor.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitor.java
index 9db6c6b8..29777fa3 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitor.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitor.java
@@ -144,11 +144,11 @@ public class ClientVersionChangeMonitor implements Runnable {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, METRICS_OPERATION);
try {
switch (expectedVersion) {
- case CLIENT_VERSION_3x_WITH_ROLLBACK:
+ case CLIENT_VERSION_3X_WITH_ROLLBACK:
scope.addData("CurrentState:3xWorker", 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
break;
- case CLIENT_VERSION_2x:
- case CLIENT_VERSION_UPGRADE_FROM_2x:
+ case CLIENT_VERSION_2X:
+ case CLIENT_VERSION_UPGRADE_FROM_2X:
scope.addData("CurrentState:2xCompatibleWorker", 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
break;
default:
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion2xState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion2xState.java
index 7deac9bc..45d29a41 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion2xState.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion2xState.java
@@ -32,13 +32,13 @@ import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
-import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x;
-import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x;
+import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X;
+import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.FAULT_METRIC;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.METRICS_OPERATION;
/**
- * State for CLIENT_VERSION_2x. In this state, the only allowed valid transition is
+ * State for CLIENT_VERSION_2X. In this state, the only allowed valid transition is
* the roll-forward scenario which can only be performed using the KCL Migration tool.
* So when the state machine enters this state, a monitor is started to detect the
* roll-forward scenario.
@@ -60,7 +60,7 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat
@Override
public ClientVersion clientVersion() {
- return CLIENT_VERSION_2x;
+ return CLIENT_VERSION_2X;
}
@Override
@@ -102,7 +102,7 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat
/**
* Callback handler to handle client version changes in MigrationState in DDB.
- * @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_2x
+ * @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_2X
* @throws InvalidStateException during transition to the next state based on the new ClientVersion
* or if the new state in DDB is unexpected.
*/
@@ -115,18 +115,18 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat
final MetricsScope scope =
MetricsUtil.createMetricsWithOperation(initializer.metricsFactory(), METRICS_OPERATION);
try {
- if (newState.getClientVersion() == CLIENT_VERSION_UPGRADE_FROM_2x) {
+ if (newState.getClientVersion() == CLIENT_VERSION_UPGRADE_FROM_2X) {
log.info(
"A roll-forward has been initiated for the application. Transition to {}",
- CLIENT_VERSION_UPGRADE_FROM_2x);
+ CLIENT_VERSION_UPGRADE_FROM_2X);
// If this succeeds, the monitor will cancel itself.
- stateMachine.transitionTo(CLIENT_VERSION_UPGRADE_FROM_2x, newState);
+ stateMachine.transitionTo(CLIENT_VERSION_UPGRADE_FROM_2X, newState);
} else {
// This should not happen, so throw an exception that allows the monitor to continue monitoring
// changes, this allows KCL to operate in the current state and keep monitoring until a valid
// state transition is possible.
// However, there could be a split brain here, new workers will use DDB value as source of truth,
- // so we could also write back CLIENT_VERSION_2x to DDB to ensure all workers have consistent
+ // so we could also write back CLIENT_VERSION_2X to DDB to ensure all workers have consistent
// behavior.
// Ideally we don't expect modifications to DDB table out of the KCL migration tool scope,
// so keeping it simple and not writing back to DDB, the error log below would help capture
@@ -134,7 +134,7 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat
log.error(
"Migration state has invalid client version {}. Transition from {} is not supported",
newState,
- CLIENT_VERSION_2x);
+ CLIENT_VERSION_2X);
throw new InvalidStateException(String.format("Unexpected new state %s", newState));
}
} catch (final InvalidStateException | DependencyException e) {
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xState.java
index 4eba4165..1e857311 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xState.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xState.java
@@ -22,7 +22,7 @@ import software.amazon.kinesis.coordinator.DynamicMigrationComponentsInitializer
import software.amazon.kinesis.leases.exceptions.DependencyException;
/**
- * State for CLIENT_VERSION_3x which enables KCL to run 3.x algorithms on new KCLv3.x application
+ * State for CLIENT_VERSION_3X which enables KCL to run 3.x algorithms on new KCLv3.x application
* or successfully upgraded application which upgraded from v2.x. This is a terminal state of the
* state machine and no rollbacks are supported in this state.
*/
@@ -38,7 +38,7 @@ public class MigrationClientVersion3xState implements MigrationClientVersionStat
@Override
public ClientVersion clientVersion() {
- return ClientVersion.CLIENT_VERSION_3x;
+ return ClientVersion.CLIENT_VERSION_3X;
}
@Override
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java
index 512d3c05..6235c5a9 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java
@@ -31,15 +31,15 @@ import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
-import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x;
-import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x;
+import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X;
+import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.FAULT_METRIC;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.METRICS_OPERATION;
/**
- * State for CLIENT_VERSION_3x_WITH_ROLLBACK which enables KCL to run its 3.x compliant algorithms
+ * State for CLIENT_VERSION_3X_WITH_ROLLBACK which enables KCL to run its 3.x compliant algorithms
* during the upgrade process after all KCL workers in the fleet are 3.x complaint. Since this
- * is an instant switch from CLIENT_VERSION_UPGRADE_FROM_2x, it also supports rollback if customers
+ * is an instant switch from CLIENT_VERSION_UPGRADE_FROM_2X, it also supports rollback if customers
* see regression to allow for instant rollbacks as well. This would be achieved by customers
* running a KCL migration tool to update MigrationState in DDB. So this state monitors for
* rollback triggers and performs state transitions accordingly.
@@ -62,7 +62,7 @@ public class MigrationClientVersion3xWithRollbackState implements MigrationClien
@Override
public ClientVersion clientVersion() {
- return ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK;
+ return ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK;
}
@Override
@@ -108,17 +108,17 @@ public class MigrationClientVersion3xWithRollbackState implements MigrationClien
MetricsUtil.createMetricsWithOperation(initializer.metricsFactory(), METRICS_OPERATION);
try {
switch (newState.getClientVersion()) {
- case CLIENT_VERSION_2x:
- log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2x);
- stateMachine.transitionTo(ClientVersion.CLIENT_VERSION_2x, newState);
+ case CLIENT_VERSION_2X:
+ log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2X);
+ stateMachine.transitionTo(ClientVersion.CLIENT_VERSION_2X, newState);
break;
- case CLIENT_VERSION_3x:
+ case CLIENT_VERSION_3X:
log.info("Customer has switched to 3.x after successful upgrade, state machine will move to a"
+ "terminal state and stop monitoring. Rollbacks will no longer be supported anymore");
- stateMachine.transitionTo(CLIENT_VERSION_3x, newState);
+ stateMachine.transitionTo(CLIENT_VERSION_3X, newState);
// This worker will still be running the migrationAdaptive components in 3.x mode which will
// no longer dynamically switch back to 2.x mode, however to directly run 3.x component without
- // adaption to migration (i.e. move to CLIENT_VERSION_3x state), it requires this worker to go
+ // adaption to migration (i.e. move to CLIENT_VERSION_3X state), it requires this worker to go
// through the current deployment which initiated the switch to 3.x mode.
break;
default:
@@ -126,7 +126,7 @@ public class MigrationClientVersion3xWithRollbackState implements MigrationClien
// changes, this allows KCL to operate in the current state and keep monitoring until a valid
// state transition is possible.
// However, there could be a split brain here, new workers will use DDB value as source of truth,
- // so we could also write back CLIENT_VERSION_3x_WITH_ROLLBACK to DDB to ensure all workers have
+ // so we could also write back CLIENT_VERSION_3X_WITH_ROLLBACK to DDB to ensure all workers have
// consistent behavior.
// Ideally we don't expect modifications to DDB table out of the KCL migration tool scope,
// so keeping it simple and not writing back to DDB, the error log below would help capture
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionStateInitializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionStateInitializer.java
index 02231218..970bd6ed 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionStateInitializer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionStateInitializer.java
@@ -31,10 +31,10 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
-import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x;
-import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x;
-import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK;
-import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x;
+import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X;
+import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X;
+import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK;
+import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X;
import static software.amazon.kinesis.coordinator.migration.MigrationState.MIGRATION_HASH_KEY;
/**
@@ -44,13 +44,13 @@ import static software.amazon.kinesis.coordinator.migration.MigrationState.MIGRA
* as follows
* ClientVersionConfig | MigrationState (DDB) | initial client version
* --------------------+---------------------------------+--------------------------------
- * COMPATIBLE_WITH_2x | Does not exist | CLIENT_VERSION_UPGRADE_FROM_2x
- * 3x | Does not exist | CLIENT_VERSION_3x
- * COMPATIBLE_WITH_2x | CLIENT_VERSION_3x_WITH_ROLLBACK | CLIENT_VERSION_3x_WITH_ROLLBACK
- * 3x | CLIENT_VERSION_3x_WITH_ROLLBACK | CLIENT_VERSION_3x
- * any | CLIENT_VERSION_2x | CLIENT_VERSION_2x
- * any | CLIENT_VERSION_UPGRADE_FROM_2x | CLIENT_VERSION_UPGRADE_FROM_2x
- * any | CLIENT_VERSION_3x | CLIENT_VERSION_3x
+ * COMPATIBLE_WITH_2X | Does not exist | CLIENT_VERSION_UPGRADE_FROM_2X
+ * 3X | Does not exist | CLIENT_VERSION_3X
+ * COMPATIBLE_WITH_2X | CLIENT_VERSION_3X_WITH_ROLLBACK | CLIENT_VERSION_3X_WITH_ROLLBACK
+ * 3X | CLIENT_VERSION_3X_WITH_ROLLBACK | CLIENT_VERSION_3X
+ * any | CLIENT_VERSION_2X | CLIENT_VERSION_2X
+ * any | CLIENT_VERSION_UPGRADE_FROM_2X | CLIENT_VERSION_UPGRADE_FROM_2X
+ * any | CLIENT_VERSION_3X | CLIENT_VERSION_3X
*/
@KinesisClientInternalApi
@RequiredArgsConstructor
@@ -110,26 +110,26 @@ public class MigrationClientVersionStateInitializer {
nextClientVersion = getNextClientVersionBasedOnConfigVersion();
log.info("Application is starting in {}", nextClientVersion);
break;
- case CLIENT_VERSION_3x_WITH_ROLLBACK:
- if (clientVersionConfig == ClientVersionConfig.CLIENT_VERSION_CONFIG_3x) {
+ case CLIENT_VERSION_3X_WITH_ROLLBACK:
+ if (clientVersionConfig == ClientVersionConfig.CLIENT_VERSION_CONFIG_3X) {
// upgrade successful, allow transition to 3x.
- log.info("Application has successfully upgraded, transitioning to {}", CLIENT_VERSION_3x);
- nextClientVersion = CLIENT_VERSION_3x;
+ log.info("Application has successfully upgraded, transitioning to {}", CLIENT_VERSION_3X);
+ nextClientVersion = CLIENT_VERSION_3X;
break;
}
- log.info("Initialize with {}", CLIENT_VERSION_3x_WITH_ROLLBACK);
+ log.info("Initialize with {}", CLIENT_VERSION_3X_WITH_ROLLBACK);
nextClientVersion = migrationState.getClientVersion();
break;
- case CLIENT_VERSION_2x:
- log.info("Application has rolled-back, initialize with {}", CLIENT_VERSION_2x);
+ case CLIENT_VERSION_2X:
+ log.info("Application has rolled-back, initialize with {}", CLIENT_VERSION_2X);
nextClientVersion = migrationState.getClientVersion();
break;
- case CLIENT_VERSION_UPGRADE_FROM_2x:
- log.info("Application is upgrading, initialize with {}", CLIENT_VERSION_UPGRADE_FROM_2x);
+ case CLIENT_VERSION_UPGRADE_FROM_2X:
+ log.info("Application is upgrading, initialize with {}", CLIENT_VERSION_UPGRADE_FROM_2X);
nextClientVersion = migrationState.getClientVersion();
break;
- case CLIENT_VERSION_3x:
- log.info("Initialize with {}", CLIENT_VERSION_3x);
+ case CLIENT_VERSION_3X:
+ log.info("Initialize with {}", CLIENT_VERSION_3X);
nextClientVersion = migrationState.getClientVersion();
break;
default:
@@ -180,10 +180,10 @@ public class MigrationClientVersionStateInitializer {
private ClientVersion getNextClientVersionBasedOnConfigVersion() {
switch (clientVersionConfig) {
- case CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x:
- return CLIENT_VERSION_UPGRADE_FROM_2x;
- case CLIENT_VERSION_CONFIG_3x:
- return CLIENT_VERSION_3x;
+ case CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X:
+ return CLIENT_VERSION_UPGRADE_FROM_2X;
+ case CLIENT_VERSION_CONFIG_3X:
+ return CLIENT_VERSION_3X;
}
throw new IllegalStateException(String.format("Unknown configured Client version %s", clientVersionConfig));
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionUpgradeFrom2xState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionUpgradeFrom2xState.java
index c0a59410..86106a07 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionUpgradeFrom2xState.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionUpgradeFrom2xState.java
@@ -32,23 +32,23 @@ import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
-import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x;
-import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK;
+import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X;
+import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.FAULT_METRIC;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.METRICS_OPERATION;
/**
- * State for CLIENT_VERSION_UPGRADE_FROM_2x. When state machine enters this state,
+ * State for CLIENT_VERSION_UPGRADE_FROM_2X. When state machine enters this state,
* KCL is initialized to operate in dual mode for Lease assignment and Leader decider algorithms
* which initially start in 2.x compatible mode and when all the KCL workers are 3.x compliant,
* it dynamically switches to the 3.x algorithms. It also monitors for rollback
* initiated from customer via the KCL migration tool and instantly switches back to the 2.x
* complaint algorithms.
- * The allowed state transitions are to CLIENT_VERSION_3x_WITH_ROLLBACK when KCL workers are
- * 3.x complaint, and to CLIENT_VERSION_2x when customer has initiated a rollback.
+ * The allowed state transitions are to CLIENT_VERSION_3X_WITH_ROLLBACK when KCL workers are
+ * 3.x complaint, and to CLIENT_VERSION_2X when customer has initiated a rollback.
* Only the leader KCL worker performs migration ready monitor and notifies all workers (including
* itself) via a MigrationState update. When all worker's monitor notice the MigrationState change
- * (including itself), it will transition to CLIENT_VERSION_3x_WITH_ROLLBACK.
+ * (including itself), it will transition to CLIENT_VERSION_3X_WITH_ROLLBACK.
*/
@KinesisClientInternalApi
@RequiredArgsConstructor
@@ -71,7 +71,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient
@Override
public ClientVersion clientVersion() {
- return ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x;
+ return ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X;
}
@Override
@@ -170,7 +170,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient
/**
* Callback handler to handle client version changes in MigrationState in DDB.
- * @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_UPGRADE_FROM_2x
+ * @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_UPGRADE_FROM_2X
* @throws InvalidStateException during transition to the next state based on the new ClientVersion
* or if the new state in DDB is unexpected.
*/
@@ -184,23 +184,23 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient
MetricsUtil.createMetricsWithOperation(initializer.metricsFactory(), METRICS_OPERATION);
try {
switch (newState.getClientVersion()) {
- case CLIENT_VERSION_2x:
- log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2x);
+ case CLIENT_VERSION_2X:
+ log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2X);
// cancel monitor asynchronously
cancelMigrationReadyMonitor();
- stateMachine.transitionTo(CLIENT_VERSION_2x, newState);
+ stateMachine.transitionTo(CLIENT_VERSION_2X, newState);
break;
- case CLIENT_VERSION_3x_WITH_ROLLBACK:
- log.info("KCL workers are v3.x compliant, transition to {}", CLIENT_VERSION_3x_WITH_ROLLBACK);
+ case CLIENT_VERSION_3X_WITH_ROLLBACK:
+ log.info("KCL workers are v3.x compliant, transition to {}", CLIENT_VERSION_3X_WITH_ROLLBACK);
cancelMigrationReadyMonitor();
- stateMachine.transitionTo(CLIENT_VERSION_3x_WITH_ROLLBACK, newState);
+ stateMachine.transitionTo(CLIENT_VERSION_3X_WITH_ROLLBACK, newState);
break;
default:
// This should not happen, so throw an exception that allows the monitor to continue monitoring
// changes, this allows KCL to operate in the current state and keep monitoring until a valid
// state transition is possible.
// However, there could be a split brain here, new workers will use DDB value as source of truth,
- // so we could also write back CLIENT_VERSION_UPGRADE_FROM_2x to DDB to ensure all workers have
+ // so we could also write back CLIENT_VERSION_UPGRADE_FROM_2X to DDB to ensure all workers have
// consistent behavior.
// Ideally we don't expect modifications to DDB table out of the KCL migration tool scope,
// so keeping it simple and not writing back to DDB, the error log below would help capture
@@ -222,7 +222,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient
try {
final MigrationState newMigrationState = currentMigrationState
.copy()
- .update(CLIENT_VERSION_3x_WITH_ROLLBACK, initializer.workerIdentifier());
+ .update(CLIENT_VERSION_3X_WITH_ROLLBACK, initializer.workerIdentifier());
log.info("Updating Migration State in DDB with {} prev state {}", newMigrationState, currentMigrationState);
return coordinatorStateDAO.updateCoordinatorStateWithExpectation(
newMigrationState, currentMigrationState.getDynamoClientVersionExpectation());
@@ -230,7 +230,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient
log.warn(
"Exception occurred when toggling to {}, upgradeReadyMonitor will retry the update"
+ " if upgrade condition is still true",
- CLIENT_VERSION_3x_WITH_ROLLBACK,
+ CLIENT_VERSION_3X_WITH_ROLLBACK,
e);
scope.addData(FAULT_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
return false;
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java
index 247dec13..6bc08189 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java
@@ -190,7 +190,7 @@ public class MigrationStateMachineImpl implements MigrationStateMachine {
currentMigrationClientVersionState = nextMigrationClientVersionState;
log.info("Successfully transitioned to {}", nextMigrationClientVersionState.clientVersion());
- if (currentMigrationClientVersionState.clientVersion() == ClientVersion.CLIENT_VERSION_3x) {
+ if (currentMigrationClientVersionState.clientVersion() == ClientVersion.CLIENT_VERSION_3X) {
terminate();
}
success = true;
@@ -220,10 +220,10 @@ public class MigrationStateMachineImpl implements MigrationStateMachine {
private MigrationClientVersionState createMigrationClientVersionState(
final ClientVersion clientVersion, final MigrationState migrationState) {
switch (clientVersion) {
- case CLIENT_VERSION_2x:
+ case CLIENT_VERSION_2X:
return new MigrationClientVersion2xState(
this, coordinatorStateDAO, stateMachineThreadPool, initializer, random);
- case CLIENT_VERSION_UPGRADE_FROM_2x:
+ case CLIENT_VERSION_UPGRADE_FROM_2X:
return new MigrationClientVersionUpgradeFrom2xState(
this,
timeProvider,
@@ -233,10 +233,10 @@ public class MigrationStateMachineImpl implements MigrationStateMachine {
random,
migrationState,
flipTo3XStabilizerTimeInSeconds);
- case CLIENT_VERSION_3x_WITH_ROLLBACK:
+ case CLIENT_VERSION_3X_WITH_ROLLBACK:
return new MigrationClientVersion3xWithRollbackState(
this, coordinatorStateDAO, stateMachineThreadPool, initializer, random);
- case CLIENT_VERSION_3x:
+ case CLIENT_VERSION_3X:
return new MigrationClientVersion3xState(this, initializer);
}
throw new IllegalStateException(String.format("Unknown client version %s", clientVersion));
@@ -246,7 +246,7 @@ public class MigrationStateMachineImpl implements MigrationStateMachine {
if (currentMigrationClientVersionState != null) {
return currentMigrationClientVersionState.clientVersion();
} else if (terminated) {
- return ClientVersion.CLIENT_VERSION_3x;
+ return ClientVersion.CLIENT_VERSION_3X;
}
throw new UnsupportedOperationException(
"No current state when state machine is either not initialized" + " or already terminated");
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
index 77a45a04..bbdf78e2 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
@@ -29,7 +29,6 @@ import java.util.function.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Builder;
import lombok.Data;
-import lombok.Getter;
import lombok.NonNull;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
@@ -381,8 +380,8 @@ public class LeaseManagementConfig {
* to shut down and an option to enable or disable graceful lease handoff.
*
*/
+ @Data
@Builder
- @Getter
@Accessors(fluent = true)
public static class GracefulLeaseHandoffConfig {
/**
@@ -550,7 +549,7 @@ public class LeaseManagementConfig {
private int dampeningPercentage = 60;
/**
* Percentage value used to trigger reBalance. If fleet has workers which are have metrics value more or less
- * than 20% of fleet level average then reBalance is triggered.
+ * than 10% of fleet level average then reBalance is triggered.
* Leases are taken from workers with metrics value more than fleet level average. The load to take from these
* workers is determined by evaluating how far they are with respect to fleet level average.
*/
@@ -565,7 +564,7 @@ public class LeaseManagementConfig {
private boolean allowThroughputOvershoot = true;
/**
- * Duration after which workerMetrics entry from WorkerMetricStats table will be cleaned up. When an entry's
+ * Duration after which workerMetricStats entry from WorkerMetricStats table will be cleaned up. When an entry's
* lastUpdateTime is older than staleWorkerMetricsEntryCleanupDuration from current time, entry will be removed
* from the table.
*/
@@ -580,15 +579,16 @@ public class LeaseManagementConfig {
private WorkerMetricsTableConfig workerMetricsTableConfig;
/**
- * Frequency to perform worker variance balancing frequency. This value is used with respect to the LAM freq,
+ * Frequency to perform worker variance balancing. This value is used with respect to the LAM frequency,
* that is every third (as default) iteration of LAM the worker variance balancing will be performed.
* Setting it to 1 will make varianceBalancing run on every iteration of LAM and 2 on every 2nd iteration
* and so on.
+ * NOTE: LAM frequency = failoverTimeMillis
*/
private int varianceBalancingFrequency = 3;
/**
- * Alpha value used for calculating exponential moving average of worker's metrics values. Selecting
+ * Alpha value used for calculating exponential moving average of worker's metricStats. Selecting
* higher alpha value gives more weightage to recent value and thus low smoothing effect on computed average
* and selecting smaller alpha values gives more weightage to past value and high smoothing effect.
*/
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java
index 3874b8a7..2ec51318 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java
@@ -189,7 +189,7 @@ public class CoordinatorStateDAOTest {
createCoordinatorState("key1");
final MigrationState migrationState = new MigrationState(MIGRATION_HASH_KEY, WORKER_ID)
- .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, WORKER_ID);
+ .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, WORKER_ID);
doaUnderTest.createCoordinatorStateIfNotExists(migrationState);
final AmazonDynamoDBLockClient dynamoDBLockClient = new AmazonDynamoDBLockClient(doaUnderTest
@@ -223,7 +223,7 @@ public class CoordinatorStateDAOTest {
// Make sure the record has not changed due to using
// ddb lock client
Assertions.assertEquals(
- ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x.toString(),
+ ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X.toString(),
item.get(CLIENT_VERSION_ATTRIBUTE_NAME).s());
} else if (LEADER_HASH_KEY.equals(key)) {
Assertions.assertEquals("TEST_WORKER", item.get("ownerName").s());
@@ -264,7 +264,7 @@ public class CoordinatorStateDAOTest {
if ("Migration3.0".equals(keyValue)) {
Assertions.assertTrue(state instanceof MigrationState);
final MigrationState migrationState = (MigrationState) state;
- Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3x, migrationState.getClientVersion());
+ Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3X, migrationState.getClientVersion());
return;
}
Assertions.assertEquals(3, state.getAttributes().size());
@@ -379,7 +379,7 @@ public class CoordinatorStateDAOTest {
final MigrationState state = createMigrationState();
/* Test step - update the state with mismatched condition */
- final MigrationState updatedState = state.copy().update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID);
+ final MigrationState updatedState = state.copy().update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID);
boolean updated = doaUnderTest.updateCoordinatorStateWithExpectation(
updatedState, updatedState.getDynamoClientVersionExpectation());
@@ -403,7 +403,7 @@ public class CoordinatorStateDAOTest {
.build())
.join();
Assertions.assertEquals(
- ClientVersion.CLIENT_VERSION_2x.name(),
+ ClientVersion.CLIENT_VERSION_2X.name(),
response.item().get("cv").s());
Assertions.assertEquals(WORKER_ID, response.item().get("mb").s());
Assertions.assertEquals(
@@ -433,7 +433,7 @@ public class CoordinatorStateDAOTest {
/* Test step - update with new state object */
final MigrationState updatedState =
- new MigrationState("Migration3.0", WORKER_ID).update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID);
+ new MigrationState("Migration3.0", WORKER_ID).update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID);
boolean updated = doaUnderTest.updateCoordinatorStateWithExpectation(updatedState, null);
@@ -491,7 +491,7 @@ public class CoordinatorStateDAOTest {
final HashMap item = new HashMap() {
{
put("key", AttributeValue.fromS("Migration3.0"));
- put("cv", AttributeValue.fromS(ClientVersion.CLIENT_VERSION_3x.toString()));
+ put("cv", AttributeValue.fromS(ClientVersion.CLIENT_VERSION_3X.toString()));
put("mb", AttributeValue.fromS("DUMMY_WORKER"));
put("mts", AttributeValue.fromN(String.valueOf(System.currentTimeMillis())));
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializerTest.java
index fce69191..6d9cfb98 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializerTest.java
@@ -112,7 +112,7 @@ public class DynamicMigrationComponentsInitializerTest {
@Test
public void testInitialize_ClientVersion3_X() throws DependencyException {
// Test initializing to verify correct leader decider is created
- migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x);
+ migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X);
verify(mockWorkerMetricsManager).startManager();
verify(mockDdbLockBasedLeaderDeciderCreator).get();
@@ -147,7 +147,7 @@ public class DynamicMigrationComponentsInitializerTest {
@Test
public void testInitialize_ClientVersion_3_xWithRollback() throws DependencyException {
// Test initializing to verify correct leader decider is created
- migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
+ migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
verify(mockWorkerMetricsManager).startManager();
@@ -171,7 +171,7 @@ public class DynamicMigrationComponentsInitializerTest {
}
@ParameterizedTest
- @CsvSource({"CLIENT_VERSION_UPGRADE_FROM_2x", "CLIENT_VERSION_2x"})
+ @CsvSource({"CLIENT_VERSION_UPGRADE_FROM_2X", "CLIENT_VERSION_2X"})
public void testInitialize_ClientVersion_All2_X(final ClientVersion clientVersion) throws DependencyException {
// Test initializing to verify correct leader decider is created
migrationInitializer.initialize(clientVersion);
@@ -187,7 +187,7 @@ public class DynamicMigrationComponentsInitializerTest {
verify(mockConsumer).initialize(eq(true), eq(LeaseAssignmentMode.DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT));
// test initialization from state machine
- if (clientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x) {
+ if (clientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X) {
migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_INIT);
// start worker stats and create gsi without waiting
verify(mockWorkerMetricsDAO).initialize();
@@ -211,7 +211,7 @@ public class DynamicMigrationComponentsInitializerTest {
when(mockLamThreadPool.awaitTermination(anyLong(), any())).thenReturn(true);
when(mockWorkerMetricsScheduler.awaitTermination(anyLong(), any())).thenReturn(true);
- migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x);
+ migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X);
migrationInitializer.shutdown();
verify(mockLamThreadPool).shutdown();
@@ -226,7 +226,7 @@ public class DynamicMigrationComponentsInitializerTest {
@Test
public void initializationFails_WhenGsiIsNotActiveIn3_X() throws DependencyException {
- migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x);
+ migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X);
// test initialization from state machine
assertThrows(
@@ -236,7 +236,7 @@ public class DynamicMigrationComponentsInitializerTest {
@Test
public void initializationDoesNotFail_WhenGsiIsNotActiveIn3_XWithRollback() throws DependencyException {
- migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
+ migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
// test initialization from state machine
assertDoesNotThrow(
@@ -245,11 +245,11 @@ public class DynamicMigrationComponentsInitializerTest {
@Test
public void testComponentsInitialization_AfterFlip() throws DependencyException {
- migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x);
+ migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X);
migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_INIT);
// Test flip
- migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x);
+ migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X);
// verify
verify(mockLam).start();
@@ -266,13 +266,13 @@ public class DynamicMigrationComponentsInitializerTest {
.when(mockWorkerMetricsScheduler)
.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
- migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_2x);
+ migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_2X);
migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_INIT);
// test roll-forward
reset(mockWorkerMetricsScheduler);
reset(mockLeaseRefresher);
- migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_2x);
+ migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_2X);
// verify
verify(mockWorkerMetricsScheduler)
@@ -288,11 +288,11 @@ public class DynamicMigrationComponentsInitializerTest {
.when(mockWorkerMetricsScheduler)
.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
- migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x);
+ migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X);
migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_INIT);
// test rollback before flip
- migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x);
+ migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X);
// verify
verify(mockFuture).cancel(anyBoolean());
@@ -305,11 +305,11 @@ public class DynamicMigrationComponentsInitializerTest {
.when(mockWorkerMetricsScheduler)
.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
- migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
+ migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_INIT);
// test rollback before flip
- migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
+ migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
// verify
verify(mockFuture).cancel(anyBoolean());
@@ -337,7 +337,7 @@ public class DynamicMigrationComponentsInitializerTest {
}
});
- migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
+ migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_INIT);
// run the worker stats reporting thread
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitorTest.java
index b4ef5315..e83189a4 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitorTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitorTest.java
@@ -60,10 +60,10 @@ public class ClientVersionChangeMonitorTest {
@ParameterizedTest
@CsvSource({
- "CLIENT_VERSION_2x, CLIENT_VERSION_UPGRADE_FROM_2x",
- "CLIENT_VERSION_3x_WITH_ROLLBACK, CLIENT_VERSION_2x",
- "CLIENT_VERSION_UPGRADE_FROM_2x, CLIENT_VERSION_3x_WITH_ROLLBACK",
- "CLIENT_VERSION_3x_WITH_ROLLBACK, CLIENT_VERSION_3x"
+ "CLIENT_VERSION_2X, CLIENT_VERSION_UPGRADE_FROM_2X",
+ "CLIENT_VERSION_3X_WITH_ROLLBACK, CLIENT_VERSION_2X",
+ "CLIENT_VERSION_UPGRADE_FROM_2X, CLIENT_VERSION_3X_WITH_ROLLBACK",
+ "CLIENT_VERSION_3X_WITH_ROLLBACK, CLIENT_VERSION_3X"
})
public void testMonitor(final ClientVersion currentClientVersion, final ClientVersion changedClientVersion)
throws Exception {
@@ -104,7 +104,7 @@ public class ClientVersionChangeMonitorTest {
mockCoordinatorStateDAO,
mockScheduler,
mockCallback,
- ClientVersion.CLIENT_VERSION_2x,
+ ClientVersion.CLIENT_VERSION_2X,
mockRandom);
monitorUnderTest.startMonitor();
@@ -112,7 +112,7 @@ public class ClientVersionChangeMonitorTest {
verify(mockScheduler).scheduleWithFixedDelay(argumentCaptor.capture(), anyLong(), anyLong(), anyObject());
final MigrationState state = new MigrationState(MIGRATION_HASH_KEY, "DUMMY_WORKER")
- .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, "DUMMY_WORKER");
+ .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, "DUMMY_WORKER");
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
argumentCaptor.getValue().run();
@@ -130,7 +130,7 @@ public class ClientVersionChangeMonitorTest {
mockCoordinatorStateDAO,
mockScheduler,
mockCallback,
- ClientVersion.CLIENT_VERSION_2x,
+ ClientVersion.CLIENT_VERSION_2X,
mockRandom);
monitorUnderTest.startMonitor();
@@ -138,7 +138,7 @@ public class ClientVersionChangeMonitorTest {
verify(mockScheduler).scheduleWithFixedDelay(argumentCaptor.capture(), anyLong(), anyLong(), anyObject());
final MigrationState state = new MigrationState(MIGRATION_HASH_KEY, "DUMMY_WORKER")
- .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, "DUMMY_WORKER");
+ .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, "DUMMY_WORKER");
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
doThrow(new InvalidStateException("test exception")).when(mockCallback).accept(any());
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineTest.java
index 22436946..88e16380 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineTest.java
@@ -107,8 +107,8 @@ public class MigrationStateMachineTest {
@ParameterizedTest
@CsvSource({
- "CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x, CLIENT_VERSION_UPGRADE_FROM_2x",
- "CLIENT_VERSION_CONFIG_3x, CLIENT_VERSION_3x"
+ "CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X, CLIENT_VERSION_UPGRADE_FROM_2X",
+ "CLIENT_VERSION_CONFIG_3X, CLIENT_VERSION_3X"
})
public void testStateMachineInitialization(
final ClientVersionConfig config, final ClientVersion expectedStateMachineState) throws Exception {
@@ -118,7 +118,7 @@ public class MigrationStateMachineTest {
@Test
public void testMigrationReadyFlip() throws Exception {
- stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x);
+ stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
// After initialization, state machine should start to monitor for upgrade readiness
final ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -130,7 +130,7 @@ public class MigrationStateMachineTest {
@Test
public void testRollbackAfterFlip() throws Exception {
- stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x);
+ stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
// After initialization, state machine should start to monitor for upgrade readiness
ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -150,7 +150,7 @@ public class MigrationStateMachineTest {
@Test
public void testRollForward() throws Exception {
- stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x);
+ stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
// After initialization, state machine should start to monitor for upgrade readiness
ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -177,7 +177,7 @@ public class MigrationStateMachineTest {
@Test
public void testRollbackBeforeFlip() throws Exception {
- stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x);
+ stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
// After initialization, state machine should start to monitor for upgrade readiness
ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -189,7 +189,7 @@ public class MigrationStateMachineTest {
@Test
public void successfulUpgradeAfterFlip() throws Exception {
- stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x);
+ stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
// After initialization, state machine should start to monitor for upgrade readiness
ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -244,15 +244,15 @@ public class MigrationStateMachineTest {
// Invoke the monitor callbacks so the version flips to 3.x with rollback
migrationReadyMonitorRunnable.run();
Assertions.assertEquals(
- ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK,
+ ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK,
stateCaptor.getValue().getClientVersion());
versionChangeMonitorRunnable.run();
Assertions.assertEquals(
- ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK, stateMachineUnderTest.getCurrentClientVersion());
+ ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK, stateMachineUnderTest.getCurrentClientVersion());
verify(mockInitializer)
- .initializeClientVersionFor3xWithRollback(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x));
+ .initializeClientVersionFor3xWithRollback(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X));
log.info("TestLog ----------- flip done -------------");
}
@@ -263,33 +263,33 @@ public class MigrationStateMachineTest {
: runnableCaptor.getAllValues().get(1);
final MigrationState state =
- new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID);
+ new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID);
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
reset(mockMigrationStateMachineThreadPool);
reset(mockInitializer);
log.info("TestLog ----------- Initiate rollback before flip -------------");
versionChangeMonitorRunnable.run();
log.info("TestLog ----------- rollback before flip done -------------");
- Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2x, stateMachineUnderTest.getCurrentClientVersion());
- verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x));
+ Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2X, stateMachineUnderTest.getCurrentClientVersion());
+ verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X));
}
private void initiateAndTestRollBack(final Runnable rollbackMonitorRunnable) throws Exception {
final MigrationState state =
- new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID);
+ new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID);
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
reset(mockMigrationStateMachineThreadPool);
reset(mockInitializer);
log.info("TestLog ----------- Initiate rollback -------------");
rollbackMonitorRunnable.run();
log.info("TestLog ----------- rollback done -------------");
- Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2x, stateMachineUnderTest.getCurrentClientVersion());
- verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK));
+ Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2X, stateMachineUnderTest.getCurrentClientVersion());
+ verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK));
}
private void initiateAndTestRollForward(final Runnable rollforwardMonitorRunnable) throws Exception {
final MigrationState state = new MigrationState(MIGRATION_HASH_KEY, WORKER_ID)
- .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, WORKER_ID);
+ .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, WORKER_ID);
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
reset(mockMigrationStateMachineThreadPool);
reset(mockInitializer);
@@ -297,20 +297,20 @@ public class MigrationStateMachineTest {
rollforwardMonitorRunnable.run();
log.info("TestLog ----------- roll-forward done -------------");
Assertions.assertEquals(
- ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, stateMachineUnderTest.getCurrentClientVersion());
- verify(mockInitializer).initializeClientVersionForUpgradeFrom2x(eq(ClientVersion.CLIENT_VERSION_2x));
+ ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, stateMachineUnderTest.getCurrentClientVersion());
+ verify(mockInitializer).initializeClientVersionForUpgradeFrom2x(eq(ClientVersion.CLIENT_VERSION_2X));
}
private void initiateAndTestSuccessfulUpgrade(final Runnable successfulUpgradeMonitor) throws Exception {
final MigrationState state =
- new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_3x, WORKER_ID);
+ new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_3X, WORKER_ID);
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
reset(mockMigrationStateMachineThreadPool);
reset(mockInitializer);
log.info("TestLog ----------- Initiate successful upgrade -------------");
successfulUpgradeMonitor.run();
log.info("TestLog ----------- successful upgrade done -------------");
- Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3x, stateMachineUnderTest.getCurrentClientVersion());
- verify(mockInitializer).initializeClientVersionFor3x(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
+ Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3X, stateMachineUnderTest.getCurrentClientVersion());
+ verify(mockInitializer).initializeClientVersionFor3x(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
}
}