Question about multi-process data read/write in PyODPS #152

Open

templaryang opened this issue May 19, 2021 · 0 comments
Background: I need to read a table of roughly 5 million rows and 2 columns, compute a result set with the same number of rows, and handle the whole flow with DataFrames: download, compute, and upload the result to Dataphin. The current bottlenecks are downloading the data (about 3 minutes) and uploading it (about 10 minutes); the computation itself takes only about 2 minutes.
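For context, a minimal sketch of the single-process baseline described above, assuming a recent PyODPS install; the table name, column arithmetic, and credentials are placeholders, not the actual job:

```python
from odps import ODPS

# placeholder credentials and names (assumptions, not from the issue)
o = ODPS('<access_id>', '<secret_key>', '<project>', endpoint='<endpoint>')

# download: read the ~5M-row, 2-column source table into pandas
t = o.get_table('source_table')   # hypothetical table name
with t.open_reader() as reader:
    pd_df = reader.to_pandas()    # single-process download

# compute: derive a same-length result set (placeholder computation)
result = pd_df.assign(score=pd_df.iloc[:, 1] * 2)

# upload: write the result back in one call
o.write_table('test_table', result.values.tolist())
```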

Goal: improve the read/write performance. According to the PyODPS documentation, multi-process reading and writing is supported.

1. Multi-process reads are supported, and I wrote code almost identical to the documentation example, but it raises an error:

```python
import multiprocessing

t = odps.get_table('test_table')
n_process = multiprocessing.cpu_count()
with t.open_reader() as reader:
    # download in parallel, one worker per CPU core
    pd_df = reader.to_pandas(n_process=n_process)
```

Question: why does this raise `TypeError: to_pandas() got an unexpected keyword argument 'n_process'`?
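Worth noting: the `n_process` keyword was only added to `to_pandas()` in relatively recent PyODPS releases, so an outdated install is the most likely cause of this `TypeError`. A minimal diagnostic sketch (the exact minimum version is an assumption; check the PyODPS changelog):

```python
# check which PyODPS version is installed; old versions accept
# to_pandas() but not its n_process argument
import odps
print(odps.__version__)

# upgrading is then typically:
#   pip install -U pyodps
```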

2. Writing to a table also supports multi-process writes, but in my comparison it is slower than my single-process write. Why?

1. Single-process write:

```python
# convert the pandas DataFrame to a list of rows and upload in one call
records = df.values.tolist()
o.write_table('test_table', records)
```

2. Multi-process write:
```python
from multiprocessing import Pool

from odps.tunnel import TableTunnel

# note: write_records reads the module-level names records, table,
# tunnel and N_WORKERS; this only works with the fork start method
# (Linux) -- under spawn (Windows, macOS default) each worker would
# raise NameError
def write_records(session_id, block_id):
    # each worker re-attaches to the shared upload session by id
    local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
    with local_session.open_record_writer(block_id) as writer:
        # split the record list into N_WORKERS contiguous blocks
        maxlen = len(records)
        strlen = int(maxlen / N_WORKERS) * block_id
        endlen = int(maxlen / N_WORKERS) * (block_id + 1)
        if block_id == N_WORKERS - 1:
            endlen = maxlen  # the last worker takes the remainder
        for row in range(strlen, endlen):
            record = table.new_record(records[row])
            writer.write(record)

if __name__ == '__main__':
    # df is a pandas DataFrame and odps an ODPS entry object, both
    # assumed to be defined earlier
    records = df.values.tolist()
    N_WORKERS = 4
    table = odps.get_table('test_table')
    tunnel = TableTunnel(odps)
    upload_session = tunnel.create_upload_session(table.name)
    session_id = upload_session.id
    pool = Pool(processes=N_WORKERS)
    block_ids = []
    for i in range(N_WORKERS):
        pool.apply_async(write_records, (session_id, i))
        block_ids.append(i)
    pool.close()
    pool.join()
    # commit all blocks once every worker has finished
    upload_session.commit(block_ids)
```

**Question: the multi-process write is slower than the single-process one. The single-process version finishes inserting the 5 million records in about 10 minutes, while the multi-process version still had not finished after 20 minutes (with the required CPUs already set to 4). Why?**
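One caveat when debugging this: `apply_async` swallows any exception raised in a worker unless the returned `AsyncResult` is checked, so a worker that crashed immediately looks exactly like a slow or hung job. A minimal sketch of surfacing worker errors and timing the upload, as a drop-in around the pool code above (`write_records`, `session_id`, and `N_WORKERS` as defined earlier):

```python
import time
from multiprocessing import Pool

pool = Pool(processes=N_WORKERS)
results = []
start = time.monotonic()
for i in range(N_WORKERS):
    # keep the AsyncResult so worker exceptions are not silently lost
    results.append(pool.apply_async(write_records, (session_id, i)))
pool.close()
for r in results:
    r.get()   # blocks until the worker finishes; re-raises its exception
pool.join()
print('all blocks written in %.1fs' % (time.monotonic() - start))
upload_session.commit(list(range(N_WORKERS)))
```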
