Address comments

This commit is contained in:
Chunxue Yang 2020-04-22 15:00:10 -07:00
parent 93dac82bd6
commit cde09ca191
3 changed files with 18 additions and 17 deletions

View file

@ -53,7 +53,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState";
private static final String PARENT_SHARD_ID_KEY = "parentShardId";
private static final String CHILD_SHARD_ID_KEY = "childShardIds";
private static final String CHILD_SHARD_IDS_KEY = "childShardIds";
@Override
public Map<String, AttributeValue> toDynamoRecord(final Lease lease) {
@ -73,7 +73,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.parentShardIds()));
}
if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
result.put(CHILD_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.childShardIds()));
result.put(CHILD_SHARD_IDS_KEY, DynamoUtils.createAttributeValue(lease.childShardIds()));
}
if (lease.pendingCheckpoint() != null && !lease.pendingCheckpoint().sequenceNumber().isEmpty()) {
@ -107,7 +107,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY))
);
leaseToUpdate.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY));
leaseToUpdate.childShardIds(DynamoUtils.safeGetSS(dynamoRecord, CHILD_SHARD_ID_KEY));
leaseToUpdate.childShardIds(DynamoUtils.safeGetSS(dynamoRecord, CHILD_SHARD_IDS_KEY));
if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) {
leaseToUpdate.pendingCheckpoint(
@ -243,7 +243,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
result.put(CHILD_SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
}
return result;

View file

@ -52,6 +52,7 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Task for invoking the ShardRecordProcessor shutdown() callback.
@ -195,14 +196,11 @@ public class ShutdownTask implements ConsumerTask {
private void updateLeasesForChildShards()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfoIdProvider.apply(shardInfo));
Set<String> childShardIds = new HashSet<>();
for (ChildShard childShard : childShards) {
childShardIds.add(childShard.shardId());
}
Set<String> childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());
final Lease upatedLease = currentLease.copy();
upatedLease.childShardIds(childShardIds);
leaseCoordinator.updateLease(upatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, shardInfoIdProvider.apply(shardInfo));
final Lease updatedLease = currentLease.copy();
updatedLease.childShardIds(childShardIds);
leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, shardInfoIdProvider.apply(shardInfo));
}
/*

View file

@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@ -123,7 +124,8 @@ public class ShutdownTaskTest {
@Test
public final void testCallWhenApplicationDoesNotCheckpoint() {
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(createLease());
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
final TaskResult result = task.call();
@ -164,7 +166,8 @@ public class ShutdownTaskTest {
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards());
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(createLease());
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
final TaskResult result = task.call();
@ -253,11 +256,11 @@ public class ShutdownTaskTest {
return childShards;
}
private Lease createLease() {
private Lease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
Lease lease = new Lease();
lease.leaseKey("shardId-0");
lease.leaseOwner("leaseOwner");
lease.parentShardIds(Collections.singleton("parentShardIds"));
lease.leaseKey(leaseKey);
lease.leaseOwner(leaseOwner);
lease.parentShardIds(parentShardIds);
return lease;
}