diff --git a/encoding/csv.ts b/encoding/csv.ts index 8d3010026571..a963b998c90d 100644 --- a/encoding/csv.ts +++ b/encoding/csv.ts @@ -8,7 +8,22 @@ import { BufReader } from "../io/buffer.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { StringReader } from "../io/readers.ts"; import { assert } from "../_util/assert.ts"; +import { + ERR_FIELD_COUNT, + ERR_INVALID_DELIM, + ParseError, + readRecord, +} from "./csv/_io.ts"; +import type { LineReader, ReadOptions } from "./csv/_io.ts"; +export { + ERR_BARE_QUOTE, + ERR_FIELD_COUNT, + ERR_INVALID_DELIM, + ERR_QUOTE, + ParseError, +} from "./csv/_io.ts"; +export type { ReadOptions } from "./csv/_io.ts"; export { NEWLINE, stringify, StringifyError } from "./csv_stringify.ts"; export type { @@ -18,66 +33,45 @@ export type { StringifyOptions, } from "./csv_stringify.ts"; -const INVALID_RUNE = ["\r", "\n", '"']; +class TextProtoLineReader implements LineReader { + #tp: TextProtoReader; + constructor(bufReader: BufReader) { + this.#tp = new TextProtoReader(bufReader); + } -export const ERR_BARE_QUOTE = 'bare " in non-quoted-field'; -export const ERR_QUOTE = 'extraneous or missing " in quoted-field'; -export const ERR_INVALID_DELIM = "Invalid Delimiter"; -export const ERR_FIELD_COUNT = "wrong number of fields"; + async readLine() { + let line: string; + const r = await this.#tp.readLine(); + if (r === null) return null; + line = r; -/** - * A ParseError is returned for parsing errors. - * Line numbers are 1-indexed and columns are 0-indexed. - */ -export class ParseError extends Error { - /** Line where the record starts*/ - startLine: number; - /** Line where the error occurred */ - line: number; - /** Column (rune index) where the error occurred */ - column: number | null; - - constructor( - start: number, - line: number, - column: number | null, - message: string, - ) { - super(); - this.startLine = start; - this.column = column; - this.line = line; + // For backwards compatibility, drop trailing \r before EOF. + if ( + (await this.isEOF()) && line.length > 0 && line[line.length - 1] === "\r" + ) { + line = line.substring(0, line.length - 1); + } - if (message === ERR_FIELD_COUNT) { - this.message = `record on line ${line}: ${message}`; - } else if (start !== line) { - this.message = - `record on line ${start}; parse error on line ${line}, column ${column}: ${message}`; - } else { - this.message = - `parse error on line ${line}, column ${column}: ${message}`; + // Normalize \r\n to \n on all input lines. + if ( + line.length >= 2 && + line[line.length - 2] === "\r" && + line[line.length - 1] === "\n" + ) { + line = line.substring(0, line.length - 2); + line = line + "\n"; } + + return line; } -} -/** - * @property separator - Character which separates values. Default: ',' - * @property comment - Character to start a comment. Default: '#' - * @property trimLeadingSpace - Flag to trim the leading space of the value. - * Default: 'false' - * @property lazyQuotes - Allow unquoted quote in a quoted field or non double - * quoted quotes in quoted field. Default: 'false' - * @property fieldsPerRecord - Enabling the check of fields for each row. - * If == 0, first row is used as referral for the number of fields. - */ -export interface ReadOptions { - separator?: string; - comment?: string; - trimLeadingSpace?: boolean; - lazyQuotes?: boolean; - fieldsPerRecord?: number; + async isEOF() { + return (await this.#tp.r.peek(0)) === null; + } } +const INVALID_RUNE = ["\r", "\n", '"']; + function chkOptions(opt: ReadOptions): void { if (!opt.separator) { opt.separator = ","; @@ -94,193 +88,6 @@ function chkOptions(opt: ReadOptions): void { } } -async function readRecord( - startLine: number, - reader: BufReader, - opt: ReadOptions = { separator: ",", trimLeadingSpace: false }, -): Promise { - const tp = new TextProtoReader(reader); - let line = await readLine(tp); - let lineIndex = startLine + 1; - - if (line === null) return null; - if (line.length === 0) { - return []; - } - // line starting with comment character is ignored - if (opt.comment && line[0] === opt.comment) { - return []; - } - - assert(opt.separator != null); - - let fullLine = line; - let quoteError: ParseError | null = null; - const quote = '"'; - const quoteLen = quote.length; - const separatorLen = opt.separator.length; - let recordBuffer = ""; - const fieldIndexes = [] as number[]; - parseField: - for (;;) { - if (opt.trimLeadingSpace) { - line = line.trimLeft(); - } - - if (line.length === 0 || !line.startsWith(quote)) { - // Non-quoted string field - const i = line.indexOf(opt.separator); - let field = line; - if (i >= 0) { - field = field.substring(0, i); - } - // Check to make sure a quote does not appear in field. - if (!opt.lazyQuotes) { - const j = field.indexOf(quote); - if (j >= 0) { - const col = runeCount( - fullLine.slice(0, fullLine.length - line.slice(j).length), - ); - quoteError = new ParseError( - startLine + 1, - lineIndex, - col, - ERR_BARE_QUOTE, - ); - break parseField; - } - } - recordBuffer += field; - fieldIndexes.push(recordBuffer.length); - if (i >= 0) { - line = line.substring(i + separatorLen); - continue parseField; - } - break parseField; - } else { - // Quoted string field - line = line.substring(quoteLen); - for (;;) { - const i = line.indexOf(quote); - if (i >= 0) { - // Hit next quote. - recordBuffer += line.substring(0, i); - line = line.substring(i + quoteLen); - if (line.startsWith(quote)) { - // `""` sequence (append quote). - recordBuffer += quote; - line = line.substring(quoteLen); - } else if (line.startsWith(opt.separator)) { - // `","` sequence (end of field). - line = line.substring(separatorLen); - fieldIndexes.push(recordBuffer.length); - continue parseField; - } else if (0 === line.length) { - // `"\n` sequence (end of line). - fieldIndexes.push(recordBuffer.length); - break parseField; - } else if (opt.lazyQuotes) { - // `"` sequence (bare quote). - recordBuffer += quote; - } else { - // `"*` sequence (invalid non-escaped quote). - const col = runeCount( - fullLine.slice(0, fullLine.length - line.length - quoteLen), - ); - quoteError = new ParseError( - startLine + 1, - lineIndex, - col, - ERR_QUOTE, - ); - break parseField; - } - } else if (line.length > 0 || !(await isEOF(tp))) { - // Hit end of line (copy all data so far). - recordBuffer += line; - const r = await readLine(tp); - lineIndex++; - line = r ?? ""; // This is a workaround for making this module behave similarly to the encoding/csv/reader.go. - fullLine = line; - if (r === null) { - // Abrupt end of file (EOF or error). - if (!opt.lazyQuotes) { - const col = runeCount(fullLine); - quoteError = new ParseError( - startLine + 1, - lineIndex, - col, - ERR_QUOTE, - ); - break parseField; - } - fieldIndexes.push(recordBuffer.length); - break parseField; - } - recordBuffer += "\n"; // preserve line feed (This is because TextProtoReader removes it.) - } else { - // Abrupt end of file (EOF on error). - if (!opt.lazyQuotes) { - const col = runeCount(fullLine); - quoteError = new ParseError( - startLine + 1, - lineIndex, - col, - ERR_QUOTE, - ); - break parseField; - } - fieldIndexes.push(recordBuffer.length); - break parseField; - } - } - } - } - if (quoteError) { - throw quoteError; - } - const result = [] as string[]; - let preIdx = 0; - for (const i of fieldIndexes) { - result.push(recordBuffer.slice(preIdx, i)); - preIdx = i; - } - return result; -} - -async function isEOF(tp: TextProtoReader): Promise { - return (await tp.r.peek(0)) === null; -} - -function runeCount(s: string): number { - // Array.from considers the surrogate pair. - return Array.from(s).length; -} - -async function readLine(tp: TextProtoReader): Promise { - let line: string; - const r = await tp.readLine(); - if (r === null) return null; - line = r; - - // For backwards compatibility, drop trailing \r before EOF. - if ((await isEOF(tp)) && line.length > 0 && line[line.length - 1] === "\r") { - line = line.substring(0, line.length - 1); - } - - // Normalize \r\n to \n on all input lines. - if ( - line.length >= 2 && - line[line.length - 2] === "\r" && - line[line.length - 1] === "\n" - ) { - line = line.substring(0, line.length - 2); - line = line + "\n"; - } - - return line; -} - /** * Parse the CSV from the `reader` with the options provided and return `string[][]`. * @@ -302,8 +109,9 @@ export async function readMatrix( let lineIndex = 0; chkOptions(opt); + const lineReader = new TextProtoLineReader(reader); for (;;) { - const r = await readRecord(lineIndex, reader, opt); + const r = await readRecord(lineIndex, lineReader, opt); if (r === null) break; lineResult = r; lineIndex++; diff --git a/encoding/csv/_io.ts b/encoding/csv/_io.ts new file mode 100644 index 000000000000..9e7b38c3094b --- /dev/null +++ b/encoding/csv/_io.ts @@ -0,0 +1,241 @@ +// Originally ported from Go: +// https://github.com/golang/go/blob/go1.12.5/src/encoding/csv/ +// Copyright 2011 The Go Authors. All rights reserved. BSD license. +// https://github.com/golang/go/blob/master/LICENSE +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +import { assert } from "../../_util/assert.ts"; + +/** + * @property separator - Character which separates values. Default: ',' + * @property comment - Character to start a comment. Default: '#' + * @property trimLeadingSpace - Flag to trim the leading space of the value. + * Default: 'false' + * @property lazyQuotes - Allow unquoted quote in a quoted field or non double + * quoted quotes in quoted field. Default: 'false' + * @property fieldsPerRecord - Enabling the check of fields for each row. + * If == 0, first row is used as referral for the number of fields. + */ +export interface ReadOptions { + separator?: string; + comment?: string; + trimLeadingSpace?: boolean; + lazyQuotes?: boolean; + fieldsPerRecord?: number; +} + +export const defaultReadOptions: ReadOptions = { + separator: ",", + trimLeadingSpace: false, +}; + +export interface LineReader { + readLine(): Promise; + isEOF(): Promise; +} + +export async function readRecord( + startLine: number, + reader: LineReader, + opt: ReadOptions = defaultReadOptions, +): Promise { + const line = await reader.readLine(); + if (line === null) return null; + if (line.length === 0) { + return []; + } + + return parseRecord(line, reader, opt, startLine, startLine + 1); +} + +export async function parseRecord( + line: string, + reader: LineReader, + opt: ReadOptions = defaultReadOptions, + startLine: number, + lineIndex: number = startLine, +): Promise | null> { + // line starting with comment character is ignored + if (opt.comment && line[0] === opt.comment) { + return []; + } + + assert(opt.separator != null); + + let fullLine = line; + let quoteError: ParseError | null = null; + const quote = '"'; + const quoteLen = quote.length; + const separatorLen = opt.separator.length; + let recordBuffer = ""; + const fieldIndexes = [] as number[]; + parseField: + for (;;) { + if (opt.trimLeadingSpace) { + line = line.trimStart(); + } + + if (line.length === 0 || !line.startsWith(quote)) { + // Non-quoted string field + const i = line.indexOf(opt.separator); + let field = line; + if (i >= 0) { + field = field.substring(0, i); + } + // Check to make sure a quote does not appear in field. + if (!opt.lazyQuotes) { + const j = field.indexOf(quote); + if (j >= 0) { + const col = runeCount( + fullLine.slice(0, fullLine.length - line.slice(j).length), + ); + quoteError = new ParseError( + startLine + 1, + lineIndex, + col, + ERR_BARE_QUOTE, + ); + break parseField; + } + } + recordBuffer += field; + fieldIndexes.push(recordBuffer.length); + if (i >= 0) { + line = line.substring(i + separatorLen); + continue parseField; + } + break parseField; + } else { + // Quoted string field + line = line.substring(quoteLen); + for (;;) { + const i = line.indexOf(quote); + if (i >= 0) { + // Hit next quote. + recordBuffer += line.substring(0, i); + line = line.substring(i + quoteLen); + if (line.startsWith(quote)) { + // `""` sequence (append quote). + recordBuffer += quote; + line = line.substring(quoteLen); + } else if (line.startsWith(opt.separator)) { + // `","` sequence (end of field). + line = line.substring(separatorLen); + fieldIndexes.push(recordBuffer.length); + continue parseField; + } else if (0 === line.length) { + // `"\n` sequence (end of line). + fieldIndexes.push(recordBuffer.length); + break parseField; + } else if (opt.lazyQuotes) { + // `"` sequence (bare quote). + recordBuffer += quote; + } else { + // `"*` sequence (invalid non-escaped quote). + const col = runeCount( + fullLine.slice(0, fullLine.length - line.length - quoteLen), + ); + quoteError = new ParseError( + startLine + 1, + lineIndex, + col, + ERR_QUOTE, + ); + break parseField; + } + } else if (line.length > 0 || !(await reader.isEOF())) { + // Hit end of line (copy all data so far). + recordBuffer += line; + const r = await reader.readLine(); + lineIndex++; + line = r ?? ""; // This is a workaround for making this module behave similarly to the encoding/csv/reader.go. + fullLine = line; + if (r === null) { + // Abrupt end of file (EOF or error). + if (!opt.lazyQuotes) { + const col = runeCount(fullLine); + quoteError = new ParseError( + startLine + 1, + lineIndex, + col, + ERR_QUOTE, + ); + break parseField; + } + fieldIndexes.push(recordBuffer.length); + break parseField; + } + recordBuffer += "\n"; // preserve line feed (This is because TextProtoReader removes it.) + } else { + // Abrupt end of file (EOF on error). + if (!opt.lazyQuotes) { + const col = runeCount(fullLine); + quoteError = new ParseError( + startLine + 1, + lineIndex, + col, + ERR_QUOTE, + ); + break parseField; + } + fieldIndexes.push(recordBuffer.length); + break parseField; + } + } + } + } + if (quoteError) { + throw quoteError; + } + const result = [] as string[]; + let preIdx = 0; + for (const i of fieldIndexes) { + result.push(recordBuffer.slice(preIdx, i)); + preIdx = i; + } + return result; +} + +function runeCount(s: string): number { + // Array.from considers the surrogate pair. + return Array.from(s).length; +} + +/** + * A ParseError is returned for parsing errors. + * Line numbers are 1-indexed and columns are 0-indexed. + */ +export class ParseError extends SyntaxError { + /** Line where the record starts*/ + startLine: number; + /** Line where the error occurred */ + line: number; + /** Column (rune index) where the error occurred */ + column: number | null; + + constructor( + start: number, + line: number, + column: number | null, + message: string, + ) { + super(); + this.startLine = start; + this.column = column; + this.line = line; + + if (message === ERR_FIELD_COUNT) { + this.message = `record on line ${line}: ${message}`; + } else if (start !== line) { + this.message = + `record on line ${start}; parse error on line ${line}, column ${column}: ${message}`; + } else { + this.message = + `parse error on line ${line}, column ${column}: ${message}`; + } + } +} + +export const ERR_BARE_QUOTE = 'bare " in non-quoted-field'; +export const ERR_QUOTE = 'extraneous or missing " in quoted-field'; +export const ERR_INVALID_DELIM = "Invalid Delimiter"; +export const ERR_FIELD_COUNT = "wrong number of fields"; diff --git a/encoding/csv/stream.ts b/encoding/csv/stream.ts new file mode 100644 index 000000000000..c6bcb867a72f --- /dev/null +++ b/encoding/csv/stream.ts @@ -0,0 +1,101 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +import { defaultReadOptions, parseRecord } from "./_io.ts"; +import type { LineReader } from "./_io.ts"; +import { TextLineStream } from "../../streams/delimiter.ts"; + +export interface CSVStreamOptions { + separator?: string; + comment?: string; +} + +class StreamLineReader implements LineReader { + #reader: ReadableStreamDefaultReader; + #done = false; + constructor(reader: ReadableStreamDefaultReader) { + this.#reader = reader; + } + + async readLine(): Promise { + const { value, done } = await this.#reader.read(); + if (done) { + this.#done = true; + return null; + } else { + return value!; + } + } + + isEOF(): Promise { + return Promise.resolve(this.#done); + } + + cancel(): void { + this.#reader.cancel(); + } +} + +export class CSVStream implements TransformStream> { + readonly #readable: ReadableStream>; + readonly #options: CSVStreamOptions; + readonly #lineReader: StreamLineReader; + readonly #textLine: TextLineStream; + #lineIndex = 0; + + constructor(options: CSVStreamOptions = defaultReadOptions) { + this.#options = { + ...defaultReadOptions, + ...options, + }; + + const textLine = new TextLineStream(); + this.#textLine = textLine; + this.#lineReader = new StreamLineReader(textLine.readable.getReader()); + this.#readable = new ReadableStream>({ + pull: (controller) => this.#pull(controller), + }); + } + + async #pull( + controller: ReadableStreamDefaultController>, + ): Promise { + const line = await this.#lineReader.readLine(); + if (line === "") { + // Found an empty line + this.#lineIndex++; + return this.#pull(controller); + } + if (line === null) { + // Reached to EOF + controller.close(); + this.#lineReader.cancel(); + return; + } + + const record = await parseRecord( + line, + this.#lineReader, + this.#options, + this.#lineIndex, + ); + if (record === null) { + controller.close(); + this.#lineReader.cancel(); + return; + } + + this.#lineIndex++; + if (record.length > 0) { + controller.enqueue(record); + } else { + return this.#pull(controller); + } + } + + get readable(): ReadableStream> { + return this.#readable; + } + + get writable(): WritableStream { + return this.#textLine.writable; + } +} diff --git a/encoding/csv/stream_test.ts b/encoding/csv/stream_test.ts new file mode 100644 index 000000000000..eeabddb5775c --- /dev/null +++ b/encoding/csv/stream_test.ts @@ -0,0 +1,114 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +import { CSVStream } from "./stream.ts"; +import { ERR_QUOTE, ParseError } from "./_io.ts"; +import { readableStreamFromIterable } from "../../streams/conversion.ts"; +import { + assert, + assertEquals, + assertRejects, + assertStringIncludes, +} from "../../testing/asserts.ts"; +import { fromFileUrl, join } from "../../path/mod.ts"; + +const testdataDir = join(fromFileUrl(import.meta.url), "../../testdata"); + +Deno.test({ + name: "[encoding/csv/stream] CSVStream", + permissions: { + read: [testdataDir], + }, + fn: async () => { + const file = await Deno.open(join(testdataDir, "simple.csv")); + const readable = file.readable + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new CSVStream()); + const records = [] as Array>; + for await (const record of readable) { + records.push(record); + } + assertEquals(records, [ + ["id", "name"], + ["1", "foobar"], + ["2", "barbaz"], + ]); + }, +}); + +Deno.test({ + name: "[encoding/csv/stream] CSVStream with `comment` option", + permissions: { read: [testdataDir] }, + fn: async () => { + const file = await Deno.open(join(testdataDir, "complex.csv")); + const readable = file.readable + .pipeThrough(new TextDecoderStream()) + .pipeThrough( + new CSVStream({ + comment: "#", + }), + ); + const records = [] as Array>; + for await (const record of readable) { + records.push(record); + } + assertEquals(records, [ + ["id", "name", "email"], + ["1", "deno", "deno@example.com"], + ["2", "node", "node@example.com"], + ["3", "", "test@example.com"], + ]); + }, +}); + +Deno.test({ + name: "[encoding/csv/stream] CSVStream with `separator` option", + fn: async () => { + const encoder = new TextEncoder(); + const readable = readableStreamFromIterable([ + encoder.encode("id\tname\n"), + encoder.encode("1\tfoo\n"), + encoder.encode("2\tbar\n"), + encoder.encode("3\tbaz\n"), + ]) + .pipeThrough(new TextDecoderStream()) + .pipeThrough( + new CSVStream({ + separator: "\t", + }), + ); + const records = [] as Array>; + for await (const record of readable) { + records.push(record); + } + assertEquals(records, [ + ["id", "name"], + ["1", "foo"], + ["2", "bar"], + ["3", "baz"], + ]); + }, +}); + +Deno.test({ + name: "[encoding/csv/stream] CSVStream with invalid csv", + fn: async () => { + const encoder = new TextEncoder(); + const readable = readableStreamFromIterable([ + encoder.encode("id,name\n"), + encoder.encode("\n"), + encoder.encode("1,foo\n"), + encoder.encode('2,"baz\n'), + ]).pipeThrough(new TextDecoderStream()).pipeThrough( + new CSVStream(), + ); + const reader = readable.getReader(); + assertEquals(await reader.read(), { done: false, value: ["id", "name"] }); + assertEquals(await reader.read(), { done: false, value: ["1", "foo"] }); + await assertRejects(() => reader.read(), (error: unknown) => { + assert(error instanceof ParseError); + assertEquals(error.startLine, 4); + assertEquals(error.line, 5); + assertEquals(error.column, 0); + assertStringIncludes(error.message, ERR_QUOTE); + }); + }, +}); diff --git a/encoding/testdata/complex.csv b/encoding/testdata/complex.csv new file mode 100644 index 000000000000..982825ef2a49 --- /dev/null +++ b/encoding/testdata/complex.csv @@ -0,0 +1,7 @@ +id,name,email +1,deno,deno@example.com +# This is a comment + +2,node,"node@example.com" + +3,,"test@example.com" diff --git a/encoding/testdata/simple.csv b/encoding/testdata/simple.csv new file mode 100644 index 000000000000..dcc7af2cf694 --- /dev/null +++ b/encoding/testdata/simple.csv @@ -0,0 +1,3 @@ +id,name +1,foobar +2,barbaz \ No newline at end of file