Added unit tests for IKinesisProxy injection in Worker Builder
This commit is contained in:
parent
35e32d42a2
commit
aa944c1706
2 changed files with 104 additions and 29 deletions
|
|
@ -14,6 +14,33 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
import com.amazonaws.regions.Region;
|
||||||
|
import com.amazonaws.regions.RegionUtils;
|
||||||
|
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
|
||||||
|
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
|
||||||
|
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||||
|
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
|
||||||
|
import com.amazonaws.services.kinesis.AmazonKinesis;
|
||||||
|
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
|
||||||
|
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
|
||||||
|
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||||
|
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
|
||||||
|
import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
|
||||||
|
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
||||||
|
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||||
|
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import lombok.Getter;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -32,33 +59,6 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
|
|
||||||
import com.amazonaws.regions.Region;
|
|
||||||
import com.amazonaws.regions.RegionUtils;
|
|
||||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
|
|
||||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
|
|
||||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
|
||||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
|
|
||||||
import com.amazonaws.services.kinesis.AmazonKinesis;
|
|
||||||
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
|
|
||||||
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
|
|
||||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
|
||||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
|
|
||||||
import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
|
|
||||||
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
|
||||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
|
||||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
|
* Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
|
||||||
* different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from
|
* different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from
|
||||||
|
|
@ -1063,6 +1063,7 @@ public class Worker implements Runnable {
|
||||||
/**
|
/**
|
||||||
* Builder to construct a Worker instance.
|
* Builder to construct a Worker instance.
|
||||||
*/
|
*/
|
||||||
|
@Getter
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
|
|
||||||
private IRecordProcessorFactory recordProcessorFactory;
|
private IRecordProcessorFactory recordProcessorFactory;
|
||||||
|
|
@ -1198,7 +1199,7 @@ public class Worker implements Runnable {
|
||||||
* Set KinesisProxy for the worker.
|
* Set KinesisProxy for the worker.
|
||||||
*
|
*
|
||||||
* @param kinesisProxy
|
* @param kinesisProxy
|
||||||
* KinesisProxy uses the AmazonKinesis client to get data from Kinesis or DynamoDBStreams
|
* Sets an implementation of IKinesisProxy.
|
||||||
*
|
*
|
||||||
* @return A reference to this updated object so that method calls can be chained together.
|
* @return A reference to this updated object so that method calls can be chained together.
|
||||||
*/
|
*/
|
||||||
|
|
@ -1267,7 +1268,7 @@ public class Worker implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (metricsFactory == null) {
|
if (metricsFactory == null) {
|
||||||
metricsFactory = getMetricsFactory(cloudWatchClient, config);
|
metricsFactory = Worker.getMetricsFactory(cloudWatchClient, config);
|
||||||
}
|
}
|
||||||
if (shardPrioritization == null) {
|
if (shardPrioritization == null) {
|
||||||
shardPrioritization = new ParentsFirstShardPrioritization(1);
|
shardPrioritization = new ParentsFirstShardPrioritization(1);
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,8 @@ import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.hamcrest.CoreMatchers.isA;
|
import static org.hamcrest.CoreMatchers.isA;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
|
|
@ -41,6 +43,7 @@ import java.io.File;
|
||||||
import java.lang.Thread.State;
|
import java.lang.Thread.State;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
|
@ -63,6 +66,13 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
|
||||||
|
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
|
||||||
|
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||||
|
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||||
|
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
|
||||||
|
import com.amazonaws.services.kinesis.model.PutRecordResult;
|
||||||
|
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.hamcrest.Condition;
|
import org.hamcrest.Condition;
|
||||||
|
|
@ -1474,6 +1484,70 @@ public class WorkerTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBuilderWithDefaultKinesisProxy() {
|
||||||
|
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||||
|
|
||||||
|
Worker.Builder builder = new Worker.Builder()
|
||||||
|
.recordProcessorFactory(recordProcessorFactory)
|
||||||
|
.config(config);
|
||||||
|
builder.build();
|
||||||
|
assertNotNull(builder.getKinesisProxy());
|
||||||
|
assertTrue(builder.getKinesisProxy() instanceof KinesisProxy);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBuilderWhenKinesisProxyIsSet() {
|
||||||
|
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||||
|
IKinesisProxy kinesisProxy = new MockKinesisProxy();
|
||||||
|
Worker.Builder builder = new Worker.Builder()
|
||||||
|
.recordProcessorFactory(recordProcessorFactory)
|
||||||
|
.kinesisProxy(kinesisProxy)
|
||||||
|
.config(config);
|
||||||
|
builder.build();
|
||||||
|
assertNotNull(builder.getKinesisProxy());
|
||||||
|
assertTrue(builder.getKinesisProxy() instanceof MockKinesisProxy);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class MockKinesisProxy implements IKinesisProxy {
|
||||||
|
@Override public GetRecordsResult get(String shardIterator, int maxRecords)
|
||||||
|
throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public Set<String> getAllShardIds() throws ResourceNotFoundException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public List<Shard> getShardList() throws ResourceNotFoundException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String getIterator(String shardId, String iteratorEnum, String sequenceNumber)
|
||||||
|
throws ResourceNotFoundException, InvalidArgumentException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String getIterator(String shardId, String iteratorEnum)
|
||||||
|
throws ResourceNotFoundException, InvalidArgumentException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String getIterator(String shardId, Date timestamp)
|
||||||
|
throws ResourceNotFoundException, InvalidArgumentException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public PutRecordResult put(String sequenceNumberForOrdering, String explicitHashKey,
|
||||||
|
String partitionKey, ByteBuffer data) throws ResourceNotFoundException, InvalidArgumentException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private abstract class InjectableWorker extends Worker {
|
private abstract class InjectableWorker extends Worker {
|
||||||
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
|
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
|
||||||
KinesisClientLibConfiguration config, StreamConfig streamConfig,
|
KinesisClientLibConfiguration config, StreamConfig streamConfig,
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue