Adding more logging around the rejected task executions at the Scheduler and RxJava layer (#559)

* Add diagnostic events for logging visibility

* Refactor logging diagnostics into main Scheduler loop

* Refactor log timing and level and change privacies

* Revert ExecutorStateEvent to accept ExecutorService input type

* Minor style and messaging fixes

* Fix failing unit test

* Refactor diagnostic events to use factory for testing

* Fix constructor overloading for testing

* Refactor DiagnosticEventHandler to no args constructor
This commit is contained in:
Micah Jaffe 2019-06-28 10:02:57 -07:00 committed by Sahil Palvia
parent 6159b869ed
commit fa72cf1517
9 changed files with 524 additions and 2 deletions

View file

@ -0,0 +1,33 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
/**
* An interface to implement various types of stateful events that can be used for diagnostics throughout the KCL.
*/
interface DiagnosticEvent {
/**
* DiagnosticEvent is part of a visitor pattern and it accepts DiagnosticEventHandler visitors.
*
* @param visitor A handler that that controls the behavior of the DiagnosticEvent when invoked.
*/
void accept(DiagnosticEventHandler visitor);
/**
* The string to output to logs when a DiagnosticEvent occurs.
*/
String message();
}

View file

@ -0,0 +1,35 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import lombok.NoArgsConstructor;
import software.amazon.kinesis.leases.LeaseCoordinator;
import java.util.concurrent.ExecutorService;
/**
* Creates {@link DiagnosticEvent}s for logging and visibility
*/
@NoArgsConstructor
class DiagnosticEventFactory {
ExecutorStateEvent executorStateEvent(ExecutorService executorService, LeaseCoordinator leaseCoordinator) {
return new ExecutorStateEvent(executorService, leaseCoordinator);
}
RejectedTaskEvent rejectedTaskEvent(ExecutorStateEvent executorStateEvent, Throwable t) {
return new RejectedTaskEvent(executorStateEvent, t);
}
}

View file

@ -0,0 +1,33 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
/**
* An interface to implement behaviors associated with a {@link DiagnosticEvent}. Uses the visitor pattern to visit
* the DiagnosticEvent when the behavior is desired. A default implementation that performs simple logging is found in
* {@link DiagnosticEventLogger}.
*/
interface DiagnosticEventHandler {
/**
* @param event Log or otherwise react to periodic pulses on the thread pool executor state.
*/
void visit(ExecutorStateEvent event);
/**
* @param event Log or otherwise react to rejected tasks in the RxJavaPlugin layer.
*/
void visit(RejectedTaskEvent event);
}

View file

@ -0,0 +1,55 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
/**
* Internal implementation of {@link DiagnosticEventHandler} used by {@link Scheduler} to log executor state both
* 1) in normal conditions periodically, and 2) in reaction to rejected task exceptions.
*/
@NoArgsConstructor
@Slf4j
@KinesisClientInternalApi
class DiagnosticEventLogger implements DiagnosticEventHandler {
private static final long EXECUTOR_LOG_INTERVAL_MILLIS = 30000L;
private long nextExecutorLogTime = System.currentTimeMillis() + EXECUTOR_LOG_INTERVAL_MILLIS;
/**
* {@inheritDoc}
*
* Only log at info level every 30s to avoid over-logging, else log at debug level
*/
@Override
public void visit(ExecutorStateEvent event) {
if (System.currentTimeMillis() >= nextExecutorLogTime) {
log.info(event.message());
nextExecutorLogTime = System.currentTimeMillis() + EXECUTOR_LOG_INTERVAL_MILLIS;
} else {
log.debug(event.message());
}
}
/**
* {@inheritDoc}
*/
@Override
public void visit(RejectedTaskEvent event) {
log.error(event.message(), event.getThrowable());
}
}

View file

@ -0,0 +1,71 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.LeaseCoordinator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@Getter
@ToString(exclude = "isThreadPoolExecutor")
@Slf4j
@KinesisClientInternalApi
class ExecutorStateEvent implements DiagnosticEvent {
private static final String MESSAGE = "Current thread pool executor state: ";
private boolean isThreadPoolExecutor;
private String executorName;
private int currentQueueSize;
private int activeThreads;
private int coreThreads;
private int leasesOwned;
private int largestPoolSize;
private int maximumPoolSize;
ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) {
if (executor instanceof ThreadPoolExecutor) {
this.isThreadPoolExecutor = true;
ThreadPoolExecutor ex = (ThreadPoolExecutor) executor;
this.executorName = ex.getClass().getSimpleName();
this.currentQueueSize = ex.getQueue().size();
this.activeThreads = ex.getActiveCount();
this.coreThreads = ex.getCorePoolSize();
this.largestPoolSize = ex.getLargestPoolSize();
this.maximumPoolSize = ex.getMaximumPoolSize();
}
this.leasesOwned = leaseCoordinator.getAssignments().size();
}
@Override
public void accept(DiagnosticEventHandler visitor) {
// logging is only meaningful for a ThreadPoolExecutor executor service (default config)
if (isThreadPoolExecutor) {
visitor.visit(this);
}
}
@Override
public String message() {
return MESSAGE + this.toString();
}
}

View file

@ -0,0 +1,48 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@Getter
@ToString
@Slf4j
@KinesisClientInternalApi
class RejectedTaskEvent implements DiagnosticEvent {
private static final String MESSAGE = "Review your thread configuration to prevent task rejections. " +
"Until next release, KCL will not be resilient to task rejections. ";
private ExecutorStateEvent executorStateEvent;
private Throwable throwable;
RejectedTaskEvent(ExecutorStateEvent executorStateEvent, Throwable throwable) {
this.executorStateEvent = executorStateEvent;
this.throwable = throwable;
}
@Override
public void accept(DiagnosticEventHandler visitor) {
visitor.visit(this);
}
@Override
public String message() {
return MESSAGE + executorStateEvent.message();
}
}

View file

@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
@ -95,6 +96,8 @@ public class Scheduler implements Runnable {
// parent shards
private final long parentShardPollIntervalMillis;
private final ExecutorService executorService;
private final DiagnosticEventFactory diagnosticEventFactory;
private final DiagnosticEventHandler diagnosticEventHandler;
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final LeaseCoordinator leaseCoordinator;
private final ShardSyncTaskManager shardSyncTaskManager;
@ -140,6 +143,23 @@ public class Scheduler implements Runnable {
@NonNull final MetricsConfig metricsConfig,
@NonNull final ProcessorConfig processorConfig,
@NonNull final RetrievalConfig retrievalConfig) {
this(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig,
processorConfig, retrievalConfig, new DiagnosticEventFactory());
}
/**
* Customers do not currently have the ability to customize the DiagnosticEventFactory, but this visibility
* is desired for testing. This constructor is only used for testing to provide a mock DiagnosticEventFactory.
*/
@VisibleForTesting
protected Scheduler(@NonNull final CheckpointConfig checkpointConfig,
@NonNull final CoordinatorConfig coordinatorConfig,
@NonNull final LeaseManagementConfig leaseManagementConfig,
@NonNull final LifecycleConfig lifecycleConfig,
@NonNull final MetricsConfig metricsConfig,
@NonNull final ProcessorConfig processorConfig,
@NonNull final RetrievalConfig retrievalConfig,
@NonNull final DiagnosticEventFactory diagnosticEventFactory) {
this.checkpointConfig = checkpointConfig;
this.coordinatorConfig = coordinatorConfig;
this.leaseManagementConfig = leaseManagementConfig;
@ -167,6 +187,8 @@ public class Scheduler implements Runnable {
this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis();
this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis();
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
this.diagnosticEventFactory = diagnosticEventFactory;
this.diagnosticEventHandler = new DiagnosticEventLogger();
this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory()
.createShardSyncTaskManager(this.metricsFactory);
@ -212,7 +234,7 @@ public class Scheduler implements Runnable {
try {
initialize();
log.info("Initialization complete. Starting worker loop.");
} catch (RuntimeException e) {
} catch (RuntimeException e) {
log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e);
workerStateChangeListener.onAllInitializationAttemptsFailed(e);
shutdown();
@ -226,8 +248,10 @@ public class Scheduler implements Runnable {
log.info("Worker loop is complete. Exiting from worker.");
}
private void initialize() {
@VisibleForTesting
void initialize() {
synchronized (lock) {
registerErrorHandlerForUndeliverableAsyncTaskExceptions();
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false;
Exception lastException = null;
@ -305,6 +329,7 @@ public class Scheduler implements Runnable {
// clean up shard consumers for unassigned shards
cleanupShardConsumers(assignedShards);
logExecutorState();
slog.info("Sleeping ...");
Thread.sleep(shardConsumerDispatchPollIntervalMillis);
} catch (Exception e) {
@ -612,6 +637,25 @@ public class Scheduler implements Runnable {
}
}
/**
* Exceptions in the RxJava layer can fail silently unless an error handler is set to propagate these exceptions
* back to the KCL, as is done below.
*/
private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() {
RxJavaPlugins.setErrorHandler(t -> {
ExecutorStateEvent executorStateEvent = diagnosticEventFactory.executorStateEvent(executorService,
leaseCoordinator);
RejectedTaskEvent rejectedTaskEvent = diagnosticEventFactory.rejectedTaskEvent(executorStateEvent, t);
rejectedTaskEvent.accept(diagnosticEventHandler);
});
}
private void logExecutorState() {
ExecutorStateEvent executorStateEvent = diagnosticEventFactory.executorStateEvent(executorService,
leaseCoordinator);
executorStateEvent.accept(diagnosticEventHandler);
}
/**
* Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
* INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on

View file

@ -0,0 +1,167 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseBuilder;
import software.amazon.kinesis.leases.LeaseCoordinator;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@Slf4j
@RunWith(MockitoJUnitRunner.class)
public class DiagnosticEventsTest {
@Mock
private ThreadPoolExecutor executor;
@Mock
private LeaseCoordinator leaseCoordinator;
@Mock
private DiagnosticEventHandler defaultHandler;
private DiagnosticEventHandler customHandler = new CustomHandler();
private boolean wasCustomHandlerInvoked;
private final Throwable throwable = new TestRejectedTaskException();
private final int activeThreadCount = 2;
private final int corePoolSize = 4;
private final int largestPoolSize = 8;
private final int maximumPoolSize = 16;
private SynchronousQueue<Runnable> executorQueue;
private Collection<Lease> leaseAssignments;
@Before
public void setup() {
wasCustomHandlerInvoked = false;
executorQueue = new SynchronousQueue<>();
final Lease lease = new LeaseBuilder().build();
leaseAssignments = Collections.singletonList(lease);
when(executor.getQueue()).thenReturn(executorQueue);
when(executor.getActiveCount()).thenReturn(activeThreadCount);
when(executor.getCorePoolSize()).thenReturn(corePoolSize);
when(executor.getLargestPoolSize()).thenReturn(largestPoolSize);
when(executor.getMaximumPoolSize()).thenReturn(maximumPoolSize);
when(leaseCoordinator.getAssignments()).thenReturn(leaseAssignments);
}
@Test
public void testExecutorStateEvent() {
ExecutorStateEvent event = new ExecutorStateEvent(executor, leaseCoordinator);
event.accept(defaultHandler);
assertEquals(event.getActiveThreads(), activeThreadCount);
assertEquals(event.getCoreThreads(), corePoolSize);
assertEquals(event.getLargestPoolSize(), largestPoolSize);
assertEquals(event.getMaximumPoolSize(), maximumPoolSize);
assertEquals(event.getLeasesOwned(), leaseAssignments.size());
assertEquals(event.getCurrentQueueSize(),0);
verify(defaultHandler, times(1)).visit(event);
}
@Test
public void testExecutorStateEventWithCustomHandler() {
ExecutorStateEvent event = new ExecutorStateEvent(executor, leaseCoordinator);
event.accept(customHandler);
assertTrue(wasCustomHandlerInvoked);
}
@Test
public void testRejectedTaskEvent() {
ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executor, leaseCoordinator);
RejectedTaskEvent event = new RejectedTaskEvent(executorStateEvent, throwable);
event.accept(defaultHandler);
assertEquals(event.getExecutorStateEvent().getActiveThreads(), activeThreadCount);
assertEquals(event.getExecutorStateEvent().getCoreThreads(), corePoolSize);
assertEquals(event.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize);
assertEquals(event.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize);
assertEquals(event.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size());
assertEquals(event.getExecutorStateEvent().getCurrentQueueSize(),0);
assertTrue(event.getThrowable() instanceof TestRejectedTaskException);
verify(defaultHandler, times(1)).visit(event);
}
@Test
public void testRejectedTaskEventWithCustomHandler() {
ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executor, leaseCoordinator);
RejectedTaskEvent event = new RejectedTaskEvent(executorStateEvent, throwable);
customHandler = new CustomHandler();
event.accept(customHandler);
assertTrue(wasCustomHandlerInvoked);
}
@Test
public void testDiagnosticEventFactory() {
DiagnosticEventFactory factory = new DiagnosticEventFactory();
ExecutorStateEvent executorStateEvent = factory.executorStateEvent(executor, leaseCoordinator);
assertEquals(executorStateEvent.getActiveThreads(), activeThreadCount);
assertEquals(executorStateEvent.getCoreThreads(), corePoolSize);
assertEquals(executorStateEvent.getLargestPoolSize(), largestPoolSize);
assertEquals(executorStateEvent.getMaximumPoolSize(), maximumPoolSize);
assertEquals(executorStateEvent.getLeasesOwned(), leaseAssignments.size());
assertEquals(executorStateEvent.getCurrentQueueSize(),0);
RejectedTaskEvent rejectedTaskEvent = factory.rejectedTaskEvent(executorStateEvent,
new TestRejectedTaskException());
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getActiveThreads(), activeThreadCount);
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getCoreThreads(), corePoolSize);
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize);
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize);
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size());
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getCurrentQueueSize(),0);
assertTrue(rejectedTaskEvent.getThrowable() instanceof TestRejectedTaskException);
}
private class TestRejectedTaskException extends Exception {
private TestRejectedTaskException() { super(); }
}
private class CustomHandler implements DiagnosticEventHandler {
@Override
public void visit(ExecutorStateEvent event) {
wasCustomHandlerInvoked = true;
}
@Override
public void visit(RejectedTaskEvent event) {
wasCustomHandlerInvoked = true;
}
}
}

View file

@ -24,7 +24,9 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@ -35,7 +37,9 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import io.reactivex.plugins.RxJavaPlugins;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -269,6 +273,38 @@ public class SchedulerTest {
verify(workerStateChangeListener, times(1)).onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
}
@Test
public void testErrorHandlerForUndeliverableAsyncTaskExceptions() {
DiagnosticEventFactory eventFactory = mock(DiagnosticEventFactory.class);
ExecutorStateEvent executorStateEvent = mock(ExecutorStateEvent.class);
RejectedTaskEvent rejectedTaskEvent = mock(RejectedTaskEvent.class);
when(eventFactory.rejectedTaskEvent(any(), any())).thenReturn(rejectedTaskEvent);
when(eventFactory.executorStateEvent(any(), any())).thenReturn(executorStateEvent);
Scheduler testScheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig,
lifecycleConfig, metricsConfig, processorConfig, retrievalConfig, eventFactory);
Scheduler schedulerSpy = spy(testScheduler);
// reject task on third loop
doCallRealMethod()
.doCallRealMethod()
.doAnswer(invocation -> {
// trigger rejected task in RxJava layer
RxJavaPlugins.onError(new RejectedExecutionException("Test exception."));
return null;
}).when(schedulerSpy).runProcessLoop();
// Scheduler sets error handler in initialize method
schedulerSpy.initialize();
schedulerSpy.runProcessLoop();
schedulerSpy.runProcessLoop();
schedulerSpy.runProcessLoop();
verify(eventFactory, times(1)).rejectedTaskEvent(eq(executorStateEvent), any());
verify(rejectedTaskEvent, times(1)).accept(any());
}
/*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception {
final int numberOfRecordsPerShard = 10;