From 6acb51d66e9446b62690d367363624641b3c916a Mon Sep 17 00:00:00 2001 From: Aravinda Kidambi Srinivasan Date: Fri, 8 Nov 2024 12:10:15 -0800 Subject: [PATCH] Fix shutdown sequence Fix a few issues in shutdown sequence and make sure scheduler shutdown without invoking run works Inorder for the main thread to shutdown, the scheduler's final shutdown still needs to be invoked, which only happens via the run method. --- amazon-kinesis-client/pom.xml | 5 +++++ .../coordinator/DynamicMigrationComponentsInitializer.java | 6 ++++++ .../MigrationClientVersion3xWithRollbackState.java | 2 +- .../coordinator/migration/MigrationStateMachineImpl.java | 2 +- 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index a7eafd54..9e31537b 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -120,6 +120,11 @@ http-client-spi ${awssdk.version} + + software.amazon.awssdk + dynamodb-enhanced + ${awssdk.version} + software.amazon.glue diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java index c4aecdda..e9740cca 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java @@ -223,6 +223,9 @@ public final class DynamicMigrationComponentsInitializer { workerMetricsThreadPool.shutdown(); try { if (!lamThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + log.info( + "LamThreadPool did not shutdown in {}s, forcefully shutting down", + SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS); lamThreadPool.shutdownNow(); } } catch (final InterruptedException e) { @@ -232,6 +235,9 @@ public final class DynamicMigrationComponentsInitializer { try { if (!workerMetricsThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + log.info( + "WorkerMetricsThreadPool did not shutdown in {}s, forcefully shutting down", + SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS); workerMetricsThreadPool.shutdownNow(); } } catch (final InterruptedException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java index 6235c5a9..912f0dc9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java @@ -87,7 +87,7 @@ public class MigrationClientVersion3xWithRollbackState implements MigrationClien } @Override - public void leave() { + public synchronized void leave() { if (entered && !left) { log.info("Leaving {}", this); cancelRollbackMonitor(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java index 96e16a0f..ad744bfa 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java @@ -126,7 +126,7 @@ public class MigrationStateMachineImpl implements MigrationStateMachine { if (!stateMachineThreadPool.isShutdown()) { stateMachineThreadPool.shutdown(); try { - if (stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + if (!stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { log.info( "StateMachineThreadPool did not shutdown within {} seconds, forcefully shutting down", THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS);