Adding testing to validate the MetricsScope setting during checkpoiniting.

This commit is contained in:
Sahil Palvia 2018-01-22 10:21:25 -08:00
parent 7aebbbd9a3
commit 57ea1f5488

View file

@ -14,12 +14,18 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -27,6 +33,7 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer;
@ -34,12 +41,10 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheck
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import org.mockito.runners.MockitoJUnitRunner;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Matchers.anyString;
/** /**
* *
@ -793,4 +798,34 @@ public class RecordProcessorCheckpointerTest {
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
} }
} }
@Test
public final void testUnsetMetricsScopeDuringCheckpointing() throws Exception {
// First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019");
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber);
processingCheckpointer.checkpoint();
Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId));
verify(metricsFactory).createMetrics();
Assert.assertFalse(MetricsHelper.isMetricsScopePresent());
}
@Test
public final void testSetMetricsScopeDuringCheckpointing() throws Exception {
// First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
NullMetricsScope scope = new NullMetricsScope();
MetricsHelper.setMetricsScope(scope);
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019");
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber);
processingCheckpointer.checkpoint();
Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId));
verify(metricsFactory, never()).createMetrics();
Assert.assertTrue(MetricsHelper.isMetricsScopePresent());
assertEquals(scope, MetricsHelper.getMetricsScope());
MetricsHelper.unsetMetricsScope();
}
} }