// Copyright (c) 2017 Couchbase, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package zap import ( "bufio" "bytes" "encoding/binary" "math" "os" "github.com/Smerity/govarint" "github.com/blevesearch/bleve/index/scorch/segment/mem" "github.com/couchbase/vellum" "github.com/golang/snappy" ) const version uint32 = 1 // PersistSegment takes the in-memory segment and persists it to the specified // path in the zap file format. func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (err error) { flag := os.O_RDWR | os.O_CREATE f, err := os.OpenFile(path, flag, 0600) if err != nil { return err } // bufer the output br := bufio.NewWriter(f) // wrap it for counting (tracking offsets) cr := NewCountHashWriter(br) var storedIndexOffset uint64 var dictLocs []uint64 if len(memSegment.Stored) > 0 { storedIndexOffset, err = persistStored(memSegment, cr) if err != nil { return err } var freqOffsets, locOffsets []uint64 freqOffsets, locOffsets, err = persistPostingDetails(memSegment, cr, chunkFactor) if err != nil { return err } var postingsListLocs []uint64 postingsListLocs, err = persistPostingsLocs(memSegment, cr) if err != nil { return err } var postingsLocs []uint64 postingsLocs, err = persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets) if err != nil { return err } dictLocs, err = persistDictionary(memSegment, cr, postingsLocs) if err != nil { return err } } else { dictLocs = make([]uint64, len(memSegment.FieldsInv)) } var fieldIndexStart uint64 fieldIndexStart, err = persistFields(memSegment.FieldsInv, cr, dictLocs) if err != nil { return err } err = persistFooter(uint64(len(memSegment.Stored)), storedIndexOffset, fieldIndexStart, chunkFactor, cr) if err != nil { return err } err = br.Flush() if err != nil { return err } err = f.Sync() if err != nil { return err } err = f.Close() if err != nil { return err } return nil } func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) { var curr int var metaBuf bytes.Buffer var data, compressed []byte docNumOffsets := make(map[int]uint64, len(memSegment.Stored)) for docNum, storedValues := range memSegment.Stored { if docNum != 0 { // reset buffer if necessary metaBuf.Reset() data = data[:0] compressed = compressed[:0] curr = 0 } metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) // encode fields in order for fieldID := range memSegment.FieldsInv { if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok { // has stored values for this field num := len(storedFieldValues) // process each value for i := 0; i < num; i++ { // encode field _, err2 := metaEncoder.PutU64(uint64(fieldID)) if err2 != nil { return 0, err2 } // encode type _, err2 = metaEncoder.PutU64(uint64(memSegment.StoredTypes[docNum][uint16(fieldID)][i])) if err2 != nil { return 0, err2 } // encode start offset _, err2 = metaEncoder.PutU64(uint64(curr)) if err2 != nil { return 0, err2 } // end len _, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) if err2 != nil { return 0, err2 } // encode number of array pos _, err2 = metaEncoder.PutU64(uint64(len(memSegment.StoredPos[docNum][uint16(fieldID)][i]))) if err2 != nil { return 0, err2 } // encode all array positions for j := 0; j < len(memSegment.StoredPos[docNum][uint16(fieldID)][i]); j++ { _, err2 = metaEncoder.PutU64(memSegment.StoredPos[docNum][uint16(fieldID)][i][j]) if err2 != nil { return 0, err2 } } // append data data = append(data, storedFieldValues[i]...) // update curr curr += len(storedFieldValues[i]) } } } metaEncoder.Close() metaBytes := metaBuf.Bytes() // compress the data compressed = snappy.Encode(compressed, data) // record where we're about to start writing docNumOffsets[docNum] = uint64(w.Count()) // write out the meta len and compressed data len _, err := writeUvarints(w, uint64(len(metaBytes)), uint64(len(compressed))) if err != nil { return 0, err } // now write the meta _, err = w.Write(metaBytes) if err != nil { return 0, err } // now write the compressed data _, err = w.Write(compressed) if err != nil { return 0, err } } // return value is the start of the stored index rv := uint64(w.Count()) // now write out the stored doc index for docNum := range memSegment.Stored { err := binary.Write(w, binary.BigEndian, docNumOffsets[docNum]) if err != nil { return 0, err } } return rv, nil } func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) { var freqOffsets, locOfffsets []uint64 tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) for postingID := range memSegment.Postings { if postingID != 0 { tfEncoder.Reset() } postingsListItr := memSegment.Postings[postingID].Iterator() var offset int for postingsListItr.HasNext() { docNum := uint64(postingsListItr.Next()) // put freq err := tfEncoder.Add(docNum, memSegment.Freqs[postingID][offset]) if err != nil { return nil, nil, err } // put norm norm := memSegment.Norms[postingID][offset] normBits := math.Float32bits(norm) err = tfEncoder.Add(docNum, uint64(normBits)) if err != nil { return nil, nil, err } offset++ } // record where this postings freq info starts freqOffsets = append(freqOffsets, uint64(w.Count())) tfEncoder.Close() _, err := tfEncoder.Write(w) if err != nil { return nil, nil, err } } // now do it again for the locations locEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) for postingID := range memSegment.Postings { if postingID != 0 { locEncoder.Reset() } postingsListItr := memSegment.Postings[postingID].Iterator() var offset int var locOffset int for postingsListItr.HasNext() { docNum := uint64(postingsListItr.Next()) for i := 0; i < int(memSegment.Freqs[postingID][offset]); i++ { if len(memSegment.Locfields[postingID]) > 0 { // put field err := locEncoder.Add(docNum, uint64(memSegment.Locfields[postingID][locOffset])) if err != nil { return nil, nil, err } // put pos err = locEncoder.Add(docNum, memSegment.Locpos[postingID][locOffset]) if err != nil { return nil, nil, err } // put start err = locEncoder.Add(docNum, memSegment.Locstarts[postingID][locOffset]) if err != nil { return nil, nil, err } // put end err = locEncoder.Add(docNum, memSegment.Locends[postingID][locOffset]) if err != nil { return nil, nil, err } // put array positions num := len(memSegment.Locarraypos[postingID][locOffset]) // put the number of array positions to follow err = locEncoder.Add(docNum, uint64(num)) if err != nil { return nil, nil, err } // put each array position for j := 0; j < num; j++ { err = locEncoder.Add(docNum, memSegment.Locarraypos[postingID][locOffset][j]) if err != nil { return nil, nil, err } } } locOffset++ } offset++ } // record where this postings loc info starts locOfffsets = append(locOfffsets, uint64(w.Count())) locEncoder.Close() _, err := locEncoder.Write(w) if err != nil { return nil, nil, err } } return freqOffsets, locOfffsets, nil } func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint64, err error) { for postingID := range memSegment.PostingsLocs { // record where we start this posting loc rv = append(rv, uint64(w.Count())) // write out the length and bitmap _, err = writeRoaringWithLen(memSegment.PostingsLocs[postingID], w) if err != nil { return nil, err } } return rv, nil } func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter, postingsListLocs, freqOffsets, locOffsets []uint64) (rv []uint64, err error) { for postingID := range memSegment.Postings { // record where we start this posting list rv = append(rv, uint64(w.Count())) // write out the term info, loc info, and loc posting list offset _, err = writeUvarints(w, freqOffsets[postingID], locOffsets[postingID], postingsListLocs[postingID]) if err != nil { return nil, err } // write out the length and bitmap _, err = writeRoaringWithLen(memSegment.Postings[postingID], w) if err != nil { return nil, err } } return rv, nil } func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs []uint64) ([]uint64, error) { var rv []uint64 var buffer bytes.Buffer for fieldID, fieldTerms := range memSegment.DictKeys { if fieldID != 0 { buffer.Reset() } // start a new vellum for this field builder, err := vellum.New(&buffer, nil) if err != nil { return nil, err } dict := memSegment.Dicts[fieldID] // now walk the dictionary in order of fieldTerms (already sorted) for i := range fieldTerms { postingID := dict[fieldTerms[i]] - 1 postingsAddr := postingsLocs[postingID] err = builder.Insert([]byte(fieldTerms[i]), postingsAddr) if err != nil { return nil, err } } err = builder.Close() if err != nil { return nil, err } // record where this dictionary starts rv = append(rv, uint64(w.Count())) vellumData := buffer.Bytes() // write out the length of the vellum data buf := make([]byte, binary.MaxVarintLen64) // write out the number of chunks n := binary.PutUvarint(buf, uint64(len(vellumData))) _, err = w.Write(buf[:n]) if err != nil { return nil, err } // write this vellum to disk _, err = w.Write(vellumData) if err != nil { return nil, err } } return rv, nil }