diff --git a/31-concurrency/xxx-concurrent-downloader/fetch/danger/main.go b/31-concurrency/xxx-concurrent-downloader/fetch/danger/main.go deleted file mode 100644 index acca60b..0000000 --- a/31-concurrency/xxx-concurrent-downloader/fetch/danger/main.go +++ /dev/null @@ -1,46 +0,0 @@ -package main - -import ( - "fmt" - "time" -) - -/* -when the default case exists, messages are not guaranteed -*/ -func main() { - ch := make(chan int) - - // if this GR is fast enough - // the main GR can miss the signals - go func() { - var i int - - for { - i++ - - select { - case ch <- i: - default: - // message is lost - // it doesn't matter whether the chan is buffered or not - } - - if i == 10000 { - fmt.Println("gopher dies") - close(ch) - return - } - } - }() - - var total int - for i := range ch { - // works slower — misses some of the signals - time.Sleep(time.Nanosecond) - total += i - } - - // should be: 50005000 - fmt.Println(total) -} diff --git a/31-concurrency/xxx-concurrent-downloader/fetch/danger2/main.go b/31-concurrency/xxx-concurrent-downloader/fetch/danger2/main.go deleted file mode 100644 index c937201..0000000 --- a/31-concurrency/xxx-concurrent-downloader/fetch/danger2/main.go +++ /dev/null @@ -1,40 +0,0 @@ -package main - -import ( - "fmt" - "time" -) - -func main() { - a, b := make(chan bool), make(chan bool) - - go func() { - for a != nil || b != nil { - fmt.Println("loop starts") - - select { - case <-a: - fmt.Println("recv: a") - a = nil - case <-b: - b = nil - fmt.Println("recv: b") - } - - fmt.Println("loop ends") - } - fmt.Println("gopher dies") - }() - - time.Sleep(time.Second) - // a <- true - close(a) - time.Sleep(time.Second) - //b <- true - close(b) - time.Sleep(time.Second * 2) - - // closed chan never blocks - // nil chan always blocks - // if in the loop chans not set to nil, the loop will loop forever -} diff --git a/31-concurrency/xxx-concurrent-downloader/fetch/drain.go b/31-concurrency/xxx-concurrent-downloader/fetch/drain.go deleted file mode 100644 index 98346b4..0000000 --- a/31-concurrency/xxx-concurrent-downloader/fetch/drain.go +++ /dev/null @@ -1,12 +0,0 @@ -package fetch - -// Drain drains the progress updates and returns the latest progresses -func Drain(updates <-chan Progress) map[string]Progress { - latest := make(map[string]Progress) - - // save the latest progress - for p := range updates { - latest[p.URL] = p - } - return latest -} diff --git a/31-concurrency/xxx-concurrent-downloader/fetch/httpget.go b/31-concurrency/xxx-concurrent-downloader/fetch/httpget.go deleted file mode 100644 index 56104fa..0000000 --- a/31-concurrency/xxx-concurrent-downloader/fetch/httpget.go +++ /dev/null @@ -1,31 +0,0 @@ -package fetch - -import ( - "fmt" - "net/http" - "time" -) - -// HTTPGet requests to a url with the specified timeout. -// And it returns the response with an error. -// The caller should drain and close the body or it will leak. -func HTTPGet(url string, timeout time.Duration) (*http.Response, error) { - req, err := http.NewRequest("GET", url, nil) - req.Close = true - if err != nil { - return nil, err - } - - c := &http.Client{Timeout: timeout} - resp, err := c.Do(req) - if err != nil { - return nil, err - } - - // checkout for the bad urls - if s := resp.StatusCode; s < 200 || s > 299 { - return resp, fmt.Errorf("bad status: %d", s) - } - - return resp, nil -} diff --git a/31-concurrency/xxx-concurrent-downloader/fetch/httptransfer.go b/31-concurrency/xxx-concurrent-downloader/fetch/httptransfer.go deleted file mode 100644 index 1b132d0..0000000 --- a/31-concurrency/xxx-concurrent-downloader/fetch/httptransfer.go +++ /dev/null @@ -1,126 +0,0 @@ -package fetch - -import ( - "fmt" - "io" - "net/http" - "os" - "path" - "time" -) - -// HTTPTransfer uses a fetcher and a storager to fetch and store an url -type HTTPTransfer struct { - Progress - r io.ReadCloser - w io.WriteCloser - - timeout time.Duration - - done <-chan bool - updates chan<- Progress -} - -// NewHTTPTransfer creates and returns a new transfer -func NewHTTPTransfer(url string, timeout time.Duration) *HTTPTransfer { - return &HTTPTransfer{ - Progress: Progress{URL: url}, - timeout: timeout, - } -} - -// Start starts the transfer progress -// The transfer will send its updates to the update chan -// and it will stop when receives a signal from the done chan -// -// All the t.signal calls are here. I believe this increases the visibility -// of the Start function (what it does). If the signal calls were in the -// other funcs of HTTPTransfer, it could easily lead to bugs. -func (t *HTTPTransfer) Start(updates chan<- Progress, done <-chan bool) { - defer t.cleanup() - - t.done, t.updates = done, updates - - t.w, t.Error = os.Create(path.Base(t.URL)) - if t.Error != nil { - t.signal() - return - } - - t.request() - if !t.signal() { - return - } - - // sniff the on-going transfer until the signal returns false - sniff := sniffer(func(p []byte) bool { - l := len(p) - t.Current = l - t.Downloaded += l - - return t.signal() - }) - - t.transfer(sniff) - t.signal() -} - -func (t *HTTPTransfer) cleanup() { - if t.r != nil { - t.r.Close() - } - if t.w != nil { - t.w.Close() - } -} - -func (t *HTTPTransfer) request() { - var resp *http.Response - - resp, t.Error = HTTPGet(t.URL, t.timeout) - if t.Error != nil { - return - } - - t.Total = int(resp.ContentLength) // TODO: int(int64) - if t.Total <= 0 { - t.Error = fmt.Errorf("unknown content length: %d", t.Total) - } - - t.r = resp.Body -} - -func (t *HTTPTransfer) transfer(sniff sniffer) { - // initiate the transfer and monitor it - _, t.Error = io.Copy(io.MultiWriter(t.w, sniff), t.r) - - // if the err is from sniffer ignore it - if _, ok := t.Error.(sniffer); ok { - t.Error = nil - } - - // the next signal will say: "no new bytes received" and its done - t.Current = 0 - t.Done = true -} - -// signal signals the listeners about the last transfer progress. -// it returns false when the done signal is received or there was an error -// in the transfer. -func (t *HTTPTransfer) signal() bool { - select { - case t.updates <- t.Progress: - case <-t.done: - // shutting down signal received - return false - } - - // check the error only after sending the last progress - // if this check was above, the last update won't be sent - if t.Error != nil { - return false - } - - // go on your duties - return true -} diff --git a/31-concurrency/xxx-concurrent-downloader/fetch/httptransfer_fake.go b/31-concurrency/xxx-concurrent-downloader/fetch/httptransfer_fake.go deleted file mode 100644 index 488b94e..0000000 --- a/31-concurrency/xxx-concurrent-downloader/fetch/httptransfer_fake.go +++ /dev/null @@ -1,71 +0,0 @@ -package fetch - -import ( - "errors" - "fmt" - "math/rand" - "time" -) - -func (t *HTTPTransfer) fakeStart() { - jitter() - t.fakeRequest() - t.signal() - - for t.Downloaded < 100 { - jitter() - t.fakeFetch() - - if !t.signal() { - // done signal received or there was an error - break - } - } - - t.fakeFinish() - t.signal() -} - -// request requests to the url and adjusts the total length. -func (t *HTTPTransfer) fakeRequest() { - t.Total = 100 - - if debug { - fmt.Printf("[TRANSFER] started: %s\n", t.URL) - } -} - -// fetch fetches a bit from the resource -func (t *HTTPTransfer) fakeFetch() { - // TODO: right now hanged goroutine may hang the download completion - // needs timeout - // if t.URL == "url1" { - // select {} - // } - - // NOTE: burada sayacli io.Writer kullan - if t.URL == "url1" && t.Downloaded > rand.Intn(50) { - t.Error = errors.New("cekemedim netten") - } - - n := rand.Intn(20) + 1 - if nn := t.Downloaded + n; nn > 100 { - n = 100 - t.Downloaded - } - t.Current = n - t.Downloaded += n -} - -// finish signals the finish signal to the listeners -func (t *HTTPTransfer) fakeFinish() { - t.Current = 0 - t.Done = true - - if debug { - fmt.Printf("[TRANSFER] DONE: %s\n", t.URL) - } -} - -func jitter() { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)+1)) -} diff --git a/31-concurrency/xxx-concurrent-downloader/fetch/progress.go b/31-concurrency/xxx-concurrent-downloader/fetch/progress.go deleted file mode 100644 index a85eb34..0000000 --- a/31-concurrency/xxx-concurrent-downloader/fetch/progress.go +++ /dev/null @@ -1,10 +0,0 @@ -package fetch - -// Progress contains data about the downloading progress -type Progress struct { - URL string - - Total, Downloaded, Current int - Done bool - Error error -} diff --git a/31-concurrency/xxx-concurrent-downloader/fetch/session.go b/31-concurrency/xxx-concurrent-downloader/fetch/session.go deleted file mode 100644 index 3de9ea3..0000000 --- a/31-concurrency/xxx-concurrent-downloader/fetch/session.go +++ /dev/null @@ -1,57 +0,0 @@ -package fetch - -import ( - "fmt" - "sync" -) - -const debug = true - -// Session manages the downloading process -type Session struct { - done chan bool -} - -// Transfer sends `updates` and terminates when `done` closes -type Transfer interface { - Start(updates chan<- Progress, done <-chan bool) -} - -// NewSession creates a new downloading session -func NewSession() *Session { - return &Session{done: make(chan bool)} -} - -// Start starts the downloading process -func (s *Session) Start(transfers ...Transfer) <-chan Progress { - // a buffered chan may unblock transfers in case of a slow ui - updates := make(chan Progress) - - var wg sync.WaitGroup - wg.Add(len(transfers)) - - for _, t := range transfers { - go func(t Transfer) { - defer wg.Done() - t.Start(updates, s.done) - }(t) - } - - go func() { - wg.Wait() // wait until all downloads complete - close(updates) // let the watchers (ui) know that we're shutting down - }() - - return updates -} - -// Shutdown stops the downloading process and sends a signal to all parties -func (s *Session) Shutdown() { - // let the transfers know we're shutting down - // when this is done s.updates will be closed in the Start() routine above - close(s.done) - - if debug { - fmt.Printf("[SESSION ] DONE\n") - } -} diff --git a/31-concurrency/xxx-concurrent-downloader/fetch/sniffer.go b/31-concurrency/xxx-concurrent-downloader/fetch/sniffer.go deleted file mode 100644 index 7b87429..0000000 --- a/31-concurrency/xxx-concurrent-downloader/fetch/sniffer.go +++ /dev/null @@ -1,27 +0,0 @@ -package fetch - -// sniffer converts a function to a sniffer that can sniff from the -// on-going io.Writer call such as request -> file. -// -// This is here just for simplifying the logic of HTTPTransfer. -type sniffer func(p []byte) bool - -// Write satistifes io.Writer interface to sniff from it. -// It can be used through a io.MultiWriter. -func (f sniffer) Write(p []byte) (n int, err error) { - n = len(p) - - // if the sniffer returns false, terminate with a non-nil error. - // it used to abrupt the sniffing process, such as abrupting the - // io.MultiWriter. - if !f(p) { - err = f - } - return -} - -// Error satisfies the Error interface. So the returned error from the -// sniffer.Write func can return itself as an error. This is only used when -// the sniff.Write wants to terminate. So that we can distinguish it from a -// real error. -func (f sniffer) Error() string { return "" } diff --git a/31-concurrency/xxx-concurrent-downloader/fetch/storage.go b/31-concurrency/xxx-concurrent-downloader/fetch/storage.go deleted file mode 100644 index b26cf3d..0000000 --- a/31-concurrency/xxx-concurrent-downloader/fetch/storage.go +++ /dev/null @@ -1,44 +0,0 @@ -package fetch - -import ( - "fmt" - "io" -) - -// TODO: let main func hand the "TransferFactory" to "Session" -// instead of "StorageFactory" -// -// Because: "Session" duplicates almost everything for "Transfer" - -// StorageFactory func allows to switch storage implementations -// When called it may return a FileStore -// The transfer will call its Write method -// -// Why a func rather than an interface? -// Because: We don't have to store any state -// Storage will return its own io.Writer and the state will be in it -// However, before invoking the storage, we don't need any state -// We let us the storage manage its own state, we don't care about it -type StorageFactory func(url string) io.Writer - -// FileStorage is the default storage mechanism for the downloader -// It writes to files -type FileStorage struct { - url string - saved int -} - -// FileStorageFactory creates and returns a new FileStorage -func FileStorageFactory(url string) io.Writer { - return &FileStorage{url: url} -} - -func (f *FileStorage) Write(p []byte) (int, error) { - if debug { - fmt.Println("[FILESTORAGE]", string(p), "for", f.url) - } - // TODO: - // if not exists create it - // if exists update it - return 0, nil -} diff --git a/31-concurrency/xxx-concurrent-downloader/fetch/ui.go b/31-concurrency/xxx-concurrent-downloader/fetch/ui.go deleted file mode 100644 index cd38672..0000000 --- a/31-concurrency/xxx-concurrent-downloader/fetch/ui.go +++ /dev/null @@ -1,96 +0,0 @@ -package fetch - -import ( - "fmt" - "time" - - "github.com/inancgumus/screen" -) - -const refreshPeriod = time.Second / 10 - -// uiProgress is the default UI for the downloader -type uiProgress struct { - urls []string - transfers map[string]Progress -} - -// UI listens for the progress updates from the updates chan -// and it refreshes the ui -func UI(updates <-chan Progress) { - ui := &uiProgress{transfers: make(map[string]Progress)} - - // NOTE: we didn't use time.After here directly - // because doing so can create a lot of Timer chans unnecessarily - // instead we're just waiting on the same timer value - tick := time.After(refreshPeriod) - for { - select { - case p, ok := <-updates: - // if no more updates close the ui - if !ok { - ui.shutdown() - return - } - ui.update(p) - - case <-tick: - // `case <-tick:` allows updating the ui independently - // from the progress update signals. or the ui would hang - // the updaters (transfers). - ui.refresh() - tick = time.After(refreshPeriod) - } - } - -} - -// shutdown refreshes the ui for the last time and closes it -func (ui *uiProgress) shutdown() { - ui.refresh() - - if debug { - fmt.Printf("[ UI ] DONE\n") - } -} - -// update updates the progress data from the received message -func (ui *uiProgress) update(p Progress) { - if _, ok := ui.transfers[p.URL]; !ok { - ui.urls = append(ui.urls, p.URL) - } - - // update the latest progress for the url - ui.transfers[p.URL] = p -} - -// refresh refreshes the UI with the latest progress -func (ui *uiProgress) refresh() { - if !debug { - screen.Clear() - screen.MoveTopLeft() - } - - var total, downloaded int - - for _, u := range ui.urls { - p := ui.transfers[u] - - msg := "Downloading" - if p.Done && p.Error == nil { - msg = "👍 Completed" - } - if p.Error != nil { - msg = fmt.Sprintf("❌ %s", p.Error) - } - - fmt.Println(p.URL) - fmt.Printf("\t%d/%d\n", p.Downloaded, p.Total) - fmt.Printf("\t%s\n", msg) - - total += p.Total - downloaded += p.Downloaded - } - - fmt.Printf("\n%s %d/%d\n", "TOTAL DOWNLOADED BYTES:", downloaded, total) -} diff --git a/31-concurrency/xxx-concurrent-downloader/main.go b/31-concurrency/xxx-concurrent-downloader/main.go deleted file mode 100644 index e01a639..0000000 --- a/31-concurrency/xxx-concurrent-downloader/main.go +++ /dev/null @@ -1,103 +0,0 @@ -package main - -import ( - "time" - - dl "github.com/inancgumus/learngo/31-concurrency/xxx-concurrent-downloader/fetch" -) - -// -// +--- transfer #1 (on done/error signals and quits) -// | -// UI <--+--- transfer #2 -// | -// +--- transfer #N -// ^ -// | -// SESSION ------+ launches goroutines -// - -// -// +--- transfer #1 -// | -// UI <--+--- transfer #2 -// | -// +--- transfer #N -// ^ -// | -// SESSION ------+ launches goroutines -// - -// -// +--- transfer #1 -// | -// UI <--+--- transfer #2 -// ^ | -// | +--- transfer #N -// | ^ -// | | -// +--------- SESSION -// -// Session can close the transfers with session.Shutdown() -// (through session.done chan) -// -// Session closes the watcher chan when the transfers end -// Possible problem: If any of the transfers never quit, session will hang. -// I think, I'm going to manage that using context.Context in the transfers. -// - -func main() { - // ======================================================================== - - // to := time.Second * 5 - // res, err := dl.HTTPGet("https://jsonplaceholder.typicode.com/todos/1", to) - // if err != nil { - // fmt.Println(err) - // return - // } - // defer res.Body.Close() - - // io.Copy(os.Stdout, res.Body) - - // return - - // ======================================================================== - - sess := dl.NewSession() - - // simulate a manual shutdown - // time.AfterFunc(time.Second, func() { - // sess.Shutdown() - // }) - - to := time.Second * 5 - transfers := []dl.Transfer{ - dl.NewHTTPTransfer("https://inancgumus.github.io/samples/jpg-beach-1020x670-160kb.jpg", to), - dl.NewHTTPTransfer("https://inancgumus.github.io/samples/jpg-beach-1020x670-610kb.jpg", to), - dl.NewHTTPTransfer("https://inancgumus.github.io/samples/jpg-beach-1020x670-80kb.jpg", to), - dl.NewHTTPTransfer("https://inancgumus.github.io/samples/jpg-beach-1900x1250-1700kb.jpg", to), - } - - // transfers := []dl.Transfer{ - // dl.NewHTTPTransfer(ctx, "http://inanc.io/1"), - // dl.NewHTTPTransfer(ctx, "http://inanc.io/2"), - // dl.NewHTTPTransfer(ctx, "http://inanc.io/3"), - // } - - dl.UI(sess.Start(transfers...)) - - // results := dl.Drain(sess.Start(urls)) - // for _, r := range results { - // fmt.Printf("%s [err: %v — %d/%d]\n", - // r.URL, r.Error, r.Downloaded, r.Total) - // } - - // how to handle ctrl+c signals? - // register a signal - // let the downloader now that the operation halts - // with Downloader.onCancel or "context.Context" - - // run with: GOTRACEBACK=all go run -race main.go - // time.Sleep(time.Second * 2) - // panic("give me the stack trace") -}