diff options
author | Gibheer <gibheer+git@zero-knowledge.org> | 2024-09-05 19:38:25 +0200 |
---|---|---|
committer | Gibheer <gibheer+git@zero-knowledge.org> | 2024-09-05 19:38:25 +0200 |
commit | 6ea4d2c82de80efc87708e5e182034b7c6c2019e (patch) | |
tree | 35c0856a929040216c82153ca62d43b27530a887 /vendor/github.com/jackc/pgx/v5/pgconn | |
parent | 6f64eeace1b66639b9380b44e88a8d54850a4306 (diff) |
lib/pq is out of maintenance for some time now, so switch to the newer
more active library. Looks like it finally stabilized after a long time.
Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/pgconn')
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/README.md | 29 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go | 272 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/config.go | 918 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/ctxwatch/context_watcher.go | 80 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/defaults.go | 63 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/defaults_windows.go | 57 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/doc.go | 38 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/errors.go | 248 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go | 139 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/krb5.go | 100 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go | 2346 |
11 files changed, 4290 insertions, 0 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/README.md b/vendor/github.com/jackc/pgx/v5/pgconn/README.md new file mode 100644 index 0000000..1fe15c2 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/pgconn/README.md @@ -0,0 +1,29 @@ +# pgconn + +Package pgconn is a low-level PostgreSQL database driver. It operates at nearly the same level as the C library libpq. +It is primarily intended to serve as the foundation for higher level libraries such as https://github.com/jackc/pgx. +Applications should handle normal queries with a higher level library and only use pgconn directly when required for +low-level access to PostgreSQL functionality. + +## Example Usage + +```go +pgConn, err := pgconn.Connect(context.Background(), os.Getenv("DATABASE_URL")) +if err != nil { + log.Fatalln("pgconn failed to connect:", err) +} +defer pgConn.Close(context.Background()) + +result := pgConn.ExecParams(context.Background(), "SELECT email FROM users WHERE id=$1", [][]byte{[]byte("123")}, nil, nil, nil) +for result.NextRow() { + fmt.Println("User 123 has email:", string(result.Values()[0])) +} +_, err = result.Close() +if err != nil { + log.Fatalln("failed reading result:", err) +} +``` + +## Testing + +See CONTRIBUTING.md for setup instructions. diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go b/vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go new file mode 100644 index 0000000..0649836 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go @@ -0,0 +1,272 @@ +// SCRAM-SHA-256 authentication +// +// Resources: +// https://tools.ietf.org/html/rfc5802 +// https://tools.ietf.org/html/rfc8265 +// https://www.postgresql.org/docs/current/sasl-authentication.html +// +// Inspiration drawn from other implementations: +// https://github.com/lib/pq/pull/608 +// https://github.com/lib/pq/pull/788 +// https://github.com/lib/pq/pull/833 + +package pgconn + +import ( + "bytes" + "crypto/hmac" + "crypto/rand" + "crypto/sha256" + "encoding/base64" + "errors" + "fmt" + "strconv" + + "github.com/jackc/pgx/v5/pgproto3" + "golang.org/x/crypto/pbkdf2" + "golang.org/x/text/secure/precis" +) + +const clientNonceLen = 18 + +// Perform SCRAM authentication. +func (c *PgConn) scramAuth(serverAuthMechanisms []string) error { + sc, err := newScramClient(serverAuthMechanisms, c.config.Password) + if err != nil { + return err + } + + // Send client-first-message in a SASLInitialResponse + saslInitialResponse := &pgproto3.SASLInitialResponse{ + AuthMechanism: "SCRAM-SHA-256", + Data: sc.clientFirstMessage(), + } + c.frontend.Send(saslInitialResponse) + err = c.flushWithPotentialWriteReadDeadlock() + if err != nil { + return err + } + + // Receive server-first-message payload in an AuthenticationSASLContinue. + saslContinue, err := c.rxSASLContinue() + if err != nil { + return err + } + err = sc.recvServerFirstMessage(saslContinue.Data) + if err != nil { + return err + } + + // Send client-final-message in a SASLResponse + saslResponse := &pgproto3.SASLResponse{ + Data: []byte(sc.clientFinalMessage()), + } + c.frontend.Send(saslResponse) + err = c.flushWithPotentialWriteReadDeadlock() + if err != nil { + return err + } + + // Receive server-final-message payload in an AuthenticationSASLFinal. + saslFinal, err := c.rxSASLFinal() + if err != nil { + return err + } + return sc.recvServerFinalMessage(saslFinal.Data) +} + +func (c *PgConn) rxSASLContinue() (*pgproto3.AuthenticationSASLContinue, error) { + msg, err := c.receiveMessage() + if err != nil { + return nil, err + } + switch m := msg.(type) { + case *pgproto3.AuthenticationSASLContinue: + return m, nil + case *pgproto3.ErrorResponse: + return nil, ErrorResponseToPgError(m) + } + + return nil, fmt.Errorf("expected AuthenticationSASLContinue message but received unexpected message %T", msg) +} + +func (c *PgConn) rxSASLFinal() (*pgproto3.AuthenticationSASLFinal, error) { + msg, err := c.receiveMessage() + if err != nil { + return nil, err + } + switch m := msg.(type) { + case *pgproto3.AuthenticationSASLFinal: + return m, nil + case *pgproto3.ErrorResponse: + return nil, ErrorResponseToPgError(m) + } + + return nil, fmt.Errorf("expected AuthenticationSASLFinal message but received unexpected message %T", msg) +} + +type scramClient struct { + serverAuthMechanisms []string + password []byte + clientNonce []byte + + clientFirstMessageBare []byte + + serverFirstMessage []byte + clientAndServerNonce []byte + salt []byte + iterations int + + saltedPassword []byte + authMessage []byte +} + +func newScramClient(serverAuthMechanisms []string, password string) (*scramClient, error) { + sc := &scramClient{ + serverAuthMechanisms: serverAuthMechanisms, + } + + // Ensure server supports SCRAM-SHA-256 + hasScramSHA256 := false + for _, mech := range sc.serverAuthMechanisms { + if mech == "SCRAM-SHA-256" { + hasScramSHA256 = true + break + } + } + if !hasScramSHA256 { + return nil, errors.New("server does not support SCRAM-SHA-256") + } + + // precis.OpaqueString is equivalent to SASLprep for password. + var err error + sc.password, err = precis.OpaqueString.Bytes([]byte(password)) + if err != nil { + // PostgreSQL allows passwords invalid according to SCRAM / SASLprep. + sc.password = []byte(password) + } + + buf := make([]byte, clientNonceLen) + _, err = rand.Read(buf) + if err != nil { + return nil, err + } + sc.clientNonce = make([]byte, base64.RawStdEncoding.EncodedLen(len(buf))) + base64.RawStdEncoding.Encode(sc.clientNonce, buf) + + return sc, nil +} + +func (sc *scramClient) clientFirstMessage() []byte { + sc.clientFirstMessageBare = []byte(fmt.Sprintf("n=,r=%s", sc.clientNonce)) + return []byte(fmt.Sprintf("n,,%s", sc.clientFirstMessageBare)) +} + +func (sc *scramClient) recvServerFirstMessage(serverFirstMessage []byte) error { + sc.serverFirstMessage = serverFirstMessage + buf := serverFirstMessage + if !bytes.HasPrefix(buf, []byte("r=")) { + return errors.New("invalid SCRAM server-first-message received from server: did not include r=") + } + buf = buf[2:] + + idx := bytes.IndexByte(buf, ',') + if idx == -1 { + return errors.New("invalid SCRAM server-first-message received from server: did not include s=") + } + sc.clientAndServerNonce = buf[:idx] + buf = buf[idx+1:] + + if !bytes.HasPrefix(buf, []byte("s=")) { + return errors.New("invalid SCRAM server-first-message received from server: did not include s=") + } + buf = buf[2:] + + idx = bytes.IndexByte(buf, ',') + if idx == -1 { + return errors.New("invalid SCRAM server-first-message received from server: did not include i=") + } + saltStr := buf[:idx] + buf = buf[idx+1:] + + if !bytes.HasPrefix(buf, []byte("i=")) { + return errors.New("invalid SCRAM server-first-message received from server: did not include i=") + } + buf = buf[2:] + iterationsStr := buf + + var err error + sc.salt, err = base64.StdEncoding.DecodeString(string(saltStr)) + if err != nil { + return fmt.Errorf("invalid SCRAM salt received from server: %w", err) + } + + sc.iterations, err = strconv.Atoi(string(iterationsStr)) + if err != nil || sc.iterations <= 0 { + return fmt.Errorf("invalid SCRAM iteration count received from server: %w", err) + } + + if !bytes.HasPrefix(sc.clientAndServerNonce, sc.clientNonce) { + return errors.New("invalid SCRAM nonce: did not start with client nonce") + } + + if len(sc.clientAndServerNonce) <= len(sc.clientNonce) { + return errors.New("invalid SCRAM nonce: did not include server nonce") + } + + return nil +} + +func (sc *scramClient) clientFinalMessage() string { + clientFinalMessageWithoutProof := []byte(fmt.Sprintf("c=biws,r=%s", sc.clientAndServerNonce)) + + sc.saltedPassword = pbkdf2.Key([]byte(sc.password), sc.salt, sc.iterations, 32, sha256.New) + sc.authMessage = bytes.Join([][]byte{sc.clientFirstMessageBare, sc.serverFirstMessage, clientFinalMessageWithoutProof}, []byte(",")) + + clientProof := computeClientProof(sc.saltedPassword, sc.authMessage) + + return fmt.Sprintf("%s,p=%s", clientFinalMessageWithoutProof, clientProof) +} + +func (sc *scramClient) recvServerFinalMessage(serverFinalMessage []byte) error { + if !bytes.HasPrefix(serverFinalMessage, []byte("v=")) { + return errors.New("invalid SCRAM server-final-message received from server") + } + + serverSignature := serverFinalMessage[2:] + + if !hmac.Equal(serverSignature, computeServerSignature(sc.saltedPassword, sc.authMessage)) { + return errors.New("invalid SCRAM ServerSignature received from server") + } + + return nil +} + +func computeHMAC(key, msg []byte) []byte { + mac := hmac.New(sha256.New, key) + mac.Write(msg) + return mac.Sum(nil) +} + +func computeClientProof(saltedPassword, authMessage []byte) []byte { + clientKey := computeHMAC(saltedPassword, []byte("Client Key")) + storedKey := sha256.Sum256(clientKey) + clientSignature := computeHMAC(storedKey[:], authMessage) + + clientProof := make([]byte, len(clientSignature)) + for i := 0; i < len(clientSignature); i++ { + clientProof[i] = clientKey[i] ^ clientSignature[i] + } + + buf := make([]byte, base64.StdEncoding.EncodedLen(len(clientProof))) + base64.StdEncoding.Encode(buf, clientProof) + return buf +} + +func computeServerSignature(saltedPassword []byte, authMessage []byte) []byte { + serverKey := computeHMAC(saltedPassword, []byte("Server Key")) + serverSignature := computeHMAC(serverKey, authMessage) + buf := make([]byte, base64.StdEncoding.EncodedLen(len(serverSignature))) + base64.StdEncoding.Encode(buf, serverSignature) + return buf +} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/config.go b/vendor/github.com/jackc/pgx/v5/pgconn/config.go new file mode 100644 index 0000000..598917f --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/pgconn/config.go @@ -0,0 +1,918 @@ +package pgconn + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + "io" + "math" + "net" + "net/url" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/jackc/pgpassfile" + "github.com/jackc/pgservicefile" + "github.com/jackc/pgx/v5/pgconn/ctxwatch" + "github.com/jackc/pgx/v5/pgproto3" +) + +type AfterConnectFunc func(ctx context.Context, pgconn *PgConn) error +type ValidateConnectFunc func(ctx context.Context, pgconn *PgConn) error +type GetSSLPasswordFunc func(ctx context.Context) string + +// Config is the settings used to establish a connection to a PostgreSQL server. It must be created by [ParseConfig]. A +// manually initialized Config will cause ConnectConfig to panic. +type Config struct { + Host string // host (e.g. localhost) or absolute path to unix domain socket directory (e.g. /private/tmp) + Port uint16 + Database string + User string + Password string + TLSConfig *tls.Config // nil disables TLS + ConnectTimeout time.Duration + DialFunc DialFunc // e.g. net.Dialer.DialContext + LookupFunc LookupFunc // e.g. net.Resolver.LookupHost + BuildFrontend BuildFrontendFunc + + // BuildContextWatcherHandler is called to create a ContextWatcherHandler for a connection. The handler is called + // when a context passed to a PgConn method is canceled. + BuildContextWatcherHandler func(*PgConn) ctxwatch.Handler + + RuntimeParams map[string]string // Run-time parameters to set on connection as session default values (e.g. search_path or application_name) + + KerberosSrvName string + KerberosSpn string + Fallbacks []*FallbackConfig + + // ValidateConnect is called during a connection attempt after a successful authentication with the PostgreSQL server. + // It can be used to validate that the server is acceptable. If this returns an error the connection is closed and the next + // fallback config is tried. This allows implementing high availability behavior such as libpq does with target_session_attrs. + ValidateConnect ValidateConnectFunc + + // AfterConnect is called after ValidateConnect. It can be used to set up the connection (e.g. Set session variables + // or prepare statements). If this returns an error the connection attempt fails. + AfterConnect AfterConnectFunc + + // OnNotice is a callback function called when a notice response is received. + OnNotice NoticeHandler + + // OnNotification is a callback function called when a notification from the LISTEN/NOTIFY system is received. + OnNotification NotificationHandler + + // OnPgError is a callback function called when a Postgres error is received by the server. The default handler will close + // the connection on any FATAL errors. If you override this handler you should call the previously set handler or ensure + // that you close on FATAL errors by returning false. + OnPgError PgErrorHandler + + createdByParseConfig bool // Used to enforce created by ParseConfig rule. +} + +// ParseConfigOptions contains options that control how a config is built such as GetSSLPassword. +type ParseConfigOptions struct { + // GetSSLPassword gets the password to decrypt a SSL client certificate. This is analogous to the libpq function + // PQsetSSLKeyPassHook_OpenSSL. + GetSSLPassword GetSSLPasswordFunc +} + +// Copy returns a deep copy of the config that is safe to use and modify. +// The only exception is the TLSConfig field: +// according to the tls.Config docs it must not be modified after creation. +func (c *Config) Copy() *Config { + newConf := new(Config) + *newConf = *c + if newConf.TLSConfig != nil { + newConf.TLSConfig = c.TLSConfig.Clone() + } + if newConf.RuntimeParams != nil { + newConf.RuntimeParams = make(map[string]string, len(c.RuntimeParams)) + for k, v := range c.RuntimeParams { + newConf.RuntimeParams[k] = v + } + } + if newConf.Fallbacks != nil { + newConf.Fallbacks = make([]*FallbackConfig, len(c.Fallbacks)) + for i, fallback := range c.Fallbacks { + newFallback := new(FallbackConfig) + *newFallback = *fallback + if newFallback.TLSConfig != nil { + newFallback.TLSConfig = fallback.TLSConfig.Clone() + } + newConf.Fallbacks[i] = newFallback + } + } + return newConf +} + +// FallbackConfig is additional settings to attempt a connection with when the primary Config fails to establish a +// network connection. It is used for TLS fallback such as sslmode=prefer and high availability (HA) connections. +type FallbackConfig struct { + Host string // host (e.g. localhost) or path to unix domain socket directory (e.g. /private/tmp) + Port uint16 + TLSConfig *tls.Config // nil disables TLS +} + +// connectOneConfig is the configuration for a single attempt to connect to a single host. +type connectOneConfig struct { + network string + address string + originalHostname string // original hostname before resolving + tlsConfig *tls.Config // nil disables TLS +} + +// isAbsolutePath checks if the provided value is an absolute path either +// beginning with a forward slash (as on Linux-based systems) or with a capital +// letter A-Z followed by a colon and a backslash, e.g., "C:\", (as on Windows). +func isAbsolutePath(path string) bool { + isWindowsPath := func(p string) bool { + if len(p) < 3 { + return false + } + drive := p[0] + colon := p[1] + backslash := p[2] + if drive >= 'A' && drive <= 'Z' && colon == ':' && backslash == '\\' { + return true + } + return false + } + return strings.HasPrefix(path, "/") || isWindowsPath(path) +} + +// NetworkAddress converts a PostgreSQL host and port into network and address suitable for use with +// net.Dial. +func NetworkAddress(host string, port uint16) (network, address string) { + if isAbsolutePath(host) { + network = "unix" + address = filepath.Join(host, ".s.PGSQL.") + strconv.FormatInt(int64(port), 10) + } else { + network = "tcp" + address = net.JoinHostPort(host, strconv.Itoa(int(port))) + } + return network, address +} + +// ParseConfig builds a *Config from connString with similar behavior to the PostgreSQL standard C library libpq. It +// uses the same defaults as libpq (e.g. port=5432) and understands most PG* environment variables. ParseConfig closely +// matches the parsing behavior of libpq. connString may either be in URL format or keyword = value format. See +// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING for details. connString also may be empty +// to only read from the environment. If a password is not supplied it will attempt to read the .pgpass file. +// +// # Example Keyword/Value +// user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca +// +// # Example URL +// postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca +// +// The returned *Config may be modified. However, it is strongly recommended that any configuration that can be done +// through the connection string be done there. In particular the fields Host, Port, TLSConfig, and Fallbacks can be +// interdependent (e.g. TLSConfig needs knowledge of the host to validate the server certificate). These fields should +// not be modified individually. They should all be modified or all left unchanged. +// +// ParseConfig supports specifying multiple hosts in similar manner to libpq. Host and port may include comma separated +// values that will be tried in order. This can be used as part of a high availability system. See +// https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS for more information. +// +// # Example URL +// postgres://jack:secret@foo.example.com:5432,bar.example.com:5432/mydb +// +// ParseConfig currently recognizes the following environment variable and their parameter key word equivalents passed +// via database URL or keyword/value: +// +// PGHOST +// PGPORT +// PGDATABASE +// PGUSER +// PGPASSWORD +// PGPASSFILE +// PGSERVICE +// PGSERVICEFILE +// PGSSLMODE +// PGSSLCERT +// PGSSLKEY +// PGSSLROOTCERT +// PGSSLPASSWORD +// PGAPPNAME +// PGCONNECT_TIMEOUT +// PGTARGETSESSIONATTRS +// +// See http://www.postgresql.org/docs/11/static/libpq-envars.html for details on the meaning of environment variables. +// +// See https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-PARAMKEYWORDS for parameter key word names. They are +// usually but not always the environment variable name downcased and without the "PG" prefix. +// +// Important Security Notes: +// +// ParseConfig tries to match libpq behavior with regard to PGSSLMODE. This includes defaulting to "prefer" behavior if +// not set. +// +// See http://www.postgresql.org/docs/11/static/libpq-ssl.html#LIBPQ-SSL-PROTECTION for details on what level of +// security each sslmode provides. +// +// The sslmode "prefer" (the default), sslmode "allow", and multiple hosts are implemented via the Fallbacks field of +// the Config struct. If TLSConfig is manually changed it will not affect the fallbacks. For example, in the case of +// sslmode "prefer" this means it will first try the main Config settings which use TLS, then it will try the fallback +// which does not use TLS. This can lead to an unexpected unencrypted connection if the main TLS config is manually +// changed later but the unencrypted fallback is present. Ensure there are no stale fallbacks when manually setting +// TLSConfig. +// +// Other known differences with libpq: +// +// When multiple hosts are specified, libpq allows them to have different passwords set via the .pgpass file. pgconn +// does not. +// +// In addition, ParseConfig accepts the following options: +// +// - servicefile. +// libpq only reads servicefile from the PGSERVICEFILE environment variable. ParseConfig accepts servicefile as a +// part of the connection string. +func ParseConfig(connString string) (*Config, error) { + var parseConfigOptions ParseConfigOptions + return ParseConfigWithOptions(connString, parseConfigOptions) +} + +// ParseConfigWithOptions builds a *Config from connString and options with similar behavior to the PostgreSQL standard +// C library libpq. options contains settings that cannot be specified in a connString such as providing a function to +// get the SSL password. +func ParseConfigWithOptions(connString string, options ParseConfigOptions) (*Config, error) { + defaultSettings := defaultSettings() + envSettings := parseEnvSettings() + + connStringSettings := make(map[string]string) + if connString != "" { + var err error + // connString may be a database URL or in PostgreSQL keyword/value format + if strings.HasPrefix(connString, "postgres://") || strings.HasPrefix(connString, "postgresql://") { + connStringSettings, err = parseURLSettings(connString) + if err != nil { + return nil, &ParseConfigError{ConnString: connString, msg: "failed to parse as URL", err: err} + } + } else { + connStringSettings, err = parseKeywordValueSettings(connString) + if err != nil { + return nil, &ParseConfigError{ConnString: connString, msg: "failed to parse as keyword/value", err: err} + } + } + } + + settings := mergeSettings(defaultSettings, envSettings, connStringSettings) + if service, present := settings["service"]; present { + serviceSettings, err := parseServiceSettings(settings["servicefile"], service) + if err != nil { + return nil, &ParseConfigError{ConnString: connString, msg: "failed to read service", err: err} + } + + settings = mergeSettings(defaultSettings, envSettings, serviceSettings, connStringSettings) + } + + config := &Config{ + createdByParseConfig: true, + Database: settings["database"], + User: settings["user"], + Password: settings["password"], + RuntimeParams: make(map[string]string), + BuildFrontend: func(r io.Reader, w io.Writer) *pgproto3.Frontend { + return pgproto3.NewFrontend(r, w) + }, + BuildContextWatcherHandler: func(pgConn *PgConn) ctxwatch.Handler { + return &DeadlineContextWatcherHandler{Conn: pgConn.conn} + }, + OnPgError: func(_ *PgConn, pgErr *PgError) bool { + // we want to automatically close any fatal errors + if strings.EqualFold(pgErr.Severity, "FATAL") { + return false + } + return true + }, + } + + if connectTimeoutSetting, present := settings["connect_timeout"]; present { + connectTimeout, err := parseConnectTimeoutSetting(connectTimeoutSetting) + if err != nil { + return nil, &ParseConfigError{ConnString: connString, msg: "invalid connect_timeout", err: err} + } + config.ConnectTimeout = connectTimeout + config.DialFunc = makeConnectTimeoutDialFunc(connectTimeout) + } else { + defaultDialer := makeDefaultDialer() + config.DialFunc = defaultDialer.DialContext + } + + config.LookupFunc = makeDefaultResolver().LookupHost + + notRuntimeParams := map[string]struct{}{ + "host": {}, + "port": {}, + "database": {}, + "user": {}, + "password": {}, + "passfile": {}, + "connect_timeout": {}, + "sslmode": {}, + "sslkey": {}, + "sslcert": {}, + "sslrootcert": {}, + "sslpassword": {}, + "sslsni": {}, + "krbspn": {}, + "krbsrvname": {}, + "target_session_attrs": {}, + "service": {}, + "servicefile": {}, + } + + // Adding kerberos configuration + if _, present := settings["krbsrvname"]; present { + config.KerberosSrvName = settings["krbsrvname"] + } + if _, present := settings["krbspn"]; present { + config.KerberosSpn = settings["krbspn"] + } + + for k, v := range settings { + if _, present := notRuntimeParams[k]; present { + continue + } + config.RuntimeParams[k] = v + } + + fallbacks := []*FallbackConfig{} + + hosts := strings.Split(settings["host"], ",") + ports := strings.Split(settings["port"], ",") + + for i, host := range hosts { + var portStr string + if i < len(ports) { + portStr = ports[i] + } else { + portStr = ports[0] + } + + port, err := parsePort(portStr) + if err != nil { + return nil, &ParseConfigError{ConnString: connString, msg: "invalid port", err: err} + } + + var tlsConfigs []*tls.Config + + // Ignore TLS settings if Unix domain socket like libpq + if network, _ := NetworkAddress(host, port); network == "unix" { + tlsConfigs = append(tlsConfigs, nil) + } else { + var err error + tlsConfigs, err = configTLS(settings, host, options) + if err != nil { + return nil, &ParseConfigError{ConnString: connString, msg: "failed to configure TLS", err: err} + } + } + + for _, tlsConfig := range tlsConfigs { + fallbacks = append(fallbacks, &FallbackConfig{ + Host: host, + Port: port, + TLSConfig: tlsConfig, + }) + } + } + + config.Host = fallbacks[0].Host + config.Port = fallbacks[0].Port + config.TLSConfig = fallbacks[0].TLSConfig + config.Fallbacks = fallbacks[1:] + + passfile, err := pgpassfile.ReadPassfile(settings["passfile"]) + if err == nil { + if config.Password == "" { + host := config.Host + if network, _ := NetworkAddress(config.Host, config.Port); network == "unix" { + host = "localhost" + } + + config.Password = passfile.FindPassword(host, strconv.Itoa(int(config.Port)), config.Database, config.User) + } + } + + switch tsa := settings["target_session_attrs"]; tsa { + case "read-write": + config.ValidateConnect = ValidateConnectTargetSessionAttrsReadWrite + case "read-only": + config.ValidateConnect = ValidateConnectTargetSessionAttrsReadOnly + case "primary": + config.ValidateConnect = ValidateConnectTargetSessionAttrsPrimary + case "standby": + config.ValidateConnect = ValidateConnectTargetSessionAttrsStandby + case "prefer-standby": + config.ValidateConnect = ValidateConnectTargetSessionAttrsPreferStandby + case "any": + // do nothing + default: + return nil, &ParseConfigError{ConnString: connString, msg: fmt.Sprintf("unknown target_session_attrs value: %v", tsa)} + } + + return config, nil +} + +func mergeSettings(settingSets ...map[string]string) map[string]string { + settings := make(map[string]string) + + for _, s2 := range settingSets { + for k, v := range s2 { + settings[k] = v + } + } + + return settings +} + +func parseEnvSettings() map[string]string { + settings := make(map[string]string) + + nameMap := map[string]string{ + "PGHOST": "host", + "PGPORT": "port", + "PGDATABASE": "database", + "PGUSER": "user", + "PGPASSWORD": "password", + "PGPASSFILE": "passfile", + "PGAPPNAME": "application_name", + "PGCONNECT_TIMEOUT": "connect_timeout", + "PGSSLMODE": "sslmode", + "PGSSLKEY": "sslkey", + "PGSSLCERT": "sslcert", + "PGSSLSNI": "sslsni", + "PGSSLROOTCERT": "sslrootcert", + "PGSSLPASSWORD": "sslpassword", + "PGTARGETSESSIONATTRS": "target_session_attrs", + "PGSERVICE": "service", + "PGSERVICEFILE": "servicefile", + } + + for envname, realname := range nameMap { + value := os.Getenv(envname) + if value != "" { + settings[realname] = value + } + } + + return settings +} + +func parseURLSettings(connString string) (map[string]string, error) { + settings := make(map[string]string) + + url, err := url.Parse(connString) + if err != nil { + return nil, err + } + + if url.User != nil { + settings["user"] = url.User.Username() + if password, present := url.User.Password(); present { + settings["password"] = password + } + } + + // Handle multiple host:port's in url.Host by splitting them into host,host,host and port,port,port. + var hosts []string + var ports []string + for _, host := range strings.Split(url.Host, ",") { + if host == "" { + continue + } + if isIPOnly(host) { + hosts = append(hosts, strings.Trim(host, "[]")) + continue + } + h, p, err := net.SplitHostPort(host) + if err != nil { + return nil, fmt.Errorf("failed to split host:port in '%s', err: %w", host, err) + } + if h != "" { + hosts = append(hosts, h) + } + if p != "" { + ports = append(ports, p) + } + } + if len(hosts) > 0 { + settings["host"] = strings.Join(hosts, ",") + } + if len(ports) > 0 { + settings["port"] = strings.Join(ports, ",") + } + + database := strings.TrimLeft(url.Path, "/") + if database != "" { + settings["database"] = database + } + + nameMap := map[string]string{ + "dbname": "database", + } + + for k, v := range url.Query() { + if k2, present := nameMap[k]; present { + k = k2 + } + + settings[k] = v[0] + } + + return settings, nil +} + +func isIPOnly(host string) bool { + return net.ParseIP(strings.Trim(host, "[]")) != nil || !strings.Contains(host, ":") +} + +var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1} + +func parseKeywordValueSettings(s string) (map[string]string, error) { + settings := make(map[string]string) + + nameMap := map[string]string{ + "dbname": "database", + } + + for len(s) > 0 { + var key, val string + eqIdx := strings.IndexRune(s, '=') + if eqIdx < 0 { + return nil, errors.New("invalid keyword/value") + } + + key = strings.Trim(s[:eqIdx], " \t\n\r\v\f") + s = strings.TrimLeft(s[eqIdx+1:], " \t\n\r\v\f") + if len(s) == 0 { + } else if s[0] != '\'' { + end := 0 + for ; end < len(s); end++ { + if asciiSpace[s[end]] == 1 { + break + } + if s[end] == '\\' { + end++ + if end == len(s) { + return nil, errors.New("invalid backslash") + } + } + } + val = strings.Replace(strings.Replace(s[:end], "\\\\", "\\", -1), "\\'", "'", -1) + if end == len(s) { + s = "" + } else { + s = s[end+1:] + } + } else { // quoted string + s = s[1:] + end := 0 + for ; end < len(s); end++ { + if s[end] == '\'' { + break + } + if s[end] == '\\' { + end++ + } + } + if end == len(s) { + return nil, errors.New("unterminated quoted string in connection info string") + } + val = strings.Replace(strings.Replace(s[:end], "\\\\", "\\", -1), "\\'", "'", -1) + if end == len(s) { + s = "" + } else { + s = s[end+1:] + } + } + + if k, ok := nameMap[key]; ok { + key = k + } + + if key == "" { + return nil, errors.New("invalid keyword/value") + } + + settings[key] = val + } + + return settings, nil +} + +func parseServiceSettings(servicefilePath, serviceName string) (map[string]string, error) { + servicefile, err := pgservicefile.ReadServicefile(servicefilePath) + if err != nil { + return nil, fmt.Errorf("failed to read service file: %v", servicefilePath) + } + + service, err := servicefile.GetService(serviceName) + if err != nil { + return nil, fmt.Errorf("unable to find service: %v", serviceName) + } + + nameMap := map[string]string{ + "dbname": "database", + } + + settings := make(map[string]string, len(service.Settings)) + for k, v := range service.Settings { + if k2, present := nameMap[k]; present { + k = k2 + } + settings[k] = v + } + + return settings, nil +} + +// configTLS uses libpq's TLS parameters to construct []*tls.Config. It is +// necessary to allow returning multiple TLS configs as sslmode "allow" and +// "prefer" allow fallback. +func configTLS(settings map[string]string, thisHost string, parseConfigOptions ParseConfigOptions) ([]*tls.Config, error) { + host := thisHost + sslmode := settings["sslmode"] + sslrootcert := settings["sslrootcert"] + sslcert := settings["sslcert"] + sslkey := settings["sslkey"] + sslpassword := settings["sslpassword"] + sslsni := settings["sslsni"] + + // Match libpq default behavior + if sslmode == "" { + sslmode = "prefer" + } + if sslsni == "" { + sslsni = "1" + } + + tlsConfig := &tls.Config{} + + switch sslmode { + case "disable": + return []*tls.Config{nil}, nil + case "allow", "prefer": + tlsConfig.InsecureSkipVerify = true + case "require": + // According to PostgreSQL documentation, if a root CA file exists, + // the behavior of sslmode=require should be the same as that of verify-ca + // + // See https://www.postgresql.org/docs/12/libpq-ssl.html + if sslrootcert != "" { + goto nextCase + } + tlsConfig.InsecureSkipVerify = true + break + nextCase: + fallthrough + case "verify-ca": + // Don't perform the default certificate verification because it + // will verify the hostname. Instead, verify the server's + // certificate chain ourselves in VerifyPeerCertificate and + // ignore the server name. This emulates libpq's verify-ca + // behavior. + // + // See https://github.com/golang/go/issues/21971#issuecomment-332693931 + // and https://pkg.go.dev/crypto/tls?tab=doc#example-Config-VerifyPeerCertificate + // for more info. + tlsConfig.InsecureSkipVerify = true + tlsConfig.VerifyPeerCertificate = func(certificates [][]byte, _ [][]*x509.Certificate) error { + certs := make([]*x509.Certificate, len(certificates)) + for i, asn1Data := range certificates { + cert, err := x509.ParseCertificate(asn1Data) + if err != nil { + return errors.New("failed to parse certificate from server: " + err.Error()) + } + certs[i] = cert + } + + // Leave DNSName empty to skip hostname verification. + opts := x509.VerifyOptions{ + Roots: tlsConfig.RootCAs, + Intermediates: x509.NewCertPool(), + } + // Skip the first cert because it's the leaf. All others + // are intermediates. + for _, cert := range certs[1:] { + opts.Intermediates.AddCert(cert) + } + _, err := certs[0].Verify(opts) + return err + } + case "verify-full": + tlsConfig.ServerName = host + default: + return nil, errors.New("sslmode is invalid") + } + + if sslrootcert != "" { + caCertPool := x509.NewCertPool() + + caPath := sslrootcert + caCert, err := os.ReadFile(caPath) + if err != nil { + return nil, fmt.Errorf("unable to read CA file: %w", err) + } + + if !caCertPool.AppendCertsFromPEM(caCert) { + return nil, errors.New("unable to add CA to cert pool") + } + + tlsConfig.RootCAs = caCertPool + tlsConfig.ClientCAs = caCertPool + } + + if (sslcert != "" && sslkey == "") || (sslcert == "" && sslkey != "") { + return nil, errors.New(`both "sslcert" and "sslkey" are required`) + } + + if sslcert != "" && sslkey != "" { + buf, err := os.ReadFile(sslkey) + if err != nil { + return nil, fmt.Errorf("unable to read sslkey: %w", err) + } + block, _ := pem.Decode(buf) + if block == nil { + return nil, errors.New("failed to decode sslkey") + } + var pemKey []byte + var decryptedKey []byte + var decryptedError error + // If PEM is encrypted, attempt to decrypt using pass phrase + if x509.IsEncryptedPEMBlock(block) { + // Attempt decryption with pass phrase + // NOTE: only supports RSA (PKCS#1) + if sslpassword != "" { + decryptedKey, decryptedError = x509.DecryptPEMBlock(block, []byte(sslpassword)) + } + //if sslpassword not provided or has decryption error when use it + //try to find sslpassword with callback function + if sslpassword == "" || decryptedError != nil { + if parseConfigOptions.GetSSLPassword != nil { + sslpassword = parseConfigOptions.GetSSLPassword(context.Background()) + } + if sslpassword == "" { + return nil, fmt.Errorf("unable to find sslpassword") + } + } + decryptedKey, decryptedError = x509.DecryptPEMBlock(block, []byte(sslpassword)) + // Should we also provide warning for PKCS#1 needed? + if decryptedError != nil { + return nil, fmt.Errorf("unable to decrypt key: %w", err) + } + + pemBytes := pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: decryptedKey, + } + pemKey = pem.EncodeToMemory(&pemBytes) + } else { + pemKey = pem.EncodeToMemory(block) + } + certfile, err := os.ReadFile(sslcert) + if err != nil { + return nil, fmt.Errorf("unable to read cert: %w", err) + } + cert, err := tls.X509KeyPair(certfile, pemKey) + if err != nil { + return nil, fmt.Errorf("unable to load cert: %w", err) + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + // Set Server Name Indication (SNI), if enabled by connection parameters. + // Per RFC 6066, do not set it if the host is a literal IP address (IPv4 + // or IPv6). + if sslsni == "1" && net.ParseIP(host) == nil { + tlsConfig.ServerName = host + } + + switch sslmode { + case "allow": + return []*tls.Config{nil, tlsConfig}, nil + case "prefer": + return []*tls.Config{tlsConfig, nil}, nil + case "require", "verify-ca", "verify-full": + return []*tls.Config{tlsConfig}, nil + default: + panic("BUG: bad sslmode should already have been caught") + } +} + +func parsePort(s string) (uint16, error) { + port, err := strconv.ParseUint(s, 10, 16) + if err != nil { + return 0, err + } + if port < 1 || port > math.MaxUint16 { + return 0, errors.New("outside range") + } + return uint16(port), nil +} + +func makeDefaultDialer() *net.Dialer { + // rely on GOLANG KeepAlive settings + return &net.Dialer{} +} + +func makeDefaultResolver() *net.Resolver { + return net.DefaultResolver +} + +func parseConnectTimeoutSetting(s string) (time.Duration, error) { + timeout, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return 0, err + } + if timeout < 0 { + return 0, errors.New("negative timeout") + } + return time.Duration(timeout) * time.Second, nil +} + +func makeConnectTimeoutDialFunc(timeout time.Duration) DialFunc { + d := makeDefaultDialer() + d.Timeout = timeout + return d.DialContext +} + +// ValidateConnectTargetSessionAttrsReadWrite is a ValidateConnectFunc that implements libpq compatible +// target_session_attrs=read-write. +func ValidateConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgConn) error { + result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).Read() + if result.Err != nil { + return result.Err + } + + if string(result.Rows[0][0]) == "on" { + return errors.New("read only connection") + } + + return nil +} + +// ValidateConnectTargetSessionAttrsReadOnly is a ValidateConnectFunc that implements libpq compatible +// target_session_attrs=read-only. +func ValidateConnectTargetSessionAttrsReadOnly(ctx context.Context, pgConn *PgConn) error { + result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).Read() + if result.Err != nil { + return result.Err + } + + if string(result.Rows[0][0]) != "on" { + return errors.New("connection is not read only") + } + + return nil +} + +// ValidateConnectTargetSessionAttrsStandby is a ValidateConnectFunc that implements libpq compatible +// target_session_attrs=standby. +func ValidateConnectTargetSessionAttrsStandby(ctx context.Context, pgConn *PgConn) error { + result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read() + if result.Err != nil { + return result.Err + } + + if string(result.Rows[0][0]) != "t" { + return errors.New("server is not in hot standby mode") + } + + return nil +} + +// ValidateConnectTargetSessionAttrsPrimary is a ValidateConnectFunc that implements libpq compatible +// target_session_attrs=primary. +func ValidateConnectTargetSessionAttrsPrimary(ctx context.Context, pgConn *PgConn) error { + result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read() + if result.Err != nil { + return result.Err + } + + if string(result.Rows[0][0]) == "t" { + return errors.New("server is in standby mode") + } + + return nil +} + +// ValidateConnectTargetSessionAttrsPreferStandby is a ValidateConnectFunc that implements libpq compatible +// target_session_attrs=prefer-standby. +func ValidateConnectTargetSessionAttrsPreferStandby(ctx context.Context, pgConn *PgConn) error { + result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read() + if result.Err != nil { + return result.Err + } + + if string(result.Rows[0][0]) != "t" { + return &NotPreferredError{err: errors.New("server is not in hot standby mode")} + } + + return nil +} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/ctxwatch/context_watcher.go b/vendor/github.com/jackc/pgx/v5/pgconn/ctxwatch/context_watcher.go new file mode 100644 index 0000000..db8884e --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/pgconn/ctxwatch/context_watcher.go @@ -0,0 +1,80 @@ +package ctxwatch + +import ( + "context" + "sync" +) + +// ContextWatcher watches a context and performs an action when the context is canceled. It can watch one context at a +// time. +type ContextWatcher struct { + handler Handler + unwatchChan chan struct{} + + lock sync.Mutex + watchInProgress bool + onCancelWasCalled bool +} + +// NewContextWatcher returns a ContextWatcher. onCancel will be called when a watched context is canceled. +// OnUnwatchAfterCancel will be called when Unwatch is called and the watched context had already been canceled and +// onCancel called. +func NewContextWatcher(handler Handler) *ContextWatcher { + cw := &ContextWatcher{ + handler: handler, + unwatchChan: make(chan struct{}), + } + + return cw +} + +// Watch starts watching ctx. If ctx is canceled then the onCancel function passed to NewContextWatcher will be called. +func (cw *ContextWatcher) Watch(ctx context.Context) { + cw.lock.Lock() + defer cw.lock.Unlock() + + if cw.watchInProgress { + panic("Watch already in progress") + } + + cw.onCancelWasCalled = false + + if ctx.Done() != nil { + cw.watchInProgress = true + go func() { + select { + case <-ctx.Done(): + cw.handler.HandleCancel(ctx) + cw.onCancelWasCalled = true + <-cw.unwatchChan + case <-cw.unwatchChan: + } + }() + } else { + cw.watchInProgress = false + } +} + +// Unwatch stops watching the previously watched context. If the onCancel function passed to NewContextWatcher was +// called then onUnwatchAfterCancel will also be called. +func (cw *ContextWatcher) Unwatch() { + cw.lock.Lock() + defer cw.lock.Unlock() + + if cw.watchInProgress { + cw.unwatchChan <- struct{}{} + if cw.onCancelWasCalled { + cw.handler.HandleUnwatchAfterCancel() + } + cw.watchInProgress = false + } +} + +type Handler interface { + // HandleCancel is called when the context that a ContextWatcher is currently watching is canceled. canceledCtx is the + // context that was canceled. + HandleCancel(canceledCtx context.Context) + + // HandleUnwatchAfterCancel is called when a ContextWatcher that called HandleCancel on this Handler is unwatched. + HandleUnwatchAfterCancel() +} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/defaults.go b/vendor/github.com/jackc/pgx/v5/pgconn/defaults.go new file mode 100644 index 0000000..1dd514f --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/pgconn/defaults.go @@ -0,0 +1,63 @@ +//go:build !windows +// +build !windows + +package pgconn + +import ( + "os" + "os/user" + "path/filepath" +) + +func defaultSettings() map[string]string { + settings := make(map[string]string) + + settings["host"] = defaultHost() + settings["port"] = "5432" + + // Default to the OS user name. Purposely ignoring err getting user name from + // OS. The client application will simply have to specify the user in that + // case (which they typically will be doing anyway). + user, err := user.Current() + if err == nil { + settings["user"] = user.Username + settings["passfile"] = filepath.Join(user.HomeDir, ".pgpass") + settings["servicefile"] = filepath.Join(user.HomeDir, ".pg_service.conf") + sslcert := filepath.Join(user.HomeDir, ".postgresql", "postgresql.crt") + sslkey := filepath.Join(user.HomeDir, ".postgresql", "postgresql.key") + if _, err := os.Stat(sslcert); err == nil { + if _, err := os.Stat(sslkey); err == nil { + // Both the cert and key must be present to use them, or do not use either + settings["sslcert"] = sslcert + settings["sslkey"] = sslkey + } + } + sslrootcert := filepath.Join(user.HomeDir, ".postgresql", "root.crt") + if _, err := os.Stat(sslrootcert); err == nil { + settings["sslrootcert"] = sslrootcert + } + } + + settings["target_session_attrs"] = "any" + + return settings +} + +// defaultHost attempts to mimic libpq's default host. libpq uses the default unix socket location on *nix and localhost +// on Windows. The default socket location is compiled into libpq. Since pgx does not have access to that default it +// checks the existence of common locations. +func defaultHost() string { + candidatePaths := []string{ + "/var/run/postgresql", // Debian + "/private/tmp", // OSX - homebrew + "/tmp", // standard PostgreSQL + } + + for _, path := range candidatePaths { + if _, err := os.Stat(path); err == nil { + return path + } + } + + return "localhost" +} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/defaults_windows.go b/vendor/github.com/jackc/pgx/v5/pgconn/defaults_windows.go new file mode 100644 index 0000000..33b4a1f --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/pgconn/defaults_windows.go @@ -0,0 +1,57 @@ +package pgconn + +import ( + "os" + "os/user" + "path/filepath" + "strings" +) + +func defaultSettings() map[string]string { + settings := make(map[string]string) + + settings["host"] = defaultHost() + settings["port"] = "5432" + + // Default to the OS user name. Purposely ignoring err getting user name from + // OS. The client application will simply have to specify the user in that + // case (which they typically will be doing anyway). + user, err := user.Current() + appData := os.Getenv("APPDATA") + if err == nil { + // Windows gives us the username here as `DOMAIN\user` or `LOCALPCNAME\user`, + // but the libpq default is just the `user` portion, so we strip off the first part. + username := user.Username + if strings.Contains(username, "\\") { + username = username[strings.LastIndex(username, "\\")+1:] + } + + settings["user"] = username + settings["passfile"] = filepath.Join(appData, "postgresql", "pgpass.conf") + settings["servicefile"] = filepath.Join(user.HomeDir, ".pg_service.conf") + sslcert := filepath.Join(appData, "postgresql", "postgresql.crt") + sslkey := filepath.Join(appData, "postgresql", "postgresql.key") + if _, err := os.Stat(sslcert); err == nil { + if _, err := os.Stat(sslkey); err == nil { + // Both the cert and key must be present to use them, or do not use either + settings["sslcert"] = sslcert + settings["sslkey"] = sslkey + } + } + sslrootcert := filepath.Join(appData, "postgresql", "root.crt") + if _, err := os.Stat(sslrootcert); err == nil { + settings["sslrootcert"] = sslrootcert + } + } + + settings["target_session_attrs"] = "any" + + return settings +} + +// defaultHost attempts to mimic libpq's default host. libpq uses the default unix socket location on *nix and localhost +// on Windows. The default socket location is compiled into libpq. Since pgx does not have access to that default it +// checks the existence of common locations. +func defaultHost() string { + return "localhost" +} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/doc.go b/vendor/github.com/jackc/pgx/v5/pgconn/doc.go new file mode 100644 index 0000000..7013750 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/pgconn/doc.go @@ -0,0 +1,38 @@ +// Package pgconn is a low-level PostgreSQL database driver. +/* +pgconn provides lower level access to a PostgreSQL connection than a database/sql or pgx connection. It operates at +nearly the same level is the C library libpq. + +Establishing a Connection + +Use Connect to establish a connection. It accepts a connection string in URL or keyword/value format and will read the +environment for libpq style environment variables. + +Executing a Query + +ExecParams and ExecPrepared execute a single query. They return readers that iterate over each row. The Read method +reads all rows into memory. + +Executing Multiple Queries in a Single Round Trip + +Exec and ExecBatch can execute multiple queries in a single round trip. They return readers that iterate over each query +result. The ReadAll method reads all query results into memory. + +Pipeline Mode + +Pipeline mode allows sending queries without having read the results of previously sent queries. It allows control of +exactly how many and when network round trips occur. + +Context Support + +All potentially blocking operations take a context.Context. The default behavior when a context is canceled is for the +method to immediately return. In most circumstances, this will also close the underlying connection. This behavior can +be customized by using BuildContextWatcherHandler on the Config to create a ctxwatch.Handler with different behavior. +This can be especially useful when queries that are frequently canceled and the overhead of creating new connections is +a problem. DeadlineContextWatcherHandler and CancelRequestContextWatcherHandler can be used to introduce a delay before +interrupting the query in such a way as to close the connection. + +The CancelRequest method may be used to request the PostgreSQL server cancel an in-progress query without forcing the +client to abort. +*/ +package pgconn diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/errors.go b/vendor/github.com/jackc/pgx/v5/pgconn/errors.go new file mode 100644 index 0000000..ec4a6d4 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/pgconn/errors.go @@ -0,0 +1,248 @@ +package pgconn + +import ( + "context" + "errors" + "fmt" + "net" + "net/url" + "regexp" + "strings" +) + +// SafeToRetry checks if the err is guaranteed to have occurred before sending any data to the server. +func SafeToRetry(err error) bool { + var retryableErr interface{ SafeToRetry() bool } + if errors.As(err, &retryableErr) { + return retryableErr.SafeToRetry() + } + return false +} + +// Timeout checks if err was caused by a timeout. To be specific, it is true if err was caused within pgconn by a +// context.DeadlineExceeded or an implementer of net.Error where Timeout() is true. +func Timeout(err error) bool { + var timeoutErr *errTimeout + return errors.As(err, &timeoutErr) +} + +// PgError represents an error reported by the PostgreSQL server. See +// http://www.postgresql.org/docs/11/static/protocol-error-fields.html for +// detailed field description. +type PgError struct { + Severity string + SeverityUnlocalized string + Code string + Message string + Detail string + Hint string + Position int32 + InternalPosition int32 + InternalQuery string + Where string + SchemaName string + TableName string + ColumnName string + DataTypeName string + ConstraintName string + File string + Line int32 + Routine string +} + +func (pe *PgError) Error() string { + return pe.Severity + ": " + pe.Message + " (SQLSTATE " + pe.Code + ")" +} + +// SQLState returns the SQLState of the error. +func (pe *PgError) SQLState() string { + return pe.Code +} + +// ConnectError is the error returned when a connection attempt fails. +type ConnectError struct { + Config *Config // The configuration that was used in the connection attempt. + err error +} + +func (e *ConnectError) Error() string { + prefix := fmt.Sprintf("failed to connect to `user=%s database=%s`:", e.Config.User, e.Config.Database) + details := e.err.Error() + if strings.Contains(details, "\n") { + return prefix + "\n\t" + strings.ReplaceAll(details, "\n", "\n\t") + } else { + return prefix + " " + details + } +} + +func (e *ConnectError) Unwrap() error { + return e.err +} + +type perDialConnectError struct { + address string + originalHostname string + err error +} + +func (e *perDialConnectError) Error() string { + return fmt.Sprintf("%s (%s): %s", e.address, e.originalHostname, e.err.Error()) +} + +func (e *perDialConnectError) Unwrap() error { + return e.err +} + +type connLockError struct { + status string +} + +func (e *connLockError) SafeToRetry() bool { + return true // a lock failure by definition happens before the connection is used. +} + +func (e *connLockError) Error() string { + return e.status +} + +// ParseConfigError is the error returned when a connection string cannot be parsed. +type ParseConfigError struct { + ConnString string // The connection string that could not be parsed. + msg string + err error +} + +func (e *ParseConfigError) Error() string { + // Now that ParseConfigError is public and ConnString is available to the developer, perhaps it would be better only + // return a static string. That would ensure that the error message cannot leak a password. The ConnString field would + // allow access to the original string if desired and Unwrap would allow access to the underlying error. + connString := redactPW(e.ConnString) + if e.err == nil { + return fmt.Sprintf("cannot parse `%s`: %s", connString, e.msg) + } + return fmt.Sprintf("cannot parse `%s`: %s (%s)", connString, e.msg, e.err.Error()) +} + +func (e *ParseConfigError) Unwrap() error { + return e.err +} + +func normalizeTimeoutError(ctx context.Context, err error) error { + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + if ctx.Err() == context.Canceled { + // Since the timeout was caused by a context cancellation, the actual error is context.Canceled not the timeout error. + return context.Canceled + } else if ctx.Err() == context.DeadlineExceeded { + return &errTimeout{err: ctx.Err()} + } else { + return &errTimeout{err: netErr} + } + } + return err +} + +type pgconnError struct { + msg string + err error + safeToRetry bool +} + +func (e *pgconnError) Error() string { + if e.msg == "" { + return e.err.Error() + } + if e.err == nil { + return e.msg + } + return fmt.Sprintf("%s: %s", e.msg, e.err.Error()) +} + +func (e *pgconnError) SafeToRetry() bool { + return e.safeToRetry +} + +func (e *pgconnError) Unwrap() error { + return e.err +} + +// errTimeout occurs when an error was caused by a timeout. Specifically, it wraps an error which is +// context.Canceled, context.DeadlineExceeded, or an implementer of net.Error where Timeout() is true. +type errTimeout struct { + err error +} + +func (e *errTimeout) Error() string { + return fmt.Sprintf("timeout: %s", e.err.Error()) +} + +func (e *errTimeout) SafeToRetry() bool { + return SafeToRetry(e.err) +} + +func (e *errTimeout) Unwrap() error { + return e.err +} + +type contextAlreadyDoneError struct { + err error +} + +func (e *contextAlreadyDoneError) Error() string { + return fmt.Sprintf("context already done: %s", e.err.Error()) +} + +func (e *contextAlreadyDoneError) SafeToRetry() bool { + return true +} + +func (e *contextAlreadyDoneError) Unwrap() error { + return e.err +} + +// newContextAlreadyDoneError double-wraps a context error in `contextAlreadyDoneError` and `errTimeout`. +func newContextAlreadyDoneError(ctx context.Context) (err error) { + return &errTimeout{&contextAlreadyDoneError{err: ctx.Err()}} +} + +func redactPW(connString string) string { + if strings.HasPrefix(connString, "postgres://") || strings.HasPrefix(connString, "postgresql://") { + if u, err := url.Parse(connString); err == nil { + return redactURL(u) + } + } + quotedKV := regexp.MustCompile(`password='[^']*'`) + connString = quotedKV.ReplaceAllLiteralString(connString, "password=xxxxx") + plainKV := regexp.MustCompile(`password=[^ ]*`) + connString = plainKV.ReplaceAllLiteralString(connString, "password=xxxxx") + brokenURL := regexp.MustCompile(`:[^:@]+?@`) + connString = brokenURL.ReplaceAllLiteralString(connString, ":xxxxxx@") + return connString +} + +func redactURL(u *url.URL) string { + if u == nil { + return "" + } + if _, pwSet := u.User.Password(); pwSet { + u.User = url.UserPassword(u.User.Username(), "xxxxx") + } + return u.String() +} + +type NotPreferredError struct { + err error + safeToRetry bool +} + +func (e *NotPreferredError) Error() string { + return fmt.Sprintf("standby server not found: %s", e.err.Error()) +} + +func (e *NotPreferredError) SafeToRetry() bool { + return e.safeToRetry +} + +func (e *NotPreferredError) Unwrap() error { + return e.err +} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go b/vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go new file mode 100644 index 0000000..e65c2c2 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go @@ -0,0 +1,139 @@ +// Package bgreader provides a io.Reader that can optionally buffer reads in the background. +package bgreader + +import ( + "io" + "sync" + + "github.com/jackc/pgx/v5/internal/iobufpool" +) + +const ( + StatusStopped = iota + StatusRunning + StatusStopping +) + +// BGReader is an io.Reader that can optionally buffer reads in the background. It is safe for concurrent use. +type BGReader struct { + r io.Reader + + cond *sync.Cond + status int32 + readResults []readResult +} + +type readResult struct { + buf *[]byte + err error +} + +// Start starts the backgrounder reader. If the background reader is already running this is a no-op. The background +// reader will stop automatically when the underlying reader returns an error. +func (r *BGReader) Start() { + r.cond.L.Lock() + defer r.cond.L.Unlock() + + switch r.status { + case StatusStopped: + r.status = StatusRunning + go r.bgRead() + case StatusRunning: + // no-op + case StatusStopping: + r.status = StatusRunning + } +} + +// Stop tells the background reader to stop after the in progress Read returns. It is safe to call Stop when the +// background reader is not running. +func (r *BGReader) Stop() { + r.cond.L.Lock() + defer r.cond.L.Unlock() + + switch r.status { + case StatusStopped: + // no-op + case StatusRunning: + r.status = StatusStopping + case StatusStopping: + // no-op + } +} + +// Status returns the current status of the background reader. +func (r *BGReader) Status() int32 { + r.cond.L.Lock() + defer r.cond.L.Unlock() + return r.status +} + +func (r *BGReader) bgRead() { + keepReading := true + for keepReading { + buf := iobufpool.Get(8192) + n, err := r.r.Read(*buf) + *buf = (*buf)[:n] + + r.cond.L.Lock() + r.readResults = append(r.readResults, readResult{buf: buf, err: err}) + if r.status == StatusStopping || err != nil { + r.status = StatusStopped + keepReading = false + } + r.cond.L.Unlock() + r.cond.Broadcast() + } +} + +// Read implements the io.Reader interface. +func (r *BGReader) Read(p []byte) (int, error) { + r.cond.L.Lock() + defer r.cond.L.Unlock() + + if len(r.readResults) > 0 { + return r.readFromReadResults(p) + } + + // There are no unread background read results and the background reader is stopped. + if r.status == StatusStopped { + return r.r.Read(p) + } + + // Wait for results from the background reader + for len(r.readResults) == 0 { + r.cond.Wait() + } + return r.readFromReadResults(p) +} + +// readBackgroundResults reads a result previously read by the background reader. r.cond.L must be held. +func (r *BGReader) readFromReadResults(p []byte) (int, error) { + buf := r.readResults[0].buf + var err error + + n := copy(p, *buf) + if n == len(*buf) { + err = r.readResults[0].err + iobufpool.Put(buf) + if len(r.readResults) == 1 { + r.readResults = nil + } else { + r.readResults = r.readResults[1:] + } + } else { + *buf = (*buf)[n:] + r.readResults[0].buf = buf + } + + return n, err +} + +func New(r io.Reader) *BGReader { + return &BGReader{ + r: r, + cond: &sync.Cond{ + L: &sync.Mutex{}, + }, + } +} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/krb5.go b/vendor/github.com/jackc/pgx/v5/pgconn/krb5.go new file mode 100644 index 0000000..3c1af34 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/pgconn/krb5.go @@ -0,0 +1,100 @@ +package pgconn + +import ( + "errors" + "fmt" + + "github.com/jackc/pgx/v5/pgproto3" +) + +// NewGSSFunc creates a GSS authentication provider, for use with +// RegisterGSSProvider. +type NewGSSFunc func() (GSS, error) + +var newGSS NewGSSFunc + +// RegisterGSSProvider registers a GSS authentication provider. For example, if +// you need to use Kerberos to authenticate with your server, add this to your +// main package: +// +// import "github.com/otan/gopgkrb5" +// +// func init() { +// pgconn.RegisterGSSProvider(func() (pgconn.GSS, error) { return gopgkrb5.NewGSS() }) +// } +func RegisterGSSProvider(newGSSArg NewGSSFunc) { + newGSS = newGSSArg +} + +// GSS provides GSSAPI authentication (e.g., Kerberos). +type GSS interface { + GetInitToken(host string, service string) ([]byte, error) + GetInitTokenFromSPN(spn string) ([]byte, error) + Continue(inToken []byte) (done bool, outToken []byte, err error) +} + +func (c *PgConn) gssAuth() error { + if newGSS == nil { + return errors.New("kerberos error: no GSSAPI provider registered, see https://github.com/otan/gopgkrb5") + } + cli, err := newGSS() + if err != nil { + return err + } + + var nextData []byte + if c.config.KerberosSpn != "" { + // Use the supplied SPN if provided. + nextData, err = cli.GetInitTokenFromSPN(c.config.KerberosSpn) + } else { + // Allow the kerberos service name to be overridden + service := "postgres" + if c.config.KerberosSrvName != "" { + service = c.config.KerberosSrvName + } + nextData, err = cli.GetInitToken(c.config.Host, service) + } + if err != nil { + return err + } + + for { + gssResponse := &pgproto3.GSSResponse{ + Data: nextData, + } + c.frontend.Send(gssResponse) + err = c.flushWithPotentialWriteReadDeadlock() + if err != nil { + return err + } + resp, err := c.rxGSSContinue() + if err != nil { + return err + } + var done bool + done, nextData, err = cli.Continue(resp.Data) + if err != nil { + return err + } + if done { + break + } + } + return nil +} + +func (c *PgConn) rxGSSContinue() (*pgproto3.AuthenticationGSSContinue, error) { + msg, err := c.receiveMessage() + if err != nil { + return nil, err + } + + switch m := msg.(type) { + case *pgproto3.AuthenticationGSSContinue: + return m, nil + case *pgproto3.ErrorResponse: + return nil, ErrorResponseToPgError(m) + } + + return nil, fmt.Errorf("expected AuthenticationGSSContinue message but received unexpected message %T", msg) +} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go new file mode 100644 index 0000000..7efb522 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go @@ -0,0 +1,2346 @@ +package pgconn + +import ( + "context" + "crypto/md5" + "crypto/tls" + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "io" + "math" + "net" + "strconv" + "strings" + "sync" + "time" + + "github.com/jackc/pgx/v5/internal/iobufpool" + "github.com/jackc/pgx/v5/internal/pgio" + "github.com/jackc/pgx/v5/pgconn/ctxwatch" + "github.com/jackc/pgx/v5/pgconn/internal/bgreader" + "github.com/jackc/pgx/v5/pgproto3" +) + +const ( + connStatusUninitialized = iota + connStatusConnecting + connStatusClosed + connStatusIdle + connStatusBusy +) + +// Notice represents a notice response message reported by the PostgreSQL server. Be aware that this is distinct from +// LISTEN/NOTIFY notification. +type Notice PgError + +// Notification is a message received from the PostgreSQL LISTEN/NOTIFY system +type Notification struct { + PID uint32 // backend pid that sent the notification + Channel string // channel from which notification was received + Payload string +} + +// DialFunc is a function that can be used to connect to a PostgreSQL server. +type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error) + +// LookupFunc is a function that can be used to lookup IPs addrs from host. Optionally an ip:port combination can be +// returned in order to override the connection string's port. +type LookupFunc func(ctx context.Context, host string) (addrs []string, err error) + +// BuildFrontendFunc is a function that can be used to create Frontend implementation for connection. +type BuildFrontendFunc func(r io.Reader, w io.Writer) *pgproto3.Frontend + +// PgErrorHandler is a function that handles errors returned from Postgres. This function must return true to keep +// the connection open. Returning false will cause the connection to be closed immediately. You should return +// false on any FATAL-severity errors. This will not receive network errors. The *PgConn is provided so the handler is +// aware of the origin of the error, but it must not invoke any query method. +type PgErrorHandler func(*PgConn, *PgError) bool + +// NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at +// any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin +// of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY +// notification. +type NoticeHandler func(*PgConn, *Notice) + +// NotificationHandler is a function that can handle notifications received from the PostgreSQL server. Notifications +// can be received at any time, usually during handling of a query response. The *PgConn is provided so the handler is +// aware of the origin of the notice, but it must not invoke any query method. Be aware that this is distinct from a +// notice event. +type NotificationHandler func(*PgConn, *Notification) + +// PgConn is a low-level PostgreSQL connection handle. It is not safe for concurrent usage. +type PgConn struct { + conn net.Conn + pid uint32 // backend pid + secretKey uint32 // key to use to send a cancel query message to the server + parameterStatuses map[string]string // parameters that have been reported by the server + txStatus byte + frontend *pgproto3.Frontend + bgReader *bgreader.BGReader + slowWriteTimer *time.Timer + bgReaderStarted chan struct{} + + customData map[string]any + + config *Config + + status byte // One of connStatus* constants + + bufferingReceive bool + bufferingReceiveMux sync.Mutex + bufferingReceiveMsg pgproto3.BackendMessage + bufferingReceiveErr error + + peekedMsg pgproto3.BackendMessage + + // Reusable / preallocated resources + resultReader ResultReader + multiResultReader MultiResultReader + pipeline Pipeline + contextWatcher *ctxwatch.ContextWatcher + fieldDescriptions [16]FieldDescription + + cleanupDone chan struct{} +} + +// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or keyword/value +// format) to provide configuration. See documentation for [ParseConfig] for details. ctx can be used to cancel a +// connect attempt. +func Connect(ctx context.Context, connString string) (*PgConn, error) { + config, err := ParseConfig(connString) + if err != nil { + return nil, err + } + + return ConnectConfig(ctx, config) +} + +// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or keyword/value +// format) and ParseConfigOptions to provide additional configuration. See documentation for [ParseConfig] for details. +// ctx can be used to cancel a connect attempt. +func ConnectWithOptions(ctx context.Context, connString string, parseConfigOptions ParseConfigOptions) (*PgConn, error) { + config, err := ParseConfigWithOptions(connString, parseConfigOptions) + if err != nil { + return nil, err + } + + return ConnectConfig(ctx, config) +} + +// Connect establishes a connection to a PostgreSQL server using config. config must have been constructed with +// [ParseConfig]. ctx can be used to cancel a connect attempt. +// +// If config.Fallbacks are present they will sequentially be tried in case of error establishing network connection. An +// authentication error will terminate the chain of attempts (like libpq: +// https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS) and be returned as the error. +func ConnectConfig(ctx context.Context, config *Config) (*PgConn, error) { + // Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from + // zero values. + if !config.createdByParseConfig { + panic("config must be created by ParseConfig") + } + + var allErrors []error + + connectConfigs, errs := buildConnectOneConfigs(ctx, config) + if len(errs) > 0 { + allErrors = append(allErrors, errs...) + } + + if len(connectConfigs) == 0 { + return nil, &ConnectError{Config: config, err: fmt.Errorf("hostname resolving error: %w", errors.Join(allErrors...))} + } + + pgConn, errs := connectPreferred(ctx, config, connectConfigs) + if len(errs) > 0 { + allErrors = append(allErrors, errs...) + return nil, &ConnectError{Config: config, err: errors.Join(allErrors...)} + } + + if config.AfterConnect != nil { + err := config.AfterConnect(ctx, pgConn) + if err != nil { + pgConn.conn.Close() + return nil, &ConnectError{Config: config, err: fmt.Errorf("AfterConnect error: %w", err)} + } + } + + return pgConn, nil +} + +// buildConnectOneConfigs resolves hostnames and builds a list of connectOneConfigs to try connecting to. It returns a +// slice of successfully resolved connectOneConfigs and a slice of errors. It is possible for both slices to contain +// values if some hosts were successfully resolved and others were not. +func buildConnectOneConfigs(ctx context.Context, config *Config) ([]*connectOneConfig, []error) { + // Simplify usage by treating primary config and fallbacks the same. + fallbackConfigs := []*FallbackConfig{ + { + Host: config.Host, + Port: config.Port, + TLSConfig: config.TLSConfig, + }, + } + fallbackConfigs = append(fallbackConfigs, config.Fallbacks...) + + var configs []*connectOneConfig + + var allErrors []error + + for _, fb := range fallbackConfigs { + // skip resolve for unix sockets + if isAbsolutePath(fb.Host) { + network, address := NetworkAddress(fb.Host, fb.Port) + configs = append(configs, &connectOneConfig{ + network: network, + address: address, + originalHostname: fb.Host, + tlsConfig: fb.TLSConfig, + }) + + continue + } + + ips, err := config.LookupFunc(ctx, fb.Host) + if err != nil { + allErrors = append(allErrors, err) + continue + } + + for _, ip := range ips { + splitIP, splitPort, err := net.SplitHostPort(ip) + if err == nil { + port, err := strconv.ParseUint(splitPort, 10, 16) + if err != nil { + return nil, []error{fmt.Errorf("error parsing port (%s) from lookup: %w", splitPort, err)} + } + network, address := NetworkAddress(splitIP, uint16(port)) + configs = append(configs, &connectOneConfig{ + network: network, + address: address, + originalHostname: fb.Host, + tlsConfig: fb.TLSConfig, + }) + } else { + network, address := NetworkAddress(ip, fb.Port) + configs = append(configs, &connectOneConfig{ + network: network, + address: address, + originalHostname: fb.Host, + tlsConfig: fb.TLSConfig, + }) + } + } + } + + return configs, allErrors +} + +// connectPreferred attempts to connect to the preferred host from connectOneConfigs. The connections are attempted in +// order. If a connection is successful it is returned. If no connection is successful then all errors are returned. If +// a connection attempt returns a [NotPreferredError], then that host will be used if no other hosts are successful. +func connectPreferred(ctx context.Context, config *Config, connectOneConfigs []*connectOneConfig) (*PgConn, []error) { + octx := ctx + var allErrors []error + + var fallbackConnectOneConfig *connectOneConfig + for i, c := range connectOneConfigs { + // ConnectTimeout restricts the whole connection process. + if config.ConnectTimeout != 0 { + // create new context first time or when previous host was different + if i == 0 || (connectOneConfigs[i].address != connectOneConfigs[i-1].address) { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(octx, config.ConnectTimeout) + defer cancel() + } + } else { + ctx = octx + } + + pgConn, err := connectOne(ctx, config, c, false) + if pgConn != nil { + return pgConn, nil + } + + allErrors = append(allErrors, err) + + var pgErr *PgError + if errors.As(err, &pgErr) { + const ERRCODE_INVALID_PASSWORD = "28P01" // wrong password + const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings + const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist + const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege + if pgErr.Code == ERRCODE_INVALID_PASSWORD || + pgErr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION && c.tlsConfig != nil || + pgErr.Code == ERRCODE_INVALID_CATALOG_NAME || + pgErr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE { + return nil, allErrors + } + } + + var npErr *NotPreferredError + if errors.As(err, &npErr) { + fallbackConnectOneConfig = c + } + } + + if fallbackConnectOneConfig != nil { + pgConn, err := connectOne(ctx, config, fallbackConnectOneConfig, true) + if err == nil { + return pgConn, nil + } + allErrors = append(allErrors, err) + } + + return nil, allErrors +} + +// connectOne makes one connection attempt to a single host. +func connectOne(ctx context.Context, config *Config, connectConfig *connectOneConfig, + ignoreNotPreferredErr bool, +) (*PgConn, error) { + pgConn := new(PgConn) + pgConn.config = config + pgConn.cleanupDone = make(chan struct{}) + pgConn.customData = make(map[string]any) + + var err error + + newPerDialConnectError := func(msg string, err error) *perDialConnectError { + err = normalizeTimeoutError(ctx, err) + e := &perDialConnectError{address: connectConfig.address, originalHostname: connectConfig.originalHostname, err: fmt.Errorf("%s: %w", msg, err)} + return e + } + + pgConn.conn, err = config.DialFunc(ctx, connectConfig.network, connectConfig.address) + if err != nil { + return nil, newPerDialConnectError("dial error", err) + } + + if connectConfig.tlsConfig != nil { + pgConn.contextWatcher = ctxwatch.NewContextWatcher(&DeadlineContextWatcherHandler{Conn: pgConn.conn}) + pgConn.contextWatcher.Watch(ctx) + tlsConn, err := startTLS(pgConn.conn, connectConfig.tlsConfig) + pgConn.contextWatcher.Unwatch() // Always unwatch `netConn` after TLS. + if err != nil { + pgConn.conn.Close() + return nil, newPerDialConnectError("tls error", err) + } + + pgConn.conn = tlsConn + } + + pgConn.contextWatcher = ctxwatch.NewContextWatcher(config.BuildContextWatcherHandler(pgConn)) + pgConn.contextWatcher.Watch(ctx) + defer pgConn.contextWatcher.Unwatch() + + pgConn.parameterStatuses = make(map[string]string) + pgConn.status = connStatusConnecting + pgConn.bgReader = bgreader.New(pgConn.conn) + pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), + func() { + pgConn.bgReader.Start() + pgConn.bgReaderStarted <- struct{}{} + }, + ) + pgConn.slowWriteTimer.Stop() + pgConn.bgReaderStarted = make(chan struct{}) + pgConn.frontend = config.BuildFrontend(pgConn.bgReader, pgConn.conn) + + startupMsg := pgproto3.StartupMessage{ + ProtocolVersion: pgproto3.ProtocolVersionNumber, + Parameters: make(map[string]string), + } + + // Copy default run-time params + for k, v := range config.RuntimeParams { + startupMsg.Parameters[k] = v + } + + startupMsg.Parameters["user"] = config.User + if config.Database != "" { + startupMsg.Parameters["database"] = config.Database + } + + pgConn.frontend.Send(&startupMsg) + if err := pgConn.flushWithPotentialWriteReadDeadlock(); err != nil { + pgConn.conn.Close() + return nil, newPerDialConnectError("failed to write startup message", err) + } + + for { + msg, err := pgConn.receiveMessage() + if err != nil { + pgConn.conn.Close() + if err, ok := err.(*PgError); ok { + return nil, newPerDialConnectError("server error", err) + } + return nil, newPerDialConnectError("failed to receive message", err) + } + + switch msg := msg.(type) { + case *pgproto3.BackendKeyData: + pgConn.pid = msg.ProcessID + pgConn.secretKey = msg.SecretKey + + case *pgproto3.AuthenticationOk: + case *pgproto3.AuthenticationCleartextPassword: + err = pgConn.txPasswordMessage(pgConn.config.Password) + if err != nil { + pgConn.conn.Close() + return nil, newPerDialConnectError("failed to write password message", err) + } + case *pgproto3.AuthenticationMD5Password: + digestedPassword := "md5" + hexMD5(hexMD5(pgConn.config.Password+pgConn.config.User)+string(msg.Salt[:])) + err = pgConn.txPasswordMessage(digestedPassword) + if err != nil { + pgConn.conn.Close() + return nil, newPerDialConnectError("failed to write password message", err) + } + case *pgproto3.AuthenticationSASL: + err = pgConn.scramAuth(msg.AuthMechanisms) + if err != nil { + pgConn.conn.Close() + return nil, newPerDialConnectError("failed SASL auth", err) + } + case *pgproto3.AuthenticationGSS: + err = pgConn.gssAuth() + if err != nil { + pgConn.conn.Close() + return nil, newPerDialConnectError("failed GSS auth", err) + } + case *pgproto3.ReadyForQuery: + pgConn.status = connStatusIdle + if config.ValidateConnect != nil { + // ValidateConnect may execute commands that cause the context to be watched again. Unwatch first to avoid + // the watch already in progress panic. This is that last thing done by this method so there is no need to + // restart the watch after ValidateConnect returns. + // + // See https://github.com/jackc/pgconn/issues/40. + pgConn.contextWatcher.Unwatch() + + err := config.ValidateConnect(ctx, pgConn) + if err != nil { + if _, ok := err.(*NotPreferredError); ignoreNotPreferredErr && ok { + return pgConn, nil + } + pgConn.conn.Close() + return nil, newPerDialConnectError("ValidateConnect failed", err) + } + } + return pgConn, nil + case *pgproto3.ParameterStatus, *pgproto3.NoticeResponse: + // handled by ReceiveMessage + case *pgproto3.ErrorResponse: + pgConn.conn.Close() + return nil, newPerDialConnectError("server error", ErrorResponseToPgError(msg)) + default: + pgConn.conn.Close() + return nil, newPerDialConnectError("received unexpected message", err) + } + } +} + +func startTLS(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) { + err := binary.Write(conn, binary.BigEndian, []int32{8, 80877103}) + if err != nil { + return nil, err + } + + response := make([]byte, 1) + if _, err = io.ReadFull(conn, response); err != nil { + return nil, err + } + + if response[0] != 'S' { + return nil, errors.New("server refused TLS connection") + } + + return tls.Client(conn, tlsConfig), nil +} + +func (pgConn *PgConn) txPasswordMessage(password string) (err error) { + pgConn.frontend.Send(&pgproto3.PasswordMessage{Password: password}) + return pgConn.flushWithPotentialWriteReadDeadlock() +} + +func hexMD5(s string) string { + hash := md5.New() + io.WriteString(hash, s) + return hex.EncodeToString(hash.Sum(nil)) +} + +func (pgConn *PgConn) signalMessage() chan struct{} { + if pgConn.bufferingReceive { + panic("BUG: signalMessage when already in progress") + } + + pgConn.bufferingReceive = true + pgConn.bufferingReceiveMux.Lock() + + ch := make(chan struct{}) + go func() { + pgConn.bufferingReceiveMsg, pgConn.bufferingReceiveErr = pgConn.frontend.Receive() + pgConn.bufferingReceiveMux.Unlock() + close(ch) + }() + + return ch +} + +// ReceiveMessage receives one wire protocol message from the PostgreSQL server. It must only be used when the +// connection is not busy. e.g. It is an error to call ReceiveMessage while reading the result of a query. The messages +// are still handled by the core pgconn message handling system so receiving a NotificationResponse will still trigger +// the OnNotification callback. +// +// This is a very low level method that requires deep understanding of the PostgreSQL wire protocol to use correctly. +// See https://www.postgresql.org/docs/current/protocol.html. +func (pgConn *PgConn) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error) { + if err := pgConn.lock(); err != nil { + return nil, err + } + defer pgConn.unlock() + + if ctx != context.Background() { + select { + case <-ctx.Done(): + return nil, newContextAlreadyDoneError(ctx) + default: + } + pgConn.contextWatcher.Watch(ctx) + defer pgConn.contextWatcher.Unwatch() + } + + msg, err := pgConn.receiveMessage() + if err != nil { + err = &pgconnError{ + msg: "receive message failed", + err: normalizeTimeoutError(ctx, err), + safeToRetry: true, + } + } + return msg, err +} + +// peekMessage peeks at the next message without setting up context cancellation. +func (pgConn *PgConn) peekMessage() (pgproto3.BackendMessage, error) { + if pgConn.peekedMsg != nil { + return pgConn.peekedMsg, nil + } + + var msg pgproto3.BackendMessage + var err error + if pgConn.bufferingReceive { + pgConn.bufferingReceiveMux.Lock() + msg = pgConn.bufferingReceiveMsg + err = pgConn.bufferingReceiveErr + pgConn.bufferingReceiveMux.Unlock() + pgConn.bufferingReceive = false + + // If a timeout error happened in the background try the read again. + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + msg, err = pgConn.frontend.Receive() + } + } else { + msg, err = pgConn.frontend.Receive() + } + + if err != nil { + // Close on anything other than timeout error - everything else is fatal + var netErr net.Error + isNetErr := errors.As(err, &netErr) + if !(isNetErr && netErr.Timeout()) { + pgConn.asyncClose() + } + + return nil, err + } + + pgConn.peekedMsg = msg + return msg, nil +} + +// receiveMessage receives a message without setting up context cancellation +func (pgConn *PgConn) receiveMessage() (pgproto3.BackendMessage, error) { + msg, err := pgConn.peekMessage() + if err != nil { + return nil, err + } + pgConn.peekedMsg = nil + + switch msg := msg.(type) { + case *pgproto3.ReadyForQuery: + pgConn.txStatus = msg.TxStatus + case *pgproto3.ParameterStatus: + pgConn.parameterStatuses[msg.Name] = msg.Value + case *pgproto3.ErrorResponse: + err := ErrorResponseToPgError(msg) + if pgConn.config.OnPgError != nil && !pgConn.config.OnPgError(pgConn, err) { + pgConn.status = connStatusClosed + pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return. + close(pgConn.cleanupDone) + return nil, err + } + case *pgproto3.NoticeResponse: + if pgConn.config.OnNotice != nil { + pgConn.config.OnNotice(pgConn, noticeResponseToNotice(msg)) + } + case *pgproto3.NotificationResponse: + if pgConn.config.OnNotification != nil { + pgConn.config.OnNotification(pgConn, &Notification{PID: msg.PID, Channel: msg.Channel, Payload: msg.Payload}) + } + } + + return msg, nil +} + +// Conn returns the underlying net.Conn. This rarely necessary. If the connection will be directly used for reading or +// writing then SyncConn should usually be called before Conn. +func (pgConn *PgConn) Conn() net.Conn { + return pgConn.conn +} + +// PID returns the backend PID. +func (pgConn *PgConn) PID() uint32 { + return pgConn.pid +} + +// TxStatus returns the current TxStatus as reported by the server in the ReadyForQuery message. +// +// Possible return values: +// +// 'I' - idle / not in transaction +// 'T' - in a transaction +// 'E' - in a failed transaction +// +// See https://www.postgresql.org/docs/current/protocol-message-formats.html. +func (pgConn *PgConn) TxStatus() byte { + return pgConn.txStatus +} + +// SecretKey returns the backend secret key used to send a cancel query message to the server. +func (pgConn *PgConn) SecretKey() uint32 { + return pgConn.secretKey +} + +// Frontend returns the underlying *pgproto3.Frontend. This rarely necessary. +func (pgConn *PgConn) Frontend() *pgproto3.Frontend { + return pgConn.frontend +} + +// Close closes a connection. It is safe to call Close on an already closed connection. Close attempts a clean close by +// sending the exit message to PostgreSQL. However, this could block so ctx is available to limit the time to wait. The +// underlying net.Conn.Close() will always be called regardless of any other errors. +func (pgConn *PgConn) Close(ctx context.Context) error { + if pgConn.status == connStatusClosed { + return nil + } + pgConn.status = connStatusClosed + + defer close(pgConn.cleanupDone) + defer pgConn.conn.Close() + + if ctx != context.Background() { + // Close may be called while a cancellable query is in progress. This will most often be triggered by panic when + // a defer closes the connection (possibly indirectly via a transaction or a connection pool). Unwatch to end any + // previous watch. It is safe to Unwatch regardless of whether a watch is already is progress. + // + // See https://github.com/jackc/pgconn/issues/29 + pgConn.contextWatcher.Unwatch() + + pgConn.contextWatcher.Watch(ctx) + defer pgConn.contextWatcher.Unwatch() + } + + // Ignore any errors sending Terminate message and waiting for server to close connection. + // This mimics the behavior of libpq PQfinish. It calls closePGconn which calls sendTerminateConn which purposefully + // ignores errors. + // + // See https://github.com/jackc/pgx/issues/637 + pgConn.frontend.Send(&pgproto3.Terminate{}) + pgConn.flushWithPotentialWriteReadDeadlock() + + return pgConn.conn.Close() +} + +// asyncClose marks the connection as closed and asynchronously sends a cancel query message and closes the underlying +// connection. +func (pgConn *PgConn) asyncClose() { + if pgConn.status == connStatusClosed { + return + } + pgConn.status = connStatusClosed + + go func() { + defer close(pgConn.cleanupDone) + defer pgConn.conn.Close() + + deadline := time.Now().Add(time.Second * 15) + + ctx, cancel := context.WithDeadline(context.Background(), deadline) + defer cancel() + + pgConn.CancelRequest(ctx) + + pgConn.conn.SetDeadline(deadline) + + pgConn.frontend.Send(&pgproto3.Terminate{}) + pgConn.flushWithPotentialWriteReadDeadlock() + }() +} + +// CleanupDone returns a channel that will be closed after all underlying resources have been cleaned up. A closed +// connection is no longer usable, but underlying resources, in particular the net.Conn, may not have finished closing +// yet. This is because certain errors such as a context cancellation require that the interrupted function call return +// immediately, but the error may also cause the connection to be closed. In these cases the underlying resources are +// closed asynchronously. +// +// This is only likely to be useful to connection pools. It gives them a way avoid establishing a new connection while +// an old connection is still being cleaned up and thereby exceeding the maximum pool size. +func (pgConn *PgConn) CleanupDone() chan (struct{}) { + return pgConn.cleanupDone +} + +// IsClosed reports if the connection has been closed. +// +// CleanupDone() can be used to determine if all cleanup has been completed. +func (pgConn *PgConn) IsClosed() bool { + return pgConn.status < connStatusIdle +} + +// IsBusy reports if the connection is busy. +func (pgConn *PgConn) IsBusy() bool { + return pgConn.status == connStatusBusy +} + +// lock locks the connection. +func (pgConn *PgConn) lock() error { + switch pgConn.status { + case connStatusBusy: + return &connLockError{status: "conn busy"} // This only should be possible in case of an application bug. + case connStatusClosed: + return &connLockError{status: "conn closed"} + case connStatusUninitialized: + return &connLockError{status: "conn uninitialized"} + } + pgConn.status = connStatusBusy + return nil +} + +func (pgConn *PgConn) unlock() { + switch pgConn.status { + case connStatusBusy: + pgConn.status = connStatusIdle + case connStatusClosed: + default: + panic("BUG: cannot unlock unlocked connection") // This should only be possible if there is a bug in this package. + } +} + +// ParameterStatus returns the value of a parameter reported by the server (e.g. +// server_version). Returns an empty string for unknown parameters. +func (pgConn *PgConn) ParameterStatus(key string) string { + return pgConn.parameterStatuses[key] +} + +// CommandTag is the status text returned by PostgreSQL for a query. +type CommandTag struct { + s string +} + +// NewCommandTag makes a CommandTag from s. +func NewCommandTag(s string) CommandTag { + return CommandTag{s: s} +} + +// RowsAffected returns the number of rows affected. If the CommandTag was not +// for a row affecting command (e.g. "CREATE TABLE") then it returns 0. +func (ct CommandTag) RowsAffected() int64 { + // Find last non-digit + idx := -1 + for i := len(ct.s) - 1; i >= 0; i-- { + if ct.s[i] >= '0' && ct.s[i] <= '9' { + idx = i + } else { + break + } + } + + if idx == -1 { + return 0 + } + + var n int64 + for _, b := range ct.s[idx:] { + n = n*10 + int64(b-'0') + } + + return n +} + +func (ct CommandTag) String() string { + return ct.s +} + +// Insert is true if the command tag starts with "INSERT". +func (ct CommandTag) Insert() bool { + return strings.HasPrefix(ct.s, "INSERT") +} + +// Update is true if the command tag starts with "UPDATE". +func (ct CommandTag) Update() bool { + return strings.HasPrefix(ct.s, "UPDATE") +} + +// Delete is true if the command tag starts with "DELETE". +func (ct CommandTag) Delete() bool { + return strings.HasPrefix(ct.s, "DELETE") +} + +// Select is true if the command tag starts with "SELECT". +func (ct CommandTag) Select() bool { + return strings.HasPrefix(ct.s, "SELECT") +} + +type FieldDescription struct { + Name string + TableOID uint32 + TableAttributeNumber uint16 + DataTypeOID uint32 + DataTypeSize int16 + TypeModifier int32 + Format int16 +} + +func (pgConn *PgConn) convertRowDescription(dst []FieldDescription, rd *pgproto3.RowDescription) []FieldDescription { + if cap(dst) >= len(rd.Fields) { + dst = dst[:len(rd.Fields):len(rd.Fields)] + } else { + dst = make([]FieldDescription, len(rd.Fields)) + } + + for i := range rd.Fields { + dst[i].Name = string(rd.Fields[i].Name) + dst[i].TableOID = rd.Fields[i].TableOID + dst[i].TableAttributeNumber = rd.Fields[i].TableAttributeNumber + dst[i].DataTypeOID = rd.Fields[i].DataTypeOID + dst[i].DataTypeSize = rd.Fields[i].DataTypeSize + dst[i].TypeModifier = rd.Fields[i].TypeModifier + dst[i].Format = rd.Fields[i].Format + } + + return dst +} + +type StatementDescription struct { + Name string + SQL string + ParamOIDs []uint32 + Fields []FieldDescription +} + +// Prepare creates a prepared statement. If the name is empty, the anonymous prepared statement will be used. This +// allows Prepare to also to describe statements without creating a server-side prepared statement. +// +// Prepare does not send a PREPARE statement to the server. It uses the PostgreSQL Parse and Describe protocol messages +// directly. +func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string, paramOIDs []uint32) (*StatementDescription, error) { + if err := pgConn.lock(); err != nil { + return nil, err + } + defer pgConn.unlock() + + if ctx != context.Background() { + select { + case <-ctx.Done(): + return nil, newContextAlreadyDoneError(ctx) + default: + } + pgConn.contextWatcher.Watch(ctx) + defer pgConn.contextWatcher.Unwatch() + } + + pgConn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}) + pgConn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name}) + pgConn.frontend.SendSync(&pgproto3.Sync{}) + err := pgConn.flushWithPotentialWriteReadDeadlock() + if err != nil { + pgConn.asyncClose() + return nil, err + } + + psd := &StatementDescription{Name: name, SQL: sql} + + var parseErr error + +readloop: + for { + msg, err := pgConn.receiveMessage() + if err != nil { + pgConn.asyncClose() + return nil, normalizeTimeoutError(ctx, err) + } + + switch msg := msg.(type) { + case *pgproto3.ParameterDescription: + psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs)) + copy(psd.ParamOIDs, msg.ParameterOIDs) + case *pgproto3.RowDescription: + psd.Fields = pgConn.convertRowDescription(nil, msg) + case *pgproto3.ErrorResponse: + parseErr = ErrorResponseToPgError(msg) + case *pgproto3.ReadyForQuery: + break readloop + } + } + + if parseErr != nil { + return nil, parseErr + } + return psd, nil +} + +// Deallocate deallocates a prepared statement. +// +// Deallocate does not send a DEALLOCATE statement to the server. It uses the PostgreSQL Close protocol message +// directly. This has slightly different behavior than executing DEALLOCATE statement. +// - Deallocate can succeed in an aborted transaction. +// - Deallocating a non-existent prepared statement is not an error. +func (pgConn *PgConn) Deallocate(ctx context.Context, name string) error { + if err := pgConn.lock(); err != nil { + return err + } + defer pgConn.unlock() + + if ctx != context.Background() { + select { + case <-ctx.Done(): + return newContextAlreadyDoneError(ctx) + default: + } + pgConn.contextWatcher.Watch(ctx) + defer pgConn.contextWatcher.Unwatch() + } + + pgConn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name}) + pgConn.frontend.SendSync(&pgproto3.Sync{}) + err := pgConn.flushWithPotentialWriteReadDeadlock() + if err != nil { + pgConn.asyncClose() + return err + } + + for { + msg, err := pgConn.receiveMessage() + if err != nil { + pgConn.asyncClose() + return normalizeTimeoutError(ctx, err) + } + + switch msg := msg.(type) { + case *pgproto3.ErrorResponse: + return ErrorResponseToPgError(msg) + case *pgproto3.ReadyForQuery: + return nil + } + } +} + +// ErrorResponseToPgError converts a wire protocol error message to a *PgError. +func ErrorResponseToPgError(msg *pgproto3.ErrorResponse) *PgError { + return &PgError{ + Severity: msg.Severity, + SeverityUnlocalized: msg.SeverityUnlocalized, + Code: string(msg.Code), + Message: string(msg.Message), + Detail: string(msg.Detail), + Hint: msg.Hint, + Position: msg.Position, + InternalPosition: msg.InternalPosition, + InternalQuery: string(msg.InternalQuery), + Where: string(msg.Where), + SchemaName: string(msg.SchemaName), + TableName: string(msg.TableName), + ColumnName: string(msg.ColumnName), + DataTypeName: string(msg.DataTypeName), + ConstraintName: msg.ConstraintName, + File: string(msg.File), + Line: msg.Line, + Routine: string(msg.Routine), + } +} + +func noticeResponseToNotice(msg *pgproto3.NoticeResponse) *Notice { + pgerr := ErrorResponseToPgError((*pgproto3.ErrorResponse)(msg)) + return (*Notice)(pgerr) +} + +// CancelRequest sends a cancel request to the PostgreSQL server. It returns an error if unable to deliver the cancel +// request, but lack of an error does not ensure that the query was canceled. As specified in the documentation, there +// is no way to be sure a query was canceled. See https://www.postgresql.org/docs/11/protocol-flow.html#id-1.10.5.7.9 +func (pgConn *PgConn) CancelRequest(ctx context.Context) error { + // Open a cancellation request to the same server. The address is taken from the net.Conn directly instead of reusing + // the connection config. This is important in high availability configurations where fallback connections may be + // specified or DNS may be used to load balance. + serverAddr := pgConn.conn.RemoteAddr() + var serverNetwork string + var serverAddress string + if serverAddr.Network() == "unix" { + // for unix sockets, RemoteAddr() calls getpeername() which returns the name the + // server passed to bind(). For Postgres, this is always a relative path "./.s.PGSQL.5432" + // so connecting to it will fail. Fall back to the config's value + serverNetwork, serverAddress = NetworkAddress(pgConn.config.Host, pgConn.config.Port) + } else { + serverNetwork, serverAddress = serverAddr.Network(), serverAddr.String() + } + cancelConn, err := pgConn.config.DialFunc(ctx, serverNetwork, serverAddress) + if err != nil { + // In case of unix sockets, RemoteAddr() returns only the file part of the path. If the + // first connect failed, try the config. + if serverAddr.Network() != "unix" { + return err + } + serverNetwork, serverAddr := NetworkAddress(pgConn.config.Host, pgConn.config.Port) + cancelConn, err = pgConn.config.DialFunc(ctx, serverNetwork, serverAddr) + if err != nil { + return err + } + } + defer cancelConn.Close() + + if ctx != context.Background() { + contextWatcher := ctxwatch.NewContextWatcher(&DeadlineContextWatcherHandler{Conn: cancelConn}) + contextWatcher.Watch(ctx) + defer contextWatcher.Unwatch() + } + + buf := make([]byte, 16) + binary.BigEndian.PutUint32(buf[0:4], 16) + binary.BigEndian.PutUint32(buf[4:8], 80877102) + binary.BigEndian.PutUint32(buf[8:12], pgConn.pid) + binary.BigEndian.PutUint32(buf[12:16], pgConn.secretKey) + + if _, err := cancelConn.Write(buf); err != nil { + return fmt.Errorf("write to connection for cancellation: %w", err) + } + + // Wait for the cancel request to be acknowledged by the server. + // It copies the behavior of the libpq: https://github.com/postgres/postgres/blob/REL_16_0/src/interfaces/libpq/fe-connect.c#L4946-L4960 + _, _ = cancelConn.Read(buf) + + return nil +} + +// WaitForNotification waits for a LISTEN/NOTIFY message to be received. It returns an error if a notification was not +// received. +func (pgConn *PgConn) WaitForNotification(ctx context.Context) error { + if err := pgConn.lock(); err != nil { + return err + } + defer pgConn.unlock() + + if ctx != context.Background() { + select { + case <-ctx.Done(): + return newContextAlreadyDoneError(ctx) + default: + } + + pgConn.contextWatcher.Watch(ctx) + defer pgConn.contextWatcher.Unwatch() + } + + for { + msg, err := pgConn.receiveMessage() + if err != nil { + return normalizeTimeoutError(ctx, err) + } + + switch msg.(type) { + case *pgproto3.NotificationResponse: + return nil + } + } +} + +// Exec executes SQL via the PostgreSQL simple query protocol. SQL may contain multiple queries. Execution is +// implicitly wrapped in a transaction unless a transaction is already in progress or SQL contains transaction control +// statements. +// +// Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries. +func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResultReader { + if err := pgConn.lock(); err != nil { + return &MultiResultReader{ + closed: true, + err: err, + } + } + + pgConn.multiResultReader = MultiResultReader{ + pgConn: pgConn, + ctx: ctx, + } + multiResult := &pgConn.multiResultReader + if ctx != context.Background() { + select { + case <-ctx.Done(): + multiResult.closed = true + multiResult.err = newContextAlreadyDoneError(ctx) + pgConn.unlock() + return multiResult + default: + } + pgConn.contextWatcher.Watch(ctx) + } + + pgConn.frontend.SendQuery(&pgproto3.Query{String: sql}) + err := pgConn.flushWithPotentialWriteReadDeadlock() + if err != nil { + pgConn.asyncClose() + pgConn.contextWatcher.Unwatch() + multiResult.closed = true + multiResult.err = err + pgConn.unlock() + return multiResult + } + + return multiResult +} + +// ExecParams executes a command via the PostgreSQL extended query protocol. +// +// sql is a SQL command string. It may only contain one query. Parameter substitution is positional using $1, $2, $3, +// etc. +// +// paramValues are the parameter values. It must be encoded in the format given by paramFormats. +// +// paramOIDs is a slice of data type OIDs for paramValues. If paramOIDs is nil, the server will infer the data type for +// all parameters. Any paramOID element that is 0 that will cause the server to infer the data type for that parameter. +// ExecParams will panic if len(paramOIDs) is not 0, 1, or len(paramValues). +// +// paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or +// binary format. If paramFormats is nil all params are text format. ExecParams will panic if +// len(paramFormats) is not 0, 1, or len(paramValues). +// +// resultFormats is a slice of format codes determining for each result column whether it is encoded in text or +// binary format. If resultFormats is nil all results will be in text format. +// +// ResultReader must be closed before PgConn can be used again. +func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) *ResultReader { + result := pgConn.execExtendedPrefix(ctx, paramValues) + if result.closed { + return result + } + + pgConn.frontend.SendParse(&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}) + pgConn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) + + pgConn.execExtendedSuffix(result) + + return result +} + +// ExecPrepared enqueues the execution of a prepared statement via the PostgreSQL extended query protocol. +// +// paramValues are the parameter values. It must be encoded in the format given by paramFormats. +// +// paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or +// binary format. If paramFormats is nil all params are text format. ExecPrepared will panic if +// len(paramFormats) is not 0, 1, or len(paramValues). +// +// resultFormats is a slice of format codes determining for each result column whether it is encoded in text or +// binary format. If resultFormats is nil all results will be in text format. +// +// ResultReader must be closed before PgConn can be used again. +func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) *ResultReader { + result := pgConn.execExtendedPrefix(ctx, paramValues) + if result.closed { + return result + } + + pgConn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) + + pgConn.execExtendedSuffix(result) + + return result +} + +func (pgConn *PgConn) execExtendedPrefix(ctx context.Context, paramValues [][]byte) *ResultReader { + pgConn.resultReader = ResultReader{ + pgConn: pgConn, + ctx: ctx, + } + result := &pgConn.resultReader + + if err := pgConn.lock(); err != nil { + result.concludeCommand(CommandTag{}, err) + result.closed = true + return result + } + + if len(paramValues) > math.MaxUint16 { + result.concludeCommand(CommandTag{}, fmt.Errorf("extended protocol limited to %v parameters", math.MaxUint16)) + result.closed = true + pgConn.unlock() + return result + } + + if ctx != context.Background() { + select { + case <-ctx.Done(): + result.concludeCommand(CommandTag{}, newContextAlreadyDoneError(ctx)) + result.closed = true + pgConn.unlock() + return result + default: + } + pgConn.contextWatcher.Watch(ctx) + } + + return result +} + +func (pgConn *PgConn) execExtendedSuffix(result *ResultReader) { + pgConn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) + pgConn.frontend.SendExecute(&pgproto3.Execute{}) + pgConn.frontend.SendSync(&pgproto3.Sync{}) + + err := pgConn.flushWithPotentialWriteReadDeadlock() + if err != nil { + pgConn.asyncClose() + result.concludeCommand(CommandTag{}, err) + pgConn.contextWatcher.Unwatch() + result.closed = true + pgConn.unlock() + return + } + + result.readUntilRowDescription() +} + +// CopyTo executes the copy command sql and copies the results to w. +func (pgConn *PgConn) CopyTo(ctx context.Context, w io.Writer, sql string) (CommandTag, error) { + if err := pgConn.lock(); err != nil { + return CommandTag{}, err + } + + if ctx != context.Background() { + select { + case <-ctx.Done(): + pgConn.unlock() + return CommandTag{}, newContextAlreadyDoneError(ctx) + default: + } + pgConn.contextWatcher.Watch(ctx) + defer pgConn.contextWatcher.Unwatch() + } + + // Send copy to command + pgConn.frontend.SendQuery(&pgproto3.Query{String: sql}) + + err := pgConn.flushWithPotentialWriteReadDeadlock() + if err != nil { + pgConn.asyncClose() + pgConn.unlock() + return CommandTag{}, err + } + + // Read results + var commandTag CommandTag + var pgErr error + for { + msg, err := pgConn.receiveMessage() + if err != nil { + pgConn.asyncClose() + return CommandTag{}, normalizeTimeoutError(ctx, err) + } + + switch msg := msg.(type) { + case *pgproto3.CopyDone: + case *pgproto3.CopyData: + _, err := w.Write(msg.Data) + if err != nil { + pgConn.asyncClose() + return CommandTag{}, err + } + case *pgproto3.ReadyForQuery: + pgConn.unlock() + return commandTag, pgErr + case *pgproto3.CommandComplete: + commandTag = pgConn.makeCommandTag(msg.CommandTag) + case *pgproto3.ErrorResponse: + pgErr = ErrorResponseToPgError(msg) + } + } +} + +// CopyFrom executes the copy command sql and copies all of r to the PostgreSQL server. +// +// Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r +// could still block. +func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (CommandTag, error) { + if err := pgConn.lock(); err != nil { + return CommandTag{}, err + } + defer pgConn.unlock() + + if ctx != context.Background() { + select { + case <-ctx.Done(): + return CommandTag{}, newContextAlreadyDoneError(ctx) + default: + } + pgConn.contextWatcher.Watch(ctx) + defer pgConn.contextWatcher.Unwatch() + } + + // Send copy from query + pgConn.frontend.SendQuery(&pgproto3.Query{String: sql}) + err := pgConn.flushWithPotentialWriteReadDeadlock() + if err != nil { + pgConn.asyncClose() + return CommandTag{}, err + } + + // Send copy data + abortCopyChan := make(chan struct{}) + copyErrChan := make(chan error, 1) + signalMessageChan := pgConn.signalMessage() + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + buf := iobufpool.Get(65536) + defer iobufpool.Put(buf) + (*buf)[0] = 'd' + + for { + n, readErr := r.Read((*buf)[5:cap(*buf)]) + if n > 0 { + *buf = (*buf)[0 : n+5] + pgio.SetInt32((*buf)[1:], int32(n+4)) + + writeErr := pgConn.frontend.SendUnbufferedEncodedCopyData(*buf) + if writeErr != nil { + // Write errors are always fatal, but we can't use asyncClose because we are in a different goroutine. Not + // setting pgConn.status or closing pgConn.cleanupDone for the same reason. + pgConn.conn.Close() + + copyErrChan <- writeErr + return + } + } + if readErr != nil { + copyErrChan <- readErr + return + } + + select { + case <-abortCopyChan: + return + default: + } + } + }() + + var pgErr error + var copyErr error + for copyErr == nil && pgErr == nil { + select { + case copyErr = <-copyErrChan: + case <-signalMessageChan: + // If pgConn.receiveMessage encounters an error it will call pgConn.asyncClose. But that is a race condition with + // the goroutine. So instead check pgConn.bufferingReceiveErr which will have been set by the signalMessage. If an + // error is found then forcibly close the connection without sending the Terminate message. + if err := pgConn.bufferingReceiveErr; err != nil { + pgConn.status = connStatusClosed + pgConn.conn.Close() + close(pgConn.cleanupDone) + return CommandTag{}, normalizeTimeoutError(ctx, err) + } + msg, _ := pgConn.receiveMessage() + + switch msg := msg.(type) { + case *pgproto3.ErrorResponse: + pgErr = ErrorResponseToPgError(msg) + default: + signalMessageChan = pgConn.signalMessage() + } + } + } + close(abortCopyChan) + // Make sure io goroutine finishes before writing. + wg.Wait() + + if copyErr == io.EOF || pgErr != nil { + pgConn.frontend.Send(&pgproto3.CopyDone{}) + } else { + pgConn.frontend.Send(&pgproto3.CopyFail{Message: copyErr.Error()}) + } + err = pgConn.flushWithPotentialWriteReadDeadlock() + if err != nil { + pgConn.asyncClose() + return CommandTag{}, err + } + + // Read results + var commandTag CommandTag + for { + msg, err := pgConn.receiveMessage() + if err != nil { + pgConn.asyncClose() + return CommandTag{}, normalizeTimeoutError(ctx, err) + } + + switch msg := msg.(type) { + case *pgproto3.ReadyForQuery: + return commandTag, pgErr + case *pgproto3.CommandComplete: + commandTag = pgConn.makeCommandTag(msg.CommandTag) + case *pgproto3.ErrorResponse: + pgErr = ErrorResponseToPgError(msg) + } + } +} + +// MultiResultReader is a reader for a command that could return multiple results such as Exec or ExecBatch. +type MultiResultReader struct { + pgConn *PgConn + ctx context.Context + pipeline *Pipeline + + rr *ResultReader + + closed bool + err error +} + +// ReadAll reads all available results. Calling ReadAll is mutually exclusive with all other MultiResultReader methods. +func (mrr *MultiResultReader) ReadAll() ([]*Result, error) { + var results []*Result + + for mrr.NextResult() { + results = append(results, mrr.ResultReader().Read()) + } + err := mrr.Close() + + return results, err +} + +func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error) { + msg, err := mrr.pgConn.receiveMessage() + if err != nil { + mrr.pgConn.contextWatcher.Unwatch() + mrr.err = normalizeTimeoutError(mrr.ctx, err) + mrr.closed = true + mrr.pgConn.asyncClose() + return nil, mrr.err + } + + switch msg := msg.(type) { + case *pgproto3.ReadyForQuery: + mrr.closed = true + if mrr.pipeline != nil { + mrr.pipeline.expectedReadyForQueryCount-- + } else { + mrr.pgConn.contextWatcher.Unwatch() + mrr.pgConn.unlock() + } + case *pgproto3.ErrorResponse: + mrr.err = ErrorResponseToPgError(msg) + } + + return msg, nil +} + +// NextResult returns advances the MultiResultReader to the next result and returns true if a result is available. +func (mrr *MultiResultReader) NextResult() bool { + for !mrr.closed && mrr.err == nil { + msg, err := mrr.receiveMessage() + if err != nil { + return false + } + + switch msg := msg.(type) { + case *pgproto3.RowDescription: + mrr.pgConn.resultReader = ResultReader{ + pgConn: mrr.pgConn, + multiResultReader: mrr, + ctx: mrr.ctx, + fieldDescriptions: mrr.pgConn.convertRowDescription(mrr.pgConn.fieldDescriptions[:], msg), + } + + mrr.rr = &mrr.pgConn.resultReader + return true + case *pgproto3.CommandComplete: + mrr.pgConn.resultReader = ResultReader{ + commandTag: mrr.pgConn.makeCommandTag(msg.CommandTag), + commandConcluded: true, + closed: true, + } + mrr.rr = &mrr.pgConn.resultReader + return true + case *pgproto3.EmptyQueryResponse: + return false + } + } + + return false +} + +// ResultReader returns the current ResultReader. +func (mrr *MultiResultReader) ResultReader() *ResultReader { + return mrr.rr +} + +// Close closes the MultiResultReader and returns the first error that occurred during the MultiResultReader's use. +func (mrr *MultiResultReader) Close() error { + for !mrr.closed { + _, err := mrr.receiveMessage() + if err != nil { + return mrr.err + } + } + + return mrr.err +} + +// ResultReader is a reader for the result of a single query. +type ResultReader struct { + pgConn *PgConn + multiResultReader *MultiResultReader + pipeline *Pipeline + ctx context.Context + + fieldDescriptions []FieldDescription + rowValues [][]byte + commandTag CommandTag + commandConcluded bool + closed bool + err error +} + +// Result is the saved query response that is returned by calling Read on a ResultReader. +type Result struct { + FieldDescriptions []FieldDescription + Rows [][][]byte + CommandTag CommandTag + Err error +} + +// Read saves the query response to a Result. +func (rr *ResultReader) Read() *Result { + br := &Result{} + + for rr.NextRow() { + if br.FieldDescriptions == nil { + br.FieldDescriptions = make([]FieldDescription, len(rr.FieldDescriptions())) + copy(br.FieldDescriptions, rr.FieldDescriptions()) + } + + values := rr.Values() + row := make([][]byte, len(values)) + for i := range row { + if values[i] != nil { + row[i] = make([]byte, len(values[i])) + copy(row[i], values[i]) + } + } + br.Rows = append(br.Rows, row) + } + + br.CommandTag, br.Err = rr.Close() + + return br +} + +// NextRow advances the ResultReader to the next row and returns true if a row is available. +func (rr *ResultReader) NextRow() bool { + for !rr.commandConcluded { + msg, err := rr.receiveMessage() + if err != nil { + return false + } + + switch msg := msg.(type) { + case *pgproto3.DataRow: + rr.rowValues = msg.Values + return true + } + } + + return false +} + +// FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until +// the ResultReader is closed. It may return nil (for example, if the query did not return a result set or an error was +// encountered.) +func (rr *ResultReader) FieldDescriptions() []FieldDescription { + return rr.fieldDescriptions +} + +// Values returns the current row data. NextRow must have been previously been called. The returned [][]byte is only +// valid until the next NextRow call or the ResultReader is closed. +func (rr *ResultReader) Values() [][]byte { + return rr.rowValues +} + +// Close consumes any remaining result data and returns the command tag or +// error. +func (rr *ResultReader) Close() (CommandTag, error) { + if rr.closed { + return rr.commandTag, rr.err + } + rr.closed = true + + for !rr.commandConcluded { + _, err := rr.receiveMessage() + if err != nil { + return CommandTag{}, rr.err + } + } + + if rr.multiResultReader == nil && rr.pipeline == nil { + for { + msg, err := rr.receiveMessage() + if err != nil { + return CommandTag{}, rr.err + } + + switch msg := msg.(type) { + // Detect a deferred constraint violation where the ErrorResponse is sent after CommandComplete. + case *pgproto3.ErrorResponse: + rr.err = ErrorResponseToPgError(msg) + case *pgproto3.ReadyForQuery: + rr.pgConn.contextWatcher.Unwatch() + rr.pgConn.unlock() + return rr.commandTag, rr.err + } + } + } + + return rr.commandTag, rr.err +} + +// readUntilRowDescription ensures the ResultReader's fieldDescriptions are loaded. It does not return an error as any +// error will be stored in the ResultReader. +func (rr *ResultReader) readUntilRowDescription() { + for !rr.commandConcluded { + // Peek before receive to avoid consuming a DataRow if the result set does not include a RowDescription method. + // This should never happen under normal pgconn usage, but it is possible if SendBytes and ReceiveResults are + // manually used to construct a query that does not issue a describe statement. + msg, _ := rr.pgConn.peekMessage() + if _, ok := msg.(*pgproto3.DataRow); ok { + return + } + + // Consume the message + msg, _ = rr.receiveMessage() + if _, ok := msg.(*pgproto3.RowDescription); ok { + return + } + } +} + +func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error) { + if rr.multiResultReader == nil { + msg, err = rr.pgConn.receiveMessage() + } else { + msg, err = rr.multiResultReader.receiveMessage() + } + + if err != nil { + err = normalizeTimeoutError(rr.ctx, err) + rr.concludeCommand(CommandTag{}, err) + rr.pgConn.contextWatcher.Unwatch() + rr.closed = true + if rr.multiResultReader == nil { + rr.pgConn.asyncClose() + } + + return nil, rr.err + } + + switch msg := msg.(type) { + case *pgproto3.RowDescription: + rr.fieldDescriptions = rr.pgConn.convertRowDescription(rr.pgConn.fieldDescriptions[:], msg) + case *pgproto3.CommandComplete: + rr.concludeCommand(rr.pgConn.makeCommandTag(msg.CommandTag), nil) + case *pgproto3.EmptyQueryResponse: + rr.concludeCommand(CommandTag{}, nil) + case *pgproto3.ErrorResponse: + rr.concludeCommand(CommandTag{}, ErrorResponseToPgError(msg)) + } + + return msg, nil +} + +func (rr *ResultReader) concludeCommand(commandTag CommandTag, err error) { + // Keep the first error that is recorded. Store the error before checking if the command is already concluded to + // allow for receiving an error after CommandComplete but before ReadyForQuery. + if err != nil && rr.err == nil { + rr.err = err + } + + if rr.commandConcluded { + return + } + + rr.commandTag = commandTag + rr.rowValues = nil + rr.commandConcluded = true +} + +// Batch is a collection of queries that can be sent to the PostgreSQL server in a single round-trip. +type Batch struct { + buf []byte + err error +} + +// ExecParams appends an ExecParams command to the batch. See PgConn.ExecParams for parameter descriptions. +func (batch *Batch) ExecParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) { + if batch.err != nil { + return + } + + batch.buf, batch.err = (&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}).Encode(batch.buf) + if batch.err != nil { + return + } + batch.ExecPrepared("", paramValues, paramFormats, resultFormats) +} + +// ExecPrepared appends an ExecPrepared e command to the batch. See PgConn.ExecPrepared for parameter descriptions. +func (batch *Batch) ExecPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) { + if batch.err != nil { + return + } + + batch.buf, batch.err = (&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}).Encode(batch.buf) + if batch.err != nil { + return + } + + batch.buf, batch.err = (&pgproto3.Describe{ObjectType: 'P'}).Encode(batch.buf) + if batch.err != nil { + return + } + + batch.buf, batch.err = (&pgproto3.Execute{}).Encode(batch.buf) + if batch.err != nil { + return + } +} + +// ExecBatch executes all the queries in batch in a single round-trip. Execution is implicitly transactional unless a +// transaction is already in progress or SQL contains transaction control statements. This is a simpler way of executing +// multiple queries in a single round trip than using pipeline mode. +func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultReader { + if batch.err != nil { + return &MultiResultReader{ + closed: true, + err: batch.err, + } + } + + if err := pgConn.lock(); err != nil { + return &MultiResultReader{ + closed: true, + err: err, + } + } + + pgConn.multiResultReader = MultiResultReader{ + pgConn: pgConn, + ctx: ctx, + } + multiResult := &pgConn.multiResultReader + + if ctx != context.Background() { + select { + case <-ctx.Done(): + multiResult.closed = true + multiResult.err = newContextAlreadyDoneError(ctx) + pgConn.unlock() + return multiResult + default: + } + pgConn.contextWatcher.Watch(ctx) + } + + batch.buf, batch.err = (&pgproto3.Sync{}).Encode(batch.buf) + if batch.err != nil { + multiResult.closed = true + multiResult.err = batch.err + pgConn.unlock() + return multiResult + } + + pgConn.enterPotentialWriteReadDeadlock() + defer pgConn.exitPotentialWriteReadDeadlock() + _, err := pgConn.conn.Write(batch.buf) + if err != nil { + multiResult.closed = true + multiResult.err = err + pgConn.unlock() + return multiResult + } + + return multiResult +} + +// EscapeString escapes a string such that it can safely be interpolated into a SQL command string. It does not include +// the surrounding single quotes. +// +// The current implementation requires that standard_conforming_strings=on and client_encoding="UTF8". If these +// conditions are not met an error will be returned. It is possible these restrictions will be lifted in the future. +func (pgConn *PgConn) EscapeString(s string) (string, error) { + if pgConn.ParameterStatus("standard_conforming_strings") != "on" { + return "", errors.New("EscapeString must be run with standard_conforming_strings=on") + } + + if pgConn.ParameterStatus("client_encoding") != "UTF8" { + return "", errors.New("EscapeString must be run with client_encoding=UTF8") + } + + return strings.Replace(s, "'", "''", -1), nil +} + +// CheckConn checks the underlying connection without writing any bytes. This is currently implemented by doing a read +// with a very short deadline. This can be useful because a TCP connection can be broken such that a write will appear +// to succeed even though it will never actually reach the server. Reading immediately before a write will detect this +// condition. If this is done immediately before sending a query it reduces the chances a query will be sent that fails +// without the client knowing whether the server received it or not. +// +// Deprecated: CheckConn is deprecated in favor of Ping. CheckConn cannot detect all types of broken connections where +// the write would still appear to succeed. Prefer Ping unless on a high latency connection. +func (pgConn *PgConn) CheckConn() error { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) + defer cancel() + + _, err := pgConn.ReceiveMessage(ctx) + if err != nil { + if !Timeout(err) { + return err + } + } + + return nil +} + +// Ping pings the server. This can be useful because a TCP connection can be broken such that a write will appear to +// succeed even though it will never actually reach the server. Pinging immediately before sending a query reduces the +// chances a query will be sent that fails without the client knowing whether the server received it or not. +func (pgConn *PgConn) Ping(ctx context.Context) error { + return pgConn.Exec(ctx, "-- ping").Close() +} + +// makeCommandTag makes a CommandTag. It does not retain a reference to buf or buf's underlying memory. +func (pgConn *PgConn) makeCommandTag(buf []byte) CommandTag { + return CommandTag{s: string(buf)} +} + +// enterPotentialWriteReadDeadlock must be called before a write that could deadlock if the server is simultaneously +// blocked writing to us. +func (pgConn *PgConn) enterPotentialWriteReadDeadlock() { + // The time to wait is somewhat arbitrary. A Write should only take as long as the syscall and memcpy to the OS + // outbound network buffer unless the buffer is full (which potentially is a block). It needs to be long enough for + // the normal case, but short enough not to kill performance if a block occurs. + // + // In addition, on Windows the default timer resolution is 15.6ms. So setting the timer to less than that is + // ineffective. + if pgConn.slowWriteTimer.Reset(15 * time.Millisecond) { + panic("BUG: slow write timer already active") + } +} + +// exitPotentialWriteReadDeadlock must be called after a call to enterPotentialWriteReadDeadlock. +func (pgConn *PgConn) exitPotentialWriteReadDeadlock() { + if !pgConn.slowWriteTimer.Stop() { + // The timer starts its function in a separate goroutine. It is necessary to ensure the background reader has + // started before calling Stop. Otherwise, the background reader may not be stopped. That on its own is not a + // serious problem. But what is a serious problem is that the background reader may start at an inopportune time in + // a subsequent query. For example, if a subsequent query was canceled then a deadline may be set on the net.Conn to + // interrupt an in-progress read. After the read is interrupted, but before the deadline is cleared, the background + // reader could start and read a deadline error. Then the next query would receive the an unexpected deadline error. + <-pgConn.bgReaderStarted + pgConn.bgReader.Stop() + } +} + +func (pgConn *PgConn) flushWithPotentialWriteReadDeadlock() error { + pgConn.enterPotentialWriteReadDeadlock() + defer pgConn.exitPotentialWriteReadDeadlock() + err := pgConn.frontend.Flush() + return err +} + +// SyncConn prepares the underlying net.Conn for direct use. PgConn may internally buffer reads or use goroutines for +// background IO. This means that any direct use of the underlying net.Conn may be corrupted if a read is already +// buffered or a read is in progress. SyncConn drains read buffers and stops background IO. In some cases this may +// require sending a ping to the server. ctx can be used to cancel this operation. This should be called before any +// operation that will use the underlying net.Conn directly. e.g. Before Conn() or Hijack(). +// +// This should not be confused with the PostgreSQL protocol Sync message. +func (pgConn *PgConn) SyncConn(ctx context.Context) error { + for i := 0; i < 10; i++ { + if pgConn.bgReader.Status() == bgreader.StatusStopped && pgConn.frontend.ReadBufferLen() == 0 { + return nil + } + + err := pgConn.Ping(ctx) + if err != nil { + return fmt.Errorf("SyncConn: Ping failed while syncing conn: %w", err) + } + } + + // This should never happen. Only way I can imagine this occurring is if the server is constantly sending data such as + // LISTEN/NOTIFY or log notifications such that we never can get an empty buffer. + return errors.New("SyncConn: conn never synchronized") +} + +// CustomData returns a map that can be used to associate custom data with the connection. +func (pgConn *PgConn) CustomData() map[string]any { + return pgConn.customData +} + +// HijackedConn is the result of hijacking a connection. +// +// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning +// compatibility. +type HijackedConn struct { + Conn net.Conn + PID uint32 // backend pid + SecretKey uint32 // key to use to send a cancel query message to the server + ParameterStatuses map[string]string // parameters that have been reported by the server + TxStatus byte + Frontend *pgproto3.Frontend + Config *Config + CustomData map[string]any +} + +// Hijack extracts the internal connection data. pgConn must be in an idle state. SyncConn should be called immediately +// before Hijack. pgConn is unusable after hijacking. Hijacking is typically only useful when using pgconn to establish +// a connection, but taking complete control of the raw connection after that (e.g. a load balancer or proxy). +// +// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning +// compatibility. +func (pgConn *PgConn) Hijack() (*HijackedConn, error) { + if err := pgConn.lock(); err != nil { + return nil, err + } + pgConn.status = connStatusClosed + + return &HijackedConn{ + Conn: pgConn.conn, + PID: pgConn.pid, + SecretKey: pgConn.secretKey, + ParameterStatuses: pgConn.parameterStatuses, + TxStatus: pgConn.txStatus, + Frontend: pgConn.frontend, + Config: pgConn.config, + CustomData: pgConn.customData, + }, nil +} + +// Construct created a PgConn from an already established connection to a PostgreSQL server. This is the inverse of +// PgConn.Hijack. The connection must be in an idle state. +// +// hc.Frontend is replaced by a new pgproto3.Frontend built by hc.Config.BuildFrontend. +// +// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning +// compatibility. +func Construct(hc *HijackedConn) (*PgConn, error) { + pgConn := &PgConn{ + conn: hc.Conn, + pid: hc.PID, + secretKey: hc.SecretKey, + parameterStatuses: hc.ParameterStatuses, + txStatus: hc.TxStatus, + frontend: hc.Frontend, + config: hc.Config, + customData: hc.CustomData, + + status: connStatusIdle, + + cleanupDone: make(chan struct{}), + } + + pgConn.contextWatcher = ctxwatch.NewContextWatcher(hc.Config.BuildContextWatcherHandler(pgConn)) + pgConn.bgReader = bgreader.New(pgConn.conn) + pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), + func() { + pgConn.bgReader.Start() + pgConn.bgReaderStarted <- struct{}{} + }, + ) + pgConn.slowWriteTimer.Stop() + pgConn.bgReaderStarted = make(chan struct{}) + pgConn.frontend = hc.Config.BuildFrontend(pgConn.bgReader, pgConn.conn) + + return pgConn, nil +} + +// Pipeline represents a connection in pipeline mode. +// +// SendPrepare, SendQueryParams, and SendQueryPrepared queue requests to the server. These requests are not written until +// pipeline is flushed by Flush or Sync. Sync must be called after the last request is queued. Requests between +// synchronization points are implicitly transactional unless explicit transaction control statements have been issued. +// +// The context the pipeline was started with is in effect for the entire life of the Pipeline. +// +// For a deeper understanding of pipeline mode see the PostgreSQL documentation for the extended query protocol +// (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) and the libpq pipeline mode +// (https://www.postgresql.org/docs/current/libpq-pipeline-mode.html). +type Pipeline struct { + conn *PgConn + ctx context.Context + + expectedReadyForQueryCount int + pendingSync bool + + err error + closed bool +} + +// PipelineSync is returned by GetResults when a ReadyForQuery message is received. +type PipelineSync struct{} + +// CloseComplete is returned by GetResults when a CloseComplete message is received. +type CloseComplete struct{} + +// StartPipeline switches the connection to pipeline mode and returns a *Pipeline. In pipeline mode requests can be sent +// to the server without waiting for a response. Close must be called on the returned *Pipeline to return the connection +// to normal mode. While in pipeline mode, no methods that communicate with the server may be called except +// CancelRequest and Close. ctx is in effect for entire life of the *Pipeline. +// +// Prefer ExecBatch when only sending one group of queries at once. +func (pgConn *PgConn) StartPipeline(ctx context.Context) *Pipeline { + if err := pgConn.lock(); err != nil { + return &Pipeline{ + closed: true, + err: err, + } + } + + pgConn.pipeline = Pipeline{ + conn: pgConn, + ctx: ctx, + } + pipeline := &pgConn.pipeline + + if ctx != context.Background() { + select { + case <-ctx.Done(): + pipeline.closed = true + pipeline.err = newContextAlreadyDoneError(ctx) + pgConn.unlock() + return pipeline + default: + } + pgConn.contextWatcher.Watch(ctx) + } + + return pipeline +} + +// SendPrepare is the pipeline version of *PgConn.Prepare. +func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32) { + if p.closed { + return + } + p.pendingSync = true + + p.conn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}) + p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name}) +} + +// SendDeallocate deallocates a prepared statement. +func (p *Pipeline) SendDeallocate(name string) { + if p.closed { + return + } + p.pendingSync = true + + p.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name}) +} + +// SendQueryParams is the pipeline version of *PgConn.QueryParams. +func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) { + if p.closed { + return + } + p.pendingSync = true + + p.conn.frontend.SendParse(&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}) + p.conn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) + p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) + p.conn.frontend.SendExecute(&pgproto3.Execute{}) +} + +// SendQueryPrepared is the pipeline version of *PgConn.QueryPrepared. +func (p *Pipeline) SendQueryPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) { + if p.closed { + return + } + p.pendingSync = true + + p.conn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) + p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) + p.conn.frontend.SendExecute(&pgproto3.Execute{}) +} + +// Flush flushes the queued requests without establishing a synchronization point. +func (p *Pipeline) Flush() error { + if p.closed { + if p.err != nil { + return p.err + } + return errors.New("pipeline closed") + } + + err := p.conn.flushWithPotentialWriteReadDeadlock() + if err != nil { + err = normalizeTimeoutError(p.ctx, err) + + p.conn.asyncClose() + + p.conn.contextWatcher.Unwatch() + p.conn.unlock() + p.closed = true + p.err = err + return err + } + + return nil +} + +// Sync establishes a synchronization point and flushes the queued requests. +func (p *Pipeline) Sync() error { + if p.closed { + if p.err != nil { + return p.err + } + return errors.New("pipeline closed") + } + + p.conn.frontend.SendSync(&pgproto3.Sync{}) + err := p.Flush() + if err != nil { + return err + } + + p.pendingSync = false + p.expectedReadyForQueryCount++ + + return nil +} + +// GetResults gets the next results. If results are present, results may be a *ResultReader, *StatementDescription, or +// *PipelineSync. If an ErrorResponse is received from the server, results will be nil and err will be a *PgError. If no +// results are available, results and err will both be nil. +func (p *Pipeline) GetResults() (results any, err error) { + if p.closed { + if p.err != nil { + return nil, p.err + } + return nil, errors.New("pipeline closed") + } + + if p.expectedReadyForQueryCount == 0 { + return nil, nil + } + + return p.getResults() +} + +func (p *Pipeline) getResults() (results any, err error) { + for { + msg, err := p.conn.receiveMessage() + if err != nil { + p.closed = true + p.err = err + p.conn.asyncClose() + return nil, normalizeTimeoutError(p.ctx, err) + } + + switch msg := msg.(type) { + case *pgproto3.RowDescription: + p.conn.resultReader = ResultReader{ + pgConn: p.conn, + pipeline: p, + ctx: p.ctx, + fieldDescriptions: p.conn.convertRowDescription(p.conn.fieldDescriptions[:], msg), + } + return &p.conn.resultReader, nil + case *pgproto3.CommandComplete: + p.conn.resultReader = ResultReader{ + commandTag: p.conn.makeCommandTag(msg.CommandTag), + commandConcluded: true, + closed: true, + } + return &p.conn.resultReader, nil + case *pgproto3.ParseComplete: + peekedMsg, err := p.conn.peekMessage() + if err != nil { + p.conn.asyncClose() + return nil, normalizeTimeoutError(p.ctx, err) + } + if _, ok := peekedMsg.(*pgproto3.ParameterDescription); ok { + return p.getResultsPrepare() + } + case *pgproto3.CloseComplete: + return &CloseComplete{}, nil + case *pgproto3.ReadyForQuery: + p.expectedReadyForQueryCount-- + return &PipelineSync{}, nil + case *pgproto3.ErrorResponse: + pgErr := ErrorResponseToPgError(msg) + return nil, pgErr + } + + } +} + +func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) { + psd := &StatementDescription{} + + for { + msg, err := p.conn.receiveMessage() + if err != nil { + p.conn.asyncClose() + return nil, normalizeTimeoutError(p.ctx, err) + } + + switch msg := msg.(type) { + case *pgproto3.ParameterDescription: + psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs)) + copy(psd.ParamOIDs, msg.ParameterOIDs) + case *pgproto3.RowDescription: + psd.Fields = p.conn.convertRowDescription(nil, msg) + return psd, nil + + // NoData is returned instead of RowDescription when there is no expected result. e.g. An INSERT without a RETURNING + // clause. + case *pgproto3.NoData: + return psd, nil + + // These should never happen here. But don't take chances that could lead to a deadlock. + case *pgproto3.ErrorResponse: + pgErr := ErrorResponseToPgError(msg) + return nil, pgErr + case *pgproto3.CommandComplete: + p.conn.asyncClose() + return nil, errors.New("BUG: received CommandComplete while handling Describe") + case *pgproto3.ReadyForQuery: + p.conn.asyncClose() + return nil, errors.New("BUG: received ReadyForQuery while handling Describe") + } + } +} + +// Close closes the pipeline and returns the connection to normal mode. +func (p *Pipeline) Close() error { + if p.closed { + return p.err + } + + p.closed = true + + if p.pendingSync { + p.conn.asyncClose() + p.err = errors.New("pipeline has unsynced requests") + p.conn.contextWatcher.Unwatch() + p.conn.unlock() + + return p.err + } + + for p.expectedReadyForQueryCount > 0 { + _, err := p.getResults() + if err != nil { + p.err = err + var pgErr *PgError + if !errors.As(err, &pgErr) { + p.conn.asyncClose() + break + } + } + } + + p.conn.contextWatcher.Unwatch() + p.conn.unlock() + + return p.err +} + +// DeadlineContextWatcherHandler handles canceled contexts by setting a deadline on a net.Conn. +type DeadlineContextWatcherHandler struct { + Conn net.Conn + + // DeadlineDelay is the delay to set on the deadline set on net.Conn when the context is canceled. + DeadlineDelay time.Duration +} + +func (h *DeadlineContextWatcherHandler) HandleCancel(ctx context.Context) { + h.Conn.SetDeadline(time.Now().Add(h.DeadlineDelay)) +} + +func (h *DeadlineContextWatcherHandler) HandleUnwatchAfterCancel() { + h.Conn.SetDeadline(time.Time{}) +} + +// CancelRequestContextWatcherHandler handles canceled contexts by sending a cancel request to the server. It also sets +// a deadline on a net.Conn as a fallback. +type CancelRequestContextWatcherHandler struct { + Conn *PgConn + + // CancelRequestDelay is the delay before sending the cancel request to the server. + CancelRequestDelay time.Duration + + // DeadlineDelay is the delay to set on the deadline set on net.Conn when the context is canceled. + DeadlineDelay time.Duration + + cancelFinishedChan chan struct{} + handleUnwatchAfterCancelCalled func() +} + +func (h *CancelRequestContextWatcherHandler) HandleCancel(context.Context) { + h.cancelFinishedChan = make(chan struct{}) + var handleUnwatchedAfterCancelCalledCtx context.Context + handleUnwatchedAfterCancelCalledCtx, h.handleUnwatchAfterCancelCalled = context.WithCancel(context.Background()) + + deadline := time.Now().Add(h.DeadlineDelay) + h.Conn.conn.SetDeadline(deadline) + + go func() { + defer close(h.cancelFinishedChan) + + select { + case <-handleUnwatchedAfterCancelCalledCtx.Done(): + return + case <-time.After(h.CancelRequestDelay): + } + + cancelRequestCtx, cancel := context.WithDeadline(handleUnwatchedAfterCancelCalledCtx, deadline) + defer cancel() + h.Conn.CancelRequest(cancelRequestCtx) + + // CancelRequest is inherently racy. Even though the cancel request has been received by the server at this point, + // it hasn't necessarily been delivered to the other connection. If we immediately return and the connection is + // immediately used then it is possible the CancelRequest will actually cancel our next query. The + // TestCancelRequestContextWatcherHandler Stress test can produce this error without the sleep below. The sleep time + // is arbitrary, but should be sufficient to prevent this error case. + time.Sleep(100 * time.Millisecond) + }() +} + +func (h *CancelRequestContextWatcherHandler) HandleUnwatchAfterCancel() { + h.handleUnwatchAfterCancelCalled() + <-h.cancelFinishedChan + + h.Conn.conn.SetDeadline(time.Time{}) +} |