remove: concurrent downloader

Inanc Gumus
2019-05-24 16:35:46 +03:00
parent 5c1364e181
commit 4eeb721db0
12 changed files with 0 additions and 663 deletions


@ -1,46 +0,0 @@
package main

import (
    "fmt"
    "time"
)

/*
    when a select has a default case, delivery of the messages
    is not guaranteed
*/
func main() {
    ch := make(chan int)

    // if this goroutine is fast enough,
    // the main goroutine can miss some of the signals
    go func() {
        var i int
        for {
            i++

            select {
            case ch <- i:
            default:
                // the message is lost
                // it doesn't matter whether the chan is buffered or not
            }

            if i == 10000 {
                fmt.Println("gopher dies")
                close(ch)
                return
            }
        }
    }()

    var total int
    for i := range ch {
        // the receiver works slower, so it misses some of the signals
        time.Sleep(time.Nanosecond)
        total += i
    }

    // would be 50005000 if no signals were missed
    fmt.Println(total)
}
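For contrast, a minimal sketch (not part of this commit) of the same loop with a blocking send and no default case — every message is delivered, so the total is exact:

package main

import "fmt"

func main() {
    ch := make(chan int)

    go func() {
        for i := 1; i <= 10000; i++ {
            ch <- i // blocking send: waits until the receiver is ready
        }
        close(ch)
    }()

    var total int
    for i := range ch {
        total += i
    }
    fmt.Println(total) // 50005000: no signals are missed
}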


@ -1,40 +0,0 @@
package main

import (
    "fmt"
    "time"
)

func main() {
    a, b := make(chan bool), make(chan bool)

    go func() {
        for a != nil || b != nil {
            fmt.Println("loop starts")

            select {
            case <-a:
                fmt.Println("recv: a")
                a = nil
            case <-b:
                fmt.Println("recv: b")
                b = nil
            }

            fmt.Println("loop ends")
        }
        fmt.Println("gopher dies")
    }()

    time.Sleep(time.Second)
    // a <- true
    close(a)

    time.Sleep(time.Second)
    // b <- true
    close(b)

    time.Sleep(time.Second * 2)

    // a closed chan never blocks
    // a nil chan always blocks
    // if the chans were not set to nil in the loop, it would spin forever
}
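Why setting the channels to nil matters — a standalone sketch (not part of this commit): receiving from a closed channel returns immediately with the zero value, so a select over a closed channel spins; a nil channel blocks forever, which effectively removes its case from the select.

package main

import "fmt"

func main() {
    ch := make(chan int)
    close(ch)

    v, ok := <-ch      // a closed chan never blocks
    fmt.Println(v, ok) // 0 false

    ch = nil
    select {
    case <-ch: // a nil chan always blocks: this case can never fire
    default:
        fmt.Println("nil chan would block forever")
    }
}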


@ -1,12 +0,0 @@
package fetch

// Drain drains the progress updates and returns the latest progress
// for each url.
func Drain(updates <-chan Progress) map[string]Progress {
    latest := make(map[string]Progress)

    // save only the latest progress for each url
    for p := range updates {
        latest[p.URL] = p
    }
    return latest
}
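A possible non-interactive use of Drain together with Session — a sketch that mirrors the commented-out code in main.go (it assumes a transfers slice like the one built there):

sess := NewSession()

// Drain blocks until every transfer finishes, then reports the final state
for _, p := range Drain(sess.Start(transfers...)) {
    fmt.Printf("%s [err: %v — %d/%d]\n", p.URL, p.Error, p.Downloaded, p.Total)
}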


@ -1,31 +0,0 @@
package fetch

import (
    "fmt"
    "net/http"
    "time"
)

// HTTPGet requests a url with the specified timeout
// and returns the response with an error.
// The caller should drain and close the body or it will leak.
func HTTPGet(url string, timeout time.Duration) (*http.Response, error) {
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return nil, err
    }
    // check the error before touching req: req is nil on error
    req.Close = true

    c := &http.Client{Timeout: timeout}
    resp, err := c.Do(req)
    if err != nil {
        return nil, err
    }

    // check for bad status codes
    if s := resp.StatusCode; s < 200 || s > 299 {
        return resp, fmt.Errorf("bad status: %d", s)
    }
    return resp, nil
}
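A minimal caller sketch honoring the drain-and-close requirement from the doc comment (not part of this commit; the URL is a placeholder and io, log, and time are assumed to be imported):

resp, err := HTTPGet("https://example.com/file.jpg", 5*time.Second)
if err != nil {
    log.Fatal(err)
}
defer resp.Body.Close()

// drain the body so the underlying connection can be reused
n, _ := io.Copy(io.Discard, resp.Body)
log.Printf("read %d bytes", n)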


@ -1,126 +0,0 @@
package fetch

import (
    "fmt"
    "io"
    "net/http"
    "os"
    "path"
    "time"
)

// HTTPTransfer fetches a url and stores it in a file
type HTTPTransfer struct {
    Progress
    r       io.ReadCloser
    w       io.WriteCloser
    timeout time.Duration
    done    <-chan bool
    updates chan<- Progress
}

// NewHTTPTransfer creates and returns a new transfer
func NewHTTPTransfer(url string, timeout time.Duration) *HTTPTransfer {
    return &HTTPTransfer{
        Progress: Progress{URL: url},
        timeout:  timeout,
    }
}

// Start starts the transfer.
// The transfer sends its updates to the updates chan,
// and it stops when it receives a signal from the done chan.
//
// All the t.signal calls live here. I believe this increases the visibility
// of what the Start function does. If the signal calls were scattered across
// the other methods of HTTPTransfer, they could easily lead to bugs.
func (t *HTTPTransfer) Start(updates chan<- Progress, done <-chan bool) {
    defer t.cleanup()

    t.done, t.updates = done, updates

    t.w, t.Error = os.Create(path.Base(t.URL))
    if t.Error != nil {
        t.signal()
        return
    }

    t.request()
    if !t.signal() {
        return
    }

    // sniff the on-going transfer until the sniff func returns false
    sniff := sniffer(func(p []byte) bool {
        l := len(p)
        t.Current = l
        t.Downloaded += l
        return t.signal()
    })

    t.transfer(sniff)
    t.signal()
}

func (t *HTTPTransfer) cleanup() {
    if t.r != nil {
        t.r.Close()
    }
    if t.w != nil {
        t.w.Close()
    }
}

func (t *HTTPTransfer) request() {
    var resp *http.Response

    resp, t.Error = HTTPGet(t.URL, t.timeout)
    if t.Error != nil {
        return
    }

    t.Total = int(resp.ContentLength) // TODO: int(int64)
    if t.Total <= 0 {
        t.Error = fmt.Errorf("unknown content length: %d", t.Total)
    }
    t.r = resp.Body
}

func (t *HTTPTransfer) transfer(sniff sniffer) {
    // initiate the transfer and monitor it through the sniffer
    _, t.Error = io.Copy(io.MultiWriter(t.w, sniff), t.r)

    // if the error comes from the sniffer, ignore it:
    // it only means the sniffing was aborted on purpose
    if _, ok := t.Error.(sniffer); ok {
        t.Error = nil
    }

    // the next signal will say: "no new bytes received" and it's done
    t.Current = 0
    t.Done = true
}

// signal signals the listeners about the last transfer progress.
// It returns false when the done signal is received or there was an error
// in the transfer.
func (t *HTTPTransfer) signal() bool {
    select {
    case t.updates <- t.Progress:
    case <-t.done:
        // shutting down: signal received
        return false
    }

    // check the error only after sending the last progress;
    // if this check were above, the last update wouldn't be sent
    if t.Error != nil {
        return false
    }

    // go on with your duties
    return true
}


@ -1,71 +0,0 @@
package fetch

import (
    "errors"
    "fmt"
    "math/rand"
    "time"
)

func (t *HTTPTransfer) fakeStart() {
    jitter()

    t.fakeRequest()
    t.signal()

    for t.Downloaded < 100 {
        jitter()

        t.fakeFetch()
        if !t.signal() {
            // done signal received or there was an error
            break
        }
    }

    t.fakeFinish()
    t.signal()
}

// fakeRequest fakes a request to the url and adjusts the total length.
func (t *HTTPTransfer) fakeRequest() {
    t.Total = 100
    if debug {
        fmt.Printf("[TRANSFER] started: %s\n", t.URL)
    }
}

// fakeFetch fakes fetching a bit of the resource.
func (t *HTTPTransfer) fakeFetch() {
    // TODO: right now a hung goroutine may hang the download completion;
    // needs a timeout
    // if t.URL == "url1" {
    //     select {}
    // }

    // NOTE: use a counting io.Writer here
    if t.URL == "url1" && t.Downloaded > rand.Intn(50) {
        t.Error = errors.New("couldn't fetch it from the net")
    }

    n := rand.Intn(20) + 1
    if nn := t.Downloaded + n; nn > 100 {
        n = 100 - t.Downloaded
    }
    t.Current = n
    t.Downloaded += n
}

// fakeFinish sends the finish signal to the listeners.
func (t *HTTPTransfer) fakeFinish() {
    t.Current = 0
    t.Done = true
    if debug {
        fmt.Printf("[TRANSFER] DONE: %s\n", t.URL)
    }
}

func jitter() {
    time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)+1))
}


@ -1,10 +0,0 @@
package fetch

// Progress contains data about the downloading progress
type Progress struct {
    URL                        string
    Total, Downloaded, Current int
    Done                       bool
    Error                      error
}


@ -1,57 +0,0 @@
package fetch

import (
    "fmt"
    "sync"
)

const debug = true

// Session manages the downloading process
type Session struct {
    done chan bool
}

// Transfer sends progress to the updates chan and terminates
// when the done chan closes
type Transfer interface {
    Start(updates chan<- Progress, done <-chan bool)
}

// NewSession creates a new downloading session
func NewSession() *Session {
    return &Session{done: make(chan bool)}
}

// Start starts the downloading process
func (s *Session) Start(transfers ...Transfer) <-chan Progress {
    // a buffered chan could unblock the transfers in case of a slow ui
    updates := make(chan Progress)

    var wg sync.WaitGroup
    wg.Add(len(transfers))

    for _, t := range transfers {
        go func(t Transfer) {
            defer wg.Done()
            t.Start(updates, s.done)
        }(t)
    }

    go func() {
        wg.Wait()      // wait until all downloads complete
        close(updates) // let the watchers (ui) know that we're shutting down
    }()

    return updates
}

// Shutdown stops the downloading process and signals all parties
func (s *Session) Shutdown() {
    // let the transfers know we're shutting down;
    // after that, updates will be closed by the goroutine in Start above
    close(s.done)

    if debug {
        fmt.Printf("[SESSION ] DONE\n")
    }
}
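Any type with the right Start method satisfies Transfer. A minimal fake (a sketch, not in this commit) that sends a single update and honors the done chan:

type fakeTransfer struct{ url string }

func (f fakeTransfer) Start(updates chan<- Progress, done <-chan bool) {
    p := Progress{URL: f.url, Total: 1, Downloaded: 1, Done: true}
    select {
    case updates <- p: // report the final progress
    case <-done: // or give up if the session is shutting down
    }
}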


@ -1,27 +0,0 @@
package fetch

// sniffer converts a function into a writer that can sniff bytes from an
// on-going io.Writer pipeline, such as response body -> file.
//
// This is here just to simplify the logic of HTTPTransfer.
type sniffer func(p []byte) bool

// Write satisfies the io.Writer interface so the sniffer can be plugged
// into an io.MultiWriter.
func (f sniffer) Write(p []byte) (n int, err error) {
    n = len(p)

    // if the sniff func returns false, terminate with a non-nil error.
    // it is used to abort the sniffing process, e.g., to stop the
    // surrounding io.MultiWriter (and io.Copy).
    if !f(p) {
        err = f
    }
    return
}

// Error satisfies the error interface, so sniffer.Write can return the
// sniffer itself as an error. This only happens when the sniffer wants to
// terminate, which lets callers distinguish it from a real error.
func (f sniffer) Error() string { return "" }
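A standalone sketch of the trick (not part of this commit): io.Copy stops as soon as any writer in the io.MultiWriter returns an error, and a type assertion tells an intentional abort apart from a real failure.

package main

import (
    "fmt"
    "io"
    "os"
    "strings"
)

type sniffer func(p []byte) bool

func (f sniffer) Write(p []byte) (int, error) {
    if !f(p) {
        return len(p), f // abort the copy on purpose
    }
    return len(p), nil
}

func (f sniffer) Error() string { return "" }

func main() {
    var seen int
    stop := sniffer(func(p []byte) bool {
        seen += len(p)
        return seen < 5 // abort once ~5 bytes have been seen
    })

    _, err := io.Copy(io.MultiWriter(os.Stdout, stop), strings.NewReader("hello world"))
    if _, ok := err.(sniffer); ok {
        err = nil // intentional abort, not a real error
    }
    fmt.Println("\nerr:", err)
}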


@ -1,44 +0,0 @@
package fetch

import (
    "fmt"
    "io"
)

// TODO: let the main func hand a "TransferFactory" to "Session"
// instead of a "StorageFactory".
//
// Because: "Session" duplicates almost everything for "Transfer".

// StorageFactory allows switching between storage implementations.
// When called, it may return, say, a FileStorage, and the transfer
// will call its Write method.
//
// Why a func rather than an interface?
// Because we don't have to store any state here:
// the storage returns its own io.Writer and keeps the state inside it.
// Before invoking the storage we don't need any state; we let the
// storage manage its own state and don't care about it.
type StorageFactory func(url string) io.Writer

// FileStorage is the default storage mechanism for the downloader.
// It writes to files.
type FileStorage struct {
    url   string
    saved int
}

// FileStorageFactory creates and returns a new FileStorage
func FileStorageFactory(url string) io.Writer {
    return &FileStorage{url: url}
}

func (f *FileStorage) Write(p []byte) (int, error) {
    if debug {
        fmt.Println("[FILESTORAGE]", string(p), "for", f.url)
    }
    // TODO:
    // if the file does not exist, create it
    // if it exists, update it

    // report full success for now; returning 0 with a nil error would
    // make io.Copy callers fail with io.ErrShortWrite
    return len(p), nil
}


@ -1,96 +0,0 @@
package fetch

import (
    "fmt"
    "time"

    "github.com/inancgumus/screen"
)

const refreshPeriod = time.Second / 10

// uiProgress is the default UI for the downloader
type uiProgress struct {
    urls      []string
    transfers map[string]Progress
}

// UI listens for the progress updates from the updates chan
// and refreshes the ui.
func UI(updates <-chan Progress) {
    ui := &uiProgress{transfers: make(map[string]Progress)}

    // NOTE: we don't put time.After directly into the select below,
    // because doing so would create a new Timer chan on every received
    // update, unnecessarily; instead we wait on the same timer value
    // and only create a new one after it fires.
    tick := time.After(refreshPeriod)

    for {
        select {
        case p, ok := <-updates:
            // if there are no more updates, close the ui
            if !ok {
                ui.shutdown()
                return
            }
            ui.update(p)
        case <-tick:
            // `case <-tick:` lets the ui refresh independently of the
            // progress update signals; otherwise the ui would hang
            // the updaters (transfers).
            ui.refresh()
            tick = time.After(refreshPeriod)
        }
    }
}

// shutdown refreshes the ui for the last time and closes it
func (ui *uiProgress) shutdown() {
    ui.refresh()
    if debug {
        fmt.Printf("[ UI ] DONE\n")
    }
}

// update saves the progress data from the received message
func (ui *uiProgress) update(p Progress) {
    if _, ok := ui.transfers[p.URL]; !ok {
        ui.urls = append(ui.urls, p.URL)
    }
    // save the latest progress for the url
    ui.transfers[p.URL] = p
}

// refresh redraws the UI with the latest progress
func (ui *uiProgress) refresh() {
    if !debug {
        screen.Clear()
        screen.MoveTopLeft()
    }

    var total, downloaded int
    for _, u := range ui.urls {
        p := ui.transfers[u]

        msg := "Downloading"
        if p.Done && p.Error == nil {
            msg = "👍 Completed"
        }
        if p.Error != nil {
            msg = fmt.Sprintf("❌ %s", p.Error)
        }

        fmt.Println(p.URL)
        fmt.Printf("\t%d/%d\n", p.Downloaded, p.Total)
        fmt.Printf("\t%s\n", msg)

        total += p.Total
        downloaded += p.Downloaded
    }
    fmt.Printf("\n%s %d/%d\n", "TOTAL DOWNLOADED BYTES:", downloaded, total)
}
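An alternative to the re-armed time.After (a sketch, not what this commit uses) is time.Ticker, which reuses a single channel instead of allocating a timer per period, at the cost of an explicit Stop:

ticker := time.NewTicker(refreshPeriod)
defer ticker.Stop()

for {
    select {
    case p, ok := <-updates:
        if !ok {
            ui.shutdown()
            return
        }
        ui.update(p)
    case <-ticker.C:
        ui.refresh()
    }
}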


@ -1,103 +0,0 @@
package main

import (
    "time"

    dl "github.com/inancgumus/learngo/31-concurrency/xxx-concurrent-downloader/fetch"
)
//
//            +--- transfer #1 (on done/error signals and quits)
//            |
//   UI <-----+--- transfer #2
//            |
//            +--- transfer #N
//                     ^
//                     |
//   SESSION ----------+  launches goroutines
//
//
//            +--- transfer #1
//            |
//   UI <-----+--- transfer #2
//    ^       |
//    |       +--- transfer #N
//    |               ^
//    |               |
//    +--------- SESSION
//
// Session can close the transfers with session.Shutdown()
// (through the session.done chan).
//
// Session closes the watcher chan when the transfers end.
// Possible problem: if any of the transfers never quits, the session will hang.
// I think I'm going to manage that using context.Context in the transfers.
//
func main() {
    // ========================================================================
    // to := time.Second * 5
    // res, err := dl.HTTPGet("https://jsonplaceholder.typicode.com/todos/1", to)
    // if err != nil {
    //     fmt.Println(err)
    //     return
    // }
    // defer res.Body.Close()
    // io.Copy(os.Stdout, res.Body)
    // return
    // ========================================================================

    sess := dl.NewSession()

    // simulate a manual shutdown
    // time.AfterFunc(time.Second, func() {
    //     sess.Shutdown()
    // })

    to := time.Second * 5
    transfers := []dl.Transfer{
        dl.NewHTTPTransfer("https://inancgumus.github.io/samples/jpg-beach-1020x670-160kb.jpg", to),
        dl.NewHTTPTransfer("https://inancgumus.github.io/samples/jpg-beach-1020x670-610kb.jpg", to),
        dl.NewHTTPTransfer("https://inancgumus.github.io/samples/jpg-beach-1020x670-80kb.jpg", to),
        dl.NewHTTPTransfer("https://inancgumus.github.io/samples/jpg-beach-1900x1250-1700kb.jpg", to),
    }

    // transfers := []dl.Transfer{
    //     dl.NewHTTPTransfer(ctx, "http://inanc.io/1"),
    //     dl.NewHTTPTransfer(ctx, "http://inanc.io/2"),
    //     dl.NewHTTPTransfer(ctx, "http://inanc.io/3"),
    // }

    dl.UI(sess.Start(transfers...))

    // results := dl.Drain(sess.Start(transfers...))
    // for _, r := range results {
    //     fmt.Printf("%s [err: %v — %d/%d]\n",
    //         r.URL, r.Error, r.Downloaded, r.Total)
    // }

    // how to handle ctrl+c signals?
    // register a signal handler and let the downloader know that the
    // operation halts, with Downloader.onCancel or "context.Context"

    // run with: GOTRACEBACK=all go run -race main.go
    // time.Sleep(time.Second * 2)
    // panic("give me the stack trace")
}