2015-08-25 20:52:42 +02:00
|
|
|
// Copyright (c) 2015 Couchbase, Inc.
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
|
|
|
// except in compliance with the License. You may obtain a copy of the License at
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
|
|
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
|
|
|
// either express or implied. See the License for the specific language governing permissions
|
|
|
|
// and limitations under the License.
|
|
|
|
|
|
|
|
package firestorm
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"math/rand"
|
|
|
|
"sort"
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
"github.com/steveyen/gtreap"
|
|
|
|
"github.com/willf/bitset"
|
|
|
|
)
|
|
|
|
|
|
|
|
type Compensator struct {
|
|
|
|
inFlightMutex sync.RWMutex
|
|
|
|
maxRead uint64
|
|
|
|
inFlight *gtreap.Treap
|
|
|
|
deletedMutex sync.RWMutex
|
|
|
|
deletedDocNumbers *bitset.BitSet
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewCompensator() *Compensator {
|
|
|
|
rv := Compensator{
|
|
|
|
inFlight: gtreap.NewTreap(inFlightItemCompare),
|
|
|
|
deletedDocNumbers: bitset.New(1000000),
|
|
|
|
}
|
|
|
|
return &rv
|
|
|
|
}
|
|
|
|
|
|
|
|
type Snapshot struct {
|
|
|
|
maxRead uint64
|
|
|
|
inFlight *gtreap.Treap
|
|
|
|
deletedDocNumbers *bitset.BitSet
|
|
|
|
}
|
|
|
|
|
|
|
|
// returns which doc number is valid
|
|
|
|
// if none, then 0
|
|
|
|
func (s *Snapshot) Which(docID []byte, docNumList DocNumberList) uint64 {
|
|
|
|
sort.Sort(docNumList)
|
2016-01-07 01:04:56 +01:00
|
|
|
for _, docNum := range docNumList { // docNumList is sorted descending.
|
|
|
|
if docNum > 0 && s.Valid(docID, docNum) {
|
|
|
|
return docNum
|
|
|
|
}
|
2015-08-25 20:52:42 +02:00
|
|
|
}
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Snapshot) Valid(docID []byte, docNum uint64) bool {
|
|
|
|
logger.Printf("checking validity of: '%s' - % x - %d", docID, docID, docNum)
|
|
|
|
if docNum > s.maxRead {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
logger.Printf("<= maxRead")
|
|
|
|
inFlightVal := s.inFlight.Get(&InFlightItem{docID: docID})
|
|
|
|
if inFlightVal != nil && inFlightVal.(*InFlightItem).docNum != docNum {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
logger.Printf("not in flight")
|
|
|
|
if s.deletedDocNumbers.Test(uint(docNum)) {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
logger.Printf("not deleted")
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Compensator) Mutate(docID []byte, docNum uint64) {
|
|
|
|
c.inFlightMutex.Lock()
|
|
|
|
defer c.inFlightMutex.Unlock()
|
|
|
|
c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: docID, docNum: docNum}, rand.Int())
|
|
|
|
if docNum != 0 {
|
|
|
|
c.maxRead = docNum
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-12-31 05:43:31 +01:00
|
|
|
func (c *Compensator) MutateBatch(inflightItems []*InFlightItem, lastDocNum uint64) {
|
2015-08-25 20:52:42 +02:00
|
|
|
c.inFlightMutex.Lock()
|
|
|
|
defer c.inFlightMutex.Unlock()
|
2015-12-31 05:43:31 +01:00
|
|
|
for _, item := range inflightItems {
|
|
|
|
c.inFlight = c.inFlight.Upsert(item, rand.Int())
|
2015-08-25 20:52:42 +02:00
|
|
|
}
|
2015-12-31 05:43:31 +01:00
|
|
|
c.maxRead = lastDocNum
|
2015-08-25 20:52:42 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Compensator) Migrate(docID []byte, docNum uint64, oldDocNums []uint64) {
|
|
|
|
c.inFlightMutex.Lock()
|
|
|
|
defer c.inFlightMutex.Unlock()
|
|
|
|
c.deletedMutex.Lock()
|
|
|
|
defer c.deletedMutex.Unlock()
|
|
|
|
|
|
|
|
// clone deleted doc numbers and mutate
|
2015-09-10 14:19:05 +02:00
|
|
|
if len(oldDocNums) > 0 {
|
|
|
|
newDeletedDocNumbers := c.deletedDocNumbers.Clone()
|
|
|
|
for _, oldDocNum := range oldDocNums {
|
|
|
|
newDeletedDocNumbers.Set(uint(oldDocNum))
|
|
|
|
}
|
|
|
|
// update pointer
|
|
|
|
c.deletedDocNumbers = newDeletedDocNumbers
|
2015-08-25 20:52:42 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// remove entry from in-flight if it still has same doc num
|
|
|
|
val := c.inFlight.Get(&InFlightItem{docID: docID})
|
2016-01-03 19:20:56 +01:00
|
|
|
if val != nil && val.(*InFlightItem).docNum == docNum {
|
2015-08-25 20:52:42 +02:00
|
|
|
c.inFlight = c.inFlight.Delete(&InFlightItem{docID: docID})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Compensator) GarbageCollect(docNums []uint64) {
|
|
|
|
c.deletedMutex.Lock()
|
|
|
|
defer c.deletedMutex.Unlock()
|
|
|
|
|
|
|
|
for _, docNum := range docNums {
|
|
|
|
c.deletedDocNumbers.Clear(uint(docNum))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Compensator) Snapshot() *Snapshot {
|
|
|
|
c.inFlightMutex.RLock()
|
|
|
|
defer c.inFlightMutex.RUnlock()
|
|
|
|
c.deletedMutex.RLock()
|
|
|
|
defer c.deletedMutex.RUnlock()
|
|
|
|
|
|
|
|
rv := Snapshot{
|
|
|
|
maxRead: c.maxRead,
|
|
|
|
inFlight: c.inFlight,
|
|
|
|
deletedDocNumbers: c.deletedDocNumbers,
|
|
|
|
}
|
|
|
|
return &rv
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Compensator) GarbageCount() uint64 {
|
|
|
|
return uint64(c.deletedDocNumbers.Count())
|
|
|
|
}
|
|
|
|
|
|
|
|
//**************
|
|
|
|
|
|
|
|
type InFlightItem struct {
|
|
|
|
docID []byte
|
|
|
|
docNum uint64
|
|
|
|
}
|
|
|
|
|
|
|
|
func inFlightItemCompare(a, b interface{}) int {
|
|
|
|
return bytes.Compare(a.(*InFlightItem).docID, b.(*InFlightItem).docID)
|
|
|
|
}
|