Merge from multilang-kclv3 branch: Add multi-lang support for new configs, refactor enums, and update documentation

This is a squash of commits a144dfaac117415c400e8786b98060f5660d4276 through 4185f6e72520744e8a18cd04c550bc57a1bfd298.
This commit is contained in:
Furqaan Ali 2024-11-01 14:40:41 -07:00
parent 2524ef83c3
commit 67d54045c1
31 changed files with 1503 additions and 169 deletions

View file

@ -104,6 +104,12 @@
</dependency>
<!-- Test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.11.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@ -122,6 +128,13 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
<!-- Using older version to be compatible with Java 8 -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>3.12.4</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View file

@ -141,7 +141,7 @@ public class MultiLangDaemon {
}
}
String propertiesFile(final MultiLangDaemonArguments arguments) {
String validateAndGetPropertiesFileName(final MultiLangDaemonArguments arguments) {
String propertiesFile = "";
if (CollectionUtils.isNotEmpty(arguments.parameters)) {
@ -216,9 +216,9 @@ public class MultiLangDaemon {
MultiLangDaemonArguments arguments = new MultiLangDaemonArguments();
JCommander jCommander = daemon.buildJCommanderAndParseArgs(arguments, args);
try {
String propertiesFile = daemon.propertiesFile(arguments);
String propertiesFileName = daemon.validateAndGetPropertiesFileName(arguments);
daemon.configureLogging(arguments.logConfiguration);
MultiLangDaemonConfig config = daemon.buildMultiLangDaemonConfig(propertiesFile);
MultiLangDaemonConfig config = daemon.buildMultiLangDaemonConfig(propertiesFileName);
Scheduler scheduler = daemon.buildScheduler(config);
MultiLangRunner runner = new MultiLangRunner(scheduler);

View file

@ -0,0 +1,56 @@
/*
* Copyright 2024 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.multilang.config;
import lombok.Getter;
import lombok.Setter;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.coordinator.CoordinatorConfig.CoordinatorStateTableConfig;
@Getter
@Setter
public class CoordinatorStateConfigBean {
interface CoordinatorStateConfigBeanDelegate {
String getCoordinatorStateTableName();
void setCoordinatorStateTableName(String value);
BillingMode getCoordinatorStateBillingMode();
void setCoordinatorStateBillingMode(BillingMode value);
long getCoordinatorStateReadCapacity();
void setCoordinatorStateReadCapacity(long value);
long getCoordinatorStateWriteCapacity();
void setCoordinatorStateWriteCapacity(long value);
}
@ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "tableName")
private String coordinatorStateTableName;
@ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "billingMode")
private BillingMode coordinatorStateBillingMode;
@ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "readCapacity")
private long coordinatorStateReadCapacity;
@ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "writeCapacity")
private long coordinatorStateWriteCapacity;
}

View file

@ -0,0 +1,41 @@
/*
* Copyright 2024 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.multilang.config;
import lombok.Getter;
import lombok.Setter;
import software.amazon.kinesis.leases.LeaseManagementConfig;
@Getter
@Setter
public class GracefulLeaseHandoffConfigBean {
interface GracefulLeaseHandoffConfigBeanDelegate {
Long getGracefulLeaseHandoffTimeoutMillis();
void setGracefulLeaseHandoffTimeoutMillis(Long value);
Boolean getIsGracefulLeaseHandoffEnabled();
void setIsGracefulLeaseHandoffEnabled(Boolean value);
}
@ConfigurationSettable(configurationClass = LeaseManagementConfig.GracefulLeaseHandoffConfig.class)
private Long gracefulLeaseHandoffTimeoutMillis;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.GracefulLeaseHandoffConfig.class)
private Boolean isGracefulLeaseHandoffEnabled;
}

View file

@ -17,6 +17,7 @@ package software.amazon.kinesis.multilang.config;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
@ -41,6 +42,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
@ -55,6 +57,7 @@ import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.multilang.config.converter.DurationConverter;
import software.amazon.kinesis.multilang.config.credentials.V2CredentialWrapper;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
@ -156,6 +159,9 @@ public class MultiLangDaemonConfiguration {
@ConfigurationSettable(configurationClass = CoordinatorConfig.class)
private long schedulerInitializationBackoffTimeMillis;
@ConfigurationSettable(configurationClass = CoordinatorConfig.class)
private CoordinatorConfig.ClientVersionConfig clientVersionConfig;
@ConfigurationSettable(configurationClass = LifecycleConfig.class)
private long taskBackoffTimeMillis;
@ -189,6 +195,20 @@ public class MultiLangDaemonConfiguration {
@Delegate(types = PollingConfigBean.PollingConfigBeanDelegate.class)
private final PollingConfigBean pollingConfig = new PollingConfigBean();
@Delegate(types = GracefulLeaseHandoffConfigBean.GracefulLeaseHandoffConfigBeanDelegate.class)
private final GracefulLeaseHandoffConfigBean gracefulLeaseHandoffConfigBean = new GracefulLeaseHandoffConfigBean();
@Delegate(
types = WorkerUtilizationAwareAssignmentConfigBean.WorkerUtilizationAwareAssignmentConfigBeanDelegate.class)
private final WorkerUtilizationAwareAssignmentConfigBean workerUtilizationAwareAssignmentConfigBean =
new WorkerUtilizationAwareAssignmentConfigBean();
@Delegate(types = WorkerMetricsTableConfigBean.WorkerMetricsTableConfigBeanDelegate.class)
private final WorkerMetricsTableConfigBean workerMetricsTableConfigBean = new WorkerMetricsTableConfigBean();
@Delegate(types = CoordinatorStateConfigBean.CoordinatorStateConfigBeanDelegate.class)
private final CoordinatorStateConfigBean coordinatorStateConfigBean = new CoordinatorStateConfigBean();
private boolean validateSequenceNumberBeforeCheckpointing;
private long shutdownGraceMillis;
@ -252,6 +272,25 @@ public class MultiLangDaemonConfiguration {
},
InitialPositionInStream.class);
convertUtilsBean.register(
new Converter() {
@Override
public <T> T convert(Class<T> type, Object value) {
return type.cast(CoordinatorConfig.ClientVersionConfig.valueOf(
value.toString().toUpperCase()));
}
},
CoordinatorConfig.ClientVersionConfig.class);
convertUtilsBean.register(
new Converter() {
@Override
public <T> T convert(Class<T> type, Object value) {
return type.cast(BillingMode.valueOf(value.toString().toUpperCase()));
}
},
BillingMode.class);
convertUtilsBean.register(
new Converter() {
@Override
@ -279,6 +318,8 @@ public class MultiLangDaemonConfiguration {
},
Region.class);
convertUtilsBean.register(new DurationConverter(), Duration.class);
ArrayConverter arrayConverter = new ArrayConverter(String[].class, new StringConverter());
arrayConverter.setDelimiter(',');
convertUtilsBean.register(arrayConverter, String[].class);
@ -370,6 +411,22 @@ public class MultiLangDaemonConfiguration {
retrievalMode.builder(this).build(configsBuilder.kinesisClient(), this));
}
private void handleCoordinatorConfig(CoordinatorConfig coordinatorConfig) {
ConfigurationSettableUtils.resolveFields(
this.coordinatorStateConfigBean, coordinatorConfig.coordinatorStateConfig());
}
private void handleLeaseManagementConfig(LeaseManagementConfig leaseManagementConfig) {
ConfigurationSettableUtils.resolveFields(
this.gracefulLeaseHandoffConfigBean, leaseManagementConfig.gracefulLeaseHandoffConfig());
ConfigurationSettableUtils.resolveFields(
this.workerUtilizationAwareAssignmentConfigBean,
leaseManagementConfig.workerUtilizationAwareAssignmentConfig());
ConfigurationSettableUtils.resolveFields(
this.workerMetricsTableConfigBean,
leaseManagementConfig.workerUtilizationAwareAssignmentConfig().workerMetricsTableConfig());
}
private Object adjustKinesisHttpConfiguration(Object builderObj) {
if (builderObj instanceof KinesisAsyncClientBuilder) {
KinesisAsyncClientBuilder builder = (KinesisAsyncClientBuilder) builderObj;
@ -448,6 +505,8 @@ public class MultiLangDaemonConfiguration {
processorConfig,
retrievalConfig);
handleCoordinatorConfig(coordinatorConfig);
handleLeaseManagementConfig(leaseManagementConfig);
handleRetrievalConfig(retrievalConfig, configsBuilder);
resolveFields(configObjects, null, new HashSet<>(Arrays.asList(ConfigsBuilder.class, PollingConfig.class)));

View file

@ -0,0 +1,56 @@
/*
* Copyright 2024 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.multilang.config;
import lombok.Getter;
import lombok.Setter;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.leases.LeaseManagementConfig.WorkerMetricsTableConfig;
@Getter
@Setter
public class WorkerMetricsTableConfigBean {
interface WorkerMetricsTableConfigBeanDelegate {
String getWorkerMetricsTableName();
void setWorkerMetricsTableName(String value);
BillingMode getWorkerMetricsBillingMode();
void setWorkerMetricsBillingMode(BillingMode value);
long getWorkerMetricsReadCapacity();
void setWorkerMetricsReadCapacity(long value);
long getWorkerMetricsWriteCapacity();
void setWorkerMetricsWriteCapacity(long value);
}
@ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "tableName")
private String workerMetricsTableName;
@ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "billingMode")
private BillingMode workerMetricsBillingMode;
@ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "readCapacity")
private long workerMetricsReadCapacity;
@ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "writeCapacity")
private long workerMetricsWriteCapacity;
}

View file

@ -0,0 +1,106 @@
/*
* Copyright 2024 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.multilang.config;
import java.time.Duration;
import lombok.Getter;
import lombok.Setter;
import software.amazon.kinesis.leases.LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig;
@Getter
@Setter
public class WorkerUtilizationAwareAssignmentConfigBean {
interface WorkerUtilizationAwareAssignmentConfigBeanDelegate {
long getInMemoryWorkerMetricsCaptureFrequencyMillis();
void setInMemoryWorkerMetricsCaptureFrequencyMillis(long value);
long getWorkerMetricsReporterFreqInMillis();
void setWorkerMetricsReporterFreqInMillis(long value);
int getNoOfPersistedMetricsPerWorkerMetrics();
void setNoOfPersistedMetricsPerWorkerMetrics(int value);
Boolean getDisableWorkerMetrics();
void setDisableWorkerMetrics(Boolean value);
double getMaxThroughputPerHostKBps();
void setMaxThroughputPerHostKBps(double value);
int getDampeningPercentage();
void setDampeningPercentage(int value);
int getReBalanceThresholdPercentage();
void setReBalanceThresholdPercentage(int value);
Boolean getAllowThroughputOvershoot();
void setAllowThroughputOvershoot(Boolean value);
int getVarianceBalancingFrequency();
void setVarianceBalancingFrequency(int value);
double getWorkerMetricsEMAAlpha();
void setWorkerMetricsEMAAlpha(double value);
void setStaleWorkerMetricsEntryCleanupDuration(Duration value);
Duration getStaleWorkerMetricsEntryCleanupDuration();
}
@ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
private long inMemoryWorkerMetricsCaptureFrequencyMillis;
@ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
private long workerMetricsReporterFreqInMillis;
@ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
private int noOfPersistedMetricsPerWorkerMetrics;
@ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
private Boolean disableWorkerMetrics;
@ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
private double maxThroughputPerHostKBps;
@ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
private int dampeningPercentage;
@ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
private int reBalanceThresholdPercentage;
@ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
private Boolean allowThroughputOvershoot;
@ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
private int varianceBalancingFrequency;
@ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
private double workerMetricsEMAAlpha;
@ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class)
private Duration staleWorkerMetricsEntryCleanupDuration;
}

View file

@ -0,0 +1,37 @@
package software.amazon.kinesis.multilang.config.converter;
import java.time.Duration;
import org.apache.commons.beanutils.Converter;
/**
* Converter that converts Duration text representation to a Duration object.
* Refer to {@code Duration.parse} javadocs for the exact text representation.
*/
public class DurationConverter implements Converter {
@Override
public <T> T convert(Class<T> type, Object value) {
if (value == null) {
return null;
}
if (type != Duration.class) {
throw new ConversionException("Can only convert to Duration");
}
String durationString = value.toString().trim();
final Duration duration = Duration.parse(durationString);
if (duration.isNegative()) {
throw new ConversionException("Negative values are not permitted for duration: " + durationString);
}
return type.cast(duration);
}
public static class ConversionException extends RuntimeException {
public ConversionException(String message) {
super(message);
}
}
}

View file

@ -157,7 +157,7 @@ public class MultiLangDaemonTest {
MultiLangDaemon.MultiLangDaemonArguments arguments = new MultiLangDaemon.MultiLangDaemonArguments();
daemon.propertiesFile(arguments);
daemon.validateAndGetPropertiesFileName(arguments);
}
@Test
@ -166,7 +166,7 @@ public class MultiLangDaemonTest {
MultiLangDaemon.MultiLangDaemonArguments arguments = new MultiLangDaemon.MultiLangDaemonArguments();
arguments.parameters = Collections.singletonList(expectedPropertiesFile);
String propertiesFile = daemon.propertiesFile(arguments);
String propertiesFile = daemon.validateAndGetPropertiesFileName(arguments);
assertThat(propertiesFile, equalTo(expectedPropertiesFile));
}
@ -180,7 +180,7 @@ public class MultiLangDaemonTest {
arguments.parameters = Collections.singletonList(propertiesArgument);
arguments.propertiesFile = propertiesOptions;
String propertiesFile = daemon.propertiesFile(arguments);
String propertiesFile = daemon.validateAndGetPropertiesFileName(arguments);
assertThat(propertiesFile, equalTo(propertiesOptions));
}
@ -193,7 +193,7 @@ public class MultiLangDaemonTest {
MultiLangDaemon.MultiLangDaemonArguments arguments = new MultiLangDaemon.MultiLangDaemonArguments();
arguments.parameters = Arrays.asList("parameter1", "parameter2");
daemon.propertiesFile(arguments);
daemon.validateAndGetPropertiesFileName(arguments);
}
@Test

View file

@ -52,6 +52,16 @@ public class ConfigurationSettableUtilsTest {
assertThat(actual, equalTo(expected));
}
@Test
public void testBoolean() {
ConfigResult expected = ConfigResult.builder().bool(false).build();
ConfigObject configObject = ConfigObject.builder().bool(expected.bool).build();
ConfigResult actual = resolve(configObject);
assertThat(actual, equalTo(expected));
}
@Test
public void testHeapValuesSet() {
ConfigResult expected =
@ -147,6 +157,9 @@ public class ConfigurationSettableUtilsTest {
private Long boxedLong;
private ComplexValue complexValue;
@Builder.Default
private Boolean bool = true;
private Optional<String> optionalString;
private Optional<Integer> optionalInteger;
private Optional<Long> optionalLong;
@ -175,6 +188,10 @@ public class ConfigurationSettableUtilsTest {
@ConfigurationSettable(configurationClass = ConfigResult.class)
private int rawInt;
@ConfigurationSettable(configurationClass = ConfigResult.class)
@Builder.Default
private Boolean bool = true;
@ConfigurationSettable(configurationClass = ConfigResult.class)
private Integer boxedInt;

View file

@ -20,6 +20,7 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import com.amazonaws.auth.AWSCredentials;
@ -32,7 +33,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.metrics.MetricsLevel;
import static org.hamcrest.CoreMatchers.equalTo;
@ -40,6 +43,7 @@ import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -69,6 +73,8 @@ public class KinesisClientLibConfiguratorTest {
assertEquals(config.getWorkerIdentifier(), "123");
assertThat(config.getMaxGetRecordsThreadPool(), nullValue());
assertThat(config.getRetryGetRecordsInSeconds(), nullValue());
assertNull(config.getGracefulLeaseHandoffTimeoutMillis());
assertNull(config.getIsGracefulLeaseHandoffEnabled());
}
@Test
@ -147,6 +153,151 @@ public class KinesisClientLibConfiguratorTest {
}
}
@Test
public void testGracefulLeaseHandoffConfig() {
final Long testGracefulLeaseHandoffTimeoutMillis = 12345L;
final boolean testGracefulLeaseHandoffEnabled = true;
final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
new String[] {
"applicationName = dummyApplicationName",
"streamName = dummyStreamName",
"AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
"gracefulLeaseHandoffTimeoutMillis = " + testGracefulLeaseHandoffTimeoutMillis,
"isGracefulLeaseHandoffEnabled = " + testGracefulLeaseHandoffEnabled
},
'\n'));
assertEquals(testGracefulLeaseHandoffTimeoutMillis, config.getGracefulLeaseHandoffTimeoutMillis());
assertEquals(testGracefulLeaseHandoffEnabled, config.getIsGracefulLeaseHandoffEnabled());
}
@Test
public void testClientVersionConfig() {
final CoordinatorConfig.ClientVersionConfig testClientVersionConfig = Arrays.stream(
CoordinatorConfig.ClientVersionConfig.values())
.findAny()
.orElseThrow(NoSuchElementException::new);
final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
new String[] {
"applicationName = dummyApplicationName",
"streamName = dummyStreamName",
"AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
"clientVersionConfig = " + testClientVersionConfig.name()
},
'\n'));
assertEquals(testClientVersionConfig, config.getClientVersionConfig());
}
@Test
public void testCoordinatorStateConfig() {
final String testCoordinatorStateTableName = "CoordState";
final BillingMode testCoordinatorStateBillingMode = BillingMode.PAY_PER_REQUEST;
final long testCoordinatorStateReadCapacity = 123;
final long testCoordinatorStateWriteCapacity = 123;
final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
new String[] {
"applicationName = dummyApplicationName",
"streamName = dummyStreamName",
"AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
"coordinatorStateTableName = " + testCoordinatorStateTableName,
"coordinatorStateBillingMode = " + testCoordinatorStateBillingMode.name(),
"coordinatorStateReadCapacity = " + testCoordinatorStateReadCapacity,
"coordinatorStateWriteCapacity = " + testCoordinatorStateWriteCapacity
},
'\n'));
assertEquals(testCoordinatorStateTableName, config.getCoordinatorStateTableName());
assertEquals(testCoordinatorStateBillingMode, config.getCoordinatorStateBillingMode());
assertEquals(testCoordinatorStateReadCapacity, config.getCoordinatorStateReadCapacity());
assertEquals(testCoordinatorStateWriteCapacity, config.getCoordinatorStateWriteCapacity());
}
@Test
public void testWorkerUtilizationAwareAssignmentConfig() {
final long testInMemoryWorkerMetricsCaptureFrequencyMillis = 123;
final long testWorkerMetricsReporterFreqInMillis = 123;
final long testNoOfPersistedMetricsPerWorkerMetrics = 123;
final Boolean testDisableWorkerMetrics = true;
final double testMaxThroughputPerHostKBps = 123;
final long testDampeningPercentage = 12;
final long testReBalanceThresholdPercentage = 12;
final Boolean testAllowThroughputOvershoot = false;
final long testVarianceBalancingFrequency = 12;
final double testWorkerMetricsEMAAlpha = .123;
final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
new String[] {
"applicationName = dummyApplicationName",
"streamName = dummyStreamName",
"AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
"inMemoryWorkerMetricsCaptureFrequencyMillis = " + testInMemoryWorkerMetricsCaptureFrequencyMillis,
"workerMetricsReporterFreqInMillis = " + testWorkerMetricsReporterFreqInMillis,
"noOfPersistedMetricsPerWorkerMetrics = " + testNoOfPersistedMetricsPerWorkerMetrics,
"disableWorkerMetrics = " + testDisableWorkerMetrics,
"maxThroughputPerHostKBps = " + testMaxThroughputPerHostKBps,
"dampeningPercentage = " + testDampeningPercentage,
"reBalanceThresholdPercentage = " + testReBalanceThresholdPercentage,
"allowThroughputOvershoot = " + testAllowThroughputOvershoot,
"varianceBalancingFrequency = " + testVarianceBalancingFrequency,
"workerMetricsEMAAlpha = " + testWorkerMetricsEMAAlpha
},
'\n'));
assertEquals(
testInMemoryWorkerMetricsCaptureFrequencyMillis,
config.getInMemoryWorkerMetricsCaptureFrequencyMillis());
assertEquals(testWorkerMetricsReporterFreqInMillis, config.getWorkerMetricsReporterFreqInMillis());
assertEquals(testNoOfPersistedMetricsPerWorkerMetrics, config.getNoOfPersistedMetricsPerWorkerMetrics());
assertEquals(testDisableWorkerMetrics, config.getDisableWorkerMetrics());
assertEquals(testMaxThroughputPerHostKBps, config.getMaxThroughputPerHostKBps(), 0.0001);
assertEquals(testDampeningPercentage, config.getDampeningPercentage());
assertEquals(testReBalanceThresholdPercentage, config.getReBalanceThresholdPercentage());
assertEquals(testAllowThroughputOvershoot, config.getAllowThroughputOvershoot());
assertEquals(testVarianceBalancingFrequency, config.getVarianceBalancingFrequency());
assertEquals(testWorkerMetricsEMAAlpha, config.getWorkerMetricsEMAAlpha(), 0.0001);
}
@Test
public void testWorkerMetricsConfig() {
final String testWorkerMetricsTableName = "CoordState";
final BillingMode testWorkerMetricsBillingMode = BillingMode.PROVISIONED;
final long testWorkerMetricsReadCapacity = 123;
final long testWorkerMetricsWriteCapacity = 123;
final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
new String[] {
"applicationName = dummyApplicationName",
"streamName = dummyStreamName",
"AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
"workerMetricsTableName = " + testWorkerMetricsTableName,
"workerMetricsBillingMode = " + testWorkerMetricsBillingMode.name(),
"workerMetricsReadCapacity = " + testWorkerMetricsReadCapacity,
"workerMetricsWriteCapacity = " + testWorkerMetricsWriteCapacity
},
'\n'));
assertEquals(testWorkerMetricsTableName, config.getWorkerMetricsTableName());
assertEquals(testWorkerMetricsBillingMode, config.getWorkerMetricsBillingMode());
assertEquals(testWorkerMetricsReadCapacity, config.getWorkerMetricsReadCapacity());
assertEquals(testWorkerMetricsWriteCapacity, config.getWorkerMetricsWriteCapacity());
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidClientVersionConfig() {
getConfiguration(StringUtils.join(
new String[] {
"applicationName = dummyApplicationName",
"streamName = dummyStreamName",
"AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
"clientVersionConfig = " + "invalid_client_version_config"
},
'\n'));
}
@Test
public void testWithUnsupportedClientConfigurationVariables() {
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(

View file

@ -15,6 +15,9 @@
package software.amazon.kinesis.multilang.config;
import java.util.Arrays;
import java.util.NoSuchElementException;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean;
import org.junit.After;
@ -24,8 +27,16 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
@ -34,6 +45,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@ -41,6 +53,8 @@ import static org.junit.Assert.assertTrue;
public class MultiLangDaemonConfigurationTest {
private static final String AWS_REGION_PROPERTY_NAME = "aws.region";
private static final String DUMMY_APPLICATION_NAME = "dummyApplicationName";
private static final String DUMMY_STREAM_NAME = "dummyStreamName";
private BeanUtilsBean utilsBean;
private ConvertUtilsBean convertUtilsBean;
@ -71,8 +85,8 @@ public class MultiLangDaemonConfigurationTest {
public MultiLangDaemonConfiguration baseConfiguration() {
MultiLangDaemonConfiguration configuration = new MultiLangDaemonConfiguration(utilsBean, convertUtilsBean);
configuration.setApplicationName("Test");
configuration.setStreamName("Test");
configuration.setApplicationName(DUMMY_APPLICATION_NAME);
configuration.setStreamName(DUMMY_STREAM_NAME);
configuration.getKinesisCredentialsProvider().set("class", DefaultCredentialsProvider.class.getName());
return configuration;
@ -111,6 +125,197 @@ public class MultiLangDaemonConfigurationTest {
assertTrue(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled());
}
@Test
public void testGracefulLeaseHandoffConfig() {
final LeaseManagementConfig.GracefulLeaseHandoffConfig defaultGracefulLeaseHandoffConfig =
getTestConfigsBuilder().leaseManagementConfig().gracefulLeaseHandoffConfig();
final long testGracefulLeaseHandoffTimeoutMillis =
defaultGracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis() + 12345;
final boolean testGracefulLeaseHandoffEnabled =
!defaultGracefulLeaseHandoffConfig.isGracefulLeaseHandoffEnabled();
final MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setGracefulLeaseHandoffTimeoutMillis(testGracefulLeaseHandoffTimeoutMillis);
configuration.setIsGracefulLeaseHandoffEnabled(testGracefulLeaseHandoffEnabled);
final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig =
resolvedConfiguration.leaseManagementConfig.gracefulLeaseHandoffConfig();
assertEquals(
testGracefulLeaseHandoffTimeoutMillis, gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis());
assertEquals(testGracefulLeaseHandoffEnabled, gracefulLeaseHandoffConfig.isGracefulLeaseHandoffEnabled());
}
@Test
public void testGracefulLeaseHandoffUsesDefaults() {
final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
baseConfiguration().resolvedConfiguration(shardRecordProcessorFactory);
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig =
resolvedConfiguration.leaseManagementConfig.gracefulLeaseHandoffConfig();
final LeaseManagementConfig.GracefulLeaseHandoffConfig defaultGracefulLeaseHandoffConfig =
getTestConfigsBuilder().leaseManagementConfig().gracefulLeaseHandoffConfig();
assertEquals(defaultGracefulLeaseHandoffConfig, gracefulLeaseHandoffConfig);
}
@Test
public void testWorkerUtilizationAwareAssignmentConfig() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setInMemoryWorkerMetricsCaptureFrequencyMillis(123);
configuration.setWorkerMetricsReporterFreqInMillis(123);
configuration.setNoOfPersistedMetricsPerWorkerMetrics(123);
configuration.setDisableWorkerMetrics(true);
configuration.setMaxThroughputPerHostKBps(.123);
configuration.setDampeningPercentage(12);
configuration.setReBalanceThresholdPercentage(12);
configuration.setAllowThroughputOvershoot(false);
configuration.setVarianceBalancingFrequency(12);
configuration.setWorkerMetricsEMAAlpha(.123);
MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);
LeaseManagementConfig leaseManagementConfig = resolvedConfiguration.leaseManagementConfig;
LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config =
leaseManagementConfig.workerUtilizationAwareAssignmentConfig();
assertEquals(config.inMemoryWorkerMetricsCaptureFrequencyMillis(), 123);
assertEquals(config.workerMetricsReporterFreqInMillis(), 123);
assertEquals(config.noOfPersistedMetricsPerWorkerMetrics(), 123);
assertTrue(config.disableWorkerMetrics());
assertEquals(config.maxThroughputPerHostKBps(), .123, .25);
assertEquals(config.dampeningPercentage(), 12);
assertEquals(config.reBalanceThresholdPercentage(), 12);
assertFalse(config.allowThroughputOvershoot());
assertEquals(config.varianceBalancingFrequency(), 12);
assertEquals(config.workerMetricsEMAAlpha(), .123, .25);
}
@Test
public void testWorkerUtilizationAwareAssignmentConfigUsesDefaults() {
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig defaultWorkerUtilAwareAssignmentConfig =
getTestConfigsBuilder().leaseManagementConfig().workerUtilizationAwareAssignmentConfig();
final MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setVarianceBalancingFrequency(
defaultWorkerUtilAwareAssignmentConfig.varianceBalancingFrequency() + 12345);
final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig resolvedWorkerUtilAwareAssignmentConfig =
resolvedConfiguration.leaseManagementConfig.workerUtilizationAwareAssignmentConfig();
assertNotEquals(defaultWorkerUtilAwareAssignmentConfig, resolvedWorkerUtilAwareAssignmentConfig);
// apart from the single updated configuration, all other config values should be equal to the default
resolvedWorkerUtilAwareAssignmentConfig.varianceBalancingFrequency(
defaultWorkerUtilAwareAssignmentConfig.varianceBalancingFrequency());
assertEquals(defaultWorkerUtilAwareAssignmentConfig, resolvedWorkerUtilAwareAssignmentConfig);
}
@Test
public void testWorkerMetricsTableConfigBean() {
final BillingMode testWorkerMetricsTableBillingMode = BillingMode.PROVISIONED;
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setWorkerMetricsTableName("testTable");
configuration.setWorkerMetricsBillingMode(testWorkerMetricsTableBillingMode);
configuration.setWorkerMetricsReadCapacity(123);
configuration.setWorkerMetricsWriteCapacity(123);
MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);
LeaseManagementConfig leaseManagementConfig = resolvedConfiguration.leaseManagementConfig;
LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationConfig =
leaseManagementConfig.workerUtilizationAwareAssignmentConfig();
LeaseManagementConfig.WorkerMetricsTableConfig workerMetricsConfig =
workerUtilizationConfig.workerMetricsTableConfig();
assertEquals(workerMetricsConfig.tableName(), "testTable");
assertEquals(workerMetricsConfig.billingMode(), testWorkerMetricsTableBillingMode);
assertEquals(workerMetricsConfig.readCapacity(), 123);
assertEquals(workerMetricsConfig.writeCapacity(), 123);
}
@Test
public void testWorkerMetricsTableConfigUsesDefaults() {
final LeaseManagementConfig.WorkerMetricsTableConfig defaultWorkerMetricsTableConfig = getTestConfigsBuilder()
.leaseManagementConfig()
.workerUtilizationAwareAssignmentConfig()
.workerMetricsTableConfig();
final MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setWorkerMetricsBillingMode(Arrays.stream(BillingMode.values())
.filter(billingMode -> billingMode != defaultWorkerMetricsTableConfig.billingMode())
.findFirst()
.orElseThrow(NoSuchElementException::new));
final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);
final LeaseManagementConfig.WorkerMetricsTableConfig resolvedWorkerMetricsTableConfig = resolvedConfiguration
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsTableConfig();
assertNotEquals(defaultWorkerMetricsTableConfig, resolvedWorkerMetricsTableConfig);
// apart from the single updated configuration, all other config values should be equal to the default
resolvedWorkerMetricsTableConfig.billingMode(defaultWorkerMetricsTableConfig.billingMode());
assertEquals(defaultWorkerMetricsTableConfig, resolvedWorkerMetricsTableConfig);
}
@Test
public void testCoordinatorStateTableConfigBean() {
final BillingMode testWorkerMetricsTableBillingMode = BillingMode.PAY_PER_REQUEST;
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setCoordinatorStateTableName("testTable");
configuration.setCoordinatorStateBillingMode(testWorkerMetricsTableBillingMode);
configuration.setCoordinatorStateReadCapacity(123);
configuration.setCoordinatorStateWriteCapacity(123);
MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);
CoordinatorConfig coordinatorConfig = resolvedConfiguration.getCoordinatorConfig();
CoordinatorConfig.CoordinatorStateTableConfig coordinatorStateConfig =
coordinatorConfig.coordinatorStateConfig();
assertEquals(coordinatorStateConfig.tableName(), "testTable");
assertEquals(coordinatorStateConfig.billingMode(), testWorkerMetricsTableBillingMode);
assertEquals(coordinatorStateConfig.readCapacity(), 123);
assertEquals(coordinatorStateConfig.writeCapacity(), 123);
}
@Test
public void testCoordinatorStateTableConfigUsesDefaults() {
final CoordinatorConfig.CoordinatorStateTableConfig defaultCoordinatorStateTableConfig =
getTestConfigsBuilder().coordinatorConfig().coordinatorStateConfig();
final MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setCoordinatorStateWriteCapacity(defaultCoordinatorStateTableConfig.writeCapacity() + 12345);
final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);
final CoordinatorConfig.CoordinatorStateTableConfig resolvedCoordinatorStateTableConfig =
resolvedConfiguration.coordinatorConfig.coordinatorStateConfig();
assertNotEquals(defaultCoordinatorStateTableConfig, resolvedCoordinatorStateTableConfig);
// apart from the single updated configuration, all other config values should be equal to the default
resolvedCoordinatorStateTableConfig.writeCapacity(defaultCoordinatorStateTableConfig.writeCapacity());
assertEquals(defaultCoordinatorStateTableConfig, resolvedCoordinatorStateTableConfig);
}
@Test
public void testSetLeaseTablePitrEnabledToTrue() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
@ -266,4 +471,43 @@ public class MultiLangDaemonConfigurationTest {
assertThat(fanOutConfig.consumerArn(), equalTo(consumerArn));
}
@Test
public void testClientVersionConfig() {
final CoordinatorConfig.ClientVersionConfig testClientVersionConfig =
CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X;
final MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setClientVersionConfig(testClientVersionConfig);
final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);
final CoordinatorConfig coordinatorConfig = resolvedConfiguration.coordinatorConfig;
assertEquals(testClientVersionConfig, coordinatorConfig.clientVersionConfig());
}
@Test
public void testClientVersionConfigUsesDefault() {
final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
baseConfiguration().resolvedConfiguration(shardRecordProcessorFactory);
final CoordinatorConfig coordinatorConfig = resolvedConfiguration.coordinatorConfig;
assertEquals(
getTestConfigsBuilder().coordinatorConfig().clientVersionConfig(),
coordinatorConfig.clientVersionConfig());
}
private ConfigsBuilder getTestConfigsBuilder() {
return new ConfigsBuilder(
DUMMY_STREAM_NAME,
DUMMY_APPLICATION_NAME,
Mockito.mock(KinesisAsyncClient.class),
Mockito.mock(DynamoDbAsyncClient.class),
Mockito.mock(CloudWatchAsyncClient.class),
"dummyWorkerIdentifier",
shardRecordProcessorFactory);
}
}

View file

@ -0,0 +1,249 @@
package software.amazon.kinesis.multilang.config;
import java.io.IOException;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.coordinator.CoordinatorConfig.ClientVersionConfig;
import software.amazon.kinesis.multilang.MultiLangDaemonConfig;
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration.ResolvedConfiguration;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class PropertiesMappingE2ETest {
private static final String PROPERTIES_FILE = "multilang.properties";
private static final String PROPERTIES_FILE_V3 = "multilangv3.properties";
@Test
public void testKclV3PropertiesMapping() throws IOException {
final MultiLangDaemonConfig config = new MultiLangDaemonConfig(PROPERTIES_FILE);
final ResolvedConfiguration kclV3Config =
config.getMultiLangDaemonConfiguration().resolvedConfiguration(new TestRecordProcessorFactory());
assertEquals(
ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X,
kclV3Config.coordinatorConfig.clientVersionConfig());
assertEquals(
"MultiLangTest-CoordinatorState-CustomName",
kclV3Config.coordinatorConfig.coordinatorStateConfig().tableName());
assertEquals(
BillingMode.PROVISIONED,
kclV3Config.coordinatorConfig.coordinatorStateConfig().billingMode());
assertEquals(
1000, kclV3Config.coordinatorConfig.coordinatorStateConfig().readCapacity());
assertEquals(500, kclV3Config.coordinatorConfig.coordinatorStateConfig().writeCapacity());
assertEquals(
10000L,
kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().gracefulLeaseHandoffTimeoutMillis());
assertFalse(
kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().isGracefulLeaseHandoffEnabled());
assertEquals(
5000L,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.inMemoryWorkerMetricsCaptureFrequencyMillis());
assertEquals(
60000L,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsReporterFreqInMillis());
assertEquals(
50,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.noOfPersistedMetricsPerWorkerMetrics());
assertTrue(kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.disableWorkerMetrics());
assertEquals(
10000,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.maxThroughputPerHostKBps());
assertEquals(
90,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.dampeningPercentage());
assertEquals(
5,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.reBalanceThresholdPercentage());
assertFalse(kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.allowThroughputOvershoot());
assertEquals(
Duration.ofHours(12),
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.staleWorkerMetricsEntryCleanupDuration());
assertEquals(
5,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.varianceBalancingFrequency());
assertEquals(
0.18D,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsEMAAlpha());
assertEquals(
"MultiLangTest-WorkerMetrics-CustomName",
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsTableConfig()
.tableName());
assertEquals(
BillingMode.PROVISIONED,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsTableConfig()
.billingMode());
assertEquals(
250,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsTableConfig()
.readCapacity());
assertEquals(
90,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsTableConfig()
.writeCapacity());
}
@Test
public void testKclV3PropertiesMappingForDefaultValues() throws IOException {
final MultiLangDaemonConfig config = new MultiLangDaemonConfig(PROPERTIES_FILE_V3);
final ResolvedConfiguration kclV3Config =
config.getMultiLangDaemonConfiguration().resolvedConfiguration(new TestRecordProcessorFactory());
assertEquals(ClientVersionConfig.CLIENT_VERSION_CONFIG_3X, kclV3Config.coordinatorConfig.clientVersionConfig());
assertEquals(
"MultiLangTest-CoordinatorState",
kclV3Config.coordinatorConfig.coordinatorStateConfig().tableName());
assertEquals(
BillingMode.PAY_PER_REQUEST,
kclV3Config.coordinatorConfig.coordinatorStateConfig().billingMode());
assertEquals(
30_000L,
kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().gracefulLeaseHandoffTimeoutMillis());
assertTrue(
kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().isGracefulLeaseHandoffEnabled());
assertEquals(
1000L,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.inMemoryWorkerMetricsCaptureFrequencyMillis());
assertEquals(
30000L,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsReporterFreqInMillis());
assertEquals(
10,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.noOfPersistedMetricsPerWorkerMetrics());
assertFalse(kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.disableWorkerMetrics());
assertEquals(
Double.MAX_VALUE,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.maxThroughputPerHostKBps());
assertEquals(
60,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.dampeningPercentage());
assertEquals(
10,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.reBalanceThresholdPercentage());
assertTrue(kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.allowThroughputOvershoot());
assertEquals(
Duration.ofDays(1),
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.staleWorkerMetricsEntryCleanupDuration());
assertEquals(
3,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.varianceBalancingFrequency());
assertEquals(
0.5D,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsEMAAlpha());
assertEquals(
"MultiLangTest-WorkerMetricStats",
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsTableConfig()
.tableName());
assertEquals(
BillingMode.PAY_PER_REQUEST,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsTableConfig()
.billingMode());
}
private static class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
@Override
public ShardRecordProcessor shardRecordProcessor() {
return null;
}
}
}

View file

@ -0,0 +1,68 @@
/*
* Copyright 2024 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.multilang.config;
import java.util.Optional;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class WorkerUtilizationAwareAssignmentConfigBeanTest {
@Mock
private KinesisAsyncClient kinesisAsyncClient;
@Test
public void testAllPropertiesTransit() {
PollingConfigBean pollingConfigBean = new PollingConfigBean();
pollingConfigBean.setIdleTimeBetweenReadsInMillis(1000);
pollingConfigBean.setMaxGetRecordsThreadPool(20);
pollingConfigBean.setMaxRecords(5000);
pollingConfigBean.setRetryGetRecordsInSeconds(30);
ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean();
BeanUtilsBean utilsBean = new BeanUtilsBean(convertUtilsBean);
MultiLangDaemonConfiguration multiLangDaemonConfiguration =
new MultiLangDaemonConfiguration(utilsBean, convertUtilsBean);
multiLangDaemonConfiguration.setStreamName("test-stream");
PollingConfig pollingConfig = pollingConfigBean.build(kinesisAsyncClient, multiLangDaemonConfiguration);
assertThat(pollingConfig.kinesisClient(), equalTo(kinesisAsyncClient));
assertThat(pollingConfig.streamName(), equalTo(multiLangDaemonConfiguration.getStreamName()));
assertThat(
pollingConfig.idleTimeBetweenReadsInMillis(),
equalTo(pollingConfigBean.getIdleTimeBetweenReadsInMillis()));
assertThat(
pollingConfig.maxGetRecordsThreadPool(),
equalTo(Optional.of(pollingConfigBean.getMaxGetRecordsThreadPool())));
assertThat(pollingConfig.maxRecords(), equalTo(pollingConfigBean.getMaxRecords()));
assertThat(
pollingConfig.retryGetRecordsInSeconds(),
equalTo(Optional.of(pollingConfigBean.getRetryGetRecordsInSeconds())));
}
}

View file

@ -91,3 +91,73 @@ validateSequenceNumberBeforeCheckpointing = true
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
maxActiveThreads = -1
################### KclV3 configurations ###################
# Coordinator config
# Version the KCL needs to operate in. For more details check the KCLv3 migration
# documentation. Default is CLIENT_VERSION_CONFIG_3X
clientVersionConfig = CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x
# TODO: include table deletion protection and pitr config once its added
# Configurations to control how the CoordinatorState DDB table is created
# Default name is applicationName-CoordinatorState in PAY_PER_REQUEST
coordinatorStateTableName = MultiLangTest-CoordinatorState-CustomName
coordinatorStateBillingMode = PROVISIONED
coordinatorStateReadCapacity = 1000
coordinatorStateWriteCapacity = 500
# Graceful handoff config - tuning of the shutdown behavior during lease transfers
# default values are 30000 and true respectively
gracefulLeaseHandoffTimeoutMillis = 10000
isGracefulLeaseHandoffEnabled = false
# WorkerMetricStats table config - control how the DDB table is created
## Default name is applicationName-WorkerMetricStats in PAY_PER_REQUEST
# TODO: include table deletion protection and pitr config once its added
workerMetricsTableName = MultiLangTest-WorkerMetrics-CustomName
workerMetricsBillingMode = PROVISIONED
workerMetricsReadCapacity = 250
workerMetricsWriteCapacity = 90
# WorkerUtilizationAwareAssignment config - tune the new KCLv3 Lease balancing algorithm
#
# frequency of capturing worker metrics in memory. Default is 1s
inMemoryWorkerMetricsCaptureFrequencyMillis = 5000
# frequency of reporting worker metric stats to storage. Default is 30s
workerMetricsReporterFreqInMillis = 60000
# No. of metricStats that are persisted in WorkerMetricStats ddb table, default is 10
noOfPersistedMetricsPerWorkerMetrics = 50
# Disable use of worker metrics to balance lease, default is false.
# If it is true, the algorithm balances lease based on worker's processing throughput.
disableWorkerMetrics = true
# Max throughput per host 10 MBps, to limit processing to the given value
# Default is unlimited.
maxThroughputPerHostKBps = 10000
# Dampen the load that is rebalanced during lease re-balancing, default is 60%
dampeningPercentage = 90
# Configures the allowed variance range for worker utilization. The upper
# limit is calculated as average * (1 + reBalanceThresholdPercentage/100).
# The lower limit is average * (1 - reBalanceThresholdPercentage/100). If
# any worker's utilization falls outside this range, lease re-balancing is
# triggered. The re-balancing algorithm aims to bring variance within the
# specified range. It also avoids thrashing by ensuring the utilization of
# the worker receiving the load after re-balancing doesn't exceed the fleet
# average. This might cause no re-balancing action even the utilization is
# out of the variance range. The default value is 10, representing +/-10%
# variance from the average value.
reBalanceThresholdPercentage = 5
# Whether at-least one lease must be taken from a high utilization worker
# during re-balancing when there is no lease assigned to that worker which has
# throughput is less than or equal to the minimum throughput that needs to be
# moved away from that worker to bring the worker back into the allowed variance.
# Default is true.
allowThroughputOvershoot = false
# Lease assignment is performed every failoverTimeMillis but re-balance will
# be attempted only once in 5 times based on the below config. Default is 3.
varianceBalancingFrequency = 5
# Alpha value used for calculating exponential moving average of worker's metricStats.
workerMetricsEMAAlpha = 0.18
# Duration after which workerMetricStats entry from WorkerMetricStats table will
# be cleaned up.
# Duration format examples: PT15M (15 mins) PT10H (10 hours) P2D (2 days)
# Refer to Duration.parse javadocs for more details
staleWorkerMetricsEntryCleanupDuration = PT12H

View file

@ -0,0 +1,167 @@
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py
# The Stream arn: arn:aws:kinesis:<region>:<account id>:stream/<stream name>
# Important: streamArn takes precedence over streamName if both are set
streamArn = arn:aws:kinesis:us-east-5:000000000000:stream/kclpysample
# The name of an Amazon Kinesis stream to process.
# Important: streamArn takes precedence over streamName if both are set
streamName = kclpysample
# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = MultiLangTest
# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/3.8
# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON
# To specify an initial timestamp from which to start processing records, please specify timestamp value for 'initiatPositionInStreamExtended',
# and uncomment below line with right timestamp value.
# See more from 'Timestamp' under http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
#initialPositionInStreamExtended = 1636609142
# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.
# The KCL defaults to us-east-1
regionName = us-east-1
# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
failoverTimeMillis = 10000
# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
workerId = "workerId"
# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
shardSyncIntervalMillis = 60000
# Max records to fetch from Kinesis in a single GetRecords call.
maxRecords = 10000
# Idle time between record reads in milliseconds.
idleTimeBetweenReadsInMillis = 1000
# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
callProcessRecordsEvenForEmptyRecordList = false
# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
parentShardPollIntervalMillis = 10000
# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
cleanupLeasesUponShardCompletion = true
# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
taskBackoffTimeMillis = 500
# Buffer metrics for at most this long before publishing to CloudWatch.
metricsBufferTimeMillis = 10000
# Buffer at most this many metrics before publishing to CloudWatch.
metricsMaxQueueSize = 10000
# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
validateSequenceNumberBeforeCheckpointing = true
# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
maxActiveThreads = -1
################### KclV3 configurations ###################
# Coordinator config
clientVersionConfig = CLIENT_VERSION_CONFIG_3x
## Let all other config be defaults
## TODO: include table deletion protection and pitr config once its added
## Configurations to control how the CoordinatorState DDB table is created
## Default name is applicationName-CoordinatorState in PAY_PER_REQUEST
#coordinatorStateTableName = MultiLangTest-CoordinatorState-CustomName
#coordinatorStateBillingMode = PROVISIONED
#coordinatorStateReadCapacity = 1000
#coordinatorStateWriteCapacity = 500
#
## Graceful handoff config - tuning of the shutdown behavior during lease transfers
## default values are 30000 and true respectively
#gracefulLeaseHandoffTimeoutMillis = 10000
#isGracefulLeaseHandoffEnabled = false
#
## WorkerMetricStats table config - control how the DDB table is created
### Default name is applicationName-WorkerMetricStats in PAY_PER_REQUEST
## TODO: include table deletion protection and pitr config once its added
#workerMetricsTableName = MultiLangTest-WorkerMetrics-CustomName
#workerMetricsBillingMode = PROVISIONED
#workerMetricsReadCapacity = 250
#workerMetricsWriteCapacity = 90
#
## WorkerUtilizationAwareAssignment config - tune the new KCLv3 Lease balancing algorithm
##
## frequency of capturing worker metrics in memory. Default is 1s
#inMemoryWorkerMetricsCaptureFrequencyMillis = 5000
## frequency of reporting worker metric stats to storage. Default is 30s
#workerMetricsReporterFreqInMillis = 60000
## No. of metricStats that are persisted in WorkerMetricStats ddb table, default is 10.
## This provides historic values that are used to compute the workers current
## utilization using an exponential-moving-average.
#noOfPersistedMetricsPerWorkerMetrics = 50
## Disable use of worker metrics to balance lease, default is false.
## If it is true, the algorithm balances lease based on worker's processing throughput.
#disableWorkerMetrics = true
## Max throughput per host 10 MBps, to limit processing to the given value
## Default is unlimited.
#maxThroughputPerHostKBps = 10000
## Dampen the load that is rebalanced during lease re-balancing, default is 60%
#dampeningPercentage = 90
## Configures the allowed variance range for worker utilization. The upper
## limit is calculated as average * (1 + reBalanceThresholdPercentage/100).
## The lower limit is average * (1 - reBalanceThresholdPercentage/100). If
## any worker's utilization falls outside this range, lease re-balancing is
## triggered. The re-balancing algorithm aims to bring variance within the
## specified range. It also avoids thrashing by ensuring the utilization of
## the worker receiving the load after re-balancing doesn't exceed the fleet
## average. This might cause no re-balancing action even the utilization is
## out of the variance range. The default value is 10, representing +/-10%
## variance from the average value.
#reBalanceThresholdPercentage = 5
## Whether at-least one lease must be taken from a high utilization worker
## during re-balancing when there is no lease assigned to that worker which has
## throughput is less than or equal to the minimum throughput that needs to be
## moved away from that worker to bring the worker back into the allowed variance.
## Default is true.
#allowThroughputOvershoot = false
## Lease assignment is performed every failoverTimeMillis but re-balance will
## be attempted only once in 5 times based on the below config. Default is 3.
#varianceBalancingFrequency = 5
## Alpha value used for calculating exponential moving average of worker's metricStats.
## Default is 0.5, a higher alpha value will make re-balancing more sensitive
## to recent metricStats.
#workerMetricsEMAAlpha = 0.18
## Duration after which workerMetricStats entry from WorkerMetricStats table will
## be cleaned up. Default is 1 day.
## Duration format examples: PT15M (15 mins) PT10H (10 hours) P2D (2 days)
## Refer to Duration.parse javadocs for more details
#staleWorkerMetricsEntryCleanupDuration = PT12H

View file

@ -123,7 +123,7 @@ public class CoordinatorConfig {
* This version also allows rolling back to the compatible mode from the
* auto-toggled 3.x mode.
*/
CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x,
CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X,
/**
* A new application operating with KCLv3.x will use this value. Also, an application
* that has successfully upgraded to 3.x version and no longer needs the ability
@ -131,14 +131,14 @@ public class CoordinatorConfig {
* KCL will operate with new algorithms introduced in 3.x which is not compatible
* with prior versions. And once in this version, rollback to 2.x is not supported.
*/
CLIENT_VERSION_CONFIG_3x,
CLIENT_VERSION_CONFIG_3X,
}
/**
* Client version KCL must operate in, by default it operates in 3.x version which is not
* compatible with prior versions.
*/
private ClientVersionConfig clientVersionConfig = ClientVersionConfig.CLIENT_VERSION_CONFIG_3x;
private ClientVersionConfig clientVersionConfig = ClientVersionConfig.CLIENT_VERSION_CONFIG_3X;
public static class CoordinatorStateTableConfig extends DdbTableConfig {
private CoordinatorStateTableConfig(final String applicationName) {

View file

@ -159,7 +159,7 @@ public final class DynamicMigrationComponentsInitializer {
// always collect metrics so that when we flip to start reporting we will have accurate historical data.
log.info("Start collection of WorkerMetricStats");
workerMetricsManager.startManager();
if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3x) {
if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3X) {
initializeComponentsFor3x();
} else {
initializeComponentsForMigration(migrationStateMachineStartingClientVersion);
@ -187,7 +187,7 @@ public final class DynamicMigrationComponentsInitializer {
log.info("Initializing for migration to 3x");
dualMode = true;
final LeaderDecider initialLeaderDecider;
if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK) {
if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK) {
currentAssignmentMode = WORKER_UTILIZATION_AWARE_ASSIGNMENT;
initialLeaderDecider = ddbLockBasedLeaderDeciderCreator.get();
} else {
@ -292,8 +292,8 @@ public final class DynamicMigrationComponentsInitializer {
/**
* Initialize KCL with components and configuration to support upgrade from 2x. This can happen
* at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x.
* Or Dynamically during roll-forward from ClientVersion.CLIENT_VERSION_2x.
* at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X.
* Or Dynamically during roll-forward from ClientVersion.CLIENT_VERSION_2X.
*/
public synchronized void initializeClientVersionForUpgradeFrom2x(final ClientVersion fromClientVersion)
throws DependencyException {
@ -306,8 +306,8 @@ public final class DynamicMigrationComponentsInitializer {
/**
* Initialize KCL with components and configuration to run vanilla 3x functionality. This can happen
* at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3x, or dynamically
* during a new deployment when existing worker are in ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK
* at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3X, or dynamically
* during a new deployment when existing worker are in ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK
*/
public synchronized void initializeClientVersionFor3x(final ClientVersion fromClientVersion)
throws DependencyException {
@ -322,14 +322,14 @@ public final class DynamicMigrationComponentsInitializer {
log.info("Starting LAM");
leaseAssignmentManager.start();
}
// nothing to do when transitioning from CLIENT_VERSION_3x_WITH_ROLLBACK.
// nothing to do when transitioning from CLIENT_VERSION_3X_WITH_ROLLBACK.
}
/**
* Initialize KCL with components and configuration to run 2x compatible functionality
* while allowing roll-forward. This can happen at KCL Worker startup when MigrationStateMachine
* starts in ClientVersion.CLIENT_VERSION_2x (after a rollback)
* Or Dynamically during rollback from CLIENT_VERSION_UPGRADE_FROM_2x or CLIENT_VERSION_3x_WITH_ROLLBACK.
* starts in ClientVersion.CLIENT_VERSION_2X (after a rollback)
* Or Dynamically during rollback from CLIENT_VERSION_UPGRADE_FROM_2X or CLIENT_VERSION_3X_WITH_ROLLBACK.
*/
public synchronized void initializeClientVersionFor2x(final ClientVersion fromClientVersion) {
log.info("Initializing KCL components for rollback to 2x from {}", fromClientVersion);
@ -341,7 +341,7 @@ public final class DynamicMigrationComponentsInitializer {
// and WorkerMetricStats table
}
if (fromClientVersion == ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK) {
if (fromClientVersion == ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK) {
// we are rolling back after flip
currentAssignmentMode = DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT;
notifyLeaseAssignmentModeChange();
@ -361,14 +361,14 @@ public final class DynamicMigrationComponentsInitializer {
/**
* Initialize KCL with components and configuration to run vanilla 3x functionality
* while allowing roll-back to 2x functionality. This can happen at KCL Worker startup
* when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK (after the flip)
* Or Dynamically during flip from CLIENT_VERSION_UPGRADE_FROM_2x.
* when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK (after the flip)
* Or Dynamically during flip from CLIENT_VERSION_UPGRADE_FROM_2X.
*/
public synchronized void initializeClientVersionFor3xWithRollback(final ClientVersion fromClientVersion)
throws DependencyException {
log.info("Initializing KCL components for 3x with rollback from {}", fromClientVersion);
if (fromClientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x) {
if (fromClientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X) {
// dynamic flip
currentAssignmentMode = WORKER_UTILIZATION_AWARE_ASSIGNMENT;
notifyLeaseAssignmentModeChange();

View file

@ -31,28 +31,28 @@ public enum ClientVersion {
* KCL workers will emit WorkerMetricStats and run KCLv2.x algorithms for leader election and lease
* assignment. KCL will also monitor for upgrade to KCLv3.x readiness of the worker fleet.
*/
CLIENT_VERSION_UPGRADE_FROM_2x,
CLIENT_VERSION_UPGRADE_FROM_2X,
/**
* This version is used during rollback from CLIENT_VERSION_UPGRADE_FROM_2x or CLIENT_VERSION_3x_WITH_ROLLBACK,
* This version is used during rollback from CLIENT_VERSION_UPGRADE_FROM_2X or CLIENT_VERSION_3X_WITH_ROLLBACK,
* which can only be initiated using a KCL migration tool, when customer wants to revert to KCLv2.x functionality.
* In this version, KCL will not emit WorkerMetricStats and run KCLv2.x algorithms for leader election
* and lease assignment. In this version, KCL will monitor for roll-forward scenario where
* client version is updated to CLIENT_VERSION_UPGRADE_FROM_2x using the migration tool.
* client version is updated to CLIENT_VERSION_UPGRADE_FROM_2X using the migration tool.
*/
CLIENT_VERSION_2x,
CLIENT_VERSION_2X,
/**
* When workers are operating in CLIENT_VERSION_UPGRADE_FROM_2x and when worker fleet is determined to be
* When workers are operating in CLIENT_VERSION_UPGRADE_FROM_2X and when worker fleet is determined to be
* KCLv3.x ready (when lease table GSI is active and worker-metrics are being emitted by all lease owners)
* then the leader will initiate the switch to KCLv3.x algorithms for leader election and lease assignment,
* by using this version and persisting it in the {@link MigrationState} that allows all worker hosts
* to also flip to KCLv3.x functionality. In this KCL will also monitor for rollback to detect when the
* customer updates version to CLIENT_VERSION_2x using migration tool, so that it instantly flips back
* to CLIENT_VERSION_2x.
* customer updates version to CLIENT_VERSION_2X using migration tool, so that it instantly flips back
* to CLIENT_VERSION_2X.
*/
CLIENT_VERSION_3x_WITH_ROLLBACK,
CLIENT_VERSION_3X_WITH_ROLLBACK,
/**
* A new application starting KCLv3.x or an upgraded application from KCLv2.x after upgrade is successful
* can use this version to default all KCLv3.x algorithms without any monitor to rollback.
*/
CLIENT_VERSION_3x;
CLIENT_VERSION_3X;
}

View file

@ -144,11 +144,11 @@ public class ClientVersionChangeMonitor implements Runnable {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, METRICS_OPERATION);
try {
switch (expectedVersion) {
case CLIENT_VERSION_3x_WITH_ROLLBACK:
case CLIENT_VERSION_3X_WITH_ROLLBACK:
scope.addData("CurrentState:3xWorker", 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
break;
case CLIENT_VERSION_2x:
case CLIENT_VERSION_UPGRADE_FROM_2x:
case CLIENT_VERSION_2X:
case CLIENT_VERSION_UPGRADE_FROM_2X:
scope.addData("CurrentState:2xCompatibleWorker", 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
break;
default:

View file

@ -32,13 +32,13 @@ import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.FAULT_METRIC;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.METRICS_OPERATION;
/**
* State for CLIENT_VERSION_2x. In this state, the only allowed valid transition is
* State for CLIENT_VERSION_2X. In this state, the only allowed valid transition is
* the roll-forward scenario which can only be performed using the KCL Migration tool.
* So when the state machine enters this state, a monitor is started to detect the
* roll-forward scenario.
@ -60,7 +60,7 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat
@Override
public ClientVersion clientVersion() {
return CLIENT_VERSION_2x;
return CLIENT_VERSION_2X;
}
@Override
@ -102,7 +102,7 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat
/**
* Callback handler to handle client version changes in MigrationState in DDB.
* @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_2x
* @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_2X
* @throws InvalidStateException during transition to the next state based on the new ClientVersion
* or if the new state in DDB is unexpected.
*/
@ -115,18 +115,18 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat
final MetricsScope scope =
MetricsUtil.createMetricsWithOperation(initializer.metricsFactory(), METRICS_OPERATION);
try {
if (newState.getClientVersion() == CLIENT_VERSION_UPGRADE_FROM_2x) {
if (newState.getClientVersion() == CLIENT_VERSION_UPGRADE_FROM_2X) {
log.info(
"A roll-forward has been initiated for the application. Transition to {}",
CLIENT_VERSION_UPGRADE_FROM_2x);
CLIENT_VERSION_UPGRADE_FROM_2X);
// If this succeeds, the monitor will cancel itself.
stateMachine.transitionTo(CLIENT_VERSION_UPGRADE_FROM_2x, newState);
stateMachine.transitionTo(CLIENT_VERSION_UPGRADE_FROM_2X, newState);
} else {
// This should not happen, so throw an exception that allows the monitor to continue monitoring
// changes, this allows KCL to operate in the current state and keep monitoring until a valid
// state transition is possible.
// However, there could be a split brain here, new workers will use DDB value as source of truth,
// so we could also write back CLIENT_VERSION_2x to DDB to ensure all workers have consistent
// so we could also write back CLIENT_VERSION_2X to DDB to ensure all workers have consistent
// behavior.
// Ideally we don't expect modifications to DDB table out of the KCL migration tool scope,
// so keeping it simple and not writing back to DDB, the error log below would help capture
@ -134,7 +134,7 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat
log.error(
"Migration state has invalid client version {}. Transition from {} is not supported",
newState,
CLIENT_VERSION_2x);
CLIENT_VERSION_2X);
throw new InvalidStateException(String.format("Unexpected new state %s", newState));
}
} catch (final InvalidStateException | DependencyException e) {

View file

@ -22,7 +22,7 @@ import software.amazon.kinesis.coordinator.DynamicMigrationComponentsInitializer
import software.amazon.kinesis.leases.exceptions.DependencyException;
/**
* State for CLIENT_VERSION_3x which enables KCL to run 3.x algorithms on new KCLv3.x application
* State for CLIENT_VERSION_3X which enables KCL to run 3.x algorithms on new KCLv3.x application
* or successfully upgraded application which upgraded from v2.x. This is a terminal state of the
* state machine and no rollbacks are supported in this state.
*/
@ -38,7 +38,7 @@ public class MigrationClientVersion3xState implements MigrationClientVersionStat
@Override
public ClientVersion clientVersion() {
return ClientVersion.CLIENT_VERSION_3x;
return ClientVersion.CLIENT_VERSION_3X;
}
@Override

View file

@ -31,15 +31,15 @@ import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.FAULT_METRIC;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.METRICS_OPERATION;
/**
* State for CLIENT_VERSION_3x_WITH_ROLLBACK which enables KCL to run its 3.x compliant algorithms
* State for CLIENT_VERSION_3X_WITH_ROLLBACK which enables KCL to run its 3.x compliant algorithms
* during the upgrade process after all KCL workers in the fleet are 3.x complaint. Since this
* is an instant switch from CLIENT_VERSION_UPGRADE_FROM_2x, it also supports rollback if customers
* is an instant switch from CLIENT_VERSION_UPGRADE_FROM_2X, it also supports rollback if customers
* see regression to allow for instant rollbacks as well. This would be achieved by customers
* running a KCL migration tool to update MigrationState in DDB. So this state monitors for
* rollback triggers and performs state transitions accordingly.
@ -62,7 +62,7 @@ public class MigrationClientVersion3xWithRollbackState implements MigrationClien
@Override
public ClientVersion clientVersion() {
return ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK;
return ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK;
}
@Override
@ -108,17 +108,17 @@ public class MigrationClientVersion3xWithRollbackState implements MigrationClien
MetricsUtil.createMetricsWithOperation(initializer.metricsFactory(), METRICS_OPERATION);
try {
switch (newState.getClientVersion()) {
case CLIENT_VERSION_2x:
log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2x);
stateMachine.transitionTo(ClientVersion.CLIENT_VERSION_2x, newState);
case CLIENT_VERSION_2X:
log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2X);
stateMachine.transitionTo(ClientVersion.CLIENT_VERSION_2X, newState);
break;
case CLIENT_VERSION_3x:
case CLIENT_VERSION_3X:
log.info("Customer has switched to 3.x after successful upgrade, state machine will move to a"
+ "terminal state and stop monitoring. Rollbacks will no longer be supported anymore");
stateMachine.transitionTo(CLIENT_VERSION_3x, newState);
stateMachine.transitionTo(CLIENT_VERSION_3X, newState);
// This worker will still be running the migrationAdaptive components in 3.x mode which will
// no longer dynamically switch back to 2.x mode, however to directly run 3.x component without
// adaption to migration (i.e. move to CLIENT_VERSION_3x state), it requires this worker to go
// adaption to migration (i.e. move to CLIENT_VERSION_3X state), it requires this worker to go
// through the current deployment which initiated the switch to 3.x mode.
break;
default:
@ -126,7 +126,7 @@ public class MigrationClientVersion3xWithRollbackState implements MigrationClien
// changes, this allows KCL to operate in the current state and keep monitoring until a valid
// state transition is possible.
// However, there could be a split brain here, new workers will use DDB value as source of truth,
// so we could also write back CLIENT_VERSION_3x_WITH_ROLLBACK to DDB to ensure all workers have
// so we could also write back CLIENT_VERSION_3X_WITH_ROLLBACK to DDB to ensure all workers have
// consistent behavior.
// Ideally we don't expect modifications to DDB table out of the KCL migration tool scope,
// so keeping it simple and not writing back to DDB, the error log below would help capture

View file

@ -31,10 +31,10 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X;
import static software.amazon.kinesis.coordinator.migration.MigrationState.MIGRATION_HASH_KEY;
/**
@ -44,13 +44,13 @@ import static software.amazon.kinesis.coordinator.migration.MigrationState.MIGRA
* as follows
* ClientVersionConfig | MigrationState (DDB) | initial client version
* --------------------+---------------------------------+--------------------------------
* COMPATIBLE_WITH_2x | Does not exist | CLIENT_VERSION_UPGRADE_FROM_2x
* 3x | Does not exist | CLIENT_VERSION_3x
* COMPATIBLE_WITH_2x | CLIENT_VERSION_3x_WITH_ROLLBACK | CLIENT_VERSION_3x_WITH_ROLLBACK
* 3x | CLIENT_VERSION_3x_WITH_ROLLBACK | CLIENT_VERSION_3x
* any | CLIENT_VERSION_2x | CLIENT_VERSION_2x
* any | CLIENT_VERSION_UPGRADE_FROM_2x | CLIENT_VERSION_UPGRADE_FROM_2x
* any | CLIENT_VERSION_3x | CLIENT_VERSION_3x
* COMPATIBLE_WITH_2X | Does not exist | CLIENT_VERSION_UPGRADE_FROM_2X
* 3X | Does not exist | CLIENT_VERSION_3X
* COMPATIBLE_WITH_2X | CLIENT_VERSION_3X_WITH_ROLLBACK | CLIENT_VERSION_3X_WITH_ROLLBACK
* 3X | CLIENT_VERSION_3X_WITH_ROLLBACK | CLIENT_VERSION_3X
* any | CLIENT_VERSION_2X | CLIENT_VERSION_2X
* any | CLIENT_VERSION_UPGRADE_FROM_2X | CLIENT_VERSION_UPGRADE_FROM_2X
* any | CLIENT_VERSION_3X | CLIENT_VERSION_3X
*/
@KinesisClientInternalApi
@RequiredArgsConstructor
@ -110,26 +110,26 @@ public class MigrationClientVersionStateInitializer {
nextClientVersion = getNextClientVersionBasedOnConfigVersion();
log.info("Application is starting in {}", nextClientVersion);
break;
case CLIENT_VERSION_3x_WITH_ROLLBACK:
if (clientVersionConfig == ClientVersionConfig.CLIENT_VERSION_CONFIG_3x) {
case CLIENT_VERSION_3X_WITH_ROLLBACK:
if (clientVersionConfig == ClientVersionConfig.CLIENT_VERSION_CONFIG_3X) {
// upgrade successful, allow transition to 3x.
log.info("Application has successfully upgraded, transitioning to {}", CLIENT_VERSION_3x);
nextClientVersion = CLIENT_VERSION_3x;
log.info("Application has successfully upgraded, transitioning to {}", CLIENT_VERSION_3X);
nextClientVersion = CLIENT_VERSION_3X;
break;
}
log.info("Initialize with {}", CLIENT_VERSION_3x_WITH_ROLLBACK);
log.info("Initialize with {}", CLIENT_VERSION_3X_WITH_ROLLBACK);
nextClientVersion = migrationState.getClientVersion();
break;
case CLIENT_VERSION_2x:
log.info("Application has rolled-back, initialize with {}", CLIENT_VERSION_2x);
case CLIENT_VERSION_2X:
log.info("Application has rolled-back, initialize with {}", CLIENT_VERSION_2X);
nextClientVersion = migrationState.getClientVersion();
break;
case CLIENT_VERSION_UPGRADE_FROM_2x:
log.info("Application is upgrading, initialize with {}", CLIENT_VERSION_UPGRADE_FROM_2x);
case CLIENT_VERSION_UPGRADE_FROM_2X:
log.info("Application is upgrading, initialize with {}", CLIENT_VERSION_UPGRADE_FROM_2X);
nextClientVersion = migrationState.getClientVersion();
break;
case CLIENT_VERSION_3x:
log.info("Initialize with {}", CLIENT_VERSION_3x);
case CLIENT_VERSION_3X:
log.info("Initialize with {}", CLIENT_VERSION_3X);
nextClientVersion = migrationState.getClientVersion();
break;
default:
@ -180,10 +180,10 @@ public class MigrationClientVersionStateInitializer {
private ClientVersion getNextClientVersionBasedOnConfigVersion() {
switch (clientVersionConfig) {
case CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x:
return CLIENT_VERSION_UPGRADE_FROM_2x;
case CLIENT_VERSION_CONFIG_3x:
return CLIENT_VERSION_3x;
case CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X:
return CLIENT_VERSION_UPGRADE_FROM_2X;
case CLIENT_VERSION_CONFIG_3X:
return CLIENT_VERSION_3X;
}
throw new IllegalStateException(String.format("Unknown configured Client version %s", clientVersionConfig));
}

View file

@ -32,23 +32,23 @@ import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X;
import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.FAULT_METRIC;
import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.METRICS_OPERATION;
/**
* State for CLIENT_VERSION_UPGRADE_FROM_2x. When state machine enters this state,
* State for CLIENT_VERSION_UPGRADE_FROM_2X. When state machine enters this state,
* KCL is initialized to operate in dual mode for Lease assignment and Leader decider algorithms
* which initially start in 2.x compatible mode and when all the KCL workers are 3.x compliant,
* it dynamically switches to the 3.x algorithms. It also monitors for rollback
* initiated from customer via the KCL migration tool and instantly switches back to the 2.x
* complaint algorithms.
* The allowed state transitions are to CLIENT_VERSION_3x_WITH_ROLLBACK when KCL workers are
* 3.x complaint, and to CLIENT_VERSION_2x when customer has initiated a rollback.
* The allowed state transitions are to CLIENT_VERSION_3X_WITH_ROLLBACK when KCL workers are
* 3.x complaint, and to CLIENT_VERSION_2X when customer has initiated a rollback.
* Only the leader KCL worker performs migration ready monitor and notifies all workers (including
* itself) via a MigrationState update. When all worker's monitor notice the MigrationState change
* (including itself), it will transition to CLIENT_VERSION_3x_WITH_ROLLBACK.
* (including itself), it will transition to CLIENT_VERSION_3X_WITH_ROLLBACK.
*/
@KinesisClientInternalApi
@RequiredArgsConstructor
@ -71,7 +71,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient
@Override
public ClientVersion clientVersion() {
return ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x;
return ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X;
}
@Override
@ -170,7 +170,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient
/**
* Callback handler to handle client version changes in MigrationState in DDB.
* @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_UPGRADE_FROM_2x
* @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_UPGRADE_FROM_2X
* @throws InvalidStateException during transition to the next state based on the new ClientVersion
* or if the new state in DDB is unexpected.
*/
@ -184,23 +184,23 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient
MetricsUtil.createMetricsWithOperation(initializer.metricsFactory(), METRICS_OPERATION);
try {
switch (newState.getClientVersion()) {
case CLIENT_VERSION_2x:
log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2x);
case CLIENT_VERSION_2X:
log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2X);
// cancel monitor asynchronously
cancelMigrationReadyMonitor();
stateMachine.transitionTo(CLIENT_VERSION_2x, newState);
stateMachine.transitionTo(CLIENT_VERSION_2X, newState);
break;
case CLIENT_VERSION_3x_WITH_ROLLBACK:
log.info("KCL workers are v3.x compliant, transition to {}", CLIENT_VERSION_3x_WITH_ROLLBACK);
case CLIENT_VERSION_3X_WITH_ROLLBACK:
log.info("KCL workers are v3.x compliant, transition to {}", CLIENT_VERSION_3X_WITH_ROLLBACK);
cancelMigrationReadyMonitor();
stateMachine.transitionTo(CLIENT_VERSION_3x_WITH_ROLLBACK, newState);
stateMachine.transitionTo(CLIENT_VERSION_3X_WITH_ROLLBACK, newState);
break;
default:
// This should not happen, so throw an exception that allows the monitor to continue monitoring
// changes, this allows KCL to operate in the current state and keep monitoring until a valid
// state transition is possible.
// However, there could be a split brain here, new workers will use DDB value as source of truth,
// so we could also write back CLIENT_VERSION_UPGRADE_FROM_2x to DDB to ensure all workers have
// so we could also write back CLIENT_VERSION_UPGRADE_FROM_2X to DDB to ensure all workers have
// consistent behavior.
// Ideally we don't expect modifications to DDB table out of the KCL migration tool scope,
// so keeping it simple and not writing back to DDB, the error log below would help capture
@ -222,7 +222,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient
try {
final MigrationState newMigrationState = currentMigrationState
.copy()
.update(CLIENT_VERSION_3x_WITH_ROLLBACK, initializer.workerIdentifier());
.update(CLIENT_VERSION_3X_WITH_ROLLBACK, initializer.workerIdentifier());
log.info("Updating Migration State in DDB with {} prev state {}", newMigrationState, currentMigrationState);
return coordinatorStateDAO.updateCoordinatorStateWithExpectation(
newMigrationState, currentMigrationState.getDynamoClientVersionExpectation());
@ -230,7 +230,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient
log.warn(
"Exception occurred when toggling to {}, upgradeReadyMonitor will retry the update"
+ " if upgrade condition is still true",
CLIENT_VERSION_3x_WITH_ROLLBACK,
CLIENT_VERSION_3X_WITH_ROLLBACK,
e);
scope.addData(FAULT_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
return false;

View file

@ -190,7 +190,7 @@ public class MigrationStateMachineImpl implements MigrationStateMachine {
currentMigrationClientVersionState = nextMigrationClientVersionState;
log.info("Successfully transitioned to {}", nextMigrationClientVersionState.clientVersion());
if (currentMigrationClientVersionState.clientVersion() == ClientVersion.CLIENT_VERSION_3x) {
if (currentMigrationClientVersionState.clientVersion() == ClientVersion.CLIENT_VERSION_3X) {
terminate();
}
success = true;
@ -220,10 +220,10 @@ public class MigrationStateMachineImpl implements MigrationStateMachine {
private MigrationClientVersionState createMigrationClientVersionState(
final ClientVersion clientVersion, final MigrationState migrationState) {
switch (clientVersion) {
case CLIENT_VERSION_2x:
case CLIENT_VERSION_2X:
return new MigrationClientVersion2xState(
this, coordinatorStateDAO, stateMachineThreadPool, initializer, random);
case CLIENT_VERSION_UPGRADE_FROM_2x:
case CLIENT_VERSION_UPGRADE_FROM_2X:
return new MigrationClientVersionUpgradeFrom2xState(
this,
timeProvider,
@ -233,10 +233,10 @@ public class MigrationStateMachineImpl implements MigrationStateMachine {
random,
migrationState,
flipTo3XStabilizerTimeInSeconds);
case CLIENT_VERSION_3x_WITH_ROLLBACK:
case CLIENT_VERSION_3X_WITH_ROLLBACK:
return new MigrationClientVersion3xWithRollbackState(
this, coordinatorStateDAO, stateMachineThreadPool, initializer, random);
case CLIENT_VERSION_3x:
case CLIENT_VERSION_3X:
return new MigrationClientVersion3xState(this, initializer);
}
throw new IllegalStateException(String.format("Unknown client version %s", clientVersion));
@ -246,7 +246,7 @@ public class MigrationStateMachineImpl implements MigrationStateMachine {
if (currentMigrationClientVersionState != null) {
return currentMigrationClientVersionState.clientVersion();
} else if (terminated) {
return ClientVersion.CLIENT_VERSION_3x;
return ClientVersion.CLIENT_VERSION_3X;
}
throw new UnsupportedOperationException(
"No current state when state machine is either not initialized" + " or already terminated");

View file

@ -29,7 +29,6 @@ import java.util.function.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
@ -381,8 +380,8 @@ public class LeaseManagementConfig {
* to shut down and an option to enable or disable graceful lease handoff.
* </p>
*/
@Data
@Builder
@Getter
@Accessors(fluent = true)
public static class GracefulLeaseHandoffConfig {
/**
@ -550,7 +549,7 @@ public class LeaseManagementConfig {
private int dampeningPercentage = 60;
/**
* Percentage value used to trigger reBalance. If fleet has workers which are have metrics value more or less
* than 20% of fleet level average then reBalance is triggered.
* than 10% of fleet level average then reBalance is triggered.
* Leases are taken from workers with metrics value more than fleet level average. The load to take from these
* workers is determined by evaluating how far they are with respect to fleet level average.
*/
@ -565,7 +564,7 @@ public class LeaseManagementConfig {
private boolean allowThroughputOvershoot = true;
/**
* Duration after which workerMetrics entry from WorkerMetricStats table will be cleaned up. When an entry's
* Duration after which workerMetricStats entry from WorkerMetricStats table will be cleaned up. When an entry's
* lastUpdateTime is older than staleWorkerMetricsEntryCleanupDuration from current time, entry will be removed
* from the table.
*/
@ -580,15 +579,16 @@ public class LeaseManagementConfig {
private WorkerMetricsTableConfig workerMetricsTableConfig;
/**
* Frequency to perform worker variance balancing frequency. This value is used with respect to the LAM freq,
* Frequency to perform worker variance balancing. This value is used with respect to the LAM frequency,
* that is every third (as default) iteration of LAM the worker variance balancing will be performed.
* Setting it to 1 will make varianceBalancing run on every iteration of LAM and 2 on every 2nd iteration
* and so on.
* NOTE: LAM frequency = failoverTimeMillis
*/
private int varianceBalancingFrequency = 3;
/**
* Alpha value used for calculating exponential moving average of worker's metrics values. Selecting
* Alpha value used for calculating exponential moving average of worker's metricStats. Selecting
* higher alpha value gives more weightage to recent value and thus low smoothing effect on computed average
* and selecting smaller alpha values gives more weightage to past value and high smoothing effect.
*/

View file

@ -189,7 +189,7 @@ public class CoordinatorStateDAOTest {
createCoordinatorState("key1");
final MigrationState migrationState = new MigrationState(MIGRATION_HASH_KEY, WORKER_ID)
.update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, WORKER_ID);
.update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, WORKER_ID);
doaUnderTest.createCoordinatorStateIfNotExists(migrationState);
final AmazonDynamoDBLockClient dynamoDBLockClient = new AmazonDynamoDBLockClient(doaUnderTest
@ -223,7 +223,7 @@ public class CoordinatorStateDAOTest {
// Make sure the record has not changed due to using
// ddb lock client
Assertions.assertEquals(
ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x.toString(),
ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X.toString(),
item.get(CLIENT_VERSION_ATTRIBUTE_NAME).s());
} else if (LEADER_HASH_KEY.equals(key)) {
Assertions.assertEquals("TEST_WORKER", item.get("ownerName").s());
@ -264,7 +264,7 @@ public class CoordinatorStateDAOTest {
if ("Migration3.0".equals(keyValue)) {
Assertions.assertTrue(state instanceof MigrationState);
final MigrationState migrationState = (MigrationState) state;
Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3x, migrationState.getClientVersion());
Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3X, migrationState.getClientVersion());
return;
}
Assertions.assertEquals(3, state.getAttributes().size());
@ -379,7 +379,7 @@ public class CoordinatorStateDAOTest {
final MigrationState state = createMigrationState();
/* Test step - update the state with mismatched condition */
final MigrationState updatedState = state.copy().update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID);
final MigrationState updatedState = state.copy().update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID);
boolean updated = doaUnderTest.updateCoordinatorStateWithExpectation(
updatedState, updatedState.getDynamoClientVersionExpectation());
@ -403,7 +403,7 @@ public class CoordinatorStateDAOTest {
.build())
.join();
Assertions.assertEquals(
ClientVersion.CLIENT_VERSION_2x.name(),
ClientVersion.CLIENT_VERSION_2X.name(),
response.item().get("cv").s());
Assertions.assertEquals(WORKER_ID, response.item().get("mb").s());
Assertions.assertEquals(
@ -433,7 +433,7 @@ public class CoordinatorStateDAOTest {
/* Test step - update with new state object */
final MigrationState updatedState =
new MigrationState("Migration3.0", WORKER_ID).update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID);
new MigrationState("Migration3.0", WORKER_ID).update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID);
boolean updated = doaUnderTest.updateCoordinatorStateWithExpectation(updatedState, null);
@ -491,7 +491,7 @@ public class CoordinatorStateDAOTest {
final HashMap<String, AttributeValue> item = new HashMap<String, AttributeValue>() {
{
put("key", AttributeValue.fromS("Migration3.0"));
put("cv", AttributeValue.fromS(ClientVersion.CLIENT_VERSION_3x.toString()));
put("cv", AttributeValue.fromS(ClientVersion.CLIENT_VERSION_3X.toString()));
put("mb", AttributeValue.fromS("DUMMY_WORKER"));
put("mts", AttributeValue.fromN(String.valueOf(System.currentTimeMillis())));
}

View file

@ -112,7 +112,7 @@ public class DynamicMigrationComponentsInitializerTest {
@Test
public void testInitialize_ClientVersion3_X() throws DependencyException {
// Test initializing to verify correct leader decider is created
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x);
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X);
verify(mockWorkerMetricsManager).startManager();
verify(mockDdbLockBasedLeaderDeciderCreator).get();
@ -147,7 +147,7 @@ public class DynamicMigrationComponentsInitializerTest {
@Test
public void testInitialize_ClientVersion_3_xWithRollback() throws DependencyException {
// Test initializing to verify correct leader decider is created
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
verify(mockWorkerMetricsManager).startManager();
@ -171,7 +171,7 @@ public class DynamicMigrationComponentsInitializerTest {
}
@ParameterizedTest
@CsvSource({"CLIENT_VERSION_UPGRADE_FROM_2x", "CLIENT_VERSION_2x"})
@CsvSource({"CLIENT_VERSION_UPGRADE_FROM_2X", "CLIENT_VERSION_2X"})
public void testInitialize_ClientVersion_All2_X(final ClientVersion clientVersion) throws DependencyException {
// Test initializing to verify correct leader decider is created
migrationInitializer.initialize(clientVersion);
@ -187,7 +187,7 @@ public class DynamicMigrationComponentsInitializerTest {
verify(mockConsumer).initialize(eq(true), eq(LeaseAssignmentMode.DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT));
// test initialization from state machine
if (clientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x) {
if (clientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X) {
migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_INIT);
// start worker stats and create gsi without waiting
verify(mockWorkerMetricsDAO).initialize();
@ -211,7 +211,7 @@ public class DynamicMigrationComponentsInitializerTest {
when(mockLamThreadPool.awaitTermination(anyLong(), any())).thenReturn(true);
when(mockWorkerMetricsScheduler.awaitTermination(anyLong(), any())).thenReturn(true);
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x);
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X);
migrationInitializer.shutdown();
verify(mockLamThreadPool).shutdown();
@ -226,7 +226,7 @@ public class DynamicMigrationComponentsInitializerTest {
@Test
public void initializationFails_WhenGsiIsNotActiveIn3_X() throws DependencyException {
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x);
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X);
// test initialization from state machine
assertThrows(
@ -236,7 +236,7 @@ public class DynamicMigrationComponentsInitializerTest {
@Test
public void initializationDoesNotFail_WhenGsiIsNotActiveIn3_XWithRollback() throws DependencyException {
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
// test initialization from state machine
assertDoesNotThrow(
@ -245,11 +245,11 @@ public class DynamicMigrationComponentsInitializerTest {
@Test
public void testComponentsInitialization_AfterFlip() throws DependencyException {
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x);
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X);
migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_INIT);
// Test flip
migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x);
migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X);
// verify
verify(mockLam).start();
@ -266,13 +266,13 @@ public class DynamicMigrationComponentsInitializerTest {
.when(mockWorkerMetricsScheduler)
.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_2x);
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_2X);
migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_INIT);
// test roll-forward
reset(mockWorkerMetricsScheduler);
reset(mockLeaseRefresher);
migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_2x);
migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_2X);
// verify
verify(mockWorkerMetricsScheduler)
@ -288,11 +288,11 @@ public class DynamicMigrationComponentsInitializerTest {
.when(mockWorkerMetricsScheduler)
.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x);
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X);
migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_INIT);
// test rollback before flip
migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x);
migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X);
// verify
verify(mockFuture).cancel(anyBoolean());
@ -305,11 +305,11 @@ public class DynamicMigrationComponentsInitializerTest {
.when(mockWorkerMetricsScheduler)
.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_INIT);
// test rollback before flip
migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
// verify
verify(mockFuture).cancel(anyBoolean());
@ -337,7 +337,7 @@ public class DynamicMigrationComponentsInitializerTest {
}
});
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_INIT);
// run the worker stats reporting thread

View file

@ -60,10 +60,10 @@ public class ClientVersionChangeMonitorTest {
@ParameterizedTest
@CsvSource({
"CLIENT_VERSION_2x, CLIENT_VERSION_UPGRADE_FROM_2x",
"CLIENT_VERSION_3x_WITH_ROLLBACK, CLIENT_VERSION_2x",
"CLIENT_VERSION_UPGRADE_FROM_2x, CLIENT_VERSION_3x_WITH_ROLLBACK",
"CLIENT_VERSION_3x_WITH_ROLLBACK, CLIENT_VERSION_3x"
"CLIENT_VERSION_2X, CLIENT_VERSION_UPGRADE_FROM_2X",
"CLIENT_VERSION_3X_WITH_ROLLBACK, CLIENT_VERSION_2X",
"CLIENT_VERSION_UPGRADE_FROM_2X, CLIENT_VERSION_3X_WITH_ROLLBACK",
"CLIENT_VERSION_3X_WITH_ROLLBACK, CLIENT_VERSION_3X"
})
public void testMonitor(final ClientVersion currentClientVersion, final ClientVersion changedClientVersion)
throws Exception {
@ -104,7 +104,7 @@ public class ClientVersionChangeMonitorTest {
mockCoordinatorStateDAO,
mockScheduler,
mockCallback,
ClientVersion.CLIENT_VERSION_2x,
ClientVersion.CLIENT_VERSION_2X,
mockRandom);
monitorUnderTest.startMonitor();
@ -112,7 +112,7 @@ public class ClientVersionChangeMonitorTest {
verify(mockScheduler).scheduleWithFixedDelay(argumentCaptor.capture(), anyLong(), anyLong(), anyObject());
final MigrationState state = new MigrationState(MIGRATION_HASH_KEY, "DUMMY_WORKER")
.update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, "DUMMY_WORKER");
.update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, "DUMMY_WORKER");
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
argumentCaptor.getValue().run();
@ -130,7 +130,7 @@ public class ClientVersionChangeMonitorTest {
mockCoordinatorStateDAO,
mockScheduler,
mockCallback,
ClientVersion.CLIENT_VERSION_2x,
ClientVersion.CLIENT_VERSION_2X,
mockRandom);
monitorUnderTest.startMonitor();
@ -138,7 +138,7 @@ public class ClientVersionChangeMonitorTest {
verify(mockScheduler).scheduleWithFixedDelay(argumentCaptor.capture(), anyLong(), anyLong(), anyObject());
final MigrationState state = new MigrationState(MIGRATION_HASH_KEY, "DUMMY_WORKER")
.update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, "DUMMY_WORKER");
.update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, "DUMMY_WORKER");
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
doThrow(new InvalidStateException("test exception")).when(mockCallback).accept(any());

View file

@ -107,8 +107,8 @@ public class MigrationStateMachineTest {
@ParameterizedTest
@CsvSource({
"CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x, CLIENT_VERSION_UPGRADE_FROM_2x",
"CLIENT_VERSION_CONFIG_3x, CLIENT_VERSION_3x"
"CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X, CLIENT_VERSION_UPGRADE_FROM_2X",
"CLIENT_VERSION_CONFIG_3X, CLIENT_VERSION_3X"
})
public void testStateMachineInitialization(
final ClientVersionConfig config, final ClientVersion expectedStateMachineState) throws Exception {
@ -118,7 +118,7 @@ public class MigrationStateMachineTest {
@Test
public void testMigrationReadyFlip() throws Exception {
stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x);
stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
// After initialization, state machine should start to monitor for upgrade readiness
final ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@ -130,7 +130,7 @@ public class MigrationStateMachineTest {
@Test
public void testRollbackAfterFlip() throws Exception {
stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x);
stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
// After initialization, state machine should start to monitor for upgrade readiness
ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@ -150,7 +150,7 @@ public class MigrationStateMachineTest {
@Test
public void testRollForward() throws Exception {
stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x);
stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
// After initialization, state machine should start to monitor for upgrade readiness
ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@ -177,7 +177,7 @@ public class MigrationStateMachineTest {
@Test
public void testRollbackBeforeFlip() throws Exception {
stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x);
stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
// After initialization, state machine should start to monitor for upgrade readiness
ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@ -189,7 +189,7 @@ public class MigrationStateMachineTest {
@Test
public void successfulUpgradeAfterFlip() throws Exception {
stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x);
stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
// After initialization, state machine should start to monitor for upgrade readiness
ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@ -244,15 +244,15 @@ public class MigrationStateMachineTest {
// Invoke the monitor callbacks so the version flips to 3.x with rollback
migrationReadyMonitorRunnable.run();
Assertions.assertEquals(
ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK,
ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK,
stateCaptor.getValue().getClientVersion());
versionChangeMonitorRunnable.run();
Assertions.assertEquals(
ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK, stateMachineUnderTest.getCurrentClientVersion());
ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK, stateMachineUnderTest.getCurrentClientVersion());
verify(mockInitializer)
.initializeClientVersionFor3xWithRollback(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x));
.initializeClientVersionFor3xWithRollback(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X));
log.info("TestLog ----------- flip done -------------");
}
@ -263,33 +263,33 @@ public class MigrationStateMachineTest {
: runnableCaptor.getAllValues().get(1);
final MigrationState state =
new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID);
new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID);
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
reset(mockMigrationStateMachineThreadPool);
reset(mockInitializer);
log.info("TestLog ----------- Initiate rollback before flip -------------");
versionChangeMonitorRunnable.run();
log.info("TestLog ----------- rollback before flip done -------------");
Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2x, stateMachineUnderTest.getCurrentClientVersion());
verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x));
Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2X, stateMachineUnderTest.getCurrentClientVersion());
verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X));
}
private void initiateAndTestRollBack(final Runnable rollbackMonitorRunnable) throws Exception {
final MigrationState state =
new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID);
new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID);
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
reset(mockMigrationStateMachineThreadPool);
reset(mockInitializer);
log.info("TestLog ----------- Initiate rollback -------------");
rollbackMonitorRunnable.run();
log.info("TestLog ----------- rollback done -------------");
Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2x, stateMachineUnderTest.getCurrentClientVersion());
verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK));
Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2X, stateMachineUnderTest.getCurrentClientVersion());
verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK));
}
private void initiateAndTestRollForward(final Runnable rollforwardMonitorRunnable) throws Exception {
final MigrationState state = new MigrationState(MIGRATION_HASH_KEY, WORKER_ID)
.update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, WORKER_ID);
.update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, WORKER_ID);
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
reset(mockMigrationStateMachineThreadPool);
reset(mockInitializer);
@ -297,20 +297,20 @@ public class MigrationStateMachineTest {
rollforwardMonitorRunnable.run();
log.info("TestLog ----------- roll-forward done -------------");
Assertions.assertEquals(
ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, stateMachineUnderTest.getCurrentClientVersion());
verify(mockInitializer).initializeClientVersionForUpgradeFrom2x(eq(ClientVersion.CLIENT_VERSION_2x));
ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, stateMachineUnderTest.getCurrentClientVersion());
verify(mockInitializer).initializeClientVersionForUpgradeFrom2x(eq(ClientVersion.CLIENT_VERSION_2X));
}
private void initiateAndTestSuccessfulUpgrade(final Runnable successfulUpgradeMonitor) throws Exception {
final MigrationState state =
new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_3x, WORKER_ID);
new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_3X, WORKER_ID);
when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state);
reset(mockMigrationStateMachineThreadPool);
reset(mockInitializer);
log.info("TestLog ----------- Initiate successful upgrade -------------");
successfulUpgradeMonitor.run();
log.info("TestLog ----------- successful upgrade done -------------");
Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3x, stateMachineUnderTest.getCurrentClientVersion());
verify(mockInitializer).initializeClientVersionFor3x(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK);
Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3X, stateMachineUnderTest.getCurrentClientVersion());
verify(mockInitializer).initializeClientVersionFor3x(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK);
}
}