36 changes: 36 additions & 0 deletions .github/workflows/ci-build.yml
@@ -0,0 +1,36 @@
name: Build unstable

on: [push]

concurrency:
  group: unstable
  # cancel-in-progress: true


jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python 3.9
        uses: actions/setup-python@v4
        with:
          python-version: "3.9"
          # cache: 'pip'
      - name: Cleanup more disk space
        run: sudo rm -rf /usr/share/dotnet && sudo rm -rf /opt/ghc && sudo rm -rf "/usr/local/share/boost" && sudo rm -rf "$AGENT_TOOLSDIRECTORY"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install --upgrade flake8 pytest pycodestyle
          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
      - name: Lint with flake8
        run: |
          # stop the build if there are Python syntax errors or undefined names
          flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
          # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
          flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
      - name: Test with pytest
        run: |
          python -m pytest --rootdir .
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
env
*.pyc
*.egg-info
.idea
12 changes: 0 additions & 12 deletions .travis.yml

This file was deleted.

6 changes: 5 additions & 1 deletion README.md
@@ -1,7 +1,11 @@
# LM_Dataformat [![Build Status](https://travis-ci.org/leogao2/lm_dataformat.svg?branch=master)](https://travis-ci.org/leogao2/lm_dataformat) [![Coverage Status](https://coveralls.io/repos/github/leogao2/lm_dataformat/badge.svg?branch=master)](https://coveralls.io/github/leogao2/lm_dataformat?branch=master)
# LM_Dataformat

[![Build unstable](https://github.com/lfoppiano/lm_dataformat/actions/workflows/ci-build.yml/badge.svg)](https://github.com/lfoppiano/lm_dataformat/actions/workflows/ci-build.yml)

Utilities for storing data for LM training.

**NOTE**: The original project seems abandoned, so I've continued development here to support more data formats and integration with stackexchange_dataset.


## Basic Usage

152 changes: 94 additions & 58 deletions lm_dataformat/__init__.py
@@ -1,24 +1,39 @@
import os
import zstandard
import ujson as json
import time
import tarfile
import codecs
from functools import reduce
import jsonlines
import io
from zipfile import ZipFile
import gzip
from math import ceil
import io
import mmap
import multiprocessing as mp
import os
import re
import tarfile
import time
from functools import reduce
from math import ceil
from pathlib import Path
import zipfile

import jsonlines
import ujson as json
import zstandard

VALID_EXTENSIONS = ['openwebtext.tar.xz', '_data.xz', '.dat.zst', '.jsonl', '.jsonl.zst', '.jsonl.zst.tar', '.json.zst',
                    '.txt', '.zip', '.tar.gz', '.json.gz', '.gz']

# LM_DATAFORMAT_FORMAT = "lm_dataformat"
TEXT_FORMAT = "txt"
JSONL_FORMAT = "jsonl"

SUPPORTED_FORMATS = [
    TEXT_FORMAT,
    # LM_DATAFORMAT_FORMAT,
    JSONL_FORMAT
]

VALID_EXTENSIONS = ['openwebtext.tar.xz', '_data.xz', '.dat.zst', '.jsonl', '.jsonl.zst', '.jsonl.zst.tar', '.json.zst', '.txt', '.zip', '.tar.gz', '.json.gz', '.gz']

def has_valid_extension(file):
    return any([file.endswith(ext) for ext in VALID_EXTENSIONS])


def _listdir_or_file(x):
    if isinstance(x, list):
        return reduce(lambda x, y: x + y, map(listdir_or_file, sorted(x)))
@@ -29,9 +44,11 @@ def _listdir_or_file(x):
    else:
        raise FileNotFoundError(f"{x} not found")


def listdir_or_file(x):
    return list(filter(has_valid_extension, _listdir_or_file(x)))


def tarfile_reader(file, streaming=False):
    # we need our own tarfile parser because `tarfile` doesn't work well for
    # big tarfiles; it seems to be reading the entire file to get a list of
@@ -49,10 +66,10 @@ def tarfile_reader(file, streaming=False):

        # https://www.gnu.org/software/tar/manual/html_node/Standard.html
        # end at 135 not 136 because of \0 terminator
        if hdr[124:135] == b'\0'*11:
        if hdr[124:135] == b'\0' * 11:
            # end of record
            break

        fname = hdr[:100].split(b'\0')[0]

        # if the file is too big to fit in the size field, tarfiles will actually
@@ -71,11 +88,13 @@

        if type == 'x':
            meta = file.read(padded_size)[:size]

            def kv(x):
                return x.decode('utf-8').split(' ')[1].split('=')

            paxfileattrs = {
                kv(x)[0]: kv(x)[1]
                for x in meta.split(b'\n') if x
                kv(x)[0]: kv(x)[1]
                for x in meta.split(b'\n') if x
            }
            paxfilesize = int(paxfileattrs['size'])

@@ -101,6 +120,7 @@ def kv(x):
            yield file.read(padded_size)[:size]
            offset += padded_size


def handle_jsonl(jsonl_reader, get_meta, autojoin_paragraphs, para_joiner, key='text'):
    for ob in jsonl_reader:
        # naive jsonl where each object is just the string itself, with no meta. For legacy compatibility.
@@ -123,7 +143,7 @@ def handle_jsonl(jsonl_reader, get_meta, autojoin_paragraphs, para_joiner, key='text'):
class Reader:
    def __init__(self, in_path):
        self.in_path = in_path

    def stream_data(self, get_meta=False, threaded=False):
        if not threaded:
            yield from self._stream_data(get_meta)
@@ -136,7 +156,7 @@ def stream_data(self, get_meta=False, threaded=False):
            res = q.get()
            if res is None: break
            yield res

    def _stream_data_threaded(self, q, get_meta=False):
        for data in self._stream_data(get_meta):
            q.put(data)
@@ -166,14 +186,14 @@ def _stream_data(self, get_meta=False, jsonl_key="text"):
            elif f.endswith('.jsonl.zst'):
                yield from self.read_jsonl_zst(f, get_meta, key=jsonl_key)
            elif f.endswith('.jsonl.zst.tar'):
                yield from self.read_jsonl_tar(f, get_meta, jsonl_key=key)
                yield from self.read_jsonl_tar(f, get_meta, key=jsonl_key)
            elif f.endswith('.json.zst'):
                assert not get_meta

                yield from self.read_json(f)
            elif f.endswith('.txt'):
                assert not get_meta

                yield from self.read_txt(f)
            elif f.endswith('.zip'):
                assert not get_meta
@@ -185,11 +205,11 @@ def _stream_data(self, get_meta=False, jsonl_key="text"):
                yield from self.read_tgz(f)
            elif f.endswith('.json.gz'):
                assert not get_meta

                yield from self.read_jsongz(f)
            elif f.endswith('.gz'):
                assert not get_meta

                yield from self.read_gz(f)
            else:
                # shouldn't be reached
@@ -200,23 +220,23 @@ def read_txt(self, file):
            yield fh.read()

    def read_zip(self, file):
        archive = ZipFile(file, 'r')
        archive = zipfile.ZipFile(file, 'r')
        for f in archive.namelist():
            yield archive.read(f).decode('UTF-8')

    def read_tgz(self, file):
        gz = gzip.open(file)
        yield from (x.decode('utf-8') for x in tarfile_reader(gz, streaming=False))
    def read_gz(self, file):

    def read_gz(self, file):
        with gzip.open(file, 'rb') as f:
            for line in f:
                yield line.decode('utf-8')
    def read_jsongz(self, file):

    def read_jsongz(self, file):
        for line in self.read_gz(file):
            yield json.loads(line)

    def read_json(self, file):
        with open(file, 'rb') as fh:
            cctx = zstandard.ZstdDecompressor()
@@ -240,7 +260,7 @@ def read_dat(self, file):
    def read_jsonl(self, file, get_meta=False, autojoin_paragraphs=True, para_joiner='\n\n', key='text'):
        with jsonlines.open(file) as rdr:
            yield from handle_jsonl(rdr, get_meta, autojoin_paragraphs, para_joiner, key)

    def read_jsonl_zst(self, file, get_meta=False, autojoin_paragraphs=True, para_joiner='\n\n', key='text'):
        with open(file, 'rb') as fh:
            cctx = zstandard.ZstdDecompressor()
@@ -256,7 +276,7 @@ def read_jsonl_tar(self, file, get_meta=False, autojoin_paragraphs=True, para_joiner='\n\n', key='text'):
                rdr = jsonlines.Reader(reader)
                yield from handle_jsonl(rdr, get_meta, autojoin_paragraphs, para_joiner, key)
                f.close()

    def read_owt(self, file):
        tar = tarfile.open(file, encoding='utf-8')
        utf8reader = codecs.getreader('utf-8')
@@ -283,19 +303,21 @@ def __init__(self, out_dir, compression_level=3, threads=8):
        self.out_dir = out_dir
        os.makedirs(out_dir, exist_ok=True)
        self.i = 0

        self.time = int(time.time())

        self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb')
        self.cctx = zstandard.ZstdCompressor(level=compression_level, threads=threads)
        self.compressor = self.cctx.stream_writer(self.fh)



    def add_data(self, data, meta={}):
        self.compressor.write(json.dumps({'text': data, 'meta': meta}).encode('UTF-8') + b'\n')

    def commit(self, archive_name='default'):
        fname = self.out_dir + '/data_' + str(self.i) + '_time' + str(int(time.time())) + '_' + archive_name + '.jsonl.zst'

    def commit(self, archive_name='default', extension="jsonl"):
        fname = "{}/data_{}_time{}_{}.{}.zst".format(self.out_dir, str(self.i), str(self.time), archive_name,
                                                     extension)
        # fname = self.out_dir + '/data_' + str(self.i) + '_time' + str(int(time.time())) + '_' + archive_name + '.jsonl.zst'
        self.compressor.flush(zstandard.FLUSH_FRAME)

        self.fh.flush()
        self.fh.close()
        os.rename(self.out_dir + '/current_chunk_incomplete', fname)
@@ -313,18 +335,19 @@ def __init__(self, out_dir):
        self.i = 0
        if os.path.exists(out_dir) and len(os.listdir(out_dir)) > 0:
            self.i = max(map(lambda x: int(x.split('_')[1].split('.')[0]), os.listdir(out_dir))) + 1

    def add_data(self, data):
        self.data.append(data)

    def commit(self, archive_name=None):
        # TODO: streaming
        cctx = zstandard.ZstdCompressor(level=3)

        if archive_name is None:
            archive_name = str(int(time.time()))

        res = b''.join(map(lambda x: ("%016d" % len(x)).encode('UTF-8') + x, map(lambda x: x.encode('UTF-8'), self.data)))
        res = b''.join(
            map(lambda x: ("%016d" % len(x)).encode('UTF-8') + x, map(lambda x: x.encode('UTF-8'), self.data)))
        cdata = cctx.compress(res)

        with open(self.out_dir + '/data_' + str(self.i) + '_' + archive_name + '.dat.zst', 'wb') as fh:
@@ -333,24 +356,37 @@ def commit(self, archive_name=None):
        self.i += 1
        self.data = []

class JSONArchive:
    def __init__(self, out_dir):
        self.out_dir = out_dir
        os.makedirs(out_dir, exist_ok=True)
        self.data = []
        self.i = 0
        if os.path.exists(out_dir) and len(os.listdir(out_dir)) > 0:
            self.i = max(map(lambda x: int(x.split('_')[1].split('.')[0]), os.listdir(out_dir))) + 1

    def add_data(self, data):
        self.data.append(data)

    def commit(self):
        cctx = zstandard.ZstdCompressor(level=3)

        cdata = cctx.compress(json.dumps(self.data).encode('UTF-8'))
        with open(self.out_dir + '/data_' + str(self.i) + '_' + str(int(time.time())) + '.json.zst', 'wb') as fh:
            fh.write(cdata)

        self.i += 1
        self.data = []

class TextArchive(Archive):
    def __init__(self, out_dir):
        super().__init__(out_dir)

    def add_data(self, data, **kwargs):
        self.compressor.write(TextArchive.to_text(data).encode('UTF-8') + b'\n')

    def commit(self, archive_name='default', extension="txt"):
        super().commit(archive_name, extension)

    @staticmethod
    def filter_newlines(text):
        return re.sub("\n{3,}", "\n\n", text)

    @staticmethod
    def handle_unicode_errors(txt):
        return txt.encode('utf-8', 'replace').decode()

    @staticmethod
    def to_text(data):
        out_str = ""
        out_str += 'Q:\n\n'
        out_str += '{}\n\n'.format(data['question']['title'])
        out_str += '{}\n\n'.format(data['question']['body'])
        for answer in data['answers']:
            out_str += 'A:\n\n{}\n\n'.format(answer['body'])

        try:
            out_str = TextArchive.filter_newlines(out_str)
        except:
            out_str = TextArchive.filter_newlines(TextArchive.handle_unicode_errors(out_str))

        return out_str
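
For orientation, a minimal usage sketch of the reworked writer and reader classes in this diff (not part of the changeset; the output directories and the stackexchange-style sample record are illustrative only):

from lm_dataformat import Archive, Reader, TextArchive

# Write a .jsonl.zst chunk with the streaming Archive writer shown above.
ar = Archive('out_jsonl')
ar.add_data('some document text', meta={'source': 'example'})
ar.commit(archive_name='example')  # -> out_jsonl/data_0_time<ts>_example.jsonl.zst

# Write a .txt.zst chunk from a stackexchange-style record with TextArchive.
ta = TextArchive('out_txt')
ta.add_data({
    'question': {'title': 'How do I X?', 'body': 'Details about X.'},
    'answers': [{'body': 'You can do X like this.'}],
})
ta.commit(archive_name='example')  # -> out_txt/data_0_time<ts>_example.txt.zst

# Stream the jsonl chunks back.
for doc in Reader('out_jsonl').stream_data():
    print(doc[:80])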
2 changes: 2 additions & 0 deletions pytest.ini
@@ -0,0 +1,2 @@
[pytest]
testpaths = test
3 changes: 3 additions & 0 deletions requirements.txt
@@ -0,0 +1,3 @@
jsonlines
ujson
zstandard
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.