Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Walid Baruni 2018-01-29 16:00:21 -08:00
commit dc42ef07c7
2 changed files with 19 additions and 5 deletions

View file

@ -25,7 +25,7 @@
</licenses> </licenses>
<properties> <properties>
<aws-java-sdk.version>1.11.261</aws-java-sdk.version> <aws-java-sdk.version>1.11.271</aws-java-sdk.version>
<sqlite4java.version>1.0.392</sqlite4java.version> <sqlite4java.version>1.0.392</sqlite4java.version>
<sqlite4java.native>libsqlite4java</sqlite4java.native> <sqlite4java.native>libsqlite4java</sqlite4java.native>
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath> <sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>

View file

@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -804,12 +805,20 @@ public class RecordProcessorCheckpointerTest {
// First call to checkpoint // First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
IMetricsScope scope = null;
if (MetricsHelper.isMetricsScopePresent()) {
scope = MetricsHelper.getMetricsScope();
MetricsHelper.unsetMetricsScope();
}
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019"); ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019");
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber);
processingCheckpointer.checkpoint(); processingCheckpointer.checkpoint();
Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId));
verify(metricsFactory).createMetrics(); verify(metricsFactory).createMetrics();
Assert.assertFalse(MetricsHelper.isMetricsScopePresent()); Assert.assertFalse(MetricsHelper.isMetricsScopePresent());
if (scope != null) {
MetricsHelper.setMetricsScope(scope);
}
} }
@Test @Test
@ -817,15 +826,20 @@ public class RecordProcessorCheckpointerTest {
// First call to checkpoint // First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
NullMetricsScope scope = new NullMetricsScope(); boolean shouldUnset = false;
MetricsHelper.setMetricsScope(scope); if (!MetricsHelper.isMetricsScopePresent()) {
shouldUnset = true;
MetricsHelper.setMetricsScope(new NullMetricsScope());
}
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019"); ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019");
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber);
processingCheckpointer.checkpoint(); processingCheckpointer.checkpoint();
Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId));
verify(metricsFactory, never()).createMetrics(); verify(metricsFactory, never()).createMetrics();
Assert.assertTrue(MetricsHelper.isMetricsScopePresent()); Assert.assertTrue(MetricsHelper.isMetricsScopePresent());
assertEquals(scope, MetricsHelper.getMetricsScope()); assertEquals(NullMetricsScope.class, MetricsHelper.getMetricsScope().getClass());
if (shouldUnset) {
MetricsHelper.unsetMetricsScope(); MetricsHelper.unsetMetricsScope();
} }
} }
}