-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathtest_api.py
111 lines (87 loc) · 3.54 KB
/
test_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import clamd
from io import BytesIO
from contextlib import contextmanager
import tempfile
import shutil
import os
import stat
import sys
try:
import _multiprocessing
have_multiprocessing_sendfd = hasattr(_multiprocessing, 'sendfd') and callable(_multiprocessing.sendfd)
except ImportError:
have_multiprocessing_sendfd = False
import pytest
mine = (stat.S_IREAD | stat.S_IWRITE)
other = stat.S_IROTH
execute = (stat.S_IEXEC | stat.S_IXOTH)
@contextmanager
def mkdtemp(*args, **kwargs):
temp_dir = tempfile.mkdtemp(*args, **kwargs)
try:
yield temp_dir
finally:
shutil.rmtree(temp_dir)
class TestUnixSocket(object):
kwargs = {}
def setup(self):
self.cd = clamd.ClamdUnixSocket(**self.kwargs)
def test_ping(self):
assert self.cd.ping()
def test_version(self):
assert self.cd.version().startswith("ClamAV")
def test_reload(self):
assert self.cd.reload() == 'RELOADING'
def test_scan(self):
with tempfile.NamedTemporaryFile('wb', prefix="python-clamd") as f:
f.write(clamd.EICAR)
f.flush()
os.fchmod(f.fileno(), (mine | other))
expected = {f.name: ('FOUND', 'Eicar-Test-Signature')}
assert self.cd.scan(f.name) == expected
def test_unicode_scan(self):
with tempfile.NamedTemporaryFile('wb', prefix=u"python-clamdλ") as f:
f.write(clamd.EICAR)
f.flush()
os.fchmod(f.fileno(), (mine | other))
expected = {f.name: ('FOUND', 'Eicar-Test-Signature')}
assert self.cd.scan(f.name) == expected
def test_multiscan(self):
expected = {}
with mkdtemp(prefix="python-clamd") as d:
for i in range(10):
with open(os.path.join(d, "file" + str(i)), 'wb') as f:
f.write(clamd.EICAR)
os.fchmod(f.fileno(), (mine | other))
expected[f.name] = ('FOUND', 'Eicar-Test-Signature')
os.chmod(d, (mine | other | execute))
assert self.cd.multiscan(d) == expected
def test_instream(self):
expected = {'stream': ('FOUND', 'Eicar-Test-Signature')}
assert self.cd.instream(BytesIO(clamd.EICAR)) == expected
def test_insteam_success(self):
assert self.cd.instream(BytesIO(b"foo")) == {'stream': ('OK', None)}
@pytest.mark.skipif(sys.version_info[0] < 3 and not have_multiprocessing_sendfd,
reason="For Python 2.x, _multiprocessing.sendfd() is required for this test")
def test_fdscan(self):
with tempfile.NamedTemporaryFile('wb', prefix="python-clamd") as f:
f.write(clamd.EICAR)
f.flush()
expected = {f.name: ('FOUND', 'Eicar-Test-Signature')}
assert self.cd.fdscan(f.name, f.fileno()) == expected
@pytest.mark.skipif(sys.version_info[0] < 3 and not have_multiprocessing_sendfd,
reason="For Python 2.x, _multiprocessing.sendfd() is required for this test")
def test_fdscan_success(self):
with tempfile.NamedTemporaryFile('wb', prefix="python-clamd") as f:
f.write(b"foo")
f.flush()
expected = {f.name: ('OK', None)}
assert self.cd.fdscan(f.name, f.fileno()) == expected
class TestUnixSocketTimeout(TestUnixSocket):
kwargs = {"timeout": 20}
def test_cannot_connect():
with pytest.raises(clamd.ConnectionError):
clamd.ClamdUnixSocket(path="/tmp/404").ping()