From c34f1b7eb9045c4a7b4039ff0881114a04e657e2 Mon Sep 17 00:00:00 2001 From: uki00a Date: Fri, 4 Mar 2022 05:56:51 +0900 Subject: [PATCH 1/5] feat: web streams based encoding/csv --- encoding/csv.ts | 263 +++------------------------------- encoding/csv/_io.ts | 253 ++++++++++++++++++++++++++++++++ encoding/csv/stream.ts | 51 +++++++ encoding/csv/stream_test.ts | 63 ++++++++ encoding/testdata/complex.csv | 7 + encoding/testdata/simple.csv | 3 + 6 files changed, 393 insertions(+), 247 deletions(-) create mode 100644 encoding/csv/_io.ts create mode 100644 encoding/csv/stream.ts create mode 100644 encoding/csv/stream_test.ts create mode 100644 encoding/testdata/complex.csv create mode 100644 encoding/testdata/simple.csv diff --git a/encoding/csv.ts b/encoding/csv.ts index 8d3010026571..83dfdbfb0bc1 100644 --- a/encoding/csv.ts +++ b/encoding/csv.ts @@ -5,10 +5,24 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. import { BufReader } from "../io/buffer.ts"; -import { TextProtoReader } from "../textproto/mod.ts"; import { StringReader } from "../io/readers.ts"; import { assert } from "../_util/assert.ts"; - +import { + ERR_FIELD_COUNT, + ERR_INVALID_DELIM, + ParseError, + readRecord, +} from "./csv/_io.ts"; +import type { ReadOptions } from "./csv/_io.ts"; + +export { + ERR_BARE_QUOTE, + ERR_FIELD_COUNT, + ERR_INVALID_DELIM, + ERR_QUOTE, + ParseError, +} from "./csv/_io.ts"; +export type { ReadOptions } from "./csv/_io.ts"; export { NEWLINE, stringify, StringifyError } from "./csv_stringify.ts"; export type { @@ -20,64 +34,6 @@ export type { const INVALID_RUNE = ["\r", "\n", '"']; -export const ERR_BARE_QUOTE = 'bare " in non-quoted-field'; -export const ERR_QUOTE = 'extraneous or missing " in quoted-field'; -export const ERR_INVALID_DELIM = "Invalid Delimiter"; -export const ERR_FIELD_COUNT = "wrong number of fields"; - -/** - * A ParseError is returned for parsing errors. - * Line numbers are 1-indexed and columns are 0-indexed. 
- */ -export class ParseError extends Error { - /** Line where the record starts*/ - startLine: number; - /** Line where the error occurred */ - line: number; - /** Column (rune index) where the error occurred */ - column: number | null; - - constructor( - start: number, - line: number, - column: number | null, - message: string, - ) { - super(); - this.startLine = start; - this.column = column; - this.line = line; - - if (message === ERR_FIELD_COUNT) { - this.message = `record on line ${line}: ${message}`; - } else if (start !== line) { - this.message = - `record on line ${start}; parse error on line ${line}, column ${column}: ${message}`; - } else { - this.message = - `parse error on line ${line}, column ${column}: ${message}`; - } - } -} - -/** - * @property separator - Character which separates values. Default: ',' - * @property comment - Character to start a comment. Default: '#' - * @property trimLeadingSpace - Flag to trim the leading space of the value. - * Default: 'false' - * @property lazyQuotes - Allow unquoted quote in a quoted field or non double - * quoted quotes in quoted field. Default: 'false' - * @property fieldsPerRecord - Enabling the check of fields for each row. - * If == 0, first row is used as referral for the number of fields. 
- */ -export interface ReadOptions { - separator?: string; - comment?: string; - trimLeadingSpace?: boolean; - lazyQuotes?: boolean; - fieldsPerRecord?: number; -} - function chkOptions(opt: ReadOptions): void { if (!opt.separator) { opt.separator = ","; @@ -94,193 +50,6 @@ function chkOptions(opt: ReadOptions): void { } } -async function readRecord( - startLine: number, - reader: BufReader, - opt: ReadOptions = { separator: ",", trimLeadingSpace: false }, -): Promise { - const tp = new TextProtoReader(reader); - let line = await readLine(tp); - let lineIndex = startLine + 1; - - if (line === null) return null; - if (line.length === 0) { - return []; - } - // line starting with comment character is ignored - if (opt.comment && line[0] === opt.comment) { - return []; - } - - assert(opt.separator != null); - - let fullLine = line; - let quoteError: ParseError | null = null; - const quote = '"'; - const quoteLen = quote.length; - const separatorLen = opt.separator.length; - let recordBuffer = ""; - const fieldIndexes = [] as number[]; - parseField: - for (;;) { - if (opt.trimLeadingSpace) { - line = line.trimLeft(); - } - - if (line.length === 0 || !line.startsWith(quote)) { - // Non-quoted string field - const i = line.indexOf(opt.separator); - let field = line; - if (i >= 0) { - field = field.substring(0, i); - } - // Check to make sure a quote does not appear in field. 
- if (!opt.lazyQuotes) { - const j = field.indexOf(quote); - if (j >= 0) { - const col = runeCount( - fullLine.slice(0, fullLine.length - line.slice(j).length), - ); - quoteError = new ParseError( - startLine + 1, - lineIndex, - col, - ERR_BARE_QUOTE, - ); - break parseField; - } - } - recordBuffer += field; - fieldIndexes.push(recordBuffer.length); - if (i >= 0) { - line = line.substring(i + separatorLen); - continue parseField; - } - break parseField; - } else { - // Quoted string field - line = line.substring(quoteLen); - for (;;) { - const i = line.indexOf(quote); - if (i >= 0) { - // Hit next quote. - recordBuffer += line.substring(0, i); - line = line.substring(i + quoteLen); - if (line.startsWith(quote)) { - // `""` sequence (append quote). - recordBuffer += quote; - line = line.substring(quoteLen); - } else if (line.startsWith(opt.separator)) { - // `","` sequence (end of field). - line = line.substring(separatorLen); - fieldIndexes.push(recordBuffer.length); - continue parseField; - } else if (0 === line.length) { - // `"\n` sequence (end of line). - fieldIndexes.push(recordBuffer.length); - break parseField; - } else if (opt.lazyQuotes) { - // `"` sequence (bare quote). - recordBuffer += quote; - } else { - // `"*` sequence (invalid non-escaped quote). - const col = runeCount( - fullLine.slice(0, fullLine.length - line.length - quoteLen), - ); - quoteError = new ParseError( - startLine + 1, - lineIndex, - col, - ERR_QUOTE, - ); - break parseField; - } - } else if (line.length > 0 || !(await isEOF(tp))) { - // Hit end of line (copy all data so far). - recordBuffer += line; - const r = await readLine(tp); - lineIndex++; - line = r ?? ""; // This is a workaround for making this module behave similarly to the encoding/csv/reader.go. - fullLine = line; - if (r === null) { - // Abrupt end of file (EOF or error). 
- if (!opt.lazyQuotes) { - const col = runeCount(fullLine); - quoteError = new ParseError( - startLine + 1, - lineIndex, - col, - ERR_QUOTE, - ); - break parseField; - } - fieldIndexes.push(recordBuffer.length); - break parseField; - } - recordBuffer += "\n"; // preserve line feed (This is because TextProtoReader removes it.) - } else { - // Abrupt end of file (EOF on error). - if (!opt.lazyQuotes) { - const col = runeCount(fullLine); - quoteError = new ParseError( - startLine + 1, - lineIndex, - col, - ERR_QUOTE, - ); - break parseField; - } - fieldIndexes.push(recordBuffer.length); - break parseField; - } - } - } - } - if (quoteError) { - throw quoteError; - } - const result = [] as string[]; - let preIdx = 0; - for (const i of fieldIndexes) { - result.push(recordBuffer.slice(preIdx, i)); - preIdx = i; - } - return result; -} - -async function isEOF(tp: TextProtoReader): Promise { - return (await tp.r.peek(0)) === null; -} - -function runeCount(s: string): number { - // Array.from considers the surrogate pair. - return Array.from(s).length; -} - -async function readLine(tp: TextProtoReader): Promise { - let line: string; - const r = await tp.readLine(); - if (r === null) return null; - line = r; - - // For backwards compatibility, drop trailing \r before EOF. - if ((await isEOF(tp)) && line.length > 0 && line[line.length - 1] === "\r") { - line = line.substring(0, line.length - 1); - } - - // Normalize \r\n to \n on all input lines. - if ( - line.length >= 2 && - line[line.length - 2] === "\r" && - line[line.length - 1] === "\n" - ) { - line = line.substring(0, line.length - 2); - line = line + "\n"; - } - - return line; -} - /** * Parse the CSV from the `reader` with the options provided and return `string[][]`. 
* diff --git a/encoding/csv/_io.ts b/encoding/csv/_io.ts new file mode 100644 index 000000000000..8b8fe80827ca --- /dev/null +++ b/encoding/csv/_io.ts @@ -0,0 +1,253 @@ +import { TextProtoReader } from "../../textproto/mod.ts"; +import type { BufReader } from "../../io/buffer.ts"; +import { assert } from "../../_util/assert.ts"; + +/** + * @property separator - Character which separates values. Default: ',' + * @property comment - Character to start a comment. Default: '#' + * @property trimLeadingSpace - Flag to trim the leading space of the value. + * Default: 'false' + * @property lazyQuotes - Allow unquoted quote in a quoted field or non double + * quoted quotes in quoted field. Default: 'false' + * @property fieldsPerRecord - Enabling the check of fields for each row. + * If == 0, first row is used as referral for the number of fields. + */ +export interface ReadOptions { + separator?: string; + comment?: string; + trimLeadingSpace?: boolean; + lazyQuotes?: boolean; + fieldsPerRecord?: number; +} + +export const defaultReadOptions: ReadOptions = { + separator: ",", + trimLeadingSpace: false, +}; + +export async function readRecord( + startLine: number, + reader: BufReader, + opt: ReadOptions = defaultReadOptions, +): Promise { + const tp = new TextProtoReader(reader); + let line = await readLine(tp); + let lineIndex = startLine + 1; + + if (line === null) return null; + if (line.length === 0) { + return []; + } + // line starting with comment character is ignored + if (opt.comment && line[0] === opt.comment) { + return []; + } + + assert(opt.separator != null); + + let fullLine = line; + let quoteError: ParseError | null = null; + const quote = '"'; + const quoteLen = quote.length; + const separatorLen = opt.separator.length; + let recordBuffer = ""; + const fieldIndexes = [] as number[]; + parseField: + for (;;) { + if (opt.trimLeadingSpace) { + line = line.trimLeft(); + } + + if (line.length === 0 || !line.startsWith(quote)) { + // Non-quoted string field + 
const i = line.indexOf(opt.separator); + let field = line; + if (i >= 0) { + field = field.substring(0, i); + } + // Check to make sure a quote does not appear in field. + if (!opt.lazyQuotes) { + const j = field.indexOf(quote); + if (j >= 0) { + const col = runeCount( + fullLine.slice(0, fullLine.length - line.slice(j).length), + ); + quoteError = new ParseError( + startLine + 1, + lineIndex, + col, + ERR_BARE_QUOTE, + ); + break parseField; + } + } + recordBuffer += field; + fieldIndexes.push(recordBuffer.length); + if (i >= 0) { + line = line.substring(i + separatorLen); + continue parseField; + } + break parseField; + } else { + // Quoted string field + line = line.substring(quoteLen); + for (;;) { + const i = line.indexOf(quote); + if (i >= 0) { + // Hit next quote. + recordBuffer += line.substring(0, i); + line = line.substring(i + quoteLen); + if (line.startsWith(quote)) { + // `""` sequence (append quote). + recordBuffer += quote; + line = line.substring(quoteLen); + } else if (line.startsWith(opt.separator)) { + // `","` sequence (end of field). + line = line.substring(separatorLen); + fieldIndexes.push(recordBuffer.length); + continue parseField; + } else if (0 === line.length) { + // `"\n` sequence (end of line). + fieldIndexes.push(recordBuffer.length); + break parseField; + } else if (opt.lazyQuotes) { + // `"` sequence (bare quote). + recordBuffer += quote; + } else { + // `"*` sequence (invalid non-escaped quote). + const col = runeCount( + fullLine.slice(0, fullLine.length - line.length - quoteLen), + ); + quoteError = new ParseError( + startLine + 1, + lineIndex, + col, + ERR_QUOTE, + ); + break parseField; + } + } else if (line.length > 0 || !(await isEOF(tp))) { + // Hit end of line (copy all data so far). + recordBuffer += line; + const r = await readLine(tp); + lineIndex++; + line = r ?? ""; // This is a workaround for making this module behave similarly to the encoding/csv/reader.go. 
+ fullLine = line; + if (r === null) { + // Abrupt end of file (EOF or error). + if (!opt.lazyQuotes) { + const col = runeCount(fullLine); + quoteError = new ParseError( + startLine + 1, + lineIndex, + col, + ERR_QUOTE, + ); + break parseField; + } + fieldIndexes.push(recordBuffer.length); + break parseField; + } + recordBuffer += "\n"; // preserve line feed (This is because TextProtoReader removes it.) + } else { + // Abrupt end of file (EOF on error). + if (!opt.lazyQuotes) { + const col = runeCount(fullLine); + quoteError = new ParseError( + startLine + 1, + lineIndex, + col, + ERR_QUOTE, + ); + break parseField; + } + fieldIndexes.push(recordBuffer.length); + break parseField; + } + } + } + } + if (quoteError) { + throw quoteError; + } + const result = [] as string[]; + let preIdx = 0; + for (const i of fieldIndexes) { + result.push(recordBuffer.slice(preIdx, i)); + preIdx = i; + } + return result; +} + +function runeCount(s: string): number { + // Array.from considers the surrogate pair. + return Array.from(s).length; +} + +async function readLine(tp: TextProtoReader): Promise { + let line: string; + const r = await tp.readLine(); + if (r === null) return null; + line = r; + + // For backwards compatibility, drop trailing \r before EOF. + if ((await isEOF(tp)) && line.length > 0 && line[line.length - 1] === "\r") { + line = line.substring(0, line.length - 1); + } + + // Normalize \r\n to \n on all input lines. + if ( + line.length >= 2 && + line[line.length - 2] === "\r" && + line[line.length - 1] === "\n" + ) { + line = line.substring(0, line.length - 2); + line = line + "\n"; + } + + return line; +} + +async function isEOF(tp: TextProtoReader): Promise { + return (await tp.r.peek(0)) === null; +} + +/** + * A ParseError is returned for parsing errors. + * Line numbers are 1-indexed and columns are 0-indexed. 
+ */ +export class ParseError extends Error { + /** Line where the record starts*/ + startLine: number; + /** Line where the error occurred */ + line: number; + /** Column (rune index) where the error occurred */ + column: number | null; + + constructor( + start: number, + line: number, + column: number | null, + message: string, + ) { + super(); + this.startLine = start; + this.column = column; + this.line = line; + + if (message === ERR_FIELD_COUNT) { + this.message = `record on line ${line}: ${message}`; + } else if (start !== line) { + this.message = + `record on line ${start}; parse error on line ${line}, column ${column}: ${message}`; + } else { + this.message = + `parse error on line ${line}, column ${column}: ${message}`; + } + } +} + +export const ERR_BARE_QUOTE = 'bare " in non-quoted-field'; +export const ERR_QUOTE = 'extraneous or missing " in quoted-field'; +export const ERR_INVALID_DELIM = "Invalid Delimiter"; +export const ERR_FIELD_COUNT = "wrong number of fields"; diff --git a/encoding/csv/stream.ts b/encoding/csv/stream.ts new file mode 100644 index 000000000000..7c00d74b3047 --- /dev/null +++ b/encoding/csv/stream.ts @@ -0,0 +1,51 @@ +import { Buffer, BufReader } from "../../io/buffer.ts"; +import { defaultReadOptions, readRecord } from "./_io.ts"; + +function createTransformStream( + options?: CSVStreamOptions, +): TransformStream> { + let lineIndex = 0; + const buffer = new Buffer(); + const bufReader = BufReader.create(buffer); + return new TransformStream>({ + async transform(chunk, controller) { + buffer.writeSync(chunk); + while (!buffer.empty() || bufReader.buffered() > 0) { + const record = await readRecord(lineIndex, bufReader, options); + if (record === null && buffer.empty()) { + controller.terminate(); + return; + } + + lineIndex++; + if (record && record.length > 0) { + controller.enqueue(record); + } + } + }, + }); +} + +export interface CSVStreamOptions { + separator?: string; + comment?: string; +} + +export class CSVStream { + 
#transform: TransformStream>; + + constructor(options: CSVStreamOptions = defaultReadOptions) { + this.#transform = createTransformStream({ + ...defaultReadOptions, + ...options, + }); + } + + get readable(): ReadableStream> { + return this.#transform.readable; + } + + get writable(): WritableStream { + return this.#transform.writable; + } +} diff --git a/encoding/csv/stream_test.ts b/encoding/csv/stream_test.ts new file mode 100644 index 000000000000..397b07d0e24a --- /dev/null +++ b/encoding/csv/stream_test.ts @@ -0,0 +1,63 @@ +import { CSVStream } from "./stream.ts"; +import { readableStreamFromIterable } from "../../streams/conversion.ts"; +import { assertEquals } from "../../testing/asserts.ts"; +import { fromFileUrl, join } from "../../path/mod.ts"; + +const testdataDir = join(fromFileUrl(import.meta.url), "../../testdata"); + +Deno.test("[encoding/csv/stream] CSVStream", async () => { + const file = await Deno.open(join(testdataDir, "simple.csv")); + const readable = file.readable.pipeThrough(new CSVStream()); + const records = [] as Array>; + for await (const record of readable) { + records.push(record); + } + assertEquals(records, [ + ["id", "name"], + ["1", "foobar"], + ["2", "barbaz"], + ]); +}); + +Deno.test("[encoding/csv/stream] CSVStream with `comment` option", async () => { + const file = await Deno.open(join(testdataDir, "complex.csv")); + const readable = file.readable.pipeThrough( + new CSVStream({ + comment: "#", + }), + ); + const records = [] as Array>; + for await (const record of readable) { + records.push(record); + } + assertEquals(records, [ + ["id", "name", "email"], + ["1", "deno", "deno@example.com"], + ["2", "node", "node@example.com"], + ["3", "", "test@example.com"], + ]); +}); + +Deno.test("[encoding/csv/stream] CSVStream with `separator` option", async () => { + const encoder = new TextEncoder(); + const readable = readableStreamFromIterable([ + encoder.encode("id\tname\n"), + encoder.encode("1\tfoo\n"), + 
encoder.encode("2\tbar\n"), + encoder.encode("3\tbaz\n"), + ]).pipeThrough( + new CSVStream({ + separator: "\t", + }), + ); + const records = [] as Array>; + for await (const record of readable) { + records.push(record); + } + assertEquals(records, [ + ["id", "name"], + ["1", "foo"], + ["2", "bar"], + ["3", "baz"], + ]); +}); diff --git a/encoding/testdata/complex.csv b/encoding/testdata/complex.csv new file mode 100644 index 000000000000..982825ef2a49 --- /dev/null +++ b/encoding/testdata/complex.csv @@ -0,0 +1,7 @@ +id,name,email +1,deno,deno@example.com +# This is a comment + +2,node,"node@example.com" + +3,,"test@example.com" diff --git a/encoding/testdata/simple.csv b/encoding/testdata/simple.csv new file mode 100644 index 000000000000..dcc7af2cf694 --- /dev/null +++ b/encoding/testdata/simple.csv @@ -0,0 +1,3 @@ +id,name +1,foobar +2,barbaz \ No newline at end of file From c573f876d6bbf7e85e915b1b0c724594f0495ef9 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 5 Mar 2022 17:53:50 +0900 Subject: [PATCH 2/5] fix: Rewrite CSVStream not to depend on Deno.Reader --- encoding/csv.ts | 43 ++++++++++++- encoding/csv/_io.ts | 64 ++++++++------------ encoding/csv/stream.ts | 114 +++++++++++++++++++++++++---------- encoding/csv/stream_test.ts | 117 ++++++++++++++++++++---------------- 4 files changed, 215 insertions(+), 123 deletions(-) diff --git a/encoding/csv.ts b/encoding/csv.ts index 83dfdbfb0bc1..a963b998c90d 100644 --- a/encoding/csv.ts +++ b/encoding/csv.ts @@ -5,6 +5,7 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. 
import { BufReader } from "../io/buffer.ts"; +import { TextProtoReader } from "../textproto/mod.ts"; import { StringReader } from "../io/readers.ts"; import { assert } from "../_util/assert.ts"; import { @@ -13,7 +14,7 @@ import { ParseError, readRecord, } from "./csv/_io.ts"; -import type { ReadOptions } from "./csv/_io.ts"; +import type { LineReader, ReadOptions } from "./csv/_io.ts"; export { ERR_BARE_QUOTE, @@ -32,6 +33,43 @@ export type { StringifyOptions, } from "./csv_stringify.ts"; +class TextProtoLineReader implements LineReader { + #tp: TextProtoReader; + constructor(bufReader: BufReader) { + this.#tp = new TextProtoReader(bufReader); + } + + async readLine() { + let line: string; + const r = await this.#tp.readLine(); + if (r === null) return null; + line = r; + + // For backwards compatibility, drop trailing \r before EOF. + if ( + (await this.isEOF()) && line.length > 0 && line[line.length - 1] === "\r" + ) { + line = line.substring(0, line.length - 1); + } + + // Normalize \r\n to \n on all input lines. 
+ if ( + line.length >= 2 && + line[line.length - 2] === "\r" && + line[line.length - 1] === "\n" + ) { + line = line.substring(0, line.length - 2); + line = line + "\n"; + } + + return line; + } + + async isEOF() { + return (await this.#tp.r.peek(0)) === null; + } +} + const INVALID_RUNE = ["\r", "\n", '"']; function chkOptions(opt: ReadOptions): void { @@ -71,8 +109,9 @@ export async function readMatrix( let lineIndex = 0; chkOptions(opt); + const lineReader = new TextProtoLineReader(reader); for (;;) { - const r = await readRecord(lineIndex, reader, opt); + const r = await readRecord(lineIndex, lineReader, opt); if (r === null) break; lineResult = r; lineIndex++; diff --git a/encoding/csv/_io.ts b/encoding/csv/_io.ts index 8b8fe80827ca..e85837e81813 100644 --- a/encoding/csv/_io.ts +++ b/encoding/csv/_io.ts @@ -1,5 +1,8 @@ -import { TextProtoReader } from "../../textproto/mod.ts"; -import type { BufReader } from "../../io/buffer.ts"; +// Originally ported from Go: +// https://github.com/golang/go/blob/go1.12.5/src/encoding/csv/ +// Copyright 2011 The Go Authors. All rights reserved. BSD license. +// https://github.com/golang/go/blob/master/LICENSE +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. 
import { assert } from "../../_util/assert.ts"; /** @@ -25,19 +28,32 @@ export const defaultReadOptions: ReadOptions = { trimLeadingSpace: false, }; +export interface LineReader { + readLine(): Promise; + isEOF(): Promise; +} + export async function readRecord( startLine: number, - reader: BufReader, + reader: LineReader, opt: ReadOptions = defaultReadOptions, ): Promise { - const tp = new TextProtoReader(reader); - let line = await readLine(tp); - let lineIndex = startLine + 1; - + const line = await reader.readLine(); if (line === null) return null; if (line.length === 0) { return []; } + + return parseRecord(line, reader, opt, startLine, startLine + 1); +} + +export async function parseRecord( + line: string, + reader: LineReader, + opt: ReadOptions = defaultReadOptions, + startLine: number, + lineIndex: number = startLine, +): Promise | null> { // line starting with comment character is ignored if (opt.comment && line[0] === opt.comment) { return []; @@ -55,7 +71,7 @@ export async function readRecord( parseField: for (;;) { if (opt.trimLeadingSpace) { - line = line.trimLeft(); + line = line.trimStart(); } if (line.length === 0 || !line.startsWith(quote)) { @@ -126,10 +142,10 @@ export async function readRecord( ); break parseField; } - } else if (line.length > 0 || !(await isEOF(tp))) { + } else if (line.length > 0 || !(await reader.isEOF())) { // Hit end of line (copy all data so far). recordBuffer += line; - const r = await readLine(tp); + const r = await reader.readLine(); lineIndex++; line = r ?? ""; // This is a workaround for making this module behave similarly to the encoding/csv/reader.go. fullLine = line; @@ -184,34 +200,6 @@ function runeCount(s: string): number { return Array.from(s).length; } -async function readLine(tp: TextProtoReader): Promise { - let line: string; - const r = await tp.readLine(); - if (r === null) return null; - line = r; - - // For backwards compatibility, drop trailing \r before EOF. 
- if ((await isEOF(tp)) && line.length > 0 && line[line.length - 1] === "\r") { - line = line.substring(0, line.length - 1); - } - - // Normalize \r\n to \n on all input lines. - if ( - line.length >= 2 && - line[line.length - 2] === "\r" && - line[line.length - 1] === "\n" - ) { - line = line.substring(0, line.length - 2); - line = line + "\n"; - } - - return line; -} - -async function isEOF(tp: TextProtoReader): Promise { - return (await tp.r.peek(0)) === null; -} - /** * A ParseError is returned for parsing errors. * Line numbers are 1-indexed and columns are 0-indexed. diff --git a/encoding/csv/stream.ts b/encoding/csv/stream.ts index 7c00d74b3047..9335d5b58d92 100644 --- a/encoding/csv/stream.ts +++ b/encoding/csv/stream.ts @@ -1,51 +1,101 @@ -import { Buffer, BufReader } from "../../io/buffer.ts"; -import { defaultReadOptions, readRecord } from "./_io.ts"; - -function createTransformStream( - options?: CSVStreamOptions, -): TransformStream> { - let lineIndex = 0; - const buffer = new Buffer(); - const bufReader = BufReader.create(buffer); - return new TransformStream>({ - async transform(chunk, controller) { - buffer.writeSync(chunk); - while (!buffer.empty() || bufReader.buffered() > 0) { - const record = await readRecord(lineIndex, bufReader, options); - if (record === null && buffer.empty()) { - controller.terminate(); - return; - } - - lineIndex++; - if (record && record.length > 0) { - controller.enqueue(record); - } - } - }, - }); -} +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. 
+import { defaultReadOptions, parseRecord } from "./_io.ts"; +import type { LineReader } from "./_io.ts"; +import { TextLineStream } from "../../streams/delimiter.ts"; export interface CSVStreamOptions { separator?: string; comment?: string; } -export class CSVStream { - #transform: TransformStream>; +class StreamLineReader implements LineReader { + #reader: ReadableStreamDefaultReader; + #done = false; + constructor(reader: ReadableStreamDefaultReader) { + this.#reader = reader; + } + + async readLine(): Promise { + const { value, done } = await this.#reader.read(); + if (done) { + this.#done = true; + return null; + } else { + return value!; + } + } + + isEOF(): Promise { + return Promise.resolve(this.#done); + } + + cancel(): void { + this.#reader.cancel(); + } +} + +export class CSVStream implements TransformStream> { + readonly #readable: ReadableStream>; + readonly #decoder: TextDecoderStream; + readonly #options: CSVStreamOptions; + readonly #lineReader: StreamLineReader; + #lineIndex = 0; constructor(options: CSVStreamOptions = defaultReadOptions) { - this.#transform = createTransformStream({ + this.#options = { ...defaultReadOptions, ...options, + }; + + const decoder = new TextDecoderStream(); + const textLine = decoder.readable.pipeThrough(new TextLineStream()); + this.#lineReader = new StreamLineReader(textLine.getReader()); + this.#readable = new ReadableStream>({ + pull: (controller) => this.#pull(controller), }); + this.#decoder = decoder; + } + + async #pull( + controller: ReadableStreamDefaultController>, + ): Promise { + const line = await this.#lineReader.readLine(); + if (line === "") { + // Found an empty line + return this.#pull(controller); + } + if (line === null) { + // Reached to EOF + controller.close(); + this.#lineReader.cancel(); + return; + } + + const record = await parseRecord( + line, + this.#lineReader, + this.#options, + this.#lineIndex, + ); + if (record === null) { + controller.close(); + this.#lineReader.cancel(); + return; + 
} + + this.#lineIndex++; + if (record.length > 0) { + controller.enqueue(record); + } else { + return this.#pull(controller); + } } get readable(): ReadableStream> { - return this.#transform.readable; + return this.#readable; } get writable(): WritableStream { - return this.#transform.writable; + return this.#decoder.writable; } } diff --git a/encoding/csv/stream_test.ts b/encoding/csv/stream_test.ts index 397b07d0e24a..e75007b54c03 100644 --- a/encoding/csv/stream_test.ts +++ b/encoding/csv/stream_test.ts @@ -1,3 +1,4 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. import { CSVStream } from "./stream.ts"; import { readableStreamFromIterable } from "../../streams/conversion.ts"; import { assertEquals } from "../../testing/asserts.ts"; @@ -5,59 +6,73 @@ import { fromFileUrl, join } from "../../path/mod.ts"; const testdataDir = join(fromFileUrl(import.meta.url), "../../testdata"); -Deno.test("[encoding/csv/stream] CSVStream", async () => { - const file = await Deno.open(join(testdataDir, "simple.csv")); - const readable = file.readable.pipeThrough(new CSVStream()); - const records = [] as Array>; - for await (const record of readable) { - records.push(record); - } - assertEquals(records, [ - ["id", "name"], - ["1", "foobar"], - ["2", "barbaz"], - ]); +Deno.test({ + name: "[encoding/csv/stream] CSVStream", + permissions: { + read: [testdataDir], + }, + fn: async () => { + const file = await Deno.open(join(testdataDir, "simple.csv")); + const readable = file.readable.pipeThrough(new CSVStream()); + const records = [] as Array>; + for await (const record of readable) { + records.push(record); + } + assertEquals(records, [ + ["id", "name"], + ["1", "foobar"], + ["2", "barbaz"], + ]); + }, }); -Deno.test("[encoding/csv/stream] CSVStream with `comment` option", async () => { - const file = await Deno.open(join(testdataDir, "complex.csv")); - const readable = file.readable.pipeThrough( - new CSVStream({ - comment: "#", - }), - ); - const 
records = [] as Array>; - for await (const record of readable) { - records.push(record); - } - assertEquals(records, [ - ["id", "name", "email"], - ["1", "deno", "deno@example.com"], - ["2", "node", "node@example.com"], - ["3", "", "test@example.com"], - ]); +Deno.test({ + name: "[encoding/csv/stream] CSVStream with `comment` option", + permissions: { read: [testdataDir] }, + fn: async () => { + const file = await Deno.open(join(testdataDir, "complex.csv")); + const readable = file.readable.pipeThrough( + new CSVStream({ + comment: "#", + }), + ); + const records = [] as Array>; + for await (const record of readable) { + records.push(record); + } + assertEquals(records, [ + ["id", "name", "email"], + ["1", "deno", "deno@example.com"], + ["2", "node", "node@example.com"], + ["3", "", "test@example.com"], + ]); + }, }); -Deno.test("[encoding/csv/stream] CSVStream with `separator` option", async () => { - const encoder = new TextEncoder(); - const readable = readableStreamFromIterable([ - encoder.encode("id\tname\n"), - encoder.encode("1\tfoo\n"), - encoder.encode("2\tbar\n"), - encoder.encode("3\tbaz\n"), - ]).pipeThrough( - new CSVStream({ - separator: "\t", - }), - ); - const records = [] as Array>; - for await (const record of readable) { - records.push(record); - } - assertEquals(records, [ - ["id", "name"], - ["1", "foo"], - ["2", "bar"], - ["3", "baz"], - ]); +Deno.test({ + name: "[encoding/csv/stream] CSVStream with `separator` option", + permissions: { read: [testdataDir] }, + fn: async () => { + const encoder = new TextEncoder(); + const readable = readableStreamFromIterable([ + encoder.encode("id\tname\n"), + encoder.encode("1\tfoo\n"), + encoder.encode("2\tbar\n"), + encoder.encode("3\tbaz\n"), + ]).pipeThrough( + new CSVStream({ + separator: "\t", + }), + ); + const records = [] as Array>; + for await (const record of readable) { + records.push(record); + } + assertEquals(records, [ + ["id", "name"], + ["1", "foo"], + ["2", "bar"], + ["3", "baz"], + ]); + 
}, }); From e197808350b5cb08e640d715ecb0c4e4416520f3 Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 7 Mar 2022 23:42:13 +0900 Subject: [PATCH 3/5] fix: added test case for error handling and fixed bug --- encoding/csv/stream.ts | 1 + encoding/csv/stream_test.ts | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/encoding/csv/stream.ts b/encoding/csv/stream.ts index 9335d5b58d92..738316775ed2 100644 --- a/encoding/csv/stream.ts +++ b/encoding/csv/stream.ts @@ -62,6 +62,7 @@ export class CSVStream implements TransformStream> { const line = await this.#lineReader.readLine(); if (line === "") { // Found an empty line + this.#lineIndex++; return this.#pull(controller); } if (line === null) { diff --git a/encoding/csv/stream_test.ts b/encoding/csv/stream_test.ts index e75007b54c03..56f5701ee14f 100644 --- a/encoding/csv/stream_test.ts +++ b/encoding/csv/stream_test.ts @@ -1,7 +1,13 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. 
import { CSVStream } from "./stream.ts"; +import { ERR_QUOTE, ParseError } from "./_io.ts"; import { readableStreamFromIterable } from "../../streams/conversion.ts"; -import { assertEquals } from "../../testing/asserts.ts"; +import { + assert, + assertEquals, + assertRejects, + assertStringIncludes, +} from "../../testing/asserts.ts"; import { fromFileUrl, join } from "../../path/mod.ts"; const testdataDir = join(fromFileUrl(import.meta.url), "../../testdata"); @@ -51,7 +57,6 @@ Deno.test({ Deno.test({ name: "[encoding/csv/stream] CSVStream with `separator` option", - permissions: { read: [testdataDir] }, fn: async () => { const encoder = new TextEncoder(); const readable = readableStreamFromIterable([ @@ -76,3 +81,28 @@ Deno.test({ ]); }, }); + +Deno.test({ + name: "[encoding/csv/stream] CSVStream with invalid csv", + fn: async () => { + const encoder = new TextEncoder(); + const readable = readableStreamFromIterable([ + encoder.encode("id,name\n"), + encoder.encode("\n"), + encoder.encode("1,foo\n"), + encoder.encode('2,"baz\n'), + ]).pipeThrough( + new CSVStream(), + ); + const reader = readable.getReader(); + assertEquals(await reader.read(), { done: false, value: ["id", "name"] }); + assertEquals(await reader.read(), { done: false, value: ["1", "foo"] }); + await assertRejects(() => reader.read(), (error: unknown) => { + assert(error instanceof ParseError); + assertEquals(error.startLine, 4); + assertEquals(error.line, 5); + assertEquals(error.column, 0); + assertStringIncludes(error.message, ERR_QUOTE); + }); + }, +}); From 31bec60756dc3d09ab0902d268ea88be95bb2bf0 Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 4 Apr 2022 08:57:22 +0900 Subject: [PATCH 4/5] fix(encoding/csv/stream): Make `CSVStream` be a `TransformStream` --- encoding/csv/stream.ts | 15 +++++++-------- encoding/csv/stream_test.ts | 30 ++++++++++++++++++------------ 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/encoding/csv/stream.ts b/encoding/csv/stream.ts index 
738316775ed2..c6bcb867a72f 100644 --- a/encoding/csv/stream.ts +++ b/encoding/csv/stream.ts @@ -34,11 +34,11 @@ class StreamLineReader implements LineReader { } } -export class CSVStream implements TransformStream> { +export class CSVStream implements TransformStream> { readonly #readable: ReadableStream>; - readonly #decoder: TextDecoderStream; readonly #options: CSVStreamOptions; readonly #lineReader: StreamLineReader; + readonly #textLine: TextLineStream; #lineIndex = 0; constructor(options: CSVStreamOptions = defaultReadOptions) { @@ -47,13 +47,12 @@ export class CSVStream implements TransformStream> { ...options, }; - const decoder = new TextDecoderStream(); - const textLine = decoder.readable.pipeThrough(new TextLineStream()); - this.#lineReader = new StreamLineReader(textLine.getReader()); + const textLine = new TextLineStream(); + this.#textLine = textLine; + this.#lineReader = new StreamLineReader(textLine.readable.getReader()); this.#readable = new ReadableStream>({ pull: (controller) => this.#pull(controller), }); - this.#decoder = decoder; } async #pull( @@ -96,7 +95,7 @@ export class CSVStream implements TransformStream> { return this.#readable; } - get writable(): WritableStream { - return this.#decoder.writable; + get writable(): WritableStream { + return this.#textLine.writable; } } diff --git a/encoding/csv/stream_test.ts b/encoding/csv/stream_test.ts index 56f5701ee14f..eeabddb5775c 100644 --- a/encoding/csv/stream_test.ts +++ b/encoding/csv/stream_test.ts @@ -19,7 +19,9 @@ Deno.test({ }, fn: async () => { const file = await Deno.open(join(testdataDir, "simple.csv")); - const readable = file.readable.pipeThrough(new CSVStream()); + const readable = file.readable + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new CSVStream()); const records = [] as Array>; for await (const record of readable) { records.push(record); @@ -37,11 +39,13 @@ Deno.test({ permissions: { read: [testdataDir] }, fn: async () => { const file = await 
Deno.open(join(testdataDir, "complex.csv")); - const readable = file.readable.pipeThrough( - new CSVStream({ - comment: "#", - }), - ); + const readable = file.readable + .pipeThrough(new TextDecoderStream()) + .pipeThrough( + new CSVStream({ + comment: "#", + }), + ); const records = [] as Array<Array<string>>; for await (const record of readable) { records.push(record); } @@ -64,11 +68,13 @@ Deno.test({ encoder.encode("1\tfoo\n"), encoder.encode("2\tbar\n"), encoder.encode("3\tbaz\n"), - ]).pipeThrough( - new CSVStream({ - separator: "\t", - }), - ); + ]) + .pipeThrough(new TextDecoderStream()) + .pipeThrough( + new CSVStream({ + separator: "\t", + }), + ); const records = [] as Array<Array<string>>; for await (const record of readable) { records.push(record); @@ -91,7 +97,7 @@ Deno.test({ encoder.encode("\n"), encoder.encode("1,foo\n"), encoder.encode('2,"baz\n'), - ]).pipeThrough( + ]).pipeThrough(new TextDecoderStream()).pipeThrough( new CSVStream(), ); const reader = readable.getReader(); From 2723e7913b2e85ee7f2afb88aaeb273980b69c18 Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 4 Apr 2022 08:58:49 +0900 Subject: [PATCH 5/5] fix(encoding/csv): Make `ParseError` extend `SyntaxError` instead of `Error` --- encoding/csv/_io.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/encoding/csv/_io.ts b/encoding/csv/_io.ts index e85837e81813..9e7b38c3094b 100644 --- a/encoding/csv/_io.ts +++ b/encoding/csv/_io.ts @@ -204,7 +204,7 @@ function runeCount(s: string): number { * A ParseError is returned for parsing errors. * Line numbers are 1-indexed and columns are 0-indexed. */ -export class ParseError extends Error { +export class ParseError extends SyntaxError { /** Line where the record starts*/ startLine: number; /** Line where the error occurred */