diff --git a/consumer.go b/consumer.go index 396c345..08a8b67 100644 --- a/consumer.go +++ b/consumer.go @@ -10,7 +10,7 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" ) -const maxBufferSize = 1000 +const maxBufferSize = 400 func NewConsumer(appName, streamName string) *Consumer { svc := kinesis.New(session.New()) @@ -99,12 +99,12 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) { if len(resp.Records) > 0 { for _, r := range resp.Records { b.AddRecord(r) - } - if b.ShouldFlush() { - handler.HandleRecords(*b) - checkpoint.SetCheckpoint(shardID, b.LastSeq()) - b.Flush() + if b.ShouldFlush() { + handler.HandleRecords(*b) + checkpoint.SetCheckpoint(shardID, b.LastSeq()) + b.Flush() + } } } else if resp.NextShardIterator == aws.String("") || shardIterator == resp.NextShardIterator { logger.Log("fatal", "nextShardIterator", "msg", err.Error()) diff --git a/examples/firehose/main.go b/examples/firehose/main.go new file mode 100644 index 0000000..90d06fd --- /dev/null +++ b/examples/firehose/main.go @@ -0,0 +1,52 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/firehose" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/harlow/kinesis-connectors" +) + +var ( + app = flag.String("a", "", "App name") + stream = flag.String("s", "", "Kinesis stream name") + delivery = flag.String("f", "", "Firehose delivery name") +) + +func convertToFirehoseRecrods(kRecs []*kinesis.Record) []*firehose.Record { + fhRecs := []*firehose.Record{} + for _, kr := range kRecs { + fr := &firehose.Record{Data: kr.Data} + fhRecs = append(fhRecs, fr) + } + return fhRecs +} + +func main() { + flag.Parse() + svc := firehose.New(session.New()) + + c := connector.NewConsumer(*app, *stream) + c.Start(connector.HandlerFunc(func(b connector.Buffer) { + params := &firehose.PutRecordBatchInput{ + DeliveryStreamName: aws.String(*delivery), + Records: convertToFirehoseRecrods(b.GetRecords()), + } + + _, err := svc.PutRecordBatch(params) + + if err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } + + fmt.Println("Put records to Firehose") + })) + + select {} // run forever +}