// Copyright (c) 2015 Couchbase, Inc. // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file // except in compliance with the License. You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, software distributed under the // License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, // either express or implied. See the License for the specific language governing permissions // and limitations under the License. package firestorm import ( "bytes" "math/rand" "sort" "sync" "github.com/blevesearch/bleve/document" "github.com/steveyen/gtreap" "github.com/willf/bitset" ) type Compensator struct { inFlightMutex sync.RWMutex maxRead uint64 inFlight *gtreap.Treap deletedMutex sync.RWMutex deletedDocNumbers *bitset.BitSet } func NewCompensator() *Compensator { rv := Compensator{ inFlight: gtreap.NewTreap(inFlightItemCompare), deletedDocNumbers: bitset.New(1000000), } return &rv } type Snapshot struct { maxRead uint64 inFlight *gtreap.Treap deletedDocNumbers *bitset.BitSet } // returns which doc number is valid // if none, then 0 func (s *Snapshot) Which(docID []byte, docNumList DocNumberList) uint64 { sort.Sort(docNumList) highestValidDocNum := docNumList.HighestValid(s.maxRead) if highestValidDocNum > 0 && s.Valid(docID, highestValidDocNum) { return highestValidDocNum } return 0 } func (s *Snapshot) Valid(docID []byte, docNum uint64) bool { logger.Printf("checking validity of: '%s' - % x - %d", docID, docID, docNum) if docNum > s.maxRead { return false } logger.Printf("<= maxRead") inFlightVal := s.inFlight.Get(&InFlightItem{docID: docID}) if inFlightVal != nil && inFlightVal.(*InFlightItem).docNum != docNum { return false } logger.Printf("not in flight") if s.deletedDocNumbers.Test(uint(docNum)) { return false } logger.Printf("not deleted") return true } func (c *Compensator) Mutate(docID []byte, docNum uint64) { c.inFlightMutex.Lock() defer c.inFlightMutex.Unlock() c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: docID, docNum: docNum}, rand.Int()) if docNum != 0 { c.maxRead = docNum } } func (c *Compensator) MutateBatch(docs map[string]*document.Document, docNum uint64) { c.inFlightMutex.Lock() defer c.inFlightMutex.Unlock() for docID, doc := range docs { if doc != nil { c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: []byte(docID), docNum: doc.Number}, rand.Int()) } else { c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: []byte(docID), docNum: 0}, rand.Int()) } } c.maxRead = docNum } func (c *Compensator) Migrate(docID []byte, docNum uint64, oldDocNums []uint64) { c.inFlightMutex.Lock() defer c.inFlightMutex.Unlock() c.deletedMutex.Lock() defer c.deletedMutex.Unlock() // clone deleted doc numbers and mutate if len(oldDocNums) > 0 { newDeletedDocNumbers := c.deletedDocNumbers.Clone() for _, oldDocNum := range oldDocNums { newDeletedDocNumbers.Set(uint(oldDocNum)) } // update pointer c.deletedDocNumbers = newDeletedDocNumbers } // remove entry from in-flight if it still has same doc num val := c.inFlight.Get(&InFlightItem{docID: docID}) if val.(*InFlightItem).docNum == docNum { c.inFlight = c.inFlight.Delete(&InFlightItem{docID: docID}) } } func (c *Compensator) GarbageCollect(docNums []uint64) { c.deletedMutex.Lock() defer c.deletedMutex.Unlock() for _, docNum := range docNums { c.deletedDocNumbers.Clear(uint(docNum)) } } func (c *Compensator) Snapshot() *Snapshot { c.inFlightMutex.RLock() defer c.inFlightMutex.RUnlock() c.deletedMutex.RLock() defer c.deletedMutex.RUnlock() rv := Snapshot{ maxRead: c.maxRead, inFlight: c.inFlight, deletedDocNumbers: c.deletedDocNumbers, } return &rv } func (c *Compensator) GarbageCount() uint64 { return uint64(c.deletedDocNumbers.Count()) } //************** type InFlightItem struct { docID []byte docNum uint64 } func inFlightItemCompare(a, b interface{}) int { return bytes.Compare(a.(*InFlightItem).docID, b.(*InFlightItem).docID) }