Address KCLv3 issues from github (#1398)

* Address KCLv3 issues reported on github
1. Fix transitive dependencies and add a maven plugin to catch these
   at build time
2. Remove the redundant shutdown of the leaseCoordinatorThreadPool
3. Fix typo THROUGHOUT_PUT_KBPS
4. Fix shutdown sequence - make sure
    scheduler shutdown without invoking run works
5. Fix backward compatibility check - Avoid flagging methods as deleted
    if it is marked synchronized. Also mark interfaces introduced in KCLv3 as internal.
This commit is contained in:
Aravinda Kidambi Srinivasan 2024-11-12 13:20:16 -07:00 committed by GitHub
parent 004f6d3ea0
commit b154acf7f5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 139 additions and 13 deletions

View file

@ -58,14 +58,18 @@ is_non_public_class() {
return $?
}
# Ignore methods that change from abstract to non-abstract (and vice versa) if the class is an interface.
ignore_abstract_changes_in_interfaces() {
# Ignore methods that change from abstract to non-abstract (and vice-versa) if the class is an interface.\
# Ignore methods that change from synchronized to non-synchronized (and vice-versa)
ignore_non_breaking_changes() {
local current_class="$1"
local class_definition=$(javap -classpath "$LATEST_JAR" "$current_class" | head -2 | tail -1)
if [[ $class_definition == *"interface"* ]]
then
LATEST_METHODS=${LATEST_METHODS// abstract / }
CURRENT_METHODS=${CURRENT_METHODS// abstract / }
LATEST_METHODS=${LATEST_METHODS//abstract /}
CURRENT_METHODS=${CURRENT_METHODS//abstract /}
else
LATEST_METHODS=${LATEST_METHODS//synchronized /}
CURRENT_METHODS=${CURRENT_METHODS//synchronized /}
fi
}
@ -103,7 +107,7 @@ find_removed_methods() {
LATEST_METHODS=$(javap -classpath "$LATEST_JAR" "$class")
ignore_abstract_changes_in_interfaces "$class"
ignore_non_breaking_changes "$class"
local removed_methods=$(diff <(echo "$LATEST_METHODS") <(echo "$CURRENT_METHODS") | grep '^<')

View file

@ -90,6 +90,42 @@
<artifactId>netty-nio-client</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>arns</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>utils</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>http-client-spi</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb-enhanced</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-serde</artifactId>
@ -127,6 +163,36 @@
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.108.Final</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>annotations</artifactId>
<version>2.25.64</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@ -153,6 +219,18 @@
</dependency>
<!-- Test -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${awssdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<version>${awssdk.version}</version>
<scope>test</scope>
</dependency>
<!-- TODO: Migrate all tests to Junit5 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
@ -180,12 +258,24 @@
<version>3.12.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.12.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<!-- Using older version to be compatible with Java 8 -->
<!-- https://mvnrepository.com/artifact/com.amazonaws/DynamoDBLocal -->
<dependency>
@ -464,6 +554,25 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.2</version>
<executions>
<execution>
<id>analyze-dependencies</id>
<phase>verify</phase>
<goals>
<goal>analyze-only</goal>
</goals>
<configuration>
<failOnWarning>true</failOnWarning>
<!-- Ignore Runtime/Provided/Test/System scopes for unused dependency analysis. -->
<ignoreNonCompile>true</ignoreNonCompile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

View file

@ -223,6 +223,9 @@ public final class DynamicMigrationComponentsInitializer {
workerMetricsThreadPool.shutdown();
try {
if (!lamThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.info(
"LamThreadPool did not shutdown in {}s, forcefully shutting down",
SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS);
lamThreadPool.shutdownNow();
}
} catch (final InterruptedException e) {
@ -232,6 +235,9 @@ public final class DynamicMigrationComponentsInitializer {
try {
if (!workerMetricsThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.info(
"WorkerMetricsThreadPool did not shutdown in {}s, forcefully shutting down",
SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS);
workerMetricsThreadPool.shutdownNow();
}
} catch (final InterruptedException e) {

View file

@ -17,8 +17,10 @@ package software.amazon.kinesis.coordinator.assignment;
import java.util.List;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease;
@KinesisClientInternalApi
public interface LeaseAssignmentDecider {
/**

View file

@ -87,7 +87,7 @@ public class MigrationClientVersion3xWithRollbackState implements MigrationClien
}
@Override
public void leave() {
public synchronized void leave() {
if (entered && !left) {
log.info("Leaving {}", this);
cancelRollbackMonitor();

View file

@ -14,11 +14,13 @@
*/
package software.amazon.kinesis.coordinator.migration;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.exceptions.DependencyException;
/**
* Interface of a state implementation for the MigrationStateMachine
*/
@KinesisClientInternalApi
public interface MigrationClientVersionState {
/**

View file

@ -14,6 +14,7 @@
*/
package software.amazon.kinesis.coordinator.migration;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
@ -28,6 +29,7 @@ import software.amazon.kinesis.leases.exceptions.InvalidStateException;
* 3. Instant roll-forwards - Once any issue has been mitigated, rollfowards are supported instantly
* with KCL Migration tool.
*/
@KinesisClientInternalApi
public interface MigrationStateMachine {
/**
* Initialize the state machine by identifying the initial state when the KCL worker comes up for the first time.

View file

@ -126,7 +126,7 @@ public class MigrationStateMachineImpl implements MigrationStateMachine {
if (!stateMachineThreadPool.isShutdown()) {
stateMachineThreadPool.shutdown();
try {
if (stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
if (!stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.info(
"StateMachineThreadPool did not shutdown within {} seconds, forcefully shutting down",
THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS);

View file

@ -17,10 +17,12 @@ package software.amazon.kinesis.leases;
import java.util.List;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@KinesisClientInternalApi
public interface LeaseDiscoverer {
/**
* Identifies the leases that are assigned to the current worker but are not being tracked and processed by the

View file

@ -383,7 +383,6 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
}
leaseRenewalThreadpool.shutdownNow();
leaseCoordinatorThreadPool.shutdownNow();
leaseGracefulShutdownHandler.stop();
synchronized (shutdownLock) {
leaseRenewer.clearCurrentlyHeldLeases();

View file

@ -54,7 +54,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String CHILD_SHARD_IDS_KEY = "childShardIds";
private static final String STARTING_HASH_KEY = "startingHashKey";
private static final String ENDING_HASH_KEY = "endingHashKey";
private static final String THROUGHOUT_PUT_KBPS = "throughputKBps";
private static final String THROUGHPUT_KBPS = "throughputKBps";
private static final String CHECKPOINT_SEQUENCE_NUMBER_KEY = "checkpoint";
static final String CHECKPOINT_OWNER = "checkpointOwner";
static final String LEASE_OWNER_KEY = "leaseOwner";
@ -113,7 +113,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
}
if (lease.throughputKBps() != null) {
result.put(THROUGHOUT_PUT_KBPS, DynamoUtils.createAttributeValue(lease.throughputKBps()));
result.put(THROUGHPUT_KBPS, DynamoUtils.createAttributeValue(lease.throughputKBps()));
}
if (lease.checkpointOwner() != null) {
@ -155,8 +155,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey));
}
if (DynamoUtils.safeGetDouble(dynamoRecord, THROUGHOUT_PUT_KBPS) != null) {
leaseToUpdate.throughputKBps(DynamoUtils.safeGetDouble(dynamoRecord, THROUGHOUT_PUT_KBPS));
if (DynamoUtils.safeGetDouble(dynamoRecord, THROUGHPUT_KBPS) != null) {
leaseToUpdate.throughputKBps(DynamoUtils.safeGetDouble(dynamoRecord, THROUGHPUT_KBPS));
}
if (DynamoUtils.safeGetString(dynamoRecord, CHECKPOINT_OWNER) != null) {
@ -466,7 +466,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
.value(DynamoUtils.createAttributeValue(lease.throughputKBps()))
.action(AttributeAction.PUT)
.build();
result.put(THROUGHOUT_PUT_KBPS, avu);
result.put(THROUGHPUT_KBPS, avu);
return result;
}