aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/lib/pq/copy.go
diff options
context:
space:
mode:
authorGibheer <gibheer+git@zero-knowledge.org>2024-09-05 19:38:25 +0200
committerGibheer <gibheer+git@zero-knowledge.org>2024-09-05 19:38:25 +0200
commit6ea4d2c82de80efc87708e5e182034b7c6c2019e (patch)
tree35c0856a929040216c82153ca62d43b27530a887 /vendor/github.com/lib/pq/copy.go
parent6f64eeace1b66639b9380b44e88a8d54850a4306 (diff)
switch from github.com/lib/pq to github.com/jackc/pgx/v5HEAD20240905master
lib/pq is out of maintenance for some time now, so switch to the newer more active library. Looks like it finally stabilized after a long time.
Diffstat (limited to 'vendor/github.com/lib/pq/copy.go')
-rw-r--r--vendor/github.com/lib/pq/copy.go341
1 files changed, 0 insertions, 341 deletions
diff --git a/vendor/github.com/lib/pq/copy.go b/vendor/github.com/lib/pq/copy.go
deleted file mode 100644
index 2f5c1ec..0000000
--- a/vendor/github.com/lib/pq/copy.go
+++ /dev/null
@@ -1,341 +0,0 @@
-package pq
-
-import (
- "context"
- "database/sql/driver"
- "encoding/binary"
- "errors"
- "fmt"
- "sync"
-)
-
-var (
- errCopyInClosed = errors.New("pq: copyin statement has already been closed")
- errBinaryCopyNotSupported = errors.New("pq: only text format supported for COPY")
- errCopyToNotSupported = errors.New("pq: COPY TO is not supported")
- errCopyNotSupportedOutsideTxn = errors.New("pq: COPY is only allowed inside a transaction")
- errCopyInProgress = errors.New("pq: COPY in progress")
-)
-
-// CopyIn creates a COPY FROM statement which can be prepared with
-// Tx.Prepare(). The target table should be visible in search_path.
-func CopyIn(table string, columns ...string) string {
- stmt := "COPY " + QuoteIdentifier(table) + " ("
- for i, col := range columns {
- if i != 0 {
- stmt += ", "
- }
- stmt += QuoteIdentifier(col)
- }
- stmt += ") FROM STDIN"
- return stmt
-}
-
-// CopyInSchema creates a COPY FROM statement which can be prepared with
-// Tx.Prepare().
-func CopyInSchema(schema, table string, columns ...string) string {
- stmt := "COPY " + QuoteIdentifier(schema) + "." + QuoteIdentifier(table) + " ("
- for i, col := range columns {
- if i != 0 {
- stmt += ", "
- }
- stmt += QuoteIdentifier(col)
- }
- stmt += ") FROM STDIN"
- return stmt
-}
-
-type copyin struct {
- cn *conn
- buffer []byte
- rowData chan []byte
- done chan bool
-
- closed bool
-
- mu struct {
- sync.Mutex
- err error
- driver.Result
- }
-}
-
-const ciBufferSize = 64 * 1024
-
-// flush buffer before the buffer is filled up and needs reallocation
-const ciBufferFlushSize = 63 * 1024
-
-func (cn *conn) prepareCopyIn(q string) (_ driver.Stmt, err error) {
- if !cn.isInTransaction() {
- return nil, errCopyNotSupportedOutsideTxn
- }
-
- ci := &copyin{
- cn: cn,
- buffer: make([]byte, 0, ciBufferSize),
- rowData: make(chan []byte),
- done: make(chan bool, 1),
- }
- // add CopyData identifier + 4 bytes for message length
- ci.buffer = append(ci.buffer, 'd', 0, 0, 0, 0)
-
- b := cn.writeBuf('Q')
- b.string(q)
- cn.send(b)
-
-awaitCopyInResponse:
- for {
- t, r := cn.recv1()
- switch t {
- case 'G':
- if r.byte() != 0 {
- err = errBinaryCopyNotSupported
- break awaitCopyInResponse
- }
- go ci.resploop()
- return ci, nil
- case 'H':
- err = errCopyToNotSupported
- break awaitCopyInResponse
- case 'E':
- err = parseError(r)
- case 'Z':
- if err == nil {
- ci.setBad(driver.ErrBadConn)
- errorf("unexpected ReadyForQuery in response to COPY")
- }
- cn.processReadyForQuery(r)
- return nil, err
- default:
- ci.setBad(driver.ErrBadConn)
- errorf("unknown response for copy query: %q", t)
- }
- }
-
- // something went wrong, abort COPY before we return
- b = cn.writeBuf('f')
- b.string(err.Error())
- cn.send(b)
-
- for {
- t, r := cn.recv1()
- switch t {
- case 'c', 'C', 'E':
- case 'Z':
- // correctly aborted, we're done
- cn.processReadyForQuery(r)
- return nil, err
- default:
- ci.setBad(driver.ErrBadConn)
- errorf("unknown response for CopyFail: %q", t)
- }
- }
-}
-
-func (ci *copyin) flush(buf []byte) {
- // set message length (without message identifier)
- binary.BigEndian.PutUint32(buf[1:], uint32(len(buf)-1))
-
- _, err := ci.cn.c.Write(buf)
- if err != nil {
- panic(err)
- }
-}
-
-func (ci *copyin) resploop() {
- for {
- var r readBuf
- t, err := ci.cn.recvMessage(&r)
- if err != nil {
- ci.setBad(driver.ErrBadConn)
- ci.setError(err)
- ci.done <- true
- return
- }
- switch t {
- case 'C':
- // complete
- res, _ := ci.cn.parseComplete(r.string())
- ci.setResult(res)
- case 'N':
- if n := ci.cn.noticeHandler; n != nil {
- n(parseError(&r))
- }
- case 'Z':
- ci.cn.processReadyForQuery(&r)
- ci.done <- true
- return
- case 'E':
- err := parseError(&r)
- ci.setError(err)
- default:
- ci.setBad(driver.ErrBadConn)
- ci.setError(fmt.Errorf("unknown response during CopyIn: %q", t))
- ci.done <- true
- return
- }
- }
-}
-
-func (ci *copyin) setBad(err error) {
- ci.cn.err.set(err)
-}
-
-func (ci *copyin) getBad() error {
- return ci.cn.err.get()
-}
-
-func (ci *copyin) err() error {
- ci.mu.Lock()
- err := ci.mu.err
- ci.mu.Unlock()
- return err
-}
-
-// setError() sets ci.err if one has not been set already. Caller must not be
-// holding ci.Mutex.
-func (ci *copyin) setError(err error) {
- ci.mu.Lock()
- if ci.mu.err == nil {
- ci.mu.err = err
- }
- ci.mu.Unlock()
-}
-
-func (ci *copyin) setResult(result driver.Result) {
- ci.mu.Lock()
- ci.mu.Result = result
- ci.mu.Unlock()
-}
-
-func (ci *copyin) getResult() driver.Result {
- ci.mu.Lock()
- result := ci.mu.Result
- ci.mu.Unlock()
- if result == nil {
- return driver.RowsAffected(0)
- }
- return result
-}
-
-func (ci *copyin) NumInput() int {
- return -1
-}
-
-func (ci *copyin) Query(v []driver.Value) (r driver.Rows, err error) {
- return nil, ErrNotSupported
-}
-
-// Exec inserts values into the COPY stream. The insert is asynchronous
-// and Exec can return errors from previous Exec calls to the same
-// COPY stmt.
-//
-// You need to call Exec(nil) to sync the COPY stream and to get any
-// errors from pending data, since Stmt.Close() doesn't return errors
-// to the user.
-func (ci *copyin) Exec(v []driver.Value) (r driver.Result, err error) {
- if ci.closed {
- return nil, errCopyInClosed
- }
-
- if err := ci.getBad(); err != nil {
- return nil, err
- }
- defer ci.cn.errRecover(&err)
-
- if err := ci.err(); err != nil {
- return nil, err
- }
-
- if len(v) == 0 {
- if err := ci.Close(); err != nil {
- return driver.RowsAffected(0), err
- }
-
- return ci.getResult(), nil
- }
-
- numValues := len(v)
- for i, value := range v {
- ci.buffer = appendEncodedText(&ci.cn.parameterStatus, ci.buffer, value)
- if i < numValues-1 {
- ci.buffer = append(ci.buffer, '\t')
- }
- }
-
- ci.buffer = append(ci.buffer, '\n')
-
- if len(ci.buffer) > ciBufferFlushSize {
- ci.flush(ci.buffer)
- // reset buffer, keep bytes for message identifier and length
- ci.buffer = ci.buffer[:5]
- }
-
- return driver.RowsAffected(0), nil
-}
-
-// CopyData inserts a raw string into the COPY stream. The insert is
-// asynchronous and CopyData can return errors from previous CopyData calls to
-// the same COPY stmt.
-//
-// You need to call Exec(nil) to sync the COPY stream and to get any
-// errors from pending data, since Stmt.Close() doesn't return errors
-// to the user.
-func (ci *copyin) CopyData(ctx context.Context, line string) (r driver.Result, err error) {
- if ci.closed {
- return nil, errCopyInClosed
- }
-
- if finish := ci.cn.watchCancel(ctx); finish != nil {
- defer finish()
- }
-
- if err := ci.getBad(); err != nil {
- return nil, err
- }
- defer ci.cn.errRecover(&err)
-
- if err := ci.err(); err != nil {
- return nil, err
- }
-
- ci.buffer = append(ci.buffer, []byte(line)...)
- ci.buffer = append(ci.buffer, '\n')
-
- if len(ci.buffer) > ciBufferFlushSize {
- ci.flush(ci.buffer)
- // reset buffer, keep bytes for message identifier and length
- ci.buffer = ci.buffer[:5]
- }
-
- return driver.RowsAffected(0), nil
-}
-
-func (ci *copyin) Close() (err error) {
- if ci.closed { // Don't do anything, we're already closed
- return nil
- }
- ci.closed = true
-
- if err := ci.getBad(); err != nil {
- return err
- }
- defer ci.cn.errRecover(&err)
-
- if len(ci.buffer) > 0 {
- ci.flush(ci.buffer)
- }
- // Avoid touching the scratch buffer as resploop could be using it.
- err = ci.cn.sendSimpleMessage('c')
- if err != nil {
- return err
- }
-
- <-ci.done
- ci.cn.inCopy = false
-
- if err := ci.err(); err != nil {
- return err
- }
- return nil
-}