From 8e8ee5af734a11b0899cfd8920bcd1650cd3c254 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Wed, 10 Dec 2014 15:38:19 -0800 Subject: [PATCH] Use golint to update Golang styles * Update comments and return statements * Adjust usage of Kinesis library (upgraded local source) --- all_pass_filter.go | 4 ++-- checkpoint.go | 2 +- filter.go | 7 ++----- pipeline.go | 7 +++++-- record_buffer.go | 19 +++++++++---------- record_buffer_test.go | 2 +- redis_checkpoint.go | 19 ++++++++++--------- redshift_emitter.go | 7 ++++--- s3_emitter.go | 9 +++++---- string_to_string_transformer.go | 3 +++ utils.go | 9 +++++---- 11 files changed, 47 insertions(+), 41 deletions(-) diff --git a/all_pass_filter.go b/all_pass_filter.go index 9e18285..f4252a2 100644 --- a/all_pass_filter.go +++ b/all_pass_filter.go @@ -1,9 +1,9 @@ package connector -// A basic implementation of the Filter interface that returns true for all records. +// AllPassFilter an implementation of the Filter interface that returns true for all records. type AllPassFilter struct{} -// Returns true for all records. +// KeepRecord returns true for all records. func (b *AllPassFilter) KeepRecord(r interface{}) bool { return true } diff --git a/checkpoint.go b/checkpoint.go index 110b9a0..62fd931 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -1,6 +1,6 @@ package connector -// Used by Pipeline.ProcessShard when they want to checkpoint their progress. +// Checkpoint is used by Pipeline.ProcessShard when they want to checkpoint their progress. // The Kinesis Connector Library will pass an object implementing this interface to ProcessShard, // so they can checkpoint their progress. type Checkpoint interface { diff --git a/filter.go b/filter.go index 1b4b582..53f9be2 100644 --- a/filter.go +++ b/filter.go @@ -1,10 +1,7 @@ package connector -// The Filter is associated with an Buffer. The Buffer may use the result of calling the -// KeepRecord() method to decide whether to store a record or discard it. - -// A method enabling the buffer to filter records. Return false if you don't want to hold on to -// the record. +// Filter is an interface used for determinint whether to buffer records. +// Returns false if you don't want to hold on to the record. type Filter interface { KeepRecord(r interface{}) bool } diff --git a/pipeline.go b/pipeline.go index 4f960c7..445fac0 100644 --- a/pipeline.go +++ b/pipeline.go @@ -7,7 +7,8 @@ import ( "github.com/sendgridlabs/go-kinesis" ) -// This struct is used by the main application to configure instances of the user's implemented pipline. +// Pipeline is used as a record processor to configure a pipline. +// // The user should implement this such that each method returns a configured implementation of each // interface. It has a data type (Model) as Records come in as a byte[] and are transformed to a Model. // Then they are buffered in Model form and when the buffer is full, Models's are passed to the emitter. @@ -20,6 +21,8 @@ type Pipeline struct { Transformer Transformer } +// ProcessShard kicks off the process of a Kinesis Shard. +// It is a long running process that will continue to read from the shard. func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { args := kinesis.NewArgs() args.Add("ShardId", shardID) @@ -54,7 +57,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { if len(recordSet.Records) > 0 { for _, v := range recordSet.Records { - data, err := v.GetData() + data := v.GetData() if err != nil { fmt.Printf("GetData ERROR: %v\n", err) diff --git a/record_buffer.go b/record_buffer.go index 52b2a05..a2a7dfd 100644 --- a/record_buffer.go +++ b/record_buffer.go @@ -1,8 +1,7 @@ package connector -// This struct is a basic implementation of the Buffer interface. It is a wrapper on a buffer of -// records that are periodically flushed. It is configured with an implementation of Filter that -// decides whether a record will be added to the buffer to be emitted. +// RecordBuffer is a basic implementation of the Buffer interface. +// It buffer's records and answers questions on when it should be periodically flushed. type RecordBuffer struct { NumRecordsToBuffer int @@ -12,7 +11,7 @@ type RecordBuffer struct { sequencesInBuffer []string } -// Adds a message to the buffer. +// ProcessRecord adds a message to the buffer. func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string) { if len(b.sequencesInBuffer) == 0 { b.firstSequenceNumber = sequenceNumber @@ -26,17 +25,17 @@ func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string) } } -// Returns the records in the buffer. +// Records returns the records in the buffer. func (b *RecordBuffer) Records() []interface{} { return b.recordsInBuffer } -// Returns the number of messages in the buffer. +// NumRecordsInBuffer returns the number of messages in the buffer. func (b RecordBuffer) NumRecordsInBuffer() int { return len(b.sequencesInBuffer) } -// Flushes the content in the buffer and resets the sequence counter. +// Flush empties the buffer and resets the sequence counter. func (b *RecordBuffer) Flush() { b.recordsInBuffer = b.recordsInBuffer[:0] b.sequencesInBuffer = b.sequencesInBuffer[:0] @@ -52,17 +51,17 @@ func (b *RecordBuffer) sequenceExists(sequenceNumber string) bool { return false } -// Determines if the buffer has reached its target size. +// ShouldFlush determines if the buffer has reached its target size. func (b *RecordBuffer) ShouldFlush() bool { return len(b.sequencesInBuffer) >= b.NumRecordsToBuffer } -// Returns the sequence number of the first message in the buffer. +// FirstSequenceNumber returns the sequence number of the first message in the buffer. func (b *RecordBuffer) FirstSequenceNumber() string { return b.firstSequenceNumber } -// Returns the sequence number of the last message in the buffer. +// LastSequenceNumber returns the sequence number of the last message in the buffer. func (b *RecordBuffer) LastSequenceNumber() string { return b.lastSequenceNumber } diff --git a/record_buffer_test.go b/record_buffer_test.go index f35723e..7840e4b 100644 --- a/record_buffer_test.go +++ b/record_buffer_test.go @@ -8,7 +8,7 @@ func (r TestRecord) ToDelimitedString() string { return "test" } -func (r TestRecord) ToJson() []byte { +func (r TestRecord) ToJSON() []byte { return []byte("test") } diff --git a/redis_checkpoint.go b/redis_checkpoint.go index 69e1747..2bf0271 100644 --- a/redis_checkpoint.go +++ b/redis_checkpoint.go @@ -6,8 +6,8 @@ import ( "github.com/hoisie/redis" ) -// A Redis implementation of the Checkpont interface. This class is used to enable the Pipeline.ProcessShard -// to checkpoint their progress. +// RedisCheckpoint implements the Checkpont interface. +// This class is used to enable the Pipeline.ProcessShard to checkpoint their progress. type RedisCheckpoint struct { AppName string StreamName string @@ -16,32 +16,33 @@ type RedisCheckpoint struct { sequenceNumber string } -// Check whether a checkpoint for a particular Shard exists. Typically used to determine whether we should -// start processing the shard with TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). +// CheckpointExists determines if a checkpoint for a particular Shard exists. +// Typically used to determine whether we should start processing the shard with +// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). func (c *RedisCheckpoint) CheckpointExists(shardID string) bool { val, _ := c.client.Get(c.key(shardID)) if val != nil && string(val) != "" { c.sequenceNumber = string(val) return true - } else { - return false } + + return false } -// Get the current checkpoint stored for the specified shard. +// SequenceNumber returns the current checkpoint stored for the specified shard. func (c *RedisCheckpoint) SequenceNumber() string { return c.sequenceNumber } -// Record a checkpoint for a shard (e.g. sequence number of last record processed by application). +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. func (c *RedisCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) { c.client.Set(c.key(shardID), []byte(sequenceNumber)) c.sequenceNumber = sequenceNumber } -// Generate a unique Redis key for storage of Checkpoint. +// key generates a unique Redis key for storage of Checkpoint. func (c *RedisCheckpoint) key(shardID string) string { return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID) } diff --git a/redshift_emitter.go b/redshift_emitter.go index 88e4be8..7bd869b 100644 --- a/redshift_emitter.go +++ b/redshift_emitter.go @@ -7,10 +7,11 @@ import ( "log" "os" + // Postgres package is used when sql.Open is called _ "github.com/lib/pq" ) -// This struct is an implementation of Emitter that buffered batches of records into Redshift one by one. +// RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one. // It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered // data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct. type RedshiftEmitter struct { @@ -21,8 +22,8 @@ type RedshiftEmitter struct { TableName string } -// Invoked when the buffer is full. This method leverages the S3Emitter and then issues a copy command to -// Redshift data store. +// Emit is invoked when the buffer is full. This method leverages the S3Emitter and +// then issues a copy command to Redshift data store. func (e RedshiftEmitter) Emit(b Buffer, t Transformer) { s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} s3Emitter.Emit(b, t) diff --git a/s3_emitter.go b/s3_emitter.go index 6298ece..6b81aa4 100644 --- a/s3_emitter.go +++ b/s3_emitter.go @@ -9,8 +9,9 @@ import ( "github.com/crowdmob/goamz/s3" ) -// This implementation of Emitter is used to store files from a Kinesis stream in S3. The use of -// this struct requires the configuration of an S3 bucket/endpoint. When the buffer is full, this +// S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3. +// +// The use of this struct requires the configuration of an S3 bucket/endpoint. When the buffer is full, this // struct's Emit method adds the contents of the buffer to S3 as one file. The filename is generated // from the first and last sequence numbers of the records contained in that file separated by a // dash. This struct requires the configuration of an S3 bucket and endpoint. @@ -18,14 +19,14 @@ type S3Emitter struct { S3Bucket string } -// Generates a file name based on the First and Last sequence numbers from the buffer. The current +// S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current // UTC date (YYYY-MM-DD) is base of the path to logically group days of batches. func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string { date := time.Now().UTC().Format("2006-01-02") return fmt.Sprintf("/%v/%v-%v.txt", date, firstSeq, lastSeq) } -// Invoked when the buffer is full. This method emits the set of filtered records. +// Emit is invoked when the buffer is full. This method emits the set of filtered records. func (e S3Emitter) Emit(b Buffer, t Transformer) { auth, _ := aws.EnvAuth() s3Con := s3.New(auth, aws.USEast) diff --git a/string_to_string_transformer.go b/string_to_string_transformer.go index 9288895..bc67544 100644 --- a/string_to_string_transformer.go +++ b/string_to_string_transformer.go @@ -1,11 +1,14 @@ package connector +// StringToStringTransformer an implemenation of Transformer interface. type StringToStringTransformer struct{} +// ToRecord takes a byte array and returns a string. func (t StringToStringTransformer) ToRecord(data []byte) interface{} { return string(data) } +// FromRecord takes an string and returns a byte array. func (t StringToStringTransformer) FromRecord(s interface{}) []byte { return []byte(s.(string)) } diff --git a/utils.go b/utils.go index cb6be67..b1ee76b 100644 --- a/utils.go +++ b/utils.go @@ -46,7 +46,7 @@ func upcaseInitial(str string) string { return "" } -// Opens the file path and loads config values into the sturct. +// LoadConfig opens the file path and loads config values into the sturct. func LoadConfig(config interface{}, filename string) error { lines, err := readLines(filename) @@ -89,7 +89,8 @@ func LoadConfig(config interface{}, filename string) error { return nil } -// Creates a new Kinesis stream (uses existing stream if exists) and waits for it to become available. +// CreateAndWaitForStreamToBecomeAvailable creates a new Kinesis stream (uses +// existing stream if exists) and waits for it to become available. func CreateAndWaitForStreamToBecomeAvailable(k *kinesis.Kinesis, streamName string, shardCount int) { if !StreamExists(k, streamName) { err := k.CreateStream(streamName, shardCount) @@ -119,7 +120,7 @@ func CreateAndWaitForStreamToBecomeAvailable(k *kinesis.Kinesis, streamName stri } } -// Check if a Kinesis stream exists. +// StreamExists checks if a Kinesis stream exists. func StreamExists(k *kinesis.Kinesis, streamName string) bool { args := kinesis.NewArgs() resp, _ := k.ListStreams(args) @@ -131,7 +132,7 @@ func StreamExists(k *kinesis.Kinesis, streamName string) bool { return false } -// Delete a Kinesis stream. +// DeleteStream deletes a current Kinesis stream. func DeleteStream(k *kinesis.Kinesis, streamName string) { err := k.DeleteStream("test")