0
0

rewrite to keep using same buffer when possible

This commit is contained in:
Marty Schoch 2015-10-13 14:04:56 -07:00
parent 8de860bf12
commit 4c6bc23043

View File

@ -120,7 +120,7 @@ func GetRowBuffer() []byte {
if rb, ok := rowBufferPool.Get().([]byte); ok { if rb, ok := rowBufferPool.Get().([]byte); ok {
return rb return rb
} else { } else {
return make([]byte, 4096) return make([]byte, 4*1024)
} }
} }
@ -133,92 +133,60 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow
// prepare batch // prepare batch
wb := writer.NewBatch() wb := writer.NewBatch()
// buffer to work with
rowBuf := GetRowBuffer()
// add // add
for _, row := range addRows { for _, row := range addRows {
keyBuf := GetRowBuffer()
valBuf := GetRowBuffer()
tfr, ok := row.(*TermFrequencyRow) tfr, ok := row.(*TermFrequencyRow)
if ok { if ok {
// need to increment term dictionary counter dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
if tfr.DictionaryRowKeySize() > len(keyBuf) {
keyBuf = make([]byte, 2*tfr.DictionaryRowKeySize())
}
dictKeySize, err := tfr.DictionaryRowKeyTo(keyBuf)
if err != nil { if err != nil {
return err return err
} }
wb.Merge(keyBuf[:dictKeySize], dictionaryTermIncr) wb.Merge(rowBuf[:dictKeySize], dictionaryTermIncr)
} }
if row.KeySize() > len(keyBuf) { keySize, err := row.KeyTo(rowBuf)
// grow buffer
keyBuf = make([]byte, 2*row.KeySize())
}
keySize, err := row.KeyTo(keyBuf)
if err != nil { if err != nil {
return err return err
} }
if row.ValueSize() > len(valBuf) { valSize, err := row.ValueTo(rowBuf[keySize:])
// grow buffer wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
valBuf = make([]byte, 2*row.ValueSize())
}
valSize, err := row.ValueTo(valBuf)
wb.Set(keyBuf[:keySize], valBuf[:valSize])
PutRowBuffer(keyBuf)
PutRowBuffer(valBuf)
} }
// update // update
for _, row := range updateRows { for _, row := range updateRows {
keyBuf := GetRowBuffer() keySize, err := row.KeyTo(rowBuf)
valBuf := GetRowBuffer()
if row.KeySize() > len(keyBuf) {
// grow buffer
keyBuf = make([]byte, 2*row.KeySize())
}
keySize, err := row.KeyTo(keyBuf)
if err != nil { if err != nil {
return err return err
} }
if row.ValueSize() > len(valBuf) { valSize, err := row.ValueTo(rowBuf[keySize:])
// grow buffer if err != nil {
valBuf = make([]byte, 2*row.ValueSize()) return err
} }
valSize, err := row.ValueTo(valBuf) wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
wb.Set(keyBuf[:keySize], valBuf[:valSize])
PutRowBuffer(keyBuf)
PutRowBuffer(valBuf)
} }
// delete // delete
for _, row := range deleteRows { for _, row := range deleteRows {
keyBuf := GetRowBuffer()
tfr, ok := row.(*TermFrequencyRow) tfr, ok := row.(*TermFrequencyRow)
if ok { if ok {
// need to decrement counter // need to decrement counter
if tfr.DictionaryRowKeySize() > len(keyBuf) { dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
keyBuf = make([]byte, 2*tfr.DictionaryRowKeySize())
}
dictKeySize, err := tfr.DictionaryRowKeyTo(keyBuf)
if err != nil { if err != nil {
return err return err
} }
wb.Merge(keyBuf[:dictKeySize], dictionaryTermDecr) wb.Merge(rowBuf[:dictKeySize], dictionaryTermDecr)
} }
if row.KeySize() > len(keyBuf) { keySize, err := row.KeyTo(rowBuf)
// grow buffer
keyBuf = make([]byte, 2*row.KeySize())
}
keySize, err := row.KeyTo(keyBuf)
if err != nil { if err != nil {
return err return err
} }
wb.Delete(keyBuf[:keySize]) wb.Delete(rowBuf[:keySize])
PutRowBuffer(keyBuf)
} }
PutRowBuffer(rowBuf)
// write out the batch // write out the batch
return writer.ExecuteBatch(wb) return writer.ExecuteBatch(wb)
} }
@ -435,13 +403,10 @@ func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []in
existingStoredKeys[string(key)] = true existingStoredKeys[string(key)] = true
} }
keyBuf := GetRowBuffer()
for _, row := range rows { for _, row := range rows {
switch row := row.(type) { switch row := row.(type) {
case *TermFrequencyRow: case *TermFrequencyRow:
keyBuf := GetRowBuffer()
if row.KeySize() > len(keyBuf) {
keyBuf = make([]byte, 2*row.KeySize())
}
keySize, _ := row.KeyTo(keyBuf) keySize, _ := row.KeyTo(keyBuf)
if _, ok := existingTermKeys[string(keyBuf[:keySize])]; ok { if _, ok := existingTermKeys[string(keyBuf[:keySize])]; ok {
updateRows = append(updateRows, row) updateRows = append(updateRows, row)
@ -449,12 +414,7 @@ func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []in
} else { } else {
addRows = append(addRows, row) addRows = append(addRows, row)
} }
PutRowBuffer(keyBuf)
case *StoredRow: case *StoredRow:
keyBuf := GetRowBuffer()
if row.KeySize() > len(keyBuf) {
keyBuf = make([]byte, 2*row.KeySize())
}
keySize, _ := row.KeyTo(keyBuf) keySize, _ := row.KeyTo(keyBuf)
if _, ok := existingStoredKeys[string(keyBuf[:keySize])]; ok { if _, ok := existingStoredKeys[string(keyBuf[:keySize])]; ok {
updateRows = append(updateRows, row) updateRows = append(updateRows, row)
@ -465,8 +425,8 @@ func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []in
default: default:
updateRows = append(updateRows, row) updateRows = append(updateRows, row)
} }
} }
PutRowBuffer(keyBuf)
// any of the existing rows that weren't updated need to be deleted // any of the existing rows that weren't updated need to be deleted
for existingTermKey := range existingTermKeys { for existingTermKey := range existingTermKeys {