// api/src/services/canopy-poller.ts
import type Database from "better-sqlite3";
import type { CanopyRunner } from "./canopy-runner.js";
import type { PagesDeployer } from "./pages-deployer.js";

/**
 * Polls grove-bridge bookmarks to detect pushes.
 *
 * Gets the repo list directly from grove-bridge (Mononoke), compares each
 * bookmark position against the `bookmark_state` table, and calls
 * `runner.onPush()` when a bookmark has moved. When a pages deployer is
 * configured, a move of a pages-enabled repo's default branch also kicks off
 * a pages deployment.
 *
 * Failures are logged and contained: a problem with one repo or one bookmark
 * does not stop the rest of the poll cycle.
 */
export class CanopyPoller {
  private interval: ReturnType<typeof setInterval> | null = null;
  /** True while a poll cycle is in flight; used to skip overlapping cycles. */
  private polling = false;

  /**
   * @param db            SQLite handle holding the `bookmark_state` table
   *                      (and `repos` / `repos_with_owner` for pages lookups).
   * @param bridgeUrl     Base URL of grove-bridge; `/repos` and
   *                      `/repos/<name>/bookmarks` are requested beneath it.
   * @param runner        Receives `onPush` for every detected bookmark move.
   * @param pagesDeployer Optional deployer; pass `null` to disable pages deploys.
   * @param logger        Sink for `info` / `error` log calls.
   */
  constructor(
    private db: Database.Database,
    private bridgeUrl: string,
    private runner: CanopyRunner,
    private pagesDeployer: PagesDeployer | null,
    private logger: { info: (...args: any[]) => void; error: (...args: any[]) => void }
  ) {}

  /**
   * Begins polling every `intervalMs` milliseconds. The first poll happens
   * after one full interval. Calling `start` again replaces the running timer.
   */
  start(intervalMs: number): void {
    // Clear any timer from an earlier start(); otherwise it would be
    // overwritten here and stop() could never reach it again.
    this.stop();
    this.interval = setInterval(() => {
      // poll() handles its own errors, so the promise is intentionally not awaited.
      void this.poll();
    }, intervalMs);
  }

  /** Stops polling. Safe to call when polling is not running. */
  stop(): void {
    if (this.interval) {
      clearInterval(this.interval);
      this.interval = null;
    }
  }

  /**
   * Runs one poll cycle: fetches the repo list and checks each repo in turn.
   * Returns immediately if a previous cycle is still running.
   */
  private async poll(): Promise<void> {
    if (this.polling) return;
    this.polling = true;

    try {
      const res = await fetch(`${this.bridgeUrl}/repos`);
      if (!res.ok) {
        this.logger.error({ status: res.status }, "Canopy poll: repo list request failed");
        return;
      }
      const payload: unknown = await res.json();

      for (const repo of CanopyPoller.readRepoNames(payload)) {
        try {
          await this.pollRepo(repo);
        } catch (err) {
          // Contain per-repo failures (e.g. database errors) so the
          // remaining repos are still checked in this cycle.
          this.logger.error({ err, repo }, "Canopy poll error");
        }
      }
    } catch (err) {
      this.logger.error({ err }, "Canopy poll error");
    } finally {
      this.polling = false;
    }
  }

  /**
   * Checks every bookmark of one repo against the stored state and triggers
   * the runner for each bookmark whose commit changed.
   *
   * If the repo has no stored state at all, the current positions are
   * recorded without triggering anything.
   */
  private async pollRepo(repoName: string): Promise<void> {
    let bookmarks: Array<{ name: string; commit_id: string }>;
    try {
      const res = await fetch(`${this.bridgeUrl}/repos/${repoName}/bookmarks`);
      if (!res.ok) {
        this.logger.error({ repo: repoName, status: res.status }, "Failed to fetch bookmarks");
        return;
      }
      const payload: unknown = await res.json();
      bookmarks = CanopyPoller.readBookmarks(payload);
    } catch (err) {
      this.logger.error({ err, repo: repoName }, "Failed to fetch bookmarks");
      return;
    }

    const existingStates = this.db
      .prepare(`SELECT bookmark_name, commit_id FROM bookmark_state WHERE repo_name = ?`)
      .all(repoName) as Array<{ bookmark_name: string; commit_id: string }>;

    const stateMap = new Map(existingStates.map((s) => [s.bookmark_name, s.commit_id]));

    // If no state exists yet, seed all bookmarks without triggering
    const isFirstSeed = existingStates.length === 0;

    const upsert = this.db.prepare(
      `INSERT INTO bookmark_state (repo_name, bookmark_name, commit_id, updated_at)
       VALUES (?, ?, ?, datetime('now'))
       ON CONFLICT(repo_name, bookmark_name) DO UPDATE SET
         commit_id = excluded.commit_id,
         updated_at = datetime('now')`
    );

    for (const bookmark of bookmarks) {
      const oldCommitId = stateMap.get(bookmark.name);
      if (oldCommitId === bookmark.commit_id) continue;

      if (isFirstSeed) {
        upsert.run(repoName, bookmark.name, bookmark.commit_id);
        continue;
      }

      try {
        this.logger.info(
          { repo: repoName, branch: bookmark.name, commit: bookmark.commit_id },
          "Bookmark changed, triggering pipeline"
        );
        await this.runner.onPush({
          repo: repoName,
          branch: bookmark.name,
          // A bookmark not seen before has no previous commit.
          oldCommitId: oldCommitId ?? "",
          newCommitId: bookmark.commit_id,
        });

        // Only mark bookmark processed after runs are queued successfully,
        // and do it before the pages step so that a pages failure cannot
        // leave the state stale and re-queue the same push on every poll.
        upsert.run(repoName, bookmark.name, bookmark.commit_id);
      } catch (err) {
        this.logger.error(
          { err, repo: repoName, branch: bookmark.name, commit: bookmark.commit_id },
          "Failed to queue pipeline for bookmark change"
        );
        continue;
      }

      this.deployPagesIfApplicable(repoName, bookmark.name);
    }
  }

  /**
   * Starts a pages deployment (without awaiting it) when a deployer is
   * configured, the repo has pages enabled, and `branch` is the repo's
   * default branch ("main" when none is stored). Any failure is logged and
   * never propagates to the caller.
   */
  private deployPagesIfApplicable(repoName: string, branch: string): void {
    if (!this.pagesDeployer) return;

    const logFailure = (err: unknown): void => {
      this.logger.error({ err, repo: repoName, branch }, "Pages deployment failed");
    };

    try {
      const repo = this.db
        .prepare(
          `SELECT r.default_branch, rwo.owner_name FROM repos r
           JOIN repos_with_owner rwo ON rwo.id = r.id
           WHERE r.name = ? AND r.pages_enabled = 1`
        )
        .get(repoName) as { default_branch: string | null; owner_name: string } | undefined;

      if (repo && branch === (repo.default_branch ?? "main")) {
        void this.pagesDeployer.deploy(repo.owner_name, repoName, branch).catch(logFailure);
      }
    } catch (err) {
      logFailure(err);
    }
  }

  /** Extracts the string entries of `repos` from a bridge response; anything else yields none. */
  private static readRepoNames(payload: unknown): string[] {
    const repos = (payload as { repos?: unknown } | null | undefined)?.repos;
    if (!Array.isArray(repos)) return [];
    return repos.filter((entry): entry is string => typeof entry === "string");
  }

  /** Extracts well-formed `{ name, commit_id }` entries of `bookmarks` from a bridge response. */
  private static readBookmarks(payload: unknown): Array<{ name: string; commit_id: string }> {
    const raw = (payload as { bookmarks?: unknown } | null | undefined)?.bookmarks;
    if (!Array.isArray(raw)) return [];
    return raw.filter(
      (entry): entry is { name: string; commit_id: string } =>
        typeof entry === "object" &&
        entry !== null &&
        typeof (entry as { name?: unknown }).name === "string" &&
        typeof (entry as { commit_id?: unknown }).commit_id === "string"
    );
  }
}