-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
最新版,对于含注释的文档识别不出注释部分的内容 #956
Labels
bug
Something isn't working
Comments
图不知道为啥一直不显示,在文本“或者有参数可以调节是否忽略注释部分内容吗?”下方,鼠标变成小手点击即可 |
输出不含footnote是预期之中的,如果需要解析这部分内容,请自行通过解析xxx_middle.json中的discarded_blocks字段获取 |
我简单让 AI 搞了一个自动加脚注的脚本:
import json
import re
import argparse
from pathlib import Path
import multiprocessing
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from rapidfuzz import fuzz, process
from difflib import SequenceMatcher
import logging
from datetime import datetime
def setup_logging(output_dir):
    """Configure root logging to a timestamped log file plus the console.

    A file named ``footnotes_log_<YYYYmmdd_HHMMSS>.log`` is opened inside
    ``output_dir`` and handed, together with a stream handler, to
    ``logging.basicConfig`` at INFO level.

    Args:
        output_dir: Directory (str or Path) in which the log file is created.

    Returns:
        The root logger.
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_path = Path(output_dir).joinpath(f"footnotes_log_{timestamp}.log")

    file_handler = logging.FileHandler(log_path, encoding='utf-8')
    console_handler = logging.StreamHandler()

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[file_handler, console_handler],
    )
    return logging.getLogger()
def extract_text_from_spans(spans):
    """Concatenate the textual content of a sequence of span dicts.

    Spans whose ``type`` is ``'text'`` contribute their ``content`` verbatim;
    spans whose ``type`` is ``'inline_equation'`` contribute their ``content``
    wrapped in ``$...$``. Every other span type is ignored.

    Spans that lack a ``type`` or ``content`` key are skipped instead of
    raising ``KeyError``.

    Args:
        spans: Iterable of dicts with ``type`` and ``content`` keys.

    Returns:
        The concatenated string ('' for an empty iterable).
    """
    pieces = []
    for span in spans:
        content = span.get('content')
        if content is None:
            # Nothing to emit for spans that carry no textual content.
            continue
        span_type = span.get('type')
        if span_type == 'text':
            pieces.append(content)
        elif span_type == 'inline_equation':
            # Inline formulas are wrapped in single dollar signs.
            pieces.append(f'${content}$')
    return ''.join(pieces)
def format_footnote_text(text):
    """Format a footnote string.

    Currently a pass-through: the input is returned exactly as received.
    """
    formatted = text
    return formatted
def find_insertion_point(official_md, target_text, window=100, similarity_threshold=0.6):
    """Locate ``target_text`` inside ``official_md`` and return its offset.

    Three strategies are tried in order:

    1. exact substring search for the stripped target;
    2. if the target contains several '。'-separated sentences, search for the
       longest one that is more than 10 characters long;
    3. search for the (up to) three longest whitespace-separated words longer
       than 2 characters and take the smallest offset found.

    Args:
        official_md: The Markdown text to search in.
        target_text: The text whose position is wanted.
        window: Accepted but not used by this implementation.
        similarity_threshold: Accepted but not used by this implementation.

    Returns:
        ``-1`` when the stripped target is empty; otherwise the offset of the
        match, or ``len(official_md)`` (end of document) when nothing matched.
    """
    needle = target_text.strip()
    if not needle:
        return -1
    end_of_document = len(official_md)

    # Strategy 1: the whole text appears verbatim.
    hit = official_md.find(needle)
    if hit != -1:
        return hit

    # Strategy 2: fall back to the longest sufficiently long sentence.
    sentences = needle.split('。')
    if len(sentences) > 1:
        trimmed = [sentence.strip() for sentence in sentences]
        candidates = [sentence for sentence in trimmed if len(sentence) > 10]
        best_sentence = max(candidates, key=len, default='')
        if best_sentence:
            hit = official_md.find(best_sentence)
            if hit != -1:
                return hit

    # Strategy 3: keyword search on the longest words.
    tokens = [token for token in needle.split() if len(token) > 2]
    if not tokens:
        return end_of_document
    tokens.sort(key=len, reverse=True)
    offsets = [official_md.find(token) for token in tokens[:3]]
    offsets = [offset for offset in offsets if offset != -1]
    if offsets:
        # The earliest keyword occurrence wins.
        return min(offsets)
    return end_of_document
def merge_continuous_blocks(markdown_text):
    """Merge consecutive ```footnote fenced blocks into a single block.

    The text is scanned line by line. Inside a ```footnote or ```page block
    blank lines are dropped. When a footnote block closes and another
    ```footnote opener follows within the next five lines with only blank
    lines in between, the closing fence, the blank lines and the next opener
    are skipped so that both blocks become one. ```page blocks are never
    merged. Lines outside blocks are copied unchanged.

    Args:
        markdown_text: Markdown containing ```footnote / ```page blocks.

    Returns:
        The Markdown text with adjacent footnote blocks merged.
    """
    openers = {'```footnote': 'footnote', '```page': 'page'}
    source = markdown_text.split('\n')
    total = len(source)

    merged = []       # finished output lines
    pending = []      # lines of the block currently being collected
    inside = False
    kind = None       # 'footnote' or 'page' for the current block
    idx = 0
    while idx < total:
        raw = source[idx]
        marker = raw.strip()
        if marker in openers:
            kind = openers[marker]
            if not inside:
                inside = True
                pending = [raw]
            # An opener met while already inside a block is dropped.
        elif marker == '```':
            if not inside:
                merged.append(raw)
            else:
                follow_up = -1
                if kind == 'footnote':
                    # Look at most five lines ahead (four blank lines plus
                    # the opener); any non-blank line other than a footnote
                    # opener stops the search.
                    for ahead in range(idx + 1, min(idx + 6, total)):
                        peek = source[ahead].strip()
                        if peek == '```footnote':
                            follow_up = ahead
                            break
                        if peek:
                            break
                if follow_up != -1:
                    # Jump onto the next opener; the increment below steps
                    # past it so its content joins the current block.
                    idx = follow_up
                else:
                    pending.append(raw)
                    merged.extend(pending)
                    pending = []
                    inside = False
        elif inside:
            if marker:
                pending.append(raw)
        else:
            merged.append(raw)
        idx += 1

    # Flush a block that was never closed.
    if pending:
        merged.extend(pending)
    return '\n'.join(merged)
def convert_to_quote_format(markdown_text):
    """Turn ```footnote fenced blocks into Markdown block quotes.

    A footnote block becomes::

        > ---
        >
        > * <line 1>
        > * <line 2>

    ```page blocks are kept as fenced blocks (their blank lines are dropped),
    and everything outside a block is copied unchanged.

    When the line right after a footnote block is not blank, an empty line is
    emitted after the quote. Without it Markdown treats the following text as
    a lazy continuation of the quote and renders the body paragraph inside
    the footnote.

    Args:
        markdown_text: Markdown containing ```footnote / ```page blocks.

    Returns:
        The converted Markdown text.
    """
    lines = markdown_text.split('\n')
    result_lines = []
    in_block = False
    block_type = None
    for i, line in enumerate(lines):
        stripped = line.strip()
        if stripped in ('```footnote', '```page'):
            block_type = 'footnote' if stripped == '```footnote' else 'page'
            in_block = True
            if block_type == 'footnote':
                result_lines.append('> ---')
                result_lines.append('>')
            else:
                result_lines.append(line)
        elif stripped == '```' and in_block:
            if block_type == 'page':
                result_lines.append(line)
            elif i + 1 < len(lines) and lines[i + 1].strip():
                # Terminate the block quote so the next paragraph is not
                # absorbed into it.
                result_lines.append('')
            in_block = False
            block_type = None
        elif in_block:
            if stripped:
                if block_type == 'footnote':
                    result_lines.append('> * ' + stripped)
                else:
                    result_lines.append(line)
        else:
            result_lines.append(line)
    return '\n'.join(result_lines)
def process_document_to_markdown(json_data, official_md_path, similarity_threshold=0.6, add_page_marks=False):
    """Re-insert discarded footnotes (and optionally page marks) into a Markdown file.

    Steps performed:

    1. Read the Markdown file at ``official_md_path``.
    2. Walk ``json_data['pdf_info']`` and, per page, remember its height, its
       ``para_blocks`` and its ``discarded_blocks``, plus the text of the text
       block with the largest ``bbox[3]``.
    3. Treat every discarded block whose ``bbox[1]`` exceeds 80% of the page
       height as a footnote candidate; each non-empty, non-numeric line of it
       becomes a footnote whose context is the text block closest above it.
    4. Locate each context in the Markdown with ``find_insertion_point`` and
       insert a ```footnote fenced block at that offset.
    5. Optionally insert ```page fenced blocks, then merge adjacent footnote
       blocks and convert them to block quotes.

    Args:
        json_data: Parsed ``*_middle.json`` content; must contain ``pdf_info``.
        official_md_path: Path of the Markdown file to enrich.
        similarity_threshold: Logged and forwarded to ``find_insertion_point``.
        add_page_marks: When true, page marker blocks are inserted as well.

    Returns:
        The final Markdown text.

    NOTE(review): the indentation of this function was lost in the pasted
    source; the nesting below is a reconstruction and should be confirmed
    against the author's original file.
    """
    logger = setup_logging(Path(official_md_path).parent)
    logger.info(f"开始处理文档: {official_md_path}")
    logger.info(f"相似度阈值: {similarity_threshold}")
    # Read the official Markdown file.
    with open(official_md_path, 'r', encoding='utf-8') as f:
        official_md = f.read()
    # Accumulators for the collected footnotes.
    footnotes = []
    # NOTE(review): context_map is never filled, so the per-page filter built
    # from it further down is always empty and unused.
    context_map = {}
    # Pre-process the page information.
    logger.info("预处理页面信息...")
    total_pages = len(json_data['pdf_info'])
    page_info = []
    page_contexts = {}  # per page: text and bbox of the last text block
    with tqdm(total=total_pages, desc="页面预处理") as pbar:
        for page_idx, page in enumerate(json_data['pdf_info']):
            if 'page_size' in page:
                page_info.append({
                    'height': page['page_size'][1],
                    'para_blocks': page.get('para_blocks', []),
                    'discarded_blocks': page.get('discarded_blocks', [])
                })
                # Find the text block with the largest bbox[3] on this page
                # (presumably the lowest one, assuming bbox is
                # [x0, y0, x1, y1] with y growing downwards - TODO confirm).
                # NOTE(review): whether this section sits inside or outside
                # the 'page_size' check cannot be told from the pasted source.
                last_block = None
                for block in page.get('para_blocks', []):
                    if block['type'] == 'text':
                        if not last_block or block['bbox'][3] > last_block['bbox'][3]:
                            last_block = block
                if last_block:
                    page_contexts[page_idx] = {
                        'text': '\n'.join(extract_text_from_spans(line['spans']) for line in last_block['lines']),
                        'bbox': last_block['bbox']
                    }
            pbar.update(1)
    # Collect the footnote candidates.
    logger.info("\n收集脚注...")
    footnote_tasks = []
    # NOTE(review): page_idx here indexes page_info, which skips pages without
    # 'page_size'; it can therefore differ from the pdf_info index used as the
    # key of page_contexts above.
    for page_idx, page in enumerate(page_info):
        page_height = page['height']
        bottom_threshold = page_height * 0.8
        for block in page['discarded_blocks']:
            # Only discarded blocks starting in the bottom 20% of the page.
            if block['bbox'][1] > bottom_threshold:
                footnote_tasks.append((page_idx, block['bbox'][1], block['lines']))
    def process_footnote(args):
        # Extract the text of every line of one discarded block, dropping
        # empty lines and lines that consist of digits only.
        page_idx, y_pos, lines = args
        results = []
        for line in lines:
            text = extract_text_from_spans(line['spans'])
            if text.strip() and not text.strip().isdigit():
                results.append((text, y_pos))
        return page_idx, results
    with ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        with tqdm(total=len(footnote_tasks), desc="脚注处理") as pbar:
            # executor.map yields results in the order of footnote_tasks.
            for page_idx, footnote_results in executor.map(process_footnote, footnote_tasks):
                # Contexts of the current page (always empty, see context_map).
                current_page_contexts = {
                    pos: ctx for pos, ctx in context_map.items()
                    if ctx['page'] == page_idx
                }
                for text, y_pos in footnote_results:
                    # Find the closest context.
                    closest_context = None
                    min_distance = float('inf')
                    # Walk over all text blocks of the current page.
                    for block in page_info[page_idx]['para_blocks']:
                        if block['type'] == 'text':
                            block_y = block['bbox'][3]
                            # Only text blocks above the footnote qualify.
                            if block_y < y_pos:
                                distance = y_pos - block_y
                                if distance < min_distance:
                                    min_distance = distance
                                    closest_context = '\n'.join(
                                        extract_text_from_spans(line['spans'])
                                        for line in block['lines']
                                    )
                    footnotes.append({
                        'text': text,
                        'position': f"{page_idx}_{y_pos}",
                        'context': closest_context if closest_context else '',
                        'page': page_idx
                    })
                    logger.info(f"找到脚注 [第{page_idx + 1}页]: {text}")
                    if closest_context:
                        logger.info(f"上下文: {closest_context[:100]}...")
                    else:
                        logger.warning(f"未找到上下文 [第{page_idx + 1}页]")
                pbar.update(1)
    # Sort the footnotes by page and by the y value encoded in 'position'.
    logger.info("\n整理脚注...")
    footnotes.sort(key=lambda x: (x['page'], float(x['position'].split('_')[1])))
    # Build the result.
    logger.info("\n生成最终文档...")
    result_parts = []
    last_position = 0
    total_footnotes = len(footnotes)
    processed_footnotes = set()  # identifiers of footnotes already inserted
    # Step one: insert the footnote fenced blocks.
    with tqdm(total=total_footnotes, desc="插入脚注") as pbar:
        for footnote in footnotes:
            # Footnotes without a context cannot be placed.
            if not footnote['context']:
                pbar.update(1)
                continue
            # Unique identifier built from the footnote text and page index.
            footnote_id = f"{footnote['text']}_{footnote['page']}"
            # Skip footnotes that were already inserted.
            if footnote_id in processed_footnotes:
                logger.info(f"跳过重复脚注 [第{footnote['page'] + 1}页]: {footnote['text']}")
                pbar.update(1)
                continue
            insert_point = find_insertion_point(
                official_md,
                footnote['context'],
                similarity_threshold=similarity_threshold
            )
            # Insertions must move forward through the document.
            # NOTE(review): find_insertion_point returns len(official_md) when
            # nothing matches, which pushes last_position to the end and makes
            # every later footnote fail this check - confirm this is intended.
            if insert_point != -1 and insert_point >= last_position:
                # Log the surroundings of the insertion point.
                context_before = official_md[max(0, insert_point-50):insert_point]
                context_after = official_md[insert_point:min(len(official_md), insert_point+50)]
                logger.info(f"\n插入脚注 [第{footnote['page'] + 1}页]:")
                logger.info(f"脚注内容: {footnote['text']}")
                logger.info(f"插入位置: {insert_point}")
                logger.info(f"插入位置上文: {context_before}")
                logger.info(f"插入位置下文: {context_after}")
                # Emit the text between the previous and the current position,
                # followed by the footnote block.
                result_parts.append(official_md[last_position:insert_point])
                result_parts.append(f'\n```footnote\n{footnote["text"]}\n```\n')
                last_position = insert_point
                processed_footnotes.add(footnote_id)
            else:
                logger.warning(f"未找到匹配位置或位置在前文 [第{footnote['page'] + 1}页]: {footnote['text']}")
            pbar.update(1)
    # Append the remaining text.
    if last_position < len(official_md):
        result_parts.append(official_md[last_position:])
    # Add page marks only when requested.
    if add_page_marks:
        logger.info("\n添加页码标记...")
        final_parts = []
        current_text = ''.join(result_parts)
        last_pos = 0
        for page_idx in range(total_pages):
            if page_idx in page_contexts:
                context = page_contexts[page_idx]
                # Search only in the part of the text not yet consumed; the
                # returned offset is relative to that slice.
                insert_point = find_insertion_point(current_text[last_pos:], context['text'])
                if insert_point != -1:
                    insert_point += last_pos
                    final_parts.append(current_text[last_pos:insert_point])
                    final_parts.append(f'\n```page\n第{page_idx + 1}页\n```\n')
                    last_pos = insert_point
                    logger.info(f"添加页码标记: 第{page_idx + 1}页")
        # Append the remaining content.
        if last_pos < len(current_text):
            final_parts.append(current_text[last_pos:])
        merged_text = merge_continuous_blocks(''.join(final_parts))
    else:
        merged_text = merge_continuous_blocks(''.join(result_parts))
    # Convert the footnote blocks to block-quote format.
    logger.info("\n转换为引述格式...")
    final_text = convert_to_quote_format(merged_text)
    logger.info("\n处理完成")
    return final_text
def find_matching_files(directory='.'):
    """Find ``*_middle.json`` files that have a sibling Markdown file.

    For every ``<stem>_middle.json`` directly inside ``directory`` the file
    ``<stem>.md`` in the same directory is looked up; only pairs where the
    Markdown file exists are returned.

    Only names that really end in ``_middle.json`` are considered. A looser
    pattern would also pick up files such as ``middle.json``, whose derived
    Markdown name equals the JSON name, pairing the file with itself.

    Args:
        directory: Directory to scan (str or Path); defaults to the current
            directory.

    Returns:
        A list of ``(json_path, md_path)`` tuples sorted by JSON path, so the
        "first pair" is the same on every run.
    """
    suffix = '_middle.json'
    directory = Path(directory)
    pairs = []
    for json_file in sorted(directory.glob(f'*{suffix}')):
        # Strip the suffix by length so only the trailing occurrence is removed.
        md_file = json_file.with_name(json_file.name[:-len(suffix)] + '.md')
        if md_file.is_file():
            pairs.append((json_file, md_file))
    return pairs
def main(work_dir=None):
    """Interactive entry point of the footnote tool.

    Asks for the working directory (unless ``work_dir`` is given), the
    similarity threshold and whether page marks are wanted, then processes
    the first ``*_middle.json`` / ``.md`` pair found and writes the result to
    ``<stem>_footnotes<suffix>`` next to the Markdown file.

    Args:
        work_dir: Directory to scan. ``None`` (the default) prompts the user;
            an empty answer means the current directory.
    """
    print("=== 脚注处理工具 ===")
    # Ask for the working directory instead of relying on a machine-specific
    # hard-coded path.
    if work_dir is None:
        work_dir = input("请输入工作目录(直接回车使用当前目录): ").strip().strip('"')
    if not work_dir:
        work_dir = '.'
    # Look for matching file pairs.
    file_pairs = find_matching_files(work_dir)
    if not file_pairs:
        print(f"在目录 {work_dir} 中没有找到匹配的文件对")
        input("按回车键退出...")
        return
    print("\n找到以下文件对:")
    for i, (json_file, md_file) in enumerate(file_pairs, 1):
        print(f"{i}. {json_file.name} -> {md_file.name}")
    # Ask for the similarity threshold until a valid value is entered.
    while True:
        threshold = input("\n请输入文本匹配相似度阈值(0-1,直接回车默认0.6): ").strip()
        if not threshold:
            # Same default as advertised in the prompt.
            threshold = 0.6
            break
        try:
            threshold = float(threshold)
        except ValueError:
            print("错误:请输入有效的数字")
            continue
        if 0 <= threshold <= 1:
            break
        print("错误:相似度阈值必须在0到1之间")
    # Ask whether page marks should be added.
    add_page = input("\n是否添加页码标记(y/n,直接回车默认n): ").strip().lower()
    add_page_marks = add_page == 'y'
    # Only the first pair is processed.
    json_file, md_file = file_pairs[0]
    try:
        print(f"\n处理文件:{json_file.name} -> {md_file.name}")
        # Remember the original file size for the size report.
        original_size = md_file.stat().st_size
        print("读取JSON文件...")
        with open(json_file, 'r', encoding='utf-8') as f:
            json_data = json.load(f)
        markdown_text = process_document_to_markdown(
            json_data,
            str(md_file),
            similarity_threshold=threshold,
            add_page_marks=add_page_marks
        )
        output_path = md_file.parent / f"{md_file.stem}_footnotes{md_file.suffix}"
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(markdown_text)
        new_size = output_path.stat().st_size
        print(f"\n✓ 处理完成!输出文件:{output_path}")
        print(f"原始文件大小: {original_size / 1024:.2f} KB")
        print(f"新文件大小: {new_size / 1024:.2f} KB")
        # A relative increase is undefined for an empty source file; skip it
        # rather than fail with ZeroDivisionError after the output is written.
        if original_size:
            size_increase = (new_size - original_size) / original_size * 100
            if size_increase > 10:
                print(f"\n⚠️ 警告:文件大小增加了 {size_increase:.1f}%,超过了10%的阈值")
            else:
                print(f"文件大小增加: {size_increase:.1f}%")
    except Exception as e:
        # Top-level boundary of an interactive tool: report and fall through
        # to the final prompt so the console window stays open.
        print(f"\n✗ 处理失败:{e}")
    input("\n按回车键退出...")
# Run the interactive tool only when executed as a script.
if __name__ == '__main__':
    main()
请问大佬你的代码如何使用呢 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Description of the bug | 错误描述
现在用的:
magic-pdf, version 0.9.2
例图如下,用官方默认设置,输出的markdown不含注释部分内容
但是之前用magic-pdf, version 0.7.0b1,都能解析出注释部分并识别
不知道代码中动了什么
![ClipBoard](https://private-user-images.githubusercontent.com/53851058/386076647-80da3a82-5124-43be-b7b0-bb7c4e6cb9b0.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MzkyNDE4MDksIm5iZiI6MTczOTI0MTUwOSwicGF0aCI6Ii81Mzg1MTA1OC8zODYwNzY2NDctODBkYTNhODItNTEyNC00M2JlLWI3YjAtYmI3YzRlNmNiOWIwLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNTAyMTElMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjUwMjExVDAyMzgyOVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTQ2MjI4MThjOTFhZjBiYjlkNjdiNmUyMjc2ODRhYjQ4OTM4YzQ3YjZmMGE5NDQ1NjAyNTIzMjVjMTZiMTFiOWYmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0In0.9tbUvgGxIqjG6F4DhfJOzTM7J2ROQJ8SnhLQnp4S5Ew)
或者有参数可以调节是否忽略注释部分内容吗?
How to reproduce the bug | 如何复现
magic-pdf -p . -o .
Operating system | 操作系统
Linux
Python version | Python 版本
3.10
Software version | 软件版本 (magic-pdf --version)
0.9.x
Device mode | 设备模式
cuda
The text was updated successfully, but these errors were encountered: