diff --git a/card/ICC.py b/card/ICC.py index 0541995..bc53db6 100644 --- a/card/ICC.py +++ b/card/ICC.py @@ -42,7 +42,8 @@ from smartcard.util import toHexString from card.utils import * - +from .modem.modem_card_request import ModemCardRequest + ########################################################### # ISO7816 class with attributes and methods as defined # by ISO-7816 part 4 standard for smartcard @@ -141,7 +142,7 @@ class ISO7816(object): 0xAB : 'Security Attribute expanded', } - def __init__(self, CLA=0x00, reader=''): + def __init__(self, CLA=0x00, reader='', modem_device_path='', at_client=None): """ connect smartcard and defines class CLA code for communication uses "pyscard" library services @@ -149,11 +150,12 @@ def __init__(self, CLA=0x00, reader=''): creates self.CLA attribute with CLA code and self.coms attribute with associated "apdu_stack" instance """ + cardtype = AnyCardType() - if reader: - cardrequest = CardRequest(timeout=1, cardType=cardtype, readers=[reader]) + if modem_device_path or at_client: + cardrequest = ModemCardRequest(at_client=at_client, modem_device_path=modem_device_path, timeout=1, cardType=cardtype, readers=[reader]) else: - cardrequest = CardRequest(timeout=1, cardType=cardtype) + cardrequest = CardRequest(timeout=1, cardType=cardtype, readers=[reader]) self.cardservice = cardrequest.waitforcard() self.cardservice.connection.connect() self.reader = self.cardservice.connection.getReader() @@ -1784,3 +1786,8 @@ def select_by_aid(self, aid_num=1): if hasattr(self, 'AID') and aid_num <= len(self.AID)+1: return self.select(self.AID[aid_num-1], 'aid') + def dispose(self): + try: + self.cardservice.dispose() + except: + pass diff --git a/card/SIM.py b/card/SIM.py index 1d08b49..b6b9b31 100644 --- a/card/SIM.py +++ b/card/SIM.py @@ -100,12 +100,12 @@ class SIM(ISO7816): use self.dbg = 1 or more to print live debugging information """ - def __init__(self, reader=''): + def __init__(self, reader='', modem_device_path='', at_client=None): """ initialize like an ISO7816-4 card with CLA=0xA0 can also be used for USIM working in SIM mode, """ - ISO7816.__init__(self, CLA=0xA0, reader=reader) + ISO7816.__init__(self, CLA=0xA0, reader=reader, modem_device_path=modem_device_path, at_client=at_client) # if self.dbg >= 2: log(3, '(SIM.__init__) type definition: %s' % type(self)) diff --git a/card/USIM.py b/card/USIM.py index 9f3fe0c..9b39741 100644 --- a/card/USIM.py +++ b/card/USIM.py @@ -177,7 +177,7 @@ class USIM(UICC): use self.dbg = 1 or more to print live debugging information """ - def __init__(self, reader=''): + def __init__(self, reader='', modem_device_path='', at_client=None): """ initializes like an ISO7816-4 card with CLA=0x00 and checks available AID (Application ID) read from EF_DIR @@ -185,7 +185,7 @@ def __init__(self, reader=''): initializes on the MF """ # initialize like a UICC - ISO7816.__init__(self, CLA=0x00, reader=reader) + ISO7816.__init__(self, CLA=0x00, reader=reader, modem_device_path=modem_device_path, at_client=at_client) self.AID = [] self.AID_GP = {} self.AID_USIM = None diff --git a/card/modem/__init__.py b/card/modem/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/card/modem/at_command_client.py b/card/modem/at_command_client.py new file mode 100644 index 0000000..555c733 --- /dev/null +++ b/card/modem/at_command_client.py @@ -0,0 +1,53 @@ +import time +import serial +from typing import Optional, Union, Callable, Any + +class ATCommandClient: + + def __init__(self, device_path: str, timeout: Optional[float] = 1.0) -> None: + if timeout < 0.5: + timeout = 0.5 + + self._device_path = device_path + self._timeout = timeout + self._serial = None + + def connect(self) -> None: + if self._serial: + return + + self._serial = serial.Serial( + self._device_path, + 115200, + timeout=0.001, + ) + + def transmit(self, at_command: Union[str, bytes], transform: Optional[Callable[[str, str], Any]] = lambda x,y: y) -> Union[str, Any]: + if not self._serial: + raise ValueError("Client shall be connected") + + if isinstance(at_command, bytes): + at_command = at_command.decode() + + if at_command[-2::] != "\r\n": + at_command += "\r\n" + + at_command = at_command.encode() + self._serial.write(at_command) + + resp = b'' + read_until = time.time() + self._timeout + while b'OK' not in resp and b'ERROR' not in resp: + resp += self._serial.read(256) + if time.time() > read_until: + break + + return transform(at_command, resp.decode()) + + def dispose(self) -> None: + if not self._serial: + return + + self._serial.close() + self._serial = None + \ No newline at end of file diff --git a/card/modem/modem_card_request.py b/card/modem/modem_card_request.py new file mode 100644 index 0000000..a127dbc --- /dev/null +++ b/card/modem/modem_card_request.py @@ -0,0 +1,102 @@ +import logging +import time +from typing import Any, Iterable, Optional, List, Tuple +from smartcard.CardType import AnyCardType, CardType +from serial import SerialException +from .at_command_client import ATCommandClient + +logger = logging.getLogger("modem") + +class ModemCardRequest: + def __init__(self, at_client: Optional[ATCommandClient] = None, modem_device_path: Optional[str] = None, timeout: int = 1, cardType: CardType = AnyCardType, readers: Optional[Iterable[str]] = None) -> None: + self._readers = readers or [''] + if not at_client and not modem_device_path: + raise ValueError("Either at_client or modem_device_path shall be configured") + + if modem_device_path: + self._client = ATCommandClient(modem_device_path, timeout=float(timeout)) + + if at_client: + self._client = at_client + + @property + def connection(self) -> Any: + return self + + def waitforcard(self) -> None: + self.connect() + return self + + def connect(self) -> None: + self._client.connect() + + def getReader(self) -> Any: + return self._readers + + def getATR(self) -> Any: + return None + + def transmit(self, apdu: List[int]) -> Any: + """ + Transmits SIM APDU to the modem. + """ + + at_command = self._to_csim_command(apdu) + data, sw1, sw2 = [], 0xff, 0xff + + try: + while sw1 == 0xff and sw2 == 0xff: + data, sw1, sw2 = self._client.transmit(at_command, self._at_response_to_card_response) + except SerialException as e: + logger.debug("Serial communication error << {e} ... retrying") # for faulty, unstable cards + + logger.debug(f""" + APDU << {apdu} + AT Command << {at_command} + Ret << data:{data}, sw1:{sw1}, sw2:{sw2} + """) + return (data, sw1, sw2) + + def _to_csim_command(self, apdu: List[int]) -> str: + """ + Transforms a SIM APDU represented as a list of integers (bytes data) + into its corresponding AT+CSIM command format. + """ + + at_command = ("").join(map(lambda x: "%0.2X" % x, apdu)) + at_command = f'AT+CSIM={len(at_command)},"{at_command}"' + return at_command + + def _at_response_to_card_response(self, at_command: str, at_response: str) -> Tuple[List[int], int, int]: + """ + Transforms AT response to the expected CardService format. + """ + + parts = list(filter(lambda x: x != '', at_response.split("\r\n"))) + if len(parts) == 0: + return [], 0xff, 0xff # communication error + + if not parts[-1] or 'ERROR' in parts[-1]: + return [], 0x6f, 0x0 # checking error: no precise diagnosis + + res = parts[0] + res = res[res.find('"')+1:-1:] + + return ( + self._hexstream_to_bytes(res[:-4:]), + int(res[-4:-2:], 16), + int(res[-2::], 16) + ) + + def _hexstream_to_bytes(self, hexstream: str) -> List[int]: + """ + Returns a list of integers representing byte data from a hexadecimal stream. + """ + + return list( + map( + lambda x: int(x, 16), + [hexstream[i:i+2] for i in range(0, len(hexstream), 2)] + ) + ) + diff --git a/setup.py b/setup.py index b35cb1f..c5837f2 100644 --- a/setup.py +++ b/setup.py @@ -6,17 +6,18 @@ packages=[ "card" - ], + ], # mandatory dependency install_requires=[ - 'pyscard' - ], + 'pyscard', + 'pyserial' + ], # optional dependency extras_require={ 'graph': ['pydot', 'graphviz'] - }, + }, author="Benoit Michau", author_email="michau.benoit@gmail.com",