diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go
index 7998eae3..76c14551 100644
--- a/index/scorch/introducer.go
+++ b/index/scorch/introducer.go
@@ -41,162 +41,170 @@ OUTER:
 			break OUTER
 		case notify = <-s.introducerNotifier:
+			continue
 		case nextMerge := <-s.merges:
-			// acquire lock
-			s.rootLock.Lock()
-
-			// prepare new index snapshot
-			currSize := len(s.root.segment)
-			newSize := currSize + 1 - len(nextMerge.old)
-			newSnapshot := &IndexSnapshot{
-				segment:  make([]*SegmentSnapshot, 0, newSize),
-				offsets:  make([]uint64, 0, newSize),
-				internal: make(map[string][]byte, len(s.root.segment)),
-				epoch:    s.nextSnapshotEpoch,
-			}
-			s.nextSnapshotEpoch++
-
-			// iterate through current segments
-			newSegmentDeleted := roaring.NewBitmap()
-			var running uint64
-			for i := range s.root.segment {
-				segmentID := s.root.segment[i].id
-				if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
-					// this segment is going away, see if anything else was deleted since we started the merge
-					if s.root.segment[i].deleted != nil {
-						// assume all these deletes are new
-						deletedSince := s.root.segment[i].deleted
-						// if we already knew about some of them, remove
-						if segSnapAtMerge.deleted != nil {
-							deletedSince = roaring.AndNot(s.root.segment[i].deleted, segSnapAtMerge.deleted)
-						}
-						deletedSinceItr := deletedSince.Iterator()
-						for deletedSinceItr.HasNext() {
-							oldDocNum := deletedSinceItr.Next()
-							newDocNum := nextMerge.oldNewDocNums[segmentID][oldDocNum]
-							newSegmentDeleted.Add(uint32(newDocNum))
-						}
-					}
-				} else {
-					// this segment is staying
-					newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
-						id:      s.root.segment[i].id,
-						segment: s.root.segment[i].segment,
-						notify:  s.root.segment[i].notify,
-						deleted: s.root.segment[i].deleted,
-					})
-					newSnapshot.offsets = append(newSnapshot.offsets, running)
-					running += s.root.segment[i].Count()
-				}
-			}
-
-			// put new segment at end
-			newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
-				id:      nextMerge.id,
-				segment: nextMerge.new,
-				deleted: newSegmentDeleted,
-			})
-			newSnapshot.offsets = append(newSnapshot.offsets, running)
-
-			// copy old values
-			for key, oldVal := range s.root.internal {
-				newSnapshot.internal[key] = oldVal
-			}
-
-			// swap in new segment
-			s.root = newSnapshot
-			// release lock
-			s.rootLock.Unlock()
-
-			// notify merger we incorporated this
-			close(nextMerge.notify)
-
-			// notify persister
-			if notify != nil {
-				close(notify)
-				notify = nil
-			}
+			s.introduceMerge(nextMerge)
 		case next := <-s.introductions:
-			// acquire lock
-			s.rootLock.Lock()
-
-			// prepare new index snapshot, with curr size + 1
-			newSnapshot := &IndexSnapshot{
-				segment:  make([]*SegmentSnapshot, len(s.root.segment)+1),
-				offsets:  make([]uint64, len(s.root.segment)+1),
-				internal: make(map[string][]byte, len(s.root.segment)),
-				epoch:    s.nextSnapshotEpoch,
-			}
-			s.nextSnapshotEpoch++
-
-			// iterate through current segments
-			var running uint64
-			for i := range s.root.segment {
-				// see if optimistic work included this segment
-				delta, ok := next.obsoletes[s.root.segment[i].id]
-				if !ok {
-					var err error
-					delta, err = s.root.segment[i].segment.DocNumbers(next.ids)
-					if err != nil {
-						next.applied <- fmt.Errorf("error computing doc numbers: %v", err)
-						close(next.applied)
-						continue OUTER
-					}
-				}
-				newSnapshot.segment[i] = &SegmentSnapshot{
-					id:      s.root.segment[i].id,
-					segment: s.root.segment[i].segment,
-					notify:  s.root.segment[i].notify,
-				}
-				// apply new obsoletions
-				if s.root.segment[i].deleted == nil {
-					newSnapshot.segment[i].deleted = delta
-				} else {
-					newSnapshot.segment[i].deleted = roaring.Or(s.root.segment[i].deleted, delta)
-				}
-
-				newSnapshot.offsets[i] = running
-				running += s.root.segment[i].Count()
-
-			}
-			// put new segment at end
-			newSnapshot.segment[len(s.root.segment)] = &SegmentSnapshot{
-				id:      next.id,
-				segment: next.data,
-			}
-			newSnapshot.offsets[len(s.root.segment)] = running
-			if !s.unsafeBatch {
-				newSnapshot.segment[len(s.root.segment)].notify = append(
-					newSnapshot.segment[len(s.root.segment)].notify,
-					next.persisted,
-				)
-			}
-			// copy old values
-			for key, oldVal := range s.root.internal {
-				newSnapshot.internal[key] = oldVal
-			}
-			// set new values and apply deletes
-			for key, newVal := range next.internal {
-				if newVal != nil {
-					newSnapshot.internal[key] = newVal
-				} else {
-					delete(newSnapshot.internal, key)
-				}
-			}
-			// swap in new segment
-			s.root = newSnapshot
-			// release lock
-			s.rootLock.Unlock()
-			close(next.applied)
-
-			if notify != nil {
-				close(notify)
-				notify = nil
+			err := s.introduceSegment(next)
+			if err != nil {
+				continue OUTER
 			}
 		}
+
+		// notify persister
+		if notify != nil {
+			close(notify)
+			notify = nil
+		}
 	}
 	s.asyncTasks.Done()
 }
+
+func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
+	// acquire lock
+	s.rootLock.Lock()
+
+	// prepare new index snapshot, with curr size + 1
+	newSnapshot := &IndexSnapshot{
+		segment:  make([]*SegmentSnapshot, len(s.root.segment)+1),
+		offsets:  make([]uint64, len(s.root.segment)+1),
+		internal: make(map[string][]byte, len(s.root.segment)),
+		epoch:    s.nextSnapshotEpoch,
+	}
+	s.nextSnapshotEpoch++
+
+	// iterate through current segments
+	var running uint64
+	for i := range s.root.segment {
+		// see if optimistic work included this segment
+		delta, ok := next.obsoletes[s.root.segment[i].id]
+		if !ok {
+			var err error
+			delta, err = s.root.segment[i].segment.DocNumbers(next.ids)
+			if err != nil {
+				next.applied <- fmt.Errorf("error computing doc numbers: %v", err)
+				close(next.applied)
+				return err
+			}
+		}
+		newSnapshot.segment[i] = &SegmentSnapshot{
+			id:      s.root.segment[i].id,
+			segment: s.root.segment[i].segment,
+			notify:  s.root.segment[i].notify,
+		}
+		// apply new obsoletions
+		if s.root.segment[i].deleted == nil {
+			newSnapshot.segment[i].deleted = delta
+		} else {
+			newSnapshot.segment[i].deleted = roaring.Or(s.root.segment[i].deleted, delta)
+		}
+
+		newSnapshot.offsets[i] = running
+		running += s.root.segment[i].Count()
+
+	}
+	// put new segment at end
+	newSnapshot.segment[len(s.root.segment)] = &SegmentSnapshot{
+		id:      next.id,
+		segment: next.data,
+	}
+	newSnapshot.offsets[len(s.root.segment)] = running
+	if !s.unsafeBatch {
+		newSnapshot.segment[len(s.root.segment)].notify = append(
+			newSnapshot.segment[len(s.root.segment)].notify,
+			next.persisted,
+		)
+	}
+	// copy old values
+	for key, oldVal := range s.root.internal {
+		newSnapshot.internal[key] = oldVal
+	}
+	// set new values and apply deletes
+	for key, newVal := range next.internal {
+		if newVal != nil {
+			newSnapshot.internal[key] = newVal
+		} else {
+			delete(newSnapshot.internal, key)
+		}
+	}
+	// swap in new segment
+	s.root = newSnapshot
+	// release lock
+	s.rootLock.Unlock()
+	close(next.applied)
+
+	return nil
+}
+
+func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
+	// acquire lock
+	s.rootLock.Lock()
+
+	// prepare new index snapshot
+	currSize := len(s.root.segment)
+	newSize := currSize + 1 - len(nextMerge.old)
+	newSnapshot := &IndexSnapshot{
+		segment:  make([]*SegmentSnapshot, 0, newSize),
+		offsets:  make([]uint64, 0, newSize),
+		internal: make(map[string][]byte, len(s.root.segment)),
+		epoch:    s.nextSnapshotEpoch,
+	}
+	s.nextSnapshotEpoch++
+
+	// iterate through current segments
+	newSegmentDeleted := roaring.NewBitmap()
+	var running uint64
+	for i := range s.root.segment {
+		segmentID := s.root.segment[i].id
+		if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
+			// this segment is going away, see if anything else was deleted since we started the merge
+			if s.root.segment[i].deleted != nil {
+				// assume all these deletes are new
+				deletedSince := s.root.segment[i].deleted
+				// if we already knew about some of them, remove
+				if segSnapAtMerge.deleted != nil {
+					deletedSince = roaring.AndNot(s.root.segment[i].deleted, segSnapAtMerge.deleted)
+				}
+				deletedSinceItr := deletedSince.Iterator()
+				for deletedSinceItr.HasNext() {
+					oldDocNum := deletedSinceItr.Next()
+					newDocNum := nextMerge.oldNewDocNums[segmentID][oldDocNum]
+					newSegmentDeleted.Add(uint32(newDocNum))
+				}
+			}
+		} else {
+			// this segment is staying
+			newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
+				id:      s.root.segment[i].id,
+				segment: s.root.segment[i].segment,
+				notify:  s.root.segment[i].notify,
+				deleted: s.root.segment[i].deleted,
+			})
+			newSnapshot.offsets = append(newSnapshot.offsets, running)
+			running += s.root.segment[i].Count()
+		}
+	}
+
+	// put new segment at end
+	newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
+		id:      nextMerge.id,
+		segment: nextMerge.new,
+		deleted: newSegmentDeleted,
+	})
+	newSnapshot.offsets = append(newSnapshot.offsets, running)
+
+	// copy old values
+	for key, oldVal := range s.root.internal {
+		newSnapshot.internal[key] = oldVal
+	}
+
+	// swap in new segment
+	s.root = newSnapshot
+	// release lock
+	s.rootLock.Unlock()
+
+	// notify merger we incorporated this
+	close(nextMerge.notify)
+}