Skip to content

Commit

Permalink
feat: Add queue system for command execution
Browse files Browse the repository at this point in the history
  • Loading branch information
KieraDOG committed Aug 12, 2024
1 parent dc782ef commit 4498fe3
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 60 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- main
- beta

jobs:
publish:
Expand Down
8 changes: 7 additions & 1 deletion .releaserc
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
{
"branches": ["main"],
"branches": [
"main",
{
"name": "beta",
"prerelease": true
}
],
"plugins": [
"@semantic-release/commit-analyzer",
"@semantic-release/release-notes-generator",
Expand Down
159 changes: 106 additions & 53 deletions src/CGDGarageDoor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class CGDGarageDoor {
private status?: Status;
private statusUpdateListener?: StatusUpdateListener;
private isUpdating = false;
private runQ: { name: string; fn: () => Promise<unknown>}[] = [];

constructor(log: Logging, config: Config) {
this.log = log;
Expand All @@ -38,72 +39,124 @@ export class CGDGarageDoor {
this.poolStatus();
}

private withRunQ = async (key: string, fn: () => Promise<unknown>) => new Promise((resolve, reject) => {
this.log.debug('Adding to queue');
this.runQ = this.runQ.filter((item) => item.name !== key);
this.runQ.push({
name: key,
fn: async () => {
try {
const result = await fn();
resolve(result);
} catch (error) {
reject(error);
}
},
});

if (this.runQ.length === 1) {
this.log.debug('Start processing queue');
this.processRunQ();
}
});

private processRunQ = async () => {
this.log.debug('Queue length:', this.runQ.length);
if (this.runQ.length === 0) {
this.log.debug('Queue is empty');
return;
}

const item = this.runQ.shift()!;

try {
await item.fn();
} finally {
this.processRunQ();
}
};

private run = async ({
cmd, value,
softValue = value,
until = async () => {
this.log.debug('Running without until...');
return true;
},
}) => {
this.log.debug(`Setting ${cmd} to ${softValue}`);
let oldStatus: Status;

if (this.status?.[cmd]) {
oldStatus = { ...this.status };
this.status[cmd] = softValue;

if (!this.isStatusEqual(oldStatus)) {
this.log.debug(`Updating ${cmd} to ${softValue}`);
this.statusUpdateListener?.();
until,
}: {
cmd: string; value: string;
softValue?: string;
until?: () => Promise<boolean>;
}): Promise<unknown> => {
const fn = async () => {
this.log.debug(`Setting ${cmd} to ${softValue}`);
let oldStatus: Status;

if (this.status?.[cmd]) {
oldStatus = { ...this.status };
this.status[cmd] = softValue;

if (!this.isStatusEqual(oldStatus)) {
this.log.debug(`Updating ${cmd} to ${softValue}`);
this.statusUpdateListener?.();
}
}
}

return retry(async () => {
this.log.debug(`Running command: ${cmd}=${value}`);
const result = await retry(async () => {
this.log.debug(`Running command: ${cmd}=${value}`);

const { deviceHostname, deviceLocalKey } = this.config;
const response = await fetch(`http://${deviceHostname}/api?key=${deviceLocalKey}&${cmd}=${value}`, {
const { deviceHostname, deviceLocalKey } = this.config;
const response = await fetch(`http://${deviceHostname}/api?key=${deviceLocalKey}&${cmd}=${value}`, {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
agent: httpAgent,
});
agent: httpAgent,
});

const data = await response.json();
const data = await response.json();

const level = response.ok ? 'debug' : 'error';
this.log[level](response.status.toString());
this.log[level](JSON.stringify(data));
const level = response.ok ? 'debug' : 'error';
this.log[level](response.status.toString());
this.log[level](JSON.stringify(data));

if (!response.ok) {
throw new Error(`Fetch failed with status ${response.status}, ${JSON.stringify(data)}`);
}

return data;
}, until, {
retries: 3,
onRetry: (error, retries) => {
this.log.warn(`Failed to run command [${retries} retries]: ${cmd}=${value}`);
if (error instanceof Error) {
this.log.warn(`Error: ${error.message}`);
}
},
onRecover: (retries) => {
this.log.info(`Recovered to run command [${retries} retries]: ${cmd}=${value}`);
},
onFail: (error) => {
this.log.error(`Failed to run command: ${cmd}=${value}`);
if (error instanceof Error) {
this.log.error(`Error: ${error.message}`);
if (!response.ok) {
throw new Error(`Fetch failed with status ${response.status}, ${JSON.stringify(data)}`);
}

if (oldStatus) {
this.log.debug(`Reverting ${cmd}`);
this.status = oldStatus;
this.statusUpdateListener?.();
}
},
});
return data;
}, {
until,
retries: 3,
onRetry: (error, retries) => {
this.log.warn(`Failed to run command [${retries} retries]: ${cmd}=${value}`);
if (error instanceof Error) {
this.log.warn(`Error: ${error.message}`);
}
},
onRecover: (retries) => {
this.log.info(`Recovered to run command [${retries} retries]: ${cmd}=${value}`);
},
onFail: (error) => {
this.log.error(`Failed to run command: ${cmd}=${value}`);
if (error instanceof Error) {
this.log.error(`Error: ${error.message}`);
}

if (oldStatus) {
this.log.debug(`Reverting ${cmd}`);
this.status = oldStatus;
this.statusUpdateListener?.();
}
},
});

return result;
};

let result: unknown;
if (until) {
result = await this.withRunQ(cmd, fn);
}

result = await fn();

return result;
};

private withIsUpdating = async <T>(fn: () => Promise<T>): Promise<void> => {
Expand Down
11 changes: 5 additions & 6 deletions src/retry.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
interface Config {
until?: () => Promise<unknown>;
retries: number;
isRetry?: boolean;
onRetry: (error: unknown, retries: number) => void;
onRecover: (retries: number) => void;
onFail: (error: unknown) => void;
}

const retry = async (fn: () => Promise<unknown>, until: () => Promise<unknown>, config: Config) => {
const { retries, onRetry, onRecover, onFail, isRetry } = config;
const retry = async (fn: () => Promise<unknown>, config: Config) => {
const { until, retries, onRetry, onRecover, onFail, isRetry } = config;

try {
const data = await fn();

const result = await until();

if (!result) {
if (until && !await until()) {
throw new Error('Failed to reach the expected state');
}

Expand All @@ -30,7 +29,7 @@ const retry = async (fn: () => Promise<unknown>, until: () => Promise<unknown>,

onRetry(error, retries);

return retry(fn, until, {
return retry(fn, {
...config,
isRetry: true,
retries: retries - 1,
Expand Down

0 comments on commit 4498fe3

Please sign in to comment.