use log4go output
This commit is contained in:
parent
5301dfe963
commit
02a9d5d173
5 changed files with 23 additions and 18 deletions
|
|
@ -1,7 +1,6 @@
|
||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -63,7 +62,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
shardInfo, err := ksis.GetShardIterator(args)
|
shardInfo, err := ksis.GetShardIterator(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("GetShardIterator ERROR: %v\n", err)
|
l4g.Critical("GetShardIterator ERROR: %v", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
shardIterator := shardInfo.ShardIterator
|
shardIterator := shardInfo.ShardIterator
|
||||||
|
|
@ -108,7 +108,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
||||||
log.Printf("NextShardIterator ERROR: %v\n", err)
|
l4g.Error("NextShardIterator ERROR: %v", err)
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,10 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
// Postgres package is used when sql.Open is called
|
// Postgres package is used when sql.Open is called
|
||||||
|
l4g "github.com/ezoic/sol/log4go"
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -31,13 +31,15 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) {
|
||||||
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
l4g.Critical(err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec(e.copyStatement(s3File))
|
_, err = db.Exec(e.copyStatement(s3File))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
l4g.Critical(err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("Redshift load completed.\n")
|
fmt.Printf("Redshift load completed.\n")
|
||||||
|
|
|
||||||
|
|
@ -5,13 +5,13 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/crowdmob/goamz/aws"
|
"github.com/crowdmob/goamz/aws"
|
||||||
"github.com/crowdmob/goamz/s3"
|
"github.com/crowdmob/goamz/s3"
|
||||||
|
l4g "github.com/ezoic/sol/log4go"
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -35,7 +35,9 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
l4g.Critical(err)
|
||||||
|
os.Exit(1)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Aggregate file paths as strings
|
// Aggregate file paths as strings
|
||||||
|
|
@ -55,7 +57,8 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err = db.Exec(c)
|
_, err = db.Exec(c)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
l4g.Critical(err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert file paths into File Names table
|
// Insert file paths into File Names table
|
||||||
|
|
@ -63,10 +66,11 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err = db.Exec(i)
|
_, err = db.Exec(i)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
l4g.Critical(err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("[%v] copied to Redshift", manifestFileName)
|
l4g.Info("[%v] copied to Redshift", manifestFileName)
|
||||||
db.Close()
|
db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,11 @@ package connector
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/crowdmob/goamz/aws"
|
"github.com/crowdmob/goamz/aws"
|
||||||
"github.com/crowdmob/goamz/s3"
|
"github.com/crowdmob/goamz/s3"
|
||||||
|
l4g "github.com/ezoic/sol/log4go"
|
||||||
)
|
)
|
||||||
|
|
||||||
// S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3.
|
// S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3.
|
||||||
|
|
@ -44,8 +44,8 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) {
|
||||||
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
|
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("S3Put ERROR: %v\n", err)
|
l4g.Error("S3Put ERROR: %v", err)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket)
|
l4g.Info("[%v] records emitted to [%s]", b.NumRecordsInBuffer(), e.S3Bucket)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,8 @@
|
||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
|
|
||||||
"github.com/ezoic/go-kinesis"
|
"github.com/ezoic/go-kinesis"
|
||||||
|
l4g "github.com/ezoic/sol/log4go"
|
||||||
)
|
)
|
||||||
|
|
||||||
// An implementation of Emitter that puts event data on S3 file, and then puts the
|
// An implementation of Emitter that puts event data on S3 file, and then puts the
|
||||||
|
|
@ -30,8 +29,8 @@ func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err := e.Ksis.PutRecord(args)
|
_, err := e.Ksis.PutRecord(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("PutRecord ERROR: %v", err)
|
l4g.Error("PutRecord ERROR: %v", err)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream)
|
l4g.Info("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue