12.4 KB396 lines
Blame
1import type { FastifyInstance } from "fastify";
2import { z } from "zod";
3import type { CanopyRunner } from "../services/canopy-runner.js";
4import type { CanopyEventBus, CanopyEvent } from "../services/canopy-events.js";
5
6function lookupRepo(db: any, repo: string) {
7 return db
8 .prepare(`SELECT id, name, default_branch FROM repos WHERE name = ?`)
9 .get(repo) as any;
10}
11
12export async function canopyGlobalRoutes(app: FastifyInstance) {
13 const db = (app as any).db;
14 const eventBus = (app as any).canopyEventBus as CanopyEventBus | undefined;
15
16 // SSE endpoint for live updates
17 app.get<{
18 Querystring: { scope?: string; runId?: string; owner?: string; repo?: string };
19 }>("/canopy/events", async (request, reply) => {
20 const { scope = "global", runId, owner, repo } = request.query;
21
22 // Resolve owner/repo to repoId for repo-scoped filtering
23 let repoId: number | null = null;
24 if (scope === "repo" && repo) {
25 const repoRow = lookupRepo(db, repo);
26 if (repoRow) repoId = repoRow.id;
27 }
28
29 const numericRunId = runId ? parseInt(runId) : null;
30
31 reply.raw.writeHead(200, {
32 "Content-Type": "text/event-stream",
33 "Cache-Control": "no-cache",
34 "Connection": "keep-alive",
35 "X-Accel-Buffering": "no",
36 });
37 reply.raw.write(": connected\n\n");
38
39 const keepalive = setInterval(() => {
40 reply.raw.write(": keepalive\n\n");
41 }, 15000);
42
43 const listener = (event: CanopyEvent) => {
44 // Filter by scope
45 if (scope === "run" && event.runId !== numericRunId) return;
46 if (scope === "repo" && repoId !== null && event.repoId !== repoId) return;
47 if (scope === "repo" && event.type === "log:append") return;
48 if (scope === "global" && !["run:created", "run:completed", "run:cancelled", "run:started"].includes(event.type)) return;
49
50 try {
51 reply.raw.write(`event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`);
52 } catch {
53 // Client disconnected
54 }
55 };
56
57 const unsubscribe = eventBus?.subscribe(listener);
58
59 request.raw.on("close", () => {
60 clearInterval(keepalive);
61 unsubscribe?.();
62 });
63
64 await reply.hijack();
65 });
66
67 // Recent runs across all repos (optionally filtered by owner).
68 // Returns the N most recent runs per repo so active repos don't crowd out others.
69 app.get<{
70 Querystring: { limit?: string; owner?: string; per_repo?: string };
71 }>("/canopy/recent-runs", async (request) => {
72 const perRepo = parseInt(request.query.per_repo ?? "30") || 30;
73 const owner = request.query.owner;
74 const ownerClause = owner ? `WHERE rwo.owner_name = ?` : "";
75 const query = `
76 SELECT * FROM (
77 SELECT pr.*, pr.pipeline_name, r.name AS repo_name, rwo.owner_name,
78 ROW_NUMBER() OVER (PARTITION BY pr.repo_id ORDER BY pr.created_at DESC) AS rn
79 FROM pipeline_runs pr
80 JOIN repos r ON pr.repo_id = r.id
81 JOIN repos_with_owner rwo ON r.id = rwo.id
82 ${ownerClause}
83 ) WHERE rn <= ?
84 ORDER BY created_at DESC`;
85 const params: any[] = [];
86 if (owner) params.push(owner);
87 params.push(perRepo);
88 const runs = db.prepare(query).all(...params);
89 return { runs };
90 });
91}
92
93export async function canopyWebhookRoute(app: FastifyInstance) {
94 const runner = (app as any).canopyRunner as CanopyRunner | undefined;
95
96 // Internal webhook — no auth (only reachable from Docker network)
97 app.post("/canopy/webhook", async (request, reply) => {
98 const { event, repo, bookmark, old_id, new_id } = request.body as any;
99 if (event !== "push" || !repo || !bookmark || !new_id) {
100 return reply.code(400).send({ error: "Invalid webhook payload" });
101 }
102 if (!runner) {
103 return reply.code(503).send({ error: "Canopy not enabled" });
104 }
105 try {
106 await runner.onPush({
107 repo,
108 branch: bookmark,
109 oldCommitId: old_id ?? "",
110 newCommitId: new_id,
111 });
112 } catch {
113 return reply.code(500).send({ error: "Failed to queue pipeline runs" });
114 }
115 return { ok: true };
116 });
117}
118
119export async function canopyRoutes(app: FastifyInstance) {
120 const db = (app as any).db;
121 const runner = (app as any).canopyRunner as CanopyRunner | undefined;
122
123 const RUN_SELECT = `
124 SELECT
125 pr.*,
126 COALESCE(p.name, pr.pipeline_name) AS pipeline_name,
127 COALESCE(p.file, pr.pipeline_file) AS pipeline_file,
128 pr.repo_id
129 FROM pipeline_runs pr
130 LEFT JOIN pipelines p ON pr.pipeline_id = p.id`;
131
132 // List pipeline runs
133 app.get<{
134 Params: { owner: string; repo: string };
135 Querystring: { status?: string; limit?: string };
136 }>("/:owner/:repo/canopy/runs", async (request, reply) => {
137 const { repo } = request.params;
138 const { status, limit } = request.query;
139 const repoRow = lookupRepo(db, repo);
140 if (!repoRow) return { runs: [] };
141
142 let query = `${RUN_SELECT} WHERE pr.repo_id = ?`;
143 const params: any[] = [repoRow.id];
144 if (status) {
145 query += ` AND pr.status = ?`;
146 params.push(status);
147 }
148 query += ` ORDER BY pr.created_at DESC LIMIT ?`;
149 params.push(parseInt(limit ?? "20") || 20);
150
151 const runs = db.prepare(query).all(...params);
152 return { runs };
153 });
154
155 // Get single run with steps
156 app.get<{
157 Params: { owner: string; repo: string; id: string };
158 }>("/:owner/:repo/canopy/runs/:id", async (request, reply) => {
159 const { id } = request.params;
160 const run = db
161 .prepare(`${RUN_SELECT} WHERE pr.id = ?`)
162 .get(parseInt(id));
163 if (!run) return reply.code(404).send({ error: "Run not found" });
164
165 const steps = db
166 .prepare(
167 `SELECT * FROM pipeline_steps WHERE run_id = ? ORDER BY step_index`
168 )
169 .all(parseInt(id));
170
171 return { run, steps };
172 });
173
174 // Get step logs
175 app.get<{
176 Params: { owner: string; repo: string; id: string; idx: string };
177 }>("/:owner/:repo/canopy/runs/:id/logs/:idx", async (request, reply) => {
178 const { id, idx } = request.params;
179 const step = db
180 .prepare(
181 `SELECT id FROM pipeline_steps WHERE run_id = ? AND step_index = ?`
182 )
183 .get(parseInt(id), parseInt(idx)) as any;
184 if (!step) return reply.code(404).send({ error: "Step not found" });
185
186 const logs = db
187 .prepare(
188 `SELECT stream, content, created_at FROM step_logs WHERE step_id = ? ORDER BY id`
189 )
190 .all(step.id);
191
192 return { logs };
193 });
194
195 // Cancel a running pipeline
196 app.post<{
197 Params: { owner: string; repo: string; id: string };
198 }>(
199 "/:owner/:repo/canopy/runs/:id/cancel",
200 { preHandler: [(app as any).authenticate] },
201 async (request, reply) => {
202 if (!runner) {
203 return reply.code(503).send({ error: "Canopy not enabled" });
204 }
205 const { id } = request.params;
206 const run = db
207 .prepare(`${RUN_SELECT} WHERE pr.id = ?`)
208 .get(parseInt(id)) as any;
209 if (!run || run.status !== "running") {
210 return reply.code(400).send({ error: "Run is not running" });
211 }
212
213 runner.cancelRun(parseInt(id));
214 db.prepare(
215 `UPDATE pipeline_runs SET status = 'cancelled', finished_at = datetime('now') WHERE id = ?`
216 ).run(parseInt(id));
217
218 const updated = db
219 .prepare(`${RUN_SELECT} WHERE pr.id = ?`)
220 .get(parseInt(id));
221 return { run: updated };
222 }
223 );
224
225 // Manual trigger
226 app.post<{
227 Params: { owner: string; repo: string };
228 }>(
229 "/:owner/:repo/canopy/trigger",
230 { preHandler: [(app as any).authenticate] },
231 async (request, reply) => {
232 if (!runner) {
233 return reply.code(503).send({ error: "Canopy not enabled" });
234 }
235 const { owner, repo } = request.params;
236 const { ref } = (request.body as any) ?? {};
237 const branch = ref || "main";
238 const bridgeUrl =
239 process.env.GROVE_BRIDGE_URL ?? "http://localhost:3100";
240 const resolveRes = await fetch(
241 `${bridgeUrl}/repos/${repo}/resolve/${branch}`
242 );
243 if (!resolveRes.ok) {
244 return reply.code(400).send({ error: "Could not resolve branch" });
245 }
246 const { commit_id } = (await resolveRes.json()) as any;
247
248 try {
249 await runner.onPush({
250 repo,
251 branch,
252 oldCommitId: "",
253 newCommitId: commit_id,
254 });
255 } catch {
256 return reply.code(500).send({ error: "Failed to queue pipeline runs" });
257 }
258
259 // Deploy pages if applicable
260 const pagesDeployer = (app as any).pagesDeployer;
261 if (pagesDeployer) {
262 void pagesDeployer.deploy(owner, repo, branch).catch((err: any) => {
263 app.log.error({ err, repo, branch }, "Pages deployment failed (manual trigger)");
264 });
265 }
266
267 return { triggered: true, branch, commit_id };
268 }
269 );
270
271 // List pipeline definitions from repo HEAD
272 app.get<{
273 Params: { owner: string; repo: string };
274 }>("/:owner/:repo/canopy/pipelines", async (request, reply) => {
275 const { repo } = request.params;
276 const bridgeUrl =
277 process.env.GROVE_BRIDGE_URL ?? "http://localhost:3100";
278 const branch = "main";
279
280 try {
281 const treeRes = await fetch(
282 `${bridgeUrl}/repos/${repo}/tree/${branch}/.canopy`
283 );
284 if (!treeRes.ok) return { pipelines: [] };
285 const tree = await treeRes.json();
286
287 const pipelines: Array<{ file: string; name: string }> = [];
288 for (const entry of (tree as any).entries) {
289 if (!entry.name.endsWith(".yml") && !entry.name.endsWith(".yaml"))
290 continue;
291 const blobRes = await fetch(
292 `${bridgeUrl}/repos/${repo}/blob/${branch}/.canopy/${entry.name}`
293 );
294 if (!blobRes.ok) continue;
295 const blob = (await blobRes.json()) as any;
296 try {
297 const { parse: parseYaml } = await import("yaml");
298 const config = parseYaml(blob.content);
299 if (config?.name) {
300 pipelines.push({ file: `.canopy/${entry.name}`, name: config.name });
301 }
302 } catch {}
303 }
304 return { pipelines };
305 } catch {
306 return { pipelines: [] };
307 }
308 });
309
310 // --- Secrets CRUD ---
311
312 const createSecretSchema = z.object({
313 name: z
314 .string()
315 .min(1)
316 .max(100)
317 .regex(/^\w+$/),
318 value: z.string().min(1),
319 });
320
321 // List secret names (no values)
322 app.get<{ Params: { owner: string; repo: string } }>(
323 "/:owner/:repo/canopy/secrets",
324 { preHandler: [(app as any).authenticate] },
325 async (request, reply) => {
326 const { owner, repo } = request.params;
327 const repoRow = lookupRepo(db, repo);
328 if (!repoRow)
329 return reply.code(404).send({ error: "Repository not found" });
330
331 const secrets = db
332 .prepare(
333 `SELECT name, created_at, updated_at FROM canopy_secrets WHERE repo_id = ? ORDER BY name`
334 )
335 .all(repoRow.id);
336 return { secrets };
337 }
338 );
339
340 // Create/update secret
341 app.post<{ Params: { owner: string; repo: string } }>(
342 "/:owner/:repo/canopy/secrets",
343 { preHandler: [(app as any).authenticate] },
344 async (request, reply) => {
345 if (!runner) {
346 return reply.code(503).send({ error: "Canopy not enabled" });
347 }
348 const parsed = createSecretSchema.safeParse(request.body);
349 if (!parsed.success) {
350 return reply.code(400).send({ error: parsed.error.flatten() });
351 }
352
353 const { owner, repo } = request.params;
354 const repoRow = lookupRepo(db, repo);
355 if (!repoRow)
356 return reply.code(404).send({ error: "Repository not found" });
357
358 const { name, value } = parsed.data;
359 const encrypted = runner.encryptSecret(value);
360
361 db.prepare(
362 `INSERT INTO canopy_secrets (repo_id, name, encrypted_value)
363 VALUES (?, ?, ?)
364 ON CONFLICT(repo_id, name) DO UPDATE SET
365 encrypted_value = excluded.encrypted_value,
366 updated_at = datetime('now')`
367 ).run(repoRow.id, name, encrypted);
368
369 return { ok: true };
370 }
371 );
372
373 // Delete secret
374 app.delete<{ Params: { owner: string; repo: string; name: string } }>(
375 "/:owner/:repo/canopy/secrets/:name",
376 { preHandler: [(app as any).authenticate] },
377 async (request, reply) => {
378 const { owner, repo, name } = request.params;
379 const repoRow = lookupRepo(db, repo);
380 if (!repoRow)
381 return reply.code(404).send({ error: "Repository not found" });
382
383 const result = db
384 .prepare(
385 `DELETE FROM canopy_secrets WHERE repo_id = ? AND name = ?`
386 )
387 .run(repoRow.id, name);
388
389 if (result.changes === 0) {
390 return reply.code(404).send({ error: "Secret not found" });
391 }
392 return reply.code(204).send();
393 }
394 );
395}
396