// api/src/services/canopy-runner.ts
10943a11import { spawn, execFileSync } from "child_process";
10943a12import { mkdirSync, rmSync } from "fs";
80fafdf3import {
80fafdf4 createCipheriv,
80fafdf5 createDecipheriv,
80fafdf6 randomBytes,
80fafdf7 createHash,
80fafdf8} from "crypto";
80fafdf9import type Database from "better-sqlite3";
80fafdf10import { parse as parseYaml } from "yaml";
80fafdf11import { minimatch } from "minimatch";
5bcd5db12import type { CanopyEventBus } from "./canopy-events.js";
80fafdf13
/** Parsed shape of a `.canopy/*.yml` pipeline file. */
interface PipelineConfig {
  /** Pipeline name; unique per repo (used to upsert the pipelines row). */
  name: string;
  /** Trigger filters. Only `push` is supported here. */
  on: {
    push?: { branches?: string[]; paths?: string[] };
  };
  /** Set to `false` to skip checking out the repo into the workspace. */
  checkout?: boolean;
  /** Max in-flight runs; older pending/running runs beyond the limit are cancelled. */
  concurrency?: number;
  /** Ordering group: lower groups finish before higher groups start; same group runs in parallel. */
  order?: number;
  /** Environment shared by all steps; step-level `env` overrides per key. */
  env?: Record<string, string>;
  steps: StepConfig[];
}
80fafdf25
/** One pipeline step, executed as a Docker container. */
interface StepConfig {
  name: string;
  /** Docker image the step runs in. */
  image: string;
  /** Shell script executed inside the container via `sh -ec`. */
  run: string;
  env?: Record<string, string>;
  /** Timeout in seconds (default 600). */
  timeout?: number;
  /** Extra bind mounts passed verbatim to docker `-v` — presumably `host:container[:opts]`; TODO confirm. */
  volumes?: string[];
}
80fafdf34
/** Push notification handled by {@link CanopyRunner.onPush}. */
export interface PushEvent {
  repo: string;
  branch: string;
  /** Previous branch tip; when falsy or equal to `newCommitId`, changed-file detection is skipped. */
  oldCommitId: string;
  newCommitId: string;
}
80fafdf41
80fafdf42export class CanopyRunner {
80fafdf43 private running = new Map<number, AbortController>();
80fafdf44
80fafdf45 constructor(
80fafdf46 private db: Database.Database,
80fafdf47 private bridgeUrl: string,
80fafdf48 private workspaceDir: string,
1e64dbc49 private workspaceHostDir: string,
80fafdf50 private jwtSecret: string,
5bcd5db51 private logger: { info: (...args: any[]) => void; error: (...args: any[]) => void },
5bcd5db52 private eventBus?: CanopyEventBus
80fafdf53 ) {
80fafdf54 mkdirSync(workspaceDir, { recursive: true });
7422c6555 this.recoverOrphanedRuns();
7422c6556 }
7422c6557
7422c6558 /**
7422c6559 * On startup, mark any "running" pipeline runs/steps as failed.
7422c6560 * These are orphans from a previous process crash (e.g. deploy step
7422c6561 * restarted grove-api before the callback could fire).
7422c6562 */
7422c6563 private recoverOrphanedRuns(): void {
7422c6564 const steps = this.db
7422c6565 .prepare(
7422c6566 `UPDATE pipeline_steps SET status = 'failed', finished_at = datetime('now') WHERE status = 'running'`
7422c6567 )
7422c6568 .run();
7422c6569 const runs = this.db
7422c6570 .prepare(
7422c6571 `UPDATE pipeline_runs SET status = 'failed', finished_at = datetime('now') WHERE status = 'running'`
7422c6572 )
7422c6573 .run();
7422c6574 if (steps.changes > 0 || runs.changes > 0) {
7422c6575 this.logger.info(
7422c6576 `Recovered ${runs.changes} orphaned pipeline run(s), ${steps.changes} step(s)`
7422c6577 );
7422c6578 }
80fafdf79 }
80fafdf80
5bcd5db81 private getRunRow(runId: number): Record<string, unknown> | undefined {
5bcd5db82 return this.db.prepare(`SELECT * FROM pipeline_runs WHERE id = ?`).get(runId) as any;
5bcd5db83 }
5bcd5db84
5bcd5db85 private getStepRow(stepId: number): Record<string, unknown> | undefined {
5bcd5db86 return this.db.prepare(`SELECT * FROM pipeline_steps WHERE id = ?`).get(stepId) as any;
5bcd5db87 }
5bcd5db88
791afd489 private ensureRepo(repoName: string): number | null {
791afd490 const existing = this.db
791afd491 .prepare(`SELECT id FROM repos WHERE name = ?`)
791afd492 .get(repoName) as any;
791afd493 if (existing) return existing.id;
791afd494
791afd495 try {
791afd496 // Ensure a placeholder user exists for FK constraint
791afd497 this.db
791afd498 .prepare(
791afd499 `INSERT OR IGNORE INTO users (id, username, display_name) VALUES (0, '_system', 'System')`
791afd4100 )
791afd4101 .run();
791afd4102 const result = this.db
791afd4103 .prepare(`INSERT INTO repos (name, owner_id) VALUES (?, 0)`)
791afd4104 .run(repoName);
791afd4105 return Number(result.lastInsertRowid);
791afd4106 } catch {
791afd4107 return null;
791afd4108 }
791afd4109 }
791afd4110
37f6938111 private ensurePipeline(repoId: number, name: string, file: string): number {
37f6938112 const existing = this.db
37f6938113 .prepare(`SELECT id FROM pipelines WHERE repo_id = ? AND name = ?`)
37f6938114 .get(repoId, name) as any;
37f6938115 if (existing) {
37f6938116 this.db
37f6938117 .prepare(`UPDATE pipelines SET file = ? WHERE id = ?`)
37f6938118 .run(file, existing.id);
37f6938119 return existing.id;
37f6938120 }
37f6938121 const result = this.db
37f6938122 .prepare(`INSERT INTO pipelines (repo_id, name, file) VALUES (?, ?, ?)`)
37f6938123 .run(repoId, name, file);
37f6938124 return Number(result.lastInsertRowid);
37f6938125 }
37f6938126
191af2a127 async onPush(event: PushEvent, pipelineFilter?: string): Promise<void> {
791afd4128 const repoId = this.ensureRepo(event.repo);
791afd4129 if (!repoId) return;
80fafdf130
791afd4131 const configs = await this.readPipelineConfigs(event.repo, event.branch);
80fafdf132 if (configs.length === 0) return;
80fafdf133
80fafdf134 let changedFiles: string[] | null = null;
80fafdf135 if (event.oldCommitId && event.oldCommitId !== event.newCommitId) {
80fafdf136 changedFiles = await this.getChangedFiles(
80fafdf137 event.repo,
80fafdf138 event.oldCommitId,
80fafdf139 event.newCommitId
80fafdf140 );
80fafdf141 }
80fafdf142
818dc90143 // Fetch commit message from bridge
818dc90144 const commitMessage = await this.getCommitMessage(event.repo, event.newCommitId);
818dc90145
409bc79146 const matched = configs
d948b49147 .filter(({ file, config }) => this.matchesTrigger(config, file, event.branch, changedFiles))
191af2a148 .filter(({ config }) => !pipelineFilter || config.name === pipelineFilter)
409bc79149 .sort((a, b) => (a.config.order ?? 0) - (b.config.order ?? 0));
c98936c150 if (matched.length === 0) return;
409bc79151
c98936c152 type QueuedRun = {
c98936c153 order: number;
c98936c154 runId: number;
c98936c155 config: PipelineConfig;
c98936c156 };
409bc79157
c98936c158 const queuedRuns: QueuedRun[] = [];
c98936c159 for (const { file, config } of matched) {
c98936c160 // Cancel in-progress runs of this pipeline if concurrency is limited
c98936c161 if (config.concurrency != null && config.concurrency >= 1) {
c98936c162 const pipelineId = this.ensurePipeline(repoId, config.name, file);
c98936c163 const active = this.db
c98936c164 .prepare(
c98936c165 `SELECT id FROM pipeline_runs
c98936c166 WHERE pipeline_id = ? AND status IN ('pending', 'running')
c98936c167 ORDER BY id`
c98936c168 )
c98936c169 .all(pipelineId) as any[];
603c6c9170
c98936c171 const toCancel = active.slice(0, Math.max(0, active.length - config.concurrency + 1));
c98936c172 for (const row of toCancel) {
c98936c173 this.cancelRun(row.id);
c98936c174 this.db
409bc79175 .prepare(
c98936c176 `UPDATE pipeline_runs SET status = 'cancelled', finished_at = datetime('now') WHERE id = ? AND status IN ('pending', 'running')`
409bc79177 )
c98936c178 .run(row.id);
c98936c179 this.db
c98936c180 .prepare(
c98936c181 `UPDATE pipeline_steps SET status = 'skipped' WHERE run_id = ? AND status IN ('pending', 'running')`
c98936c182 )
c98936c183 .run(row.id);
5bcd5db184 this.eventBus?.publish({
5bcd5db185 type: "run:cancelled",
5bcd5db186 runId: row.id,
5bcd5db187 repoId,
5bcd5db188 status: "cancelled",
5bcd5db189 run: this.getRunRow(row.id),
5bcd5db190 ts: new Date().toISOString(),
5bcd5db191 });
409bc79192 }
c98936c193 }
80fafdf194
c98936c195 const runId = this.createRun(
c98936c196 repoId,
c98936c197 config.name,
c98936c198 file,
c98936c199 "push",
c98936c200 event.branch,
c98936c201 event.newCommitId,
c98936c202 commitMessage,
c98936c203 config.steps
c98936c204 );
c98936c205 queuedRuns.push({ order: config.order ?? 0, runId, config });
c98936c206 }
c98936c207
c98936c208 // Execute queued runs in the background so push handling can return quickly.
c98936c209 // Order groups are still respected: same order runs in parallel, higher orders
c98936c210 // wait for lower-order groups to finish.
c98936c211 void (async () => {
c98936c212 const groups = new Map<number, QueuedRun[]>();
c98936c213 for (const run of queuedRuns) {
c98936c214 if (!groups.has(run.order)) groups.set(run.order, []);
c98936c215 groups.get(run.order)!.push(run);
c98936c216 }
80fafdf217
c98936c218 for (const [, group] of [...groups.entries()].sort(([a], [b]) => a - b)) {
c98936c219 await Promise.all(
c98936c220 group.map((run) =>
c98936c221 this.executePipeline(run.runId, run.config, event.repo, event.branch).catch(
c98936c222 (err) => this.logger.error({ err, runId: run.runId }, "Pipeline execution failed")
c98936c223 )
603c6c9224 )
603c6c9225 );
603c6c9226 }
c98936c227 })().catch((err) =>
c98936c228 this.logger.error(
c98936c229 { err, repo: event.repo, branch: event.branch, commit: event.newCommitId },
c98936c230 "Queued push execution failed"
c98936c231 )
c98936c232 );
80fafdf233 }
80fafdf234
818dc90235 private async getCommitMessage(repo: string, commitId: string): Promise<string | null> {
818dc90236 try {
818dc90237 const res = await fetch(
818dc90238 `${this.bridgeUrl}/repos/${repo}/commit/${commitId}/history?limit=1`
818dc90239 );
818dc90240 if (!res.ok) return null;
818dc90241 const data = await res.json();
818dc90242 const commit = data.commits?.[0];
818dc90243 if (!commit?.message) return null;
818dc90244 return commit.message.split("\n")[0];
818dc90245 } catch {
818dc90246 return null;
818dc90247 }
818dc90248 }
818dc90249
80fafdf250 // --- Pipeline config reading ---
80fafdf251
80fafdf252 private async readPipelineConfigs(
80fafdf253 repo: string,
791afd4254 ref: string
80fafdf255 ): Promise<Array<{ file: string; config: PipelineConfig }>> {
80fafdf256 try {
80fafdf257 const treeRes = await fetch(
791afd4258 `${this.bridgeUrl}/repos/${repo}/tree/${ref}/.canopy`
80fafdf259 );
80fafdf260 if (!treeRes.ok) return [];
80fafdf261 const tree = await treeRes.json();
80fafdf262
80fafdf263 const results: Array<{ file: string; config: PipelineConfig }> = [];
80fafdf264 for (const entry of tree.entries) {
80fafdf265 if (!entry.name.endsWith(".yml") && !entry.name.endsWith(".yaml"))
80fafdf266 continue;
80fafdf267
80fafdf268 const blobRes = await fetch(
791afd4269 `${this.bridgeUrl}/repos/${repo}/blob/${ref}/.canopy/${entry.name}`
80fafdf270 );
80fafdf271 if (!blobRes.ok) continue;
80fafdf272 const blob = await blobRes.json();
80fafdf273
80fafdf274 try {
80fafdf275 const config = parseYaml(blob.content) as PipelineConfig;
80fafdf276 if (config.name && config.on && config.steps) {
80fafdf277 results.push({ file: `.canopy/${entry.name}`, config });
80fafdf278 }
80fafdf279 } catch {
80fafdf280 /* invalid YAML, skip */
80fafdf281 }
80fafdf282 }
80fafdf283 return results;
80fafdf284 } catch {
80fafdf285 return [];
80fafdf286 }
80fafdf287 }
80fafdf288
80fafdf289 // --- Trigger matching ---
80fafdf290
80fafdf291 private matchesTrigger(
80fafdf292 config: PipelineConfig,
d948b49293 file: string,
80fafdf294 branch: string,
80fafdf295 changedFiles: string[] | null
80fafdf296 ): boolean {
80fafdf297 const push = config.on.push;
80fafdf298 if (!push) return false;
80fafdf299
80fafdf300 if (push.branches && push.branches.length > 0) {
80fafdf301 const branchMatches = push.branches.some((pattern) =>
80fafdf302 minimatch(branch, pattern)
80fafdf303 );
80fafdf304 if (!branchMatches) return false;
80fafdf305 }
80fafdf306
80fafdf307 if (push.paths && push.paths.length > 0 && changedFiles !== null) {
d948b49308 // Always trigger if the pipeline's own config file was changed
d948b49309 const selfChanged = changedFiles.includes(file);
d948b49310 if (!selfChanged) {
d948b49311 const anyPathMatches = changedFiles.some((f) =>
d948b49312 push.paths!.some((pattern) => minimatch(f, pattern))
d948b49313 );
d948b49314 if (!anyPathMatches) return false;
d948b49315 }
80fafdf316 }
80fafdf317
80fafdf318 return true;
80fafdf319 }
80fafdf320
80fafdf321 private async getChangedFiles(
80fafdf322 repo: string,
80fafdf323 oldId: string,
80fafdf324 newId: string
791afd4325 ): Promise<string[] | null> {
80fafdf326 try {
80fafdf327 const diffRes = await fetch(
80fafdf328 `${this.bridgeUrl}/repos/${repo}/diff/${oldId}/${newId}`
80fafdf329 );
791afd4330 if (!diffRes.ok) return null;
80fafdf331 const data = await diffRes.json();
80fafdf332 return (data.diffs || []).map((d: any) => d.path);
80fafdf333 } catch {
791afd4334 return null;
80fafdf335 }
80fafdf336 }
80fafdf337
791afd4338 // --- Source checkout via grove-bridge API ---
791afd4339
791afd4340 private async checkoutFromBridge(
791afd4341 repo: string,
791afd4342 ref: string,
791afd4343 destPath: string
791afd4344 ): Promise<void> {
10943a1345 const url = `${this.bridgeUrl}/repos/${repo}/archive/${ref}`;
10943a1346 const res = await fetch(url);
10943a1347 if (!res.ok) {
10943a1348 throw new Error(`Archive fetch failed: ${res.status} ${res.statusText}`);
10943a1349 }
791afd4350
10943a1351 mkdirSync(destPath, { recursive: true });
791afd4352
10943a1353 const tarData = Buffer.from(await res.arrayBuffer());
10943a1354 execFileSync("tar", ["xf", "-", "-C", destPath], { input: tarData });
791afd4355 }
791afd4356
80fafdf357 // --- Run/step record creation ---
80fafdf358
80fafdf359 private createRun(
80fafdf360 repoId: number,
80fafdf361 pipelineName: string,
80fafdf362 pipelineFile: string,
80fafdf363 triggerType: string,
80fafdf364 triggerRef: string,
80fafdf365 commitId: string,
818dc90366 commitMessage: string | null,
80fafdf367 steps: StepConfig[]
80fafdf368 ): number {
37f6938369 const pipelineId = this.ensurePipeline(repoId, pipelineName, pipelineFile);
37f6938370
80fafdf371 const result = this.db
80fafdf372 .prepare(
37f6938373 `INSERT INTO pipeline_runs (pipeline_id, repo_id, pipeline_name, pipeline_file, trigger_type, trigger_ref, commit_id, commit_message, status)
37f6938374 VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'pending')`
80fafdf375 )
37f6938376 .run(pipelineId, repoId, pipelineName, pipelineFile, triggerType, triggerRef, commitId, commitMessage);
80fafdf377
80fafdf378 const runId = Number(result.lastInsertRowid);
80fafdf379
80fafdf380 const insertStep = this.db.prepare(
80fafdf381 `INSERT INTO pipeline_steps (run_id, step_index, name, image, status)
80fafdf382 VALUES (?, ?, ?, ?, 'pending')`
80fafdf383 );
80fafdf384
80fafdf385 for (let i = 0; i < steps.length; i++) {
80fafdf386 insertStep.run(runId, i, steps[i].name, steps[i].image);
80fafdf387 }
80fafdf388
5bcd5db389 this.eventBus?.publish({
5bcd5db390 type: "run:created",
5bcd5db391 runId,
5bcd5db392 repoId,
5bcd5db393 status: "pending",
5bcd5db394 run: this.getRunRow(runId),
5bcd5db395 ts: new Date().toISOString(),
5bcd5db396 });
5bcd5db397
80fafdf398 return runId;
80fafdf399 }
80fafdf400
80fafdf401 // --- Pipeline execution ---
80fafdf402
80fafdf403 private async executePipeline(
80fafdf404 runId: number,
80fafdf405 config: PipelineConfig,
80fafdf406 repoName: string,
791afd4407 ref: string
80fafdf408 ): Promise<void> {
80fafdf409 const abort = new AbortController();
80fafdf410 this.running.set(runId, abort);
80fafdf411
5bcd5db412 const repoRow = this.db
5bcd5db413 .prepare(`SELECT id FROM repos WHERE name = ?`)
5bcd5db414 .get(repoName) as any;
5bcd5db415 const repoId = repoRow?.id ?? 0;
5bcd5db416
80fafdf417 this.db
80fafdf418 .prepare(
80fafdf419 `UPDATE pipeline_runs SET status = 'running', started_at = datetime('now') WHERE id = ?`
80fafdf420 )
80fafdf421 .run(runId);
80fafdf422
5bcd5db423 this.eventBus?.publish({
5bcd5db424 type: "run:started",
5bcd5db425 runId,
5bcd5db426 repoId,
5bcd5db427 status: "running",
5bcd5db428 run: this.getRunRow(runId),
5bcd5db429 ts: new Date().toISOString(),
5bcd5db430 });
5bcd5db431
80fafdf432 const workspacePath = `${this.workspaceDir}/${runId}`;
791afd4433 const srcPath = `${workspacePath}/src`;
791afd4434 mkdirSync(srcPath, { recursive: true });
80fafdf435
f60476c436 if (config.checkout !== false) {
f60476c437 try {
f60476c438 await this.checkoutFromBridge(repoName, ref, srcPath);
f60476c439 } catch (err) {
f60476c440 this.logger.error({ err, runId }, "Failed to checkout repo for pipeline");
f60476c441 this.db
f60476c442 .prepare(
f60476c443 `UPDATE pipeline_runs SET status = 'failed', finished_at = datetime('now') WHERE id = ?`
f60476c444 )
f60476c445 .run(runId);
5bcd5db446 this.eventBus?.publish({
5bcd5db447 type: "run:completed",
5bcd5db448 runId,
5bcd5db449 repoId,
5bcd5db450 status: "failed",
5bcd5db451 run: this.getRunRow(runId),
5bcd5db452 ts: new Date().toISOString(),
5bcd5db453 });
f60476c454 this.running.delete(runId);
f60476c455 return;
f60476c456 }
80fafdf457 }
80fafdf458
80fafdf459 const secrets = repoRow ? this.loadSecrets(repoRow.id) : {};
80fafdf460
80fafdf461 const steps = this.db
80fafdf462 .prepare(
80fafdf463 `SELECT * FROM pipeline_steps WHERE run_id = ? ORDER BY step_index`
80fafdf464 )
80fafdf465 .all(runId) as any[];
80fafdf466 let allPassed = true;
80fafdf467
80fafdf468 for (const step of steps) {
80fafdf469 if (abort.signal.aborted || !allPassed) {
80fafdf470 this.db
80fafdf471 .prepare(`UPDATE pipeline_steps SET status = 'skipped' WHERE id = ?`)
80fafdf472 .run(step.id);
5bcd5db473 this.eventBus?.publish({
5bcd5db474 type: "step:skipped",
5bcd5db475 runId,
5bcd5db476 repoId,
5bcd5db477 stepId: step.id,
5bcd5db478 stepIndex: step.step_index,
5bcd5db479 status: "skipped",
5bcd5db480 step: this.getStepRow(step.id),
5bcd5db481 ts: new Date().toISOString(),
5bcd5db482 });
80fafdf483 continue;
80fafdf484 }
80fafdf485
80fafdf486 const stepConfig = config.steps[step.step_index];
80fafdf487 const mergedEnv = { ...config.env, ...stepConfig.env };
80fafdf488 const resolvedEnv = this.resolveSecrets(mergedEnv, secrets);
80fafdf489
80fafdf490 const passed = await this.executeStep(
5bcd5db491 runId,
5bcd5db492 repoId,
80fafdf493 step.id,
5bcd5db494 step.step_index,
80fafdf495 stepConfig,
80fafdf496 resolvedEnv,
80fafdf497 `${workspacePath}/src`,
80fafdf498 abort.signal
80fafdf499 );
80fafdf500 if (!passed) allPassed = false;
80fafdf501 }
80fafdf502
80fafdf503 const run = this.db
80fafdf504 .prepare(`SELECT started_at FROM pipeline_runs WHERE id = ?`)
80fafdf505 .get(runId) as any;
80fafdf506 const durationMs = run?.started_at
80fafdf507 ? Date.now() - new Date(run.started_at + "Z").getTime()
80fafdf508 : 0;
80fafdf509
5bcd5db510 const finalStatus = allPassed ? "passed" : "failed";
80fafdf511 this.db
80fafdf512 .prepare(
80fafdf513 `UPDATE pipeline_runs SET status = ?, finished_at = datetime('now'), duration_ms = ? WHERE id = ?`
80fafdf514 )
5bcd5db515 .run(finalStatus, durationMs, runId);
5bcd5db516
5bcd5db517 this.eventBus?.publish({
5bcd5db518 type: "run:completed",
5bcd5db519 runId,
5bcd5db520 repoId,
5bcd5db521 status: finalStatus,
5bcd5db522 run: this.getRunRow(runId),
5bcd5db523 ts: new Date().toISOString(),
5bcd5db524 });
80fafdf525
80fafdf526 try {
80fafdf527 rmSync(workspacePath, { recursive: true, force: true });
80fafdf528 } catch {}
80fafdf529 this.running.delete(runId);
80fafdf530 }
80fafdf531
80fafdf532 // --- Step execution (Docker container) ---
80fafdf533
1e64dbc534 private toHostPath(containerPath: string): string {
1e64dbc535 if (containerPath.startsWith(this.workspaceDir)) {
1e64dbc536 return this.workspaceHostDir + containerPath.slice(this.workspaceDir.length);
1e64dbc537 }
1e64dbc538 return containerPath;
1e64dbc539 }
1e64dbc540
80fafdf541 private executeStep(
5bcd5db542 runId: number,
5bcd5db543 repoId: number,
80fafdf544 stepId: number,
5bcd5db545 stepIndex: number,
80fafdf546 step: StepConfig,
80fafdf547 env: Record<string, string>,
80fafdf548 workspacePath: string,
80fafdf549 signal: AbortSignal
80fafdf550 ): Promise<boolean> {
80fafdf551 return new Promise((resolve) => {
80fafdf552 this.db
80fafdf553 .prepare(
80fafdf554 `UPDATE pipeline_steps SET status = 'running', started_at = datetime('now') WHERE id = ?`
80fafdf555 )
80fafdf556 .run(stepId);
80fafdf557
5bcd5db558 this.eventBus?.publish({
5bcd5db559 type: "step:started",
5bcd5db560 runId,
5bcd5db561 repoId,
5bcd5db562 stepId,
5bcd5db563 stepIndex,
5bcd5db564 status: "running",
5bcd5db565 step: this.getStepRow(stepId),
5bcd5db566 ts: new Date().toISOString(),
5bcd5db567 });
5bcd5db568
80fafdf569 const timeout = (step.timeout ?? 600) * 1000;
5f0fbcf570 const envArgs = Object.entries({
5f0fbcf571 GROVE_REGISTRY: "localhost:5000",
5f0fbcf572 ...env,
5f0fbcf573 }).flatMap(([k, v]) => ["-e", `${k}=${v}`]);
80fafdf574
1e64dbc575 const hostWorkspacePath = this.toHostPath(workspacePath);
1e64dbc576
7422c65577 const volumeArgs = [
7422c65578 "-v", `${hostWorkspacePath}:/workspace`,
7422c65579 "-v", "/var/run/docker.sock:/var/run/docker.sock",
7422c65580 "-v", "/opt/grove:/opt/grove:ro",
7422c65581 ...(step.volumes ?? []).flatMap((v) => ["-v", v]),
7422c65582 ];
7422c65583
57c315f584 const containerName = `canopy-${runId}-${stepIndex}`;
80fafdf585 const proc = spawn("docker", [
80fafdf586 "run",
80fafdf587 "--rm",
57c315f588 "--name",
57c315f589 containerName,
7422c65590 ...volumeArgs,
80fafdf591 "-w",
80fafdf592 "/workspace",
80fafdf593 "--network",
80fafdf594 "host",
80fafdf595 ...envArgs,
80fafdf596 step.image,
80fafdf597 "sh",
00c0ecf598 "-ec",
80fafdf599 step.run,
80fafdf600 ]);
80fafdf601
80fafdf602 const insertLog = this.db.prepare(
80fafdf603 `INSERT INTO step_logs (step_id, stream, content) VALUES (?, ?, ?)`
80fafdf604 );
80fafdf605
80fafdf606 proc.stdout.on("data", (data: Buffer) => {
5bcd5db607 const content = data.toString();
5bcd5db608 insertLog.run(stepId, "stdout", content);
5bcd5db609 this.eventBus?.publish({
5bcd5db610 type: "log:append",
5bcd5db611 runId,
5bcd5db612 repoId,
5bcd5db613 stepId,
5bcd5db614 stepIndex,
5bcd5db615 log: { stream: "stdout", content, created_at: new Date().toISOString() },
5bcd5db616 ts: new Date().toISOString(),
5bcd5db617 });
80fafdf618 });
80fafdf619
80fafdf620 proc.stderr.on("data", (data: Buffer) => {
5bcd5db621 const content = data.toString();
5bcd5db622 insertLog.run(stepId, "stderr", content);
5bcd5db623 this.eventBus?.publish({
5bcd5db624 type: "log:append",
5bcd5db625 runId,
5bcd5db626 repoId,
5bcd5db627 stepId,
5bcd5db628 stepIndex,
5bcd5db629 log: { stream: "stderr", content, created_at: new Date().toISOString() },
5bcd5db630 ts: new Date().toISOString(),
5bcd5db631 });
80fafdf632 });
80fafdf633
80fafdf634 const timer = setTimeout(() => {
80fafdf635 proc.kill("SIGTERM");
80fafdf636 insertLog.run(
80fafdf637 stepId,
80fafdf638 "stderr",
80fafdf639 `Step timed out after ${step.timeout ?? 600}s`
80fafdf640 );
80fafdf641 }, timeout);
80fafdf642
80fafdf643 const abortHandler = () => {
57c315f644 spawn("docker", ["kill", containerName]).on("close", () => {
57c315f645 proc.kill("SIGTERM");
57c315f646 });
80fafdf647 insertLog.run(stepId, "stderr", "Step cancelled");
80fafdf648 };
80fafdf649 signal.addEventListener("abort", abortHandler, { once: true });
80fafdf650
80fafdf651 proc.on("close", (code) => {
80fafdf652 clearTimeout(timer);
80fafdf653 signal.removeEventListener("abort", abortHandler);
80fafdf654
80fafdf655 const startedAt = this.db
80fafdf656 .prepare(`SELECT started_at FROM pipeline_steps WHERE id = ?`)
80fafdf657 .get(stepId) as any;
80fafdf658 const durationMs = startedAt?.started_at
80fafdf659 ? Date.now() - new Date(startedAt.started_at + "Z").getTime()
80fafdf660 : 0;
80fafdf661
80fafdf662 const passed = code === 0;
5bcd5db663 const stepStatus = passed ? "passed" : "failed";
80fafdf664 this.db
80fafdf665 .prepare(
80fafdf666 `UPDATE pipeline_steps SET status = ?, exit_code = ?, finished_at = datetime('now'), duration_ms = ? WHERE id = ?`
80fafdf667 )
5bcd5db668 .run(stepStatus, code, durationMs, stepId);
5bcd5db669
5bcd5db670 this.eventBus?.publish({
5bcd5db671 type: "step:completed",
5bcd5db672 runId,
5bcd5db673 repoId,
5bcd5db674 stepId,
5bcd5db675 stepIndex,
5bcd5db676 status: stepStatus,
5bcd5db677 step: this.getStepRow(stepId),
5bcd5db678 ts: new Date().toISOString(),
5bcd5db679 });
80fafdf680
80fafdf681 resolve(passed);
80fafdf682 });
80fafdf683 });
80fafdf684 }
80fafdf685
80fafdf686 // --- Secrets ---
80fafdf687
80fafdf688 private deriveKey(): Buffer {
80fafdf689 return createHash("sha256")
80fafdf690 .update(this.jwtSecret + ":canopy-secrets")
80fafdf691 .digest();
80fafdf692 }
80fafdf693
80fafdf694 encryptSecret(plaintext: string): string {
80fafdf695 const key = this.deriveKey();
80fafdf696 const iv = randomBytes(12);
80fafdf697 const cipher = createCipheriv("aes-256-gcm", key, iv);
80fafdf698 const encrypted = Buffer.concat([
80fafdf699 cipher.update(plaintext, "utf8"),
80fafdf700 cipher.final(),
80fafdf701 ]);
80fafdf702 const tag = cipher.getAuthTag();
80fafdf703 return Buffer.concat([iv, tag, encrypted]).toString("base64");
80fafdf704 }
80fafdf705
80fafdf706 private decrypt(encoded: string): string {
80fafdf707 const key = this.deriveKey();
80fafdf708 const buf = Buffer.from(encoded, "base64");
80fafdf709 const iv = buf.subarray(0, 12);
80fafdf710 const tag = buf.subarray(12, 28);
80fafdf711 const data = buf.subarray(28);
80fafdf712 const decipher = createDecipheriv("aes-256-gcm", key, iv);
80fafdf713 decipher.setAuthTag(tag);
80fafdf714 return decipher.update(data).toString("utf8") + decipher.final("utf8");
80fafdf715 }
80fafdf716
80fafdf717 private loadSecrets(repoId: number): Record<string, string> {
80fafdf718 const rows = this.db
80fafdf719 .prepare(
80fafdf720 `SELECT name, encrypted_value FROM canopy_secrets WHERE repo_id = ?`
80fafdf721 )
80fafdf722 .all(repoId) as any[];
80fafdf723 const secrets: Record<string, string> = {};
80fafdf724 for (const row of rows) {
80fafdf725 try {
80fafdf726 secrets[row.name] = this.decrypt(row.encrypted_value);
80fafdf727 } catch {}
80fafdf728 }
80fafdf729 return secrets;
80fafdf730 }
80fafdf731
80fafdf732 private resolveSecrets(
80fafdf733 env: Record<string, string>,
80fafdf734 secrets: Record<string, string>
80fafdf735 ): Record<string, string> {
80fafdf736 const resolved: Record<string, string> = {};
80fafdf737 for (const [key, value] of Object.entries(env)) {
80fafdf738 resolved[key] = value.replace(
80fafdf739 /\$\{\{\s*secrets\.(\w+)\s*\}\}/g,
80fafdf740 (_, name) => secrets[name] ?? ""
80fafdf741 );
80fafdf742 }
80fafdf743 return resolved;
80fafdf744 }
80fafdf745
80fafdf746 cancelRun(runId: number): void {
80fafdf747 const controller = this.running.get(runId);
80fafdf748 if (controller) controller.abort();
80fafdf749 }
80fafdf750}