refactor(archive/ar): extract with pure-Python arpy instead of unar
Although arpy looks unmaintained, it is a readable implementation.
It will be easier to fix problems there (or in a fork) than in unar/7z.
e3krisztian committed Feb 21, 2025
1 parent ce1d748 commit ed9e21f
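The switch boils down to a handful of arpy calls, the same ones the new extractor in the diff below relies on: open the archive, parse all member headers, then read each member's bytes. A minimal standalone sketch of that flow (the archive path is a placeholder, not part of this commit):

from pathlib import Path

import arpy

archive_path = Path("example.ar")  # placeholder path, any ar archive will do

with arpy.Archive(archive_path.as_posix()) as archive:
    # Parse every member header so archive.archived_files gets populated.
    archive.read_all_headers()

    for name in sorted(archive.archived_files):  # member names are bytes
        member = archive.archived_files[name]    # arpy.ArchiveFileData
        data = member.read(member.header.size)
        print(name, member.header.size, len(data))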
60 changes: 56 additions & 4 deletions python/unblob/handlers/archive/ar.py
@@ -1,12 +1,14 @@
import io
import os
from pathlib import Path
from typing import Optional

import arpy
from structlog import get_logger

from ...extractors import Command
from ...file_utils import OffsetFile
from ...models import File, Handler, HexString, ValidChunk
from ...file_utils import FileSystem, OffsetFile, iterate_file
from ...models import Extractor, ExtractResult, File, Handler, HexString, ValidChunk
from ...report import ExtractionProblem

logger = get_logger()

@@ -15,6 +17,56 @@
SIGNATURE_LENGTH = 0x8


class RandomReader:
    """Adapter for file_utils.RandomReader.
    Changes the parameter names, as they are different for arpy and unblob.File.
    """

    def __init__(self, arpy_file: arpy.ArchiveFileData):
        self._arpy_file = arpy_file

    def read(self, n: Optional[int] = None) -> bytes:
        return self._arpy_file.read(n)

    def seek(self, pos: int, whence: int = io.SEEK_SET) -> int:
        return self._arpy_file.seek(pos, whence)


class ArExtractor(Extractor):
    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        fs = FileSystem(outdir)

        with arpy.Archive(inpath.as_posix()) as archive:
            archive.read_all_headers()

            for name in sorted(archive.archived_files):
                archived_file = archive.archived_files[name]

                try:
                    path = Path(name.decode())
                except UnicodeDecodeError:
                    path = Path(name.decode(errors="replace"))
                    fs.record_problem(
                        ExtractionProblem(
                            path=repr(name),
                            problem="Path is not a valid UTF-8 string",
                            resolution=f"Converted to {path}",
                        )
                    )

                fs.write_chunks(
                    path,
                    chunks=iterate_file(
                        RandomReader(archived_file),
                        0,
                        archived_file.header.size,
                    ),
                )

        return ExtractResult(reports=fs.problems)


class ARHandler(Handler):
    NAME = "ar"

@@ -27,7 +79,7 @@ class ARHandler(Handler):
        )
    ]

    EXTRACTOR = Command("unar", "-no-directory", "-o", "{outdir}", "{inpath}")
    EXTRACTOR = ArExtractor()

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        offset_file = OffsetFile(file, start_offset)
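For reference, a sketch of how the new extractor could be driven on its own, outside unblob's chunk pipeline. The paths are placeholders, and creating the output directory up front is an assumption, not something the diff requires:

from pathlib import Path

from unblob.handlers.archive.ar import ArExtractor

inpath = Path("sample.ar")  # placeholder input archive
outdir = Path("extracted")  # placeholder output directory
outdir.mkdir(parents=True, exist_ok=True)  # assumed: FileSystem expects an existing directory

result = ArExtractor().extract(inpath, outdir)
if result is not None:
    # ExtractResult carries the ExtractionProblem reports collected by FileSystem
    for problem in result.reports:
        print(problem)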
