// Copyright (c) 2016 Couchbase, Inc. // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the // License. You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an "AS // IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either // express or implied. See the License for the specific language // governing permissions and limitations under the License. // Package moss provides a KVStore implementation based on the // github.com/couchbaselabs/moss library. package moss import ( "encoding/json" "fmt" "sync" "github.com/couchbase/moss" "github.com/blevesearch/bleve/index/store" "github.com/blevesearch/bleve/registry" ) // RegistryCollectionOptions should be treated as read-only after // process init()'ialization. var RegistryCollectionOptions = map[string]moss.CollectionOptions{} const Name = "moss" type Store struct { m sync.Mutex ms moss.Collection mo store.MergeOperator llstore store.KVStore s *stats } // New initializes a moss storage with values from the optional // config["mossCollectionOptions"] (a JSON moss.CollectionOptions). // Next, values from the RegistryCollectionOptions, named by the // optional config["mossCollectionOptionsName"], take precedence. // Finally, base case defaults are taken from // moss.DefaultCollectionOptions. func New(mo store.MergeOperator, config map[string]interface{}) ( store.KVStore, error) { options := moss.DefaultCollectionOptions // Copy. v, ok := config["mossCollectionOptionsName"] if ok { name, ok := v.(string) if !ok { return nil, fmt.Errorf("moss store,"+ " could not parse config[mossCollectionOptionsName]: %v", v) } options, ok = RegistryCollectionOptions[name] // Copy. if !ok { return nil, fmt.Errorf("moss store,"+ " could not find RegistryCollectionOptions, name: %s", name) } } v, ok = config["mossCollectionOptions"] if ok { b, err := json.Marshal(v) // Convert from map[string]interface{}. if err != nil { return nil, fmt.Errorf("moss store,"+ " could not marshal config[mossCollectionOptions]: %v, err: %v", v, err) } err = json.Unmarshal(b, &options) if err != nil { return nil, fmt.Errorf("moss store,"+ " could not unmarshal config[mossCollectionOptions]: %v, err: %v", v, err) } } // -------------------------------------------------- if options.Log == nil || options.Debug <= 0 { options.Log = func(format string, a ...interface{}) {} } // -------------------------------------------------- mossLowerLevelStoreName := "" v, ok = config["mossLowerLevelStoreName"] if ok { mossLowerLevelStoreName, ok = v.(string) if !ok { return nil, fmt.Errorf("moss store,"+ " could not parse config[mossLowerLevelStoreName]: %v", v) } } var llStore store.KVStore if options.LowerLevelInit == nil && options.LowerLevelUpdate == nil && mossLowerLevelStoreName != "" { mossLowerLevelStoreConfig := map[string]interface{}{} v, ok := config["mossLowerLevelStoreConfig"] if ok { mossLowerLevelStoreConfig, ok = v.(map[string]interface{}) if !ok { return nil, fmt.Errorf("moss store, initLowerLevelStore,"+ " could parse mossLowerLevelStoreConfig: %v", v) } } mossLowerLevelMaxBatchSize := uint64(0) v, ok = config["mossLowerLevelMaxBatchSize"] if ok { mossLowerLevelMaxBatchSizeF, ok := v.(float64) if !ok { return nil, fmt.Errorf("moss store,"+ " could not parse config[mossLowerLevelMaxBatchSize]: %v", v) } mossLowerLevelMaxBatchSize = uint64(mossLowerLevelMaxBatchSizeF) } lowerLevelInit, lowerLevelUpdate, lowerLevelStore, err := initLowerLevelStore(mo, config, mossLowerLevelStoreName, mossLowerLevelStoreConfig, mossLowerLevelMaxBatchSize, options.Log) if err != nil { return nil, err } options.LowerLevelInit = lowerLevelInit options.LowerLevelUpdate = lowerLevelUpdate llStore = lowerLevelStore } // -------------------------------------------------- options.MergeOperator = mo ms, err := moss.NewCollection(options) if err != nil { return nil, err } err = ms.Start() if err != nil { return nil, err } rv := Store{ ms: ms, mo: mo, llstore: llStore, } rv.s = &stats{s: &rv} return &rv, nil } func (s *Store) Close() error { return s.ms.Close() } func (s *Store) Reader() (store.KVReader, error) { ss, err := s.ms.Snapshot() if err != nil { return nil, err } return &Reader{ss: ss}, nil } func (s *Store) Writer() (store.KVWriter, error) { return &Writer{s: s}, nil } func (s *Store) Logf(fmt string, args ...interface{}) { options := s.ms.Options() if options.Log != nil { options.Log(fmt, args...) } } func (s *Store) Stats() json.Marshaler { return s.s } func (s *Store) StatsMap() map[string]interface{} { return s.s.statsMap() } func init() { registry.RegisterKVStore(Name, New) }