Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fdscan() method to ClamdUnixSocket #19

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Changes
1.0.3 (unreleased)
------------------

- Nothing changed yet.
- Added fdscan() method to ClamdUnixSocket for scanning file descriptors


1.0.2 (2014-08-21)
Expand Down
42 changes: 41 additions & 1 deletion src/clamd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
import contextlib
import re
import base64
# _multiprocessing.sendfd() only exists in python2
if sys.version_info[0] < 3:
import _multiprocessing
else:
import array

scan_response = re.compile(r"^(?P<path>.*): ((?P<virus>.+) )?(?P<status>(FOUND|OK|ERROR))$")
EICAR = base64.b64decode(
Expand Down Expand Up @@ -264,7 +269,7 @@ def _close_socket(self):

def _parse_response(self, msg):
"""
parses responses for SCAN, CONTSCAN, MULTISCAN and STREAM commands.
parses responses for SCAN, CONTSCAN, MULTISCAN, FILDES and STREAM commands.
"""
try:
return scan_response.match(msg).group("path", "virus", "status")
Expand All @@ -287,6 +292,25 @@ class initialisation
self.unix_socket = path
self.timeout = timeout

# only works for python >= 3.3
def _send_fd_via_socket_sendmsg(self, fd):
"""
internal use only
"""
return self.clamd_socket.sendmsg([b'\0'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [fd]))])

# _multiprocessing.sendfd() only exists in python2
def _send_fd_via_mp_sendfd(self, fd):
"""
internal use only
"""
return _multiprocessing.sendfd(self.clamd_socket.fileno(), fd)

# make _send_fd refer to one of the two _send_fd_* methods above. Either
# _send_fd_via_socket_sendmsg or _send_fd_via_mp_sendfd, depending upon the version of python
"""internal use only"""
_send_fd = _send_fd_via_socket_sendmsg if sys.version_info[0] >= 3 else _send_fd_via_mp_sendfd

def _init_socket(self):
"""
internal use only
Expand All @@ -313,3 +337,19 @@ def _error_message(self, exception):
path=self.unix_socket,
msg=exception.args[1]
)

def fdscan(self, path, fd):
"""
Scan a file referenced by a file descriptor.
path (string) : path of file to use for result dictionary (otherwise unused)
fd (int) : file descriptor number (fileno) of file to scan
"""
try:
self._init_socket()
cmd = 'nFILDES\n'.encode('utf-8')
self.clamd_socket.send(cmd)
self._send_fd(fd)
_, reason, status = self._parse_response(self._recv_response_multiline())
return {path: (status, reason)}
finally:
self._close_socket()
24 changes: 24 additions & 0 deletions src/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
import shutil
import os
import stat
import sys
try:
import _multiprocessing
have_multiprocessing_sendfd = hasattr(_multiprocessing, 'sendfd') and callable(_multiprocessing.sendfd)
except ImportError:
have_multiprocessing_sendfd = False

import pytest

Expand Down Expand Up @@ -77,6 +83,24 @@ def test_instream(self):
def test_insteam_success(self):
assert self.cd.instream(BytesIO(b"foo")) == {'stream': ('OK', None)}

@pytest.mark.skipif(sys.version_info[0] < 3 and not have_multiprocessing_sendfd,
reason="For Python 2.x, _multiprocessing.sendfd() is required for this test")
def test_fdscan(self):
with tempfile.NamedTemporaryFile('wb', prefix="python-clamd") as f:
f.write(clamd.EICAR)
f.flush()
expected = {f.name: ('FOUND', 'Eicar-Test-Signature')}
assert self.cd.fdscan(f.name, f.fileno()) == expected

@pytest.mark.skipif(sys.version_info[0] < 3 and not have_multiprocessing_sendfd,
reason="For Python 2.x, _multiprocessing.sendfd() is required for this test")
def test_fdscan_success(self):
with tempfile.NamedTemporaryFile('wb', prefix="python-clamd") as f:
f.write(b"foo")
f.flush()
expected = {f.name: ('OK', None)}
assert self.cd.fdscan(f.name, f.fileno()) == expected


class TestUnixSocketTimeout(TestUnixSocket):
kwargs = {"timeout": 20}
Expand Down