PHP Classes

File: pool.go

Recommend this page to a friend!
  Classes of Wolfy-J   roadrunner   pool.go   Download  
File: pool.go
Role: Auxiliary data
Content type: text/plain
Description: Auxiliary data
Class: roadrunner
Run multiple tasks of PHP in parallel using Golang
Author: By
Last change:
Date: 6 years ago
Size: 5,124 bytes
 

Contents

Class file image Download
package roadrunner import ( "fmt" "github.com/pkg/errors" "os/exec" "sync" "time" ) const ( // StopRequest can be sent by worker to indicate that restart is required. StopRequest = "{\"stop\":true}" ) const ( // EventCreated thrown when new worker is spawned. EventCreated = iota // EventDestruct thrown before worker destruction. EventDestruct // EventError thrown any worker related even happen (error passed as context) EventError ) // Pool controls worker creation, destruction and task routing. type Pool struct { // Observer is optional callback to handle worker create/destruct/error events. Observer func(event int, w *Worker, ctx interface{}) // pool behaviour cfg Config // worker command creator cmd func() *exec.Cmd // creates and connects to workers factory Factory // active task executions tasks sync.WaitGroup // workers circular allocation buffer free chan *Worker // protects state of worker list, does not affect allocation muw sync.RWMutex // all registered workers workers []*Worker } // NewPool creates new worker pool and task multiplexer. Pool will initiate with one worker. func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) { if err := cfg.Valid(); err != nil { return nil, errors.Wrap(err, "config error") } p := &Pool{ cfg: cfg, cmd: cmd, factory: factory, workers: make([]*Worker, 0, cfg.NumWorkers), free: make(chan *Worker, cfg.NumWorkers), } // constant number of workers simplify logic for i := uint64(0); i < p.cfg.NumWorkers; i++ { // to test if worker ready w, err := p.createWorker() if err != nil { p.Destroy() return nil, err } p.free <- w } return p, nil } // Config returns associated pool configuration. Immutable. func (p *Pool) Config() Config { return p.cfg } // Workers returns worker list associated with the pool. func (p *Pool) Workers() (workers []*Worker) { p.muw.RLock() defer p.muw.RUnlock() for _, w := range p.workers { workers = append(workers, w) } return workers } // Exec one task with given payload and context, returns result or error. func (p *Pool) Exec(rqs *Payload) (rsp *Payload, err error) { p.tasks.Add(1) defer p.tasks.Done() w, err := p.allocateWorker() if err != nil { return nil, errors.Wrap(err, "unable to allocate worker") } rsp, err = w.Exec(rqs) if err != nil { // soft job errors are allowed if _, jobError := err.(JobError); jobError { p.free <- w return nil, err } go p.replaceWorker(w, err) return nil, err } // worker want's to be terminated if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { go p.replaceWorker(w, err) return p.Exec(rqs) } if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions { go p.replaceWorker(w, p.cfg.MaxExecutions) } else { p.free <- w } return rsp, nil } // Destroy all underlying workers (but let them to complete the task). func (p *Pool) Destroy() { p.tasks.Wait() var wg sync.WaitGroup for _, w := range p.Workers() { wg.Add(1) go func(w *Worker) { defer wg.Done() p.destroyWorker(w) }(w) } wg.Wait() } // finds free worker in a given time interval or creates new if allowed. func (p *Pool) allocateWorker() (w *Worker, err error) { select { case w = <-p.free: return w, nil default: // enable timeout handler } timeout := time.NewTimer(p.cfg.AllocateTimeout) select { case <-timeout.C: return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout) case w := <-p.free: timeout.Stop() return w, nil } } // replaces dead or expired worker with new instance func (p *Pool) replaceWorker(w *Worker, caused interface{}) { go p.destroyWorker(w) if nw, err := p.createWorker(); err != nil { p.throw(EventError, w, err) if len(p.Workers()) == 0 { // possible situation when major error causes all PHP scripts to die (for example dead DB) p.throw(EventError, nil, fmt.Errorf("all workers dead")) } } else { p.free <- nw } } // destroy and remove worker from the pool. func (p *Pool) destroyWorker(w *Worker) { p.throw(EventDestruct, w, nil) // detaching p.muw.Lock() for i, wc := range p.workers { if wc == w { p.workers = p.workers[:i+1] break } } p.muw.Unlock() go w.Stop() select { case <-w.waitDone: // worker is dead case <-time.NewTimer(p.cfg.DestroyTimeout).C: // failed to stop process if err := w.Kill(); err != nil { p.throw(EventError, w, err) } } } // creates new worker using associated factory. automatically // adds worker to the worker list (background) func (p *Pool) createWorker() (*Worker, error) { w, err := p.factory.SpawnWorker(p.cmd()) if err != nil { return nil, err } p.throw(EventCreated, w, nil) go func(w *Worker) { if err := w.Wait(); err != nil { p.throw(EventError, w, err) } }(w) p.muw.Lock() defer p.muw.Unlock() p.workers = append(p.workers, w) return w, nil } // throw invokes event handler if any. func (p *Pool) throw(event int, w *Worker, ctx interface{}) { if p.Observer != nil { p.Observer(event, w, ctx) } }