// api/src/services/canopy-runner.ts
10943a11import { spawn, execFileSync } from "child_process";
10943a12import { mkdirSync, rmSync } from "fs";
80fafdf3import {
80fafdf4 createCipheriv,
80fafdf5 createDecipheriv,
80fafdf6 randomBytes,
80fafdf7 createHash,
80fafdf8} from "crypto";
80fafdf9import type Database from "better-sqlite3";
80fafdf10import { parse as parseYaml } from "yaml";
80fafdf11import { minimatch } from "minimatch";
5bcd5db12import type { CanopyEventBus } from "./canopy-events.js";
80fafdf13
/** Shape of a parsed pipeline YAML file under .canopy/ in the repo. */
interface PipelineConfig {
 name: string;
 // Trigger filters; only push events are supported here.
 on: {
 push?: { branches?: string[]; paths?: string[] };
 };
 checkout?: boolean; // set to false to skip fetching the repo archive into the workspace
 concurrency?: number; // cap on simultaneous runs; older pending/running runs get cancelled
 order?: number; // execution group: lower-order groups finish before higher ones start
 env?: Record<string, string>; // pipeline-wide env, overridden by per-step env
 steps: StepConfig[];
}
80fafdf25
/** One container step within a pipeline run. */
interface StepConfig {
 name: string;
 image: string; // Docker image the step runs in
 run: string; // shell script, executed via `sh -ec` inside the container
 env?: Record<string, string>; // step env; merged over the pipeline-level env
 timeout?: number; // seconds before the step is killed; defaults to 600
 volumes?: string[]; // extra Docker `-v` bind-mount specs
}
80fafdf34
/** Notification that a branch was pushed; drives pipeline triggering. */
export interface PushEvent {
 repo: string;
 branch: string;
 oldCommitId: string; // previous tip; empty/equal to new id means no diff is computed
 newCommitId: string; // commit the pipelines run against
}
80fafdf41
80fafdf42export class CanopyRunner {
80fafdf43 private running = new Map<number, AbortController>();
80fafdf44
80fafdf45 constructor(
80fafdf46 private db: Database.Database,
80fafdf47 private bridgeUrl: string,
80fafdf48 private workspaceDir: string,
1e64dbc49 private workspaceHostDir: string,
80fafdf50 private jwtSecret: string,
5bcd5db51 private logger: { info: (...args: any[]) => void; error: (...args: any[]) => void },
5bcd5db52 private eventBus?: CanopyEventBus
80fafdf53 ) {
80fafdf54 mkdirSync(workspaceDir, { recursive: true });
7422c6555 this.recoverOrphanedRuns();
7422c6556 }
7422c6557
7422c6558 /**
7422c6559 * On startup, mark any "running" pipeline runs/steps as failed.
7422c6560 * These are orphans from a previous process crash (e.g. deploy step
7422c6561 * restarted grove-api before the callback could fire).
7422c6562 */
7422c6563 private recoverOrphanedRuns(): void {
7422c6564 const steps = this.db
7422c6565 .prepare(
7422c6566 `UPDATE pipeline_steps SET status = 'failed', finished_at = datetime('now') WHERE status = 'running'`
7422c6567 )
7422c6568 .run();
7422c6569 const runs = this.db
7422c6570 .prepare(
7422c6571 `UPDATE pipeline_runs SET status = 'failed', finished_at = datetime('now') WHERE status = 'running'`
7422c6572 )
7422c6573 .run();
7422c6574 if (steps.changes > 0 || runs.changes > 0) {
7422c6575 this.logger.info(
7422c6576 `Recovered ${runs.changes} orphaned pipeline run(s), ${steps.changes} step(s)`
7422c6577 );
7422c6578 }
80fafdf79 }
80fafdf80
5bcd5db81 private getRunRow(runId: number): Record<string, unknown> | undefined {
5bcd5db82 return this.db.prepare(`SELECT * FROM pipeline_runs WHERE id = ?`).get(runId) as any;
5bcd5db83 }
5bcd5db84
5bcd5db85 private getStepRow(stepId: number): Record<string, unknown> | undefined {
5bcd5db86 return this.db.prepare(`SELECT * FROM pipeline_steps WHERE id = ?`).get(stepId) as any;
5bcd5db87 }
5bcd5db88
791afd489 private ensureRepo(repoName: string): number | null {
791afd490 const existing = this.db
791afd491 .prepare(`SELECT id FROM repos WHERE name = ?`)
791afd492 .get(repoName) as any;
791afd493 if (existing) return existing.id;
791afd494
791afd495 try {
791afd496 // Ensure a placeholder user exists for FK constraint
791afd497 this.db
791afd498 .prepare(
791afd499 `INSERT OR IGNORE INTO users (id, username, display_name) VALUES (0, '_system', 'System')`
791afd4100 )
791afd4101 .run();
791afd4102 const result = this.db
791afd4103 .prepare(`INSERT INTO repos (name, owner_id) VALUES (?, 0)`)
791afd4104 .run(repoName);
791afd4105 return Number(result.lastInsertRowid);
791afd4106 } catch {
791afd4107 return null;
791afd4108 }
791afd4109 }
791afd4110
37f6938111 private ensurePipeline(repoId: number, name: string, file: string): number {
37f6938112 const existing = this.db
37f6938113 .prepare(`SELECT id FROM pipelines WHERE repo_id = ? AND name = ?`)
37f6938114 .get(repoId, name) as any;
37f6938115 if (existing) {
37f6938116 this.db
37f6938117 .prepare(`UPDATE pipelines SET file = ? WHERE id = ?`)
37f6938118 .run(file, existing.id);
37f6938119 return existing.id;
37f6938120 }
37f6938121 const result = this.db
37f6938122 .prepare(`INSERT INTO pipelines (repo_id, name, file) VALUES (?, ?, ?)`)
37f6938123 .run(repoId, name, file);
37f6938124 return Number(result.lastInsertRowid);
37f6938125 }
37f6938126
80fafdf127 async onPush(event: PushEvent): Promise<void> {
791afd4128 const repoId = this.ensureRepo(event.repo);
791afd4129 if (!repoId) return;
80fafdf130
791afd4131 const configs = await this.readPipelineConfigs(event.repo, event.branch);
80fafdf132 if (configs.length === 0) return;
80fafdf133
80fafdf134 let changedFiles: string[] | null = null;
80fafdf135 if (event.oldCommitId && event.oldCommitId !== event.newCommitId) {
80fafdf136 changedFiles = await this.getChangedFiles(
80fafdf137 event.repo,
80fafdf138 event.oldCommitId,
80fafdf139 event.newCommitId
80fafdf140 );
80fafdf141 }
80fafdf142
818dc90143 // Fetch commit message from bridge
818dc90144 const commitMessage = await this.getCommitMessage(event.repo, event.newCommitId);
818dc90145
409bc79146 const matched = configs
d948b49147 .filter(({ file, config }) => this.matchesTrigger(config, file, event.branch, changedFiles))
409bc79148 .sort((a, b) => (a.config.order ?? 0) - (b.config.order ?? 0));
c98936c149 if (matched.length === 0) return;
409bc79150
c98936c151 type QueuedRun = {
c98936c152 order: number;
c98936c153 runId: number;
c98936c154 config: PipelineConfig;
c98936c155 };
409bc79156
c98936c157 const queuedRuns: QueuedRun[] = [];
c98936c158 for (const { file, config } of matched) {
c98936c159 // Cancel in-progress runs of this pipeline if concurrency is limited
c98936c160 if (config.concurrency != null && config.concurrency >= 1) {
c98936c161 const pipelineId = this.ensurePipeline(repoId, config.name, file);
c98936c162 const active = this.db
c98936c163 .prepare(
c98936c164 `SELECT id FROM pipeline_runs
c98936c165 WHERE pipeline_id = ? AND status IN ('pending', 'running')
c98936c166 ORDER BY id`
c98936c167 )
c98936c168 .all(pipelineId) as any[];
603c6c9169
c98936c170 const toCancel = active.slice(0, Math.max(0, active.length - config.concurrency + 1));
c98936c171 for (const row of toCancel) {
c98936c172 this.cancelRun(row.id);
c98936c173 this.db
409bc79174 .prepare(
c98936c175 `UPDATE pipeline_runs SET status = 'cancelled', finished_at = datetime('now') WHERE id = ? AND status IN ('pending', 'running')`
409bc79176 )
c98936c177 .run(row.id);
c98936c178 this.db
c98936c179 .prepare(
c98936c180 `UPDATE pipeline_steps SET status = 'skipped' WHERE run_id = ? AND status IN ('pending', 'running')`
c98936c181 )
c98936c182 .run(row.id);
5bcd5db183 this.eventBus?.publish({
5bcd5db184 type: "run:cancelled",
5bcd5db185 runId: row.id,
5bcd5db186 repoId,
5bcd5db187 status: "cancelled",
5bcd5db188 run: this.getRunRow(row.id),
5bcd5db189 ts: new Date().toISOString(),
5bcd5db190 });
409bc79191 }
c98936c192 }
80fafdf193
c98936c194 const runId = this.createRun(
c98936c195 repoId,
c98936c196 config.name,
c98936c197 file,
c98936c198 "push",
c98936c199 event.branch,
c98936c200 event.newCommitId,
c98936c201 commitMessage,
c98936c202 config.steps
c98936c203 );
c98936c204 queuedRuns.push({ order: config.order ?? 0, runId, config });
c98936c205 }
c98936c206
c98936c207 // Execute queued runs in the background so push handling can return quickly.
c98936c208 // Order groups are still respected: same order runs in parallel, higher orders
c98936c209 // wait for lower-order groups to finish.
c98936c210 void (async () => {
c98936c211 const groups = new Map<number, QueuedRun[]>();
c98936c212 for (const run of queuedRuns) {
c98936c213 if (!groups.has(run.order)) groups.set(run.order, []);
c98936c214 groups.get(run.order)!.push(run);
c98936c215 }
80fafdf216
c98936c217 for (const [, group] of [...groups.entries()].sort(([a], [b]) => a - b)) {
c98936c218 await Promise.all(
c98936c219 group.map((run) =>
c98936c220 this.executePipeline(run.runId, run.config, event.repo, event.branch).catch(
c98936c221 (err) => this.logger.error({ err, runId: run.runId }, "Pipeline execution failed")
c98936c222 )
603c6c9223 )
603c6c9224 );
603c6c9225 }
c98936c226 })().catch((err) =>
c98936c227 this.logger.error(
c98936c228 { err, repo: event.repo, branch: event.branch, commit: event.newCommitId },
c98936c229 "Queued push execution failed"
c98936c230 )
c98936c231 );
80fafdf232 }
80fafdf233
818dc90234 private async getCommitMessage(repo: string, commitId: string): Promise<string | null> {
818dc90235 try {
818dc90236 const res = await fetch(
818dc90237 `${this.bridgeUrl}/repos/${repo}/commit/${commitId}/history?limit=1`
818dc90238 );
818dc90239 if (!res.ok) return null;
818dc90240 const data = await res.json();
818dc90241 const commit = data.commits?.[0];
818dc90242 if (!commit?.message) return null;
818dc90243 return commit.message.split("\n")[0];
818dc90244 } catch {
818dc90245 return null;
818dc90246 }
818dc90247 }
818dc90248
80fafdf249 // --- Pipeline config reading ---
80fafdf250
80fafdf251 private async readPipelineConfigs(
80fafdf252 repo: string,
791afd4253 ref: string
80fafdf254 ): Promise<Array<{ file: string; config: PipelineConfig }>> {
80fafdf255 try {
80fafdf256 const treeRes = await fetch(
791afd4257 `${this.bridgeUrl}/repos/${repo}/tree/${ref}/.canopy`
80fafdf258 );
80fafdf259 if (!treeRes.ok) return [];
80fafdf260 const tree = await treeRes.json();
80fafdf261
80fafdf262 const results: Array<{ file: string; config: PipelineConfig }> = [];
80fafdf263 for (const entry of tree.entries) {
80fafdf264 if (!entry.name.endsWith(".yml") && !entry.name.endsWith(".yaml"))
80fafdf265 continue;
80fafdf266
80fafdf267 const blobRes = await fetch(
791afd4268 `${this.bridgeUrl}/repos/${repo}/blob/${ref}/.canopy/${entry.name}`
80fafdf269 );
80fafdf270 if (!blobRes.ok) continue;
80fafdf271 const blob = await blobRes.json();
80fafdf272
80fafdf273 try {
80fafdf274 const config = parseYaml(blob.content) as PipelineConfig;
80fafdf275 if (config.name && config.on && config.steps) {
80fafdf276 results.push({ file: `.canopy/${entry.name}`, config });
80fafdf277 }
80fafdf278 } catch {
80fafdf279 /* invalid YAML, skip */
80fafdf280 }
80fafdf281 }
80fafdf282 return results;
80fafdf283 } catch {
80fafdf284 return [];
80fafdf285 }
80fafdf286 }
80fafdf287
80fafdf288 // --- Trigger matching ---
80fafdf289
80fafdf290 private matchesTrigger(
80fafdf291 config: PipelineConfig,
d948b49292 file: string,
80fafdf293 branch: string,
80fafdf294 changedFiles: string[] | null
80fafdf295 ): boolean {
80fafdf296 const push = config.on.push;
80fafdf297 if (!push) return false;
80fafdf298
80fafdf299 if (push.branches && push.branches.length > 0) {
80fafdf300 const branchMatches = push.branches.some((pattern) =>
80fafdf301 minimatch(branch, pattern)
80fafdf302 );
80fafdf303 if (!branchMatches) return false;
80fafdf304 }
80fafdf305
80fafdf306 if (push.paths && push.paths.length > 0 && changedFiles !== null) {
d948b49307 // Always trigger if the pipeline's own config file was changed
d948b49308 const selfChanged = changedFiles.includes(file);
d948b49309 if (!selfChanged) {
d948b49310 const anyPathMatches = changedFiles.some((f) =>
d948b49311 push.paths!.some((pattern) => minimatch(f, pattern))
d948b49312 );
d948b49313 if (!anyPathMatches) return false;
d948b49314 }
80fafdf315 }
80fafdf316
80fafdf317 return true;
80fafdf318 }
80fafdf319
80fafdf320 private async getChangedFiles(
80fafdf321 repo: string,
80fafdf322 oldId: string,
80fafdf323 newId: string
791afd4324 ): Promise<string[] | null> {
80fafdf325 try {
80fafdf326 const diffRes = await fetch(
80fafdf327 `${this.bridgeUrl}/repos/${repo}/diff/${oldId}/${newId}`
80fafdf328 );
791afd4329 if (!diffRes.ok) return null;
80fafdf330 const data = await diffRes.json();
80fafdf331 return (data.diffs || []).map((d: any) => d.path);
80fafdf332 } catch {
791afd4333 return null;
80fafdf334 }
80fafdf335 }
80fafdf336
791afd4337 // --- Source checkout via grove-bridge API ---
791afd4338
791afd4339 private async checkoutFromBridge(
791afd4340 repo: string,
791afd4341 ref: string,
791afd4342 destPath: string
791afd4343 ): Promise<void> {
10943a1344 const url = `${this.bridgeUrl}/repos/${repo}/archive/${ref}`;
10943a1345 const res = await fetch(url);
10943a1346 if (!res.ok) {
10943a1347 throw new Error(`Archive fetch failed: ${res.status} ${res.statusText}`);
10943a1348 }
791afd4349
10943a1350 mkdirSync(destPath, { recursive: true });
791afd4351
10943a1352 const tarData = Buffer.from(await res.arrayBuffer());
10943a1353 execFileSync("tar", ["xf", "-", "-C", destPath], { input: tarData });
791afd4354 }
791afd4355
80fafdf356 // --- Run/step record creation ---
80fafdf357
80fafdf358 private createRun(
80fafdf359 repoId: number,
80fafdf360 pipelineName: string,
80fafdf361 pipelineFile: string,
80fafdf362 triggerType: string,
80fafdf363 triggerRef: string,
80fafdf364 commitId: string,
818dc90365 commitMessage: string | null,
80fafdf366 steps: StepConfig[]
80fafdf367 ): number {
37f6938368 const pipelineId = this.ensurePipeline(repoId, pipelineName, pipelineFile);
37f6938369
80fafdf370 const result = this.db
80fafdf371 .prepare(
37f6938372 `INSERT INTO pipeline_runs (pipeline_id, repo_id, pipeline_name, pipeline_file, trigger_type, trigger_ref, commit_id, commit_message, status)
37f6938373 VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'pending')`
80fafdf374 )
37f6938375 .run(pipelineId, repoId, pipelineName, pipelineFile, triggerType, triggerRef, commitId, commitMessage);
80fafdf376
80fafdf377 const runId = Number(result.lastInsertRowid);
80fafdf378
80fafdf379 const insertStep = this.db.prepare(
80fafdf380 `INSERT INTO pipeline_steps (run_id, step_index, name, image, status)
80fafdf381 VALUES (?, ?, ?, ?, 'pending')`
80fafdf382 );
80fafdf383
80fafdf384 for (let i = 0; i < steps.length; i++) {
80fafdf385 insertStep.run(runId, i, steps[i].name, steps[i].image);
80fafdf386 }
80fafdf387
5bcd5db388 this.eventBus?.publish({
5bcd5db389 type: "run:created",
5bcd5db390 runId,
5bcd5db391 repoId,
5bcd5db392 status: "pending",
5bcd5db393 run: this.getRunRow(runId),
5bcd5db394 ts: new Date().toISOString(),
5bcd5db395 });
5bcd5db396
80fafdf397 return runId;
80fafdf398 }
80fafdf399
80fafdf400 // --- Pipeline execution ---
80fafdf401
80fafdf402 private async executePipeline(
80fafdf403 runId: number,
80fafdf404 config: PipelineConfig,
80fafdf405 repoName: string,
791afd4406 ref: string
80fafdf407 ): Promise<void> {
80fafdf408 const abort = new AbortController();
80fafdf409 this.running.set(runId, abort);
80fafdf410
5bcd5db411 const repoRow = this.db
5bcd5db412 .prepare(`SELECT id FROM repos WHERE name = ?`)
5bcd5db413 .get(repoName) as any;
5bcd5db414 const repoId = repoRow?.id ?? 0;
5bcd5db415
80fafdf416 this.db
80fafdf417 .prepare(
80fafdf418 `UPDATE pipeline_runs SET status = 'running', started_at = datetime('now') WHERE id = ?`
80fafdf419 )
80fafdf420 .run(runId);
80fafdf421
5bcd5db422 this.eventBus?.publish({
5bcd5db423 type: "run:started",
5bcd5db424 runId,
5bcd5db425 repoId,
5bcd5db426 status: "running",
5bcd5db427 run: this.getRunRow(runId),
5bcd5db428 ts: new Date().toISOString(),
5bcd5db429 });
5bcd5db430
80fafdf431 const workspacePath = `${this.workspaceDir}/${runId}`;
791afd4432 const srcPath = `${workspacePath}/src`;
791afd4433 mkdirSync(srcPath, { recursive: true });
80fafdf434
f60476c435 if (config.checkout !== false) {
f60476c436 try {
f60476c437 await this.checkoutFromBridge(repoName, ref, srcPath);
f60476c438 } catch (err) {
f60476c439 this.logger.error({ err, runId }, "Failed to checkout repo for pipeline");
f60476c440 this.db
f60476c441 .prepare(
f60476c442 `UPDATE pipeline_runs SET status = 'failed', finished_at = datetime('now') WHERE id = ?`
f60476c443 )
f60476c444 .run(runId);
5bcd5db445 this.eventBus?.publish({
5bcd5db446 type: "run:completed",
5bcd5db447 runId,
5bcd5db448 repoId,
5bcd5db449 status: "failed",
5bcd5db450 run: this.getRunRow(runId),
5bcd5db451 ts: new Date().toISOString(),
5bcd5db452 });
f60476c453 this.running.delete(runId);
f60476c454 return;
f60476c455 }
80fafdf456 }
80fafdf457
80fafdf458 const secrets = repoRow ? this.loadSecrets(repoRow.id) : {};
80fafdf459
80fafdf460 const steps = this.db
80fafdf461 .prepare(
80fafdf462 `SELECT * FROM pipeline_steps WHERE run_id = ? ORDER BY step_index`
80fafdf463 )
80fafdf464 .all(runId) as any[];
80fafdf465 let allPassed = true;
80fafdf466
80fafdf467 for (const step of steps) {
80fafdf468 if (abort.signal.aborted || !allPassed) {
80fafdf469 this.db
80fafdf470 .prepare(`UPDATE pipeline_steps SET status = 'skipped' WHERE id = ?`)
80fafdf471 .run(step.id);
5bcd5db472 this.eventBus?.publish({
5bcd5db473 type: "step:skipped",
5bcd5db474 runId,
5bcd5db475 repoId,
5bcd5db476 stepId: step.id,
5bcd5db477 stepIndex: step.step_index,
5bcd5db478 status: "skipped",
5bcd5db479 step: this.getStepRow(step.id),
5bcd5db480 ts: new Date().toISOString(),
5bcd5db481 });
80fafdf482 continue;
80fafdf483 }
80fafdf484
80fafdf485 const stepConfig = config.steps[step.step_index];
80fafdf486 const mergedEnv = { ...config.env, ...stepConfig.env };
80fafdf487 const resolvedEnv = this.resolveSecrets(mergedEnv, secrets);
80fafdf488
80fafdf489 const passed = await this.executeStep(
5bcd5db490 runId,
5bcd5db491 repoId,
80fafdf492 step.id,
5bcd5db493 step.step_index,
80fafdf494 stepConfig,
80fafdf495 resolvedEnv,
80fafdf496 `${workspacePath}/src`,
80fafdf497 abort.signal
80fafdf498 );
80fafdf499 if (!passed) allPassed = false;
80fafdf500 }
80fafdf501
80fafdf502 const run = this.db
80fafdf503 .prepare(`SELECT started_at FROM pipeline_runs WHERE id = ?`)
80fafdf504 .get(runId) as any;
80fafdf505 const durationMs = run?.started_at
80fafdf506 ? Date.now() - new Date(run.started_at + "Z").getTime()
80fafdf507 : 0;
80fafdf508
5bcd5db509 const finalStatus = allPassed ? "passed" : "failed";
80fafdf510 this.db
80fafdf511 .prepare(
80fafdf512 `UPDATE pipeline_runs SET status = ?, finished_at = datetime('now'), duration_ms = ? WHERE id = ?`
80fafdf513 )
5bcd5db514 .run(finalStatus, durationMs, runId);
5bcd5db515
5bcd5db516 this.eventBus?.publish({
5bcd5db517 type: "run:completed",
5bcd5db518 runId,
5bcd5db519 repoId,
5bcd5db520 status: finalStatus,
5bcd5db521 run: this.getRunRow(runId),
5bcd5db522 ts: new Date().toISOString(),
5bcd5db523 });
80fafdf524
80fafdf525 try {
80fafdf526 rmSync(workspacePath, { recursive: true, force: true });
80fafdf527 } catch {}
80fafdf528 this.running.delete(runId);
80fafdf529 }
80fafdf530
80fafdf531 // --- Step execution (Docker container) ---
80fafdf532
1e64dbc533 private toHostPath(containerPath: string): string {
1e64dbc534 if (containerPath.startsWith(this.workspaceDir)) {
1e64dbc535 return this.workspaceHostDir + containerPath.slice(this.workspaceDir.length);
1e64dbc536 }
1e64dbc537 return containerPath;
1e64dbc538 }
1e64dbc539
80fafdf540 private executeStep(
5bcd5db541 runId: number,
5bcd5db542 repoId: number,
80fafdf543 stepId: number,
5bcd5db544 stepIndex: number,
80fafdf545 step: StepConfig,
80fafdf546 env: Record<string, string>,
80fafdf547 workspacePath: string,
80fafdf548 signal: AbortSignal
80fafdf549 ): Promise<boolean> {
80fafdf550 return new Promise((resolve) => {
80fafdf551 this.db
80fafdf552 .prepare(
80fafdf553 `UPDATE pipeline_steps SET status = 'running', started_at = datetime('now') WHERE id = ?`
80fafdf554 )
80fafdf555 .run(stepId);
80fafdf556
5bcd5db557 this.eventBus?.publish({
5bcd5db558 type: "step:started",
5bcd5db559 runId,
5bcd5db560 repoId,
5bcd5db561 stepId,
5bcd5db562 stepIndex,
5bcd5db563 status: "running",
5bcd5db564 step: this.getStepRow(stepId),
5bcd5db565 ts: new Date().toISOString(),
5bcd5db566 });
5bcd5db567
80fafdf568 const timeout = (step.timeout ?? 600) * 1000;
5f0fbcf569 const envArgs = Object.entries({
5f0fbcf570 GROVE_REGISTRY: "localhost:5000",
5f0fbcf571 ...env,
5f0fbcf572 }).flatMap(([k, v]) => ["-e", `${k}=${v}`]);
80fafdf573
1e64dbc574 const hostWorkspacePath = this.toHostPath(workspacePath);
1e64dbc575
7422c65576 const volumeArgs = [
7422c65577 "-v", `${hostWorkspacePath}:/workspace`,
7422c65578 "-v", "/var/run/docker.sock:/var/run/docker.sock",
7422c65579 "-v", "/opt/grove:/opt/grove:ro",
7422c65580 ...(step.volumes ?? []).flatMap((v) => ["-v", v]),
7422c65581 ];
7422c65582
57c315f583 const containerName = `canopy-${runId}-${stepIndex}`;
80fafdf584 const proc = spawn("docker", [
80fafdf585 "run",
80fafdf586 "--rm",
57c315f587 "--name",
57c315f588 containerName,
7422c65589 ...volumeArgs,
80fafdf590 "-w",
80fafdf591 "/workspace",
80fafdf592 "--network",
80fafdf593 "host",
80fafdf594 ...envArgs,
80fafdf595 step.image,
80fafdf596 "sh",
00c0ecf597 "-ec",
80fafdf598 step.run,
80fafdf599 ]);
80fafdf600
80fafdf601 const insertLog = this.db.prepare(
80fafdf602 `INSERT INTO step_logs (step_id, stream, content) VALUES (?, ?, ?)`
80fafdf603 );
80fafdf604
80fafdf605 proc.stdout.on("data", (data: Buffer) => {
5bcd5db606 const content = data.toString();
5bcd5db607 insertLog.run(stepId, "stdout", content);
5bcd5db608 this.eventBus?.publish({
5bcd5db609 type: "log:append",
5bcd5db610 runId,
5bcd5db611 repoId,
5bcd5db612 stepId,
5bcd5db613 stepIndex,
5bcd5db614 log: { stream: "stdout", content, created_at: new Date().toISOString() },
5bcd5db615 ts: new Date().toISOString(),
5bcd5db616 });
80fafdf617 });
80fafdf618
80fafdf619 proc.stderr.on("data", (data: Buffer) => {
5bcd5db620 const content = data.toString();
5bcd5db621 insertLog.run(stepId, "stderr", content);
5bcd5db622 this.eventBus?.publish({
5bcd5db623 type: "log:append",
5bcd5db624 runId,
5bcd5db625 repoId,
5bcd5db626 stepId,
5bcd5db627 stepIndex,
5bcd5db628 log: { stream: "stderr", content, created_at: new Date().toISOString() },
5bcd5db629 ts: new Date().toISOString(),
5bcd5db630 });
80fafdf631 });
80fafdf632
80fafdf633 const timer = setTimeout(() => {
80fafdf634 proc.kill("SIGTERM");
80fafdf635 insertLog.run(
80fafdf636 stepId,
80fafdf637 "stderr",
80fafdf638 `Step timed out after ${step.timeout ?? 600}s`
80fafdf639 );
80fafdf640 }, timeout);
80fafdf641
80fafdf642 const abortHandler = () => {
57c315f643 spawn("docker", ["kill", containerName]).on("close", () => {
57c315f644 proc.kill("SIGTERM");
57c315f645 });
80fafdf646 insertLog.run(stepId, "stderr", "Step cancelled");
80fafdf647 };
80fafdf648 signal.addEventListener("abort", abortHandler, { once: true });
80fafdf649
80fafdf650 proc.on("close", (code) => {
80fafdf651 clearTimeout(timer);
80fafdf652 signal.removeEventListener("abort", abortHandler);
80fafdf653
80fafdf654 const startedAt = this.db
80fafdf655 .prepare(`SELECT started_at FROM pipeline_steps WHERE id = ?`)
80fafdf656 .get(stepId) as any;
80fafdf657 const durationMs = startedAt?.started_at
80fafdf658 ? Date.now() - new Date(startedAt.started_at + "Z").getTime()
80fafdf659 : 0;
80fafdf660
80fafdf661 const passed = code === 0;
5bcd5db662 const stepStatus = passed ? "passed" : "failed";
80fafdf663 this.db
80fafdf664 .prepare(
80fafdf665 `UPDATE pipeline_steps SET status = ?, exit_code = ?, finished_at = datetime('now'), duration_ms = ? WHERE id = ?`
80fafdf666 )
5bcd5db667 .run(stepStatus, code, durationMs, stepId);
5bcd5db668
5bcd5db669 this.eventBus?.publish({
5bcd5db670 type: "step:completed",
5bcd5db671 runId,
5bcd5db672 repoId,
5bcd5db673 stepId,
5bcd5db674 stepIndex,
5bcd5db675 status: stepStatus,
5bcd5db676 step: this.getStepRow(stepId),
5bcd5db677 ts: new Date().toISOString(),
5bcd5db678 });
80fafdf679
80fafdf680 resolve(passed);
80fafdf681 });
80fafdf682 });
80fafdf683 }
80fafdf684
80fafdf685 // --- Secrets ---
80fafdf686
80fafdf687 private deriveKey(): Buffer {
80fafdf688 return createHash("sha256")
80fafdf689 .update(this.jwtSecret + ":canopy-secrets")
80fafdf690 .digest();
80fafdf691 }
80fafdf692
80fafdf693 encryptSecret(plaintext: string): string {
80fafdf694 const key = this.deriveKey();
80fafdf695 const iv = randomBytes(12);
80fafdf696 const cipher = createCipheriv("aes-256-gcm", key, iv);
80fafdf697 const encrypted = Buffer.concat([
80fafdf698 cipher.update(plaintext, "utf8"),
80fafdf699 cipher.final(),
80fafdf700 ]);
80fafdf701 const tag = cipher.getAuthTag();
80fafdf702 return Buffer.concat([iv, tag, encrypted]).toString("base64");
80fafdf703 }
80fafdf704
80fafdf705 private decrypt(encoded: string): string {
80fafdf706 const key = this.deriveKey();
80fafdf707 const buf = Buffer.from(encoded, "base64");
80fafdf708 const iv = buf.subarray(0, 12);
80fafdf709 const tag = buf.subarray(12, 28);
80fafdf710 const data = buf.subarray(28);
80fafdf711 const decipher = createDecipheriv("aes-256-gcm", key, iv);
80fafdf712 decipher.setAuthTag(tag);
80fafdf713 return decipher.update(data).toString("utf8") + decipher.final("utf8");
80fafdf714 }
80fafdf715
80fafdf716 private loadSecrets(repoId: number): Record<string, string> {
80fafdf717 const rows = this.db
80fafdf718 .prepare(
80fafdf719 `SELECT name, encrypted_value FROM canopy_secrets WHERE repo_id = ?`
80fafdf720 )
80fafdf721 .all(repoId) as any[];
80fafdf722 const secrets: Record<string, string> = {};
80fafdf723 for (const row of rows) {
80fafdf724 try {
80fafdf725 secrets[row.name] = this.decrypt(row.encrypted_value);
80fafdf726 } catch {}
80fafdf727 }
80fafdf728 return secrets;
80fafdf729 }
80fafdf730
80fafdf731 private resolveSecrets(
80fafdf732 env: Record<string, string>,
80fafdf733 secrets: Record<string, string>
80fafdf734 ): Record<string, string> {
80fafdf735 const resolved: Record<string, string> = {};
80fafdf736 for (const [key, value] of Object.entries(env)) {
80fafdf737 resolved[key] = value.replace(
80fafdf738 /\$\{\{\s*secrets\.(\w+)\s*\}\}/g,
80fafdf739 (_, name) => secrets[name] ?? ""
80fafdf740 );
80fafdf741 }
80fafdf742 return resolved;
80fafdf743 }
80fafdf744
80fafdf745 cancelRun(runId: number): void {
80fafdf746 const controller = this.running.get(runId);
80fafdf747 if (controller) controller.abort();
80fafdf748 }
80fafdf749}