From f57a3326710d6f8fedf1a7c942f6505c5f51ce00 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Wed, 18 Mar 2020 11:56:20 -0700 Subject: [PATCH] Make periodic shard sync support multi streams --- .../coordinator/PeriodicShardSyncManager.java | 44 ++++++------ .../amazon/kinesis/coordinator/Scheduler.java | 56 ++++----------- .../kinesis/leases/ShardSyncTaskManager.java | 71 ++++++++++++++++--- .../kinesis/coordinator/SchedulerTest.java | 19 +++-- 4 files changed, 109 insertions(+), 81 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 7d166b98..5de09c37 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -18,16 +18,18 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; -import software.amazon.kinesis.leases.ShardSyncTask; +import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; -import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; -import software.amazon.kinesis.metrics.MetricsFactory; +import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * The top level orchestrator for coordinating the periodic shard sync related @@ -42,21 +44,21 @@ class PeriodicShardSyncManager { private final String workerId; private final LeaderDecider leaderDecider; - private final ConsumerTask metricsEmittingShardSyncTask; + private final Map streamToShardSyncTaskManagerMap; private final ScheduledExecutorService shardSyncThreadPool; private boolean isRunning; - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, MetricsFactory metricsFactory) { - this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory); + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map streamToShardSyncTaskManagerMap) { + this(workerId, leaderDecider, streamToShardSyncTaskManagerMap, Executors.newSingleThreadScheduledExecutor()); } - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool, MetricsFactory metricsFactory) { + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map streamToShardSyncTaskManagerMap, + ScheduledExecutorService shardSyncThreadPool) { Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); - Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager."); this.workerId = workerId; this.leaderDecider = leaderDecider; - this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory); + this.streamToShardSyncTaskManagerMap = streamToShardSyncTaskManagerMap; this.shardSyncThreadPool = shardSyncThreadPool; } @@ -82,17 +84,14 @@ class PeriodicShardSyncManager { * Does not schedule periodic shardSync * @return the result of the task */ - public synchronized TaskResult syncShardsOnce() { - - Exception lastException = null; - try { - if (!isRunning) { - runShardSync(); + public synchronized void syncShardsOnce() throws Exception { + for (Map.Entry mapEntry : streamToShardSyncTaskManagerMap.entrySet()) { + final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue(); + final TaskResult taskResult = shardSyncTaskManager.executeShardSyncTask(); + if (taskResult.getException() != null) { + throw taskResult.getException(); } - } catch (Exception e) { - lastException = e; } - return new TaskResult(lastException); } public void stop() { @@ -107,10 +106,11 @@ class PeriodicShardSyncManager { private void runShardSync() { if (leaderDecider.isLeader(workerId)) { - log.info(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); - final TaskResult taskResult = metricsEmittingShardSyncTask.call(); - if (taskResult != null && taskResult.getException() != null) { - throw new KinesisClientLibIOException("Failed to sync shards", taskResult.getException()); + for (Map.Entry mapEntry : streamToShardSyncTaskManagerMap.entrySet()) { + final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue(); + if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { + throw new KinesisClientLibIOException("Failed to submit shard sync task for stream " + shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + } } } else { log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 5edf9472..dc730441 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -56,9 +56,7 @@ import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardPrioritization; -import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager; -import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer; @@ -72,9 +70,7 @@ import software.amazon.kinesis.lifecycle.ShardConsumerArgument; import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification; import software.amazon.kinesis.lifecycle.ShutdownNotification; import software.amazon.kinesis.lifecycle.ShutdownReason; -import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; -import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.processor.Checkpointer; @@ -85,19 +81,8 @@ import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; /** * @@ -136,7 +121,6 @@ public class Scheduler implements Runnable { private final LeaseCoordinator leaseCoordinator; private final Function shardSyncTaskManagerProvider; private final Map streamToShardSyncTaskManagerMap = new HashMap<>(); - private final ShardSyncTaskManager shardSyncTaskManager; private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager; private final ShardPrioritization shardPrioritization; private final boolean cleanupLeasesUponShardCompletion; @@ -278,7 +262,8 @@ public class Scheduler implements Runnable { // TODO : Halo : Check if this needs to be per stream. this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); - this.leaderElectedPeriodicShardSyncManager = buildPeriodicShardSyncManager(); + this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), + leaderDecider, streamToShardSyncTaskManagerMap); } /** @@ -320,25 +305,16 @@ public class Scheduler implements Runnable { log.info("Initializing LeaseCoordinator"); leaseCoordinator.initialize(); - TaskResult result; if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: for already synced streams - waitUntilLeaseTableIsReady(); - for(Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { - final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); - log.info("Syncing Kinesis shard info for " + streamIdentifier); - final StreamConfig streamConfig = streamConfigEntry.getValue(); - ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier), - leaseRefresher, streamConfig.initialPositionInStreamExtended(), - cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, - hierarchicalShardSyncer, metricsFactory); - result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); - // Throwing the exception, to prevent further syncs for other stream. - if (result.getException() != null) { - log.error("Caught exception when sync'ing info for " + streamIdentifier, result.getException()); - throw result.getException(); + if (!waitAndCheckIfLeaseTableIsReady()) { + for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); + createOrGetShardSyncTaskManager(streamIdentifier); + log.info("Creating shard sync task for " + streamIdentifier); } + leaderElectedPeriodicShardSyncManager.syncShardsOnce(); } } else { log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); @@ -353,9 +329,10 @@ public class Scheduler implements Runnable { log.info("LeaseCoordinator is already running. No need to start it."); } log.info("Scheduling periodicShardSync)"); - // leaderElectedPeriodicShardSyncManager.start(); + // leaderElectedPeriodicShardSyncManager.start(shardSyncTasks); // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged - waitUntilHashRangeCovered(); + // TODO: Determine if waitUntilHashRangeCovered() is needed. + //waitUntilHashRangeCovered(); isDone = true; } catch (LeasingException e) { log.error("Caught exception when initializing LeaseCoordinator", e); @@ -382,7 +359,7 @@ public class Scheduler implements Runnable { } @VisibleForTesting - void waitUntilLeaseTableIsReady() throws InterruptedException, + boolean waitAndCheckIfLeaseTableIsReady() throws InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException { long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); long waitUntil = System.currentTimeMillis() + waitTime; @@ -393,6 +370,7 @@ public class Scheduler implements Runnable { log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS); Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS); } + return !leaseRefresher.isLeaseTableEmpty(); } private void waitUntilHashRangeCovered() throws InterruptedException { @@ -725,14 +703,6 @@ public class Scheduler implements Runnable { argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); } - private PeriodicShardSyncManager buildPeriodicShardSyncManager() { - final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(getStreamIdentifier), leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, - metricsFactory); - return new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), - leaderDecider, shardSyncTask, metricsFactory); - } - /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. *

diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index f6db72e3..8c8e0464 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -14,9 +14,12 @@ */ package software.amazon.kinesis.leases; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -53,6 +56,10 @@ public class ShardSyncTaskManager { private final HierarchicalShardSyncer hierarchicalShardSyncer; @NonNull private final MetricsFactory metricsFactory; + private ConsumerTask currentTask; + private CompletableFuture future; + private AtomicBoolean shardSyncRequestPending; + private final ReentrantLock lock; /** * Constructor. @@ -82,6 +89,8 @@ public class ShardSyncTaskManager { this.executorService = executorService; this.hierarchicalShardSyncer = new HierarchicalShardSyncer(); this.metricsFactory = metricsFactory; + this.shardSyncRequestPending = new AtomicBoolean(false); + this.lock = new ReentrantLock(); } /** @@ -110,16 +119,33 @@ public class ShardSyncTaskManager { this.executorService = executorService; this.hierarchicalShardSyncer = hierarchicalShardSyncer; this.metricsFactory = metricsFactory; + this.shardSyncRequestPending = new AtomicBoolean(false); + this.lock = new ReentrantLock(); } - private ConsumerTask currentTask; - private Future future; - - public synchronized boolean syncShardAndLeaseInfo() { - return checkAndSubmitNextTask(); + public TaskResult executeShardSyncTask() { + final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, + leaseRefresher, + initialPositionInStream, + cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, + shardSyncIdleTimeMillis, + hierarchicalShardSyncer, + metricsFactory); + final ConsumerTask metricCollectingTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory); + return metricCollectingTask.call(); } - private synchronized boolean checkAndSubmitNextTask() { + public boolean syncShardAndLeaseInfo() { + try { + lock.lock(); + return checkAndSubmitNextTask(); + } finally { + lock.unlock(); + } + } + + private boolean checkAndSubmitNextTask() { boolean submittedNewTask = false; if ((future == null) || future.isCancelled() || future.isDone()) { if ((future != null) && future.isDone()) { @@ -145,18 +171,45 @@ public class ShardSyncTaskManager { hierarchicalShardSyncer, metricsFactory), metricsFactory); - future = executorService.submit(currentTask); + future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) + .whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult)); submittedNewTask = true; if (log.isDebugEnabled()) { log.debug("Submitted new {} task.", currentTask.taskType()); } } else { if (log.isDebugEnabled()) { - log.debug("Previous {} task still pending. Not submitting new task.", currentTask.taskType()); + log.debug("Previous {} task still pending. Not submitting new task. " + + "Enqueued a request that will be executed when the current request completes.", currentTask.taskType()); } + shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/); } - return submittedNewTask; } + private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) { + if (exception != null || taskResult.getException() != null) { + log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException()); + } + // Acquire lock here. If shardSyncRequestPending is false in this completionStage and + // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) + // but right after the value of shardSyncRequestPending is checked, it will result in + // shardSyncRequestPending being set to true, but no pending futures to trigger the next + // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the + // previous task is in this completion stage, checkAndSubmitNextTask is not invoked + // until this completionStage exits. + try { + lock.lock(); + if (shardSyncRequestPending.get()) { + shardSyncRequestPending.set(false); + // reset future to null, so next call creates a new one + // without trying to get results from the old future. + future = null; + checkAndSubmitNextTask(); + } + } finally { + lock.unlock(); + } + } + } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index eabe25cd..b14f3303 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -78,6 +78,7 @@ import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; +import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; @@ -175,6 +176,7 @@ public class SchedulerTest { when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, @@ -278,14 +280,14 @@ public class SchedulerTest { @Test public final void testInitializationFailureWithRetries() throws Exception { doNothing().when(leaseCoordinator).initialize(); - when(shardDetector.listShards()).thenThrow(new RuntimeException()); + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenThrow(new RuntimeException()); leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, true)); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); scheduler.run(); - verify(shardDetector, times(coordinatorConfig.maxInitializationAttempts())).listShards(); + verify(dynamoDBLeaseRefresher, times(coordinatorConfig.maxInitializationAttempts())).isLeaseTableEmpty(); } @Test @@ -298,18 +300,20 @@ public class SchedulerTest { metricsConfig, processorConfig, retrievalConfig); doNothing().when(leaseCoordinator).initialize(); - when(shardDetector.listShards()).thenThrow(new RuntimeException()); + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenThrow(new RuntimeException()); scheduler.run(); // verify initialization was retried for maxInitializationAttempts times - verify(shardDetector, times(maxInitializationAttempts)).listShards(); + verify(dynamoDBLeaseRefresher, times(maxInitializationAttempts)).isLeaseTableEmpty(); } @Test public final void testMultiStreamInitialization() throws ProvisionedThroughputException, DependencyException { retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); + leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, + workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, true)); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); scheduler.initialize(); @@ -322,7 +326,7 @@ public class SchedulerTest { retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, - workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, false)); + workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, true)); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); scheduler.initialize(); @@ -388,7 +392,7 @@ public class SchedulerTest { when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); long startTime = System.currentTimeMillis(); - scheduler.waitUntilLeaseTableIsReady(); + scheduler.waitAndCheckIfLeaseTableIsReady(); long endTime = System.currentTimeMillis(); assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); @@ -407,7 +411,7 @@ public class SchedulerTest { when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); long startTime = System.currentTimeMillis(); - scheduler.waitUntilLeaseTableIsReady(); + scheduler.waitAndCheckIfLeaseTableIsReady(); long endTime = System.currentTimeMillis(); assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); @@ -681,6 +685,7 @@ public class SchedulerTest { shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager); shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); if(shardSyncFirstAttemptFailure) { when(shardDetector.listShards()) .thenThrow(new RuntimeException("Service Exception"))