diff options
| author | Gibheer <gibheer+git@zero-knowledge.org> | 2018-11-16 20:10:15 +0100 | 
|---|---|---|
| committer | Gibheer <gibheer+git@zero-knowledge.org> | 2018-11-16 20:10:15 +0100 | 
| commit | 2a4332951a3f00da69b03560fcc96deee8fdc086 (patch) | |
| tree | f8d8194ae69071a334e3d628b043df4594600cc8 /cmd | |
| parent | 1960907547d5da614a73ea35247bbaf771ced3c5 (diff) | |
monwork - add new server
This server generates active checks from the check configuration.
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/monwork/main.go | 172 | 
1 files changed, 172 insertions, 0 deletions
diff --git a/cmd/monwork/main.go b/cmd/monwork/main.go new file mode 100644 index 0000000..48b43bd --- /dev/null +++ b/cmd/monwork/main.go @@ -0,0 +1,172 @@ +package main + +import ( +	"bytes" +	"database/sql" +	"encoding/json" +	"flag" +	"io/ioutil" +	"log" +	"sync" +	"text/template" +	"time" + +	"github.com/lib/pq" +) + +var ( +	configPath = flag.String("config", "monwork.conf", "path to the config file") +) + +type ( +	Config struct { +		DB            string `json:"db"` +		CheckInterval string `json:"interval"` +	} +) + +func main() { +	flag.Parse() + +	raw, err := ioutil.ReadFile(*configPath) +	if err != nil { +		log.Fatalf("could not read config: %s", err) +	} +	config := Config{} +	if err := json.Unmarshal(raw, &config); err != nil { +		log.Fatalf("could not parse config: %s", err) +	} + +	checkInterval, err := time.ParseDuration(config.CheckInterval) +	if err != nil { +		log.Fatalf("could not parse check interval: %s", err) +	} + +	db, err := sql.Open("postgres", config.DB) +	if err != nil { +		log.Fatalf("could not open database connection: %s", err) +	} + +	go startConfigGen(db, checkInterval) + +	// don't exit, we have work to do +	wg := sync.WaitGroup{} +	wg.Add(1) +	wg.Wait() +} + +func startConfigGen(db *sql.DB, checkInterval time.Duration) { +	for { +		tx, err := db.Begin() +		if err != nil { +			log.Printf("could not create transaction: %s", err) +			time.Sleep(checkInterval) +			continue +		} +		rows, err := tx.Query(SQLGetConfigUpdates) +		if err != nil { +			log.Printf("could not get updates: %s", err) +			tx.Rollback() +			time.Sleep(checkInterval) +			continue +		} +		var ( +			check_id int +			command  string +			options  []byte +		) +		for rows.Next() { +			if rows.Err() != nil { +				log.Printf("could not receive rows: %s", err) +				break +			} +			if err := rows.Scan(&check_id, &command, &options); err != nil { +				log.Printf("could not scan row: %s", err) +				break +			} +		} +		if check_id == 0 { +			tx.Rollback() +			time.Sleep(checkInterval) +			continue +		} +		tmpl, err := template.New("command").Parse(command) +		if err != nil { +			tx.Rollback() +			log.Printf("could not parse command for check '%d': %s", check_id, err) +			time.Sleep(checkInterval) +			continue +		} +		var cmd bytes.Buffer +		var opts map[string]interface{} +		if err := json.Unmarshal(options, &opts); err != nil { +			tx.Rollback() +			log.Printf("could not parse options for check '%d': %s", check_id, err) +			time.Sleep(checkInterval) +			continue +		} +		if err := tmpl.Execute(&cmd, opts); err != nil { +			tx.Rollback() +			log.Printf("could not complete command for check '%d': %s", check_id, err) +			time.Sleep(checkInterval) +			continue +		} +		if _, err := tx.Exec(SQLRefreshActiveCheck, check_id, pq.Array(stringToShellFields(cmd.Bytes()))); err != nil { +			tx.Rollback() +			log.Printf("could not refresh check '%d': %s", check_id, err) +			continue +		} +		if _, err := tx.Exec(SQLUpdateLastRefresh, check_id); err != nil { +			tx.Rollback() +			log.Printf("could not update timestamp for check '%d': %s", check_id, err) +			continue +		} +		if err := tx.Commit(); err != nil { +			tx.Rollback() +			log.Printf("could not commit changes: %s", err) +		} +	} +} + +func stringToShellFields(in []byte) [][]byte { +	if len(in) == 0 { +		return [][]byte{} +	} +	fields := bytes.Fields(in) +	result := [][]byte{fields[0]} + +	var quote byte + +	for _, field := range fields[1:] { +		if quote == 0 && (field[0] != '\'' && field[0] != '"') { +			result = append(result, field) +			continue +		} +		if quote == 0 && (field[0] == '\'' || field[0] == '"') { +			quote = field[0] +			result = append(result, field) +			continue +		} +		idx := len(result) - 1 +		result[idx] = append(result[idx], append([]byte(" "), field...)...) +		if quote != 0 && (bytes.HasSuffix(field, []byte{quote})) { +			quote = 0 +			continue +		} +	} +	return result +} + +var ( +	SQLGetConfigUpdates = `select c.id, co.command, c.options +	from checks c +	join commands co on c.command_id = co.id +	where c.last_refresh < c.updated +  limit 1 +	for update of c skip locked;` +	SQLRefreshActiveCheck = `insert into active_checks(check_id, cmdline, intval, enabled, notify) +select id, $2, intval, enabled, notify from checks where id = $1 +on conflict(check_id) +do update set cmdline = $2, intval = excluded.intval, enabled = excluded.enabled, notify = excluded.notify;` +	SQLUpdateLastRefresh = `update checks set last_refresh = now() where id = $1;` +)  | 
