Added cache updating behavior for GetShard (#344)

* Added cache updating behavior for GetShard

Customer are occasionally seeing messages about being unable to
retrieve shard information, which is logged as a warning.  This change
will allow the shard map to be updated even when there is no re-shard
operation.

This now triggers a shard list update if there is 1000 cache misses,
or a cache miss occurs when the cache is more than 30 seconds old.
For Kinesis the updates will use ListShards, and for DynamoDB Streams
it will continue to use DescribeStream.

* Adjust some logging, and the zeroing of cache misses a bit

Only log about cache refresh if it's the thread doing the cache
refresh.  If after synchronizing the shard is present, accept that
someone else loaded the shard map, and move on.

If the cache was reloaded, and the shard was found the current thread
will reset the cache misses.

The warnings for the cache miss was using a modulo of 1000 which is
the maximum value for cache misses, so wasn't to useful.
This commit is contained in:
Justin Pfifer 2018-06-07 13:25:03 -07:00 committed by Sahil Palvia
parent a84885db79
commit 2483f8cbf8
2 changed files with 263 additions and 32 deletions

View file

@ -1,5 +1,5 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -15,13 +15,19 @@
package com.amazonaws.services.kinesis.clientlibrary.proxies;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@ -50,7 +56,10 @@ import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamStatus;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
/**
* Kinesis proxy - used to make calls to Amazon Kinesis (e.g. fetch data records and list of shards).
@ -61,15 +70,27 @@ public class KinesisProxy implements IKinesisProxyExtended {
private static final EnumSet<ShardIteratorType> EXPECTED_ITERATOR_TYPES = EnumSet
.of(ShardIteratorType.AT_SEQUENCE_NUMBER, ShardIteratorType.AFTER_SEQUENCE_NUMBER);
public static final int MAX_CACHE_MISSES_BEFORE_RELOAD = 1000;
public static final Duration CACHE_MAX_ALLOWED_AGE = Duration.of(30, ChronoUnit.SECONDS);
public static final int CACHE_MISS_WARNING_MODULUS = 250;
private static String defaultServiceName = "kinesis";
private static String defaultRegionId = "us-east-1";;
private AmazonKinesis client;
private AWSCredentialsProvider credentialsProvider;
private AtomicReference<List<Shard>> listOfShardsSinceLastGet = new AtomicReference<>();
private ShardIterationState shardIterationState = null;
@Setter(AccessLevel.PACKAGE)
private volatile Map<String, Shard> cachedShardMap = null;
@Setter(AccessLevel.PACKAGE)
@Getter(AccessLevel.PACKAGE)
private volatile Instant lastCacheUpdateTime = null;
@Setter(AccessLevel.PACKAGE)
@Getter(AccessLevel.PACKAGE)
private AtomicInteger cacheMisses = new AtomicInteger(0);
private final String streamName;
private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L;
@ -333,19 +354,73 @@ public class KinesisProxy implements IKinesisProxyExtended {
*/
@Override
public Shard getShard(String shardId) {
if (this.listOfShardsSinceLastGet.get() == null) {
//Update this.listOfShardsSinceLastGet as needed.
this.getShardList();
}
for (Shard shard : listOfShardsSinceLastGet.get()) {
if (shard.getShardId().equals(shardId)) {
return shard;
if (this.cachedShardMap == null) {
synchronized (this) {
if (this.cachedShardMap == null) {
this.getShardList();
}
}
}
LOG.warn("Cannot find the shard given the shardId " + shardId);
return null;
Shard shard = cachedShardMap.get(shardId);
if (shard == null) {
if (cacheMisses.incrementAndGet() > MAX_CACHE_MISSES_BEFORE_RELOAD || cacheNeedsTimeUpdate()) {
synchronized (this) {
shard = cachedShardMap.get(shardId);
//
// If after synchronizing we resolve the shard, it means someone else already got it for us.
//
if (shard == null) {
LOG.info("To many shard map cache misses or cache is out of date -- forcing a refresh");
this.getShardList();
shard = verifyAndLogShardAfterCacheUpdate(shardId);
cacheMisses.set(0);
} else {
//
// If someone else got us the shard go ahead and zero cache misses
//
cacheMisses.set(0);
}
}
}
}
if (shard == null) {
String message = "Cannot find the shard given the shardId " + shardId + ". Cache misses: " + cacheMisses;
if (cacheMisses.get() % CACHE_MISS_WARNING_MODULUS == 0) {
LOG.warn(message);
} else {
LOG.debug(message);
}
}
return shard;
}
private Shard verifyAndLogShardAfterCacheUpdate(String shardId) {
Shard shard = cachedShardMap.get(shardId);
if (shard == null) {
LOG.warn("Even after cache refresh shard '" + shardId + "' wasn't found. "
+ "This could indicate a bigger problem");
}
return shard;
}
private boolean cacheNeedsTimeUpdate() {
if (lastCacheUpdateTime == null) {
return true;
}
Instant now = Instant.now();
Duration cacheAge = Duration.between(lastCacheUpdateTime, now);
String baseMessage = "Shard map cache is " + cacheAge + " > " + CACHE_MAX_ALLOWED_AGE + ". ";
if (cacheAge.compareTo(CACHE_MAX_ALLOWED_AGE) > 0) {
LOG.info(baseMessage + "Age exceeds limit -- Refreshing.");
return true;
}
LOG.debug(baseMessage + "Age doesn't exceed limit.");
return false;
}
/**
@ -393,9 +468,12 @@ public class KinesisProxy implements IKinesisProxyExtended {
}
} while (response.getStreamDescription().isHasMoreShards());
}
this.listOfShardsSinceLastGet.set(shardIterationState.getShards());
List<Shard> shards = shardIterationState.getShards();
this.cachedShardMap = shards.stream().collect(Collectors.toMap(Shard::getShardId, Function.identity()));
this.lastCacheUpdateTime = Instant.now();
shardIterationState = new ShardIterationState();
return listOfShardsSinceLastGet.get();
return shards;
}
/**

View file

@ -1,5 +1,5 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -14,39 +14,39 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.proxies;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClientChild;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ResourceInUseException;
import lombok.AllArgsConstructor;
import org.apache.commons.lang.StringUtils;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeDiagnosingMatcher;
@ -59,16 +59,25 @@ import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClientChild;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ResourceInUseException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.amazonaws.services.kinesis.model.StreamStatus;
import lombok.AllArgsConstructor;
@RunWith(MockitoJUnitRunner.class)
public class KinesisProxyTest {
private static final String TEST_STRING = "TestString";
@ -77,6 +86,12 @@ public class KinesisProxyTest {
private static final int DESCRIBE_STREAM_RETRY_TIMES = 3;
private static final int LIST_SHARDS_RETRY_TIMES = 3;
private static final String NEXT_TOKEN = "NextToken";
private static final String SHARD_1 = "shard-1";
private static final String SHARD_2 = "shard-2";
private static final String SHARD_3 = "shard-3";
private static final String SHARD_4 = "shard-4";
private static final String NOT_CACHED_SHARD = "ShardId-0005";
private static final String NEVER_PRESENT_SHARD = "ShardId-0010";
@Mock
private AmazonKinesis mockClient;
@ -96,6 +111,8 @@ public class KinesisProxyTest {
private Shard shard;
@Mock
private KinesisClientLibConfiguration config;
@Mock
private ListShardsResult listShardsResult;
private KinesisProxy proxy;
private KinesisProxy ddbProxy;
@ -104,6 +121,10 @@ public class KinesisProxyTest {
// Test shards for verifying.
private Set<String> shardIdSet;
private List<Shard> shards;
private Map<String, Shard> shardMap;
private List<Shard> updatedShards;
private Map<String, Shard> updatedShardMap;
@Before
public void setUpTest() {
@ -120,11 +141,17 @@ public class KinesisProxyTest {
ddbChildProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBChildClient,
DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME,
LIST_SHARDS_RETRY_TIMES);
// Set up test shards
List<String> shardIds = Arrays.asList("shard-1", "shard-2", "shard-3", "shard-4");
List<String> shardIds = Arrays.asList(SHARD_1, SHARD_2, SHARD_3, SHARD_4);
shardIdSet = new HashSet<>(shardIds);
shards = shardIds.stream().map(shardId -> new Shard().withShardId(shardId)).collect(Collectors.toList());
shardMap = shards.stream().collect(Collectors.toMap(Shard::getShardId, Function.identity()));
updatedShards = new ArrayList<>(shards);
updatedShards.add(new Shard().withShardId(NOT_CACHED_SHARD));
updatedShardMap = updatedShards.stream().collect(Collectors.toMap(Shard::getShardId, Function.identity()));
}
@Test
@ -269,7 +296,7 @@ public class KinesisProxyTest {
verify(mockDDBStreamClient).describeStream(argThat(describeWithShardId(shardId2)));
}
@Test
public void testListShardsWithMoreDataAvailable() {
ListShardsResult responseWithMoreData = new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
@ -318,7 +345,7 @@ public class KinesisProxyTest {
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class);
proxy.getShardList();
}
@Test
public void testStreamNotInCorrectStatus() {
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(ResourceInUseException.class);
@ -337,6 +364,132 @@ public class KinesisProxyTest {
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets));
}
@Test
public void testGetShardCacheEmpty() {
mockListShardsForSingleResponse(shards);
Shard shard = proxy.getShard(SHARD_1);
assertThat(shard.getShardId(), equalTo(SHARD_1));
verify(mockClient).listShards(any());
}
@Test
public void testGetShardCacheNotLoadingWhenCacheHit() {
proxy.setCachedShardMap(shardMap);
Shard shard = proxy.getShard(SHARD_1);
assertThat(shard, notNullValue());
assertThat(shard.getShardId(), equalTo(SHARD_1));
verify(mockClient, never()).listShards(any());
}
@Test
public void testGetShardCacheLoadAfterMaxMisses() {
proxy.setCachedShardMap(shardMap);
proxy.setCacheMisses(new AtomicInteger(KinesisProxy.MAX_CACHE_MISSES_BEFORE_RELOAD));
mockListShardsForSingleResponse(updatedShards);
Shard shard = proxy.getShard(NOT_CACHED_SHARD);
assertThat(shard, notNullValue());
assertThat(shard.getShardId(), equalTo(NOT_CACHED_SHARD));
assertThat(proxy.getCacheMisses().get(), equalTo(0));
verify(mockClient).listShards(any());
}
@Test
public void testGetShardCacheNonLoadBeforeMaxMisses() {
proxy.setCachedShardMap(shardMap);
proxy.setLastCacheUpdateTime(Instant.now());
proxy.setCacheMisses(new AtomicInteger(KinesisProxy.MAX_CACHE_MISSES_BEFORE_RELOAD - 1));
Shard shard = proxy.getShard(NOT_CACHED_SHARD);
assertThat(shard, nullValue());
assertThat(proxy.getCacheMisses().get(), equalTo(KinesisProxy.MAX_CACHE_MISSES_BEFORE_RELOAD));
verify(mockClient, never()).listShards(any());
}
@Test
public void testGetShardCacheMissesResetsAfterLoad() {
proxy.setCachedShardMap(shardMap);
proxy.setLastCacheUpdateTime(Instant.now());
proxy.setCacheMisses(new AtomicInteger(KinesisProxy.MAX_CACHE_MISSES_BEFORE_RELOAD));
mockListShardsForSingleResponse(updatedShards);
Shard shard = proxy.getShard(NOT_CACHED_SHARD);
assertThat(shard, notNullValue());
assertThat(proxy.getCacheMisses().get(), equalTo(0));
verify(mockClient).listShards(any());
}
@Test
public void testGetShardCacheMissesResetsAfterLoadAfterMiss() {
proxy.setCachedShardMap(shardMap);
proxy.setCacheMisses(new AtomicInteger(KinesisProxy.MAX_CACHE_MISSES_BEFORE_RELOAD));
when(mockClient.listShards(any())).thenReturn(listShardsResult);
when(listShardsResult.getShards()).thenReturn(shards);
when(listShardsResult.getNextToken()).thenReturn(null);
Shard shard = proxy.getShard(NOT_CACHED_SHARD);
assertThat(shard, nullValue());
assertThat(proxy.getCacheMisses().get(), equalTo(0));
}
@Test
public void testGetShardCacheUpdatedFromAge() {
Instant lastUpdateTime = Instant.now().minus(KinesisProxy.CACHE_MAX_ALLOWED_AGE).minus(KinesisProxy.CACHE_MAX_ALLOWED_AGE);
proxy.setCachedShardMap(shardMap);
proxy.setLastCacheUpdateTime(lastUpdateTime);
mockListShardsForSingleResponse(updatedShards);
Shard shard = proxy.getShard(NOT_CACHED_SHARD);
assertThat(shard, notNullValue());
assertThat(shard.getShardId(), equalTo(NOT_CACHED_SHARD));
assertThat(proxy.getLastCacheUpdateTime(), not(equalTo(lastUpdateTime)));
verify(mockClient).listShards(any());
}
@Test
public void testGetShardCacheNotUpdatedIfNotOldEnough() {
Instant lastUpdateTime = Instant.now().minus(KinesisProxy.CACHE_MAX_ALLOWED_AGE.toMillis() / 2, ChronoUnit.MILLIS);
proxy.setCachedShardMap(shardMap);
proxy.setLastCacheUpdateTime(lastUpdateTime);
Shard shard = proxy.getShard(NOT_CACHED_SHARD);
assertThat(shard, nullValue());
assertThat(proxy.getLastCacheUpdateTime(), equalTo(lastUpdateTime));
verify(mockClient, never()).listShards(any());
}
@Test
public void testGetShardCacheAgeEmptyForcesUpdate() {
proxy.setCachedShardMap(shardMap);
mockListShardsForSingleResponse(updatedShards);
Shard shard = proxy.getShard(NOT_CACHED_SHARD);
assertThat(shard, notNullValue());
assertThat(shard.getShardId(), equalTo(NOT_CACHED_SHARD));
verify(mockClient).listShards(any());
}
private void mockListShardsForSingleResponse(List<Shard> shards) {
when(mockClient.listShards(any())).thenReturn(listShardsResult);
when(listShardsResult.getShards()).thenReturn(shards);
when(listShardsResult.getNextToken()).thenReturn(null);
}
private DescribeStreamResult createGetStreamInfoResponse(List<Shard> shards1, boolean isHasMoreShards) {
// Create stream description
StreamDescription description = new StreamDescription();
@ -407,11 +560,11 @@ public class KinesisProxyTest {
return startShardId.equals(shardId);
}
}
private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
return new ListShardsRequestMatcher(null, null);
}
private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
return new ListShardsRequestMatcher(null, nextToken);
}