Modify RecordProcessorCheckpointer#advancePosition Metrics usage to ensure proper closure (#1224)

This commit is contained in:
furq-aws 2024-02-23 10:22:20 -08:00 committed by GitHub
parent 2ab263f273
commit 48fab1fe71
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 67 additions and 132 deletions

View file

@ -185,8 +185,7 @@ public class KinesisShardConsumer implements IShardConsumer{
new SequenceNumberValidator( new SequenceNumberValidator(
streamConfig.getStreamProxy(), streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())),
metricsFactory),
leaseCoordinator, leaseCoordinator,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,

View file

@ -14,9 +14,6 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -53,8 +50,6 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer
private SequenceNumberValidator sequenceNumberValidator; private SequenceNumberValidator sequenceNumberValidator;
private ExtendedSequenceNumber sequenceNumberAtShardEnd; private ExtendedSequenceNumber sequenceNumberAtShardEnd;
private IMetricsFactory metricsFactory;
/** /**
* Only has package level access, since only the Amazon Kinesis Client Library should be creating these. * Only has package level access, since only the Amazon Kinesis Client Library should be creating these.
@ -64,12 +59,10 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer
*/ */
public RecordProcessorCheckpointer(ShardInfo shardInfo, public RecordProcessorCheckpointer(ShardInfo shardInfo,
ICheckpoint checkpoint, ICheckpoint checkpoint,
SequenceNumberValidator validator, SequenceNumberValidator validator) {
IMetricsFactory metricsFactory) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
this.sequenceNumberValidator = validator; this.sequenceNumberValidator = validator;
this.metricsFactory = metricsFactory;
} }
/** /**
@ -290,33 +283,22 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer
// just checkpoint at SHARD_END // just checkpoint at SHARD_END
checkpointToRecord = ExtendedSequenceNumber.SHARD_END; checkpointToRecord = ExtendedSequenceNumber.SHARD_END;
} }
boolean unsetMetrics = false;
// Don't checkpoint a value we already successfully checkpointed // Don't checkpoint a value we already successfully checkpointed
try { if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {
if (!MetricsHelper.isMetricsScopePresent()) { try {
MetricsHelper.setMetricsScope(new ThreadSafeMetricsDelegatingScope(metricsFactory.createMetrics())); if (LOG.isDebugEnabled()) {
unsetMetrics = true; LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
} + " checkpoint to " + checkpointToRecord);
if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
+ " checkpoint to " + checkpointToRecord);
}
checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointToRecord, shardInfo.getConcurrencyToken());
lastCheckpointValue = checkpointToRecord;
} catch (ThrottlingException | ShutdownException | InvalidStateException
| KinesisClientLibDependencyException e) {
throw e;
} catch (KinesisClientLibException e) {
LOG.warn("Caught exception setting checkpoint.", e);
throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e);
} }
} checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointToRecord, shardInfo.getConcurrencyToken());
} finally { lastCheckpointValue = checkpointToRecord;
if (unsetMetrics) { } catch (ThrottlingException | ShutdownException | InvalidStateException
MetricsHelper.unsetMetricsScope(); | KinesisClientLibDependencyException e) {
throw e;
} catch (KinesisClientLibException e) {
LOG.warn("Caught exception setting checkpoint.", e);
throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e);
} }
} }
} }

View file

@ -1135,8 +1135,7 @@ public class Worker implements Runnable {
new SequenceNumberValidator( new SequenceNumberValidator(
streamConfig.getStreamProxy(), streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()));
metricsFactory);
if(shardConsumerFactory == null){ //Default to KinesisShardConsumerFactory if null if(shardConsumerFactory == null){ //Default to KinesisShardConsumerFactory if null
this.shardConsumerFactory = new KinesisShardConsumerFactory(); this.shardConsumerFactory = new KinesisShardConsumerFactory();
@ -1526,7 +1525,7 @@ public class Worker implements Runnable {
// We expect users to either inject both LeaseRenewer and the corresponding thread-pool, or neither of them (DEFAULT). // We expect users to either inject both LeaseRenewer and the corresponding thread-pool, or neither of them (DEFAULT).
if (leaseRenewer == null) { if (leaseRenewer == null) {
ExecutorService leaseRenewerThreadPool = LeaseCoordinator.getDefaultLeaseRenewalExecutorService(config.getMaxLeaseRenewalThreads()); ExecutorService leaseRenewerThreadPool = LeaseCoordinator.getDefaultLeaseRenewalExecutorService(config.getMaxLeaseRenewalThreads());
leaseRenewer = new LeaseRenewer<>(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), leaseRenewerThreadPool); leaseRenewer = new LeaseRenewer<>(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), leaseRenewerThreadPool, metricsFactory);
} }
if (leaderDecider == null) { if (leaderDecider == null) {

View file

@ -209,7 +209,7 @@ public class LeaseCoordinator<T extends Lease> {
this(new LeaseTaker<>(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis) this(new LeaseTaker<>(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis)
.withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime), .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime),
new LeaseRenewer<>(leaseManager, workerIdentifier, leaseDurationMillis, getDefaultLeaseRenewalExecutorService(maxLeaseRenewerThreadCount)), new LeaseRenewer<>(leaseManager, workerIdentifier, leaseDurationMillis, getDefaultLeaseRenewalExecutorService(maxLeaseRenewerThreadCount), metricsFactory),
leaseDurationMillis, leaseDurationMillis,
epsilonMillis, epsilonMillis,
maxLeasesForWorker, maxLeasesForWorker,

View file

@ -29,6 +29,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -56,21 +57,24 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
private final String workerIdentifier; private final String workerIdentifier;
private final long leaseDurationNanos; private final long leaseDurationNanos;
private final ExecutorService executorService; private final ExecutorService executorService;
private final IMetricsFactory metricsFactory;
/** /**
* Constructor. * Constructor.
* *
* @param leaseManager LeaseManager to use * @param leaseManager LeaseManager to use
* @param workerIdentifier identifier of this worker * @param workerIdentifier identifier of this worker
* @param leaseDurationMillis duration of a lease in milliseconds * @param leaseDurationMillis duration of a lease in milliseconds
* @param executorService ExecutorService to use for renewing leases in parallel * @param executorService ExecutorService to use for renewing leases in parallel
* @param metricsFactory Factory to use for MetricsScope objects
*/ */
public LeaseRenewer(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis, public LeaseRenewer(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis,
ExecutorService executorService) { ExecutorService executorService, IMetricsFactory metricsFactory) {
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.workerIdentifier = workerIdentifier; this.workerIdentifier = workerIdentifier;
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
this.executorService = executorService; this.executorService = executorService;
this.metricsFactory = metricsFactory;
} }
/** /**
@ -292,6 +296,7 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
return false; return false;
} }
MetricsHelper.startScope(metricsFactory);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
boolean success = false; boolean success = false;
try { try {
@ -333,6 +338,7 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
} }
} finally { } finally {
MetricsHelper.addSuccessAndLatency("UpdateLease", startTime, success, MetricsLevel.DETAILED); MetricsHelper.addSuccessAndLatency("UpdateLease", startTime, success, MetricsLevel.DETAILED);
MetricsHelper.endScope();
} }
} }

View file

@ -84,7 +84,7 @@ public class MetricsHelper {
* *
* @return true if metrics scope is present, else returns false * @return true if metrics scope is present, else returns false
*/ */
public static boolean isMetricsScopePresent() { private static boolean isMetricsScopePresent() {
return currentScope.get() != null; return currentScope.get() != null;
} }

View file

@ -14,19 +14,15 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -42,8 +38,6 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheck
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
@ -59,7 +53,7 @@ public class RecordProcessorCheckpointerTest {
private ShardInfo shardInfo; private ShardInfo shardInfo;
private SequenceNumberValidator sequenceNumberValidator; private SequenceNumberValidator sequenceNumberValidator;
private String shardId = "shardId-123"; private String shardId = "shardId-123";
@Mock @Mock
IMetricsFactory metricsFactory; IMetricsFactory metricsFactory;
@ -92,7 +86,7 @@ public class RecordProcessorCheckpointerTest {
public final void testCheckpoint() throws Exception { public final void testCheckpoint() throws Exception {
// First call to checkpoint // First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, null);
processingCheckpointer.setLargestPermittedCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(startingExtendedSequenceNumber);
processingCheckpointer.checkpoint(); processingCheckpointer.checkpoint();
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
@ -104,15 +98,15 @@ public class RecordProcessorCheckpointerTest {
processingCheckpointer.checkpoint(); processingCheckpointer.checkpoint();
Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId));
} }
/** /**
* Test method for * Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(Record record)}. * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(Record record)}.
*/ */
@Test @Test
public final void testCheckpointRecord() throws Exception { public final void testCheckpointRecord() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025");
Record record = new Record().withSequenceNumber("5025"); Record record = new Record().withSequenceNumber("5025");
@ -120,7 +114,7 @@ public class RecordProcessorCheckpointerTest {
processingCheckpointer.checkpoint(record); processingCheckpointer.checkpoint(record);
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
} }
/** /**
* Test method for * Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(Record record)}. * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(Record record)}.
@ -128,7 +122,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testCheckpointSubRecord() throws Exception { public final void testCheckpointSubRecord() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030");
Record record = new Record().withSequenceNumber("5030"); Record record = new Record().withSequenceNumber("5030");
@ -137,7 +131,7 @@ public class RecordProcessorCheckpointerTest {
processingCheckpointer.checkpoint(subRecord); processingCheckpointer.checkpoint(subRecord);
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
} }
/** /**
* Test method for * Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(String sequenceNumber)}. * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(String sequenceNumber)}.
@ -145,14 +139,14 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testCheckpointSequenceNumber() throws Exception { public final void testCheckpointSequenceNumber() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035");
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
processingCheckpointer.checkpoint("5035"); processingCheckpointer.checkpoint("5035");
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
} }
/** /**
* Test method for * Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(String sequenceNumber, long subSequenceNumber)}. * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(String sequenceNumber, long subSequenceNumber)}.
@ -160,7 +154,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testCheckpointExtendedSequenceNumber() throws Exception { public final void testCheckpointExtendedSequenceNumber() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040");
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
@ -174,7 +168,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testCheckpointAtShardEnd() throws Exception { public final void testCheckpointAtShardEnd() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END; ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END;
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
@ -191,7 +185,7 @@ public class RecordProcessorCheckpointerTest {
public final void testPrepareCheckpoint() throws Exception { public final void testPrepareCheckpoint() throws Exception {
// First call to checkpoint // First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber sequenceNumber1 = new ExtendedSequenceNumber("5001"); ExtendedSequenceNumber sequenceNumber1 = new ExtendedSequenceNumber("5001");
@ -222,7 +216,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testPrepareCheckpointRecord() throws Exception { public final void testPrepareCheckpointRecord() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025");
Record record = new Record().withSequenceNumber("5025"); Record record = new Record().withSequenceNumber("5025");
@ -247,7 +241,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testPrepareCheckpointSubRecord() throws Exception { public final void testPrepareCheckpointSubRecord() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030");
Record record = new Record().withSequenceNumber("5030"); Record record = new Record().withSequenceNumber("5030");
@ -273,7 +267,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testPrepareCheckpointSequenceNumber() throws Exception { public final void testPrepareCheckpointSequenceNumber() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035");
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
@ -297,7 +291,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testPrepareCheckpointExtendedSequenceNumber() throws Exception { public final void testPrepareCheckpointExtendedSequenceNumber() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040"); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040");
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
@ -320,7 +314,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testPrepareCheckpointAtShardEnd() throws Exception { public final void testPrepareCheckpointAtShardEnd() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END; ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END;
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
@ -344,7 +338,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testMultipleOutstandingCheckpointersHappyCase() throws Exception { public final void testMultipleOutstandingCheckpointersHappyCase() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("6040")); processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("6040"));
@ -376,7 +370,7 @@ public class RecordProcessorCheckpointerTest {
@Test @Test
public final void testMultipleOutstandingCheckpointersOutOfOrder() throws Exception { public final void testMultipleOutstandingCheckpointersOutOfOrder() throws Exception {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("7040")); processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("7040"));
@ -411,7 +405,7 @@ public class RecordProcessorCheckpointerTest {
*/ */
@Test @Test
public final void testUpdate() throws Exception { public final void testUpdate() throws Exception {
RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory); RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null);
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("10"); ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("10");
checkpointer.setLargestPermittedCheckpointValue(sequenceNumber); checkpointer.setLargestPermittedCheckpointValue(sequenceNumber);
@ -432,7 +426,7 @@ public class RecordProcessorCheckpointerTest {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class); SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
// Several checkpoints we're gonna hit // Several checkpoints we're gonna hit
ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2"); ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2");
@ -481,7 +475,7 @@ public class RecordProcessorCheckpointerTest {
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
} catch (NullPointerException e) { } catch (NullPointerException e) {
} }
Assert.assertEquals("Checkpoint value should not have changed", Assert.assertEquals("Checkpoint value should not have changed",
secondSequenceNumber, secondSequenceNumber,
@ -520,7 +514,7 @@ public class RecordProcessorCheckpointerTest {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class); SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
// Several checkpoints we're gonna hit // Several checkpoints we're gonna hit
ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2"); ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2");
@ -648,7 +642,7 @@ public class RecordProcessorCheckpointerTest {
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) { for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.CHECKPOINTER); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.CHECKPOINTER);
} }
} }
@ -668,7 +662,7 @@ public class RecordProcessorCheckpointerTest {
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) { for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARED_CHECKPOINTER); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARED_CHECKPOINTER);
} }
} }
@ -689,7 +683,7 @@ public class RecordProcessorCheckpointerTest {
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) { for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARE_THEN_CHECKPOINTER); testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARE_THEN_CHECKPOINTER);
} }
} }
@ -838,47 +832,4 @@ public class RecordProcessorCheckpointerTest {
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
} }
} }
@Test
public final void testUnsetMetricsScopeDuringCheckpointing() throws Exception {
// First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
IMetricsScope scope = null;
if (MetricsHelper.isMetricsScopePresent()) {
scope = MetricsHelper.getMetricsScope();
MetricsHelper.unsetMetricsScope();
}
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019");
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber);
processingCheckpointer.checkpoint();
Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId));
verify(metricsFactory).createMetrics();
Assert.assertFalse(MetricsHelper.isMetricsScopePresent());
if (scope != null) {
MetricsHelper.setMetricsScope(scope);
}
}
@Test
public final void testSetMetricsScopeDuringCheckpointing() throws Exception {
// First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
boolean shouldUnset = false;
if (!MetricsHelper.isMetricsScopePresent()) {
shouldUnset = true;
MetricsHelper.setMetricsScope(new NullMetricsScope());
}
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019");
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber);
processingCheckpointer.checkpoint();
Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId));
verify(metricsFactory, never()).createMetrics();
Assert.assertTrue(MetricsHelper.isMetricsScopePresent());
assertEquals(NullMetricsScope.class, MetricsHelper.getMetricsScope().getClass());
if (shouldUnset) {
MetricsHelper.unsetMetricsScope();
}
}
} }

View file

@ -408,8 +408,7 @@ public class ShardConsumerTest {
streamConfig.getStreamProxy(), streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
), )
metricsFactory
); );
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
@ -642,8 +641,7 @@ public class ShardConsumerTest {
streamConfig.getStreamProxy(), streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
), )
metricsFactory
); );
KinesisShardConsumer consumer = KinesisShardConsumer consumer =
@ -791,8 +789,7 @@ public class ShardConsumerTest {
streamConfig.getStreamProxy(), streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
), )
metricsFactory
); );
KinesisShardConsumer consumer = KinesisShardConsumer consumer =
@ -926,8 +923,7 @@ public class ShardConsumerTest {
streamConfig.getStreamProxy(), streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
), )
metricsFactory
); );
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);

View file

@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.leases.impl;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -35,7 +36,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
@Before @Before
public void setUp() { public void setUp() {
renewer = new LeaseRenewer<KinesisClientLease>( renewer = new LeaseRenewer<KinesisClientLease>(
leaseManager, "foo", LEASE_DURATION_MILLIS, Executors.newCachedThreadPool()); leaseManager, "foo", LEASE_DURATION_MILLIS, Executors.newCachedThreadPool(), new NullMetricsFactory());
} }
@Test @Test
@ -244,7 +245,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
builder.withLease(shardId, owner); builder.withLease(shardId, owner);
Map<String, KinesisClientLease> leases = builder.build(); Map<String, KinesisClientLease> leases = builder.build();
LeaseRenewer<KinesisClientLease> renewer =new LeaseRenewer<KinesisClientLease>( LeaseRenewer<KinesisClientLease> renewer =new LeaseRenewer<KinesisClientLease>(
leaseManager, owner, 30000L, Executors.newCachedThreadPool()); leaseManager, owner, 30000L, Executors.newCachedThreadPool(), new NullMetricsFactory());
renewer.initialize(); renewer.initialize();
Map<String, KinesisClientLease> heldLeases = renewer.getCurrentlyHeldLeases(); Map<String, KinesisClientLease> heldLeases = renewer.getCurrentlyHeldLeases();
Assert.assertEquals(leases.size(), heldLeases.size()); Assert.assertEquals(leases.size(), heldLeases.size());

View file

@ -21,6 +21,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -70,7 +71,7 @@ public class LeaseRenewerTest {
renewer = new LeaseRenewer<>(leaseManager, renewer = new LeaseRenewer<>(leaseManager,
workerIdentifier, workerIdentifier,
leaseDurationMillis, leaseDurationMillis,
Executors.newCachedThreadPool()); Executors.newCachedThreadPool(), new NullMetricsFactory());
} }
@After @After