From 6ea4d2c82de80efc87708e5e182034b7c6c2019e Mon Sep 17 00:00:00 2001 From: Gibheer Date: Thu, 5 Sep 2024 19:38:25 +0200 Subject: switch from github.com/lib/pq to github.com/jackc/pgx/v5 lib/pq is out of maintenance for some time now, so switch to the newer more active library. Looks like it finally stabilized after a long time. --- vendor/github.com/jackc/puddle/v2/pool.go | 696 ++++++++++++++++++++++++++++++ 1 file changed, 696 insertions(+) create mode 100644 vendor/github.com/jackc/puddle/v2/pool.go (limited to 'vendor/github.com/jackc/puddle/v2/pool.go') diff --git a/vendor/github.com/jackc/puddle/v2/pool.go b/vendor/github.com/jackc/puddle/v2/pool.go new file mode 100644 index 0000000..c8edc0f --- /dev/null +++ b/vendor/github.com/jackc/puddle/v2/pool.go @@ -0,0 +1,696 @@ +package puddle + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "time" + + "github.com/jackc/puddle/v2/internal/genstack" + "golang.org/x/sync/semaphore" +) + +const ( + resourceStatusConstructing = 0 + resourceStatusIdle = iota + resourceStatusAcquired = iota + resourceStatusHijacked = iota +) + +// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool +// or a pool that is closed while the acquire is waiting. +var ErrClosedPool = errors.New("closed pool") + +// ErrNotAvailable occurs on an attempt to acquire a resource from a pool +// that is at maximum capacity and has no available resources. +var ErrNotAvailable = errors.New("resource not available") + +// Constructor is a function called by the pool to construct a resource. +type Constructor[T any] func(ctx context.Context) (res T, err error) + +// Destructor is a function called by the pool to destroy a resource. +type Destructor[T any] func(res T) + +// Resource is the resource handle returned by acquiring from the pool. +type Resource[T any] struct { + value T + pool *Pool[T] + creationTime time.Time + lastUsedNano int64 + poolResetCount int + status byte +} + +// Value returns the resource value. +func (res *Resource[T]) Value() T { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + return res.value +} + +// Release returns the resource to the pool. res must not be subsequently used. +func (res *Resource[T]) Release() { + if res.status != resourceStatusAcquired { + panic("tried to release resource that is not acquired") + } + res.pool.releaseAcquiredResource(res, nanotime()) +} + +// ReleaseUnused returns the resource to the pool without updating when it was last used used. i.e. LastUsedNanotime +// will not change. res must not be subsequently used. +func (res *Resource[T]) ReleaseUnused() { + if res.status != resourceStatusAcquired { + panic("tried to release resource that is not acquired") + } + res.pool.releaseAcquiredResource(res, res.lastUsedNano) +} + +// Destroy returns the resource to the pool for destruction. res must not be +// subsequently used. +func (res *Resource[T]) Destroy() { + if res.status != resourceStatusAcquired { + panic("tried to destroy resource that is not acquired") + } + go res.pool.destroyAcquiredResource(res) +} + +// Hijack assumes ownership of the resource from the pool. Caller is responsible +// for cleanup of resource value. +func (res *Resource[T]) Hijack() { + if res.status != resourceStatusAcquired { + panic("tried to hijack resource that is not acquired") + } + res.pool.hijackAcquiredResource(res) +} + +// CreationTime returns when the resource was created by the pool. +func (res *Resource[T]) CreationTime() time.Time { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + return res.creationTime +} + +// LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time +// (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with +// other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead. +func (res *Resource[T]) LastUsedNanotime() int64 { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + + return res.lastUsedNano +} + +// IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting +// LastUsedNanotime to the current nanotime. +func (res *Resource[T]) IdleDuration() time.Duration { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + + return time.Duration(nanotime() - res.lastUsedNano) +} + +// Pool is a concurrency-safe resource pool. +type Pool[T any] struct { + // mux is the pool internal lock. Any modification of shared state of + // the pool (but Acquires of acquireSem) must be performed only by + // holder of the lock. Long running operations are not allowed when mux + // is held. + mux sync.Mutex + // acquireSem provides an allowance to acquire a resource. + // + // Releases are allowed only when caller holds mux. Acquires have to + // happen before mux is locked (doesn't apply to semaphore.TryAcquire in + // AcquireAllIdle). + acquireSem *semaphore.Weighted + destructWG sync.WaitGroup + + allResources resList[T] + idleResources *genstack.GenStack[*Resource[T]] + + constructor Constructor[T] + destructor Destructor[T] + maxSize int32 + + acquireCount int64 + acquireDuration time.Duration + emptyAcquireCount int64 + canceledAcquireCount atomic.Int64 + + resetCount int + + baseAcquireCtx context.Context + cancelBaseAcquireCtx context.CancelFunc + closed bool +} + +type Config[T any] struct { + Constructor Constructor[T] + Destructor Destructor[T] + MaxSize int32 +} + +// NewPool creates a new pool. Panics if maxSize is less than 1. +func NewPool[T any](config *Config[T]) (*Pool[T], error) { + if config.MaxSize < 1 { + return nil, errors.New("MaxSize must be >= 1") + } + + baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background()) + + return &Pool[T]{ + acquireSem: semaphore.NewWeighted(int64(config.MaxSize)), + idleResources: genstack.NewGenStack[*Resource[T]](), + maxSize: config.MaxSize, + constructor: config.Constructor, + destructor: config.Destructor, + baseAcquireCtx: baseAcquireCtx, + cancelBaseAcquireCtx: cancelBaseAcquireCtx, + }, nil +} + +// Close destroys all resources in the pool and rejects future Acquire calls. +// Blocks until all resources are returned to pool and destroyed. +func (p *Pool[T]) Close() { + defer p.destructWG.Wait() + + p.mux.Lock() + defer p.mux.Unlock() + + if p.closed { + return + } + p.closed = true + p.cancelBaseAcquireCtx() + + for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() { + p.allResources.remove(res) + go p.destructResourceValue(res.value) + } +} + +// Stat is a snapshot of Pool statistics. +type Stat struct { + constructingResources int32 + acquiredResources int32 + idleResources int32 + maxResources int32 + acquireCount int64 + acquireDuration time.Duration + emptyAcquireCount int64 + canceledAcquireCount int64 +} + +// TotalResources returns the total number of resources currently in the pool. +// The value is the sum of ConstructingResources, AcquiredResources, and +// IdleResources. +func (s *Stat) TotalResources() int32 { + return s.constructingResources + s.acquiredResources + s.idleResources +} + +// ConstructingResources returns the number of resources with construction in progress in +// the pool. +func (s *Stat) ConstructingResources() int32 { + return s.constructingResources +} + +// AcquiredResources returns the number of currently acquired resources in the pool. +func (s *Stat) AcquiredResources() int32 { + return s.acquiredResources +} + +// IdleResources returns the number of currently idle resources in the pool. +func (s *Stat) IdleResources() int32 { + return s.idleResources +} + +// MaxResources returns the maximum size of the pool. +func (s *Stat) MaxResources() int32 { + return s.maxResources +} + +// AcquireCount returns the cumulative count of successful acquires from the pool. +func (s *Stat) AcquireCount() int64 { + return s.acquireCount +} + +// AcquireDuration returns the total duration of all successful acquires from +// the pool. +func (s *Stat) AcquireDuration() time.Duration { + return s.acquireDuration +} + +// EmptyAcquireCount returns the cumulative count of successful acquires from the pool +// that waited for a resource to be released or constructed because the pool was +// empty. +func (s *Stat) EmptyAcquireCount() int64 { + return s.emptyAcquireCount +} + +// CanceledAcquireCount returns the cumulative count of acquires from the pool +// that were canceled by a context. +func (s *Stat) CanceledAcquireCount() int64 { + return s.canceledAcquireCount +} + +// Stat returns the current pool statistics. +func (p *Pool[T]) Stat() *Stat { + p.mux.Lock() + defer p.mux.Unlock() + + s := &Stat{ + maxResources: p.maxSize, + acquireCount: p.acquireCount, + emptyAcquireCount: p.emptyAcquireCount, + canceledAcquireCount: p.canceledAcquireCount.Load(), + acquireDuration: p.acquireDuration, + } + + for _, res := range p.allResources { + switch res.status { + case resourceStatusConstructing: + s.constructingResources += 1 + case resourceStatusIdle: + s.idleResources += 1 + case resourceStatusAcquired: + s.acquiredResources += 1 + } + } + + return s +} + +// tryAcquireIdleResource checks if there is any idle resource. If there is +// some, this method removes it from idle list and returns it. If the idle pool +// is empty, this method returns nil and doesn't modify the idleResources slice. +// +// WARNING: Caller of this method must hold the pool mutex! +func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] { + res, ok := p.idleResources.Pop() + if !ok { + return nil + } + + res.status = resourceStatusAcquired + return res +} + +// createNewResource creates a new resource and inserts it into list of pool +// resources. +// +// WARNING: Caller of this method must hold the pool mutex! +func (p *Pool[T]) createNewResource() *Resource[T] { + res := &Resource[T]{ + pool: p, + creationTime: time.Now(), + lastUsedNano: nanotime(), + poolResetCount: p.resetCount, + status: resourceStatusConstructing, + } + + p.allResources.append(res) + p.destructWG.Add(1) + + return res +} + +// Acquire gets a resource from the pool. If no resources are available and the pool is not at maximum capacity it will +// create a new resource. If the pool is at maximum capacity it will block until a resource is available. ctx can be +// used to cancel the Acquire. +// +// If Acquire creates a new resource the resource constructor function will receive a context that delegates Value() to +// ctx. Canceling ctx will cause Acquire to return immediately but it will not cancel the resource creation. This avoids +// the problem of it being impossible to create resources when the time to create a resource is greater than any one +// caller of Acquire is willing to wait. +func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) { + select { + case <-ctx.Done(): + p.canceledAcquireCount.Add(1) + return nil, ctx.Err() + default: + } + + return p.acquire(ctx) +} + +// acquire is a continuation of Acquire function that doesn't check context +// validity. +// +// This function exists solely only for benchmarking purposes. +func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { + startNano := nanotime() + + var waitedForLock bool + if !p.acquireSem.TryAcquire(1) { + waitedForLock = true + err := p.acquireSem.Acquire(ctx, 1) + if err != nil { + p.canceledAcquireCount.Add(1) + return nil, err + } + } + + p.mux.Lock() + if p.closed { + p.acquireSem.Release(1) + p.mux.Unlock() + return nil, ErrClosedPool + } + + // If a resource is available in the pool. + if res := p.tryAcquireIdleResource(); res != nil { + if waitedForLock { + p.emptyAcquireCount += 1 + } + p.acquireCount += 1 + p.acquireDuration += time.Duration(nanotime() - startNano) + p.mux.Unlock() + return res, nil + } + + if len(p.allResources) >= int(p.maxSize) { + // Unreachable code. + panic("bug: semaphore allowed more acquires than pool allows") + } + + // The resource is not idle, but there is enough space to create one. + res := p.createNewResource() + p.mux.Unlock() + + res, err := p.initResourceValue(ctx, res) + if err != nil { + return nil, err + } + + p.mux.Lock() + defer p.mux.Unlock() + + p.emptyAcquireCount += 1 + p.acquireCount += 1 + p.acquireDuration += time.Duration(nanotime() - startNano) + + return res, nil +} + +func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Resource[T], error) { + // Create the resource in a goroutine to immediately return from Acquire + // if ctx is canceled without also canceling the constructor. + // + // See: + // - https://github.com/jackc/pgx/issues/1287 + // - https://github.com/jackc/pgx/issues/1259 + constructErrChan := make(chan error) + go func() { + constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx) + value, err := p.constructor(constructorCtx) + if err != nil { + p.mux.Lock() + p.allResources.remove(res) + p.destructWG.Done() + + // The resource won't be acquired because its + // construction failed. We have to allow someone else to + // take that resouce. + p.acquireSem.Release(1) + p.mux.Unlock() + + select { + case constructErrChan <- err: + case <-ctx.Done(): + // The caller is cancelled, so no-one awaits the + // error. This branch avoid goroutine leak. + } + return + } + + // The resource is already in p.allResources where it might be read. So we need to acquire the lock to update its + // status. + p.mux.Lock() + res.value = value + res.status = resourceStatusAcquired + p.mux.Unlock() + + // This select works because the channel is unbuffered. + select { + case constructErrChan <- nil: + case <-ctx.Done(): + p.releaseAcquiredResource(res, res.lastUsedNano) + } + }() + + select { + case <-ctx.Done(): + p.canceledAcquireCount.Add(1) + return nil, ctx.Err() + case err := <-constructErrChan: + if err != nil { + return nil, err + } + return res, nil + } +} + +// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no +// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only +// used to cancel the background creation. +func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { + if !p.acquireSem.TryAcquire(1) { + return nil, ErrNotAvailable + } + + p.mux.Lock() + defer p.mux.Unlock() + + if p.closed { + p.acquireSem.Release(1) + return nil, ErrClosedPool + } + + // If a resource is available now + if res := p.tryAcquireIdleResource(); res != nil { + p.acquireCount += 1 + return res, nil + } + + if len(p.allResources) >= int(p.maxSize) { + // Unreachable code. + panic("bug: semaphore allowed more acquires than pool allows") + } + + res := p.createNewResource() + go func() { + value, err := p.constructor(ctx) + + p.mux.Lock() + defer p.mux.Unlock() + // We have to create the resource and only then release the + // semaphore - For the time being there is no resource that + // someone could acquire. + defer p.acquireSem.Release(1) + + if err != nil { + p.allResources.remove(res) + p.destructWG.Done() + return + } + + res.value = value + res.status = resourceStatusIdle + p.idleResources.Push(res) + }() + + return nil, ErrNotAvailable +} + +// acquireSemAll tries to acquire num free tokens from sem. This function is +// guaranteed to acquire at least the lowest number of tokens that has been +// available in the semaphore during runtime of this function. +// +// For the time being, semaphore doesn't allow to acquire all tokens atomically +// (see https://github.com/golang/sync/pull/19). We simulate this by trying all +// powers of 2 that are less or equal to num. +// +// For example, let's immagine we have 19 free tokens in the semaphore which in +// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then if +// num is 24, the log2Uint(24) is 4 and we try to acquire 16, 8, 4, 2 and 1 +// tokens. Out of those, the acquire of 16, 2 and 1 tokens will succeed. +// +// Naturally, Acquires and Releases of the semaphore might take place +// concurrently. For this reason, it's not guaranteed that absolutely all free +// tokens in the semaphore will be acquired. But it's guaranteed that at least +// the minimal number of tokens that has been present over the whole process +// will be acquired. This is sufficient for the use-case we have in this +// package. +// +// TODO: Replace this with acquireSem.TryAcquireAll() if it gets to +// upstream. https://github.com/golang/sync/pull/19 +func acquireSemAll(sem *semaphore.Weighted, num int) int { + if sem.TryAcquire(int64(num)) { + return num + } + + var acquired int + for i := int(log2Int(num)); i >= 0; i-- { + val := 1 << i + if sem.TryAcquire(int64(val)) { + acquired += val + } + } + + return acquired +} + +// AcquireAllIdle acquires all currently idle resources. Its intended use is for +// health check and keep-alive functionality. It does not update pool +// statistics. +func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { + p.mux.Lock() + defer p.mux.Unlock() + + if p.closed { + return nil + } + + numIdle := p.idleResources.Len() + if numIdle == 0 { + return nil + } + + // In acquireSemAll we use only TryAcquire and not Acquire. Because + // TryAcquire cannot block, the fact that we hold mutex locked and try + // to acquire semaphore cannot result in dead-lock. + // + // Because the mutex is locked, no parallel Release can run. This + // implies that the number of tokens can only decrease because some + // Acquire/TryAcquire call can consume the semaphore token. Consequently + // acquired is always less or equal to numIdle. Moreover if acquired < + // numIdle, then there are some parallel Acquire/TryAcquire calls that + // will take the remaining idle connections. + acquired := acquireSemAll(p.acquireSem, numIdle) + + idle := make([]*Resource[T], acquired) + for i := range idle { + res, _ := p.idleResources.Pop() + res.status = resourceStatusAcquired + idle[i] = res + } + + // We have to bump the generation to ensure that Acquire/TryAcquire + // calls running in parallel (those which caused acquired < numIdle) + // will consume old connections and not freshly released connections + // instead. + p.idleResources.NextGen() + + return idle +} + +// CreateResource constructs a new resource without acquiring it. It goes straight in the IdlePool. If the pool is full +// it returns an error. It can be useful to maintain warm resources under little load. +func (p *Pool[T]) CreateResource(ctx context.Context) error { + if !p.acquireSem.TryAcquire(1) { + return ErrNotAvailable + } + + p.mux.Lock() + if p.closed { + p.acquireSem.Release(1) + p.mux.Unlock() + return ErrClosedPool + } + + if len(p.allResources) >= int(p.maxSize) { + p.acquireSem.Release(1) + p.mux.Unlock() + return ErrNotAvailable + } + + res := p.createNewResource() + p.mux.Unlock() + + value, err := p.constructor(ctx) + p.mux.Lock() + defer p.mux.Unlock() + defer p.acquireSem.Release(1) + if err != nil { + p.allResources.remove(res) + p.destructWG.Done() + return err + } + + res.value = value + res.status = resourceStatusIdle + + // If closed while constructing resource then destroy it and return an error + if p.closed { + go p.destructResourceValue(res.value) + return ErrClosedPool + } + + p.idleResources.Push(res) + + return nil +} + +// Reset destroys all resources, but leaves the pool open. It is intended for use when an error is detected that would +// disrupt all resources (such as a network interruption or a server state change). +// +// It is safe to reset a pool while resources are checked out. Those resources will be destroyed when they are returned +// to the pool. +func (p *Pool[T]) Reset() { + p.mux.Lock() + defer p.mux.Unlock() + + p.resetCount++ + + for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() { + p.allResources.remove(res) + go p.destructResourceValue(res.value) + } +} + +// releaseAcquiredResource returns res to the the pool. +func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) { + p.mux.Lock() + defer p.mux.Unlock() + defer p.acquireSem.Release(1) + + if p.closed || res.poolResetCount != p.resetCount { + p.allResources.remove(res) + go p.destructResourceValue(res.value) + } else { + res.lastUsedNano = lastUsedNano + res.status = resourceStatusIdle + p.idleResources.Push(res) + } +} + +// Remove removes res from the pool and closes it. If res is not part of the +// pool Remove will panic. +func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) { + p.destructResourceValue(res.value) + + p.mux.Lock() + defer p.mux.Unlock() + defer p.acquireSem.Release(1) + + p.allResources.remove(res) +} + +func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) { + p.mux.Lock() + defer p.mux.Unlock() + defer p.acquireSem.Release(1) + + p.allResources.remove(res) + res.status = resourceStatusHijacked + p.destructWG.Done() // not responsible for destructing hijacked resources +} + +func (p *Pool[T]) destructResourceValue(value T) { + p.destructor(value) + p.destructWG.Done() +} -- cgit v1.2.3-70-g09d2