refactor: logparser v5 funcs

This commit is contained in:
Inanc Gumus
2019-08-29 16:08:46 +03:00
parent d28c6d54d2
commit 661e018258
10 changed files with 48 additions and 53 deletions

View File

@ -10,8 +10,6 @@ package main
import ( import (
"log" "log"
"os" "os"
"github.com/inancgumus/learngo/logparser/v5/pipe"
) )
func main() { func main() {
@ -20,21 +18,9 @@ func main() {
// pipe.FilterBy(pipe.DomainExtFilter("com", "io")), // pipe.FilterBy(pipe.DomainExtFilter("com", "io")),
// pipe.GroupBy(pipe.DomainGrouper), // pipe.GroupBy(pipe.DomainGrouper),
// ) // )
p, err := fromFile(os.Args[1])
p := pipe.New( if err = p.Run(); err != nil {
pipe.NewTextLog(os.Stdin),
// pipe.NewJSONLog(os.Stdin),
pipe.NewTextReport(os.Stdout),
// pipe.NewJSONReport(os.Stdout),
// pipe.FilterBy(pipe.DomainExtFilter("com", "io")),
// pipe.GroupBy(pipe.DomainGrouper),
// new(passThrough),
)
if err := p.Run(); err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
} }

View File

@ -21,11 +21,11 @@ func (t *passThrough) Consume(results pipe.Iterator) error {
} }
func (t *passThrough) Each(yield func(pipe.Record) error) error { func (t *passThrough) Each(yield func(pipe.Record) error) error {
pass := func(r pipe.Record) error {
return t.Iterator.Each(func(r pipe.Record) error {
// fmt.Println(r.Fields()) // fmt.Println(r.Fields())
// fmt.Println(r.Int("visits")) // fmt.Println(r.Int("visits"))
return yield(r) return yield(r)
}) }
return t.Iterator.Each(pass)
} }

View File

@ -22,24 +22,26 @@ func FilterBy(fn ...FilterFunc) *Filter {
return &Filter{filters: fn} return &Filter{filters: fn}
} }
// Consume saves the iterator for later processing. // Consume the records for lazy filtering.
func (f *Filter) Consume(records Iterator) error { func (f *Filter) Consume(records Iterator) error {
f.src = records f.src = records
return nil return nil
} }
// Each yields only the filtered records. // Each filtered records.
func (f *Filter) Each(yield func(Record) error) error { func (f *Filter) Each(yield func(Record) error) error {
return f.src.Each(func(r Record) error { records := func(r Record) error {
if !f.check(r) { if !f.checkAll(r) {
return nil return nil
} }
return yield(r) return yield(r)
}) }
return f.src.Each(records)
} }
// check all the filters against the record. // checkAll the filters against the record.
func (f *Filter) check(r Record) bool { func (f *Filter) checkAll(r Record) bool {
for _, fi := range f.filters { for _, fi := range f.filters {
if !fi(r) { if !fi(r) {
return false return false

View File

@ -32,9 +32,9 @@ func GroupBy(key GroupFunc) *Group {
} }
} }
// Consume records for grouping. // Consume the records for grouping.
func (g *Group) Consume(records Iterator) error { func (g *Group) Consume(records Iterator) error {
return records.Each(func(r Record) error { group := func(r Record) error {
k := g.key(r) k := g.key(r)
if _, ok := g.sum[k]; !ok { if _, ok := g.sum[k]; !ok {
@ -44,15 +44,19 @@ func (g *Group) Consume(records Iterator) error {
g.sum[k] = r.sum(g.sum[k]) g.sum[k] = r.sum(g.sum[k])
return nil return nil
}) }
return records.Each(group)
} }
// Each sorts and yields the grouped records. // Each sends the grouped and sorted records to upstream.
func (g *Group) Each(yield func(Record) error) error { func (g *Group) Each(yield func(Record) error) error {
sort.Strings(g.keys) sort.Strings(g.keys)
for _, k := range g.keys { for _, k := range g.keys {
if err := yield(Record{g.sum[k]}); err != nil { err := yield(Record{g.sum[k]})
if err != nil {
return err return err
} }
} }

View File

@ -22,7 +22,7 @@ func NewJSONLog(r io.Reader) *JSON {
return &JSON{reader: r} return &JSON{reader: r}
} }
// Each yields records from a json reader. // Each sends the records from a reader to upstream.
func (j *JSON) Each(yield func(Record) error) error { func (j *JSON) Each(yield func(Record) error) error {
defer readClose(j.reader) defer readClose(j.reader)

View File

@ -22,11 +22,13 @@ func NewJSONReport(w io.Writer) *JSONReport {
return &JSONReport{w: w} return &JSONReport{w: w}
} }
// Consume generates a JSON report. // Consume the records and generate a JSON report.
func (t *JSONReport) Consume(records Iterator) error { func (t *JSONReport) Consume(records Iterator) error {
enc := json.NewEncoder(t.w) enc := json.NewEncoder(t.w)
return records.Each(func(r Record) error { encode := func(r Record) error {
return enc.Encode(&r) return enc.Encode(&r)
}) }
return records.Each(encode)
} }

View File

@ -18,16 +18,17 @@ type logCount struct {
// Each yields to the inner iterator while counting the records. // Each yields to the inner iterator while counting the records.
// Reports the record number on an error. // Reports the record number on an error.
func (lc *logCount) Each(yield func(Record) error) error { func (lc *logCount) Each(yield func(Record) error) error {
err := lc.Iterator.Each(func(r Record) error { count := func(r Record) error {
lc.n++ lc.n++
return yield(r) return yield(r)
}) }
err := lc.Iterator.Each(count)
if err != nil { if err != nil {
// lc.n+1: iterator.each won't call yield on err // lc.n+1: iterator.each won't call yield on err
return fmt.Errorf("record %d: %v", lc.n+1, err) return fmt.Errorf("record %d: %v", lc.n+1, err)
} }
return nil return nil
} }

View File

@ -7,12 +7,9 @@
package pipe package pipe
// YieldFunc yields a record from an Iterator to up-stream (Consumer).
type YieldFunc = func(Record) error
// Iterator yields a record. // Iterator yields a record.
type Iterator interface { type Iterator interface {
Each(YieldFunc) error Each(func(Record) error) error
} }
// Consumer consumes records from an iterator. // Consumer consumes records from an iterator.

View File

@ -36,6 +36,7 @@ func (r record) sum(other record) record {
// UnmarshalText to a *record. // UnmarshalText to a *record.
func (r *record) UnmarshalText(p []byte) (err error) { func (r *record) UnmarshalText(p []byte) (err error) {
fields := strings.Fields(string(p)) fields := strings.Fields(string(p))
if len(fields) != fieldsLength { if len(fields) != fieldsLength {
return fmt.Errorf("wrong number of fields %q", fields) return fmt.Errorf("wrong number of fields %q", fields)
} }
@ -48,6 +49,7 @@ func (r *record) UnmarshalText(p []byte) (err error) {
if r.uniques, err = parseStr("uniques", fields[3]); err != nil { if r.uniques, err = parseStr("uniques", fields[3]); err != nil {
return err return err
} }
return validate(*r) return validate(*r)
} }
@ -60,12 +62,14 @@ func (r *record) UnmarshalJSON(data []byte) error {
} }
*r = record{rj.Domain, rj.Page, rj.Visits, rj.Uniques} *r = record{rj.Domain, rj.Page, rj.Visits, rj.Uniques}
return validate(*r) return validate(*r)
} }
// MarshalJSON of a *record. // MarshalJSON of a *record.
func (r *record) MarshalJSON() ([]byte, error) { func (r *record) MarshalJSON() ([]byte, error) {
return json.Marshal(recordJSON{r.domain, r.page, r.visits, r.uniques}) rj := recordJSON{r.domain, r.page, r.visits, r.uniques}
return json.Marshal(rj)
} }
// parseStr helps UnmarshalText for string to positive int parsing. // parseStr helps UnmarshalText for string to positive int parsing.

View File

@ -34,29 +34,28 @@ func NewTextReport(w io.Writer) *TextReport {
func (t *TextReport) Consume(records Iterator) error { func (t *TextReport) Consume(records Iterator) error {
w := tabwriter.NewWriter(t.w, minWidth, tabWidth, padding, ' ', flags) w := tabwriter.NewWriter(t.w, minWidth, tabWidth, padding, ' ', flags)
write := fmt.Fprintf fmt.Fprintf(w, "DOMAINS\tPAGES\tVISITS\tUNIQUES\n")
fmt.Fprintf(w, "-------\t-----\t------\t-------\n")
write(w, "DOMAINS\tPAGES\tVISITS\tUNIQUES\n")
write(w, "-------\t-----\t------\t-------\n")
var total record var total record
err := records.Each(func(r Record) error { printLine := func(r Record) error {
total = r.sum(total) total = r.sum(total)
write(w, "%s\t%s\t%d\t%d\n", fmt.Fprintf(w, "%s\t%s\t%d\t%d\n",
r.domain, r.page, r.domain, r.page,
r.visits, r.uniques, r.visits, r.uniques,
) )
return nil return nil
}) }
if err != nil {
if err := records.Each(printLine); err != nil {
return err return err
} }
write(w, "\t\t\t\n") fmt.Fprintf(w, "\t\t\t\n")
write(w, "%s\t%s\t%d\t%d\n", "TOTAL", "", fmt.Fprintf(w, "%s\t%s\t%d\t%d\n", "TOTAL", "",
total.visits, total.visits,
total.uniques, total.uniques,
) )