authorGibheer <gibheer+git@zero-knowledge.org>2024-09-05 19:38:25 +0200
committerGibheer <gibheer+git@zero-knowledge.org>2024-09-05 19:38:25 +0200
commit6ea4d2c82de80efc87708e5e182034b7c6c2019e (patch)
tree35c0856a929040216c82153ca62d43b27530a887 /vendor/github.com/jackc/puddle/v2
parent6f64eeace1b66639b9380b44e88a8d54850a4306 (diff)
switch from github.com/lib/pq to github.com/jackc/pgx/v5
lib/pq has been unmaintained for some time now, so switch to the newer and more active library. It looks like pgx has finally stabilized after a long time.
Diffstat (limited to 'vendor/github.com/jackc/puddle/v2')
-rw-r--r--vendor/github.com/jackc/puddle/v2/CHANGELOG.md74
-rw-r--r--vendor/github.com/jackc/puddle/v2/LICENSE22
-rw-r--r--vendor/github.com/jackc/puddle/v2/README.md80
-rw-r--r--vendor/github.com/jackc/puddle/v2/context.go24
-rw-r--r--vendor/github.com/jackc/puddle/v2/doc.go11
-rw-r--r--vendor/github.com/jackc/puddle/v2/internal/genstack/gen_stack.go85
-rw-r--r--vendor/github.com/jackc/puddle/v2/internal/genstack/stack.go39
-rw-r--r--vendor/github.com/jackc/puddle/v2/log.go32
-rw-r--r--vendor/github.com/jackc/puddle/v2/nanotime_time.go13
-rw-r--r--vendor/github.com/jackc/puddle/v2/nanotime_unsafe.go12
-rw-r--r--vendor/github.com/jackc/puddle/v2/pool.go696
-rw-r--r--vendor/github.com/jackc/puddle/v2/resource_list.go28
12 files changed, 1116 insertions, 0 deletions
diff --git a/vendor/github.com/jackc/puddle/v2/CHANGELOG.md b/vendor/github.com/jackc/puddle/v2/CHANGELOG.md
new file mode 100644
index 0000000..a15991c
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/CHANGELOG.md
@@ -0,0 +1,74 @@
+# 2.2.1 (July 15, 2023)
+
+* Fix: CreateResource cannot overflow pool. This changes the documented behavior of CreateResource. Previously,
+ CreateResource could create a resource even if the pool was full. This could cause the pool to overflow. Although
+ this behavior was documented, the documentation was incorrect. CreateResource now returns an error if the pool is full.
+
+# 2.2.0 (February 11, 2023)
+
+* Use Go 1.19 atomics and drop go.uber.org/atomic dependency
+
+# 2.1.2 (November 12, 2022)
+
+* Restore support to Go 1.18 via go.uber.org/atomic
+
+# 2.1.1 (November 11, 2022)
+
+* Fix a race between concurrent resource creation and Stat calls
+
+# 2.1.0 (October 28, 2022)
+
+* Concurrency control is now implemented with a semaphore. This simplifies some internal logic, resolves a few error conditions (including a deadlock), and improves performance. (Jan Dubsky)
+* Go 1.19 is now required for the improved atomic support.
+
+# 2.0.1 (October 28, 2022)
+
+* Fix race condition when Close is called concurrently with multiple constructors
+
+# 2.0.0 (September 17, 2022)
+
+* Use generics instead of interface{} (Столяров Владимир Алексеевич)
+* Add Reset
+* Do not cancel resource construction when Acquire is canceled
+* NewPool takes Config
+
+# 1.3.0 (August 27, 2022)
+
+* Acquire creates resources in background to allow creation to continue after Acquire is canceled (James Hartig)
+
+# 1.2.1 (December 2, 2021)
+
+* TryAcquire no longer blocks while a resource is being constructed in the background
+
+# 1.2.0 (November 20, 2021)
+
+* Add TryAcquire (A. Jensen)
+* Fix: remove memory leak / unintentionally pinned memory when shrinking slices (Alexander Staubo)
+* Fix: Do not leave pool locked after panic from nil context
+
+# 1.1.4 (September 11, 2021)
+
+* Fix: Deadlock in CreateResource if pool was closed during resource acquisition (Dmitriy Matrenichev)
+
+# 1.1.3 (December 3, 2020)
+
+* Fix: Failed resource creation could cause concurrent Acquire to hang. (Evgeny Vanslov)
+
+# 1.1.2 (September 26, 2020)
+
+* Fix: Resource.Destroy no longer removes itself from the pool before its destructor has completed.
+* Fix: Prevent crash when pool is closed while resource is being created.
+
+# 1.1.1 (April 2, 2020)
+
+* Pool.Close can be safely called multiple times
+* AcquireAllIdle immediately returns nil if pool is closed
+* CreateResource checks if pool is closed before taking any action
+* Fix potential race condition when CreateResource and Close are called concurrently. CreateResource now checks if pool is closed before adding newly created resource to pool.
+
+# 1.1.0 (February 5, 2020)
+
+* Use runtime.nanotime for faster tracking of acquire time and last usage time.
+* Track resource idle time to enable client health check logic. (Patrick Ellul)
+* Add CreateResource to construct a new resource without acquiring it. (Patrick Ellul)
+* Fix deadlock race when acquire is cancelled. (Michael Tharp)
diff --git a/vendor/github.com/jackc/puddle/v2/LICENSE b/vendor/github.com/jackc/puddle/v2/LICENSE
new file mode 100644
index 0000000..bcc286c
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/LICENSE
@@ -0,0 +1,22 @@
+Copyright (c) 2018 Jack Christensen
+
+MIT License
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/vendor/github.com/jackc/puddle/v2/README.md b/vendor/github.com/jackc/puddle/v2/README.md
new file mode 100644
index 0000000..0ad07ec
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/README.md
@@ -0,0 +1,80 @@
+[![](https://godoc.org/github.com/jackc/puddle?status.svg)](https://godoc.org/github.com/jackc/puddle)
+![Build Status](https://github.com/jackc/puddle/actions/workflows/ci.yml/badge.svg)
+
+# Puddle
+
+Puddle is a tiny generic resource pool library for Go that uses the standard
+context library to signal cancellation of acquires. It is designed to contain
+the minimum functionality required for a resource pool. It can be used directly
+or it can be used as the base for a domain specific resource pool. For example,
+a database connection pool may use puddle internally and implement health checks
+and keep-alive behavior without needing to implement any concurrent code of its
+own.
+
+## Features
+
+* Acquire cancellation via the standard context package
+* Statistics API for monitoring pool pressure
+* No dependencies outside of the standard library and golang.org/x/sync
+* High performance
+* 100% test coverage of reachable code
+
+## Example Usage
+
+```go
+package main
+
+import (
+ "context"
+ "log"
+ "net"
+
+ "github.com/jackc/puddle/v2"
+)
+
+func main() {
+ constructor := func(context.Context) (net.Conn, error) {
+ return net.Dial("tcp", "127.0.0.1:8080")
+ }
+ destructor := func(value net.Conn) {
+ value.Close()
+ }
+ maxPoolSize := int32(10)
+
+ pool, err := puddle.NewPool(&puddle.Config[net.Conn]{Constructor: constructor, Destructor: destructor, MaxSize: maxPoolSize})
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // Acquire resource from the pool.
+ res, err := pool.Acquire(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // Use resource.
+ _, err = res.Value().Write([]byte{1})
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // Release when done.
+ res.Release()
+}
+```
+
+## Status
+
+Puddle is stable and feature complete.
+
+* Bug reports and fixes are welcome.
+* New features will usually not be accepted if they can be feasibly implemented in a wrapper.
+* Performance optimizations will usually not be accepted unless the performance issue rises to the level of a bug.
+
+## Supported Go Versions
+
+puddle supports the same versions of Go that are supported by the Go project. For [Go](https://golang.org/doc/devel/release.html#policy) that is the two most recent major releases. This means puddle supports Go 1.19 and higher.
+
+## License
+
+MIT
diff --git a/vendor/github.com/jackc/puddle/v2/context.go b/vendor/github.com/jackc/puddle/v2/context.go
new file mode 100644
index 0000000..e19d2a6
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/context.go
@@ -0,0 +1,24 @@
+package puddle
+
+import (
+ "context"
+ "time"
+)
+
+// valueCancelCtx combines two contexts into one. One context is used for values and the other is used for cancellation.
+type valueCancelCtx struct {
+ valueCtx context.Context
+ cancelCtx context.Context
+}
+
+func (ctx *valueCancelCtx) Deadline() (time.Time, bool) { return ctx.cancelCtx.Deadline() }
+func (ctx *valueCancelCtx) Done() <-chan struct{} { return ctx.cancelCtx.Done() }
+func (ctx *valueCancelCtx) Err() error { return ctx.cancelCtx.Err() }
+func (ctx *valueCancelCtx) Value(key any) any { return ctx.valueCtx.Value(key) }
+
+func newValueCancelCtx(valueCtx, cancelCtx context.Context) context.Context {
+ return &valueCancelCtx{
+ valueCtx: valueCtx,
+ cancelCtx: cancelCtx,
+ }
+}
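
Since valueCancelCtx is unexported, it cannot be used directly from outside puddle, but the pattern is easy to reproduce. Below is a minimal, self-contained sketch (mergedCtx and key are illustrative names, not part of puddle) showing that values flow from one parent while cancellation is observed from the other:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// mergedCtx mirrors puddle's unexported valueCancelCtx: Value is served
// by one context, Deadline/Done/Err by the other.
type mergedCtx struct {
	valueCtx  context.Context
	cancelCtx context.Context
}

func (c mergedCtx) Deadline() (time.Time, bool) { return c.cancelCtx.Deadline() }
func (c mergedCtx) Done() <-chan struct{}       { return c.cancelCtx.Done() }
func (c mergedCtx) Err() error                  { return c.cancelCtx.Err() }
func (c mergedCtx) Value(key any) any           { return c.valueCtx.Value(key) }

type key string

func main() {
	valueCtx := context.WithValue(context.Background(), key("tenant"), "acme")
	cancelCtx, cancel := context.WithCancel(context.Background())

	ctx := mergedCtx{valueCtx: valueCtx, cancelCtx: cancelCtx}
	cancel() // cancel only the cancellation parent

	fmt.Println(ctx.Value(key("tenant"))) // acme: values still flow through
	fmt.Println(ctx.Err())                // context canceled
}
```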
diff --git a/vendor/github.com/jackc/puddle/v2/doc.go b/vendor/github.com/jackc/puddle/v2/doc.go
new file mode 100644
index 0000000..818e4a6
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/doc.go
@@ -0,0 +1,11 @@
+// Package puddle is a generic resource pool with a type-parametrized API.
+/*
+
+Puddle is a tiny generic resource pool library for Go that uses the standard
+context library to signal cancellation of acquires. It is designed to contain
+the minimum functionality a resource pool needs that cannot be implemented
+without concurrency concerns. For example, a database connection pool may use
+puddle internally and implement health checks and keep-alive behavior without
+needing to implement any concurrent code of its own.
+*/
+package puddle
diff --git a/vendor/github.com/jackc/puddle/v2/internal/genstack/gen_stack.go b/vendor/github.com/jackc/puddle/v2/internal/genstack/gen_stack.go
new file mode 100644
index 0000000..7e4660c
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/internal/genstack/gen_stack.go
@@ -0,0 +1,85 @@
+package genstack
+
+// GenStack implements a generational stack.
+//
+// GenStack works as common stack except for the fact that all elements in the
+// older generation are guaranteed to be popped before any element in the newer
+// generation. New elements are always pushed to the current (newest)
+// generation.
+//
+// We could also say that GenStack behaves as a stack in case of a single
+// generation, but it behaves as a queue of individual generation stacks.
+type GenStack[T any] struct {
+ // We can represent an arbitrary number of generations using two stacks. The
+ // new stack stores all new pushes and the old stack serves all reads. The
+ // old stack can represent multiple generations. If old == new, then all
+ // elements pushed in previous (not current) generations have already
+ // been popped.
+
+ old *stack[T]
+ new *stack[T]
+}
+
+// NewGenStack creates a new empty GenStack.
+func NewGenStack[T any]() *GenStack[T] {
+ s := &stack[T]{}
+ return &GenStack[T]{
+ old: s,
+ new: s,
+ }
+}
+
+func (s *GenStack[T]) Pop() (T, bool) {
+ // Pushes always append to the new stack, so once the old stack becomes
+ // empty, it will remain empty forever.
+ if s.old.len() == 0 && s.old != s.new {
+ s.old = s.new
+ }
+
+ if s.old.len() == 0 {
+ var zero T
+ return zero, false
+ }
+
+ return s.old.pop(), true
+}
+
+// Push pushes a new element at the top of the stack.
+func (s *GenStack[T]) Push(v T) { s.new.push(v) }
+
+// NextGen starts a new stack generation.
+func (s *GenStack[T]) NextGen() {
+ if s.old == s.new {
+ s.new = &stack[T]{}
+ return
+ }
+
+ // We need to pop from the old stack to the top of the new stack. Let's
+ // have an example:
+ //
+ // Old: <bottom> 4 3 2 1
+ // New: <bottom> 8 7 6 5
+ // PopOrder: 1 2 3 4 5 6 7 8
+ //
+ //
+ // To preserve pop order, we have to take all elements from the old
+ // stack and push them to the top of new stack:
+ //
+ // New: 8 7 6 5 4 3 2 1
+ //
+ s.new.push(s.old.takeAll()...)
+
+ // The old stack is now allocated but empty, so reuse it as the
+ // new stack.
+ s.old, s.new = s.new, s.old
+}
+
+// Len returns the number of elements in the stack.
+func (s *GenStack[T]) Len() int {
+ l := s.old.len()
+ if s.old != s.new {
+ l += s.new.len()
+ }
+
+ return l
+}
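
Since genstack is an internal package, it cannot be imported from outside puddle, but a test placed alongside it could demonstrate the ordering guarantee. A sketch under that assumption (the example function is illustrative, not part of the package):

```go
package genstack

import "fmt"

// Elements of an older generation are popped (LIFO within the
// generation) before any element of a newer generation.
func ExampleGenStack() {
	s := NewGenStack[int]()
	s.Push(1)
	s.Push(2)
	s.NextGen()
	s.Push(3)
	s.Push(4)

	for v, ok := s.Pop(); ok; v, ok = s.Pop() {
		fmt.Print(v, " ")
	}
	// Output: 2 1 4 3
}
```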
diff --git a/vendor/github.com/jackc/puddle/v2/internal/genstack/stack.go b/vendor/github.com/jackc/puddle/v2/internal/genstack/stack.go
new file mode 100644
index 0000000..dbced0c
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/internal/genstack/stack.go
@@ -0,0 +1,39 @@
+package genstack
+
+// stack is a wrapper around an array implementing a stack.
+//
+// We cannot use a bare slice to represent the stack because append might
+// reallocate and change the slice header. That would be an issue in the
+// GenStack implementation, where old and new may refer to the same stack.
+type stack[T any] struct {
+ arr []T
+}
+
+// push pushes new elements on top of the stack.
+func (s *stack[T]) push(vs ...T) { s.arr = append(s.arr, vs...) }
+
+// pop pops the top-most element of the stack.
+//
+// If the stack is empty, this method panics.
+func (s *stack[T]) pop() T {
+ idx := s.len() - 1
+ val := s.arr[idx]
+
+ // Avoid memory leak
+ var zero T
+ s.arr[idx] = zero
+
+ s.arr = s.arr[:idx]
+ return val
+}
+
+// takeAll returns all elements in the stack in the order they are stored, i.e.
+// the top-most element is the last one. The stack is left empty.
+func (s *stack[T]) takeAll() []T {
+ arr := s.arr
+ s.arr = nil
+ return arr
+}
+
+// len returns the number of elements in the stack.
+func (s *stack[T]) len() int { return len(s.arr) }
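
The zeroing in pop matters whenever T contains pointers: without it, the slice's backing array would keep popped values reachable and the GC could not collect them. A standalone sketch of the same pattern (pop and ptr are illustrative helpers, not puddle API):

```go
package main

import "fmt"

// pop removes the last element and zeroes the vacated slot so the
// backing array no longer pins the popped value for the GC.
func pop[T any](arr []T) ([]T, T) {
	idx := len(arr) - 1
	val := arr[idx]
	var zero T
	arr[idx] = zero // drop the reference held by the backing array
	return arr[:idx], val
}

func ptr(s string) *string { return &s }

func main() {
	s := []*string{ptr("a"), ptr("b")}
	s, v := pop(s)
	fmt.Println(*v, len(s)) // b 1
}
```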
diff --git a/vendor/github.com/jackc/puddle/v2/log.go b/vendor/github.com/jackc/puddle/v2/log.go
new file mode 100644
index 0000000..b21b946
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/log.go
@@ -0,0 +1,32 @@
+package puddle
+
+import "unsafe"
+
+type ints interface {
+ int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64
+}
+
+// log2Int returns the integer part of log2 of val. This function panics if
+// val <= 0.
+func log2Int[T ints](val T) uint8 {
+ if val <= 0 {
+ panic("log2 of non-positive number does not exist")
+ }
+
+ return log2IntRange(val, 0, uint8(8*unsafe.Sizeof(val)))
+}
+
+func log2IntRange[T ints](val T, begin, end uint8) uint8 {
+ length := end - begin
+ if length == 1 {
+ return begin
+ }
+
+ delim := begin + length/2
+ mask := T(1) << delim
+ if mask > val {
+ return log2IntRange(val, begin, delim)
+ } else {
+ return log2IntRange(val, delim, end)
+ }
+}
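
log2Int is a binary search over bit positions: each step halves the candidate range [begin, end) by testing whether 1<<delim exceeds val. A standalone mirror of the computation with a few sanity checks (floorLog2 is an illustrative name; the original is unexported):

```go
package main

import "fmt"

// floorLog2 computes the index of the highest set bit, the same value
// puddle's unexported log2Int finds by recursive range halving.
func floorLog2(val uint64) uint8 {
	var begin, end uint8 = 0, 64
	for end-begin > 1 {
		delim := begin + (end-begin)/2
		if uint64(1)<<delim > val {
			end = delim // the answer is below delim
		} else {
			begin = delim // the answer is delim or above
		}
	}
	return begin
}

func main() {
	for _, v := range []uint64{1, 2, 19, 24, 1024} {
		fmt.Printf("floorLog2(%d) = %d\n", v, floorLog2(v))
	}
	// Prints 0, 1, 4, 4, 10 — e.g. 2^4 = 16 <= 19 < 32 = 2^5.
}
```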
diff --git a/vendor/github.com/jackc/puddle/v2/nanotime_time.go b/vendor/github.com/jackc/puddle/v2/nanotime_time.go
new file mode 100644
index 0000000..f8e7593
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/nanotime_time.go
@@ -0,0 +1,13 @@
+//go:build purego || appengine || js
+
+// This file contains the safe implementation of nanotime using time.Now().
+
+package puddle
+
+import (
+ "time"
+)
+
+func nanotime() int64 {
+ return time.Now().UnixNano()
+}
diff --git a/vendor/github.com/jackc/puddle/v2/nanotime_unsafe.go b/vendor/github.com/jackc/puddle/v2/nanotime_unsafe.go
new file mode 100644
index 0000000..fc3b8a2
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/nanotime_unsafe.go
@@ -0,0 +1,12 @@
+//go:build !purego && !appengine && !js
+
+// This file contains the implementation of nanotime using runtime.nanotime.
+
+package puddle
+
+import "unsafe"
+
+var _ = unsafe.Sizeof(0)
+
+//go:linkname nanotime runtime.nanotime
+func nanotime() int64
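
Both variants return a timestamp in nanoseconds that the pool only ever uses in differences (acquire durations, idle durations). A sketch of that measurement pattern, using the portable fallback in place of the linkname variant:

```go
package main

import (
	"fmt"
	"time"
)

// nanotime stands in for the portable fallback above; the linkname
// variant is faster, and either way the values are only meaningful
// as deltas.
func nanotime() int64 { return time.Now().UnixNano() }

func main() {
	start := nanotime()
	time.Sleep(10 * time.Millisecond)

	// Only the difference is used, exactly like acquireDuration and
	// IdleDuration in pool.go.
	elapsed := time.Duration(nanotime() - start)
	fmt.Println(elapsed >= 10*time.Millisecond) // true
}
```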
diff --git a/vendor/github.com/jackc/puddle/v2/pool.go b/vendor/github.com/jackc/puddle/v2/pool.go
new file mode 100644
index 0000000..c8edc0f
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/pool.go
@@ -0,0 +1,696 @@
+package puddle
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/jackc/puddle/v2/internal/genstack"
+ "golang.org/x/sync/semaphore"
+)
+
+const (
+ resourceStatusConstructing = iota
+ resourceStatusIdle
+ resourceStatusAcquired
+ resourceStatusHijacked
+)
+
+// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool
+// or a pool that is closed while the acquire is waiting.
+var ErrClosedPool = errors.New("closed pool")
+
+// ErrNotAvailable occurs on an attempt to acquire a resource from a pool
+// that is at maximum capacity and has no available resources.
+var ErrNotAvailable = errors.New("resource not available")
+
+// Constructor is a function called by the pool to construct a resource.
+type Constructor[T any] func(ctx context.Context) (res T, err error)
+
+// Destructor is a function called by the pool to destroy a resource.
+type Destructor[T any] func(res T)
+
+// Resource is the resource handle returned by acquiring from the pool.
+type Resource[T any] struct {
+ value T
+ pool *Pool[T]
+ creationTime time.Time
+ lastUsedNano int64
+ poolResetCount int
+ status byte
+}
+
+// Value returns the resource value.
+func (res *Resource[T]) Value() T {
+ if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
+ panic("tried to access resource that is not acquired or hijacked")
+ }
+ return res.value
+}
+
+// Release returns the resource to the pool. res must not be subsequently used.
+func (res *Resource[T]) Release() {
+ if res.status != resourceStatusAcquired {
+ panic("tried to release resource that is not acquired")
+ }
+ res.pool.releaseAcquiredResource(res, nanotime())
+}
+
+// ReleaseUnused returns the resource to the pool without updating when it was last used, i.e. LastUsedNanotime
+// will not change. res must not be subsequently used.
+func (res *Resource[T]) ReleaseUnused() {
+ if res.status != resourceStatusAcquired {
+ panic("tried to release resource that is not acquired")
+ }
+ res.pool.releaseAcquiredResource(res, res.lastUsedNano)
+}
+
+// Destroy returns the resource to the pool for destruction. res must not be
+// subsequently used.
+func (res *Resource[T]) Destroy() {
+ if res.status != resourceStatusAcquired {
+ panic("tried to destroy resource that is not acquired")
+ }
+ go res.pool.destroyAcquiredResource(res)
+}
+
+// Hijack assumes ownership of the resource from the pool. Caller is responsible
+// for cleanup of resource value.
+func (res *Resource[T]) Hijack() {
+ if res.status != resourceStatusAcquired {
+ panic("tried to hijack resource that is not acquired")
+ }
+ res.pool.hijackAcquiredResource(res)
+}
+
+// CreationTime returns when the resource was created by the pool.
+func (res *Resource[T]) CreationTime() time.Time {
+ if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
+ panic("tried to access resource that is not acquired or hijacked")
+ }
+ return res.creationTime
+}
+
+// LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time
+// (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with
+// other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead.
+func (res *Resource[T]) LastUsedNanotime() int64 {
+ if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
+ panic("tried to access resource that is not acquired or hijacked")
+ }
+
+ return res.lastUsedNano
+}
+
+// IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting
+// LastUsedNanotime from the current nanotime.
+func (res *Resource[T]) IdleDuration() time.Duration {
+ if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
+ panic("tried to access resource that is not acquired or hijacked")
+ }
+
+ return time.Duration(nanotime() - res.lastUsedNano)
+}
+
+// Pool is a concurrency-safe resource pool.
+type Pool[T any] struct {
+ // mux is the pool's internal lock. Any modification of the pool's shared
+ // state (except Acquires of acquireSem) must be performed only by the
+ // holder of the lock. Long-running operations are not allowed while mux
+ // is held.
+ mux sync.Mutex
+ // acquireSem provides an allowance to acquire a resource.
+ //
+ // Releases are allowed only when caller holds mux. Acquires have to
+ // happen before mux is locked (doesn't apply to semaphore.TryAcquire in
+ // AcquireAllIdle).
+ acquireSem *semaphore.Weighted
+ destructWG sync.WaitGroup
+
+ allResources resList[T]
+ idleResources *genstack.GenStack[*Resource[T]]
+
+ constructor Constructor[T]
+ destructor Destructor[T]
+ maxSize int32
+
+ acquireCount int64
+ acquireDuration time.Duration
+ emptyAcquireCount int64
+ canceledAcquireCount atomic.Int64
+
+ resetCount int
+
+ baseAcquireCtx context.Context
+ cancelBaseAcquireCtx context.CancelFunc
+ closed bool
+}
+
+type Config[T any] struct {
+ Constructor Constructor[T]
+ Destructor Destructor[T]
+ MaxSize int32
+}
+
+// NewPool creates a new pool. Returns an error if MaxSize is less than 1.
+func NewPool[T any](config *Config[T]) (*Pool[T], error) {
+ if config.MaxSize < 1 {
+ return nil, errors.New("MaxSize must be >= 1")
+ }
+
+ baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background())
+
+ return &Pool[T]{
+ acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
+ idleResources: genstack.NewGenStack[*Resource[T]](),
+ maxSize: config.MaxSize,
+ constructor: config.Constructor,
+ destructor: config.Destructor,
+ baseAcquireCtx: baseAcquireCtx,
+ cancelBaseAcquireCtx: cancelBaseAcquireCtx,
+ }, nil
+}
+
+// Close destroys all resources in the pool and rejects future Acquire calls.
+// Blocks until all resources are returned to pool and destroyed.
+func (p *Pool[T]) Close() {
+ defer p.destructWG.Wait()
+
+ p.mux.Lock()
+ defer p.mux.Unlock()
+
+ if p.closed {
+ return
+ }
+ p.closed = true
+ p.cancelBaseAcquireCtx()
+
+ for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
+ p.allResources.remove(res)
+ go p.destructResourceValue(res.value)
+ }
+}
+
+// Stat is a snapshot of Pool statistics.
+type Stat struct {
+ constructingResources int32
+ acquiredResources int32
+ idleResources int32
+ maxResources int32
+ acquireCount int64
+ acquireDuration time.Duration
+ emptyAcquireCount int64
+ canceledAcquireCount int64
+}
+
+// TotalResources returns the total number of resources currently in the pool.
+// The value is the sum of ConstructingResources, AcquiredResources, and
+// IdleResources.
+func (s *Stat) TotalResources() int32 {
+ return s.constructingResources + s.acquiredResources + s.idleResources
+}
+
+// ConstructingResources returns the number of resources with construction in progress in
+// the pool.
+func (s *Stat) ConstructingResources() int32 {
+ return s.constructingResources
+}
+
+// AcquiredResources returns the number of currently acquired resources in the pool.
+func (s *Stat) AcquiredResources() int32 {
+ return s.acquiredResources
+}
+
+// IdleResources returns the number of currently idle resources in the pool.
+func (s *Stat) IdleResources() int32 {
+ return s.idleResources
+}
+
+// MaxResources returns the maximum size of the pool.
+func (s *Stat) MaxResources() int32 {
+ return s.maxResources
+}
+
+// AcquireCount returns the cumulative count of successful acquires from the pool.
+func (s *Stat) AcquireCount() int64 {
+ return s.acquireCount
+}
+
+// AcquireDuration returns the total duration of all successful acquires from
+// the pool.
+func (s *Stat) AcquireDuration() time.Duration {
+ return s.acquireDuration
+}
+
+// EmptyAcquireCount returns the cumulative count of successful acquires from the pool
+// that waited for a resource to be released or constructed because the pool was
+// empty.
+func (s *Stat) EmptyAcquireCount() int64 {
+ return s.emptyAcquireCount
+}
+
+// CanceledAcquireCount returns the cumulative count of acquires from the pool
+// that were canceled by a context.
+func (s *Stat) CanceledAcquireCount() int64 {
+ return s.canceledAcquireCount
+}
+
+// Stat returns the current pool statistics.
+func (p *Pool[T]) Stat() *Stat {
+ p.mux.Lock()
+ defer p.mux.Unlock()
+
+ s := &Stat{
+ maxResources: p.maxSize,
+ acquireCount: p.acquireCount,
+ emptyAcquireCount: p.emptyAcquireCount,
+ canceledAcquireCount: p.canceledAcquireCount.Load(),
+ acquireDuration: p.acquireDuration,
+ }
+
+ for _, res := range p.allResources {
+ switch res.status {
+ case resourceStatusConstructing:
+ s.constructingResources += 1
+ case resourceStatusIdle:
+ s.idleResources += 1
+ case resourceStatusAcquired:
+ s.acquiredResources += 1
+ }
+ }
+
+ return s
+}
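
As a usage sketch, a monitoring hook might sample these counters periodically (logStats is an illustrative helper; pool is a *puddle.Pool[net.Conn] as in the README example, with log, net, and time imported):

```go
// logStats samples pool pressure. A rising EmptyAcquireCount relative
// to AcquireCount suggests callers routinely wait, i.e. the pool may
// be undersized.
func logStats(pool *puddle.Pool[net.Conn]) {
	s := pool.Stat()

	avg := time.Duration(0)
	if n := s.AcquireCount(); n > 0 {
		avg = s.AcquireDuration() / time.Duration(n)
	}
	log.Printf("total=%d idle=%d acquired=%d constructing=%d empty=%d avg_acquire=%v",
		s.TotalResources(), s.IdleResources(), s.AcquiredResources(),
		s.ConstructingResources(), s.EmptyAcquireCount(), avg)
}
```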
+
+// tryAcquireIdleResource checks if there is any idle resource. If there is
+// one, this method removes it from the idle list and returns it. If the idle
+// pool is empty, this method returns nil and doesn't modify idleResources.
+//
+// WARNING: Caller of this method must hold the pool mutex!
+func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] {
+ res, ok := p.idleResources.Pop()
+ if !ok {
+ return nil
+ }
+
+ res.status = resourceStatusAcquired
+ return res
+}
+
+// createNewResource creates a new resource and inserts it into the list of
+// pool resources.
+//
+// WARNING: Caller of this method must hold the pool mutex!
+func (p *Pool[T]) createNewResource() *Resource[T] {
+ res := &Resource[T]{
+ pool: p,
+ creationTime: time.Now(),
+ lastUsedNano: nanotime(),
+ poolResetCount: p.resetCount,
+ status: resourceStatusConstructing,
+ }
+
+ p.allResources.append(res)
+ p.destructWG.Add(1)
+
+ return res
+}
+
+// Acquire gets a resource from the pool. If no resources are available and the pool is not at maximum capacity it will
+// create a new resource. If the pool is at maximum capacity it will block until a resource is available. ctx can be
+// used to cancel the Acquire.
+//
+// If Acquire creates a new resource, the resource constructor function will receive a context that delegates Value() to
+// ctx. Canceling ctx causes Acquire to return immediately, but it does not cancel the resource creation. This avoids the
+// situation where it is impossible to create resources because the time to create a resource is greater than any one
+// caller of Acquire is willing to wait.
+func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) {
+ select {
+ case <-ctx.Done():
+ p.canceledAcquireCount.Add(1)
+ return nil, ctx.Err()
+ default:
+ }
+
+ return p.acquire(ctx)
+}
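
At a call site this typically pairs Acquire with a deadline. A self-contained sketch (the address, MaxSize, and timeout are illustrative):

```go
package main

import (
	"context"
	"log"
	"net"
	"time"

	"github.com/jackc/puddle/v2"
)

func main() {
	pool, err := puddle.NewPool(&puddle.Config[net.Conn]{
		Constructor: func(ctx context.Context) (net.Conn, error) {
			var d net.Dialer
			return d.DialContext(ctx, "tcp", "127.0.0.1:8080") // illustrative address
		},
		Destructor: func(c net.Conn) { c.Close() },
		MaxSize:    4,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	// Bound the wait. If all 4 resources stay acquired past the
	// deadline, Acquire returns ctx.Err(); an in-flight constructor,
	// if any, keeps running for the next caller.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	res, err := pool.Acquire(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer res.Release()
}
```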
+
+// acquire is a continuation of the Acquire function that doesn't check context
+// validity.
+//
+// This function exists solely for benchmarking purposes.
+func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
+ startNano := nanotime()
+
+ var waitedForLock bool
+ if !p.acquireSem.TryAcquire(1) {
+ waitedForLock = true
+ err := p.acquireSem.Acquire(ctx, 1)
+ if err != nil {
+ p.canceledAcquireCount.Add(1)
+ return nil, err
+ }
+ }
+
+ p.mux.Lock()
+ if p.closed {
+ p.acquireSem.Release(1)
+ p.mux.Unlock()
+ return nil, ErrClosedPool
+ }
+
+ // If a resource is available in the pool.
+ if res := p.tryAcquireIdleResource(); res != nil {
+ if waitedForLock {
+ p.emptyAcquireCount += 1
+ }
+ p.acquireCount += 1
+ p.acquireDuration += time.Duration(nanotime() - startNano)
+ p.mux.Unlock()
+ return res, nil
+ }
+
+ if len(p.allResources) >= int(p.maxSize) {
+ // Unreachable code.
+ panic("bug: semaphore allowed more acquires than pool allows")
+ }
+
+ // The resource is not idle, but there is enough space to create one.
+ res := p.createNewResource()
+ p.mux.Unlock()
+
+ res, err := p.initResourceValue(ctx, res)
+ if err != nil {
+ return nil, err
+ }
+
+ p.mux.Lock()
+ defer p.mux.Unlock()
+
+ p.emptyAcquireCount += 1
+ p.acquireCount += 1
+ p.acquireDuration += time.Duration(nanotime() - startNano)
+
+ return res, nil
+}
+
+func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Resource[T], error) {
+ // Create the resource in a goroutine to immediately return from Acquire
+ // if ctx is canceled without also canceling the constructor.
+ //
+ // See:
+ // - https://github.com/jackc/pgx/issues/1287
+ // - https://github.com/jackc/pgx/issues/1259
+ constructErrChan := make(chan error)
+ go func() {
+ constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx)
+ value, err := p.constructor(constructorCtx)
+ if err != nil {
+ p.mux.Lock()
+ p.allResources.remove(res)
+ p.destructWG.Done()
+
+ // The resource won't be acquired because its
+ // construction failed. We have to release the
+ // semaphore so someone else can take its place.
+ p.acquireSem.Release(1)
+ p.mux.Unlock()
+
+ select {
+ case constructErrChan <- err:
+ case <-ctx.Done():
+ // The caller was canceled, so no one awaits the
+ // error. This branch avoids a goroutine leak.
+ }
+ return
+ }
+
+ // The resource is already in p.allResources where it might be read. So we need to acquire the lock to update its
+ // status.
+ p.mux.Lock()
+ res.value = value
+ res.status = resourceStatusAcquired
+ p.mux.Unlock()
+
+ // This select works because the channel is unbuffered.
+ select {
+ case constructErrChan <- nil:
+ case <-ctx.Done():
+ p.releaseAcquiredResource(res, res.lastUsedNano)
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ p.canceledAcquireCount.Add(1)
+ return nil, ctx.Err()
+ case err := <-constructErrChan:
+ if err != nil {
+ return nil, err
+ }
+ return res, nil
+ }
+}
+
+// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no
+// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only
+// used to cancel the background creation.
+func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
+ if !p.acquireSem.TryAcquire(1) {
+ return nil, ErrNotAvailable
+ }
+
+ p.mux.Lock()
+ defer p.mux.Unlock()
+
+ if p.closed {
+ p.acquireSem.Release(1)
+ return nil, ErrClosedPool
+ }
+
+ // If a resource is available now
+ if res := p.tryAcquireIdleResource(); res != nil {
+ p.acquireCount += 1
+ return res, nil
+ }
+
+ if len(p.allResources) >= int(p.maxSize) {
+ // Unreachable code.
+ panic("bug: semaphore allowed more acquires than pool allows")
+ }
+
+ res := p.createNewResource()
+ go func() {
+ value, err := p.constructor(ctx)
+
+ p.mux.Lock()
+ defer p.mux.Unlock()
+ // We have to finish creating the resource and only then release
+ // the semaphore; until then there is no resource that someone
+ // could acquire.
+ defer p.acquireSem.Release(1)
+
+ if err != nil {
+ p.allResources.remove(res)
+ p.destructWG.Done()
+ return
+ }
+
+ res.value = value
+ res.status = resourceStatusIdle
+ p.idleResources.Push(res)
+ }()
+
+ return nil, ErrNotAvailable
+}
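
A sketch of the non-blocking pattern this enables, reusing the pool from the previous sketch (tryUse is an illustrative helper; context, errors, log, and net imported). Note that a TryAcquire returning ErrNotAvailable may still have started background construction that benefits a later caller:

```go
func tryUse(pool *puddle.Pool[net.Conn]) {
	res, err := pool.TryAcquire(context.Background())
	switch {
	case err == nil:
		defer res.Release()
		res.Value().Write([]byte("ping")) // use the resource
	case errors.Is(err, puddle.ErrNotAvailable):
		// No idle resource right now. If the pool had room to grow, a
		// new resource is being constructed for a later caller.
	default:
		log.Print(err) // e.g. puddle.ErrClosedPool
	}
}
```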
+
+// acquireSemAll tries to acquire num free tokens from sem. This function is
+// guaranteed to acquire at least the lowest number of tokens that has been
+// available in the semaphore during runtime of this function.
+//
+// For the time being, the semaphore doesn't allow acquiring all tokens
+// atomically (see https://github.com/golang/sync/pull/19). We simulate this by
+// trying all powers of 2 that are less than or equal to num.
+//
+// For example, let's imagine we have 19 free tokens in the semaphore which in
+// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then if
+// num is 24, log2Int(24) is 4 and we try to acquire 16, 8, 4, 2 and 1
+// tokens. Out of those, the acquires of 16, 2 and 1 tokens will succeed.
+//
+// Naturally, Acquires and Releases of the semaphore might take place
+// concurrently. For this reason, it's not guaranteed that absolutely all free
+// tokens in the semaphore will be acquired. But it's guaranteed that at least
+// the minimal number of tokens that has been present over the whole process
+// will be acquired. This is sufficient for the use-case we have in this
+// package.
+//
+// TODO: Replace this with acquireSem.TryAcquireAll() if it gets to
+// upstream. https://github.com/golang/sync/pull/19
+func acquireSemAll(sem *semaphore.Weighted, num int) int {
+ if sem.TryAcquire(int64(num)) {
+ return num
+ }
+
+ var acquired int
+ for i := int(log2Int(num)); i >= 0; i-- {
+ val := 1 << i
+ if sem.TryAcquire(int64(val)) {
+ acquired += val
+ }
+ }
+
+ return acquired
+}
+
+// AcquireAllIdle acquires all currently idle resources. Its intended use is for
+// health check and keep-alive functionality. It does not update pool
+// statistics.
+func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
+ p.mux.Lock()
+ defer p.mux.Unlock()
+
+ if p.closed {
+ return nil
+ }
+
+ numIdle := p.idleResources.Len()
+ if numIdle == 0 {
+ return nil
+ }
+
+ // In acquireSemAll we use only TryAcquire and not Acquire. Because
+ // TryAcquire cannot block, holding the mutex while trying to acquire
+ // the semaphore cannot result in a deadlock.
+ //
+ // Because the mutex is locked, no parallel Release can run. This
+ // implies that the number of tokens can only decrease, because some
+ // Acquire/TryAcquire call may consume a semaphore token. Consequently,
+ // acquired is always less than or equal to numIdle. Moreover, if
+ // acquired < numIdle, then some parallel Acquire/TryAcquire calls
+ // will take the remaining idle connections.
+ acquired := acquireSemAll(p.acquireSem, numIdle)
+
+ idle := make([]*Resource[T], acquired)
+ for i := range idle {
+ res, _ := p.idleResources.Pop()
+ res.status = resourceStatusAcquired
+ idle[i] = res
+ }
+
+ // We have to bump the generation to ensure that Acquire/TryAcquire
+ // calls running in parallel (those which caused acquired < numIdle)
+ // will consume old connections and not freshly released connections
+ // instead.
+ p.idleResources.NextGen()
+
+ return idle
+}
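
This is the hook for the health-check wrappers mentioned in the README. A sketch (healthCheck and ping are illustrative; net imported) that destroys broken idle connections and returns healthy ones without touching their idle clocks:

```go
func healthCheck(pool *puddle.Pool[net.Conn], ping func(net.Conn) error) {
	for _, res := range pool.AcquireAllIdle() {
		if err := ping(res.Value()); err != nil {
			res.Destroy() // broken: destroy and free the slot
		} else {
			res.ReleaseUnused() // healthy: keep LastUsedNanotime intact
		}
	}
}
```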
+
+// CreateResource constructs a new resource without acquiring it. The resource goes straight into the idle pool. If the
+// pool is full it returns an error. It can be useful for maintaining warm resources under little load.
+func (p *Pool[T]) CreateResource(ctx context.Context) error {
+ if !p.acquireSem.TryAcquire(1) {
+ return ErrNotAvailable
+ }
+
+ p.mux.Lock()
+ if p.closed {
+ p.acquireSem.Release(1)
+ p.mux.Unlock()
+ return ErrClosedPool
+ }
+
+ if len(p.allResources) >= int(p.maxSize) {
+ p.acquireSem.Release(1)
+ p.mux.Unlock()
+ return ErrNotAvailable
+ }
+
+ res := p.createNewResource()
+ p.mux.Unlock()
+
+ value, err := p.constructor(ctx)
+ p.mux.Lock()
+ defer p.mux.Unlock()
+ defer p.acquireSem.Release(1)
+ if err != nil {
+ p.allResources.remove(res)
+ p.destructWG.Done()
+ return err
+ }
+
+ res.value = value
+ res.status = resourceStatusIdle
+
+ // If closed while constructing resource then destroy it and return an error
+ if p.closed {
+ go p.destructResourceValue(res.value)
+ return ErrClosedPool
+ }
+
+ p.idleResources.Push(res)
+
+ return nil
+}
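
A warm-up sketch built on this (warm is an illustrative helper; context, errors, log, and net imported): pre-create idle resources until the requested count is reached, the pool is full, or an error occurs:

```go
func warm(ctx context.Context, pool *puddle.Pool[net.Conn], n int) {
	for i := 0; i < n; i++ {
		switch err := pool.CreateResource(ctx); {
		case err == nil:
			// one more idle resource is ready
		case errors.Is(err, puddle.ErrNotAvailable):
			return // pool already at MaxSize
		default:
			log.Print(err) // constructor failure or pool closed
			return
		}
	}
}
```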
+
+// Reset destroys all resources, but leaves the pool open. It is intended for use when an error is detected that would
+// disrupt all resources (such as a network interruption or a server state change).
+//
+// It is safe to reset a pool while resources are checked out. Those resources will be destroyed when they are returned
+// to the pool.
+func (p *Pool[T]) Reset() {
+ p.mux.Lock()
+ defer p.mux.Unlock()
+
+ p.resetCount++
+
+ for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
+ p.allResources.remove(res)
+ go p.destructResourceValue(res.value)
+ }
+}
+
+// releaseAcquiredResource returns res to the pool.
+func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
+ p.mux.Lock()
+ defer p.mux.Unlock()
+ defer p.acquireSem.Release(1)
+
+ if p.closed || res.poolResetCount != p.resetCount {
+ p.allResources.remove(res)
+ go p.destructResourceValue(res.value)
+ } else {
+ res.lastUsedNano = lastUsedNano
+ res.status = resourceStatusIdle
+ p.idleResources.Push(res)
+ }
+}
+
+// destroyAcquiredResource destroys res and removes it from the pool. If res
+// is not part of the pool, it panics.
+func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
+ p.destructResourceValue(res.value)
+
+ p.mux.Lock()
+ defer p.mux.Unlock()
+ defer p.acquireSem.Release(1)
+
+ p.allResources.remove(res)
+}
+
+func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
+ p.mux.Lock()
+ defer p.mux.Unlock()
+ defer p.acquireSem.Release(1)
+
+ p.allResources.remove(res)
+ res.status = resourceStatusHijacked
+ p.destructWG.Done() // not responsible for destructing hijacked resources
+}
+
+func (p *Pool[T]) destructResourceValue(value T) {
+ p.destructor(value)
+ p.destructWG.Done()
+}
diff --git a/vendor/github.com/jackc/puddle/v2/resource_list.go b/vendor/github.com/jackc/puddle/v2/resource_list.go
new file mode 100644
index 0000000..b243095
--- /dev/null
+++ b/vendor/github.com/jackc/puddle/v2/resource_list.go
@@ -0,0 +1,28 @@
+package puddle
+
+type resList[T any] []*Resource[T]
+
+func (l *resList[T]) append(val *Resource[T]) { *l = append(*l, val) }
+
+func (l *resList[T]) popBack() *Resource[T] {
+ idx := len(*l) - 1
+ val := (*l)[idx]
+ (*l)[idx] = nil // Avoid memory leak
+ *l = (*l)[:idx]
+
+ return val
+}
+
+func (l *resList[T]) remove(val *Resource[T]) {
+ for i, elem := range *l {
+ if elem == val {
+ lastIdx := len(*l) - 1
+ (*l)[i] = (*l)[lastIdx]
+ (*l)[lastIdx] = nil // Avoid memory leak
+ (*l) = (*l)[:lastIdx]
+ return
+ }
+ }
+
+ panic("BUG: removeResource could not find res in slice")
+}