diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/UserRecord.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/UserRecord.java
index 2f60671a..2c326182 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/UserRecord.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/UserRecord.java
@@ -14,6 +14,7 @@
  */
 package com.amazonaws.services.kinesis.clientlibrary.types;
 
+import java.io.UnsupportedEncodingException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
@@ -206,7 +207,6 @@ public class UserRecord extends Record {
 
         for (Record r : records) {
             boolean isAggregated = true;
-            long subSeqNum = 0;
             ByteBuffer bb = r.getData();
 
             if (bb.remaining() >= magic.length) {
@@ -215,7 +215,8 @@ public class UserRecord extends Record {
                 isAggregated = false;
             }
 
-            if (!Arrays.equals(AGGREGATED_RECORD_MAGIC, magic) || bb.remaining() <= DIGEST_SIZE) {
+            if (!Arrays.equals(AGGREGATED_RECORD_MAGIC, magic)
+                    || bb.remaining() <= DIGEST_SIZE) {
                 isAggregated = false;
             }
 
@@ -232,68 +233,12 @@ public class UserRecord extends Record {
                     isAggregated = false;
                 } else {
                     try {
-                        Messages.AggregatedRecord ar = Messages.AggregatedRecord.parseFrom(messageData);
-                        List<String> pks = ar.getPartitionKeyTableList();
-                        List<String> ehks = ar.getExplicitHashKeyTableList();
-                        long aat = r.getApproximateArrivalTimestamp() == null
-                                ? -1 : r.getApproximateArrivalTimestamp().getTime();
-                        try {
-                            int recordsInCurrRecord = 0;
-                            for (Messages.Record mr : ar.getRecordsList()) {
-                                String explicitHashKey = null;
-                                String partitionKey = pks.get((int) mr.getPartitionKeyIndex());
-                                if (mr.hasExplicitHashKeyIndex()) {
-                                    explicitHashKey = ehks.get((int) mr.getExplicitHashKeyIndex());
-                                }
-
-                                BigInteger effectiveHashKey = explicitHashKey != null
-                                        ? new BigInteger(explicitHashKey)
-                                        : new BigInteger(1, md5(partitionKey.getBytes("UTF-8")));
-
-                                if (effectiveHashKey.compareTo(startingHashKey) < 0
-                                        || effectiveHashKey.compareTo(endingHashKey) > 0) {
-                                    for (int toRemove = 0; toRemove < recordsInCurrRecord; ++toRemove) {
-                                        result.remove(result.size() - 1);
-                                    }
-                                    break;
-                                }
-
-                                ++recordsInCurrRecord;
-                                Record record = new Record()
-                                        .withData(ByteBuffer.wrap(mr.getData().toByteArray()))
-                                        .withPartitionKey(partitionKey)
-                                        .withSequenceNumber(r.getSequenceNumber())
-                                        .withApproximateArrivalTimestamp(aat < 0 ? null : new Date(aat));
-                                result.add(new UserRecord(true, record, subSeqNum++, explicitHashKey));
-                            }
-                        } catch (Exception e) {
-                            StringBuilder sb = new StringBuilder();
-                            sb.append("Unexpected exception during deaggregation, record was:\n");
-                            sb.append("PKS:\n");
-                            for (String s : pks) {
-                                sb.append(s).append("\n");
-                            }
-                            sb.append("EHKS: \n");
-                            for (String s : ehks) {
-                                sb.append(s).append("\n");
-                            }
-                            for (Messages.Record mr : ar.getRecordsList()) {
-                                sb.append("Record: [hasEhk=").append(mr.hasExplicitHashKeyIndex()).append(", ")
-                                        .append("ehkIdx=").append(mr.getExplicitHashKeyIndex()).append(", ")
-                                        .append("pkIdx=").append(mr.getPartitionKeyIndex()).append(", ")
-                                        .append("dataLen=").append(mr.getData().toByteArray().length).append("]\n");
-                            }
-                            sb.append("Sequence number: ").append(r.getSequenceNumber()).append("\n")
-                                    .append("Raw data: ")
-                                    .append(javax.xml.bind.DatatypeConverter.printBase64Binary(messageData)).append("\n");
-                            LOG.error(sb.toString(), e);
-                        }
+                        result.addAll(deaggregateRecords(messageData, r, startingHashKey, endingHashKey));
                     } catch (InvalidProtocolBufferException e) {
                         isAggregated = false;
                     }
                 }
             }
-
             if (!isAggregated) {
                 bb.rewind();
                 result.add(new UserRecord(r));
@@ -301,5 +246,126 @@ public class UserRecord extends Record {
         }
         return result;
     }
+
+    /**
+     * Expands one aggregated Kinesis record into the user records packed inside it.
+     *
+     * <p>The effective hash key of a sub-record is its explicit hash key when it has one,
+     * otherwise {@code md5} of the UTF-8 bytes of its partition key. If any effective hash key
+     * lies outside {@code [startingHashKey, endingHashKey]}, the whole aggregated record is
+     * dropped and an empty list is returned.
+     *
+     * <p>A failure while expanding a sub-record (for example a key table index that is out of
+     * bounds, or an explicit hash key that is not a decimal number) is logged and stops the
+     * expansion; the sub-records built before the failure are still returned.
+     *
+     * @param messageData serialized {@code Messages.AggregatedRecord} payload
+     * @param record the record the payload was read from; its sequence number and approximate
+     *        arrival timestamp are copied onto every sub-record
+     * @param startingHashKey inclusive lower bound of the accepted hash key range
+     * @param endingHashKey inclusive upper bound of the accepted hash key range
+     * @return the deaggregated user records; possibly empty, never {@code null}
+     * @throws InvalidProtocolBufferException if {@code messageData} cannot be parsed as an
+     *         aggregated record
+     */
+    private static List<UserRecord> deaggregateRecords(
+            byte[] messageData,
+            Record record,
+            BigInteger startingHashKey,
+            BigInteger endingHashKey
+    ) throws InvalidProtocolBufferException {
+        Messages.AggregatedRecord aggregatedRecord = Messages.AggregatedRecord.parseFrom(messageData);
+        List<UserRecord> results = new ArrayList<>();
+
+        Date arrivalTimeStamp = record.getApproximateArrivalTimestamp();
+        String sequenceNumber = record.getSequenceNumber();
+
+        List<String> partitionKeyTable = aggregatedRecord.getPartitionKeyTableList();
+        List<String> explicitHashKeyTable = aggregatedRecord.getExplicitHashKeyTableList();
+
+        try {
+            long subSeqNum = 0;
+
+            for (Messages.Record subRecord : aggregatedRecord.getRecordsList()) {
+                String explicitHashKey = subRecord.hasExplicitHashKeyIndex()
+                        ? explicitHashKeyTable.get((int) subRecord.getExplicitHashKeyIndex())
+                        : null;
+
+                String partitionKey = partitionKeyTable.get((int) subRecord.getPartitionKeyIndex());
+
+                BigInteger effectiveHashKey = explicitHashKey != null
+                        ? new BigInteger(explicitHashKey)
+                        : new BigInteger(1, md5(partitionKey.getBytes("UTF-8")));
+
+                // One out-of-range hash key invalidates the whole aggregated record.
+                if (effectiveHashKey.compareTo(startingHashKey) < 0
+                        || effectiveHashKey.compareTo(endingHashKey) > 0) {
+                    results.clear();
+                    break;
+                }
+
+                Record newRecord = new Record()
+                        .withData(ByteBuffer.wrap(subRecord.getData().toByteArray()))
+                        .withPartitionKey(partitionKey)
+                        .withSequenceNumber(sequenceNumber)
+                        .withApproximateArrivalTimestamp(arrivalTimeStamp);
+                results.add(new UserRecord(true, newRecord, subSeqNum++, explicitHashKey));
+            }
+        } catch (UnsupportedEncodingException | RuntimeException e) {
+            // Runtime failures (bad table index, non-numeric hash key) are handled here too, so
+            // that a single malformed aggregate is logged instead of aborting the whole batch.
+            logUnsupportedEncodingException(
+                    e,
+                    partitionKeyTable,
+                    explicitHashKeyTable,
+                    aggregatedRecord,
+                    record,
+                    messageData
+            );
+        }
+        return results;
+    }
+
+    /**
+     * Logs, at error level, a failure that interrupted deaggregation together with a dump of
+     * the aggregated record: both key tables, the index fields and data length of every
+     * sub-record, the sequence number and the Base64-encoded raw payload.
+     *
+     * @param e the failure being reported
+     * @param partitionKeyTable partition key table of the aggregated record
+     * @param explicitHashKeyTable explicit hash key table of the aggregated record
+     * @param aggregatedRecord the parsed aggregated record
+     * @param record the record that carried the aggregated payload
+     * @param messageData raw aggregated payload, logged Base64-encoded
+     */
+    private static void logUnsupportedEncodingException(
+            Exception e,
+            List<String> partitionKeyTable,
+            List<String> explicitHashKeyTable,
+            Messages.AggregatedRecord aggregatedRecord,
+            Record record,
+            byte[] messageData
+    ) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Unexpected exception during deaggregation, record was:\n");
+        sb.append("PKS:\n");
+        for (String s : partitionKeyTable) {
+            sb.append(s).append("\n");
+        }
+        sb.append("EHKS: \n");
+        for (String s : explicitHashKeyTable) {
+            sb.append(s).append("\n");
+        }
+        for (Messages.Record mr : aggregatedRecord.getRecordsList()) {
+            sb.append("Record: [hasEhk=").append(mr.hasExplicitHashKeyIndex()).append(", ")
+                    .append("ehkIdx=").append(mr.getExplicitHashKeyIndex()).append(", ")
+                    .append("pkIdx=").append(mr.getPartitionKeyIndex()).append(", ")
+                    .append("dataLen=").append(mr.getData().toByteArray().length).append("]\n");
+        }
+        sb.append("Sequence number: ").append(record.getSequenceNumber()).append("\n")
+                .append("Raw data: ")
+                .append(javax.xml.bind.DatatypeConverter.printBase64Binary(messageData)).append("\n");
+        LOG.error(sb.toString(), e);
+    }
     // CHECKSTYLE:ON NPathComplexity
 }