0
0
Fork 0

refactored index layer to support batch operations

this change was then exposed at the higher levels
also the beer-sample app was upgraded to index in batches of 100
by default.  This yielded an indexing speed up from 27s to 16s.
closes #57
This commit is contained in:
Marty Schoch 2014-08-11 16:27:18 -04:00
parent cac707b5b7
commit e5d4e6f1e4
5 changed files with 146 additions and 32 deletions

View File

@ -20,6 +20,7 @@ import (
bleveHttp "github.com/couchbaselabs/bleve/http"
)
var batchSize = flag.Int("batchSize", 100, "batch size for indexing")
var bindAddr = flag.String("addr", ":8094", "http listen address")
var jsonDir = flag.String("jsonDir", "../../samples/beer-sample/", "json directory")
var indexDir = flag.String("indexDir", "beer-search.bleve", "index directory")
@ -78,6 +79,8 @@ func indexBeer(i bleve.Index) error {
log.Printf("Indexing...")
count := 0
startTime := time.Now()
batch := bleve.NewBatch()
batchCount := 0
for _, dirEntry := range dirEntries {
filename := dirEntry.Name()
// read the bytes
@ -88,9 +91,16 @@ func indexBeer(i bleve.Index) error {
// // shred them into a document
ext := filepath.Ext(filename)
docId := filename[:(len(filename) - len(ext))]
err = i.Index(docId, jsonBytes)
if err != nil {
return err
batch.Index(docId, jsonBytes)
batchCount++
if batchCount >= *batchSize {
err = i.Batch(batch)
if err != nil {
return err
}
batch = bleve.NewBatch()
batchCount = 0
}
count++
if count%1000 == 0 {

View File

@ -12,15 +12,26 @@ import (
"github.com/couchbaselabs/bleve/document"
)
type Classifier interface {
Type() string
// Batch collects index and delete operations keyed by document ID so
// they can be applied to an Index in a single call. A nil value marks
// the ID for deletion; any non-nil value is the raw document data to
// be indexed.
type Batch map[string]interface{}

// NewBatch returns an empty Batch ready for use.
func NewBatch() Batch {
	return make(Batch)
}

// Index queues data to be indexed under id, replacing any operation
// previously queued for the same id.
func (b Batch) Index(id string, data interface{}) {
	b[id] = data
}

// Delete queues the document with the given id for deletion. The
// delete is recorded as a nil entry in the map.
func (b Batch) Delete(id string) {
	b[id] = nil
}
type Index interface {
Index(id string, data interface{}) error
Delete(id string) error
Batch(b Batch) error
Document(id string) (*document.Document, error)
DocCount() uint64
@ -35,6 +46,10 @@ type Index interface {
Close()
}
// Classifier is implemented by document data that can report its own
// type name, allowing a type-specific mapping to be selected during
// indexing.
type Classifier interface {
// Type returns the document's type name.
Type() string
}
// Open the index at the specified path, and create it if it does not exist.
// The provided mapping will be used for all Index/Search operations.
func Open(path string, mapping *IndexMapping) (Index, error) {

View File

@ -18,6 +18,7 @@ type Index interface {
Update(doc *document.Document) error
Delete(id string) error
Batch(batch Batch) error
TermFieldReader(term []byte, field string) (TermFieldReader, error)
DocIdReader(start, end string) (DocIdReader, error)
@ -70,3 +71,13 @@ type DocIdReader interface {
Advance(ID string) (string, error)
Close()
}
// Batch maps document IDs to the documents to be written in a single
// operation. A nil document marks that ID for deletion.
type Batch map[string]*document.Document

// Index records d to be indexed under id.
func (b Batch) Index(id string, d *document.Document) {
	b[id] = d
}

// Delete records id for deletion by storing a nil entry.
func (b Batch) Delete(id string) {
	b[id] = nil
}

View File

@ -85,6 +85,7 @@ func (udc *UpsideDownCouch) loadSchema() (err error) {
}
func (udc *UpsideDownCouch) batchRows(addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) (err error) {
// prepare batch
wb := udc.store.NewBatch()
@ -235,11 +236,25 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
return err
}
var isAdd = true
// prepare a list of rows
addRows := make([]UpsideDownCouchRow, 0)
updateRows := make([]UpsideDownCouchRow, 0)
deleteRows := make([]UpsideDownCouchRow, 0)
addRows, updateRows, deleteRows = udc.updateSingle(doc, backIndexRow, addRows, updateRows, deleteRows)
err = udc.batchRows(addRows, updateRows, deleteRows)
if err == nil && backIndexRow == nil {
udc.docCount += 1
}
return err
}
func (udc *UpsideDownCouch) updateSingle(doc *document.Document, backIndexRow *BackIndexRow, addRows, updateRows, deleteRows []UpsideDownCouchRow) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []UpsideDownCouchRow) {
// a map for each field, map key is term (string) bool true for existence
existingTermFieldMaps := make(fieldTermMap, 0)
if backIndexRow != nil {
isAdd = false
for _, entry := range backIndexRow.entries {
existingTermMap, fieldExists := existingTermFieldMaps[entry.field]
if !fieldExists {
@ -256,10 +271,6 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
}
}
// prepare a list of rows
updateRows := make([]UpsideDownCouchRow, 0)
addRows := make([]UpsideDownCouchRow, 0)
// track our back index entries
backIndexEntries := make([]*BackIndexEntry, 0)
backIndexStoredFields := make([]uint16, 0)
@ -319,7 +330,6 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
updateRows = append(updateRows, backIndexRow)
// any of the existing rows that weren't updated need to be deleted
deleteRows := make([]UpsideDownCouchRow, 0)
for fieldIndex, existingTermFieldMap := range existingTermFieldMaps {
if existingTermFieldMap != nil {
for termString, _ := range existingTermFieldMap {
@ -334,11 +344,7 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
deleteRows = append(deleteRows, storedRow)
}
err = udc.batchRows(addRows, updateRows, deleteRows)
if err == nil && isAdd {
udc.docCount += 1
}
return err
return addRows, updateRows, deleteRows
}
func (udc *UpsideDownCouch) storeField(docId string, field document.Field, fieldIndex uint16, existingStoredFieldMap map[uint16]bool) ([]UpsideDownCouchRow, []UpsideDownCouchRow) {
@ -440,27 +446,32 @@ func (udc *UpsideDownCouch) Delete(id string) error {
return nil
}
// prepare a list of rows to delete
rows := make([]UpsideDownCouchRow, 0)
for _, backIndexEntry := range backIndexRow.entries {
tfr := NewTermFrequencyRow(backIndexEntry.term, backIndexEntry.field, id, 0, 0)
rows = append(rows, tfr)
}
for _, sf := range backIndexRow.storedFields {
sf := NewStoredRow(id, sf, 'x', nil)
rows = append(rows, sf)
}
deleteRows := make([]UpsideDownCouchRow, 0)
deleteRows = udc.deleteSingle(id, backIndexRow, deleteRows)
// also delete the back entry itself
rows = append(rows, backIndexRow)
err = udc.batchRows(nil, nil, rows)
err = udc.batchRows(nil, nil, deleteRows)
if err == nil {
udc.docCount -= 1
}
return err
}
// deleteSingle appends to deleteRows every row that must be removed to
// fully delete the document with the given id: one term frequency row
// per back index entry, one stored row per stored field, and finally
// the back index row itself. The extended slice is returned.
func (udc *UpsideDownCouch) deleteSingle(id string, backIndexRow *BackIndexRow, deleteRows []UpsideDownCouchRow) []UpsideDownCouchRow {
	for _, entry := range backIndexRow.entries {
		deleteRows = append(deleteRows, NewTermFrequencyRow(entry.term, entry.field, id, 0, 0))
	}
	for _, fieldIndex := range backIndexRow.storedFields {
		deleteRows = append(deleteRows, NewStoredRow(id, fieldIndex, 'x', nil))
	}
	// the back index row for the document itself goes last
	return append(deleteRows, backIndexRow)
}
func (udc *UpsideDownCouch) backIndexRowForDoc(docId string) (*BackIndexRow, error) {
// use a temporary row structure to build key
tempRow := &BackIndexRow{
@ -481,6 +492,20 @@ func (udc *UpsideDownCouch) backIndexRowForDoc(docId string) (*BackIndexRow, err
return backIndexRow, nil
}
// backIndexRowsForBatch looks up the existing back index row for every
// document ID in the batch. IDs with no existing back index row map to
// a nil entry, which callers use to distinguish adds from updates.
// FIXME faster to order the ids and scan sequentially
// for now just get it working
func (udc *UpsideDownCouch) backIndexRowsForBatch(batch index.Batch) (map[string]*BackIndexRow, error) {
	// pre-size to the batch size; one entry is stored per document ID
	rv := make(map[string]*BackIndexRow, len(batch))
	for docId := range batch {
		backIndexRow, err := udc.backIndexRowForDoc(docId)
		if err != nil {
			return nil, err
		}
		rv[docId] = backIndexRow
	}
	return rv, nil
}
func (udc *UpsideDownCouch) Dump() {
it := udc.store.Iterator([]byte{0})
defer it.Close()
@ -725,3 +750,39 @@ func (udc *UpsideDownCouch) fieldIndexToName(i uint16) string {
}
return ""
}
// Batch applies a set of index and delete operations as a single
// underlying store batch. An entry with a nil document is a delete,
// applied only when the document currently exists; a non-nil document
// is an add (no existing back index row) or an update. On success the
// cached document count is adjusted by the number of adds and deletes.
func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
	// fetch the existing back index rows for everything in the batch
	backIndexRows, err := udc.backIndexRowsForBatch(batch)
	if err != nil {
		return err
	}

	var (
		addRows     = make([]UpsideDownCouchRow, 0)
		updateRows  = make([]UpsideDownCouchRow, 0)
		deleteRows  = make([]UpsideDownCouchRow, 0)
		docsAdded   uint64
		docsDeleted uint64
	)

	for docId, doc := range batch {
		backIndexRow := backIndexRows[docId]
		switch {
		case doc == nil && backIndexRow != nil:
			// delete an existing document
			deleteRows = udc.deleteSingle(docId, backIndexRow, deleteRows)
			docsDeleted++
		case doc != nil:
			// add when there is no back index row yet, otherwise update
			addRows, updateRows, deleteRows = udc.updateSingle(doc, backIndexRow, addRows, updateRows, deleteRows)
			if backIndexRow == nil {
				docsAdded++
			}
		}
	}

	err = udc.batchRows(addRows, updateRows, deleteRows)
	if err == nil {
		udc.docCount += docsAdded
		udc.docCount -= docsDeleted
	}
	return err
}

View File

@ -64,6 +64,23 @@ func (i *indexImpl) Delete(id string) error {
return nil
}
// Batch maps every entry of b through the index mapping and applies
// the resulting operations to the underlying index in one call. A nil
// entry in b becomes a delete; a mapping error aborts the batch before
// anything is applied.
func (i *indexImpl) Batch(b Batch) error {
	indexBatch := make(index.Batch, len(b))
	for id, data := range b {
		if data == nil {
			indexBatch.Delete(id)
			continue
		}
		doc := document.NewDocument(id)
		if err := i.m.MapDocument(doc, data); err != nil {
			return err
		}
		indexBatch.Index(id, doc)
	}
	return i.i.Batch(indexBatch)
}
func (i *indexImpl) Document(id string) (*document.Document, error) {
return i.i.Document(id)
}