21.4 KB751 lines
Blame
1import { spawn, execFileSync } from "child_process";
2import { mkdirSync, rmSync } from "fs";
3import {
4 createCipheriv,
5 createDecipheriv,
6 randomBytes,
7 createHash,
8} from "crypto";
9import type Database from "better-sqlite3";
10import { parse as parseYaml } from "yaml";
11import { minimatch } from "minimatch";
12import type { CanopyEventBus } from "./canopy-events.js";
13
14interface PipelineConfig {
15 name: string;
16 on: {
17 push?: { branches?: string[]; paths?: string[] };
18 };
19 checkout?: boolean;
20 concurrency?: number;
21 order?: number;
22 env?: Record<string, string>;
23 steps: StepConfig[];
24}
25
26interface StepConfig {
27 name: string;
28 image: string;
29 run: string;
30 env?: Record<string, string>;
31 timeout?: number;
32 volumes?: string[];
33}
34
35export interface PushEvent {
36 repo: string;
37 branch: string;
38 oldCommitId: string;
39 newCommitId: string;
40}
41
42export class CanopyRunner {
43 private running = new Map<number, AbortController>();
44
45 constructor(
46 private db: Database.Database,
47 private bridgeUrl: string,
48 private workspaceDir: string,
49 private workspaceHostDir: string,
50 private jwtSecret: string,
51 private logger: { info: (...args: any[]) => void; error: (...args: any[]) => void },
52 private eventBus?: CanopyEventBus
53 ) {
54 mkdirSync(workspaceDir, { recursive: true });
55 this.recoverOrphanedRuns();
56 }
57
58 /**
59 * On startup, mark any "running" pipeline runs/steps as failed.
60 * These are orphans from a previous process crash (e.g. deploy step
61 * restarted grove-api before the callback could fire).
62 */
63 private recoverOrphanedRuns(): void {
64 const steps = this.db
65 .prepare(
66 `UPDATE pipeline_steps SET status = 'failed', finished_at = datetime('now') WHERE status = 'running'`
67 )
68 .run();
69 const runs = this.db
70 .prepare(
71 `UPDATE pipeline_runs SET status = 'failed', finished_at = datetime('now') WHERE status = 'running'`
72 )
73 .run();
74 if (steps.changes > 0 || runs.changes > 0) {
75 this.logger.info(
76 `Recovered ${runs.changes} orphaned pipeline run(s), ${steps.changes} step(s)`
77 );
78 }
79 }
80
81 private getRunRow(runId: number): Record<string, unknown> | undefined {
82 return this.db.prepare(`SELECT * FROM pipeline_runs WHERE id = ?`).get(runId) as any;
83 }
84
85 private getStepRow(stepId: number): Record<string, unknown> | undefined {
86 return this.db.prepare(`SELECT * FROM pipeline_steps WHERE id = ?`).get(stepId) as any;
87 }
88
89 private ensureRepo(repoName: string): number | null {
90 const existing = this.db
91 .prepare(`SELECT id FROM repos WHERE name = ?`)
92 .get(repoName) as any;
93 if (existing) return existing.id;
94
95 try {
96 // Ensure a placeholder user exists for FK constraint
97 this.db
98 .prepare(
99 `INSERT OR IGNORE INTO users (id, username, display_name) VALUES (0, '_system', 'System')`
100 )
101 .run();
102 const result = this.db
103 .prepare(`INSERT INTO repos (name, owner_id) VALUES (?, 0)`)
104 .run(repoName);
105 return Number(result.lastInsertRowid);
106 } catch {
107 return null;
108 }
109 }
110
111 private ensurePipeline(repoId: number, name: string, file: string): number {
112 const existing = this.db
113 .prepare(`SELECT id FROM pipelines WHERE repo_id = ? AND name = ?`)
114 .get(repoId, name) as any;
115 if (existing) {
116 this.db
117 .prepare(`UPDATE pipelines SET file = ? WHERE id = ?`)
118 .run(file, existing.id);
119 return existing.id;
120 }
121 const result = this.db
122 .prepare(`INSERT INTO pipelines (repo_id, name, file) VALUES (?, ?, ?)`)
123 .run(repoId, name, file);
124 return Number(result.lastInsertRowid);
125 }
126
127 async onPush(event: PushEvent, pipelineFilter?: string): Promise<void> {
128 const repoId = this.ensureRepo(event.repo);
129 if (!repoId) return;
130
131 const configs = await this.readPipelineConfigs(event.repo, event.branch);
132 if (configs.length === 0) return;
133
134 let changedFiles: string[] | null = null;
135 if (event.oldCommitId && event.oldCommitId !== event.newCommitId) {
136 changedFiles = await this.getChangedFiles(
137 event.repo,
138 event.oldCommitId,
139 event.newCommitId
140 );
141 }
142
143 // Fetch commit message from bridge
144 const commitMessage = await this.getCommitMessage(event.repo, event.newCommitId);
145
146 const matched = configs
147 .filter(({ file, config }) => this.matchesTrigger(config, file, event.branch, changedFiles))
148 .filter(({ config }) => !pipelineFilter || config.name === pipelineFilter)
149 .sort((a, b) => (a.config.order ?? 0) - (b.config.order ?? 0));
150 if (matched.length === 0) return;
151
152 type QueuedRun = {
153 order: number;
154 runId: number;
155 config: PipelineConfig;
156 };
157
158 const queuedRuns: QueuedRun[] = [];
159 for (const { file, config } of matched) {
160 // Cancel in-progress runs of this pipeline if concurrency is limited
161 if (config.concurrency != null && config.concurrency >= 1) {
162 const pipelineId = this.ensurePipeline(repoId, config.name, file);
163 const active = this.db
164 .prepare(
165 `SELECT id FROM pipeline_runs
166 WHERE pipeline_id = ? AND status IN ('pending', 'running')
167 ORDER BY id`
168 )
169 .all(pipelineId) as any[];
170
171 const toCancel = active.slice(0, Math.max(0, active.length - config.concurrency + 1));
172 for (const row of toCancel) {
173 this.cancelRun(row.id);
174 this.db
175 .prepare(
176 `UPDATE pipeline_runs SET status = 'cancelled', finished_at = datetime('now') WHERE id = ? AND status IN ('pending', 'running')`
177 )
178 .run(row.id);
179 this.db
180 .prepare(
181 `UPDATE pipeline_steps SET status = 'skipped' WHERE run_id = ? AND status IN ('pending', 'running')`
182 )
183 .run(row.id);
184 this.eventBus?.publish({
185 type: "run:cancelled",
186 runId: row.id,
187 repoId,
188 status: "cancelled",
189 run: this.getRunRow(row.id),
190 ts: new Date().toISOString(),
191 });
192 }
193 }
194
195 const runId = this.createRun(
196 repoId,
197 config.name,
198 file,
199 "push",
200 event.branch,
201 event.newCommitId,
202 commitMessage,
203 config.steps
204 );
205 queuedRuns.push({ order: config.order ?? 0, runId, config });
206 }
207
208 // Execute queued runs in the background so push handling can return quickly.
209 // Order groups are still respected: same order runs in parallel, higher orders
210 // wait for lower-order groups to finish.
211 void (async () => {
212 const groups = new Map<number, QueuedRun[]>();
213 for (const run of queuedRuns) {
214 if (!groups.has(run.order)) groups.set(run.order, []);
215 groups.get(run.order)!.push(run);
216 }
217
218 for (const [, group] of [...groups.entries()].sort(([a], [b]) => a - b)) {
219 await Promise.all(
220 group.map((run) =>
221 this.executePipeline(run.runId, run.config, event.repo, event.branch).catch(
222 (err) => this.logger.error({ err, runId: run.runId }, "Pipeline execution failed")
223 )
224 )
225 );
226 }
227 })().catch((err) =>
228 this.logger.error(
229 { err, repo: event.repo, branch: event.branch, commit: event.newCommitId },
230 "Queued push execution failed"
231 )
232 );
233 }
234
235 private async getCommitMessage(repo: string, commitId: string): Promise<string | null> {
236 try {
237 const res = await fetch(
238 `${this.bridgeUrl}/repos/${repo}/commit/${commitId}/history?limit=1`
239 );
240 if (!res.ok) return null;
241 const data = await res.json();
242 const commit = data.commits?.[0];
243 if (!commit?.message) return null;
244 return commit.message.split("\n")[0];
245 } catch {
246 return null;
247 }
248 }
249
250 // --- Pipeline config reading ---
251
252 private async readPipelineConfigs(
253 repo: string,
254 ref: string
255 ): Promise<Array<{ file: string; config: PipelineConfig }>> {
256 try {
257 const treeRes = await fetch(
258 `${this.bridgeUrl}/repos/${repo}/tree/${ref}/.canopy`
259 );
260 if (!treeRes.ok) return [];
261 const tree = await treeRes.json();
262
263 const results: Array<{ file: string; config: PipelineConfig }> = [];
264 for (const entry of tree.entries) {
265 if (!entry.name.endsWith(".yml") && !entry.name.endsWith(".yaml"))
266 continue;
267
268 const blobRes = await fetch(
269 `${this.bridgeUrl}/repos/${repo}/blob/${ref}/.canopy/${entry.name}`
270 );
271 if (!blobRes.ok) continue;
272 const blob = await blobRes.json();
273
274 try {
275 const config = parseYaml(blob.content) as PipelineConfig;
276 if (config.name && config.on && config.steps) {
277 results.push({ file: `.canopy/${entry.name}`, config });
278 }
279 } catch {
280 /* invalid YAML, skip */
281 }
282 }
283 return results;
284 } catch {
285 return [];
286 }
287 }
288
289 // --- Trigger matching ---
290
291 private matchesTrigger(
292 config: PipelineConfig,
293 file: string,
294 branch: string,
295 changedFiles: string[] | null
296 ): boolean {
297 const push = config.on.push;
298 if (!push) return false;
299
300 if (push.branches && push.branches.length > 0) {
301 const branchMatches = push.branches.some((pattern) =>
302 minimatch(branch, pattern)
303 );
304 if (!branchMatches) return false;
305 }
306
307 if (push.paths && push.paths.length > 0 && changedFiles !== null) {
308 // Always trigger if the pipeline's own config file was changed
309 const selfChanged = changedFiles.includes(file);
310 if (!selfChanged) {
311 const anyPathMatches = changedFiles.some((f) =>
312 push.paths!.some((pattern) => minimatch(f, pattern))
313 );
314 if (!anyPathMatches) return false;
315 }
316 }
317
318 return true;
319 }
320
321 private async getChangedFiles(
322 repo: string,
323 oldId: string,
324 newId: string
325 ): Promise<string[] | null> {
326 try {
327 const diffRes = await fetch(
328 `${this.bridgeUrl}/repos/${repo}/diff/${oldId}/${newId}`
329 );
330 if (!diffRes.ok) return null;
331 const data = await diffRes.json();
332 return (data.diffs || []).map((d: any) => d.path);
333 } catch {
334 return null;
335 }
336 }
337
338 // --- Source checkout via grove-bridge API ---
339
340 private async checkoutFromBridge(
341 repo: string,
342 ref: string,
343 destPath: string
344 ): Promise<void> {
345 const url = `${this.bridgeUrl}/repos/${repo}/archive/${ref}`;
346 const res = await fetch(url);
347 if (!res.ok) {
348 throw new Error(`Archive fetch failed: ${res.status} ${res.statusText}`);
349 }
350
351 mkdirSync(destPath, { recursive: true });
352
353 const tarData = Buffer.from(await res.arrayBuffer());
354 execFileSync("tar", ["xf", "-", "-C", destPath], { input: tarData });
355 }
356
357 // --- Run/step record creation ---
358
359 private createRun(
360 repoId: number,
361 pipelineName: string,
362 pipelineFile: string,
363 triggerType: string,
364 triggerRef: string,
365 commitId: string,
366 commitMessage: string | null,
367 steps: StepConfig[]
368 ): number {
369 const pipelineId = this.ensurePipeline(repoId, pipelineName, pipelineFile);
370
371 const result = this.db
372 .prepare(
373 `INSERT INTO pipeline_runs (pipeline_id, repo_id, pipeline_name, pipeline_file, trigger_type, trigger_ref, commit_id, commit_message, status)
374 VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'pending')`
375 )
376 .run(pipelineId, repoId, pipelineName, pipelineFile, triggerType, triggerRef, commitId, commitMessage);
377
378 const runId = Number(result.lastInsertRowid);
379
380 const insertStep = this.db.prepare(
381 `INSERT INTO pipeline_steps (run_id, step_index, name, image, status)
382 VALUES (?, ?, ?, ?, 'pending')`
383 );
384
385 for (let i = 0; i < steps.length; i++) {
386 insertStep.run(runId, i, steps[i].name, steps[i].image);
387 }
388
389 this.eventBus?.publish({
390 type: "run:created",
391 runId,
392 repoId,
393 status: "pending",
394 run: this.getRunRow(runId),
395 ts: new Date().toISOString(),
396 });
397
398 return runId;
399 }
400
401 // --- Pipeline execution ---
402
403 private async executePipeline(
404 runId: number,
405 config: PipelineConfig,
406 repoName: string,
407 ref: string
408 ): Promise<void> {
409 const abort = new AbortController();
410 this.running.set(runId, abort);
411
412 const repoRow = this.db
413 .prepare(`SELECT id FROM repos WHERE name = ?`)
414 .get(repoName) as any;
415 const repoId = repoRow?.id ?? 0;
416
417 this.db
418 .prepare(
419 `UPDATE pipeline_runs SET status = 'running', started_at = datetime('now') WHERE id = ?`
420 )
421 .run(runId);
422
423 this.eventBus?.publish({
424 type: "run:started",
425 runId,
426 repoId,
427 status: "running",
428 run: this.getRunRow(runId),
429 ts: new Date().toISOString(),
430 });
431
432 const workspacePath = `${this.workspaceDir}/${runId}`;
433 const srcPath = `${workspacePath}/src`;
434 mkdirSync(srcPath, { recursive: true });
435
436 if (config.checkout !== false) {
437 try {
438 await this.checkoutFromBridge(repoName, ref, srcPath);
439 } catch (err) {
440 this.logger.error({ err, runId }, "Failed to checkout repo for pipeline");
441 this.db
442 .prepare(
443 `UPDATE pipeline_runs SET status = 'failed', finished_at = datetime('now') WHERE id = ?`
444 )
445 .run(runId);
446 this.eventBus?.publish({
447 type: "run:completed",
448 runId,
449 repoId,
450 status: "failed",
451 run: this.getRunRow(runId),
452 ts: new Date().toISOString(),
453 });
454 this.running.delete(runId);
455 return;
456 }
457 }
458
459 const secrets = repoRow ? this.loadSecrets(repoRow.id) : {};
460
461 const steps = this.db
462 .prepare(
463 `SELECT * FROM pipeline_steps WHERE run_id = ? ORDER BY step_index`
464 )
465 .all(runId) as any[];
466 let allPassed = true;
467
468 for (const step of steps) {
469 if (abort.signal.aborted || !allPassed) {
470 this.db
471 .prepare(`UPDATE pipeline_steps SET status = 'skipped' WHERE id = ?`)
472 .run(step.id);
473 this.eventBus?.publish({
474 type: "step:skipped",
475 runId,
476 repoId,
477 stepId: step.id,
478 stepIndex: step.step_index,
479 status: "skipped",
480 step: this.getStepRow(step.id),
481 ts: new Date().toISOString(),
482 });
483 continue;
484 }
485
486 const stepConfig = config.steps[step.step_index];
487 const mergedEnv = { ...config.env, ...stepConfig.env };
488 const resolvedEnv = this.resolveSecrets(mergedEnv, secrets);
489
490 const passed = await this.executeStep(
491 runId,
492 repoId,
493 step.id,
494 step.step_index,
495 stepConfig,
496 resolvedEnv,
497 `${workspacePath}/src`,
498 abort.signal
499 );
500 if (!passed) allPassed = false;
501 }
502
503 const run = this.db
504 .prepare(`SELECT started_at FROM pipeline_runs WHERE id = ?`)
505 .get(runId) as any;
506 const durationMs = run?.started_at
507 ? Date.now() - new Date(run.started_at + "Z").getTime()
508 : 0;
509
510 const finalStatus = allPassed ? "passed" : "failed";
511 this.db
512 .prepare(
513 `UPDATE pipeline_runs SET status = ?, finished_at = datetime('now'), duration_ms = ? WHERE id = ?`
514 )
515 .run(finalStatus, durationMs, runId);
516
517 this.eventBus?.publish({
518 type: "run:completed",
519 runId,
520 repoId,
521 status: finalStatus,
522 run: this.getRunRow(runId),
523 ts: new Date().toISOString(),
524 });
525
526 try {
527 rmSync(workspacePath, { recursive: true, force: true });
528 } catch {}
529 this.running.delete(runId);
530 }
531
532 // --- Step execution (Docker container) ---
533
534 private toHostPath(containerPath: string): string {
535 if (containerPath.startsWith(this.workspaceDir)) {
536 return this.workspaceHostDir + containerPath.slice(this.workspaceDir.length);
537 }
538 return containerPath;
539 }
540
541 private executeStep(
542 runId: number,
543 repoId: number,
544 stepId: number,
545 stepIndex: number,
546 step: StepConfig,
547 env: Record<string, string>,
548 workspacePath: string,
549 signal: AbortSignal
550 ): Promise<boolean> {
551 return new Promise((resolve) => {
552 this.db
553 .prepare(
554 `UPDATE pipeline_steps SET status = 'running', started_at = datetime('now') WHERE id = ?`
555 )
556 .run(stepId);
557
558 this.eventBus?.publish({
559 type: "step:started",
560 runId,
561 repoId,
562 stepId,
563 stepIndex,
564 status: "running",
565 step: this.getStepRow(stepId),
566 ts: new Date().toISOString(),
567 });
568
569 const timeout = (step.timeout ?? 600) * 1000;
570 const envArgs = Object.entries({
571 GROVE_REGISTRY: "localhost:5000",
572 ...env,
573 }).flatMap(([k, v]) => ["-e", `${k}=${v}`]);
574
575 const hostWorkspacePath = this.toHostPath(workspacePath);
576
577 const volumeArgs = [
578 "-v", `${hostWorkspacePath}:/workspace`,
579 "-v", "/var/run/docker.sock:/var/run/docker.sock",
580 "-v", "/opt/grove:/opt/grove:ro",
581 ...(step.volumes ?? []).flatMap((v) => ["-v", v]),
582 ];
583
584 const containerName = `canopy-${runId}-${stepIndex}`;
585 const proc = spawn("docker", [
586 "run",
587 "--rm",
588 "--name",
589 containerName,
590 ...volumeArgs,
591 "-w",
592 "/workspace",
593 "--network",
594 "host",
595 ...envArgs,
596 step.image,
597 "sh",
598 "-ec",
599 step.run,
600 ]);
601
602 const insertLog = this.db.prepare(
603 `INSERT INTO step_logs (step_id, stream, content) VALUES (?, ?, ?)`
604 );
605
606 proc.stdout.on("data", (data: Buffer) => {
607 const content = data.toString();
608 insertLog.run(stepId, "stdout", content);
609 this.eventBus?.publish({
610 type: "log:append",
611 runId,
612 repoId,
613 stepId,
614 stepIndex,
615 log: { stream: "stdout", content, created_at: new Date().toISOString() },
616 ts: new Date().toISOString(),
617 });
618 });
619
620 proc.stderr.on("data", (data: Buffer) => {
621 const content = data.toString();
622 insertLog.run(stepId, "stderr", content);
623 this.eventBus?.publish({
624 type: "log:append",
625 runId,
626 repoId,
627 stepId,
628 stepIndex,
629 log: { stream: "stderr", content, created_at: new Date().toISOString() },
630 ts: new Date().toISOString(),
631 });
632 });
633
634 const timer = setTimeout(() => {
635 proc.kill("SIGTERM");
636 insertLog.run(
637 stepId,
638 "stderr",
639 `Step timed out after ${step.timeout ?? 600}s`
640 );
641 }, timeout);
642
643 const abortHandler = () => {
644 spawn("docker", ["kill", containerName]).on("close", () => {
645 proc.kill("SIGTERM");
646 });
647 insertLog.run(stepId, "stderr", "Step cancelled");
648 };
649 signal.addEventListener("abort", abortHandler, { once: true });
650
651 proc.on("close", (code) => {
652 clearTimeout(timer);
653 signal.removeEventListener("abort", abortHandler);
654
655 const startedAt = this.db
656 .prepare(`SELECT started_at FROM pipeline_steps WHERE id = ?`)
657 .get(stepId) as any;
658 const durationMs = startedAt?.started_at
659 ? Date.now() - new Date(startedAt.started_at + "Z").getTime()
660 : 0;
661
662 const passed = code === 0;
663 const stepStatus = passed ? "passed" : "failed";
664 this.db
665 .prepare(
666 `UPDATE pipeline_steps SET status = ?, exit_code = ?, finished_at = datetime('now'), duration_ms = ? WHERE id = ?`
667 )
668 .run(stepStatus, code, durationMs, stepId);
669
670 this.eventBus?.publish({
671 type: "step:completed",
672 runId,
673 repoId,
674 stepId,
675 stepIndex,
676 status: stepStatus,
677 step: this.getStepRow(stepId),
678 ts: new Date().toISOString(),
679 });
680
681 resolve(passed);
682 });
683 });
684 }
685
686 // --- Secrets ---
687
688 private deriveKey(): Buffer {
689 return createHash("sha256")
690 .update(this.jwtSecret + ":canopy-secrets")
691 .digest();
692 }
693
694 encryptSecret(plaintext: string): string {
695 const key = this.deriveKey();
696 const iv = randomBytes(12);
697 const cipher = createCipheriv("aes-256-gcm", key, iv);
698 const encrypted = Buffer.concat([
699 cipher.update(plaintext, "utf8"),
700 cipher.final(),
701 ]);
702 const tag = cipher.getAuthTag();
703 return Buffer.concat([iv, tag, encrypted]).toString("base64");
704 }
705
706 private decrypt(encoded: string): string {
707 const key = this.deriveKey();
708 const buf = Buffer.from(encoded, "base64");
709 const iv = buf.subarray(0, 12);
710 const tag = buf.subarray(12, 28);
711 const data = buf.subarray(28);
712 const decipher = createDecipheriv("aes-256-gcm", key, iv);
713 decipher.setAuthTag(tag);
714 return decipher.update(data).toString("utf8") + decipher.final("utf8");
715 }
716
717 private loadSecrets(repoId: number): Record<string, string> {
718 const rows = this.db
719 .prepare(
720 `SELECT name, encrypted_value FROM canopy_secrets WHERE repo_id = ?`
721 )
722 .all(repoId) as any[];
723 const secrets: Record<string, string> = {};
724 for (const row of rows) {
725 try {
726 secrets[row.name] = this.decrypt(row.encrypted_value);
727 } catch {}
728 }
729 return secrets;
730 }
731
732 private resolveSecrets(
733 env: Record<string, string>,
734 secrets: Record<string, string>
735 ): Record<string, string> {
736 const resolved: Record<string, string> = {};
737 for (const [key, value] of Object.entries(env)) {
738 resolved[key] = value.replace(
739 /\$\{\{\s*secrets\.(\w+)\s*\}\}/g,
740 (_, name) => secrets[name] ?? ""
741 );
742 }
743 return resolved;
744 }
745
746 cancelRun(runId: number): void {
747 const controller = this.running.get(runId);
748 if (controller) controller.abort();
749 }
750}
751