0
0
Fork 0

modify code to reuse buffer for kv generation

This commit is contained in:
Marty Schoch 2015-10-05 17:49:50 -04:00
parent 66700be4f7
commit 71cbb13e07
4 changed files with 259 additions and 39 deletions

View File

@ -12,7 +12,12 @@ package index
import "github.com/blevesearch/bleve/document"
type IndexRow interface {
KeySize() int
KeyTo([]byte) (int, error)
Key() []byte
ValueSize() int
ValueTo([]byte) (int, error)
Value() []byte
}

View File

@ -24,8 +24,12 @@ const ByteSeparator byte = 0xff
type UpsideDownCouchRowStream chan UpsideDownCouchRow
type UpsideDownCouchRow interface {
KeySize() int
KeyTo([]byte) (int, error)
Key() []byte
Value() []byte
ValueSize() int
ValueTo([]byte) (int, error)
}
func ParseFromKeyValue(key, value []byte) (UpsideDownCouchRow, error) {
@ -61,10 +65,28 @@ func (v *VersionRow) Key() []byte {
return []byte{'v'}
}
func (v *VersionRow) KeySize() int {
return 1
}
func (v *VersionRow) KeyTo(buf []byte) (int, error) {
buf[0] = 'v'
return 1, nil
}
func (v *VersionRow) Value() []byte {
return []byte{byte(v.version)}
}
func (v *VersionRow) ValueSize() int {
return 1
}
func (v *VersionRow) ValueTo(buf []byte) (int, error) {
buf[0] = v.version
return 1, nil
}
func (v *VersionRow) String() string {
return fmt.Sprintf("Version: %d", v.version)
}
@ -93,16 +115,34 @@ type InternalRow struct {
}
func (i *InternalRow) Key() []byte {
buf := make([]byte, len(i.key)+1)
buf := make([]byte, i.KeySize())
size, _ := i.KeyTo(buf)
return buf[:size]
}
func (i *InternalRow) KeySize() int {
return len(i.key) + 1
}
func (i *InternalRow) KeyTo(buf []byte) (int, error) {
buf[0] = 'i'
copy(buf[1:], i.key)
return buf
actual := copy(buf[1:], i.key)
return 1 + actual, nil
}
func (i *InternalRow) Value() []byte {
return i.val
}
func (i *InternalRow) ValueSize() int {
return len(i.val)
}
func (i *InternalRow) ValueTo(buf []byte) (int, error) {
actual := copy(buf, i.val)
return actual, nil
}
func (i *InternalRow) String() string {
return fmt.Sprintf("InternalStore - Key: %s (% x) Val: %s (% x)", i.key, i.key, i.val, i.val)
}
@ -129,16 +169,35 @@ type FieldRow struct {
}
func (f *FieldRow) Key() []byte {
buf := make([]byte, 3)
buf := make([]byte, f.KeySize())
size, _ := f.KeyTo(buf)
return buf[:size]
}
func (f *FieldRow) KeySize() int {
return 3
}
func (f *FieldRow) KeyTo(buf []byte) (int, error) {
buf[0] = 'f'
binary.LittleEndian.PutUint16(buf[1:3], f.index)
return buf
return 3, nil
}
func (f *FieldRow) Value() []byte {
return append([]byte(f.name), ByteSeparator)
}
func (f *FieldRow) ValueSize() int {
return len(f.name) + 1
}
func (f *FieldRow) ValueTo(buf []byte) (int, error) {
size := copy(buf, f.name)
buf[size] = ByteSeparator
return size + 1, nil
}
func (f *FieldRow) String() string {
return fmt.Sprintf("Field: %d Name: %s", f.index, f.name)
}
@ -182,18 +241,35 @@ type DictionaryRow struct {
}
func (dr *DictionaryRow) Key() []byte {
buf := make([]byte, 3+len(dr.term))
buf := make([]byte, dr.KeySize())
size, _ := dr.KeyTo(buf)
return buf[:size]
}
func (dr *DictionaryRow) KeySize() int {
return len(dr.term) + 3
}
func (dr *DictionaryRow) KeyTo(buf []byte) (int, error) {
buf[0] = 'd'
binary.LittleEndian.PutUint16(buf[1:3], dr.field)
copy(buf[3:], dr.term)
return buf
size := copy(buf[3:], dr.term)
return size + 3, nil
}
func (dr *DictionaryRow) Value() []byte {
used := 0
buf := make([]byte, binary.MaxVarintLen64)
used += binary.PutUvarint(buf, dr.count)
return buf[0:used]
buf := make([]byte, dr.ValueSize())
size, _ := dr.ValueTo(buf)
return buf[:size]
}
func (dr *DictionaryRow) ValueSize() int {
return binary.MaxVarintLen64
}
func (dr *DictionaryRow) ValueTo(buf []byte) (int, error) {
used := binary.PutUvarint(buf, dr.count)
return used, nil
}
func (dr *DictionaryRow) String() string {
@ -304,13 +380,22 @@ func (tfr *TermFrequencyRow) ScanPrefixForFieldTerm() []byte {
}
func (tfr *TermFrequencyRow) Key() []byte {
buf := make([]byte, 3+len(tfr.term)+1+len(tfr.doc))
buf := make([]byte, tfr.KeySize())
size, _ := tfr.KeyTo(buf)
return buf[:size]
}
func (tfr *TermFrequencyRow) KeySize() int {
return 3 + len(tfr.term) + 1 + len(tfr.doc)
}
func (tfr *TermFrequencyRow) KeyTo(buf []byte) (int, error) {
buf[0] = 't'
binary.LittleEndian.PutUint16(buf[1:3], tfr.field)
termLen := copy(buf[3:], tfr.term)
buf[3+termLen] = ByteSeparator
copy(buf[3+termLen+1:], tfr.doc)
return buf
docLen := copy(buf[3+termLen+1:], tfr.doc)
return 3 + termLen + 1 + docLen, nil
}
func (tfr *TermFrequencyRow) DictionaryRowKey() []byte {
@ -318,15 +403,32 @@ func (tfr *TermFrequencyRow) DictionaryRowKey() []byte {
return dr.Key()
}
func (tfr *TermFrequencyRow) DictionaryRowKeySize() int {
dr := NewDictionaryRow(tfr.term, tfr.field, 0)
return dr.KeySize()
}
func (tfr *TermFrequencyRow) DictionaryRowKeyTo(buf []byte) (int, error) {
dr := NewDictionaryRow(tfr.term, tfr.field, 0)
return dr.KeyTo(buf)
}
func (tfr *TermFrequencyRow) Value() []byte {
used := 0
buf := make([]byte, tfr.ValueSize())
size, _ := tfr.ValueTo(buf)
return buf[:size]
}
func (tfr *TermFrequencyRow) ValueSize() int {
bufLen := binary.MaxVarintLen64 + binary.MaxVarintLen64
for _, vector := range tfr.vectors {
bufLen += (binary.MaxVarintLen64 * 4) + (1+len(vector.arrayPositions))*binary.MaxVarintLen64
}
buf := make([]byte, bufLen)
return bufLen
}
used += binary.PutUvarint(buf[used:used+binary.MaxVarintLen64], tfr.freq)
func (tfr *TermFrequencyRow) ValueTo(buf []byte) (int, error) {
used := binary.PutUvarint(buf[:binary.MaxVarintLen64], tfr.freq)
normuint32 := math.Float32bits(tfr.norm)
newbuf := buf[used : used+binary.MaxVarintLen64]
@ -342,7 +444,7 @@ func (tfr *TermFrequencyRow) Value() []byte {
used += binary.PutUvarint(buf[used:used+binary.MaxVarintLen64], arrayPosition)
}
}
return buf[0:used]
return used, nil
}
func (tfr *TermFrequencyRow) String() string {
@ -514,19 +616,41 @@ func (br *BackIndexRow) AllStoredKeys() [][]byte {
}
func (br *BackIndexRow) Key() []byte {
buf := make([]byte, len(br.doc)+1)
buf := make([]byte, br.KeySize())
size, _ := br.KeyTo(buf)
return buf[:size]
}
func (br *BackIndexRow) KeySize() int {
return len(br.doc) + 1
}
func (br *BackIndexRow) KeyTo(buf []byte) (int, error) {
buf[0] = 'b'
copy(buf[1:], br.doc)
return buf
used := copy(buf[1:], br.doc)
return used + 1, nil
}
func (br *BackIndexRow) Value() []byte {
buf := make([]byte, br.ValueSize())
size, _ := br.ValueTo(buf)
return buf[:size]
}
func (br *BackIndexRow) ValueSize() int {
birv := &BackIndexRowValue{
TermEntries: br.termEntries,
StoredEntries: br.storedEntries,
}
bytes, _ := proto.Marshal(birv)
return bytes
return birv.Size()
}
func (br *BackIndexRow) ValueTo(buf []byte) (int, error) {
birv := &BackIndexRowValue{
TermEntries: br.termEntries,
StoredEntries: br.storedEntries,
}
return birv.MarshalTo(buf)
}
func (br *BackIndexRow) String() string {
@ -582,8 +706,17 @@ type StoredRow struct {
}
func (s *StoredRow) Key() []byte {
buf := make([]byte, s.KeySize())
size, _ := s.KeyTo(buf)
return buf[0:size]
}
func (s *StoredRow) KeySize() int {
return 1 + len(s.doc) + 1 + 2 + (binary.MaxVarintLen64 * len(s.arrayPositions))
}
func (s *StoredRow) KeyTo(buf []byte) (int, error) {
docLen := len(s.doc)
buf := make([]byte, 1+docLen+1+2+(binary.MaxVarintLen64*len(s.arrayPositions)))
buf[0] = 's'
copy(buf[1:], s.doc)
buf[1+docLen] = ByteSeparator
@ -593,14 +726,23 @@ func (s *StoredRow) Key() []byte {
varbytes := binary.PutUvarint(buf[bytesUsed:], arrayPosition)
bytesUsed += varbytes
}
return buf[0:bytesUsed]
return bytesUsed, nil
}
func (s *StoredRow) Value() []byte {
rv := make([]byte, len(s.value)+1)
rv[0] = s.typ
copy(rv[1:], s.value)
return rv
buf := make([]byte, s.ValueSize())
size, _ := s.ValueTo(buf)
return buf[:size]
}
func (s *StoredRow) ValueSize() int {
return len(s.value) + 1
}
func (s *StoredRow) ValueTo(buf []byte) (int, error) {
buf[0] = s.typ
used := copy(buf[1:], s.value)
return used + 1, nil
}
func (s *StoredRow) String() string {

View File

@ -114,6 +114,20 @@ func (udc *UpsideDownCouch) loadSchema(kvreader store.KVReader) (err error) {
return
}
var rowBufferPool sync.Pool
func GetRowBuffer() []byte {
if rb, ok := rowBufferPool.Get().([]byte); ok {
return rb
} else {
return make([]byte, 2048)
}
}
func PutRowBuffer(buf []byte) {
rowBufferPool.Put(buf)
}
func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) (err error) {
// prepare batch
@ -121,29 +135,88 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow
// add
for _, row := range addRows {
keyBuf := GetRowBuffer()
valBuf := GetRowBuffer()
tfr, ok := row.(*TermFrequencyRow)
if ok {
// need to increment counter
dictionaryKey := tfr.DictionaryRowKey()
wb.Merge(dictionaryKey, dictionaryTermIncr)
// need to increment term dictinoary counter
if tfr.DictionaryRowKeySize() > len(keyBuf) {
keyBuf = make([]byte, 2*tfr.DictionaryRowKeySize())
}
dictKeySize, err := tfr.DictionaryRowKeyTo(keyBuf)
if err != nil {
return err
}
wb.Merge(keyBuf[:dictKeySize], dictionaryTermIncr)
}
wb.Set(row.Key(), row.Value())
if row.KeySize() > len(keyBuf) {
// grow buffer
keyBuf = make([]byte, 2*row.KeySize())
}
keySize, err := row.KeyTo(keyBuf)
if err != nil {
return err
}
if row.ValueSize() > len(valBuf) {
// grow buffer
valBuf = make([]byte, 2*row.ValueSize())
}
valSize, err := row.ValueTo(valBuf)
wb.Set(keyBuf[:keySize], valBuf[:valSize])
PutRowBuffer(keyBuf)
PutRowBuffer(valBuf)
}
// update
for _, row := range updateRows {
wb.Set(row.Key(), row.Value())
keyBuf := GetRowBuffer()
valBuf := GetRowBuffer()
if row.KeySize() > len(keyBuf) {
// grow buffer
keyBuf = make([]byte, 2*row.KeySize())
}
keySize, err := row.KeyTo(keyBuf)
if err != nil {
return err
}
if row.ValueSize() > len(valBuf) {
// grow buffer
valBuf = make([]byte, 2*row.ValueSize())
}
valSize, err := row.ValueTo(valBuf)
wb.Set(keyBuf[:keySize], valBuf[:valSize])
PutRowBuffer(keyBuf)
PutRowBuffer(valBuf)
}
// delete
for _, row := range deleteRows {
keyBuf := GetRowBuffer()
tfr, ok := row.(*TermFrequencyRow)
if ok {
// need to decrement counter
dictionaryKey := tfr.DictionaryRowKey()
wb.Merge(dictionaryKey, dictionaryTermDecr)
if tfr.DictionaryRowKeySize() > len(keyBuf) {
keyBuf = make([]byte, 2*tfr.DictionaryRowKeySize())
}
dictKeySize, err := tfr.DictionaryRowKeyTo(keyBuf)
if err != nil {
return err
}
wb.Merge(keyBuf[:dictKeySize], dictionaryTermDecr)
}
wb.Delete(row.Key())
if row.KeySize() > len(keyBuf) {
// grow buffer
keyBuf = make([]byte, 2*row.KeySize())
}
keySize, err := row.KeyTo(keyBuf)
if err != nil {
return err
}
wb.Delete(keyBuf[:keySize])
PutRowBuffer(keyBuf)
}
// write out the batch

View File

@ -348,7 +348,7 @@ func TestIndexInsertMultiple(t *testing.T) {
}
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
t.Fatalf("error opening index: %v", err)
}
defer func() {
err := idx.Close()