From 510f133d1abe4b0fd115a8776bc1b28a5a9ae22f Mon Sep 17 00:00:00 2001 From: yndu13 Date: Mon, 1 Apr 2024 16:20:08 +0800 Subject: [PATCH] fix: resolve sse parsing when retry invalid and dara divided --- .github/workflows/ci.yml | 2 +- .github/workflows/node.js.yml | 2 +- src/stream.ts | 45 +++++---- test/stream.spec.ts | 180 +++++++++++++++++++++++++++++++--- 4 files changed, 195 insertions(+), 34 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0464af2..a99428a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ jobs: strategy: matrix: operating-system: [ubuntu-latest, macos-latest] - node-version: [10.x, 12.x, 14.x] + node-version: [10.x, 12.x, 14.x, 16.x, 18.x, 20.x] name: Node.js ${{ matrix.node-version }} Test on ${{ matrix.operating-system }} steps: diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml index cd70c30..7600a4b 100644 --- a/.github/workflows/node.js.yml +++ b/.github/workflows/node.js.yml @@ -16,7 +16,7 @@ jobs: strategy: matrix: - node-version: [12.x, 14.x, 16.x] + node-version: [12.x, 14.x, 16.x, 18.x, 20.x] # See supported Node.js release schedule at https://nodejs.org/en/about/releases/ steps: diff --git a/src/stream.ts b/src/stream.ts index 8daf2b0..badfac8 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,12 +1,19 @@ -import { Readable, Writable } from 'stream'; -import { IncomingMessage } from 'http'; -import { readAsSSE } from 'httpx'; +import { Readable } from 'stream'; const DATA_PREFIX = 'data:'; const EVENT_PREFIX = 'event:'; const ID_PREFIX = 'id:'; const RETRY_PREFIX = 'retry:'; +function isDigitsOnly(str: string) { + for (let i = 0; i < str.length; i++) { + const c = str.charAt(i); + if (c < '0' || c > '9') { + return false; + } + } + return str.length > 0; +} export class SSEEvent { data?: string; @@ -14,7 +21,7 @@ export class SSEEvent { event?: string; retry?: number; - constructor(data: {[key: string]: any} = {}){ + constructor(data: { [key: string]: any } = {}) { this.data = data.data; this.id = data.id; this.event = data.event; @@ -25,8 +32,8 @@ export class SSEEvent { function read(readable: Readable): Promise { return new Promise((resolve, reject) => { - let onData: { (chunk: any): void; (buf: Buffer): void; (chunk: any): void; }, - onError: { (err: Error): void; (err: Error): void; (err: Error): void; }, + let onData: { (chunk: any): void; (buf: Buffer): void; (chunk: any): void; }, + onError: { (err: Error): void; (err: Error): void; (err: Error): void; }, onEnd: { (): void; (): void; (): void; }; const cleanup = function () { // cleanup @@ -34,25 +41,25 @@ function read(readable: Readable): Promise { readable.removeListener('data', onData); readable.removeListener('end', onEnd); }; - + const bufs: Uint8Array[] | Buffer[] = []; let size = 0; - + onData = function (buf: Buffer) { bufs.push(buf); size += buf.length; }; - + onError = function (err: Error) { cleanup(); reject(err); }; - + onEnd = function () { cleanup(); resolve(Buffer.concat(bufs, size)); }; - + readable.on('error', onError); readable.on('data', onData); readable.on('end', onEnd); @@ -63,8 +70,8 @@ function read(readable: Readable): Promise { function readyToRead(readable: Readable) { return new Promise((resolve, reject) => { - let onReadable: { (): void; (): void; (): void; }, - onEnd: { (): void; (): void; (): void; }, + let onReadable: { (): void; (): void; (): void; }, + onEnd: { (): void; (): void; (): void; }, onError: { (err: Error): void; (err: any): void; (err: Error): void; }; const cleanup = function () { // cleanup @@ -119,7 +126,9 @@ function tryGetEvents(head: string, chunk: string): EventResult { event.id = line.substring(ID_PREFIX.length).trim(); } else if (line.startsWith(RETRY_PREFIX)) { const retry = line.substring(RETRY_PREFIX.length).trim(); - event.retry = parseInt(retry, 10); + if (isDigitsOnly(retry)) { + event.retry = parseInt(retry, 10); + } } else if (line.startsWith(':')) { // ignore the line } @@ -130,7 +139,7 @@ function tryGetEvents(head: string, chunk: string): EventResult { } const remain = all.substring(start); - return { events, remain } ; + return { events, remain }; } @@ -151,16 +160,16 @@ export default class TeaStream { } static async *readAsSSE(stream: Readable): AsyncGenerator { + let rest = ''; while (true) { const ended = await readyToRead(stream); if (ended) { return; } - - let rest = ''; + let chunk; while (null !== (chunk = stream.read())) { - const { events, remain } = tryGetEvents(rest, chunk.toString()); + const { events, remain } = tryGetEvents(rest, chunk.toString()); rest = remain; if (events && events.length > 0) { for (const event of events) { diff --git a/test/stream.spec.ts b/test/stream.spec.ts index 3104fca..64bfd3a 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -16,15 +16,55 @@ const server = http.createServer((req, res) => { res.writeHead(200, headers); res.flushHeaders(); let count = 0; - const timer = setInterval(() => { - if (count >= 5) { - clearInterval(timer); - res.end(); - return; - } - res.write(`data: ${JSON.stringify({count: count})}\nevent: flow\nid: sse-test\nretry: 3\n:heartbeat\n\n`); - count++; - }, 100); + if (req.url === '/sse') { + const timer = setInterval(() => { + if (count >= 5) { + clearInterval(timer); + res.end(); + return; + } + res.write(`data: ${JSON.stringify({ count: count })}\nevent: flow\nid: sse-test\nretry: 3\n:heartbeat\n\n`); + count++; + }, 100); + } else if (req.url === '/sse_with_no_spaces') { + const timer = setInterval(() => { + if (count >= 5) { + clearInterval(timer); + res.end(); + return; + } + res.write(`data:${JSON.stringify({ count: count })}\nevent:flow\nid:sse-test\nretry:3\n\n`); + count++; + }, 100); + } else if (req.url === '/sse_invalid_retry') { + const timer = setInterval(() => { + if (count >= 5) { + clearInterval(timer); + res.end(); + return; + } + res.write(`data:${JSON.stringify({ count: count })}\nevent:flow\nid:sse-test\nretry: abc\n\n`); + count++; + }, 100); + } else if (req.url === '/sse_with_data_divided') { + const timer = setInterval(() => { + if (count >= 5) { + clearInterval(timer); + res.end(); + return; + } + if (count === 1) { + res.write('data:{"count":'); + count++; + return; + } + if (count === 2) { + res.write(`${count++},"tag":"divided"}\nevent:flow\nid:sse-test\nretry:3\n\n`); + return; + } + res.write(`data:${JSON.stringify({ count: count++ })}\nevent:flow\nid:sse-test\nretry:3\n\n`); + }, 100); + } }); class MyReadable extends Readable { @@ -46,7 +86,7 @@ describe('$dara stream', function () { before((done) => { server.listen(8384, done); }); - + after(function (done) { this.timeout(20000); server.close(done); @@ -71,16 +111,55 @@ describe('$dara stream', function () { }); it('readAsSSE', async function () { - const res = await httpx.request("http://127.0.0.1:8384", {readTimeout: 5000}); + const res = await httpx.request("http://127.0.0.1:8384/sse", { readTimeout: 5000 }); + assert.strictEqual(res.statusCode, 200); + const events: SSEEvent[] = []; + + for await (const event of $dara.Stream.readAsSSE(res)) { + + events.push(event); + } + assert.strictEqual(events.length, 5); + + assert.deepStrictEqual([new SSEEvent({ + data: '{"count":0}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), new SSEEvent({ + data: '{"count":1}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), new SSEEvent({ + data: '{"count":2}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), new SSEEvent({ + data: '{"count":3}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), new SSEEvent({ + data: '{"count":4}', + event: 'flow', + id: 'sse-test', + retry: 3, + })], events); + }); + + it('readAsSSE with no spaces', async function () { + const res = await httpx.request("http://127.0.0.1:8384/sse_with_no_spaces", { readTimeout: 5000 }); assert.strictEqual(res.statusCode, 200); const events: SSEEvent[] = []; - + for await (const event of $dara.Stream.readAsSSE(res)) { - + events.push(event); } assert.strictEqual(events.length, 5); - + assert.deepStrictEqual([new SSEEvent({ data: '{"count":0}', event: 'flow', @@ -108,4 +187,77 @@ describe('$dara stream', function () { retry: 3, })], events); }); + + it('readAsSSE with invalid retry', async function () { + const res = await httpx.request("http://127.0.0.1:8384/sse_invalid_retry", { readTimeout: 5000 }); + assert.strictEqual(res.statusCode, 200); + const events: SSEEvent[] = []; + + for await (const event of $dara.Stream.readAsSSE(res)) { + + events.push(event); + } + assert.strictEqual(events.length, 5); + + assert.deepStrictEqual([new SSEEvent({ + data: '{"count":0}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), new SSEEvent({ + data: '{"count":1}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), new SSEEvent({ + data: '{"count":2}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), new SSEEvent({ + data: '{"count":3}', + event: 'flow', + id: 'sse-test', + retry: undefined, + }), new SSEEvent({ + data: '{"count":4}', + event: 'flow', + id: 'sse-test', + retry: undefined, + })], events); + }); + + it('readAsSSE with dara divided', async function () { + const res = await httpx.request("http://127.0.0.1:8384/sse_with_data_divided", { readTimeout: 5000 }); + assert.strictEqual(res.statusCode, 200); + const events: SSEEvent[] = []; + + for await (const event of $dara.Stream.readAsSSE(res)) { + + events.push(event); + } + assert.strictEqual(events.length, 4); + + assert.deepStrictEqual([new SSEEvent({ + data: '{"count":0}', + event: 'flow', + id: 'sse-test', + retry: 3 + }), new SSEEvent({ + data: '{"count":2,"tag":"divided"}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), new SSEEvent({ + data: '{"count":3}', + event: 'flow', + id: 'sse-test', + retry: 3, + }), new SSEEvent({ + data: '{"count":4}', + event: 'flow', + id: 'sse-test', + retry: 3, + })], events); + }); }); \ No newline at end of file