// tasks.go

package internal

import (
	"context"
	"strings"
	"sync"
	"time"

	"congo.gg/dev/models"
)

// maxOutputLines is the maximum number of newline-separated lines kept in a
// task's Output; appendOutput drops the oldest lines once this is exceeded.
const maxOutputLines = 200

var (
	// taskMu guards activeTasks.
	taskMu sync.Mutex
	// activeTasks maps a task ID to the cancel function of its context.
	// RunTask adds an entry before starting the task goroutine and the
	// goroutine removes it when it exits; CancelTask looks entries up by ID.
	activeTasks = make(map[string]context.CancelFunc)
)

// RunTask marks task as "running", inserts it via models.Tasks, and then runs
// fn asynchronously in a new goroutine. RunTask itself returns immediately.
//
// fn receives a cancellable context (cancelled by CancelTask for this task's
// ID, and always once fn has returned) and a log callback that appends one
// line to the task's output through appendOutput.
//
// When fn returns, the status becomes "completed" only if it is still
// "running", so a status set while fn ran (for example by FailTask) is kept.
// EndedAt is then stamped and the task is written back with models.Tasks.Update.
func RunTask(task *models.Task, fn func(ctx context.Context, log func(string))) {
	task.Status = "running"
	task.StartedAt = time.Now()
	models.Tasks.Insert(task)

	ctx, stop := context.WithCancel(context.Background())

	// Register the cancel function so CancelTask can find it by ID.
	taskMu.Lock()
	activeTasks[task.ID] = stop
	taskMu.Unlock()

	// unregister removes the task from the active set and releases the context.
	unregister := func() {
		taskMu.Lock()
		delete(activeTasks, task.ID)
		taskMu.Unlock()
		stop()
	}

	// logLine is the callback handed to fn for writing task output.
	logLine := func(line string) {
		appendOutput(task, line)
	}

	go func() {
		defer unregister()

		fn(ctx, logLine)

		// Leave any status other than "running" untouched.
		if task.Status == "running" {
			task.Status = "completed"
		}
		task.EndedAt = time.Now()
		models.Tasks.Update(task)
	}()
}

// FailTask records a failure on task: it sets the status to "failed", stamps
// EndedAt with the current time, appends "ERROR: <errMsg>" to the task output
// and writes the task back with models.Tasks.Update.
func FailTask(task *models.Task, errMsg string) {
	task.EndedAt = time.Now()
	task.Status = "failed"

	failureLine := "ERROR: " + errMsg
	appendOutput(task, failureLine)

	models.Tasks.Update(task)
}

// CancelTask invokes the cancel function registered for taskID, if there is
// one. It does nothing when no entry for taskID exists in activeTasks.
func CancelTask(taskID string) {
	taskMu.Lock()
	defer taskMu.Unlock()

	stop, registered := activeTasks[taskID]
	if !registered {
		return
	}
	stop()
}

// appendOutput adds line to the end of task.Output (lines are separated by
// "\n"), keeps only the last maxOutputLines lines, and writes the task back
// with models.Tasks.Update.
func appendOutput(task *models.Task, line string) {
	// strings.Split("", "\n") returns [""] rather than an empty slice, which
	// would store the first line as "\nline" and spend one of the
	// maxOutputLines slots on a phantom blank line. Start from an empty slice
	// when there is no output yet.
	var lines []string
	if task.Output != "" {
		lines = strings.Split(task.Output, "\n")
	}
	lines = append(lines, line)

	// Keep only the last maxOutputLines.
	if len(lines) > maxOutputLines {
		lines = lines[len(lines)-maxOutputLines:]
	}

	task.Output = strings.Join(lines, "\n")
	models.Tasks.Update(task)
}