// tasks.go
package internal
import (
"context"
"strings"
"sync"
"time"
"congo.gg/dev/models"
)
// maxOutputLines is the maximum number of lines appendOutput keeps in a
// task's Output; older lines are dropped first.
const maxOutputLines = 200

// taskMu guards activeTasks.
var taskMu sync.Mutex

// activeTasks holds the cancel function of every task started by RunTask that
// has not finished yet, keyed by task ID.
var activeTasks = map[string]context.CancelFunc{}
// RunTask starts fn in a new goroutine and records its lifecycle on task.
//
// Before the goroutine starts, task is marked "running", StartedAt is stamped,
// the record is inserted, and a cancel function for the context handed to fn
// is registered under task.ID so that CancelTask can reach it.
//
// fn receives that context plus a log callback; every logged line is appended
// to the task output through appendOutput. Once fn returns, a task that is
// still "running" becomes "completed" (a status set while fn ran, such as
// "failed" from FailTask, is preserved), EndedAt is stamped, and the record is
// updated. Finally the task is unregistered and its context is cancelled.
func RunTask(task *models.Task, fn func(ctx context.Context, log func(string))) {
	task.Status = "running"
	task.StartedAt = time.Now()
	models.Tasks.Insert(task)

	ctx, cancel := context.WithCancel(context.Background())

	taskMu.Lock()
	activeTasks[task.ID] = cancel
	taskMu.Unlock()

	// unregister drops the task from the active set and releases the context.
	unregister := func() {
		taskMu.Lock()
		delete(activeTasks, task.ID)
		taskMu.Unlock()
		cancel()
	}

	// logLine is the log callback given to fn.
	logLine := func(line string) {
		appendOutput(task, line)
	}

	go func() {
		defer unregister()

		fn(ctx, logLine)

		// Only promote to "completed" when nothing changed the status while
		// fn was running.
		if task.Status == "running" {
			task.Status = "completed"
		}
		task.EndedAt = time.Now()
		models.Tasks.Update(task)
	}()
}
// FailTask records a failure on task: the status becomes "failed", EndedAt is
// set to the current time, errMsg is appended to the task output with an
// "ERROR: " prefix, and the record is updated.
func FailTask(task *models.Task, errMsg string) {
	task.EndedAt = time.Now()
	task.Status = "failed"

	message := "ERROR: " + errMsg
	appendOutput(task, message)

	models.Tasks.Update(task)
}
// CancelTask cancels the context of the active task registered under taskID.
// It does nothing when no such task is registered. The entry itself is left in
// activeTasks.
func CancelTask(taskID string) {
	taskMu.Lock()
	defer taskMu.Unlock()

	cancel, active := activeTasks[taskID]
	if !active {
		return
	}
	cancel()
}
// appendOutput appends line to task.Output, keeps only the last
// maxOutputLines lines, and persists the task with models.Tasks.Update.
//
// An empty Output is treated as containing no lines. strings.Split("", "\n")
// yields a one-element slice holding "", so splitting unconditionally would
// store the first logged line as "\nline" and waste one slot of the cap on a
// blank line.
func appendOutput(task *models.Task, line string) {
	output := line
	if task.Output != "" {
		output = task.Output + "\n" + line
	}

	// Split after concatenating so that a line carrying embedded newlines is
	// counted as several lines against the cap.
	lines := strings.Split(output, "\n")

	// Keep only the last maxOutputLines.
	if len(lines) > maxOutputLines {
		lines = lines[len(lines)-maxOutputLines:]
	}

	task.Output = strings.Join(lines, "\n")
	models.Tasks.Update(task)
}