diff options
Diffstat (limited to 'cmd/monwork')
-rw-r--r-- | cmd/monwork/main.go | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/cmd/monwork/main.go b/cmd/monwork/main.go new file mode 100644 index 0000000..48b43bd --- /dev/null +++ b/cmd/monwork/main.go @@ -0,0 +1,172 @@ +package main + +import ( + "bytes" + "database/sql" + "encoding/json" + "flag" + "io/ioutil" + "log" + "sync" + "text/template" + "time" + + "github.com/lib/pq" +) + +var ( + configPath = flag.String("config", "monwork.conf", "path to the config file") +) + +type ( + Config struct { + DB string `json:"db"` + CheckInterval string `json:"interval"` + } +) + +func main() { + flag.Parse() + + raw, err := ioutil.ReadFile(*configPath) + if err != nil { + log.Fatalf("could not read config: %s", err) + } + config := Config{} + if err := json.Unmarshal(raw, &config); err != nil { + log.Fatalf("could not parse config: %s", err) + } + + checkInterval, err := time.ParseDuration(config.CheckInterval) + if err != nil { + log.Fatalf("could not parse check interval: %s", err) + } + + db, err := sql.Open("postgres", config.DB) + if err != nil { + log.Fatalf("could not open database connection: %s", err) + } + + go startConfigGen(db, checkInterval) + + // don't exit, we have work to do + wg := sync.WaitGroup{} + wg.Add(1) + wg.Wait() +} + +func startConfigGen(db *sql.DB, checkInterval time.Duration) { + for { + tx, err := db.Begin() + if err != nil { + log.Printf("could not create transaction: %s", err) + time.Sleep(checkInterval) + continue + } + rows, err := tx.Query(SQLGetConfigUpdates) + if err != nil { + log.Printf("could not get updates: %s", err) + tx.Rollback() + time.Sleep(checkInterval) + continue + } + var ( + check_id int + command string + options []byte + ) + for rows.Next() { + if rows.Err() != nil { + log.Printf("could not receive rows: %s", err) + break + } + if err := rows.Scan(&check_id, &command, &options); err != nil { + log.Printf("could not scan row: %s", err) + break + } + } + if check_id == 0 { + tx.Rollback() + time.Sleep(checkInterval) + continue + } + tmpl, err := template.New("command").Parse(command) + if err != nil { + tx.Rollback() + log.Printf("could not parse command for check '%d': %s", check_id, err) + time.Sleep(checkInterval) + continue + } + var cmd bytes.Buffer + var opts map[string]interface{} + if err := json.Unmarshal(options, &opts); err != nil { + tx.Rollback() + log.Printf("could not parse options for check '%d': %s", check_id, err) + time.Sleep(checkInterval) + continue + } + if err := tmpl.Execute(&cmd, opts); err != nil { + tx.Rollback() + log.Printf("could not complete command for check '%d': %s", check_id, err) + time.Sleep(checkInterval) + continue + } + if _, err := tx.Exec(SQLRefreshActiveCheck, check_id, pq.Array(stringToShellFields(cmd.Bytes()))); err != nil { + tx.Rollback() + log.Printf("could not refresh check '%d': %s", check_id, err) + continue + } + if _, err := tx.Exec(SQLUpdateLastRefresh, check_id); err != nil { + tx.Rollback() + log.Printf("could not update timestamp for check '%d': %s", check_id, err) + continue + } + if err := tx.Commit(); err != nil { + tx.Rollback() + log.Printf("could not commit changes: %s", err) + } + } +} + +func stringToShellFields(in []byte) [][]byte { + if len(in) == 0 { + return [][]byte{} + } + fields := bytes.Fields(in) + result := [][]byte{fields[0]} + + var quote byte + + for _, field := range fields[1:] { + if quote == 0 && (field[0] != '\'' && field[0] != '"') { + result = append(result, field) + continue + } + if quote == 0 && (field[0] == '\'' || field[0] == '"') { + quote = field[0] + result = append(result, field) + continue + } + idx := len(result) - 1 + result[idx] = append(result[idx], append([]byte(" "), field...)...) + if quote != 0 && (bytes.HasSuffix(field, []byte{quote})) { + quote = 0 + continue + } + } + return result +} + +var ( + SQLGetConfigUpdates = `select c.id, co.command, c.options + from checks c + join commands co on c.command_id = co.id + where c.last_refresh < c.updated + limit 1 + for update of c skip locked;` + SQLRefreshActiveCheck = `insert into active_checks(check_id, cmdline, intval, enabled, notify) +select id, $2, intval, enabled, notify from checks where id = $1 +on conflict(check_id) +do update set cmdline = $2, intval = excluded.intval, enabled = excluded.enabled, notify = excluded.notify;` + SQLUpdateLastRefresh = `update checks set last_refresh = now() where id = $1;` +) |