12.7 KB400 lines
Blame
1import type { FastifyInstance } from "fastify";
2import { z } from "zod";
3import type { CanopyRunner } from "../services/canopy-runner.js";
4import type { CanopyEventBus, CanopyEvent } from "../services/canopy-events.js";
5
6function lookupRepo(db: any, repo: string) {
7 return db
8 .prepare(`SELECT id, name, default_branch FROM repos WHERE name = ?`)
9 .get(repo) as any;
10}
11
12export async function canopyGlobalRoutes(app: FastifyInstance) {
13 const db = (app as any).db;
14 const eventBus = (app as any).canopyEventBus as CanopyEventBus | undefined;
15
16 // SSE endpoint for live updates
17 app.get<{
18 Querystring: { scope?: string; runId?: string; owner?: string; repo?: string };
19 }>("/canopy/events", async (request, reply) => {
20 const { scope = "global", runId, owner, repo } = request.query;
21
22 // Resolve owner/repo to repoId for repo-scoped filtering
23 let repoId: number | null = null;
24 if (scope === "repo" && repo) {
25 const repoRow = lookupRepo(db, repo);
26 if (repoRow) repoId = repoRow.id;
27 }
28
29 const numericRunId = runId ? parseInt(runId) : null;
30
31 reply.raw.writeHead(200, {
32 "Content-Type": "text/event-stream",
33 "Cache-Control": "no-cache",
34 "Connection": "keep-alive",
35 "X-Accel-Buffering": "no",
36 });
37 reply.raw.write(": connected\n\n");
38
39 const keepalive = setInterval(() => {
40 reply.raw.write(": keepalive\n\n");
41 }, 15000);
42
43 const listener = (event: CanopyEvent) => {
44 // Filter by scope
45 if (scope === "run" && event.runId !== numericRunId) return;
46 if (scope === "repo" && repoId !== null && event.repoId !== repoId) return;
47 if (scope === "repo" && event.type === "log:append") return;
48 if (scope === "global" && !["run:created", "run:completed", "run:cancelled", "run:started"].includes(event.type)) return;
49
50 try {
51 reply.raw.write(`event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`);
52 } catch {
53 // Client disconnected
54 }
55 };
56
57 const unsubscribe = eventBus?.subscribe(listener);
58
59 request.raw.on("close", () => {
60 clearInterval(keepalive);
61 unsubscribe?.();
62 });
63
64 await reply.hijack();
65 });
66
67 // Recent runs across all repos (optionally filtered by owner).
68 // Returns the N most recent runs per repo so active repos don't crowd out others.
69 app.get<{
70 Querystring: { limit?: string; owner?: string; per_repo?: string };
71 }>("/canopy/recent-runs", async (request) => {
72 const perRepo = parseInt(request.query.per_repo ?? "30") || 30;
73 const owner = request.query.owner;
74 const ownerClause = owner ? `WHERE rwo.owner_name = ?` : "";
75 const query = `
76 SELECT * FROM (
77 SELECT pr.*, pr.pipeline_name, r.name AS repo_name, rwo.owner_name,
78 ROW_NUMBER() OVER (PARTITION BY pr.repo_id ORDER BY pr.created_at DESC) AS rn
79 FROM pipeline_runs pr
80 JOIN repos r ON pr.repo_id = r.id
81 JOIN repos_with_owner rwo ON r.id = rwo.id
82 ${ownerClause}
83 ) WHERE rn <= ?
84 ORDER BY created_at DESC`;
85 const params: any[] = [];
86 if (owner) params.push(owner);
87 params.push(perRepo);
88 const runs = db.prepare(query).all(...params);
89 return { runs };
90 });
91}
92
93export async function canopyWebhookRoute(app: FastifyInstance) {
94 const runner = (app as any).canopyRunner as CanopyRunner | undefined;
95
96 // Internal webhook — no auth (only reachable from Docker network)
97 app.post("/canopy/webhook", async (request, reply) => {
98 const { event, repo, bookmark, old_id, new_id } = request.body as any;
99 if (event !== "push" || !repo || !bookmark || !new_id) {
100 return reply.code(400).send({ error: "Invalid webhook payload" });
101 }
102 if (!runner) {
103 return reply.code(503).send({ error: "Canopy not enabled" });
104 }
105 try {
106 await runner.onPush({
107 repo,
108 branch: bookmark,
109 oldCommitId: old_id ?? "",
110 newCommitId: new_id,
111 });
112 } catch {
113 return reply.code(500).send({ error: "Failed to queue pipeline runs" });
114 }
115 return { ok: true };
116 });
117}
118
119export async function canopyRoutes(app: FastifyInstance) {
120 const db = (app as any).db;
121 const runner = (app as any).canopyRunner as CanopyRunner | undefined;
122
123 const RUN_SELECT = `
124 SELECT
125 pr.*,
126 COALESCE(p.name, pr.pipeline_name) AS pipeline_name,
127 COALESCE(p.file, pr.pipeline_file) AS pipeline_file,
128 pr.repo_id
129 FROM pipeline_runs pr
130 LEFT JOIN pipelines p ON pr.pipeline_id = p.id`;
131
132 // List pipeline runs
133 app.get<{
134 Params: { owner: string; repo: string };
135 Querystring: { status?: string; limit?: string };
136 }>("/:owner/:repo/canopy/runs", async (request, reply) => {
137 const { repo } = request.params;
138 const { status, limit } = request.query;
139 const repoRow = lookupRepo(db, repo);
140 if (!repoRow) return { runs: [] };
141
142 let query = `${RUN_SELECT} WHERE pr.repo_id = ?`;
143 const params: any[] = [repoRow.id];
144 if (status) {
145 query += ` AND pr.status = ?`;
146 params.push(status);
147 }
148 query += ` ORDER BY pr.created_at DESC LIMIT ?`;
149 params.push(parseInt(limit ?? "20") || 20);
150
151 const runs = db.prepare(query).all(...params);
152 return { runs };
153 });
154
155 // Get single run with steps
156 app.get<{
157 Params: { owner: string; repo: string; id: string };
158 }>("/:owner/:repo/canopy/runs/:id", async (request, reply) => {
159 const { id } = request.params;
160 const run = db
161 .prepare(`${RUN_SELECT} WHERE pr.id = ?`)
162 .get(parseInt(id));
163 if (!run) return reply.code(404).send({ error: "Run not found" });
164
165 const steps = db
166 .prepare(
167 `SELECT * FROM pipeline_steps WHERE run_id = ? ORDER BY step_index`
168 )
169 .all(parseInt(id));
170
171 return { run, steps };
172 });
173
174 // Get step logs
175 app.get<{
176 Params: { owner: string; repo: string; id: string; idx: string };
177 }>("/:owner/:repo/canopy/runs/:id/logs/:idx", async (request, reply) => {
178 const { id, idx } = request.params;
179 const step = db
180 .prepare(
181 `SELECT id FROM pipeline_steps WHERE run_id = ? AND step_index = ?`
182 )
183 .get(parseInt(id), parseInt(idx)) as any;
184 if (!step) return reply.code(404).send({ error: "Step not found" });
185
186 const logs = db
187 .prepare(
188 `SELECT stream, content, created_at FROM step_logs WHERE step_id = ? ORDER BY id`
189 )
190 .all(step.id);
191
192 return { logs };
193 });
194
195 // Cancel a running pipeline
196 app.post<{
197 Params: { owner: string; repo: string; id: string };
198 }>(
199 "/:owner/:repo/canopy/runs/:id/cancel",
200 { preHandler: [(app as any).authenticate] },
201 async (request, reply) => {
202 if (!runner) {
203 return reply.code(503).send({ error: "Canopy not enabled" });
204 }
205 const { id } = request.params;
206 const run = db
207 .prepare(`${RUN_SELECT} WHERE pr.id = ?`)
208 .get(parseInt(id)) as any;
209 if (!run || (run.status !== "running" && run.status !== "pending")) {
210 return reply.code(400).send({ error: "Run is not running or pending" });
211 }
212
213 runner.cancelRun(parseInt(id));
214 db.prepare(
215 `UPDATE pipeline_runs SET status = 'cancelled', finished_at = datetime('now') WHERE id = ?`
216 ).run(parseInt(id));
217 // Also cancel any pending/running steps
218 db.prepare(
219 `UPDATE pipeline_steps SET status = 'cancelled' WHERE run_id = ? AND status IN ('pending', 'running')`
220 ).run(parseInt(id));
221
222 const updated = db
223 .prepare(`${RUN_SELECT} WHERE pr.id = ?`)
224 .get(parseInt(id));
225 return { run: updated };
226 }
227 );
228
229 // Manual trigger
230 app.post<{
231 Params: { owner: string; repo: string };
232 }>(
233 "/:owner/:repo/canopy/trigger",
234 { preHandler: [(app as any).authenticate] },
235 async (request, reply) => {
236 if (!runner) {
237 return reply.code(503).send({ error: "Canopy not enabled" });
238 }
239 const { owner, repo } = request.params;
240 const { ref, pipeline } = (request.body as any) ?? {};
241 const branch = ref || "main";
242 const bridgeUrl =
243 process.env.GROVE_BRIDGE_URL ?? "http://localhost:3100";
244 const resolveRes = await fetch(
245 `${bridgeUrl}/repos/${repo}/resolve/${branch}`
246 );
247 if (!resolveRes.ok) {
248 return reply.code(400).send({ error: "Could not resolve branch" });
249 }
250 const { commit_id } = (await resolveRes.json()) as any;
251
252 try {
253 await runner.onPush({
254 repo,
255 branch,
256 oldCommitId: "",
257 newCommitId: commit_id,
258 }, pipeline);
259 } catch {
260 return reply.code(500).send({ error: "Failed to queue pipeline runs" });
261 }
262
263 // Deploy pages if applicable
264 const pagesDeployer = (app as any).pagesDeployer;
265 if (pagesDeployer) {
266 void pagesDeployer.deploy(owner, repo, branch).catch((err: any) => {
267 app.log.error({ err, repo, branch }, "Pages deployment failed (manual trigger)");
268 });
269 }
270
271 return { triggered: true, branch, commit_id };
272 }
273 );
274
275 // List pipeline definitions from repo HEAD
276 app.get<{
277 Params: { owner: string; repo: string };
278 }>("/:owner/:repo/canopy/pipelines", async (request, reply) => {
279 const { repo } = request.params;
280 const bridgeUrl =
281 process.env.GROVE_BRIDGE_URL ?? "http://localhost:3100";
282 const branch = "main";
283
284 try {
285 const treeRes = await fetch(
286 `${bridgeUrl}/repos/${repo}/tree/${branch}/.canopy`
287 );
288 if (!treeRes.ok) return { pipelines: [] };
289 const tree = await treeRes.json();
290
291 const pipelines: Array<{ file: string; name: string }> = [];
292 for (const entry of (tree as any).entries) {
293 if (!entry.name.endsWith(".yml") && !entry.name.endsWith(".yaml"))
294 continue;
295 const blobRes = await fetch(
296 `${bridgeUrl}/repos/${repo}/blob/${branch}/.canopy/${entry.name}`
297 );
298 if (!blobRes.ok) continue;
299 const blob = (await blobRes.json()) as any;
300 try {
301 const { parse: parseYaml } = await import("yaml");
302 const config = parseYaml(blob.content);
303 if (config?.name) {
304 pipelines.push({ file: `.canopy/${entry.name}`, name: config.name });
305 }
306 } catch {}
307 }
308 return { pipelines };
309 } catch {
310 return { pipelines: [] };
311 }
312 });
313
314 // --- Secrets CRUD ---
315
316 const createSecretSchema = z.object({
317 name: z
318 .string()
319 .min(1)
320 .max(100)
321 .regex(/^\w+$/),
322 value: z.string().min(1),
323 });
324
325 // List secret names (no values)
326 app.get<{ Params: { owner: string; repo: string } }>(
327 "/:owner/:repo/canopy/secrets",
328 { preHandler: [(app as any).authenticate] },
329 async (request, reply) => {
330 const { owner, repo } = request.params;
331 const repoRow = lookupRepo(db, repo);
332 if (!repoRow)
333 return reply.code(404).send({ error: "Repository not found" });
334
335 const secrets = db
336 .prepare(
337 `SELECT name, created_at, updated_at FROM canopy_secrets WHERE repo_id = ? ORDER BY name`
338 )
339 .all(repoRow.id);
340 return { secrets };
341 }
342 );
343
344 // Create/update secret
345 app.post<{ Params: { owner: string; repo: string } }>(
346 "/:owner/:repo/canopy/secrets",
347 { preHandler: [(app as any).authenticate] },
348 async (request, reply) => {
349 if (!runner) {
350 return reply.code(503).send({ error: "Canopy not enabled" });
351 }
352 const parsed = createSecretSchema.safeParse(request.body);
353 if (!parsed.success) {
354 return reply.code(400).send({ error: parsed.error.flatten() });
355 }
356
357 const { owner, repo } = request.params;
358 const repoRow = lookupRepo(db, repo);
359 if (!repoRow)
360 return reply.code(404).send({ error: "Repository not found" });
361
362 const { name, value } = parsed.data;
363 const encrypted = runner.encryptSecret(value);
364
365 db.prepare(
366 `INSERT INTO canopy_secrets (repo_id, name, encrypted_value)
367 VALUES (?, ?, ?)
368 ON CONFLICT(repo_id, name) DO UPDATE SET
369 encrypted_value = excluded.encrypted_value,
370 updated_at = datetime('now')`
371 ).run(repoRow.id, name, encrypted);
372
373 return { ok: true };
374 }
375 );
376
377 // Delete secret
378 app.delete<{ Params: { owner: string; repo: string; name: string } }>(
379 "/:owner/:repo/canopy/secrets/:name",
380 { preHandler: [(app as any).authenticate] },
381 async (request, reply) => {
382 const { owner, repo, name } = request.params;
383 const repoRow = lookupRepo(db, repo);
384 if (!repoRow)
385 return reply.code(404).send({ error: "Repository not found" });
386
387 const result = db
388 .prepare(
389 `DELETE FROM canopy_secrets WHERE repo_id = ? AND name = ?`
390 )
391 .run(repoRow.id, name);
392
393 if (result.changes === 0) {
394 return reply.code(404).send({ error: "Secret not found" });
395 }
396 return reply.code(204).send();
397 }
398 );
399}
400