1import { spawn, execFileSync } from "child_process";
2import { mkdirSync, rmSync } from "fs";
3import {
4 createCipheriv,
5 createDecipheriv,
6 randomBytes,
7 createHash,
8} from "crypto";
9import type Database from "better-sqlite3";
10import { parse as parseYaml } from "yaml";
11import { minimatch } from "minimatch";
12import type { CanopyEventBus } from "./canopy-events.js";
13
/** Parsed shape of a `.canopy/*.yml` pipeline definition. */
interface PipelineConfig {
  name: string;
  on: {
    // Trigger filters; `branches`/`paths` entries are minimatch glob patterns.
    push?: { branches?: string[]; paths?: string[] };
  };
  checkout?: boolean; // when not explicitly false, the repo is checked out into the workspace
  concurrency?: number; // cap on in-flight runs; older pending/running runs are cancelled
  order?: number; // execution group: lower-order groups finish before higher ones start
  env?: Record<string, string>; // pipeline-wide env; values may contain ${{ secrets.NAME }}
  steps: StepConfig[];
}
25
/** One pipeline step: a shell script run inside a Docker container. */
interface StepConfig {
  name: string;
  image: string; // Docker image the step runs in
  run: string; // script body, passed to `sh -ec`
  env?: Record<string, string>; // step-level env; overrides pipeline-level keys
  timeout?: number; // seconds before the step is killed (default 600)
  volumes?: string[]; // extra Docker `-v` mount specs
}
34
/** Push notification that drives pipeline triggering (see onPush). */
export interface PushEvent {
  repo: string;
  branch: string;
  oldCommitId: string; // may be falsy or equal to newCommitId, in which case no diff is computed
  newCommitId: string;
}
41
42export class CanopyRunner {
  // AbortControllers for in-flight runs, keyed by run id (see cancelRun).
  private running = new Map<number, AbortController>();

  constructor(
    private db: Database.Database,
    private bridgeUrl: string,
    private workspaceDir: string,
    private workspaceHostDir: string, // host-side location of workspaceDir, used for Docker -v mounts
    private jwtSecret: string, // also the key material for secret encryption (see deriveKey)
    private logger: { info: (...args: any[]) => void; error: (...args: any[]) => void },
    private eventBus?: CanopyEventBus
  ) {
    mkdirSync(workspaceDir, { recursive: true });
    // Fail any runs/steps left "running" by a previous process crash.
    this.recoverOrphanedRuns();
  }
57
58 /**
59 * On startup, mark any "running" pipeline runs/steps as failed.
60 * These are orphans from a previous process crash (e.g. deploy step
61 * restarted grove-api before the callback could fire).
62 */
63 private recoverOrphanedRuns(): void {
64 const steps = this.db
65 .prepare(
66 `UPDATE pipeline_steps SET status = 'failed', finished_at = datetime('now') WHERE status = 'running'`
67 )
68 .run();
69 const runs = this.db
70 .prepare(
71 `UPDATE pipeline_runs SET status = 'failed', finished_at = datetime('now') WHERE status = 'running'`
72 )
73 .run();
74 if (steps.changes > 0 || runs.changes > 0) {
75 this.logger.info(
76 `Recovered ${runs.changes} orphaned pipeline run(s), ${steps.changes} step(s)`
77 );
78 }
79 }
80
81 private getRunRow(runId: number): Record<string, unknown> | undefined {
82 return this.db.prepare(`SELECT * FROM pipeline_runs WHERE id = ?`).get(runId) as any;
83 }
84
85 private getStepRow(stepId: number): Record<string, unknown> | undefined {
86 return this.db.prepare(`SELECT * FROM pipeline_steps WHERE id = ?`).get(stepId) as any;
87 }
88
89 private ensureRepo(repoName: string): number | null {
90 const existing = this.db
91 .prepare(`SELECT id FROM repos WHERE name = ?`)
92 .get(repoName) as any;
93 if (existing) return existing.id;
94
95 try {
96 // Ensure a placeholder user exists for FK constraint
97 this.db
98 .prepare(
99 `INSERT OR IGNORE INTO users (id, username, display_name) VALUES (0, '_system', 'System')`
100 )
101 .run();
102 const result = this.db
103 .prepare(`INSERT INTO repos (name, owner_id) VALUES (?, 0)`)
104 .run(repoName);
105 return Number(result.lastInsertRowid);
106 } catch {
107 return null;
108 }
109 }
110
111 private ensurePipeline(repoId: number, name: string, file: string): number {
112 const existing = this.db
113 .prepare(`SELECT id FROM pipelines WHERE repo_id = ? AND name = ?`)
114 .get(repoId, name) as any;
115 if (existing) {
116 this.db
117 .prepare(`UPDATE pipelines SET file = ? WHERE id = ?`)
118 .run(file, existing.id);
119 return existing.id;
120 }
121 const result = this.db
122 .prepare(`INSERT INTO pipelines (repo_id, name, file) VALUES (?, ?, ?)`)
123 .run(repoId, name, file);
124 return Number(result.lastInsertRowid);
125 }
126
  /**
   * Handle a push: load `.canopy/` configs at the pushed ref, decide which
   * pipelines trigger, enforce per-pipeline concurrency by cancelling older
   * pending/running runs, record new runs, and execute them in the
   * background grouped by `order` (lower groups complete before higher ones;
   * runs in the same group execute in parallel).
   */
  async onPush(event: PushEvent): Promise<void> {
    const repoId = this.ensureRepo(event.repo);
    if (!repoId) return;

    const configs = await this.readPipelineConfigs(event.repo, event.branch);
    if (configs.length === 0) return;

    // Changed files are only computable with a distinct previous commit;
    // otherwise path filters are skipped by matchesTrigger (null => no filter).
    let changedFiles: string[] | null = null;
    if (event.oldCommitId && event.oldCommitId !== event.newCommitId) {
      changedFiles = await this.getChangedFiles(
        event.repo,
        event.oldCommitId,
        event.newCommitId
      );
    }

    // Fetch commit message from bridge
    const commitMessage = await this.getCommitMessage(event.repo, event.newCommitId);

    const matched = configs
      .filter(({ file, config }) => this.matchesTrigger(config, file, event.branch, changedFiles))
      .sort((a, b) => (a.config.order ?? 0) - (b.config.order ?? 0));
    if (matched.length === 0) return;

    type QueuedRun = {
      order: number;
      runId: number;
      config: PipelineConfig;
    };

    const queuedRuns: QueuedRun[] = [];
    for (const { file, config } of matched) {
      // Cancel in-progress runs of this pipeline if concurrency is limited
      if (config.concurrency != null && config.concurrency >= 1) {
        const pipelineId = this.ensurePipeline(repoId, config.name, file);
        const active = this.db
          .prepare(
            `SELECT id FROM pipeline_runs
             WHERE pipeline_id = ? AND status IN ('pending', 'running')
             ORDER BY id`
          )
          .all(pipelineId) as any[];

        // Keep the newest (concurrency - 1) active runs; the run created
        // below takes the remaining slot. Oldest runs are cancelled first.
        const toCancel = active.slice(0, Math.max(0, active.length - config.concurrency + 1));
        for (const row of toCancel) {
          // Abort in-process execution first, then record the cancellation.
          this.cancelRun(row.id);
          this.db
            .prepare(
              `UPDATE pipeline_runs SET status = 'cancelled', finished_at = datetime('now') WHERE id = ? AND status IN ('pending', 'running')`
            )
            .run(row.id);
          this.db
            .prepare(
              `UPDATE pipeline_steps SET status = 'skipped' WHERE run_id = ? AND status IN ('pending', 'running')`
            )
            .run(row.id);
          this.eventBus?.publish({
            type: "run:cancelled",
            runId: row.id,
            repoId,
            status: "cancelled",
            run: this.getRunRow(row.id),
            ts: new Date().toISOString(),
          });
        }
      }

      const runId = this.createRun(
        repoId,
        config.name,
        file,
        "push",
        event.branch,
        event.newCommitId,
        commitMessage,
        config.steps
      );
      queuedRuns.push({ order: config.order ?? 0, runId, config });
    }

    // Execute queued runs in the background so push handling can return quickly.
    // Order groups are still respected: same order runs in parallel, higher orders
    // wait for lower-order groups to finish.
    void (async () => {
      const groups = new Map<number, QueuedRun[]>();
      for (const run of queuedRuns) {
        if (!groups.has(run.order)) groups.set(run.order, []);
        groups.get(run.order)!.push(run);
      }

      for (const [, group] of [...groups.entries()].sort(([a], [b]) => a - b)) {
        await Promise.all(
          group.map((run) =>
            this.executePipeline(run.runId, run.config, event.repo, event.branch).catch(
              (err) => this.logger.error({ err, runId: run.runId }, "Pipeline execution failed")
            )
          )
        );
      }
    })().catch((err) =>
      this.logger.error(
        { err, repo: event.repo, branch: event.branch, commit: event.newCommitId },
        "Queued push execution failed"
      )
    );
  }
233
234 private async getCommitMessage(repo: string, commitId: string): Promise<string | null> {
235 try {
236 const res = await fetch(
237 `${this.bridgeUrl}/repos/${repo}/commit/${commitId}/history?limit=1`
238 );
239 if (!res.ok) return null;
240 const data = await res.json();
241 const commit = data.commits?.[0];
242 if (!commit?.message) return null;
243 return commit.message.split("\n")[0];
244 } catch {
245 return null;
246 }
247 }
248
249 // --- Pipeline config reading ---
250
251 private async readPipelineConfigs(
252 repo: string,
253 ref: string
254 ): Promise<Array<{ file: string; config: PipelineConfig }>> {
255 try {
256 const treeRes = await fetch(
257 `${this.bridgeUrl}/repos/${repo}/tree/${ref}/.canopy`
258 );
259 if (!treeRes.ok) return [];
260 const tree = await treeRes.json();
261
262 const results: Array<{ file: string; config: PipelineConfig }> = [];
263 for (const entry of tree.entries) {
264 if (!entry.name.endsWith(".yml") && !entry.name.endsWith(".yaml"))
265 continue;
266
267 const blobRes = await fetch(
268 `${this.bridgeUrl}/repos/${repo}/blob/${ref}/.canopy/${entry.name}`
269 );
270 if (!blobRes.ok) continue;
271 const blob = await blobRes.json();
272
273 try {
274 const config = parseYaml(blob.content) as PipelineConfig;
275 if (config.name && config.on && config.steps) {
276 results.push({ file: `.canopy/${entry.name}`, config });
277 }
278 } catch {
279 /* invalid YAML, skip */
280 }
281 }
282 return results;
283 } catch {
284 return [];
285 }
286 }
287
288 // --- Trigger matching ---
289
290 private matchesTrigger(
291 config: PipelineConfig,
292 file: string,
293 branch: string,
294 changedFiles: string[] | null
295 ): boolean {
296 const push = config.on.push;
297 if (!push) return false;
298
299 if (push.branches && push.branches.length > 0) {
300 const branchMatches = push.branches.some((pattern) =>
301 minimatch(branch, pattern)
302 );
303 if (!branchMatches) return false;
304 }
305
306 if (push.paths && push.paths.length > 0 && changedFiles !== null) {
307 // Always trigger if the pipeline's own config file was changed
308 const selfChanged = changedFiles.includes(file);
309 if (!selfChanged) {
310 const anyPathMatches = changedFiles.some((f) =>
311 push.paths!.some((pattern) => minimatch(f, pattern))
312 );
313 if (!anyPathMatches) return false;
314 }
315 }
316
317 return true;
318 }
319
320 private async getChangedFiles(
321 repo: string,
322 oldId: string,
323 newId: string
324 ): Promise<string[] | null> {
325 try {
326 const diffRes = await fetch(
327 `${this.bridgeUrl}/repos/${repo}/diff/${oldId}/${newId}`
328 );
329 if (!diffRes.ok) return null;
330 const data = await diffRes.json();
331 return (data.diffs || []).map((d: any) => d.path);
332 } catch {
333 return null;
334 }
335 }
336
337 // --- Source checkout via grove-bridge API ---
338
339 private async checkoutFromBridge(
340 repo: string,
341 ref: string,
342 destPath: string
343 ): Promise<void> {
344 const url = `${this.bridgeUrl}/repos/${repo}/archive/${ref}`;
345 const res = await fetch(url);
346 if (!res.ok) {
347 throw new Error(`Archive fetch failed: ${res.status} ${res.statusText}`);
348 }
349
350 mkdirSync(destPath, { recursive: true });
351
352 const tarData = Buffer.from(await res.arrayBuffer());
353 execFileSync("tar", ["xf", "-", "-C", destPath], { input: tarData });
354 }
355
356 // --- Run/step record creation ---
357
358 private createRun(
359 repoId: number,
360 pipelineName: string,
361 pipelineFile: string,
362 triggerType: string,
363 triggerRef: string,
364 commitId: string,
365 commitMessage: string | null,
366 steps: StepConfig[]
367 ): number {
368 const pipelineId = this.ensurePipeline(repoId, pipelineName, pipelineFile);
369
370 const result = this.db
371 .prepare(
372 `INSERT INTO pipeline_runs (pipeline_id, repo_id, pipeline_name, pipeline_file, trigger_type, trigger_ref, commit_id, commit_message, status)
373 VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'pending')`
374 )
375 .run(pipelineId, repoId, pipelineName, pipelineFile, triggerType, triggerRef, commitId, commitMessage);
376
377 const runId = Number(result.lastInsertRowid);
378
379 const insertStep = this.db.prepare(
380 `INSERT INTO pipeline_steps (run_id, step_index, name, image, status)
381 VALUES (?, ?, ?, ?, 'pending')`
382 );
383
384 for (let i = 0; i < steps.length; i++) {
385 insertStep.run(runId, i, steps[i].name, steps[i].image);
386 }
387
388 this.eventBus?.publish({
389 type: "run:created",
390 runId,
391 repoId,
392 status: "pending",
393 run: this.getRunRow(runId),
394 ts: new Date().toISOString(),
395 });
396
397 return runId;
398 }
399
400 // --- Pipeline execution ---
401
402 private async executePipeline(
403 runId: number,
404 config: PipelineConfig,
405 repoName: string,
406 ref: string
407 ): Promise<void> {
408 const abort = new AbortController();
409 this.running.set(runId, abort);
410
411 const repoRow = this.db
412 .prepare(`SELECT id FROM repos WHERE name = ?`)
413 .get(repoName) as any;
414 const repoId = repoRow?.id ?? 0;
415
416 this.db
417 .prepare(
418 `UPDATE pipeline_runs SET status = 'running', started_at = datetime('now') WHERE id = ?`
419 )
420 .run(runId);
421
422 this.eventBus?.publish({
423 type: "run:started",
424 runId,
425 repoId,
426 status: "running",
427 run: this.getRunRow(runId),
428 ts: new Date().toISOString(),
429 });
430
431 const workspacePath = `${this.workspaceDir}/${runId}`;
432 const srcPath = `${workspacePath}/src`;
433 mkdirSync(srcPath, { recursive: true });
434
435 if (config.checkout !== false) {
436 try {
437 await this.checkoutFromBridge(repoName, ref, srcPath);
438 } catch (err) {
439 this.logger.error({ err, runId }, "Failed to checkout repo for pipeline");
440 this.db
441 .prepare(
442 `UPDATE pipeline_runs SET status = 'failed', finished_at = datetime('now') WHERE id = ?`
443 )
444 .run(runId);
445 this.eventBus?.publish({
446 type: "run:completed",
447 runId,
448 repoId,
449 status: "failed",
450 run: this.getRunRow(runId),
451 ts: new Date().toISOString(),
452 });
453 this.running.delete(runId);
454 return;
455 }
456 }
457
458 const secrets = repoRow ? this.loadSecrets(repoRow.id) : {};
459
460 const steps = this.db
461 .prepare(
462 `SELECT * FROM pipeline_steps WHERE run_id = ? ORDER BY step_index`
463 )
464 .all(runId) as any[];
465 let allPassed = true;
466
467 for (const step of steps) {
468 if (abort.signal.aborted || !allPassed) {
469 this.db
470 .prepare(`UPDATE pipeline_steps SET status = 'skipped' WHERE id = ?`)
471 .run(step.id);
472 this.eventBus?.publish({
473 type: "step:skipped",
474 runId,
475 repoId,
476 stepId: step.id,
477 stepIndex: step.step_index,
478 status: "skipped",
479 step: this.getStepRow(step.id),
480 ts: new Date().toISOString(),
481 });
482 continue;
483 }
484
485 const stepConfig = config.steps[step.step_index];
486 const mergedEnv = { ...config.env, ...stepConfig.env };
487 const resolvedEnv = this.resolveSecrets(mergedEnv, secrets);
488
489 const passed = await this.executeStep(
490 runId,
491 repoId,
492 step.id,
493 step.step_index,
494 stepConfig,
495 resolvedEnv,
496 `${workspacePath}/src`,
497 abort.signal
498 );
499 if (!passed) allPassed = false;
500 }
501
502 const run = this.db
503 .prepare(`SELECT started_at FROM pipeline_runs WHERE id = ?`)
504 .get(runId) as any;
505 const durationMs = run?.started_at
506 ? Date.now() - new Date(run.started_at + "Z").getTime()
507 : 0;
508
509 const finalStatus = allPassed ? "passed" : "failed";
510 this.db
511 .prepare(
512 `UPDATE pipeline_runs SET status = ?, finished_at = datetime('now'), duration_ms = ? WHERE id = ?`
513 )
514 .run(finalStatus, durationMs, runId);
515
516 this.eventBus?.publish({
517 type: "run:completed",
518 runId,
519 repoId,
520 status: finalStatus,
521 run: this.getRunRow(runId),
522 ts: new Date().toISOString(),
523 });
524
525 try {
526 rmSync(workspacePath, { recursive: true, force: true });
527 } catch {}
528 this.running.delete(runId);
529 }
530
531 // --- Step execution (Docker container) ---
532
533 private toHostPath(containerPath: string): string {
534 if (containerPath.startsWith(this.workspaceDir)) {
535 return this.workspaceHostDir + containerPath.slice(this.workspaceDir.length);
536 }
537 return containerPath;
538 }
539
  /**
   * Run one step as a `docker run --rm` container, streaming stdout/stderr
   * into step_logs and publishing live events. Resolves true iff the
   * container exits with code 0.
   *
   * The container mounts the run workspace at /workspace (host-translated
   * path), the host Docker socket, and /opt/grove read-only, runs with host
   * networking, and executes the step script via `sh -ec`.
   */
  private executeStep(
    runId: number,
    repoId: number,
    stepId: number,
    stepIndex: number,
    step: StepConfig,
    env: Record<string, string>,
    workspacePath: string,
    signal: AbortSignal
  ): Promise<boolean> {
    return new Promise((resolve) => {
      this.db
        .prepare(
          `UPDATE pipeline_steps SET status = 'running', started_at = datetime('now') WHERE id = ?`
        )
        .run(stepId);

      this.eventBus?.publish({
        type: "step:started",
        runId,
        repoId,
        stepId,
        stepIndex,
        status: "running",
        step: this.getStepRow(stepId),
        ts: new Date().toISOString(),
      });

      // Default timeout 600s; env vars become repeated `-e K=V` docker args.
      const timeout = (step.timeout ?? 600) * 1000;
      const envArgs = Object.entries({
        GROVE_REGISTRY: "localhost:5000",
        ...env,
      }).flatMap(([k, v]) => ["-e", `${k}=${v}`]);

      // Mount source must be the host-side path (see toHostPath).
      const hostWorkspacePath = this.toHostPath(workspacePath);

      const volumeArgs = [
        "-v", `${hostWorkspacePath}:/workspace`,
        "-v", "/var/run/docker.sock:/var/run/docker.sock",
        "-v", "/opt/grove:/opt/grove:ro",
        ...(step.volumes ?? []).flatMap((v) => ["-v", v]),
      ];

      // Deterministic name so the abort handler can `docker kill` it.
      const containerName = `canopy-${runId}-${stepIndex}`;
      const proc = spawn("docker", [
        "run",
        "--rm",
        "--name",
        containerName,
        ...volumeArgs,
        "-w",
        "/workspace",
        "--network",
        "host",
        ...envArgs,
        step.image,
        "sh",
        "-ec",
        step.run,
      ]);

      const insertLog = this.db.prepare(
        `INSERT INTO step_logs (step_id, stream, content) VALUES (?, ?, ?)`
      );

      // Persist each output chunk and fan it out to live subscribers.
      proc.stdout.on("data", (data: Buffer) => {
        const content = data.toString();
        insertLog.run(stepId, "stdout", content);
        this.eventBus?.publish({
          type: "log:append",
          runId,
          repoId,
          stepId,
          stepIndex,
          log: { stream: "stdout", content, created_at: new Date().toISOString() },
          ts: new Date().toISOString(),
        });
      });

      proc.stderr.on("data", (data: Buffer) => {
        const content = data.toString();
        insertLog.run(stepId, "stderr", content);
        this.eventBus?.publish({
          type: "log:append",
          runId,
          repoId,
          stepId,
          stepIndex,
          log: { stream: "stderr", content, created_at: new Date().toISOString() },
          ts: new Date().toISOString(),
        });
      });

      // Timeout: SIGTERM the docker client; final status is decided by the
      // "close" handler below (non-zero exit => failed).
      const timer = setTimeout(() => {
        proc.kill("SIGTERM");
        insertLog.run(
          stepId,
          "stderr",
          `Step timed out after ${step.timeout ?? 600}s`
        );
      }, timeout);

      // Cancellation: kill the container by name first, then the client.
      const abortHandler = () => {
        spawn("docker", ["kill", containerName]).on("close", () => {
          proc.kill("SIGTERM");
        });
        insertLog.run(stepId, "stderr", "Step cancelled");
      };
      signal.addEventListener("abort", abortHandler, { once: true });

      proc.on("close", (code) => {
        clearTimeout(timer);
        signal.removeEventListener("abort", abortHandler);

        // Duration computed from the DB start timestamp (stored as UTC).
        const startedAt = this.db
          .prepare(`SELECT started_at FROM pipeline_steps WHERE id = ?`)
          .get(stepId) as any;
        const durationMs = startedAt?.started_at
          ? Date.now() - new Date(startedAt.started_at + "Z").getTime()
          : 0;

        const passed = code === 0;
        const stepStatus = passed ? "passed" : "failed";
        this.db
          .prepare(
            `UPDATE pipeline_steps SET status = ?, exit_code = ?, finished_at = datetime('now'), duration_ms = ? WHERE id = ?`
          )
          .run(stepStatus, code, durationMs, stepId);

        this.eventBus?.publish({
          type: "step:completed",
          runId,
          repoId,
          stepId,
          stepIndex,
          status: stepStatus,
          step: this.getStepRow(stepId),
          ts: new Date().toISOString(),
        });

        resolve(passed);
      });
    });
  }
684
685 // --- Secrets ---
686
687 private deriveKey(): Buffer {
688 return createHash("sha256")
689 .update(this.jwtSecret + ":canopy-secrets")
690 .digest();
691 }
692
693 encryptSecret(plaintext: string): string {
694 const key = this.deriveKey();
695 const iv = randomBytes(12);
696 const cipher = createCipheriv("aes-256-gcm", key, iv);
697 const encrypted = Buffer.concat([
698 cipher.update(plaintext, "utf8"),
699 cipher.final(),
700 ]);
701 const tag = cipher.getAuthTag();
702 return Buffer.concat([iv, tag, encrypted]).toString("base64");
703 }
704
705 private decrypt(encoded: string): string {
706 const key = this.deriveKey();
707 const buf = Buffer.from(encoded, "base64");
708 const iv = buf.subarray(0, 12);
709 const tag = buf.subarray(12, 28);
710 const data = buf.subarray(28);
711 const decipher = createDecipheriv("aes-256-gcm", key, iv);
712 decipher.setAuthTag(tag);
713 return decipher.update(data).toString("utf8") + decipher.final("utf8");
714 }
715
716 private loadSecrets(repoId: number): Record<string, string> {
717 const rows = this.db
718 .prepare(
719 `SELECT name, encrypted_value FROM canopy_secrets WHERE repo_id = ?`
720 )
721 .all(repoId) as any[];
722 const secrets: Record<string, string> = {};
723 for (const row of rows) {
724 try {
725 secrets[row.name] = this.decrypt(row.encrypted_value);
726 } catch {}
727 }
728 return secrets;
729 }
730
731 private resolveSecrets(
732 env: Record<string, string>,
733 secrets: Record<string, string>
734 ): Record<string, string> {
735 const resolved: Record<string, string> = {};
736 for (const [key, value] of Object.entries(env)) {
737 resolved[key] = value.replace(
738 /\$\{\{\s*secrets\.(\w+)\s*\}\}/g,
739 (_, name) => secrets[name] ?? ""
740 );
741 }
742 return resolved;
743 }
744
745 cancelRun(runId: number): void {
746 const controller = this.running.get(runId);
747 if (controller) controller.abort();
748 }
749}
750