0
0
Fork 0

handle read-only and in-mem only cases

This commit is contained in:
Marty Schoch 2017-12-11 09:07:01 -05:00
parent e8cc7ac0bf
commit 8280859bb8
3 changed files with 44 additions and 24 deletions

View File

@ -38,6 +38,7 @@ const Name = "scorch"
const Version uint8 = 1
type Scorch struct {
readOnly bool
version uint8
config map[string]interface{}
analysisQueue *index.AnalysisQueue
@ -67,6 +68,10 @@ func NewScorch(storeName string, config map[string]interface{}, analysisQueue *i
root: &IndexSnapshot{},
nextSnapshotEpoch: 1,
}
ro, ok := config["read_only"].(bool)
if ok {
rv.readOnly = ro
}
return rv, nil
}
@ -77,25 +82,35 @@ func (s *Scorch) Open() error {
return fmt.Errorf("must specify path")
}
if s.path == "" {
return os.ErrInvalid
s.unsafeBatch = true
}
err := os.MkdirAll(s.path, 0700)
if err != nil {
return err
var rootBoltOpt *bolt.Options
if s.readOnly {
rootBoltOpt = &bolt.Options{
ReadOnly: true,
}
} else {
if s.path != "" {
err := os.MkdirAll(s.path, 0700)
if err != nil {
return err
}
}
}
rootBoltPath := s.path + string(os.PathSeparator) + "root.bolt"
var err error
if s.path != "" {
s.rootBolt, err = bolt.Open(rootBoltPath, 0600, rootBoltOpt)
if err != nil {
return err
}
s.rootBolt, err = bolt.Open(rootBoltPath, 0600, nil)
if err != nil {
return err
}
// now see if there is any existing state to load
err = s.loadFromBolt()
if err != nil {
return err
// now see if there is any existing state to load
err = s.loadFromBolt()
if err != nil {
return err
}
}
s.closeCh = make(chan struct{})
@ -104,8 +119,11 @@ func (s *Scorch) Open() error {
s.asyncTasks.Add(1)
go s.mainLoop()
s.asyncTasks.Add(1)
go s.persisterLoop()
if !s.readOnly && s.path != "" {
s.asyncTasks.Add(1)
go s.persisterLoop()
}
return nil
}
@ -117,12 +135,14 @@ func (s *Scorch) Close() (err error) {
s.asyncTasks.Wait()
// now close the root bolt
err = s.rootBolt.Close()
s.rootLock.Lock()
for _, segment := range s.root.segment {
cerr := segment.Close()
if err == nil {
err = cerr
if s.rootBolt != nil {
err = s.rootBolt.Close()
s.rootLock.Lock()
for _, segment := range s.root.segment {
cerr := segment.Close()
if err == nil {
err = cerr
}
}
}

View File

@ -582,7 +582,7 @@ func persistFields(memSegment *mem.Segment, w *CountHashWriter, dictLocs []uint6
// NOTE: update if you make the footer bigger
// crc + ver + chunk + field offset + stored offset + num docs
const footerSize = 4 + 4 + 4 + 8 + 8 + 8
const FooterSize = 4 + 4 + 4 + 8 + 8 + 8
func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset uint64,
chunkFactor uint32, w *CountHashWriter) error {

View File

@ -105,7 +105,7 @@ func (s *Segment) loadConfig() error {
func (s *Segment) loadFields() error {
// NOTE for now we assume the fields index immediately preceeds the footer
// if this changes, need to adjust accordingly (or store epxlicit length)
fieldsIndexEnd := uint64(len(s.mm) - footerSize)
fieldsIndexEnd := uint64(len(s.mm) - FooterSize)
// iterate through fields index
var fieldID uint64