From 661e01825871626d520ac368c7b42ac8c26ef4bb Mon Sep 17 00:00:00 2001 From: Inanc Gumus Date: Thu, 29 Aug 2019 16:08:46 +0300 Subject: [PATCH] refactor: logparser v5 funcs --- logparser/v5/main.go | 18 ++---------------- logparser/v5/passthrough.go | 6 +++--- logparser/v5/pipe/filter.go | 16 +++++++++------- logparser/v5/pipe/group.go | 14 +++++++++----- logparser/v5/pipe/jsonlog.go | 2 +- logparser/v5/pipe/jsonreport.go | 8 +++++--- logparser/v5/pipe/logcount.go | 7 ++++--- logparser/v5/pipe/pipe.go | 5 +---- logparser/v5/pipe/record.go | 6 +++++- logparser/v5/pipe/textreport.go | 19 +++++++++---------- 10 files changed, 48 insertions(+), 53 deletions(-) diff --git a/logparser/v5/main.go b/logparser/v5/main.go index c848dd2..9b3961a 100644 --- a/logparser/v5/main.go +++ b/logparser/v5/main.go @@ -10,8 +10,6 @@ package main import ( "log" "os" - - "github.com/inancgumus/learngo/logparser/v5/pipe" ) func main() { @@ -20,21 +18,9 @@ func main() { // pipe.FilterBy(pipe.DomainExtFilter("com", "io")), // pipe.GroupBy(pipe.DomainGrouper), // ) + p, err := fromFile(os.Args[1]) - p := pipe.New( - pipe.NewTextLog(os.Stdin), - // pipe.NewJSONLog(os.Stdin), - - pipe.NewTextReport(os.Stdout), - // pipe.NewJSONReport(os.Stdout), - - // pipe.FilterBy(pipe.DomainExtFilter("com", "io")), - // pipe.GroupBy(pipe.DomainGrouper), - - // new(passThrough), - ) - - if err := p.Run(); err != nil { + if err = p.Run(); err != nil { log.Fatalln(err) } } diff --git a/logparser/v5/passthrough.go b/logparser/v5/passthrough.go index 4261f73..535ffd0 100644 --- a/logparser/v5/passthrough.go +++ b/logparser/v5/passthrough.go @@ -21,11 +21,11 @@ func (t *passThrough) Consume(results pipe.Iterator) error { } func (t *passThrough) Each(yield func(pipe.Record) error) error { - - return t.Iterator.Each(func(r pipe.Record) error { + pass := func(r pipe.Record) error { // fmt.Println(r.Fields()) // fmt.Println(r.Int("visits")) return yield(r) - }) + } + return t.Iterator.Each(pass) } diff --git a/logparser/v5/pipe/filter.go b/logparser/v5/pipe/filter.go index ea25bea..97de99d 100644 --- a/logparser/v5/pipe/filter.go +++ b/logparser/v5/pipe/filter.go @@ -22,24 +22,26 @@ func FilterBy(fn ...FilterFunc) *Filter { return &Filter{filters: fn} } -// Consume saves the iterator for later processing. +// Consume the records for lazy filtering. func (f *Filter) Consume(records Iterator) error { f.src = records return nil } -// Each yields only the filtered records. +// Each filtered records. func (f *Filter) Each(yield func(Record) error) error { - return f.src.Each(func(r Record) error { - if !f.check(r) { + records := func(r Record) error { + if !f.checkAll(r) { return nil } return yield(r) - }) + } + + return f.src.Each(records) } -// check all the filters against the record. -func (f *Filter) check(r Record) bool { +// checkAll the filters against the record. +func (f *Filter) checkAll(r Record) bool { for _, fi := range f.filters { if !fi(r) { return false diff --git a/logparser/v5/pipe/group.go b/logparser/v5/pipe/group.go index e50beca..1a9ccc4 100644 --- a/logparser/v5/pipe/group.go +++ b/logparser/v5/pipe/group.go @@ -32,9 +32,9 @@ func GroupBy(key GroupFunc) *Group { } } -// Consume records for grouping. +// Consume the records for grouping. func (g *Group) Consume(records Iterator) error { - return records.Each(func(r Record) error { + group := func(r Record) error { k := g.key(r) if _, ok := g.sum[k]; !ok { @@ -44,15 +44,19 @@ func (g *Group) Consume(records Iterator) error { g.sum[k] = r.sum(g.sum[k]) return nil - }) + } + + return records.Each(group) } -// Each sorts and yields the grouped records. +// Each sends the grouped and sorted records to upstream. func (g *Group) Each(yield func(Record) error) error { sort.Strings(g.keys) for _, k := range g.keys { - if err := yield(Record{g.sum[k]}); err != nil { + err := yield(Record{g.sum[k]}) + + if err != nil { return err } } diff --git a/logparser/v5/pipe/jsonlog.go b/logparser/v5/pipe/jsonlog.go index d833a7d..7b2d427 100644 --- a/logparser/v5/pipe/jsonlog.go +++ b/logparser/v5/pipe/jsonlog.go @@ -22,7 +22,7 @@ func NewJSONLog(r io.Reader) *JSON { return &JSON{reader: r} } -// Each yields records from a json reader. +// Each sends the records from a reader to upstream. func (j *JSON) Each(yield func(Record) error) error { defer readClose(j.reader) diff --git a/logparser/v5/pipe/jsonreport.go b/logparser/v5/pipe/jsonreport.go index 26945d5..3e69743 100644 --- a/logparser/v5/pipe/jsonreport.go +++ b/logparser/v5/pipe/jsonreport.go @@ -22,11 +22,13 @@ func NewJSONReport(w io.Writer) *JSONReport { return &JSONReport{w: w} } -// Consume generates a JSON report. +// Consume the records and generate a JSON report. func (t *JSONReport) Consume(records Iterator) error { enc := json.NewEncoder(t.w) - return records.Each(func(r Record) error { + encode := func(r Record) error { return enc.Encode(&r) - }) + } + + return records.Each(encode) } diff --git a/logparser/v5/pipe/logcount.go b/logparser/v5/pipe/logcount.go index ddc56e6..617a220 100644 --- a/logparser/v5/pipe/logcount.go +++ b/logparser/v5/pipe/logcount.go @@ -18,16 +18,17 @@ type logCount struct { // Each yields to the inner iterator while counting the records. // Reports the record number on an error. func (lc *logCount) Each(yield func(Record) error) error { - err := lc.Iterator.Each(func(r Record) error { + count := func(r Record) error { lc.n++ return yield(r) - }) + } + + err := lc.Iterator.Each(count) if err != nil { // lc.n+1: iterator.each won't call yield on err return fmt.Errorf("record %d: %v", lc.n+1, err) } - return nil } diff --git a/logparser/v5/pipe/pipe.go b/logparser/v5/pipe/pipe.go index 5a02aa6..38974c7 100644 --- a/logparser/v5/pipe/pipe.go +++ b/logparser/v5/pipe/pipe.go @@ -7,12 +7,9 @@ package pipe -// YieldFunc yields a record from an Iterator to up-stream (Consumer). -type YieldFunc = func(Record) error - // Iterator yields a record. type Iterator interface { - Each(YieldFunc) error + Each(func(Record) error) error } // Consumer consumes records from an iterator. diff --git a/logparser/v5/pipe/record.go b/logparser/v5/pipe/record.go index 33a1119..cbca886 100644 --- a/logparser/v5/pipe/record.go +++ b/logparser/v5/pipe/record.go @@ -36,6 +36,7 @@ func (r record) sum(other record) record { // UnmarshalText to a *record. func (r *record) UnmarshalText(p []byte) (err error) { fields := strings.Fields(string(p)) + if len(fields) != fieldsLength { return fmt.Errorf("wrong number of fields %q", fields) } @@ -48,6 +49,7 @@ func (r *record) UnmarshalText(p []byte) (err error) { if r.uniques, err = parseStr("uniques", fields[3]); err != nil { return err } + return validate(*r) } @@ -60,12 +62,14 @@ func (r *record) UnmarshalJSON(data []byte) error { } *r = record{rj.Domain, rj.Page, rj.Visits, rj.Uniques} + return validate(*r) } // MarshalJSON of a *record. func (r *record) MarshalJSON() ([]byte, error) { - return json.Marshal(recordJSON{r.domain, r.page, r.visits, r.uniques}) + rj := recordJSON{r.domain, r.page, r.visits, r.uniques} + return json.Marshal(rj) } // parseStr helps UnmarshalText for string to positive int parsing. diff --git a/logparser/v5/pipe/textreport.go b/logparser/v5/pipe/textreport.go index 00ca37e..8b4389d 100644 --- a/logparser/v5/pipe/textreport.go +++ b/logparser/v5/pipe/textreport.go @@ -34,29 +34,28 @@ func NewTextReport(w io.Writer) *TextReport { func (t *TextReport) Consume(records Iterator) error { w := tabwriter.NewWriter(t.w, minWidth, tabWidth, padding, ' ', flags) - write := fmt.Fprintf - - write(w, "DOMAINS\tPAGES\tVISITS\tUNIQUES\n") - write(w, "-------\t-----\t------\t-------\n") + fmt.Fprintf(w, "DOMAINS\tPAGES\tVISITS\tUNIQUES\n") + fmt.Fprintf(w, "-------\t-----\t------\t-------\n") var total record - err := records.Each(func(r Record) error { + printLine := func(r Record) error { total = r.sum(total) - write(w, "%s\t%s\t%d\t%d\n", + fmt.Fprintf(w, "%s\t%s\t%d\t%d\n", r.domain, r.page, r.visits, r.uniques, ) return nil - }) - if err != nil { + } + + if err := records.Each(printLine); err != nil { return err } - write(w, "\t\t\t\n") - write(w, "%s\t%s\t%d\t%d\n", "TOTAL", "", + fmt.Fprintf(w, "\t\t\t\n") + fmt.Fprintf(w, "%s\t%s\t%d\t%d\n", "TOTAL", "", total.visits, total.uniques, )