diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java index b8e227c4..d25bacc7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java @@ -185,8 +185,7 @@ public class KinesisShardConsumer implements IShardConsumer{ new SequenceNumberValidator( streamConfig.getStreamProxy(), shardInfo.getShardId(), - streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), - metricsFactory), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())), leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java index 49c1b0e5..eb6e62ad 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java @@ -14,9 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; -import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope; -import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,8 +50,6 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer private SequenceNumberValidator sequenceNumberValidator; private ExtendedSequenceNumber sequenceNumberAtShardEnd; - - private IMetricsFactory metricsFactory; /** * Only has package level access, since only the Amazon Kinesis Client Library should be creating these. @@ -64,12 +59,10 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer */ public RecordProcessorCheckpointer(ShardInfo shardInfo, ICheckpoint checkpoint, - SequenceNumberValidator validator, - IMetricsFactory metricsFactory) { + SequenceNumberValidator validator) { this.shardInfo = shardInfo; this.checkpoint = checkpoint; this.sequenceNumberValidator = validator; - this.metricsFactory = metricsFactory; } /** @@ -290,33 +283,22 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer // just checkpoint at SHARD_END checkpointToRecord = ExtendedSequenceNumber.SHARD_END; } - - boolean unsetMetrics = false; + // Don't checkpoint a value we already successfully checkpointed - try { - if (!MetricsHelper.isMetricsScopePresent()) { - MetricsHelper.setMetricsScope(new ThreadSafeMetricsDelegatingScope(metricsFactory.createMetrics())); - unsetMetrics = true; - } - if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken() - + " checkpoint to " + checkpointToRecord); - } - checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointToRecord, shardInfo.getConcurrencyToken()); - lastCheckpointValue = checkpointToRecord; - } catch (ThrottlingException | ShutdownException | InvalidStateException - | KinesisClientLibDependencyException e) { - throw e; - } catch (KinesisClientLibException e) { - LOG.warn("Caught exception setting checkpoint.", e); - throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e); + if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken() + + " checkpoint to " + checkpointToRecord); } - } - } finally { - if (unsetMetrics) { - MetricsHelper.unsetMetricsScope(); + checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointToRecord, shardInfo.getConcurrencyToken()); + lastCheckpointValue = checkpointToRecord; + } catch (ThrottlingException | ShutdownException | InvalidStateException + | KinesisClientLibDependencyException e) { + throw e; + } catch (KinesisClientLibException e) { + LOG.warn("Caught exception setting checkpoint.", e); + throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e); } } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index e751c4b3..c051327e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -1135,8 +1135,7 @@ public class Worker implements Runnable { new SequenceNumberValidator( streamConfig.getStreamProxy(), shardInfo.getShardId(), - streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), - metricsFactory); + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())); if(shardConsumerFactory == null){ //Default to KinesisShardConsumerFactory if null this.shardConsumerFactory = new KinesisShardConsumerFactory(); @@ -1526,7 +1525,7 @@ public class Worker implements Runnable { // We expect users to either inject both LeaseRenewer and the corresponding thread-pool, or neither of them (DEFAULT). if (leaseRenewer == null) { ExecutorService leaseRenewerThreadPool = LeaseCoordinator.getDefaultLeaseRenewalExecutorService(config.getMaxLeaseRenewalThreads()); - leaseRenewer = new LeaseRenewer<>(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), leaseRenewerThreadPool); + leaseRenewer = new LeaseRenewer<>(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), leaseRenewerThreadPool, metricsFactory); } if (leaderDecider == null) { diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index c64acf99..116dc573 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -209,7 +209,7 @@ public class LeaseCoordinator { this(new LeaseTaker<>(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis) .withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime), - new LeaseRenewer<>(leaseManager, workerIdentifier, leaseDurationMillis, getDefaultLeaseRenewalExecutorService(maxLeaseRenewerThreadCount)), + new LeaseRenewer<>(leaseManager, workerIdentifier, leaseDurationMillis, getDefaultLeaseRenewalExecutorService(maxLeaseRenewerThreadCount), metricsFactory), leaseDurationMillis, epsilonMillis, maxLeasesForWorker, diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java index eb3cf1ac..663e6aaf 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java @@ -29,6 +29,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,21 +57,24 @@ public class LeaseRenewer implements ILeaseRenewer { private final String workerIdentifier; private final long leaseDurationNanos; private final ExecutorService executorService; + private final IMetricsFactory metricsFactory; /** * Constructor. - * - * @param leaseManager LeaseManager to use - * @param workerIdentifier identifier of this worker + * + * @param leaseManager LeaseManager to use + * @param workerIdentifier identifier of this worker * @param leaseDurationMillis duration of a lease in milliseconds - * @param executorService ExecutorService to use for renewing leases in parallel + * @param executorService ExecutorService to use for renewing leases in parallel + * @param metricsFactory Factory to use for MetricsScope objects */ public LeaseRenewer(ILeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis, - ExecutorService executorService) { + ExecutorService executorService, IMetricsFactory metricsFactory) { this.leaseManager = leaseManager; this.workerIdentifier = workerIdentifier; this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); this.executorService = executorService; + this.metricsFactory = metricsFactory; } /** @@ -292,6 +296,7 @@ public class LeaseRenewer implements ILeaseRenewer { return false; } + MetricsHelper.startScope(metricsFactory); long startTime = System.currentTimeMillis(); boolean success = false; try { @@ -333,6 +338,7 @@ public class LeaseRenewer implements ILeaseRenewer { } } finally { MetricsHelper.addSuccessAndLatency("UpdateLease", startTime, success, MetricsLevel.DETAILED); + MetricsHelper.endScope(); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java index afdce2d7..68e26c2b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java +++ b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java @@ -84,7 +84,7 @@ public class MetricsHelper { * * @return true if metrics scope is present, else returns false */ - public static boolean isMetricsScopePresent() { + private static boolean isMetricsScopePresent() { return currentScope.get() != null; } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java index 0f708398..5f404e2c 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java @@ -14,19 +14,15 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map.Entry; -import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -42,8 +38,6 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheck import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; -import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; -import com.amazonaws.services.kinesis.metrics.impl.NullMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.Record; @@ -59,7 +53,7 @@ public class RecordProcessorCheckpointerTest { private ShardInfo shardInfo; private SequenceNumberValidator sequenceNumberValidator; private String shardId = "shardId-123"; - + @Mock IMetricsFactory metricsFactory; @@ -92,7 +86,7 @@ public class RecordProcessorCheckpointerTest { public final void testCheckpoint() throws Exception { // First call to checkpoint RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, null); processingCheckpointer.setLargestPermittedCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.checkpoint(); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); @@ -104,15 +98,15 @@ public class RecordProcessorCheckpointerTest { processingCheckpointer.checkpoint(); Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId)); } - + /** * Test method for * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(Record record)}. - */ + */ @Test public final void testCheckpointRecord() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025"); Record record = new Record().withSequenceNumber("5025"); @@ -120,7 +114,7 @@ public class RecordProcessorCheckpointerTest { processingCheckpointer.checkpoint(record); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); } - + /** * Test method for * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(Record record)}. @@ -128,7 +122,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testCheckpointSubRecord() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030"); Record record = new Record().withSequenceNumber("5030"); @@ -137,7 +131,7 @@ public class RecordProcessorCheckpointerTest { processingCheckpointer.checkpoint(subRecord); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); } - + /** * Test method for * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(String sequenceNumber)}. @@ -145,14 +139,14 @@ public class RecordProcessorCheckpointerTest { @Test public final void testCheckpointSequenceNumber() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035"); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.checkpoint("5035"); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); } - + /** * Test method for * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(String sequenceNumber, long subSequenceNumber)}. @@ -160,7 +154,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testCheckpointExtendedSequenceNumber() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040"); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); @@ -174,7 +168,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testCheckpointAtShardEnd() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END; processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); @@ -191,7 +185,7 @@ public class RecordProcessorCheckpointerTest { public final void testPrepareCheckpoint() throws Exception { // First call to checkpoint RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber sequenceNumber1 = new ExtendedSequenceNumber("5001"); @@ -222,7 +216,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testPrepareCheckpointRecord() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025"); Record record = new Record().withSequenceNumber("5025"); @@ -247,7 +241,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testPrepareCheckpointSubRecord() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030"); Record record = new Record().withSequenceNumber("5030"); @@ -273,7 +267,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testPrepareCheckpointSequenceNumber() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035"); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); @@ -297,7 +291,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testPrepareCheckpointExtendedSequenceNumber() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040"); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); @@ -320,7 +314,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testPrepareCheckpointAtShardEnd() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END; processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); @@ -344,7 +338,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testMultipleOutstandingCheckpointersHappyCase() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("6040")); @@ -376,7 +370,7 @@ public class RecordProcessorCheckpointerTest { @Test public final void testMultipleOutstandingCheckpointersOutOfOrder() throws Exception { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("7040")); @@ -411,7 +405,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testUpdate() throws Exception { - RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); + RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null); ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("10"); checkpointer.setLargestPermittedCheckpointValue(sequenceNumber); @@ -432,7 +426,7 @@ public class RecordProcessorCheckpointerTest { SequenceNumberValidator validator = mock(SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); // Several checkpoints we're gonna hit ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2"); @@ -481,7 +475,7 @@ public class RecordProcessorCheckpointerTest { } catch (IllegalArgumentException e) { } catch (NullPointerException e) { - + } Assert.assertEquals("Checkpoint value should not have changed", secondSequenceNumber, @@ -520,7 +514,7 @@ public class RecordProcessorCheckpointerTest { SequenceNumberValidator validator = mock(SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); // Several checkpoints we're gonna hit ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2"); @@ -648,7 +642,7 @@ public class RecordProcessorCheckpointerTest { for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.CHECKPOINTER); } } @@ -668,7 +662,7 @@ public class RecordProcessorCheckpointerTest { for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARED_CHECKPOINTER); } } @@ -689,7 +683,7 @@ public class RecordProcessorCheckpointerTest { for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARE_THEN_CHECKPOINTER); } } @@ -838,47 +832,4 @@ public class RecordProcessorCheckpointerTest { Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); } } - - @Test - public final void testUnsetMetricsScopeDuringCheckpointing() throws Exception { - // First call to checkpoint - RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); - IMetricsScope scope = null; - if (MetricsHelper.isMetricsScopePresent()) { - scope = MetricsHelper.getMetricsScope(); - MetricsHelper.unsetMetricsScope(); - } - ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019"); - processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber); - processingCheckpointer.checkpoint(); - Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId)); - verify(metricsFactory).createMetrics(); - Assert.assertFalse(MetricsHelper.isMetricsScopePresent()); - if (scope != null) { - MetricsHelper.setMetricsScope(scope); - } - } - - @Test - public final void testSetMetricsScopeDuringCheckpointing() throws Exception { - // First call to checkpoint - RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); - boolean shouldUnset = false; - if (!MetricsHelper.isMetricsScopePresent()) { - shouldUnset = true; - MetricsHelper.setMetricsScope(new NullMetricsScope()); - } - ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019"); - processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber); - processingCheckpointer.checkpoint(); - Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId)); - verify(metricsFactory, never()).createMetrics(); - Assert.assertTrue(MetricsHelper.isMetricsScopePresent()); - assertEquals(NullMetricsScope.class, MetricsHelper.getMetricsScope().getClass()); - if (shouldUnset) { - MetricsHelper.unsetMetricsScope(); - } - } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 8936a28e..514421dc 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -408,8 +408,7 @@ public class ShardConsumerTest { streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() - ), - metricsFactory + ) ); dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); @@ -642,8 +641,7 @@ public class ShardConsumerTest { streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() - ), - metricsFactory + ) ); KinesisShardConsumer consumer = @@ -791,8 +789,7 @@ public class ShardConsumerTest { streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() - ), - metricsFactory + ) ); KinesisShardConsumer consumer = @@ -926,8 +923,7 @@ public class ShardConsumerTest { streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() - ), - metricsFactory + ) ); dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java index 534012d8..d6703571 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.leases.impl; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -35,7 +36,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { @Before public void setUp() { renewer = new LeaseRenewer( - leaseManager, "foo", LEASE_DURATION_MILLIS, Executors.newCachedThreadPool()); + leaseManager, "foo", LEASE_DURATION_MILLIS, Executors.newCachedThreadPool(), new NullMetricsFactory()); } @Test @@ -244,7 +245,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { builder.withLease(shardId, owner); Map leases = builder.build(); LeaseRenewer renewer =new LeaseRenewer( - leaseManager, owner, 30000L, Executors.newCachedThreadPool()); + leaseManager, owner, 30000L, Executors.newCachedThreadPool(), new NullMetricsFactory()); renewer.initialize(); Map heldLeases = renewer.getCurrentlyHeldLeases(); Assert.assertEquals(leases.size(), heldLeases.size()); diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerTest.java index c7f7d3e4..b36ec61a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerTest.java @@ -21,6 +21,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -70,7 +71,7 @@ public class LeaseRenewerTest { renewer = new LeaseRenewer<>(leaseManager, workerIdentifier, leaseDurationMillis, - Executors.newCachedThreadPool()); + Executors.newCachedThreadPool(), new NullMetricsFactory()); } @After