aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/v5/pgxpool
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/pgxpool')
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgxpool/batch_results.go52
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgxpool/conn.go134
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgxpool/doc.go27
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgxpool/pool.go717
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgxpool/rows.go116
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgxpool/stat.go84
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgxpool/tracer.go33
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgxpool/tx.go82
8 files changed, 1245 insertions, 0 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/batch_results.go b/vendor/github.com/jackc/pgx/v5/pgxpool/batch_results.go
new file mode 100644
index 0000000..5d5c681
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/pgxpool/batch_results.go
@@ -0,0 +1,52 @@
+package pgxpool
+
+import (
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgconn"
+)
+
+type errBatchResults struct {
+ err error
+}
+
+func (br errBatchResults) Exec() (pgconn.CommandTag, error) {
+ return pgconn.CommandTag{}, br.err
+}
+
+func (br errBatchResults) Query() (pgx.Rows, error) {
+ return errRows{err: br.err}, br.err
+}
+
+func (br errBatchResults) QueryRow() pgx.Row {
+ return errRow{err: br.err}
+}
+
+func (br errBatchResults) Close() error {
+ return br.err
+}
+
+type poolBatchResults struct {
+ br pgx.BatchResults
+ c *Conn
+}
+
+func (br *poolBatchResults) Exec() (pgconn.CommandTag, error) {
+ return br.br.Exec()
+}
+
+func (br *poolBatchResults) Query() (pgx.Rows, error) {
+ return br.br.Query()
+}
+
+func (br *poolBatchResults) QueryRow() pgx.Row {
+ return br.br.QueryRow()
+}
+
+func (br *poolBatchResults) Close() error {
+ err := br.br.Close()
+ if br.c != nil {
+ br.c.Release()
+ br.c = nil
+ }
+ return err
+}
diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/conn.go b/vendor/github.com/jackc/pgx/v5/pgxpool/conn.go
new file mode 100644
index 0000000..38c90f3
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/pgxpool/conn.go
@@ -0,0 +1,134 @@
+package pgxpool
+
+import (
+ "context"
+ "sync/atomic"
+
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgconn"
+ "github.com/jackc/puddle/v2"
+)
+
+// Conn is an acquired *pgx.Conn from a Pool.
+type Conn struct {
+ res *puddle.Resource[*connResource]
+ p *Pool
+}
+
+// Release returns c to the pool it was acquired from. Once Release has been called, other methods must not be called.
+// However, it is safe to call Release multiple times. Subsequent calls after the first will be ignored.
+func (c *Conn) Release() {
+ if c.res == nil {
+ return
+ }
+
+ conn := c.Conn()
+ res := c.res
+ c.res = nil
+
+ if c.p.releaseTracer != nil {
+ c.p.releaseTracer.TraceRelease(c.p, TraceReleaseData{Conn: conn})
+ }
+
+ if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' {
+ res.Destroy()
+ // Signal to the health check to run since we just destroyed a connections
+ // and we might be below minConns now
+ c.p.triggerHealthCheck()
+ return
+ }
+
+ // If the pool is consistently being used, we might never get to check the
+ // lifetime of a connection since we only check idle connections in checkConnsHealth
+ // so we also check the lifetime here and force a health check
+ if c.p.isExpired(res) {
+ atomic.AddInt64(&c.p.lifetimeDestroyCount, 1)
+ res.Destroy()
+ // Signal to the health check to run since we just destroyed a connections
+ // and we might be below minConns now
+ c.p.triggerHealthCheck()
+ return
+ }
+
+ if c.p.afterRelease == nil {
+ res.Release()
+ return
+ }
+
+ go func() {
+ if c.p.afterRelease(conn) {
+ res.Release()
+ } else {
+ res.Destroy()
+ // Signal to the health check to run since we just destroyed a connections
+ // and we might be below minConns now
+ c.p.triggerHealthCheck()
+ }
+ }()
+}
+
+// Hijack assumes ownership of the connection from the pool. Caller is responsible for closing the connection. Hijack
+// will panic if called on an already released or hijacked connection.
+func (c *Conn) Hijack() *pgx.Conn {
+ if c.res == nil {
+ panic("cannot hijack already released or hijacked connection")
+ }
+
+ conn := c.Conn()
+ res := c.res
+ c.res = nil
+
+ res.Hijack()
+
+ return conn
+}
+
+func (c *Conn) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) {
+ return c.Conn().Exec(ctx, sql, arguments...)
+}
+
+func (c *Conn) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) {
+ return c.Conn().Query(ctx, sql, args...)
+}
+
+func (c *Conn) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
+ return c.Conn().QueryRow(ctx, sql, args...)
+}
+
+func (c *Conn) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
+ return c.Conn().SendBatch(ctx, b)
+}
+
+func (c *Conn) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
+ return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc)
+}
+
+// Begin starts a transaction block from the *Conn without explicitly setting a transaction mode (see BeginTx with TxOptions if transaction mode is required).
+func (c *Conn) Begin(ctx context.Context) (pgx.Tx, error) {
+ return c.Conn().Begin(ctx)
+}
+
+// BeginTx starts a transaction block from the *Conn with txOptions determining the transaction mode.
+func (c *Conn) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) {
+ return c.Conn().BeginTx(ctx, txOptions)
+}
+
+func (c *Conn) Ping(ctx context.Context) error {
+ return c.Conn().Ping(ctx)
+}
+
+func (c *Conn) Conn() *pgx.Conn {
+ return c.connResource().conn
+}
+
+func (c *Conn) connResource() *connResource {
+ return c.res.Value()
+}
+
+func (c *Conn) getPoolRow(r pgx.Row) *poolRow {
+ return c.connResource().getPoolRow(c, r)
+}
+
+func (c *Conn) getPoolRows(r pgx.Rows) *poolRows {
+ return c.connResource().getPoolRows(c, r)
+}
diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/doc.go b/vendor/github.com/jackc/pgx/v5/pgxpool/doc.go
new file mode 100644
index 0000000..099443b
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/pgxpool/doc.go
@@ -0,0 +1,27 @@
+// Package pgxpool is a concurrency-safe connection pool for pgx.
+/*
+pgxpool implements a nearly identical interface to pgx connections.
+
+Creating a Pool
+
+The primary way of creating a pool is with [pgxpool.New]:
+
+ pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
+
+The database connection string can be in URL or keyword/value format. PostgreSQL settings, pgx settings, and pool settings can be
+specified here. In addition, a config struct can be created by [ParseConfig].
+
+ config, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
+ if err != nil {
+ // ...
+ }
+ config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
+ // do something with every new connection
+ }
+
+ pool, err := pgxpool.NewWithConfig(context.Background(), config)
+
+A pool returns without waiting for any connections to be established. Acquire a connection immediately after creating
+the pool to check if a connection can successfully be established.
+*/
+package pgxpool
diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go b/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go
new file mode 100644
index 0000000..fdcba72
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go
@@ -0,0 +1,717 @@
+package pgxpool
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "runtime"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgconn"
+ "github.com/jackc/puddle/v2"
+)
+
+var defaultMaxConns = int32(4)
+var defaultMinConns = int32(0)
+var defaultMaxConnLifetime = time.Hour
+var defaultMaxConnIdleTime = time.Minute * 30
+var defaultHealthCheckPeriod = time.Minute
+
+type connResource struct {
+ conn *pgx.Conn
+ conns []Conn
+ poolRows []poolRow
+ poolRowss []poolRows
+ maxAgeTime time.Time
+}
+
+func (cr *connResource) getConn(p *Pool, res *puddle.Resource[*connResource]) *Conn {
+ if len(cr.conns) == 0 {
+ cr.conns = make([]Conn, 128)
+ }
+
+ c := &cr.conns[len(cr.conns)-1]
+ cr.conns = cr.conns[0 : len(cr.conns)-1]
+
+ c.res = res
+ c.p = p
+
+ return c
+}
+
+func (cr *connResource) getPoolRow(c *Conn, r pgx.Row) *poolRow {
+ if len(cr.poolRows) == 0 {
+ cr.poolRows = make([]poolRow, 128)
+ }
+
+ pr := &cr.poolRows[len(cr.poolRows)-1]
+ cr.poolRows = cr.poolRows[0 : len(cr.poolRows)-1]
+
+ pr.c = c
+ pr.r = r
+
+ return pr
+}
+
+func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
+ if len(cr.poolRowss) == 0 {
+ cr.poolRowss = make([]poolRows, 128)
+ }
+
+ pr := &cr.poolRowss[len(cr.poolRowss)-1]
+ cr.poolRowss = cr.poolRowss[0 : len(cr.poolRowss)-1]
+
+ pr.c = c
+ pr.r = r
+
+ return pr
+}
+
+// Pool allows for connection reuse.
+type Pool struct {
+ // 64 bit fields accessed with atomics must be at beginning of struct to guarantee alignment for certain 32-bit
+ // architectures. See BUGS section of https://pkg.go.dev/sync/atomic and https://github.com/jackc/pgx/issues/1288.
+ newConnsCount int64
+ lifetimeDestroyCount int64
+ idleDestroyCount int64
+
+ p *puddle.Pool[*connResource]
+ config *Config
+ beforeConnect func(context.Context, *pgx.ConnConfig) error
+ afterConnect func(context.Context, *pgx.Conn) error
+ beforeAcquire func(context.Context, *pgx.Conn) bool
+ afterRelease func(*pgx.Conn) bool
+ beforeClose func(*pgx.Conn)
+ minConns int32
+ maxConns int32
+ maxConnLifetime time.Duration
+ maxConnLifetimeJitter time.Duration
+ maxConnIdleTime time.Duration
+ healthCheckPeriod time.Duration
+
+ healthCheckChan chan struct{}
+
+ acquireTracer AcquireTracer
+ releaseTracer ReleaseTracer
+
+ closeOnce sync.Once
+ closeChan chan struct{}
+}
+
+// Config is the configuration struct for creating a pool. It must be created by [ParseConfig] and then it can be
+// modified.
+type Config struct {
+ ConnConfig *pgx.ConnConfig
+
+ // BeforeConnect is called before a new connection is made. It is passed a copy of the underlying pgx.ConnConfig and
+ // will not impact any existing open connections.
+ BeforeConnect func(context.Context, *pgx.ConnConfig) error
+
+ // AfterConnect is called after a connection is established, but before it is added to the pool.
+ AfterConnect func(context.Context, *pgx.Conn) error
+
+ // BeforeAcquire is called before a connection is acquired from the pool. It must return true to allow the
+ // acquisition or false to indicate that the connection should be destroyed and a different connection should be
+ // acquired.
+ BeforeAcquire func(context.Context, *pgx.Conn) bool
+
+ // AfterRelease is called after a connection is released, but before it is returned to the pool. It must return true to
+ // return the connection to the pool or false to destroy the connection.
+ AfterRelease func(*pgx.Conn) bool
+
+ // BeforeClose is called right before a connection is closed and removed from the pool.
+ BeforeClose func(*pgx.Conn)
+
+ // MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
+ MaxConnLifetime time.Duration
+
+ // MaxConnLifetimeJitter is the duration after MaxConnLifetime to randomly decide to close a connection.
+ // This helps prevent all connections from being closed at the exact same time, starving the pool.
+ MaxConnLifetimeJitter time.Duration
+
+ // MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
+ MaxConnIdleTime time.Duration
+
+ // MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU().
+ MaxConns int32
+
+ // MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low
+ // number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance
+ // to create new connections.
+ MinConns int32
+
+ // HealthCheckPeriod is the duration between checks of the health of idle connections.
+ HealthCheckPeriod time.Duration
+
+ createdByParseConfig bool // Used to enforce created by ParseConfig rule.
+}
+
+// Copy returns a deep copy of the config that is safe to use and modify.
+// The only exception is the tls.Config:
+// according to the tls.Config docs it must not be modified after creation.
+func (c *Config) Copy() *Config {
+ newConfig := new(Config)
+ *newConfig = *c
+ newConfig.ConnConfig = c.ConnConfig.Copy()
+ return newConfig
+}
+
+// ConnString returns the connection string as parsed by pgxpool.ParseConfig into pgxpool.Config.
+func (c *Config) ConnString() string { return c.ConnConfig.ConnString() }
+
+// New creates a new Pool. See [ParseConfig] for information on connString format.
+func New(ctx context.Context, connString string) (*Pool, error) {
+ config, err := ParseConfig(connString)
+ if err != nil {
+ return nil, err
+ }
+
+ return NewWithConfig(ctx, config)
+}
+
+// NewWithConfig creates a new Pool. config must have been created by [ParseConfig].
+func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
+ // Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from
+ // zero values.
+ if !config.createdByParseConfig {
+ panic("config must be created by ParseConfig")
+ }
+
+ p := &Pool{
+ config: config,
+ beforeConnect: config.BeforeConnect,
+ afterConnect: config.AfterConnect,
+ beforeAcquire: config.BeforeAcquire,
+ afterRelease: config.AfterRelease,
+ beforeClose: config.BeforeClose,
+ minConns: config.MinConns,
+ maxConns: config.MaxConns,
+ maxConnLifetime: config.MaxConnLifetime,
+ maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
+ maxConnIdleTime: config.MaxConnIdleTime,
+ healthCheckPeriod: config.HealthCheckPeriod,
+ healthCheckChan: make(chan struct{}, 1),
+ closeChan: make(chan struct{}),
+ }
+
+ if t, ok := config.ConnConfig.Tracer.(AcquireTracer); ok {
+ p.acquireTracer = t
+ }
+
+ if t, ok := config.ConnConfig.Tracer.(ReleaseTracer); ok {
+ p.releaseTracer = t
+ }
+
+ var err error
+ p.p, err = puddle.NewPool(
+ &puddle.Config[*connResource]{
+ Constructor: func(ctx context.Context) (*connResource, error) {
+ atomic.AddInt64(&p.newConnsCount, 1)
+ connConfig := p.config.ConnConfig.Copy()
+
+ // Connection will continue in background even if Acquire is canceled. Ensure that a connect won't hang forever.
+ if connConfig.ConnectTimeout <= 0 {
+ connConfig.ConnectTimeout = 2 * time.Minute
+ }
+
+ if p.beforeConnect != nil {
+ if err := p.beforeConnect(ctx, connConfig); err != nil {
+ return nil, err
+ }
+ }
+
+ conn, err := pgx.ConnectConfig(ctx, connConfig)
+ if err != nil {
+ return nil, err
+ }
+
+ if p.afterConnect != nil {
+ err = p.afterConnect(ctx, conn)
+ if err != nil {
+ conn.Close(ctx)
+ return nil, err
+ }
+ }
+
+ jitterSecs := rand.Float64() * config.MaxConnLifetimeJitter.Seconds()
+ maxAgeTime := time.Now().Add(config.MaxConnLifetime).Add(time.Duration(jitterSecs) * time.Second)
+
+ cr := &connResource{
+ conn: conn,
+ conns: make([]Conn, 64),
+ poolRows: make([]poolRow, 64),
+ poolRowss: make([]poolRows, 64),
+ maxAgeTime: maxAgeTime,
+ }
+
+ return cr, nil
+ },
+ Destructor: func(value *connResource) {
+ ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+ conn := value.conn
+ if p.beforeClose != nil {
+ p.beforeClose(conn)
+ }
+ conn.Close(ctx)
+ select {
+ case <-conn.PgConn().CleanupDone():
+ case <-ctx.Done():
+ }
+ cancel()
+ },
+ MaxSize: config.MaxConns,
+ },
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ go func() {
+ p.createIdleResources(ctx, int(p.minConns))
+ p.backgroundHealthCheck()
+ }()
+
+ return p, nil
+}
+
+// ParseConfig builds a Config from connString. It parses connString with the same behavior as [pgx.ParseConfig] with the
+// addition of the following variables:
+//
+// - pool_max_conns: integer greater than 0
+// - pool_min_conns: integer 0 or greater
+// - pool_max_conn_lifetime: duration string
+// - pool_max_conn_idle_time: duration string
+// - pool_health_check_period: duration string
+// - pool_max_conn_lifetime_jitter: duration string
+//
+// See Config for definitions of these arguments.
+//
+// # Example Keyword/Value
+// user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca pool_max_conns=10
+//
+// # Example URL
+// postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10
+func ParseConfig(connString string) (*Config, error) {
+ connConfig, err := pgx.ParseConfig(connString)
+ if err != nil {
+ return nil, err
+ }
+
+ config := &Config{
+ ConnConfig: connConfig,
+ createdByParseConfig: true,
+ }
+
+ if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conns"]; ok {
+ delete(connConfig.Config.RuntimeParams, "pool_max_conns")
+ n, err := strconv.ParseInt(s, 10, 32)
+ if err != nil {
+ return nil, fmt.Errorf("cannot parse pool_max_conns: %w", err)
+ }
+ if n < 1 {
+ return nil, fmt.Errorf("pool_max_conns too small: %d", n)
+ }
+ config.MaxConns = int32(n)
+ } else {
+ config.MaxConns = defaultMaxConns
+ if numCPU := int32(runtime.NumCPU()); numCPU > config.MaxConns {
+ config.MaxConns = numCPU
+ }
+ }
+
+ if s, ok := config.ConnConfig.Config.RuntimeParams["pool_min_conns"]; ok {
+ delete(connConfig.Config.RuntimeParams, "pool_min_conns")
+ n, err := strconv.ParseInt(s, 10, 32)
+ if err != nil {
+ return nil, fmt.Errorf("cannot parse pool_min_conns: %w", err)
+ }
+ config.MinConns = int32(n)
+ } else {
+ config.MinConns = defaultMinConns
+ }
+
+ if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok {
+ delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime")
+ d, err := time.ParseDuration(s)
+ if err != nil {
+ return nil, fmt.Errorf("invalid pool_max_conn_lifetime: %w", err)
+ }
+ config.MaxConnLifetime = d
+ } else {
+ config.MaxConnLifetime = defaultMaxConnLifetime
+ }
+
+ if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_idle_time"]; ok {
+ delete(connConfig.Config.RuntimeParams, "pool_max_conn_idle_time")
+ d, err := time.ParseDuration(s)
+ if err != nil {
+ return nil, fmt.Errorf("invalid pool_max_conn_idle_time: %w", err)
+ }
+ config.MaxConnIdleTime = d
+ } else {
+ config.MaxConnIdleTime = defaultMaxConnIdleTime
+ }
+
+ if s, ok := config.ConnConfig.Config.RuntimeParams["pool_health_check_period"]; ok {
+ delete(connConfig.Config.RuntimeParams, "pool_health_check_period")
+ d, err := time.ParseDuration(s)
+ if err != nil {
+ return nil, fmt.Errorf("invalid pool_health_check_period: %w", err)
+ }
+ config.HealthCheckPeriod = d
+ } else {
+ config.HealthCheckPeriod = defaultHealthCheckPeriod
+ }
+
+ if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime_jitter"]; ok {
+ delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime_jitter")
+ d, err := time.ParseDuration(s)
+ if err != nil {
+ return nil, fmt.Errorf("invalid pool_max_conn_lifetime_jitter: %w", err)
+ }
+ config.MaxConnLifetimeJitter = d
+ }
+
+ return config, nil
+}
+
+// Close closes all connections in the pool and rejects future Acquire calls. Blocks until all connections are returned
+// to pool and closed.
+func (p *Pool) Close() {
+ p.closeOnce.Do(func() {
+ close(p.closeChan)
+ p.p.Close()
+ })
+}
+
+func (p *Pool) isExpired(res *puddle.Resource[*connResource]) bool {
+ return time.Now().After(res.Value().maxAgeTime)
+}
+
+func (p *Pool) triggerHealthCheck() {
+ go func() {
+ // Destroy is asynchronous so we give it time to actually remove itself from
+ // the pool otherwise we might try to check the pool size too soon
+ time.Sleep(500 * time.Millisecond)
+ select {
+ case p.healthCheckChan <- struct{}{}:
+ default:
+ }
+ }()
+}
+
+func (p *Pool) backgroundHealthCheck() {
+ ticker := time.NewTicker(p.healthCheckPeriod)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-p.closeChan:
+ return
+ case <-p.healthCheckChan:
+ p.checkHealth()
+ case <-ticker.C:
+ p.checkHealth()
+ }
+ }
+}
+
+func (p *Pool) checkHealth() {
+ for {
+ // If checkMinConns failed we don't destroy any connections since we couldn't
+ // even get to minConns
+ if err := p.checkMinConns(); err != nil {
+ // Should we log this error somewhere?
+ break
+ }
+ if !p.checkConnsHealth() {
+ // Since we didn't destroy any connections we can stop looping
+ break
+ }
+ // Technically Destroy is asynchronous but 500ms should be enough for it to
+ // remove it from the underlying pool
+ select {
+ case <-p.closeChan:
+ return
+ case <-time.After(500 * time.Millisecond):
+ }
+ }
+}
+
+// checkConnsHealth will check all idle connections, destroy a connection if
+// it's idle or too old, and returns true if any were destroyed
+func (p *Pool) checkConnsHealth() bool {
+ var destroyed bool
+ totalConns := p.Stat().TotalConns()
+ resources := p.p.AcquireAllIdle()
+ for _, res := range resources {
+ // We're okay going under minConns if the lifetime is up
+ if p.isExpired(res) && totalConns >= p.minConns {
+ atomic.AddInt64(&p.lifetimeDestroyCount, 1)
+ res.Destroy()
+ destroyed = true
+ // Since Destroy is async we manually decrement totalConns.
+ totalConns--
+ } else if res.IdleDuration() > p.maxConnIdleTime && totalConns > p.minConns {
+ atomic.AddInt64(&p.idleDestroyCount, 1)
+ res.Destroy()
+ destroyed = true
+ // Since Destroy is async we manually decrement totalConns.
+ totalConns--
+ } else {
+ res.ReleaseUnused()
+ }
+ }
+ return destroyed
+}
+
+func (p *Pool) checkMinConns() error {
+ // TotalConns can include ones that are being destroyed but we should have
+ // sleep(500ms) around all of the destroys to help prevent that from throwing
+ // off this check
+ toCreate := p.minConns - p.Stat().TotalConns()
+ if toCreate > 0 {
+ return p.createIdleResources(context.Background(), int(toCreate))
+ }
+ return nil
+}
+
+func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
+ ctx, cancel := context.WithCancel(parentCtx)
+ defer cancel()
+
+ errs := make(chan error, targetResources)
+
+ for i := 0; i < targetResources; i++ {
+ go func() {
+ err := p.p.CreateResource(ctx)
+ // Ignore ErrNotAvailable since it means that the pool has become full since we started creating resource.
+ if err == puddle.ErrNotAvailable {
+ err = nil
+ }
+ errs <- err
+ }()
+ }
+
+ var firstError error
+ for i := 0; i < targetResources; i++ {
+ err := <-errs
+ if err != nil && firstError == nil {
+ cancel()
+ firstError = err
+ }
+ }
+
+ return firstError
+}
+
+// Acquire returns a connection (*Conn) from the Pool
+func (p *Pool) Acquire(ctx context.Context) (c *Conn, err error) {
+ if p.acquireTracer != nil {
+ ctx = p.acquireTracer.TraceAcquireStart(ctx, p, TraceAcquireStartData{})
+ defer func() {
+ var conn *pgx.Conn
+ if c != nil {
+ conn = c.Conn()
+ }
+ p.acquireTracer.TraceAcquireEnd(ctx, p, TraceAcquireEndData{Conn: conn, Err: err})
+ }()
+ }
+
+ for {
+ res, err := p.p.Acquire(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ cr := res.Value()
+
+ if res.IdleDuration() > time.Second {
+ err := cr.conn.Ping(ctx)
+ if err != nil {
+ res.Destroy()
+ continue
+ }
+ }
+
+ if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
+ return cr.getConn(p, res), nil
+ }
+
+ res.Destroy()
+ }
+}
+
+// AcquireFunc acquires a *Conn and calls f with that *Conn. ctx will only affect the Acquire. It has no effect on the
+// call of f. The return value is either an error acquiring the *Conn or the return value of f. The *Conn is
+// automatically released after the call of f.
+func (p *Pool) AcquireFunc(ctx context.Context, f func(*Conn) error) error {
+ conn, err := p.Acquire(ctx)
+ if err != nil {
+ return err
+ }
+ defer conn.Release()
+
+ return f(conn)
+}
+
+// AcquireAllIdle atomically acquires all currently idle connections. Its intended use is for health check and
+// keep-alive functionality. It does not update pool statistics.
+func (p *Pool) AcquireAllIdle(ctx context.Context) []*Conn {
+ resources := p.p.AcquireAllIdle()
+ conns := make([]*Conn, 0, len(resources))
+ for _, res := range resources {
+ cr := res.Value()
+ if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
+ conns = append(conns, cr.getConn(p, res))
+ } else {
+ res.Destroy()
+ }
+ }
+
+ return conns
+}
+
+// Reset closes all connections, but leaves the pool open. It is intended for use when an error is detected that would
+// disrupt all connections (such as a network interruption or a server state change).
+//
+// It is safe to reset a pool while connections are checked out. Those connections will be closed when they are returned
+// to the pool.
+func (p *Pool) Reset() {
+ p.p.Reset()
+}
+
+// Config returns a copy of config that was used to initialize this pool.
+func (p *Pool) Config() *Config { return p.config.Copy() }
+
+// Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics.
+func (p *Pool) Stat() *Stat {
+ return &Stat{
+ s: p.p.Stat(),
+ newConnsCount: atomic.LoadInt64(&p.newConnsCount),
+ lifetimeDestroyCount: atomic.LoadInt64(&p.lifetimeDestroyCount),
+ idleDestroyCount: atomic.LoadInt64(&p.idleDestroyCount),
+ }
+}
+
+// Exec acquires a connection from the Pool and executes the given SQL.
+// SQL can be either a prepared statement name or an SQL string.
+// Arguments should be referenced positionally from the SQL string as $1, $2, etc.
+// The acquired connection is returned to the pool when the Exec function returns.
+func (p *Pool) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) {
+ c, err := p.Acquire(ctx)
+ if err != nil {
+ return pgconn.CommandTag{}, err
+ }
+ defer c.Release()
+
+ return c.Exec(ctx, sql, arguments...)
+}
+
+// Query acquires a connection and executes a query that returns pgx.Rows.
+// Arguments should be referenced positionally from the SQL string as $1, $2, etc.
+// See pgx.Rows documentation to close the returned Rows and return the acquired connection to the Pool.
+//
+// If there is an error, the returned pgx.Rows will be returned in an error state.
+// If preferred, ignore the error returned from Query and handle errors using the returned pgx.Rows.
+//
+// For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and
+// QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely
+// needed. See the documentation for those types for details.
+func (p *Pool) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) {
+ c, err := p.Acquire(ctx)
+ if err != nil {
+ return errRows{err: err}, err
+ }
+
+ rows, err := c.Query(ctx, sql, args...)
+ if err != nil {
+ c.Release()
+ return errRows{err: err}, err
+ }
+
+ return c.getPoolRows(rows), nil
+}
+
+// QueryRow acquires a connection and executes a query that is expected
+// to return at most one row (pgx.Row). Errors are deferred until pgx.Row's
+// Scan method is called. If the query selects no rows, pgx.Row's Scan will
+// return ErrNoRows. Otherwise, pgx.Row's Scan scans the first selected row
+// and discards the rest. The acquired connection is returned to the Pool when
+// pgx.Row's Scan method is called.
+//
+// Arguments should be referenced positionally from the SQL string as $1, $2, etc.
+//
+// For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and
+// QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely
+// needed. See the documentation for those types for details.
+func (p *Pool) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
+ c, err := p.Acquire(ctx)
+ if err != nil {
+ return errRow{err: err}
+ }
+
+ row := c.QueryRow(ctx, sql, args...)
+ return c.getPoolRow(row)
+}
+
+func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
+ c, err := p.Acquire(ctx)
+ if err != nil {
+ return errBatchResults{err: err}
+ }
+
+ br := c.SendBatch(ctx, b)
+ return &poolBatchResults{br: br, c: c}
+}
+
+// Begin acquires a connection from the Pool and starts a transaction. Unlike database/sql, the context only affects the begin command. i.e. there is no
+// auto-rollback on context cancellation. Begin initiates a transaction block without explicitly setting a transaction mode for the block (see BeginTx with TxOptions if transaction mode is required).
+// *pgxpool.Tx is returned, which implements the pgx.Tx interface.
+// Commit or Rollback must be called on the returned transaction to finalize the transaction block.
+func (p *Pool) Begin(ctx context.Context) (pgx.Tx, error) {
+ return p.BeginTx(ctx, pgx.TxOptions{})
+}
+
+// BeginTx acquires a connection from the Pool and starts a transaction with pgx.TxOptions determining the transaction mode.
+// Unlike database/sql, the context only affects the begin command. i.e. there is no auto-rollback on context cancellation.
+// *pgxpool.Tx is returned, which implements the pgx.Tx interface.
+// Commit or Rollback must be called on the returned transaction to finalize the transaction block.
+func (p *Pool) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) {
+ c, err := p.Acquire(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ t, err := c.BeginTx(ctx, txOptions)
+ if err != nil {
+ c.Release()
+ return nil, err
+ }
+
+ return &Tx{t: t, c: c}, nil
+}
+
+func (p *Pool) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
+ c, err := p.Acquire(ctx)
+ if err != nil {
+ return 0, err
+ }
+ defer c.Release()
+
+ return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc)
+}
+
+// Ping acquires a connection from the Pool and executes an empty sql statement against it.
+// If the sql returns without error, the database Ping is considered successful, otherwise, the error is returned.
+func (p *Pool) Ping(ctx context.Context) error {
+ c, err := p.Acquire(ctx)
+ if err != nil {
+ return err
+ }
+ defer c.Release()
+ return c.Ping(ctx)
+}
diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/rows.go b/vendor/github.com/jackc/pgx/v5/pgxpool/rows.go
new file mode 100644
index 0000000..f834b7e
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/pgxpool/rows.go
@@ -0,0 +1,116 @@
+package pgxpool
+
+import (
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgconn"
+)
+
+type errRows struct {
+ err error
+}
+
+func (errRows) Close() {}
+func (e errRows) Err() error { return e.err }
+func (errRows) CommandTag() pgconn.CommandTag { return pgconn.CommandTag{} }
+func (errRows) FieldDescriptions() []pgconn.FieldDescription { return nil }
+func (errRows) Next() bool { return false }
+func (e errRows) Scan(dest ...any) error { return e.err }
+func (e errRows) Values() ([]any, error) { return nil, e.err }
+func (e errRows) RawValues() [][]byte { return nil }
+func (e errRows) Conn() *pgx.Conn { return nil }
+
+type errRow struct {
+ err error
+}
+
+func (e errRow) Scan(dest ...any) error { return e.err }
+
+type poolRows struct {
+ r pgx.Rows
+ c *Conn
+ err error
+}
+
+func (rows *poolRows) Close() {
+ rows.r.Close()
+ if rows.c != nil {
+ rows.c.Release()
+ rows.c = nil
+ }
+}
+
+func (rows *poolRows) Err() error {
+ if rows.err != nil {
+ return rows.err
+ }
+ return rows.r.Err()
+}
+
+func (rows *poolRows) CommandTag() pgconn.CommandTag {
+ return rows.r.CommandTag()
+}
+
+func (rows *poolRows) FieldDescriptions() []pgconn.FieldDescription {
+ return rows.r.FieldDescriptions()
+}
+
+func (rows *poolRows) Next() bool {
+ if rows.err != nil {
+ return false
+ }
+
+ n := rows.r.Next()
+ if !n {
+ rows.Close()
+ }
+ return n
+}
+
+func (rows *poolRows) Scan(dest ...any) error {
+ err := rows.r.Scan(dest...)
+ if err != nil {
+ rows.Close()
+ }
+ return err
+}
+
+func (rows *poolRows) Values() ([]any, error) {
+ values, err := rows.r.Values()
+ if err != nil {
+ rows.Close()
+ }
+ return values, err
+}
+
+func (rows *poolRows) RawValues() [][]byte {
+ return rows.r.RawValues()
+}
+
+func (rows *poolRows) Conn() *pgx.Conn {
+ return rows.r.Conn()
+}
+
+type poolRow struct {
+ r pgx.Row
+ c *Conn
+ err error
+}
+
+func (row *poolRow) Scan(dest ...any) error {
+ if row.err != nil {
+ return row.err
+ }
+
+ panicked := true
+ defer func() {
+ if panicked && row.c != nil {
+ row.c.Release()
+ }
+ }()
+ err := row.r.Scan(dest...)
+ panicked = false
+ if row.c != nil {
+ row.c.Release()
+ }
+ return err
+}
diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/stat.go b/vendor/github.com/jackc/pgx/v5/pgxpool/stat.go
new file mode 100644
index 0000000..cfa0c4c
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/pgxpool/stat.go
@@ -0,0 +1,84 @@
+package pgxpool
+
+import (
+ "time"
+
+ "github.com/jackc/puddle/v2"
+)
+
+// Stat is a snapshot of Pool statistics.
+type Stat struct {
+ s *puddle.Stat
+ newConnsCount int64
+ lifetimeDestroyCount int64
+ idleDestroyCount int64
+}
+
+// AcquireCount returns the cumulative count of successful acquires from the pool.
+func (s *Stat) AcquireCount() int64 {
+ return s.s.AcquireCount()
+}
+
+// AcquireDuration returns the total duration of all successful acquires from
+// the pool.
+func (s *Stat) AcquireDuration() time.Duration {
+ return s.s.AcquireDuration()
+}
+
+// AcquiredConns returns the number of currently acquired connections in the pool.
+func (s *Stat) AcquiredConns() int32 {
+ return s.s.AcquiredResources()
+}
+
+// CanceledAcquireCount returns the cumulative count of acquires from the pool
+// that were canceled by a context.
+func (s *Stat) CanceledAcquireCount() int64 {
+ return s.s.CanceledAcquireCount()
+}
+
+// ConstructingConns returns the number of conns with construction in progress in
+// the pool.
+func (s *Stat) ConstructingConns() int32 {
+ return s.s.ConstructingResources()
+}
+
+// EmptyAcquireCount returns the cumulative count of successful acquires from the pool
+// that waited for a resource to be released or constructed because the pool was
+// empty.
+func (s *Stat) EmptyAcquireCount() int64 {
+ return s.s.EmptyAcquireCount()
+}
+
+// IdleConns returns the number of currently idle conns in the pool.
+func (s *Stat) IdleConns() int32 {
+ return s.s.IdleResources()
+}
+
+// MaxConns returns the maximum size of the pool.
+func (s *Stat) MaxConns() int32 {
+ return s.s.MaxResources()
+}
+
+// TotalConns returns the total number of resources currently in the pool.
+// The value is the sum of ConstructingConns, AcquiredConns, and
+// IdleConns.
+func (s *Stat) TotalConns() int32 {
+ return s.s.TotalResources()
+}
+
+// NewConnsCount returns the cumulative count of new connections opened.
+func (s *Stat) NewConnsCount() int64 {
+ return s.newConnsCount
+}
+
+// MaxLifetimeDestroyCount returns the cumulative count of connections destroyed
+// because they exceeded MaxConnLifetime.
+func (s *Stat) MaxLifetimeDestroyCount() int64 {
+ return s.lifetimeDestroyCount
+}
+
+// MaxIdleDestroyCount returns the cumulative count of connections destroyed because
+// they exceeded MaxConnIdleTime.
+func (s *Stat) MaxIdleDestroyCount() int64 {
+ return s.idleDestroyCount
+}
diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/tracer.go b/vendor/github.com/jackc/pgx/v5/pgxpool/tracer.go
new file mode 100644
index 0000000..78b9d15
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/pgxpool/tracer.go
@@ -0,0 +1,33 @@
+package pgxpool
+
+import (
+ "context"
+
+ "github.com/jackc/pgx/v5"
+)
+
+// AcquireTracer traces Acquire.
+type AcquireTracer interface {
+ // TraceAcquireStart is called at the beginning of Acquire.
+ // The returned context is used for the rest of the call and will be passed to the TraceAcquireEnd.
+ TraceAcquireStart(ctx context.Context, pool *Pool, data TraceAcquireStartData) context.Context
+ // TraceAcquireEnd is called when a connection has been acquired.
+ TraceAcquireEnd(ctx context.Context, pool *Pool, data TraceAcquireEndData)
+}
+
+type TraceAcquireStartData struct{}
+
+type TraceAcquireEndData struct {
+ Conn *pgx.Conn
+ Err error
+}
+
+// ReleaseTracer traces Release.
+type ReleaseTracer interface {
+ // TraceRelease is called at the beginning of Release.
+ TraceRelease(pool *Pool, data TraceReleaseData)
+}
+
+type TraceReleaseData struct {
+ Conn *pgx.Conn
+}
diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/tx.go b/vendor/github.com/jackc/pgx/v5/pgxpool/tx.go
new file mode 100644
index 0000000..74df859
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/pgxpool/tx.go
@@ -0,0 +1,82 @@
+package pgxpool
+
+import (
+ "context"
+
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgconn"
+)
+
+// Tx represents a database transaction acquired from a Pool.
+type Tx struct {
+ t pgx.Tx
+ c *Conn
+}
+
+// Begin starts a pseudo nested transaction implemented with a savepoint.
+func (tx *Tx) Begin(ctx context.Context) (pgx.Tx, error) {
+ return tx.t.Begin(ctx)
+}
+
+// Commit commits the transaction and returns the associated connection back to the Pool. Commit will return ErrTxClosed
+// if the Tx is already closed, but is otherwise safe to call multiple times. If the commit fails with a rollback status
+// (e.g. the transaction was already in a broken state) then ErrTxCommitRollback will be returned.
+func (tx *Tx) Commit(ctx context.Context) error {
+ err := tx.t.Commit(ctx)
+ if tx.c != nil {
+ tx.c.Release()
+ tx.c = nil
+ }
+ return err
+}
+
+// Rollback rolls back the transaction and returns the associated connection back to the Pool. Rollback will return ErrTxClosed
+// if the Tx is already closed, but is otherwise safe to call multiple times. Hence, defer tx.Rollback() is safe even if
+// tx.Commit() will be called first in a non-error condition.
+func (tx *Tx) Rollback(ctx context.Context) error {
+ err := tx.t.Rollback(ctx)
+ if tx.c != nil {
+ tx.c.Release()
+ tx.c = nil
+ }
+ return err
+}
+
+func (tx *Tx) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
+ return tx.t.CopyFrom(ctx, tableName, columnNames, rowSrc)
+}
+
+func (tx *Tx) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
+ return tx.t.SendBatch(ctx, b)
+}
+
+func (tx *Tx) LargeObjects() pgx.LargeObjects {
+ return tx.t.LargeObjects()
+}
+
+// Prepare creates a prepared statement with name and sql. If the name is empty,
+// an anonymous prepared statement will be used. sql can contain placeholders
+// for bound parameters. These placeholders are referenced positionally as $1, $2, etc.
+//
+// Prepare is idempotent; i.e. it is safe to call Prepare multiple times with the same
+// name and sql arguments. This allows a code path to Prepare and Query/Exec without
+// needing to first check whether the statement has already been prepared.
+func (tx *Tx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) {
+ return tx.t.Prepare(ctx, name, sql)
+}
+
+func (tx *Tx) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) {
+ return tx.t.Exec(ctx, sql, arguments...)
+}
+
+func (tx *Tx) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) {
+ return tx.t.Query(ctx, sql, args...)
+}
+
+func (tx *Tx) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
+ return tx.t.QueryRow(ctx, sql, args...)
+}
+
+func (tx *Tx) Conn() *pgx.Conn {
+ return tx.t.Conn()
+}