package main import ( "bytes" "context" "database/sql" "encoding/json" "flag" "io/ioutil" "log" "os/exec" "sync" "syscall" "time" "github.com/lib/pq" ) var ( configPath = flag.String("config", "moncheck.conf", "path to the config file") ) type ( Config struct { DB string `json:"db"` Wait string `json:"wait"` } ) func main() { flag.Parse() raw, err := ioutil.ReadFile(*configPath) if err != nil { log.Fatalf("could not read config: %s", err) } config := Config{} if err := json.Unmarshal(raw, &config); err != nil { log.Fatalf("could not parse config: %s", err) } waitDuration, err := time.ParseDuration(config.Wait) if err != nil { log.Fatalf("could not parse wait duration: %s", err) } db, err := sql.Open("postgres", config.DB) if err != nil { log.Fatalf("could not open database connection: %s", err) } for i := 0; i < 25; i++ { go check(i, db, waitDuration) } wg := sync.WaitGroup{} wg.Add(1) wg.Wait() } func check(thread int, db *sql.DB, waitDuration time.Duration) { for { tx, err := db.Begin() if err != nil { log.Printf("[%d] could not start transaction: %s", thread, err) continue } rows, err := tx.Query("select check_id, cmdLine, states, notify from active_checks where next_time < now() and enabled order by next_time for update skip locked limit 1;") if err != nil { log.Printf("[%d] could not start query: %s", thread, err) tx.Rollback() continue } var ( id int64 cmdLine []string states []int64 notify bool ) found := false for rows.Next() { if err := rows.Err(); err != nil { log.Printf("could not fetch row: %s", err) tx.Rollback() break } if err := rows.Scan(&id, pq.Array(&cmdLine), pq.Array(&states), ¬ify); err != nil { log.Printf("could not scan values: %s", err) tx.Rollback() break } found = true } if !found { time.Sleep(waitDuration) tx.Rollback() continue } statePos := 5 if len(states) < 6 { statePos = len(states) } ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...) output := bytes.NewBuffer([]byte{}) cmd.Stdout = output cmd.Stderr = output if err != nil && err == context.Canceled { log.Printf("[%d] check took too long: %s", id, err) // TODO which state to choose? // TODO add notification handler // TODO all this casting should be done better states = append([]int64{1}, states[:statePos]...) } else if err != nil { cancel() exitErr, ok := err.(*exec.ExitError) if !ok { log.Printf("[%d]error running check: %s", id, err) states = append([]int64{1}, states[:statePos]...) } else { status, ok := exitErr.Sys().(syscall.WaitStatus) if !ok { } else { states = append([]int64{int64(status.ExitStatus())}, states[:statePos]...) } } } else { cancel() states = append([]int64{0}, states[:statePos]...) } if _, err := tx.Exec("update active_checks set next_time = now() + intval, states = $2 where check_id = $1", id, pq.Array(&states)); err != nil { log.Printf("[%d] could not update row '%d': %s", thread, id, err) tx.Rollback() continue } if notify { if _, err := tx.Exec("insert into notifications(check_id, states, output) values ($1, $2, $3);", &id, pq.Array(&states), output.Bytes()); err != nil { log.Printf("[%d] could not create notification for '%d': %s", thread, id, err) tx.Rollback() continue } } tx.Commit() } }