Revert "Added unit tests for IKinesisProxy injection in Worker Builder"

This reverts commit aa944c1706.
Reverting to undo changes to import ordering.
This commit is contained in:
parijas 2018-01-08 16:51:35 -08:00
parent aa944c1706
commit 72c77d3c1a
2 changed files with 29 additions and 104 deletions

View file

@ -14,33 +14,6 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Getter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@ -59,6 +32,33 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
* different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from
@ -1063,7 +1063,6 @@ public class Worker implements Runnable {
/**
* Builder to construct a Worker instance.
*/
@Getter
public static class Builder {
private IRecordProcessorFactory recordProcessorFactory;
@ -1199,7 +1198,7 @@ public class Worker implements Runnable {
* Set KinesisProxy for the worker.
*
* @param kinesisProxy
* Sets an implementation of IKinesisProxy.
* KinesisProxy uses the AmazonKinesis client to get data from Kinesis or DynamoDBStreams
*
* @return A reference to this updated object so that method calls can be chained together.
*/
@ -1268,7 +1267,7 @@ public class Worker implements Runnable {
}
}
if (metricsFactory == null) {
metricsFactory = Worker.getMetricsFactory(cloudWatchClient, config);
metricsFactory = getMetricsFactory(cloudWatchClient, config);
}
if (shardPrioritization == null) {
shardPrioritization = new ParentsFirstShardPrioritization(1);

View file

@ -19,8 +19,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
@ -43,7 +41,6 @@ import java.io.File;
import java.lang.Thread.State;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@ -66,13 +63,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hamcrest.Condition;
@ -1484,70 +1474,6 @@ public class WorkerTest {
}
@Test
public void testBuilderWithDefaultKinesisProxy() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
Worker.Builder builder = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config);
builder.build();
assertNotNull(builder.getKinesisProxy());
assertTrue(builder.getKinesisProxy() instanceof KinesisProxy);
}
@Test
public void testBuilderWhenKinesisProxyIsSet() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
IKinesisProxy kinesisProxy = new MockKinesisProxy();
Worker.Builder builder = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.kinesisProxy(kinesisProxy)
.config(config);
builder.build();
assertNotNull(builder.getKinesisProxy());
assertTrue(builder.getKinesisProxy() instanceof MockKinesisProxy);
}
private class MockKinesisProxy implements IKinesisProxy {
@Override public GetRecordsResult get(String shardIterator, int maxRecords)
throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException {
return null;
}
@Override public DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException {
return null;
}
@Override public Set<String> getAllShardIds() throws ResourceNotFoundException {
return null;
}
@Override public List<Shard> getShardList() throws ResourceNotFoundException {
return null;
}
@Override public String getIterator(String shardId, String iteratorEnum, String sequenceNumber)
throws ResourceNotFoundException, InvalidArgumentException {
return null;
}
@Override public String getIterator(String shardId, String iteratorEnum)
throws ResourceNotFoundException, InvalidArgumentException {
return null;
}
@Override public String getIterator(String shardId, Date timestamp)
throws ResourceNotFoundException, InvalidArgumentException {
return null;
}
@Override public PutRecordResult put(String sequenceNumberForOrdering, String explicitHashKey,
String partitionKey, ByteBuffer data) throws ResourceNotFoundException, InvalidArgumentException {
return null;
}
}
private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, StreamConfig streamConfig,