0
0
Fork 0

initial impl of Index Aliases

an IndexAlias allows you easily work with one logical Index
while changing the actual Index its pointing to behind the scenes
Changing which actual Index is backing an IndexAlias can be done
atomically so that your application smoothly transitions from
one Index to another.
A separate use of IndexAlias is allowed when the IndexAlias is
defined to point to multiple Indexes.  In this case only the
Search() operation is supported, but the Search will be run
on each of the underlying indexes in parallel, and the results
will be merged.
This commit is contained in:
Marty Schoch 2014-10-29 09:22:11 -04:00
parent 3a0263bb72
commit 51a59cb05c
12 changed files with 836 additions and 16 deletions

View File

@ -22,6 +22,8 @@ const (
ErrorUnknownQueryType
ErrorUnknownStorageType
ErrorIndexClosed
ErrorAliasMulti
ErrorAliasEmpty
)
// Error represents a more strongly typed bleve error for detecting
@ -44,4 +46,6 @@ var errorMessages = map[int]string{
int(ErrorUnknownQueryType): "unknown query type",
int(ErrorUnknownStorageType): "unkown storage type",
int(ErrorIndexClosed): "index is closed",
int(ErrorAliasMulti): "cannot perform single index operation on multiple index alias",
int(ErrorAliasEmpty): "cannot perform operation on empty alias",
}

65
http/alias.go Normal file
View File

@ -0,0 +1,65 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package http
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)
type AliasAction struct {
Alias string `json:"alias"`
AddIndexes []string `json:"add"`
RemoveIndexes []string `json:"remove"`
}
type AliasHandler struct{}
func NewAliasHandler() *AliasHandler {
return &AliasHandler{}
}
func (h *AliasHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// read the request body
requestBody, err := ioutil.ReadAll(req.Body)
if err != nil {
showError(w, req, fmt.Sprintf("error reading request body: %v", err), 400)
return
}
var aliasAction AliasAction
// interpret request body as index mapping
if len(requestBody) > 0 {
err := json.Unmarshal(requestBody, &aliasAction)
if err != nil {
showError(w, req, fmt.Sprintf("error parsing alias actions: %v", err), 400)
return
}
} else {
showError(w, req, "request body must contain alias actions", 400)
return
}
err = UpdateAlias(aliasAction.Alias, aliasAction.AddIndexes, aliasAction.RemoveIndexes)
if err != nil {
showError(w, req, fmt.Sprintf("error updating alia: %v", err), 400)
return
}
rv := struct {
Status string `json:"status"`
}{
Status: "ok",
}
mustEncode(w, rv)
}

View File

@ -10,6 +10,7 @@
package http
import (
"fmt"
"sync"
"github.com/blevesearch/bleve"
@ -68,3 +69,52 @@ func IndexNames() []string {
func IndexStats() bleve.IndexStats {
return indexStats
}
func UpdateAlias(alias string, add, remove []string) error {
indexNameMappingLock.Lock()
defer indexNameMappingLock.Unlock()
index, exists := indexNameMapping[alias]
if !exists {
// new alias
if len(remove) > 0 {
return fmt.Errorf("cannot remove indexes from a new alias")
}
indexes := make([]bleve.Index, len(add))
for i, addIndexName := range add {
addIndex, indexExists := indexNameMapping[addIndexName]
if !indexExists {
return fmt.Errorf("index named '%s' does not exist", addIndexName)
}
indexes[i] = addIndex
}
indexAlias := bleve.NewIndexAlias(indexes...)
indexNameMapping[alias] = indexAlias
} else {
// something with this name already exists
indexAlias, isAlias := index.(bleve.IndexAlias)
if !isAlias {
return fmt.Errorf("'%s' is not an alias")
}
// build list of add indexes
addIndexes := make([]bleve.Index, len(add))
for i, addIndexName := range add {
addIndex, indexExists := indexNameMapping[addIndexName]
if !indexExists {
return fmt.Errorf("index named '%s' does not exist", addIndexName)
}
addIndexes[i] = addIndex
}
// build list of remove indexes
removeIndexes := make([]bleve.Index, len(add))
for i, removeIndexName := range add {
removeIndex, indexExists := indexNameMapping[removeIndexName]
if !indexExists {
return fmt.Errorf("index named '%s' does not exist", removeIndexName)
}
removeIndexes[i] = removeIndex
}
indexAlias.Swap(addIndexes, removeIndexes)
}
return nil
}

32
index_alias.go Normal file
View File

@ -0,0 +1,32 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package bleve
// An IndexAlias is a wrapper around one or more
// Index objects. It has two distinct modes of
// operation.
// 1. When it points to a single index, ALL index
// operations are valid and will be passed through
// to the underlying index.
// 2. When it points to more than index, the only
// valid operation is Search. In this case the
// search will be performed across all the
// underlying indexes and the results merged.
// Calls to Add/Remove/Swap the underlying indexes
// are atomic, so you can safely change the
// underlying Index objects while other components
// are performing operations.
type IndexAlias interface {
Index
Add(i ...Index)
Remove(i ...Index)
Swap(in, out []Index)
}

424
index_alias_impl.go Normal file
View File

@ -0,0 +1,424 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package bleve
import (
"sort"
"sync"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/search"
)
type indexAliasImpl struct {
indexes []Index
mutex sync.RWMutex
open bool
}
// NewIndexAlias creates a new IndexAlias over the provided
// Index objects.
func NewIndexAlias(indexes ...Index) *indexAliasImpl {
return &indexAliasImpl{
indexes: indexes,
open: true,
}
}
func (i *indexAliasImpl) isAliasToSingleIndex() error {
if len(i.indexes) < 1 {
return ErrorAliasEmpty
} else if len(i.indexes) > 1 {
return ErrorAliasMulti
}
return nil
}
func (i *indexAliasImpl) Index(id string, data interface{}) error {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return ErrorIndexClosed
}
err := i.isAliasToSingleIndex()
if err != nil {
return err
}
return i.indexes[0].Index(id, data)
}
func (i *indexAliasImpl) Delete(id string) error {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return ErrorIndexClosed
}
err := i.isAliasToSingleIndex()
if err != nil {
return err
}
return i.indexes[0].Delete(id)
}
func (i *indexAliasImpl) Batch(b Batch) error {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return ErrorIndexClosed
}
err := i.isAliasToSingleIndex()
if err != nil {
return err
}
return i.indexes[0].Batch(b)
}
func (i *indexAliasImpl) Document(id string) (*document.Document, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil, ErrorIndexClosed
}
err := i.isAliasToSingleIndex()
if err != nil {
return nil, err
}
return i.indexes[0].Document(id)
}
func (i *indexAliasImpl) DocCount() uint64 {
rv := uint64(0)
if !i.open {
return 0
}
for _, index := range i.indexes {
rv += index.DocCount()
}
return rv
}
func (i *indexAliasImpl) Search(req *SearchRequest) (*SearchResult, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
if len(i.indexes) < 1 {
return nil, ErrorAliasEmpty
}
// short circuit the simple case
if len(i.indexes) == 1 {
return i.indexes[0].Search(req)
}
return MultiSearch(req, i.indexes...)
}
func (i *indexAliasImpl) Fields() ([]string, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil, ErrorIndexClosed
}
err := i.isAliasToSingleIndex()
if err != nil {
return nil, err
}
return i.indexes[0].Fields()
}
func (i *indexAliasImpl) DumpAll() chan interface{} {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil
}
err := i.isAliasToSingleIndex()
if err != nil {
return nil
}
return i.indexes[0].DumpAll()
}
func (i *indexAliasImpl) DumpDoc(id string) chan interface{} {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil
}
err := i.isAliasToSingleIndex()
if err != nil {
return nil
}
return i.indexes[0].DumpDoc(id)
}
func (i *indexAliasImpl) DumpFields() chan interface{} {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil
}
err := i.isAliasToSingleIndex()
if err != nil {
return nil
}
return i.indexes[0].DumpFields()
}
func (i *indexAliasImpl) Close() {
i.mutex.Lock()
defer i.mutex.Unlock()
i.open = false
}
func (i *indexAliasImpl) Mapping() *IndexMapping {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil
}
err := i.isAliasToSingleIndex()
if err != nil {
return nil
}
return i.indexes[0].Mapping()
}
func (i *indexAliasImpl) Stats() *IndexStat {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil
}
err := i.isAliasToSingleIndex()
if err != nil {
return nil
}
return i.indexes[0].Stats()
}
func (i *indexAliasImpl) GetInternal(key []byte) ([]byte, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil, ErrorIndexClosed
}
err := i.isAliasToSingleIndex()
if err != nil {
return nil, err
}
return i.indexes[0].GetInternal(key)
}
func (i *indexAliasImpl) SetInternal(key, val []byte) error {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return ErrorIndexClosed
}
err := i.isAliasToSingleIndex()
if err != nil {
return err
}
return i.indexes[0].SetInternal(key, val)
}
func (i *indexAliasImpl) DeleteInternal(key []byte) error {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return ErrorIndexClosed
}
err := i.isAliasToSingleIndex()
if err != nil {
return err
}
return i.indexes[0].DeleteInternal(key)
}
func (i *indexAliasImpl) Add(indexes ...Index) {
i.mutex.Lock()
defer i.mutex.Unlock()
i.indexes = append(i.indexes, indexes...)
}
func (i *indexAliasImpl) removeSingle(index Index) {
for pos, in := range i.indexes {
if in == index {
i.indexes = append(i.indexes[:pos], i.indexes[pos+1:]...)
break
}
}
}
func (i *indexAliasImpl) Remove(indexes ...Index) {
i.mutex.Lock()
defer i.mutex.Unlock()
for _, in := range indexes {
i.removeSingle(in)
}
}
func (i *indexAliasImpl) Swap(in, out []Index) {
i.mutex.Lock()
defer i.mutex.Unlock()
// add
i.indexes = append(i.indexes, in...)
// delete
for _, ind := range out {
i.removeSingle(ind)
}
}
// MultiSearch executes a SearchRequest across multiple
// Index objects, then merges the results.
func MultiSearch(req *SearchRequest, indexes ...Index) (*SearchResult, error) {
results := make(chan *SearchResult)
errs := make(chan error)
// remember the original from and size
size := req.Size
req.Size += req.From
from := req.From
req.From = 0
// FIXME should create new request for
// child queries, disable highlighting
// and field loading
// then, perform these steps only on
// the final results
// run search on each index in separate go routine
var waitGroup sync.WaitGroup
var searchChildIndex = func(waitGroup *sync.WaitGroup, in Index, results chan *SearchResult, errs chan error) {
go func() {
defer waitGroup.Done()
searchResult, err := in.Search(req)
if err != nil {
errs <- err
} else {
results <- searchResult
}
}()
}
for _, in := range indexes {
waitGroup.Add(1)
searchChildIndex(&waitGroup, in, results, errs)
}
// on another go routine, close after finished
go func() {
waitGroup.Wait()
close(results)
close(errs)
}()
var sr *SearchResult
var err error
var result *SearchResult
ok := true
for ok {
select {
case result, ok = <-results:
if ok {
if sr == nil {
// first result
sr = result
} else {
// merge with previous
sr.Merge(result)
}
}
case err, ok = <-errs:
}
}
// merge just concatenated all the hits
// now lets clean it up
// first sort it by score
sort.Sort(sr.Hits)
// now skip over the correct From
req.From = from
if req.From > 0 && len(sr.Hits) > req.From {
sr.Hits = sr.Hits[req.From:]
} else if req.From > 0 {
sr.Hits = search.DocumentMatchCollection{}
}
// now trim to the correct size
req.Size = size
if req.Size > 0 && len(sr.Hits) > req.Size {
sr.Hits = sr.Hits[0:req.Size]
}
// fix up facets
for name, fr := range req.Facets {
sr.Facets.Fixup(name, fr.Size)
}
// for now stop on any error
// FIXME offer other behaviors
if err != nil {
return nil, err
}
return sr, nil
}

View File

@ -266,3 +266,13 @@ func (sr *SearchResult) String() string {
}
return rv
}
func (sr *SearchResult) Merge(other *SearchResult) {
sr.Hits = append(sr.Hits, other.Hits...)
sr.Total += other.Total
sr.Took += other.Took
if other.MaxScore > sr.MaxScore {
sr.MaxScore = other.MaxScore
}
sr.Facets.Merge(other.Facets)
}

View File

@ -83,7 +83,7 @@ func (fb *DateTimeFacetBuilder) Update(ft index.FieldTerms) {
}
}
func (fb *DateTimeFacetBuilder) Result() search.FacetResult {
func (fb *DateTimeFacetBuilder) Result() *search.FacetResult {
rv := search.FacetResult{
Field: fb.field,
Total: fb.total,
@ -143,5 +143,5 @@ OUTER:
}
rv.Other = fb.total - notOther
return rv
return &rv
}

View File

@ -82,7 +82,7 @@ func (fb *NumericFacetBuilder) Update(ft index.FieldTerms) {
}
}
func (fb *NumericFacetBuilder) Result() search.FacetResult {
func (fb *NumericFacetBuilder) Result() *search.FacetResult {
rv := search.FacetResult{
Field: fb.field,
Total: fb.total,
@ -136,5 +136,5 @@ OUTER:
}
rv.Other = fb.total - notOther
return rv
return &rv
}

View File

@ -49,7 +49,7 @@ func (fb *TermsFacetBuilder) Update(ft index.FieldTerms) {
}
}
func (fb *TermsFacetBuilder) Result() search.FacetResult {
func (fb *TermsFacetBuilder) Result() *search.FacetResult {
rv := search.FacetResult{
Field: fb.field,
Total: fb.total,
@ -100,5 +100,5 @@ OUTER:
}
rv.Other = fb.total - notOther
return rv
return &rv
}

View File

@ -10,12 +10,14 @@
package search
import (
"sort"
"github.com/blevesearch/bleve/index"
)
type FacetBuilder interface {
Update(index.FieldTerms)
Result() FacetResult
Result() *FacetResult
}
type FacetsBuilder struct {
@ -50,6 +52,24 @@ type TermFacet struct {
Count int `json:"count"`
}
type TermFacets []*TermFacet
func (tf TermFacets) Add(termFacet *TermFacet) TermFacets {
for _, existingTerm := range tf {
if termFacet.Term == existingTerm.Term {
existingTerm.Count += termFacet.Count
return tf
}
}
// if we got here it wasn't already in the existing terms
tf = append(tf, termFacet)
return tf
}
func (tf TermFacets) Len() int { return len(tf) }
func (tf TermFacets) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
func (tf TermFacets) Less(i, j int) bool { return tf[i].Count > tf[j].Count }
type NumericRangeFacet struct {
Name string `json:"name"`
Min *float64 `json:"min,omitempty"`
@ -57,6 +77,24 @@ type NumericRangeFacet struct {
Count int `json:"count"`
}
type NumericRangeFacets []*NumericRangeFacet
func (nrf NumericRangeFacets) Add(numericRangeFacet *NumericRangeFacet) NumericRangeFacets {
for _, existingNr := range nrf {
if numericRangeFacet.Min == existingNr.Min && numericRangeFacet.Max == existingNr.Max {
existingNr.Count += numericRangeFacet.Count
return nrf
}
}
// if we got here it wasn't already in the existing terms
nrf = append(nrf, numericRangeFacet)
return nrf
}
func (nrf NumericRangeFacets) Len() int { return len(nrf) }
func (nrf NumericRangeFacets) Swap(i, j int) { nrf[i], nrf[j] = nrf[j], nrf[i] }
func (nrf NumericRangeFacets) Less(i, j int) bool { return nrf[i].Count > nrf[j].Count }
type DateRangeFacet struct {
Name string `json:"name"`
Start *string `json:"start,omitempty"`
@ -64,17 +102,105 @@ type DateRangeFacet struct {
Count int `json:"count"`
}
type FacetResult struct {
Field string `json:"field"`
Total int `json:"total"`
Missing int `json:"missing"`
Other int `json:"other"`
Terms []*TermFacet `json:"terms,omitempty"`
NumericRanges []*NumericRangeFacet `json:"numeric_ranges,omitempty"`
DateRanges []*DateRangeFacet `json:"date_ranges,omitempty"`
type DateRangeFacets []*DateRangeFacet
func (drf DateRangeFacets) Add(dateRangeFacet *DateRangeFacet) DateRangeFacets {
for _, existingDr := range drf {
if dateRangeFacet.Start == existingDr.Start && dateRangeFacet.End == existingDr.End {
existingDr.Count += dateRangeFacet.Count
return drf
}
}
// if we got here it wasn't already in the existing terms
drf = append(drf, dateRangeFacet)
return drf
}
type FacetResults map[string]FacetResult
func (drf DateRangeFacets) Len() int { return len(drf) }
func (drf DateRangeFacets) Swap(i, j int) { drf[i], drf[j] = drf[j], drf[i] }
func (drf DateRangeFacets) Less(i, j int) bool { return drf[i].Count > drf[j].Count }
type FacetResult struct {
Field string `json:"field"`
Total int `json:"total"`
Missing int `json:"missing"`
Other int `json:"other"`
Terms TermFacets `json:"terms,omitempty"`
NumericRanges NumericRangeFacets `json:"numeric_ranges,omitempty"`
DateRanges DateRangeFacets `json:"date_ranges,omitempty"`
}
func (fr *FacetResult) Merge(other *FacetResult) {
fr.Total += other.Total
fr.Missing += other.Missing
fr.Other += other.Other
if fr.Terms != nil && other.Terms != nil {
for _, term := range other.Terms {
fr.Terms = fr.Terms.Add(term)
}
}
if fr.NumericRanges != nil && other.NumericRanges != nil {
for _, nr := range other.NumericRanges {
fr.NumericRanges = fr.NumericRanges.Add(nr)
}
}
if fr.DateRanges != nil && other.DateRanges != nil {
for _, dr := range other.DateRanges {
fr.DateRanges = fr.DateRanges.Add(dr)
}
}
}
func (fr *FacetResult) Fixup(size int) {
if fr.Terms != nil {
sort.Sort(fr.Terms)
if len(fr.Terms) > size {
moveToOther := fr.Terms[size:]
for _, mto := range moveToOther {
fr.Other += mto.Count
}
fr.Terms = fr.Terms[0:size]
}
} else if fr.NumericRanges != nil {
sort.Sort(fr.NumericRanges)
if len(fr.NumericRanges) > size {
moveToOther := fr.NumericRanges[size:]
for _, mto := range moveToOther {
fr.Other += mto.Count
}
fr.NumericRanges = fr.NumericRanges[0:size]
}
} else if fr.DateRanges != nil {
sort.Sort(fr.DateRanges)
if len(fr.DateRanges) > size {
moveToOther := fr.DateRanges[size:]
for _, mto := range moveToOther {
fr.Other += mto.Count
}
fr.DateRanges = fr.DateRanges[0:size]
}
}
}
type FacetResults map[string]*FacetResult
func (fr FacetResults) Merge(other FacetResults) {
for name, oFacetResult := range other {
facetResult, ok := fr[name]
if ok {
facetResult.Merge(oFacetResult)
} else {
fr[name] = oFacetResult
}
}
}
func (fr FacetResults) Fixup(name string, size int) {
facetResult, ok := fr[name]
if ok {
facetResult.Fixup(size)
}
}
func (fb *FacetsBuilder) Results() FacetResults {
fr := make(FacetResults)

View File

@ -0,0 +1,105 @@
package search
import (
"reflect"
"testing"
)
func TestFacetResultsMerge(t *testing.T) {
fr1 := &FacetResult{
Field: "type",
Total: 100,
Missing: 25,
Other: 25,
Terms: []*TermFacet{
&TermFacet{
Term: "blog",
Count: 25,
},
&TermFacet{
Term: "comment",
Count: 24,
},
&TermFacet{
Term: "feedback",
Count: 1,
},
},
}
fr1Only := &FacetResult{
Field: "category",
Total: 97,
Missing: 22,
Other: 15,
Terms: []*TermFacet{
&TermFacet{
Term: "clothing",
Count: 35,
},
&TermFacet{
Term: "electronics",
Count: 25,
},
},
}
frs1 := FacetResults{
"types": fr1,
"categories": fr1Only,
}
fr2 := &FacetResult{
Field: "type",
Total: 100,
Missing: 25,
Other: 25,
Terms: []*TermFacet{
&TermFacet{
Term: "blog",
Count: 25,
},
&TermFacet{
Term: "comment",
Count: 22,
},
&TermFacet{
Term: "flag",
Count: 3,
},
},
}
frs2 := FacetResults{
"types": fr2,
}
expectedFr := &FacetResult{
Field: "type",
Total: 200,
Missing: 50,
Other: 51,
Terms: []*TermFacet{
&TermFacet{
Term: "blog",
Count: 50,
},
&TermFacet{
Term: "comment",
Count: 46,
},
&TermFacet{
Term: "flag",
Count: 3,
},
},
}
expectedFrs := FacetResults{
"types": expectedFr,
"categories": fr1Only,
}
frs1.Merge(frs2)
frs1.Fixup("types", 3)
if !reflect.DeepEqual(frs1, expectedFrs) {
t.Errorf("expected %v, got %v", expectedFrs, frs1)
}
}

View File

@ -53,6 +53,10 @@ func (dm *DocumentMatch) AddFieldValue(name string, value interface{}) {
type DocumentMatchCollection []*DocumentMatch
func (c DocumentMatchCollection) Len() int { return len(c) }
func (c DocumentMatchCollection) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c DocumentMatchCollection) Less(i, j int) bool { return c[i].Score > c[j].Score }
type Searcher interface {
Next() (*DocumentMatch, error)
Advance(ID string) (*DocumentMatch, error)