Expose the shard ID in the buffer (#30)
This commit is contained in:
parent
fedb6812fb
commit
c56cefb667
2 changed files with 7 additions and 0 deletions
|
|
@ -8,6 +8,7 @@ type Buffer struct {
|
||||||
records []*kinesis.Record
|
records []*kinesis.Record
|
||||||
firstSequenceNumber string
|
firstSequenceNumber string
|
||||||
lastSequenceNumber string
|
lastSequenceNumber string
|
||||||
|
shardID string
|
||||||
|
|
||||||
MaxRecordCount int
|
MaxRecordCount int
|
||||||
}
|
}
|
||||||
|
|
@ -51,3 +52,8 @@ func (b *Buffer) FirstSeq() string {
|
||||||
func (b *Buffer) LastSeq() string {
|
func (b *Buffer) LastSeq() string {
|
||||||
return b.lastSequenceNumber
|
return b.lastSequenceNumber
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ShardID returns the shard ID watched by the consumer
|
||||||
|
func (b *Buffer) ShardID() string {
|
||||||
|
return b.shardID
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -52,6 +52,7 @@ func (c *Consumer) Start(handler Handler) {
|
||||||
func (c *Consumer) handlerLoop(shardID string, handler Handler) {
|
func (c *Consumer) handlerLoop(shardID string, handler Handler) {
|
||||||
buf := &Buffer{
|
buf := &Buffer{
|
||||||
MaxRecordCount: c.BufferSize,
|
MaxRecordCount: c.BufferSize,
|
||||||
|
shardID: shardID,
|
||||||
}
|
}
|
||||||
|
|
||||||
params := &kinesis.GetShardIteratorInput{
|
params := &kinesis.GetShardIteratorInput{
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue