parent
56a30a3574
commit
f1ec73e764
@ -65,7 +65,11 @@ func (udc *UpsideDownCouch) loadSchema(kvreader store.KVReader) (err error) {
|
||||
|
||||
keyPrefix := []byte{'f'}
|
||||
it := kvreader.Iterator(keyPrefix)
|
||||
defer it.Close()
|
||||
defer func() {
|
||||
if cerr := it.Close(); err == nil && cerr != nil {
|
||||
err = cerr
|
||||
}
|
||||
}()
|
||||
|
||||
it.Seek(keyPrefix)
|
||||
key, val, valid := it.Current()
|
||||
@ -75,9 +79,10 @@ func (udc *UpsideDownCouch) loadSchema(kvreader store.KVReader) (err error) {
|
||||
if !bytes.HasPrefix(key, keyPrefix) {
|
||||
break
|
||||
}
|
||||
fieldRow, err := NewFieldRowKV(key, val)
|
||||
var fieldRow *FieldRow
|
||||
fieldRow, err = NewFieldRowKV(key, val)
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
udc.fieldIndexCache.AddExisting(fieldRow.name, fieldRow.index)
|
||||
|
||||
@ -88,14 +93,16 @@ func (udc *UpsideDownCouch) loadSchema(kvreader store.KVReader) (err error) {
|
||||
keyPrefix = []byte{'v'}
|
||||
val, err = kvreader.Get(keyPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
vr, err := NewVersionRowKV(keyPrefix, val)
|
||||
var vr *VersionRow
|
||||
vr, err = NewVersionRowKV(keyPrefix, val)
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
if vr.version != Version {
|
||||
return IncompatibleVersion
|
||||
err = IncompatibleVersion
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
@ -145,80 +152,96 @@ func (udc *UpsideDownCouch) DocCount() (uint64, error) {
|
||||
return udc.docCount, nil
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) Open() error {
|
||||
func (udc *UpsideDownCouch) Open() (err error) {
|
||||
// start a writer for the open process
|
||||
kvwriter, err := udc.store.Writer()
|
||||
var kvwriter store.KVWriter
|
||||
kvwriter, err = udc.store.Writer()
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
defer kvwriter.Close()
|
||||
defer func() {
|
||||
if cerr := kvwriter.Close(); err == nil && cerr != nil {
|
||||
err = cerr
|
||||
}
|
||||
}()
|
||||
|
||||
value, err := kvwriter.Get(VersionKey)
|
||||
var value []byte
|
||||
value, err = kvwriter.Get(VersionKey)
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
// init new index OR load schema
|
||||
if value == nil {
|
||||
err = udc.init(kvwriter)
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
} else {
|
||||
err = udc.loadSchema(kvwriter)
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
}
|
||||
// set doc count
|
||||
udc.docCount = udc.countDocs(kvwriter)
|
||||
return nil
|
||||
udc.docCount, err = udc.countDocs(kvwriter)
|
||||
return
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) countDocs(kvreader store.KVReader) uint64 {
|
||||
func (udc *UpsideDownCouch) countDocs(kvreader store.KVReader) (count uint64, err error) {
|
||||
it := kvreader.Iterator([]byte{'b'})
|
||||
defer it.Close()
|
||||
defer func() {
|
||||
if cerr := it.Close(); err == nil && cerr != nil {
|
||||
err = cerr
|
||||
}
|
||||
}()
|
||||
|
||||
var rv uint64
|
||||
key, _, valid := it.Current()
|
||||
for valid {
|
||||
if !bytes.HasPrefix(key, []byte{'b'}) {
|
||||
break
|
||||
}
|
||||
rv++
|
||||
count++
|
||||
it.Next()
|
||||
key, _, valid = it.Current()
|
||||
}
|
||||
|
||||
return rv
|
||||
return
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) rowCount() uint64 {
|
||||
func (udc *UpsideDownCouch) rowCount() (count uint64, err error) {
|
||||
// start an isolated reader for use during the rowcount
|
||||
kvreader, err := udc.store.Reader()
|
||||
if err != nil {
|
||||
return 0
|
||||
return
|
||||
}
|
||||
defer kvreader.Close()
|
||||
defer func() {
|
||||
if cerr := kvreader.Close(); err == nil && cerr != nil {
|
||||
err = cerr
|
||||
}
|
||||
}()
|
||||
it := kvreader.Iterator([]byte{0})
|
||||
defer it.Close()
|
||||
defer func() {
|
||||
if cerr := it.Close(); err == nil && cerr != nil {
|
||||
err = cerr
|
||||
}
|
||||
}()
|
||||
|
||||
var rv uint64
|
||||
_, _, valid := it.Current()
|
||||
for valid {
|
||||
rv++
|
||||
count++
|
||||
it.Next()
|
||||
_, _, valid = it.Current()
|
||||
}
|
||||
|
||||
return rv
|
||||
return
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) Close() error {
|
||||
return udc.store.Close()
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) Update(doc *document.Document) error {
|
||||
func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
|
||||
// do analysis before acquiring write lock
|
||||
analysisStart := time.Now()
|
||||
resultChan := make(chan *AnalysisResult)
|
||||
@ -239,18 +262,24 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
|
||||
|
||||
// start a writer for this update
|
||||
indexStart := time.Now()
|
||||
kvwriter, err := udc.store.Writer()
|
||||
var kvwriter store.KVWriter
|
||||
kvwriter, err = udc.store.Writer()
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
defer kvwriter.Close()
|
||||
defer func() {
|
||||
if cerr := kvwriter.Close(); err == nil && cerr != nil {
|
||||
err = cerr
|
||||
}
|
||||
}()
|
||||
|
||||
// first we lookup the backindex row for the doc id if it exists
|
||||
// lookup the back index row
|
||||
backIndexRow, err := udc.backIndexRowForDoc(kvwriter, doc.ID)
|
||||
var backIndexRow *BackIndexRow
|
||||
backIndexRow, err = udc.backIndexRowForDoc(kvwriter, doc.ID)
|
||||
if err != nil {
|
||||
atomic.AddUint64(&udc.stats.errors, 1)
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
// prepare a list of rows
|
||||
@ -270,7 +299,7 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
|
||||
} else {
|
||||
atomic.AddUint64(&udc.stats.errors, 1)
|
||||
}
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows, addRows, updateRows, deleteRows []UpsideDownCouchRow) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []UpsideDownCouchRow) {
|
||||
@ -382,24 +411,30 @@ func (udc *UpsideDownCouch) indexField(docID string, field document.Field, field
|
||||
return rows, backIndexTermEntries
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) Delete(id string) error {
|
||||
func (udc *UpsideDownCouch) Delete(id string) (err error) {
|
||||
indexStart := time.Now()
|
||||
// start a writer for this delete
|
||||
kvwriter, err := udc.store.Writer()
|
||||
var kvwriter store.KVWriter
|
||||
kvwriter, err = udc.store.Writer()
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
defer kvwriter.Close()
|
||||
defer func() {
|
||||
if cerr := kvwriter.Close(); err == nil && cerr != nil {
|
||||
err = cerr
|
||||
}
|
||||
}()
|
||||
|
||||
// lookup the back index row
|
||||
backIndexRow, err := udc.backIndexRowForDoc(kvwriter, id)
|
||||
var backIndexRow *BackIndexRow
|
||||
backIndexRow, err = udc.backIndexRowForDoc(kvwriter, id)
|
||||
if err != nil {
|
||||
atomic.AddUint64(&udc.stats.errors, 1)
|
||||
return err
|
||||
return
|
||||
}
|
||||
if backIndexRow == nil {
|
||||
atomic.AddUint64(&udc.stats.deletes, 1)
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
deleteRows := make([]UpsideDownCouchRow, 0)
|
||||
@ -415,7 +450,7 @@ func (udc *UpsideDownCouch) Delete(id string) error {
|
||||
} else {
|
||||
atomic.AddUint64(&udc.stats.errors, 1)
|
||||
}
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) deleteSingle(id string, backIndexRow *BackIndexRow, deleteRows []UpsideDownCouchRow) []UpsideDownCouchRow {
|
||||
@ -526,7 +561,7 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) []
|
||||
return rv
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) Batch(batch *index.Batch) error {
|
||||
func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
||||
analysisStart := time.Now()
|
||||
resultChan := make(chan *AnalysisResult)
|
||||
|
||||
@ -565,16 +600,22 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) error {
|
||||
|
||||
indexStart := time.Now()
|
||||
// start a writer for this batch
|
||||
kvwriter, err := udc.store.Writer()
|
||||
var kvwriter store.KVWriter
|
||||
kvwriter, err = udc.store.Writer()
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
defer kvwriter.Close()
|
||||
defer func() {
|
||||
if cerr := kvwriter.Close(); err == nil && cerr != nil {
|
||||
err = cerr
|
||||
}
|
||||
}()
|
||||
|
||||
// first lookup all the back index rows
|
||||
backIndexRows, err := udc.backIndexRowsForBatch(kvwriter, batch)
|
||||
var backIndexRows map[string]*BackIndexRow
|
||||
backIndexRows, err = udc.backIndexRowsForBatch(kvwriter, batch)
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
// prepare a list of rows
|
||||
@ -621,26 +662,36 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) error {
|
||||
} else {
|
||||
atomic.AddUint64(&udc.stats.errors, 1)
|
||||
}
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) SetInternal(key, val []byte) error {
|
||||
func (udc *UpsideDownCouch) SetInternal(key, val []byte) (err error) {
|
||||
internalRow := NewInternalRow(key, val)
|
||||
writer, err := udc.store.Writer()
|
||||
var writer store.KVWriter
|
||||
writer, err = udc.store.Writer()
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
defer writer.Close()
|
||||
defer func() {
|
||||
if cerr := writer.Close(); err == nil && cerr != nil {
|
||||
err = cerr
|
||||
}
|
||||
}()
|
||||
return writer.Set(internalRow.Key(), internalRow.Value())
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) DeleteInternal(key []byte) error {
|
||||
func (udc *UpsideDownCouch) DeleteInternal(key []byte) (err error) {
|
||||
internalRow := NewInternalRow(key, nil)
|
||||
writer, err := udc.store.Writer()
|
||||
var writer store.KVWriter
|
||||
writer, err = udc.store.Writer()
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
defer writer.Close()
|
||||
defer func() {
|
||||
if cerr := writer.Close(); err == nil && cerr != nil {
|
||||
err = cerr
|
||||
}
|
||||
}()
|
||||
return writer.Delete(internalRow.Key())
|
||||
}
|
||||
|
||||
|
@ -49,7 +49,10 @@ func TestIndexOpenReopen(t *testing.T) {
|
||||
|
||||
// opening the database should have inserted a version
|
||||
expectedLength := uint64(1)
|
||||
rowCount := idx.rowCount()
|
||||
rowCount, err := idx.rowCount()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rowCount != expectedLength {
|
||||
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
|
||||
}
|
||||
@ -110,7 +113,10 @@ func TestIndexInsert(t *testing.T) {
|
||||
|
||||
// should have 4 rows (1 for version, 1 for schema field, and 1 for single term, and 1 for the term count, and 1 for the back index entry)
|
||||
expectedLength := uint64(1 + 1 + 1 + 1 + 1)
|
||||
rowCount := idx.rowCount()
|
||||
rowCount, err := idx.rowCount()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rowCount != expectedLength {
|
||||
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
|
||||
}
|
||||
@ -191,7 +197,10 @@ func TestIndexInsertThenDelete(t *testing.T) {
|
||||
|
||||
// should have 2 rows (1 for version, 1 for schema field)
|
||||
expectedLength := uint64(1 + 1)
|
||||
rowCount := idx.rowCount()
|
||||
rowCount, err := idx.rowCount()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rowCount != expectedLength {
|
||||
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
|
||||
}
|
||||
@ -226,7 +235,10 @@ func TestIndexInsertThenUpdate(t *testing.T) {
|
||||
|
||||
// should have 2 rows (1 for version, 1 for schema field, and 2 for the two term, and 2 for the term counts, and 1 for the back index entry)
|
||||
expectedLength := uint64(1 + 1 + 2 + 2 + 1)
|
||||
rowCount := idx.rowCount()
|
||||
rowCount, err := idx.rowCount()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rowCount != expectedLength {
|
||||
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
|
||||
}
|
||||
@ -241,7 +253,10 @@ func TestIndexInsertThenUpdate(t *testing.T) {
|
||||
|
||||
// should have 2 rows (1 for version, 1 for schema field, and 1 for the remaining term, and 1 for the term count, and 1 for the back index entry)
|
||||
expectedLength = uint64(1 + 1 + 1 + 1 + 1)
|
||||
rowCount = idx.rowCount()
|
||||
rowCount, err = idx.rowCount()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rowCount != expectedLength {
|
||||
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
|
||||
}
|
||||
@ -278,7 +293,10 @@ func TestIndexInsertMultiple(t *testing.T) {
|
||||
|
||||
// should have 4 rows (1 for version, 1 for schema field, and 2 for single term, and 1 for the term count, and 2 for the back index entries)
|
||||
expectedLength := uint64(1 + 1 + 2 + 1 + 2)
|
||||
rowCount := idx.rowCount()
|
||||
rowCount, err := idx.rowCount()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rowCount != expectedLength {
|
||||
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
|
||||
}
|
||||
@ -352,7 +370,10 @@ func TestIndexInsertWithStore(t *testing.T) {
|
||||
|
||||
// should have 6 rows (1 for version, 1 for schema field, and 1 for single term, and 1 for the stored field and 1 for the term count, and 1 for the back index entry)
|
||||
expectedLength := uint64(1 + 1 + 1 + 1 + 1 + 1)
|
||||
rowCount := idx.rowCount()
|
||||
rowCount, err := idx.rowCount()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rowCount != expectedLength {
|
||||
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
|
||||
}
|
||||
@ -589,7 +610,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) {
|
||||
// 16 for date term counts
|
||||
// 1 for the back index entry
|
||||
expectedLength := uint64(1 + 3 + 1 + (64 / document.DefaultPrecisionStep) + (64 / document.DefaultPrecisionStep) + 3 + 1 + (64 / document.DefaultPrecisionStep) + (64 / document.DefaultPrecisionStep) + 1)
|
||||
rowCount := idx.rowCount()
|
||||
rowCount, err := idx.rowCount()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rowCount != expectedLength {
|
||||
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
|
||||
}
|
||||
@ -782,7 +806,10 @@ func TestIndexUpdateComposites(t *testing.T) {
|
||||
// 4 for the text term count
|
||||
// 1 for the back index entry
|
||||
expectedLength := uint64(1 + 3 + 4 + 2 + 4 + 1)
|
||||
rowCount := idx.rowCount()
|
||||
rowCount, err := idx.rowCount()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rowCount != expectedLength {
|
||||
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
|
||||
}
|
||||
@ -820,7 +847,10 @@ func TestIndexUpdateComposites(t *testing.T) {
|
||||
}
|
||||
|
||||
// should have the same row count as before
|
||||
rowCount = idx.rowCount()
|
||||
rowCount, err = idx.rowCount()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rowCount != expectedLength {
|
||||
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user