Fixing updateParentShardupdateCurrentLeaseWithChildShards method

This commit is contained in:
Chunxue Yang 2020-07-09 16:03:49 -07:00
parent 849ffea028
commit 995511afe5
3 changed files with 9 additions and 10 deletions

View file

@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
@ -180,8 +181,8 @@ class PeriodicShardSyncManager {
try {
final ShardSyncResponse shardSyncResponse = checkForShardSync();
MetricsHelper.addSuccessAndLatency(runStartMillis, shardSyncResponse.shouldDoShardSync(), MetricsLevel.SUMMARY);
MetricsHelper.addSuccessAndLatency(runStartMillis, shardSyncResponse.isHoleDetected(), MetricsLevel.SUMMARY);
MetricsHelper.getMetricsScope().addData("ShouldDoShardSync", shardSyncResponse.shouldDoShardSync() ? 1 : 0, StandardUnit.Count, MetricsLevel.SUMMARY);
MetricsHelper.getMetricsScope().addData("HashRangeHoleDetected", shardSyncResponse.isHoleDetected() ? 1 : 0, StandardUnit.Count, MetricsLevel.SUMMARY);
if (shardSyncResponse.shouldDoShardSync()) {
LOG.info("Periodic shard syncer initiating shard sync due to the reason - " +
shardSyncResponse.reasonForDecision());

View file

@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
@ -210,10 +211,7 @@ class ShutdownTask implements ITask {
final Set<String> childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet());
currentLease.setChildShardIds(childShardIds);
final boolean updateResult = leaseCoordinator.updateLease(currentLease, UUID.fromString(shardInfo.getConcurrencyToken()));
if (!updateResult) {
throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.getShardId());
}
leaseCoordinator.getLeaseManager().updateLeaseWithMetaInfo(currentLease, UpdateField.CHILD_SHARDS);
LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey());
}

View file

@ -34,6 +34,7 @@ import java.util.UUID;
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@ -110,7 +111,6 @@ public class ShutdownTaskTest {
final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease);
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
}
/**
@ -208,7 +208,7 @@ public class ShutdownTaskTest {
constructChildShards());
TaskResult result = task.call();
verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator).updateLease(any(KinesisClientLease.class), any(UUID.class));
verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown();
}
@ -241,7 +241,7 @@ public class ShutdownTaskTest {
Collections.emptyList());
TaskResult result = task.call();
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class));
verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown();
}
@ -270,7 +270,7 @@ public class ShutdownTaskTest {
Collections.emptyList());
TaskResult result = task.call();
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class));
verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown();
}