File tree 1 file changed +24
-0
lines changed
1 file changed +24
-0
lines changed Original file line number Diff line number Diff line change 8
8
import shutil
9
9
import os
10
10
import stat
11
+ import sys
12
+ try :
13
+ import _multiprocessing
14
+ have_multiprocessing_sendfd = hasattr (_multiprocessing , 'sendfd' ) and callable (_multiprocessing .sendfd )
15
+ except ImportError :
16
+ have_multiprocessing_sendfd = False
11
17
12
18
import pytest
13
19
@@ -77,6 +83,24 @@ def test_instream(self):
77
83
def test_insteam_success (self ):
78
84
assert self .cd .instream (BytesIO (b"foo" )) == {'stream' : ('OK' , None )}
79
85
86
+ @pytest .mark .skipif (sys .version_info [0 ] < 3 and not have_multiprocessing_sendfd ,
87
+ reason = "For Python 2.x, _multiprocessing.sendfd() is required for this test" )
88
+ def test_fdscan (self ):
89
+ with tempfile .NamedTemporaryFile ('wb' , prefix = "python-clamd" ) as f :
90
+ f .write (clamd .EICAR )
91
+ f .flush ()
92
+ expected = {f .name : ('FOUND' , 'Eicar-Test-Signature' )}
93
+ assert self .cd .fdscan (f .name , f .fileno ()) == expected
94
+
95
+ @pytest .mark .skipif (sys .version_info [0 ] < 3 and not have_multiprocessing_sendfd ,
96
+ reason = "For Python 2.x, _multiprocessing.sendfd() is required for this test" )
97
+ def test_fdscan_success (self ):
98
+ with tempfile .NamedTemporaryFile ('wb' , prefix = "python-clamd" ) as f :
99
+ f .write (b"foo" )
100
+ f .flush ()
101
+ expected = {f .name : ('OK' , None )}
102
+ assert self .cd .fdscan (f .name , f .fileno ()) == expected
103
+
80
104
81
105
class TestUnixSocketTimeout (TestUnixSocket ):
82
106
kwargs = {"timeout" : 20 }
You can’t perform that action at this time.
0 commit comments