diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml
new file mode 100644
index 0000000..3bd37dc
--- /dev/null
+++ b/.github/workflows/ci-build.yml
@@ -0,0 +1,36 @@
+name: Build unstable
+
+on: [push]
+
+concurrency:
+  group: unstable
+#  cancel-in-progress: true
+
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up Python 3.9
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.9"
+#          cache: 'pip'
+      - name: Cleanup more disk space
+        run: sudo rm -rf /usr/share/dotnet && sudo rm -rf /opt/ghc && sudo rm -rf "/usr/local/share/boost" && sudo rm -rf "$AGENT_TOOLSDIRECTORY"
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install --upgrade flake8 pytest pycodestyle
+          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
+      - name: Lint with flake8
+        run: |
+          # stop the build if there are Python syntax errors or undefined names
+          flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
+          # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
+          flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
+      - name: Test with pytest
+        run: |
+          python -m pytest --rootdir .
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 15e1172..c37af38 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 env
 *.pyc
 *.egg-info
+.idea
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 60505e3..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,12 +0,0 @@
-language: python
-python:
-  - "3.6"
-install:
-  - pip install pytest
-  - pip install pytest-cov
-  - pip install coveralls
-  - pip install -e .
-script:
-  - pytest --cov=lm_dataformat/
-after_success:
-  - coveralls
diff --git a/README.md b/README.md
index aecfe90..292b5f3 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,11 @@
-# LM_Dataformat [![Build Status](https://travis-ci.org/leogao2/lm_dataformat.svg?branch=master)](https://travis-ci.org/leogao2/lm_dataformat) [![Coverage Status](https://coveralls.io/repos/github/leogao2/lm_dataformat/badge.svg?branch=master)](https://coveralls.io/github/leogao2/lm_dataformat?branch=master)
+# LM_Dataformat
+
+[![Build unstable](https://github.com/lfoppiano/lm_dataformat/actions/workflows/ci-build.yml/badge.svg)](https://github.com/lfoppiano/lm_dataformat/actions/workflows/ci-build.yml)
 
 Utilities for storing data for LM training.
+
+**NOTE**: The original project seems abandoned; I've continued development here to support more data formats and integration with stackexchange_dataset.
 
 ## Basic Usage
 
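For context on the API this fork extends, the write/read round trip behind the README's Basic Usage section looks roughly like this. This is a sketch based on the classes and tests changed in this diff; the directory name and the `print` usage are made up:

```python
import lm_dataformat as lmd

# write: documents are appended to a zstd-compressed .jsonl.zst chunk
archive = lmd.Archive('output_dir')
archive.add_data('some document text', meta={'source': 'example'})
archive.commit()  # renames the in-progress chunk to data_0_time<ts>_default.jsonl.zst

# read back: Reader accepts a single file or a directory of supported files
reader = lmd.Reader('output_dir')
for text, meta in reader.stream_data(get_meta=True):
    print(meta, text[:80])
```

`stream_data(threaded=True)` appears to pull documents through a background process via a queue (see `_stream_data_threaded` below), which can help when decompression is the bottleneck.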
diff --git a/lm_dataformat/__init__.py b/lm_dataformat/__init__.py
index ee1955b..e6ef6ba 100644
--- a/lm_dataformat/__init__.py
+++ b/lm_dataformat/__init__.py
@@ -1,24 +1,39 @@
-import os
-import zstandard
-import ujson as json
-import time
-import tarfile
 import codecs
-from functools import reduce
-import jsonlines
-import io
-from zipfile import ZipFile
 import gzip
-from math import ceil
+import io
 import mmap
 import multiprocessing as mp
+import os
+import re
+import tarfile
+import time
+from functools import reduce
+from math import ceil
 from pathlib import Path
+import zipfile
+
+import jsonlines
+import ujson as json
+import zstandard
+
+VALID_EXTENSIONS = ['openwebtext.tar.xz', '_data.xz', '.dat.zst', '.jsonl', '.jsonl.zst', '.jsonl.zst.tar', '.json.zst',
+                    '.txt', '.zip', '.tar.gz', '.json.gz', '.gz']
+
+# LM_DATAFORMAT_FORMAT = "lm_dataformat"
+TEXT_FORMAT = "txt"
+JSONL_FORMAT = "jsonl"
+
+SUPPORTED_FORMATS = [
+    TEXT_FORMAT,
+    # LM_DATAFORMAT_FORMAT,
+    JSONL_FORMAT
+]
 
-VALID_EXTENSIONS = ['openwebtext.tar.xz', '_data.xz', '.dat.zst', '.jsonl', '.jsonl.zst', '.jsonl.zst.tar', '.json.zst', '.txt', '.zip', '.tar.gz', '.json.gz', '.gz']
 
 def has_valid_extension(file):
     return any([file.endswith(ext) for ext in VALID_EXTENSIONS])
 
+
 def _listdir_or_file(x):
     if isinstance(x, list):
         return reduce(lambda x, y: x + y, map(listdir_or_file, sorted(x)))
@@ -29,9 +44,11 @@ def _listdir_or_file(x):
     else:
         raise FileNotFoundError(f"{x} not found")
 
+
 def listdir_or_file(x):
     return list(filter(has_valid_extension, _listdir_or_file(x)))
 
+
 def tarfile_reader(file, streaming=False):
     # we need our own tarfile parser because `tarfile` doesn't work well for
     # big tarfiles; it seems to be reading the entire file to get a list of
@@ -49,10 +66,10 @@ def tarfile_reader(file, streaming=False):
 
             # https://www.gnu.org/software/tar/manual/html_node/Standard.html
             # end at 135 not 136 because of \0 terminator
-            if hdr[124:135] == b'\0'*11:
+            if hdr[124:135] == b'\0' * 11:
                 # end of record
                 break
-            
+
             fname = hdr[:100].split(b'\0')[0]
 
             # if the file is too big to fit in the size field, tarfiles will actually
@@ -71,11 +88,13 @@ def tarfile_reader(file, streaming=False):
 
             if type == 'x':
                 meta = file.read(padded_size)[:size]
+
                 def kv(x):
                     return x.decode('utf-8').split(' ')[1].split('=')
+
                 paxfileattrs = {
-                    kv(x)[0]: kv(x)[1] 
-                    for x in meta.split(b'\n') if x
+                    kv(x)[0]: kv(x)[1]
+                    for x in meta.split(b'\n') if x
                 }
                 paxfilesize = int(paxfileattrs['size'])
 
@@ -101,6 +120,7 @@ def kv(x):
                 yield file.read(padded_size)[:size]
             offset += padded_size
 
+
 def handle_jsonl(jsonl_reader, get_meta, autojoin_paragraphs, para_joiner, key='text'):
     for ob in jsonl_reader:
         # naive jsonl where each object is just the string itself, with no meta. For legacy compatibility.
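The hunks above touch the hand-rolled tar parser. The size handling it relies on is the standard tar header layout: bytes 124-135 of each 512-byte header hold the file size as octal ASCII, and GNU tar switches to base-256 (high bit of the first byte set) when the value does not fit in 11 octal digits. A minimal sketch of that decoding, with a helper name of my own, not part of the diff:

```python
def decode_tar_size(hdr: bytes) -> int:
    # hypothetical helper mirroring what tarfile_reader does with the
    # 12-byte size field at offset 124 of a 512-byte tar header
    field = hdr[124:136]
    if field[0] & 0x80:                      # GNU base-256: high bit of first byte set
        n = field[0] & 0x7f
        for b in field[1:]:
            n = (n << 8) | b
        return n
    # octal ASCII, ignoring the trailing NUL/space padding
    return int(field[:11].rstrip(b'\0 ') or b'0', 8)
```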
@@ -123,7 +143,7 @@ def handle_jsonl(jsonl_reader, get_meta, autojoin_paragraphs, para_joiner, key='
 class Reader:
     def __init__(self, in_path):
         self.in_path = in_path
-    
+
     def stream_data(self, get_meta=False, threaded=False):
         if not threaded:
             yield from self._stream_data(get_meta)
@@ -136,7 +156,7 @@ def stream_data(self, get_meta=False, threaded=False):
             res = q.get()
             if res is None: break
             yield res
-    
+
     def _stream_data_threaded(self, q, get_meta=False):
         for data in self._stream_data(get_meta):
             q.put(data)
@@ -166,14 +186,14 @@ def _stream_data(self, get_meta=False, jsonl_key="text"):
             elif f.endswith('.jsonl.zst'):
                 yield from self.read_jsonl_zst(f, get_meta, key=jsonl_key)
             elif f.endswith('.jsonl.zst.tar'):
-                yield from self.read_jsonl_tar(f, get_meta, jsonl_key=key)
+                yield from self.read_jsonl_tar(f, get_meta, key=jsonl_key)
             elif f.endswith('.json.zst'):
                 assert not get_meta
 
                 yield from self.read_json(f)
             elif f.endswith('.txt'):
                 assert not get_meta
-                
+
                 yield from self.read_txt(f)
             elif f.endswith('.zip'):
                 assert not get_meta
@@ -185,11 +205,11 @@ def _stream_data(self, get_meta=False, jsonl_key="text"):
                 yield from self.read_tgz(f)
             elif f.endswith('.json.gz'):
                 assert not get_meta
-                
+
                 yield from self.read_jsongz(f)
             elif f.endswith('.gz'):
                 assert not get_meta
-                
+
                 yield from self.read_gz(f)
             else:
                 # shouldn't be reached
@@ -200,23 +220,23 @@ def read_txt(self, file):
             yield fh.read()
 
     def read_zip(self, file):
-        archive = ZipFile(file, 'r')
+        archive = zipfile.ZipFile(file, 'r')
         for f in archive.namelist():
             yield archive.read(f).decode('UTF-8')
 
     def read_tgz(self, file):
         gz = gzip.open(file)
         yield from (x.decode('utf-8') for x in tarfile_reader(gz, streaming=False))
-    
-    def read_gz(self, file): 
+
+    def read_gz(self, file):
         with gzip.open(file, 'rb') as f:
             for line in f:
                 yield line.decode('utf-8')
-    
-    def read_jsongz(self, file): 
+
+    def read_jsongz(self, file):
         for line in self.read_gz(file):
             yield json.loads(line)
-    
+
     def read_json(self, file):
         with open(file, 'rb') as fh:
             cctx = zstandard.ZstdDecompressor()
@@ -240,7 +260,7 @@ def read_dat(self, file):
     def read_jsonl(self, file, get_meta=False, autojoin_paragraphs=True, para_joiner='\n\n', key='text'):
         with jsonlines.open(file) as rdr:
            yield from handle_jsonl(rdr, get_meta, autojoin_paragraphs, para_joiner, key)
-    
+
     def read_jsonl_zst(self, file, get_meta=False, autojoin_paragraphs=True, para_joiner='\n\n', key='text'):
         with open(file, 'rb') as fh:
             cctx = zstandard.ZstdDecompressor()
@@ -256,7 +276,7 @@ def read_jsonl_tar(self, file, get_meta=False, autojoin_paragraphs=True, para_jo
             rdr = jsonlines.Reader(reader)
             yield from handle_jsonl(rdr, get_meta, autojoin_paragraphs, para_joiner, key)
             f.close()
-    
+
     def read_owt(self, file):
         tar = tarfile.open(file, encoding='utf-8')
         utf8reader = codecs.getreader('utf-8')
@@ -283,19 +303,21 @@ def __init__(self, out_dir, compression_level=3, threads=8):
         self.out_dir = out_dir
         os.makedirs(out_dir, exist_ok=True)
         self.i = 0
-    
+
+        self.time = int(time.time())
+
         self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb')
         self.cctx = zstandard.ZstdCompressor(level=compression_level, threads=threads)
         self.compressor = self.cctx.stream_writer(self.fh)
-    
-    
+
     def add_data(self, data, meta={}):
         self.compressor.write(json.dumps({'text': data, 'meta': meta}).encode('UTF-8') + b'\n')
-    
-    def commit(self, archive_name='default'):
-        fname = self.out_dir + '/data_' + str(self.i) + '_time' + str(int(time.time())) + '_' + archive_name + '.jsonl.zst'
+
+    def commit(self, archive_name='default', extension="jsonl"):
+        fname = "{}/data_{}_time{}_{}.{}.zst".format(self.out_dir, str(self.i), str(self.time), archive_name,
+                                                     extension)
+        # fname = self.out_dir + '/data_' + str(self.i) + '_time' + str(int(time.time())) + '_' + archive_name + '.jsonl.zst'
         self.compressor.flush(zstandard.FLUSH_FRAME)
-    
+
         self.fh.flush()
         self.fh.close()
         os.rename(self.out_dir + '/current_chunk_incomplete', fname)
@@ -313,10 +335,10 @@ def __init__(self, out_dir):
         self.i = 0
         if os.path.exists(out_dir) and len(os.listdir(out_dir)) > 0:
             self.i = max(map(lambda x: int(x.split('_')[1].split('.')[0]), os.listdir(out_dir))) + 1
-    
+
     def add_data(self, data):
         self.data.append(data)
-    
+
     def commit(self, archive_name=None):
         # TODO: streaming
         cctx = zstandard.ZstdCompressor(level=3)
@@ -324,7 +346,8 @@ def commit(self, archive_name=None):
 
         if archive_name is None:
             archive_name = str(int(time.time()))
 
-        res = b''.join(map(lambda x: ("%016d" % len(x)).encode('UTF-8') + x, map(lambda x: x.encode('UTF-8'), self.data)))
+        res = b''.join(
+            map(lambda x: ("%016d" % len(x)).encode('UTF-8') + x, map(lambda x: x.encode('UTF-8'), self.data)))
         cdata = cctx.compress(res)
 
         with open(self.out_dir + '/data_' + str(self.i) + '_' + archive_name + '.dat.zst', 'wb') as fh:
@@ -333,24 +356,37 @@ def commit(self, archive_name=None):
         self.i += 1
         self.data = []
 
-class JSONArchive:
+
+class TextArchive(Archive):
     def __init__(self, out_dir):
-        self.out_dir = out_dir
-        os.makedirs(out_dir, exist_ok=True)
-        self.data = []
-        self.i = 0
-        if os.path.exists(out_dir) and len(os.listdir(out_dir)) > 0:
-            self.i = max(map(lambda x: int(x.split('_')[1].split('.')[0]), os.listdir(out_dir))) + 1
-    
-    def add_data(self, data):
-        self.data.append(data)
-    
-    def commit(self):
-        cctx = zstandard.ZstdCompressor(level=3)
-        
-        cdata = cctx.compress(json.dumps(self.data).encode('UTF-8'))
-        with open(self.out_dir + '/data_' + str(self.i) + '_' + str(int(time.time())) + '.json.zst', 'wb') as fh:
-            fh.write(cdata)
+        super().__init__(out_dir)
 
-        self.i += 1
-        self.data = []
+    def add_data(self, data, **kwargs):
+        self.compressor.write(TextArchive.to_text(data).encode('UTF-8') + b'\n')
+
+    def commit(self, archive_name='default', extension="txt"):
+        super().commit(archive_name, extension)
+
+    @staticmethod
+    def filter_newlines(text):
+        return re.sub("\n{3,}", "\n\n", text)
+
+    @staticmethod
+    def handle_unicode_errors(txt):
+        return txt.encode('utf-8', 'replace').decode()
+
+    @staticmethod
+    def to_text(data):
+        out_str = ""
+        out_str += 'Q:\n\n'
+        out_str += '{}\n\n'.format(data['question']['title'])
+        out_str += '{}\n\n'.format(data['question']['body'])
+        for answer in data['answers']:
+            out_str += 'A:\n\n{}\n\n'.format(answer['body'])
+
+        try:
+            out_str = TextArchive.filter_newlines(out_str)
+        except:
+            out_str = TextArchive.filter_newlines(TextArchive.handle_unicode_errors(out_str))
+
+        return out_str
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..54fc9ba
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+testpaths = test
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..bf0c7f1
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+jsonlines
+ujson
+zstandard
\ No newline at end of file
diff --git a/test/blns.jsonl.zst.tar b/test/resources/blns.jsonl.zst.tar
similarity index 100%
rename from test/blns.jsonl.zst.tar
rename to test/resources/blns.jsonl.zst.tar
diff --git a/test/blns.txt b/test/resources/blns.txt
similarity index 100%
rename from test/blns.txt
rename to test/resources/blns.txt
diff --git a/test/blns.txt.tar.gz b/test/resources/blns.txt.tar.gz
similarity index 100%
rename from test/blns.txt.tar.gz
rename to test/resources/blns.txt.tar.gz
diff --git a/test/blns.txt.zip b/test/resources/blns.txt.zip
similarity index 100%
rename from test/blns.txt.zip
rename to test/resources/blns.txt.zip
diff --git a/test/testtarfile.tar b/test/resources/testtarfile.tar
similarity index 100%
rename from test/testtarfile.tar
rename to test/resources/testtarfile.tar
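For context on the `.dat.zst` files exercised by `test_dat` below: `DatArchive.commit` (above) frames each document as a 16-digit ASCII length ("%016d") followed by the UTF-8 payload, then zstd-compresses the concatenation. A reading sketch under that assumption; the library's own `read_dat` is the real counterpart, while this hypothetical helper simply decompresses in one shot:

```python
import zstandard


def read_dat_sketch(path):
    # hypothetical helper, not part of the library: decode the DatArchive framing
    with open(path, 'rb') as fh:
        data = zstandard.ZstdDecompressor().decompress(fh.read())
    pos = 0
    while pos < len(data):
        size = int(data[pos:pos + 16])  # 16-digit length prefix
        pos += 16
        yield data[pos:pos + size].decode('utf-8')
        pos += size
```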
diff --git a/test/test_dat_archive_reader.py b/test/test_dat_archive_reader.py
index 0fd0a5f..c7403c5 100644
--- a/test/test_dat_archive_reader.py
+++ b/test/test_dat_archive_reader.py
@@ -3,14 +3,16 @@
 import hashlib
 import shutil
 
+
 def sha256str(s):
     h = hashlib.sha256()
     h.update(s)
     return h.hexdigest()
 
+
 def test_dat():
     archive = lmd.DatArchive('test_dir')
-    blns = open('test/blns.txt').read()
+    blns = open('test/resources/blns.txt').read()
     archive.add_data(blns)
     archive.add_data('testing 123')
     archive.add_data(blns)
@@ -27,31 +29,13 @@ def test_dat():
     assert data[3] == 'testing 123456789'
     shutil.rmtree('test_dir')
 
-def test_json():
-    archive = lmd.JSONArchive('test_dir')
-    blns = open('test/blns.txt').read()
-    archive.add_data(blns)
-    archive.add_data('testing 123')
-    archive.add_data(blns)
-    archive.add_data('testing 123456789')
-    archive.commit()
-
-    reader = lmd.Reader('test_dir')
-
-    data = list(reader.stream_data())
-
-    assert data[0] == blns
-    assert data[1] == 'testing 123'
-    assert data[2] == blns
-    assert data[3] == 'testing 123456789'
-    shutil.rmtree('test_dir')
 
 def test_jsonl():
     archive = lmd.Archive('test_dir')
-    blns = open('test/blns.txt').read()
+    blns = open('test/resources/blns.txt').read()
     archive.add_data(blns)
     archive.add_data('testing 123', meta={'testing': 123})
-    archive.add_data(blns, meta={'testing2': 456, 'testing': ['a','b']})
+    archive.add_data(blns, meta={'testing2': 456, 'testing': ['a', 'b']})
     archive.add_data('testing 123456789')
     archive.commit()
@@ -61,16 +45,17 @@ def test_jsonl():
 
     assert data[0] == (blns, {})
     assert data[1] == ('testing 123', {'testing': 123})
-    assert data[2] == (blns, {'testing2': 456, 'testing': ['a','b']})
+    assert data[2] == (blns, {'testing2': 456, 'testing': ['a', 'b']})
     assert data[3] == ('testing 123456789', {})
     shutil.rmtree('test_dir')
 
+
 def test_jsonl_paras():
     archive = lmd.Archive('test_dir')
-    blns = open('test/blns.txt').read()
+    blns = open('test/resources/blns.txt').read()
     archive.add_data(blns)
     archive.add_data(['testing 123', 'testing 345'], meta={'testing': 123})
-    archive.add_data(blns, meta={'testing2': 456, 'testing': ['a','b']})
+    archive.add_data(blns, meta={'testing2': 456, 'testing': ['a', 'b']})
     archive.add_data('testing 123456789')
     archive.commit()
@@ -80,56 +65,61 @@ def test_jsonl_paras():
 
     assert data[0] == (blns, {})
     assert data[1] == ('testing 123\n\ntesting 345', {'testing': 123})
-    assert data[2] == (blns, {'testing2': 456, 'testing': ['a','b']})
+    assert data[2] == (blns, {'testing2': 456, 'testing': ['a', 'b']})
     assert data[3] == ('testing 123456789', {})
     shutil.rmtree('test_dir')
 
+
 def test_jsonl_tar():
-    blns = open('test/blns.txt').read()
-    reader = lmd.Reader('test/blns.jsonl.zst.tar')
+    blns = open('test/resources/blns.txt').read()
+    reader = lmd.Reader('test/resources/blns.jsonl.zst.tar')
 
     data = list(reader.stream_data(get_meta=True))
 
     assert data[0] == (blns, {})
     assert data[1] == ('testing 123\n\ntesting 345', {'testing': 123})
-    assert data[2] == (blns, {'testing2': 456, 'testing': ['a','b']})
+    assert data[2] == (blns, {'testing2': 456, 'testing': ['a', 'b']})
     assert data[3] == ('testing 123456789', {})
     assert data[4] == (blns, {})
     assert data[5] == ('testing 123\n\ntesting 345', {'testing': 123})
-    assert data[6] == (blns, {'testing2': 456, 'testing': ['a','b']})
+    assert data[6] == (blns, {'testing2': 456, 'testing': ['a', 'b']})
     assert data[7] == ('testing 123456789', {})
 
+
 def test_txt_read():
-    reader = lmd.Reader('test/blns.txt')
-    blns = open('test/blns.txt').read()
+    reader = lmd.Reader('test/resources/blns.txt')
+    blns = open('test/resources/blns.txt').read()
 
     data = list(reader.stream_data(get_meta=False))
 
     assert data[0] == blns
     assert len(data) == 1
 
+
 def test_zip_read():
-    reader = lmd.Reader('test/blns.txt.zip')
-    blns = open('test/blns.txt').read()
+    reader = lmd.Reader('test/resources/blns.txt.zip')
+    blns = open('test/resources/blns.txt').read()
 
     data = list(reader.stream_data(get_meta=False))
 
     assert data[0] == blns
     assert len(data) == 1
 
+
 def test_tgz_read():
-    reader = lmd.Reader('test/blns.txt.tar.gz')
-    blns = open('test/blns.txt').read()
+    reader = lmd.Reader('test/resources/blns.txt.tar.gz')
+    blns = open('test/resources/blns.txt').read()
 
     data = list(reader.stream_data(get_meta=False))
 
     assert data[0] == blns
     assert len(data) == 1
 
+
 def test_tarfile_reader():
-    rdr = lmd.tarfile_reader(open('test/testtarfile.tar', 'rb'), streaming=True)
-    
+    rdr = lmd.tarfile_reader(open('test/resources/testtarfile.tar', 'rb'), streaming=True)
+
     hashes = map(lambda doc: sha256str(doc.read()), rdr)
     expected = [