Fixing issue with NullMetrics warning messages (#284)

Fixes #48 

* Fixing issue with NullMetrics warning messages when trying to checkpoint on a separate thread.

* Adding testing to validate the MetricsScope setting during checkpoiniting.
This commit is contained in:
Sahil Palvia 2018-01-22 10:37:46 -08:00 committed by Justin Pfifer
parent 71124e4055
commit e65e56380b
5 changed files with 116 additions and 41 deletions

View file

@ -14,6 +14,9 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -51,6 +54,8 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
private ExtendedSequenceNumber sequenceNumberAtShardEnd; private ExtendedSequenceNumber sequenceNumberAtShardEnd;
private IMetricsFactory metricsFactory;
/** /**
* Only has package level access, since only the Amazon Kinesis Client Library should be creating these. * Only has package level access, since only the Amazon Kinesis Client Library should be creating these.
* *
@ -59,10 +64,12 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
*/ */
RecordProcessorCheckpointer(ShardInfo shardInfo, RecordProcessorCheckpointer(ShardInfo shardInfo,
ICheckpoint checkpoint, ICheckpoint checkpoint,
SequenceNumberValidator validator) { SequenceNumberValidator validator,
IMetricsFactory metricsFactory) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
this.sequenceNumberValidator = validator; this.sequenceNumberValidator = validator;
this.metricsFactory = metricsFactory;
} }
/** /**
@ -283,7 +290,14 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
// just checkpoint at SHARD_END // just checkpoint at SHARD_END
checkpointToRecord = ExtendedSequenceNumber.SHARD_END; checkpointToRecord = ExtendedSequenceNumber.SHARD_END;
} }
boolean unsetMetrics = false;
// Don't checkpoint a value we already successfully checkpointed // Don't checkpoint a value we already successfully checkpointed
try {
if (!MetricsHelper.isMetricsScopePresent()) {
MetricsHelper.setMetricsScope(new ThreadSafeMetricsDelegatingScope(metricsFactory.createMetrics()));
unsetMetrics = true;
}
if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) { if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {
try { try {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -300,6 +314,11 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e); throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e);
} }
} }
} finally {
if (unsetMetrics) {
MetricsHelper.unsetMetricsScope();
}
}
} }
/** /**

View file

@ -170,7 +170,8 @@ class ShardConsumer {
new SequenceNumberValidator( new SequenceNumberValidator(
streamConfig.getStreamProxy(), streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
metricsFactory),
leaseManager, leaseManager,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,

View file

@ -72,13 +72,22 @@ public class MetricsHelper {
* @param scope * @param scope
*/ */
public static void setMetricsScope(IMetricsScope scope) { public static void setMetricsScope(IMetricsScope scope) {
if (currentScope.get() != null) { if (isMetricsScopePresent()) {
throw new RuntimeException(String.format( throw new RuntimeException(String.format(
"Metrics scope is already set for the current thread %s", Thread.currentThread().getName())); "Metrics scope is already set for the current thread %s", Thread.currentThread().getName()));
} }
currentScope.set(scope); currentScope.set(scope);
} }
/**
* Checks if current metricsscope is present or not.
*
* @return true if metrics scope is present, else returns false
*/
public static boolean isMetricsScopePresent() {
return currentScope.get() != null;
}
/** /**
* Unsets the metrics scope for the current thread. * Unsets the metrics scope for the current thread.
*/ */

View file

@ -14,6 +14,13 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
@ -23,7 +30,10 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer;
@ -31,15 +41,15 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheck
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Matchers.anyString;
/** /**
* *
*/ */
@RunWith(MockitoJUnitRunner.class)
public class RecordProcessorCheckpointerTest { public class RecordProcessorCheckpointerTest {
private String startingSequenceNumber = "13"; private String startingSequenceNumber = "13";
private ExtendedSequenceNumber startingExtendedSequenceNumber = new ExtendedSequenceNumber(startingSequenceNumber); private ExtendedSequenceNumber startingExtendedSequenceNumber = new ExtendedSequenceNumber(startingSequenceNumber);
@ -49,6 +59,9 @@ public class RecordProcessorCheckpointerTest {
private SequenceNumberValidator sequenceNumberValidator; private SequenceNumberValidator sequenceNumberValidator;
private String shardId = "shardId-123"; private String shardId = "shardId-123";
@Mock
IMetricsFactory metricsFactory;
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
*/ */
@ -78,7 +91,7 @@ public class RecordProcessorCheckpointerTest {
public final void testCheckpoint() throws Exception { public final void testCheckpoint() throws Exception {
// First call to checkpoint // First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, null); new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
processingCheckpointer.setLargestPermittedCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(startingExtendedSequenceNumber);
processingCheckpointer.checkpoint(); processingCheckpointer.checkpoint();
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
@ -98,7 +111,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testCheckpointRecord() throws Exception { public final void testCheckpointRecord() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025");
Record record = new Record().withSequenceNumber("5025"); Record record = new Record().withSequenceNumber("5025");
@ -114,7 +127,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testCheckpointSubRecord() throws Exception { public final void testCheckpointSubRecord() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030");
Record record = new Record().withSequenceNumber("5030"); Record record = new Record().withSequenceNumber("5030");
@ -131,7 +144,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testCheckpointSequenceNumber() throws Exception { public final void testCheckpointSequenceNumber() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035");
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
@ -146,7 +159,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testCheckpointExtendedSequenceNumber() throws Exception { public final void testCheckpointExtendedSequenceNumber() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040");
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
@ -162,7 +175,7 @@ public class RecordProcessorCheckpointerTest {
public final void testPrepareCheckpoint() throws Exception { public final void testPrepareCheckpoint() throws Exception {
// First call to checkpoint // First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber sequenceNumber1 = new ExtendedSequenceNumber("5001"); ExtendedSequenceNumber sequenceNumber1 = new ExtendedSequenceNumber("5001");
@ -193,7 +206,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testPrepareCheckpointRecord() throws Exception { public final void testPrepareCheckpointRecord() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025");
Record record = new Record().withSequenceNumber("5025"); Record record = new Record().withSequenceNumber("5025");
@ -218,7 +231,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testPrepareCheckpointSubRecord() throws Exception { public final void testPrepareCheckpointSubRecord() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030");
Record record = new Record().withSequenceNumber("5030"); Record record = new Record().withSequenceNumber("5030");
@ -244,7 +257,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testPrepareCheckpointSequenceNumber() throws Exception { public final void testPrepareCheckpointSequenceNumber() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035");
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
@ -268,7 +281,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testPrepareCheckpointExtendedSequenceNumber() throws Exception { public final void testPrepareCheckpointExtendedSequenceNumber() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040");
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
@ -291,7 +304,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testMultipleOutstandingCheckpointersHappyCase() throws Exception { public final void testMultipleOutstandingCheckpointersHappyCase() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("6040")); processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("6040"));
@ -323,7 +336,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testMultipleOutstandingCheckpointersOutOfOrder() throws Exception { public final void testMultipleOutstandingCheckpointersOutOfOrder() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("7040")); processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("7040"));
@ -358,7 +371,7 @@ public class RecordProcessorCheckpointerTest {
*/ */
@Test @Test
public final void testUpdate() throws Exception { public final void testUpdate() throws Exception {
RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null); RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("10"); ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("10");
checkpointer.setLargestPermittedCheckpointValue(sequenceNumber); checkpointer.setLargestPermittedCheckpointValue(sequenceNumber);
@ -379,7 +392,7 @@ public class RecordProcessorCheckpointerTest {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class); SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
// Several checkpoints we're gonna hit // Several checkpoints we're gonna hit
ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2"); ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2");
@ -467,7 +480,7 @@ public class RecordProcessorCheckpointerTest {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class); SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
// Several checkpoints we're gonna hit // Several checkpoints we're gonna hit
ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2"); ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2");
@ -595,7 +608,7 @@ public class RecordProcessorCheckpointerTest {
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) { for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.CHECKPOINTER); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.CHECKPOINTER);
} }
} }
@ -615,7 +628,7 @@ public class RecordProcessorCheckpointerTest {
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) { for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARED_CHECKPOINTER); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARED_CHECKPOINTER);
} }
} }
@ -636,7 +649,7 @@ public class RecordProcessorCheckpointerTest {
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) { for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARE_THEN_CHECKPOINTER); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARE_THEN_CHECKPOINTER);
} }
} }
@ -785,4 +798,34 @@ public class RecordProcessorCheckpointerTest {
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
} }
} }
@Test
public final void testUnsetMetricsScopeDuringCheckpointing() throws Exception {
// First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019");
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber);
processingCheckpointer.checkpoint();
Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId));
verify(metricsFactory).createMetrics();
Assert.assertFalse(MetricsHelper.isMetricsScopePresent());
}
@Test
public final void testSetMetricsScopeDuringCheckpointing() throws Exception {
// First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
NullMetricsScope scope = new NullMetricsScope();
MetricsHelper.setMetricsScope(scope);
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019");
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber);
processingCheckpointer.checkpoint();
Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId));
verify(metricsFactory, never()).createMetrics();
Assert.assertTrue(MetricsHelper.isMetricsScopePresent());
assertEquals(scope, MetricsHelper.getMetricsScope());
MetricsHelper.unsetMetricsScope();
}
} }

View file

@ -342,7 +342,8 @@ public class ShardConsumerTest {
streamConfig.getStreamProxy(), streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
) ),
metricsFactory
); );
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
@ -493,7 +494,8 @@ public class ShardConsumerTest {
streamConfig.getStreamProxy(), streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
) ),
metricsFactory
); );
ShardConsumer consumer = ShardConsumer consumer =
@ -621,7 +623,8 @@ public class ShardConsumerTest {
streamConfig.getStreamProxy(), streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
) ),
metricsFactory
); );
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);