add: new pipeline to log parser
This commit is contained in:
25
27-functional-programming/log-parser-exp/Makefile
Normal file
25
27-functional-programming/log-parser-exp/Makefile
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
# Helper targets for running the log parser against log.txt and for
# inflating log.txt into a large benchmark input.

# bash is required: the load recipe relies on brace expansion ({1..N}).
SHELL := /bin/bash

# None of these targets produce a file with their own name.
.PHONY: s r load restore lines

# Prints the current number of lines in log.txt.
# Reading from stdin and stripping blanks keeps the count intact on BSD/macOS,
# where `wc -l FILE` left-pads its output and `cut -f1 -d' '` returns nothing.
LINES = echo -e ">> log.txt has $$(wc -l < log.txt | tr -d ' ') lines"

# n is the number of doubling passes used by `load`; defaults to 18 when it
# is unset or empty.
ifeq ($(n),)
n := 18
endif

# s: run the parser on log.txt and report the elapsed time (default target).
s:
	time go run . < log.txt

# r: run the parser on log.txt.
r:
	go run . < log.txt

# usage: make load n=18
# load: restore log.txt, then append it to itself n times (each pass doubles
# the file) and print the resulting line count.
load: restore
	@echo "enlarging the file with itself, please wait..."
	@for i in {1..$(n)}; do awk 1 log.txt log.txt > log_.txt; mv log_.txt log.txt; rm -f log_.txt; done
	@$(LINES)

# restore: print the current line count, then check log.txt out from git.
restore:
	@$(LINES)
	git checkout log.txt

# lines: print the current line count of log.txt.
lines:
	@$(LINES)
|
@ -1,17 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
type filterFunc func(result) (include bool)
|
|
||||||
|
|
||||||
func filterBy(results []result, filterer filterFunc) []result {
|
|
||||||
out := results[:0]
|
|
||||||
|
|
||||||
for _, r := range results {
|
|
||||||
if !filterer(r) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
out = append(out, r)
|
|
||||||
}
|
|
||||||
|
|
||||||
return out
|
|
||||||
}
|
|
@ -6,13 +6,13 @@ func noopFilter(r result) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func notUsing(filter filterFunc) filterFunc {
|
func notUsing(filter filterFn) filterFn {
|
||||||
return func(r result) bool {
|
return func(r result) bool {
|
||||||
return !filter(r)
|
return !filter(r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func domainExtFilter(domains ...string) filterFunc {
|
func domainExtFilter(domains ...string) filterFn {
|
||||||
return func(r result) bool {
|
return func(r result) bool {
|
||||||
for _, domain := range domains {
|
for _, domain := range domains {
|
||||||
if strings.HasSuffix(r.domain, "."+domain) {
|
if strings.HasSuffix(r.domain, "."+domain) {
|
||||||
@ -23,7 +23,7 @@ func domainExtFilter(domains ...string) filterFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func domainFilter(domain string) filterFunc {
|
func domainFilter(domain string) filterFn {
|
||||||
return func(r result) bool {
|
return func(r result) bool {
|
||||||
return strings.Contains(r.domain, domain)
|
return strings.Contains(r.domain, domain)
|
||||||
}
|
}
|
||||||
|
@ -1,19 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
type groupFunc func(result) (key string)
|
|
||||||
|
|
||||||
func groupBy(results []result, keyer groupFunc) []result {
|
|
||||||
grouped := make(map[string]result, len(results))
|
|
||||||
|
|
||||||
for _, cur := range results {
|
|
||||||
key := keyer(cur)
|
|
||||||
grouped[key] = cur.add(grouped[key])
|
|
||||||
}
|
|
||||||
|
|
||||||
out := results[:0]
|
|
||||||
for _, r := range grouped {
|
|
||||||
out = append(out, r)
|
|
||||||
}
|
|
||||||
|
|
||||||
return out
|
|
||||||
}
|
|
@ -14,7 +14,7 @@ import (
|
|||||||
func main() {
|
func main() {
|
||||||
defer recoverErr()
|
defer recoverErr()
|
||||||
|
|
||||||
_, err := newReport().
|
_, err := newPipeline().
|
||||||
// from(fastTextReader(os.Stdin)).
|
// from(fastTextReader(os.Stdin)).
|
||||||
filterBy(notUsing(domainExtFilter("com", "io"))).
|
filterBy(notUsing(domainExtFilter("com", "io"))).
|
||||||
groupBy(domainGrouper).
|
groupBy(domainGrouper).
|
||||||
|
75
27-functional-programming/log-parser-exp/pipeline.go
Normal file
75
27-functional-programming/log-parser-exp/pipeline.go
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
inputFn func() ([]result, error)
|
||||||
|
outputFn func([]result) error
|
||||||
|
filterFn func(result) (include bool)
|
||||||
|
groupFn func(result) (key string)
|
||||||
|
)
|
||||||
|
|
||||||
|
type pipeline struct {
|
||||||
|
input inputFn
|
||||||
|
filter filterFn
|
||||||
|
groupKey groupFn
|
||||||
|
output outputFn
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPipeline() *pipeline {
|
||||||
|
return &pipeline{
|
||||||
|
filter: noopFilter,
|
||||||
|
groupKey: noopGrouper,
|
||||||
|
input: textReader(os.Stdin),
|
||||||
|
output: textWriter(os.Stdout),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// from replaces the pipeline's input stage with fn and returns the same
// pipeline so calls can be chained.
func (p *pipeline) from(fn inputFn) *pipeline {
	p.input = fn
	return p
}
|
||||||
|
// to replaces the pipeline's output stage with fn and returns the same
// pipeline so calls can be chained.
func (p *pipeline) to(fn outputFn) *pipeline {
	p.output = fn
	return p
}
|
||||||
|
// filterBy replaces the pipeline's filter with fn and returns the same
// pipeline so calls can be chained.
func (p *pipeline) filterBy(fn filterFn) *pipeline {
	p.filter = fn
	return p
}
|
||||||
|
// groupBy replaces the pipeline's grouping-key function with fn and
// returns the same pipeline so calls can be chained.
func (p *pipeline) groupBy(fn groupFn) *pipeline {
	p.groupKey = fn
	return p
}
|
||||||
|
|
||||||
|
func (p *pipeline) start() ([]result, error) {
|
||||||
|
res, err := p.input()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
out []result
|
||||||
|
gres = make(map[string]int)
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, r := range res {
|
||||||
|
if !p.filter(r) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
k := p.groupKey(r)
|
||||||
|
|
||||||
|
if i, ok := gres[k]; ok {
|
||||||
|
out[i] = out[i].add(r)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
gres[k] = len(out)
|
||||||
|
|
||||||
|
out = append(out, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = p.output(out)
|
||||||
|
|
||||||
|
return out, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// measure records the current time and returns a function that, when
// called, prints "<name> took <elapsed>" for the time passed since measure
// was invoked. Typical use: defer measure("step")().
//
// The message goes to standard error so that it never mixes with the
// report the pipeline writes to standard output by default.
//
// TODO: remove me
func measure(name string) func() {
	start := time.Now()

	return func() {
		fmt.Fprintf(os.Stderr, "%s took %v\n", name, time.Since(start))
	}
}
|
@ -1,62 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import "os"
|
|
||||||
|
|
||||||
type (
|
|
||||||
inputFunc func() ([]result, error)
|
|
||||||
outputFunc func([]result) error
|
|
||||||
)
|
|
||||||
|
|
||||||
type report struct {
|
|
||||||
input inputFunc
|
|
||||||
filter filterFunc
|
|
||||||
group groupFunc
|
|
||||||
output outputFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
func newReport() *report {
|
|
||||||
return &report{
|
|
||||||
filter: noopFilter,
|
|
||||||
group: noopGrouper,
|
|
||||||
input: textReader(os.Stdin),
|
|
||||||
output: textWriter(os.Stdout),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *report) from(fn inputFunc) *report {
|
|
||||||
r.input = fn
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *report) to(fn outputFunc) *report {
|
|
||||||
r.output = fn
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *report) filterBy(fn filterFunc) *report {
|
|
||||||
r.filter = fn
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *report) groupBy(fn groupFunc) *report {
|
|
||||||
r.group = fn
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *report) start() ([]result, error) {
|
|
||||||
// input filterBy groupBy
|
|
||||||
// scanner (result) bool map[string]result
|
|
||||||
//
|
|
||||||
// stdin -> []result -> []results -> []result -> output(stdout)
|
|
||||||
|
|
||||||
res, err := r.input()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
res = filterBy(res, r.filter)
|
|
||||||
res = groupBy(res, r.group)
|
|
||||||
err = r.output(res)
|
|
||||||
|
|
||||||
return res, err
|
|
||||||
}
|
|
@ -12,10 +12,11 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func textReader(r io.Reader) inputFunc {
|
func textReader(r io.Reader) inputFn {
|
||||||
return func() ([]result, error) {
|
return func() ([]result, error) {
|
||||||
// first: count the lines, so the parseText can create
|
// first: count the lines, so the parseText can create
|
||||||
// enough buffer.
|
// enough buffer.
|
||||||
@ -49,7 +50,7 @@ func parseText(in *bufio.Scanner, nlines int) ([]result, error) {
|
|||||||
func countLines(r io.Reader) (int, error) {
|
func countLines(r io.Reader) (int, error) {
|
||||||
var (
|
var (
|
||||||
lines int
|
lines int
|
||||||
buf = make([]byte, 1024<<4) // read via 16 KB blocks
|
buf = make([]byte, os.Getpagesize()) // read via blocks of one memory page
|
||||||
)
|
)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -26,7 +26,7 @@ import (
|
|||||||
// + uses a manual atoi
|
// + uses a manual atoi
|
||||||
// +
|
// +
|
||||||
|
|
||||||
func fastTextReader(r io.Reader) inputFunc {
|
func fastTextReader(r io.Reader) inputFn {
|
||||||
return func() ([]result, error) {
|
return func() ([]result, error) {
|
||||||
// first: count the lines, so the parseText can create
|
// first: count the lines, so the parseText can create
|
||||||
// enough buffer.
|
// enough buffer.
|
||||||
@ -45,7 +45,9 @@ func fastParseText(in *bufio.Scanner, nlines int) ([]result, error) {
|
|||||||
res := make([]result, 0, nlines)
|
res := make([]result, 0, nlines)
|
||||||
|
|
||||||
for l := 0; in.Scan(); l++ {
|
for l := 0; in.Scan(); l++ {
|
||||||
|
_ = in.Bytes()
|
||||||
r, err := fastParseFields(in.Bytes())
|
r, err := fastParseFields(in.Bytes())
|
||||||
|
// r, err := result{"foo.com", "/bar", 10, 10}, error(nil)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("line %d: %v", l, err)
|
return nil, fmt.Errorf("line %d: %v", l, err)
|
||||||
|
@ -19,7 +19,7 @@ const (
|
|||||||
dashLength = 58
|
dashLength = 58
|
||||||
)
|
)
|
||||||
|
|
||||||
func textWriter(w io.Writer) outputFunc {
|
func textWriter(w io.Writer) outputFn {
|
||||||
return func(results []result) error {
|
return func(results []result) error {
|
||||||
fmt.Fprintf(w, header, "DOMAINS", "PAGES", "VISITS", "UNIQUES")
|
fmt.Fprintf(w, header, "DOMAINS", "PAGES", "VISITS", "UNIQUES")
|
||||||
fmt.Fprintln(w, strings.Repeat("-", dashLength))
|
fmt.Fprintln(w, strings.Repeat("-", dashLength))
|
||||||
@ -36,7 +36,7 @@ func textWriter(w io.Writer) outputFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func noWhere() outputFunc {
|
func noWhere() outputFn {
|
||||||
return func(res []result) error {
|
return func(res []result) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user