Gibheer
10fb89a017
This function was needed as bytes.Fields had some problems with the quoting. Now there are test cases too so that errors can be found more easily.
239 lines
5.8 KiB
Go
239 lines
5.8 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"flag"
|
|
"io/ioutil"
|
|
"log"
|
|
"sync"
|
|
"text/template"
|
|
"time"
|
|
|
|
"github.com/lib/pq"
|
|
)
|
|
|
|
// configPath is the value of the -config flag: the location of the JSON
// configuration file, "monwork.conf" unless overridden.
var configPath = flag.String("config", "monwork.conf", "path to the config file")
|
|
|
|
// Config mirrors the JSON configuration file.
type Config struct {
	// DB is handed to sql.Open as the data source name.
	DB string `json:"db"`
	// CheckInterval is parsed with time.ParseDuration and used as the pause
	// between passes of the generator loops.
	CheckInterval string `json:"interval"`
}
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
raw, err := ioutil.ReadFile(*configPath)
|
|
if err != nil {
|
|
log.Fatalf("could not read config: %s", err)
|
|
}
|
|
config := Config{}
|
|
if err := json.Unmarshal(raw, &config); err != nil {
|
|
log.Fatalf("could not parse config: %s", err)
|
|
}
|
|
|
|
checkInterval, err := time.ParseDuration(config.CheckInterval)
|
|
if err != nil {
|
|
log.Fatalf("could not parse check interval: %s", err)
|
|
}
|
|
|
|
db, err := sql.Open("postgres", config.DB)
|
|
if err != nil {
|
|
log.Fatalf("could not open database connection: %s", err)
|
|
}
|
|
|
|
go startNodeGen(db, checkInterval)
|
|
go startCommandGen(db, checkInterval)
|
|
go startConfigGen(db, checkInterval)
|
|
|
|
// don't exit, we have work to do
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
wg.Wait()
|
|
}
|
|
|
|
// startNodeGen loops forever. Each pass copies nodes.updated into the
// updated column of every check whose last_refresh is older than its node's
// updated value, then sleeps for checkInterval. Failures are logged and the
// pass is retried after the same pause.
func startNodeGen(db *sql.DB, checkInterval time.Duration) {
	const markStaleChecks = `update checks c
set updated = n.updated
from nodes n
where c.node_id = n.id
and c.last_refresh < n.updated;`

	// runOnce performs a single pass in its own transaction.
	runOnce := func() {
		tx, beginErr := db.Begin()
		if beginErr != nil {
			log.Printf("could not create transaction: %s", beginErr)
			return
		}
		if _, execErr := tx.Exec(markStaleChecks); execErr != nil {
			log.Printf("could not update nodes: %s", execErr)
			tx.Rollback()
			return
		}
		if commitErr := tx.Commit(); commitErr != nil {
			log.Printf("could not commit node updates: %s", commitErr)
			tx.Rollback()
		}
	}

	for {
		runOnce()
		time.Sleep(checkInterval)
	}
}
|
|
|
|
// startCommandGen loops forever. Each pass copies commands.updated into the
// updated column of every check whose last_refresh is older than its
// command's updated value, then sleeps for checkInterval. Failures are logged
// and the pass is retried after the same pause.
func startCommandGen(db *sql.DB, checkInterval time.Duration) {
	const markStaleChecks = `update checks c
set updated = co.updated
from commands co
where c.command_id = co.id
and c.last_refresh < co.updated;`

	// runOnce performs a single pass in its own transaction.
	runOnce := func() {
		tx, beginErr := db.Begin()
		if beginErr != nil {
			log.Printf("could not create transaction: %s", beginErr)
			return
		}
		if _, execErr := tx.Exec(markStaleChecks); execErr != nil {
			log.Printf("could not update checks: %s", execErr)
			tx.Rollback()
			return
		}
		if commitErr := tx.Commit(); commitErr != nil {
			log.Printf("could not commit command updates: %s", commitErr)
			tx.Rollback()
		}
	}

	for {
		runOnce()
		time.Sleep(checkInterval)
	}
}
|
|
|
|
func startConfigGen(db *sql.DB, checkInterval time.Duration) {
|
|
for {
|
|
tx, err := db.Begin()
|
|
if err != nil {
|
|
log.Printf("could not create transaction: %s", err)
|
|
time.Sleep(checkInterval)
|
|
continue
|
|
}
|
|
rows, err := tx.Query(SQLGetConfigUpdates)
|
|
if err != nil {
|
|
log.Printf("could not get updates: %s", err)
|
|
tx.Rollback()
|
|
time.Sleep(checkInterval)
|
|
continue
|
|
}
|
|
var (
|
|
check_id int
|
|
command string
|
|
options []byte
|
|
)
|
|
for rows.Next() {
|
|
if rows.Err() != nil {
|
|
log.Printf("could not receive rows: %s", err)
|
|
break
|
|
}
|
|
if err := rows.Scan(&check_id, &command, &options); err != nil {
|
|
log.Printf("could not scan row: %s", err)
|
|
break
|
|
}
|
|
}
|
|
if check_id == 0 {
|
|
tx.Rollback()
|
|
time.Sleep(checkInterval)
|
|
continue
|
|
}
|
|
tmpl, err := template.New("command").Parse(command)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
log.Printf("could not parse command for check '%d': %s", check_id, err)
|
|
time.Sleep(checkInterval)
|
|
continue
|
|
}
|
|
var cmd bytes.Buffer
|
|
var opts map[string]interface{}
|
|
if err := json.Unmarshal(options, &opts); err != nil {
|
|
tx.Rollback()
|
|
log.Printf("could not parse options for check '%d': %s", check_id, err)
|
|
time.Sleep(checkInterval)
|
|
continue
|
|
}
|
|
if err := tmpl.Execute(&cmd, opts); err != nil {
|
|
tx.Rollback()
|
|
log.Printf("could not complete command for check '%d': %s", check_id, err)
|
|
time.Sleep(checkInterval)
|
|
continue
|
|
}
|
|
if _, err := tx.Exec(SQLRefreshActiveCheck, check_id, pq.Array(stringToShellFields(cmd.Bytes()))); err != nil {
|
|
tx.Rollback()
|
|
log.Printf("could not refresh check '%d': %s", check_id, err)
|
|
continue
|
|
}
|
|
if _, err := tx.Exec(SQLUpdateLastRefresh, check_id); err != nil {
|
|
tx.Rollback()
|
|
log.Printf("could not update timestamp for check '%d': %s", check_id, err)
|
|
continue
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
tx.Rollback()
|
|
log.Printf("could not commit changes: %s", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// stringToShellFields splits in on whitespace like bytes.Fields, but joins
// the fields of a quoted section back into a single field.
//
// A section starts at a field whose first byte is ' or " and ends at the next
// field whose last byte is the same quote character; the surrounding quotes
// are dropped. Fields inside a section are joined with a single space, so
// runs of whitespace inside quotes collapse to one space. Quotes in the
// middle of a field are not interpreted, and an unterminated section runs to
// the end of the input.
//
// The result is never nil; empty or whitespace-only input yields an empty
// slice.
func stringToShellFields(in []byte) [][]byte {
	result := [][]byte{}

	// quote holds the active quote character, 0 while outside a section.
	var quote byte

	for _, field := range bytes.Fields(in) {
		if quote == 0 {
			if field[0] != '\'' && field[0] != '"' {
				result = append(result, field)
				continue
			}
			quote = field[0]
			// The length guard matters: for a field that is only the quote
			// character the opening quote must not be mistaken for the
			// closing one (field[1:0] would panic).
			if len(field) > 1 && field[len(field)-1] == quote {
				result = append(result, field[1:len(field)-1])
				quote = 0
				continue
			}
			// Copy the start of the section so the appends below can never
			// write into the caller's buffer.
			result = append(result, append([]byte{}, field[1:]...))
			continue
		}

		// Inside a quoted section: extend the field that opened it.
		last := len(result) - 1
		result[last] = append(result[last], ' ')
		if field[len(field)-1] == quote {
			result[last] = append(result[last], field[:len(field)-1]...)
			quote = 0
			continue
		}
		result[last] = append(result[last], field...)
	}
	return result
}
|
|
|
|
// SQLGetConfigUpdates selects id, command template and options of at most one
// check whose last_refresh is null or older than its updated value. The
// checks row is locked for update; rows already locked are skipped.
var SQLGetConfigUpdates = `select c.id, co.command, c.options
from checks c
join commands co on c.command_id = co.id
where c.last_refresh < c.updated or c.last_refresh is null
limit 1
for update of c skip locked;`

// SQLRefreshActiveCheck upserts the active_checks row of check $1 with the
// command line $2. A new row takes intval and enabled from the check, an
// empty msg, and the mapping_id of the check, else of its node, else 1. On
// conflict cmdline, intval, enabled and mapping_id are overwritten while msg
// is left as it is.
var SQLRefreshActiveCheck = `insert into active_checks(check_id, cmdline, intval, enabled, msg, mapping_id)
select c.id, $2, c.intval, c.enabled, case when ac.msg is null then '' else ac.msg end, case when c.mapping_id is not null then c.mapping_id when n.mapping_id is not null then n.mapping_id else 1 end
from checks c
left join active_checks ac on c.id = ac.check_id
left join nodes n on c.node_id = n.id
where c.id = $1
on conflict(check_id)
do update set cmdline = $2, intval = excluded.intval, enabled = excluded.enabled, mapping_id = excluded.mapping_id;`

// SQLUpdateLastRefresh sets last_refresh of check $1 to the current time.
var SQLUpdateLastRefresh = `update checks set last_refresh = now() where id = $1;`
|