diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index e8bc25a..d07651a 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -27,7 +27,9 @@ The devices are: * gpio (used to test the GPIO API and the Quick2Wire breakout board via the Pi's SoC GPIO) * mcp23017 (used to test the MCP23017 expander board) - * pcf8591 (used to test the PCF8591 AD/DA board) + * pcf8591 (...the PCF8591 AD/DA board) + * mcp3221 (...the MCP3221 A/D board) + * ltc2631 (...the LTC2631 D/A board) diff --git a/examples/ltc2631write b/examples/ltc2631write new file mode 100755 index 0000000..41223e8 --- /dev/null +++ b/examples/ltc2631write @@ -0,0 +1,21 @@ +#! /usr/bin/env python3 + +from time import sleep +from quick2wire.i2c import I2CMaster +from quick2wire.parts.ltc2631 import LTC2631 + +# Configure DAC address and variant +DAC_ADDRESS = 0x10 # LTC2631 +DAC_VARIANT = 'HZ12' + +# Send voltages 0, 0.3, 0.6, ... to the DAC, +# sleeping for a second between each. +with I2CMaster() as i2c: + dac = LTC2631(i2c, DAC_VARIANT, address=DAC_ADDRESS) + + with dac.output as out: + nvals=10 + for v in (i*3.0/nvals for i in range(nvals)): + print('v={}'.format(v)) + out.value = v + sleep(1) diff --git a/quick2wire/parts/ltc2309.py b/quick2wire/parts/ltc2309.py new file mode 100644 index 0000000..23667c8 --- /dev/null +++ b/quick2wire/parts/ltc2309.py @@ -0,0 +1,206 @@ +""" +API for the LTC2309 A/D converter + +The LTC2309 chip has eight input pins, named CH0 to CH7, at which it +measures voltage. These can be configured as eight single-ended +channels, or four differential channels (CH0/CH1, CH2/CH3, CH4/CH5 or +CH6/CH7). + +Applications control the chip through an object of the LTC2309 clas. +This is created with an I2CMaster, through which it communicates with +the chip. + +Applications may obtain one of the single-ended channels with the +method `single_ended_input`, and one of the differential channels with +the method `differential_input`. + +There are nine possible addresses which the chip can have, plus one +global address which is used to instruct a suite of LTC2309 chips to +make a conversion at the same time. This function is available +through the method `global_sync`. + +See the data sheet at + +Note that the input voltage is converted only _after_ any read or +write message to the chip (after an I2C STOP command, in fact). +Therefore if a channel is read repeatedly, the value returned is +always the value obtained after the previous read. If two channels +are read alternately, however, then the reads are up to date (since +such an operation involves a command write, which triggers a +conversion). + +For example: + + with I2CMaster() as i2c: + adc = LTC2309(i2c, address=???) + with adc.single_ended_input(0) as in0, adc.differential_input(2) as in2: + print('single={}, differential2={}'.format(in0.value, in2.value) + +The value of a channel is obtained by querying its `value` property. +For single-ended channels the value varies between 0 and 4.096 for +'unipolar' channels, and -2.048 and +2.048 for 'bipolar' channels, +created with single_ended_input(n, bipolar=True). Differential +channels vary between -2.048 and +2.048. + +If the channel's `sleep_after_conversion` method is called, the chip +is put into low-power sleep between conversions (as opposed to a +lowish-power 'napping' mode). + +[This module is copyright 2013, 2014 Octameter Computing (8ameter.com), +and is distributed under the same terms as the rest of the quick2wire +distribution.] +""" + +from quick2wire.i2c import reading, writing_bytes +from quick2wire.gpio import In + +GLOBAL_ADDRESS = 0x6b + +ALLOWED_ADDRESSES = (0x08, 0x09, 0x0a, 0x0b, + 0x18, 0x19, 0x1a, 0x1b, + 0x28) + +# Full scale, unipolar; bipolar is [-FS/2..FS/2] +FULL_RANGE = 4.096 + +class LTC2309(object): + """API to control an LTC2309 A/D converter via I2C.""" + + def __init__(self, master, address=0x08): + """Initialises an LTC2309. + + Parameters: + master -- the I2CMaster with which to commmunicate with the + LTC2309 chip. + address -- the I2C address of the LTC2309 chip, which can be in + (0x08, 0x09, 0x0a, 0x0b, 0x18, 0x19, 0x1a, 0x1b, 0x28), + or the global address 0x6b + [optional, default=0x08] + """ + self.master = master + + if address not in ALLOWED_ADDRESSES: + raise ValueError("Invalid address {}".format(address)) + + self.address = address + self._last_din_read = 0x00 # reading differential channel 0 + + @property + def direction(self): + return In + + def single_ended_input(self, n, bipolar=False): + """Returns the nth single-ended analogue input channel. + If bipolar=True (default False) the channel is signed. """ + if n not in range(8): + raise ValueError("Single-ended input channels must be in range 0..7, not {}".format(n)) + dins = [ 0x80, 0xc0, 0x90, 0xd0, 0xa0, 0xe0, 0xb0, 0xf0 ] + if bipolar: + return _InputChannel(dins[n], + self.read_bipolar, + FULL_RANGE) + else: + return _InputChannel(dins[n] | 0x8, + self.read_unipolar, + FULL_RANGE) + + def differential_input(self, n, negate=False): + """Returns the nth differential analogue input channel from + the set (CH0-CH1, CH2-CH3, CH4-CH5 or CH6-CH7). + If negate=True (default False), the value is inverted, so that + the channel CH0-CH1 becomes instead CH1-CH0.""" + if n not in range(4): + raise ValueError("Differential input channels must be in range 0..3, not {}".format(n)) + return _InputChannel((n | (4 if negate else 0)) << 4, + self.read_bipolar, # ??? + FULL_RANGE) + + def read_unipolar(self, din): + """Return the 12-bit value of a single-ended input channel""" + return self.read_raw(din) + + def read_bipolar(self, din): + """Return the 12-bit value of a differential input channel""" + v = self.read_raw(din) + if v & 0x800: + v -= 0x1000 + return v + + def read_raw_repeated_start(self, din): + """Read a value using the 'repeated start' pattern, which + means that the value returned is the value from the _previous_ + conversion, which might be unexpected, if the Din provided is + different from the previous one. This method is currently unused.""" + if din != self._last_din_read: + l = (writing_bytes(self.address, din),) + self._last_din_read = din + else: + l = () + l = l + (reading(self.address, 2),) + + results = self.master.transaction(*l) + res = (results[0][0] << 4) + (results[0][1] >> 4) + return res + + def read_raw(self, din): + """Read a byte. If the provided Din is different from the + value which applied to the previous read, then update the Din + value first. Note that we do this in two transactions, so + that the STOP at the end of the first will initiate a + conversion, and therefore the value subsequently read will be + done with the modified Din/channel.""" + if din != self._last_din_read: + self.master.transaction(writing_bytes(self.address, din)) + self._last_din_read = din + + results = self.master.transaction(reading(self.address, 2)) + res = (results[0][0] << 4) + (results[0][1] >> 4) + #print('{:x},{:x} -> {:x} = {}'.format(results[0][0], results[0][1], res, res)) + return res + + def global_sync(self): + """Send a command to synchronise all LTC2309s on the bus, + without changing any channel. Although the LTC2309 supports + including a channel-selection byte, this is not currently supported + in this API.""" + self.master.transaction(writing_bytes(GLOBAL_ADDRESS)) + +class _InputChannel(object): + def __init__(self, din, read_fn, full_range): + self._din = din + self._read = read_fn + self._range = full_range / 4096 + + @property + def direction(self): + return In + + def get(self): + return self.get_raw() * self._range + value = property(get) + + def get_raw(self): + return self._read(self._din) + raw_value = property(get_raw) + + def sleep_after_conversion(self, sleep_p): + """If the argument is True, then the LTC2309 will be instructed + to go into sleep mode after each conversion""" + if sleep_p: + self._din = self._din | 4 + else: + self._din = self._din & ~4 + + # No-op implementations of Pin resource management API + + def open(self): + pass + + def close(self): + pass + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False diff --git a/quick2wire/parts/ltc2631.py b/quick2wire/parts/ltc2631.py new file mode 100644 index 0000000..fa234fc --- /dev/null +++ b/quick2wire/parts/ltc2631.py @@ -0,0 +1,238 @@ +""" +API for the LTC2631 D/A converter. + +The LTC2631 is a family of 12-, 10-, and 8-bit voltage-output DACs +with an integrated, high accuracy, low-drift reference. The chip +comes in a number of variants. + +The LTC2631-L has a full-scale output of 2.5V, and operates from a +single 2.7V to 5.5V supply. The LTC2631-H has a full-scale output of +4.096V, and operates from a 4.5V to 5.5V supply. The -M and -Z variants +reset to mid- and zero-scale respectively. + +Each DAC can also operate in External Reference mode, in which a +voltage supplied to the REF pin sets the full-scale output +(this mode is not currently supported in this API). + +See the data sheet at , +which discusses the assorted variants in more detail. + +Applications talk to the chip via objects of the LTC2631 class. +When an LTC2631 object is created, it is passed a single I2CMaster, +through which it communicates. + +For example: + + with I2CMaster() as i2c: + adc = LTC2631(i2c, 'LZ12', address=0x10) + with adc.output as output: + # assert 2V as the output voltage + output.set(2.0) + ....more + +The second argument to the constructor is the chip variant, which +matches the regular expression [HL][MZ](8|10|12) (for example 'HZ10' +or 'LM8'). + +The chip is returned to its power-off mode when the program exits the +'output' block (so the above program isn't useful as it stands). + +[This module is copyright 2013, 2014 Octameter Computing (8ameter.com), +and is distributed under the same terms as the rest of the quick2wire +distribution.] +""" + +from quick2wire.i2c import writing_bytes +from quick2wire.gpio import Out + +GLOBAL_ADDRESS = 0x73 + +# For variant codes, see data sheet p3 +Z_ADDRESSES = (0x10, 0x11, 0x12, 0x13, + 0x20, 0x21, 0x22, 0x23, + 0x30, + GLOBAL_ADDRESS) +M_ADDRESSES = (0x10, 0x11, 0x12, + GLOBAL_ADDRESS) + +class Variant(object): + def __init__(self, is_4v, is_zero_reset, nbits): + self._is_zero_reset = is_zero_reset + self._is_4v = is_4v + self._nbits = nbits + + def nbits(self): + return self._nbits + + def addresses(self): + if self._is_zero_reset: + return Z_ADDRESSES + else: + return M_ADDRESSES + + def full_scale(self): + if self._is_4v: + return 4.096 + else: + return 2.5 + + def reset_to_zero_p(self): + return self._is_zero_reset + + def vout(self, voltage): + """Given a desired voltage, returns the DAC input, k, such that + V_out = (k/2^N) Vref. + The result is an int, rounded. + Guaranteed to be in range [0,2^full_bits()]. + """ + if voltage < 0: + return 0 + else: + v = min(voltage/self.full_scale(), 1) # v in [0,1] + return round(v * (1 << self._nbits)) + + def kword(self, voltage): + """Given a desired voltage, returns the 16-bit word which will + be sent to the DAC. + """ + return self.vout(voltage) << (16 - self._nbits) + +# List the bit-widths of the recognised variants. +# If a variant isn't in this list, it's not a known variant +# (ie, this dictionary is complete) +lookup_bits = { 'LM12': 12, + 'LM10': 10, + 'LM8': 8, + 'LZ12': 12, + 'LZ10': 10, + 'LZ8': 8, + 'HM12': 12, + 'HM10': 10, + 'HM8': 8, + 'HZ12': 12, + 'HZ10': 10, + 'HZ8': 8, } + +# List all of the variants which have 4.096V full-scale +# (ie, all those with an 'H' in their name) +lookup_all_4v = ('HM12', 'HM10', 'HM8', 'HZ12', 'HZ10', 'HZ8' ) + +# List all of the variants which reset to zero-scale on power-on +# (ie, all those with a Z in their name +lookup_all_zero_reset = ('LZ12', 'LZ10', 'LZ8', 'HZ12', 'HZ10', 'HZ8') + + +class LTC2631(object): + """API to query and control an LTC2631 D/A converter via I2C. + """ + + def __init__(self, master, variant, address=0x10): + """Initialises an LTC2631. + + Parameters: + master -- the I2CMaster with which to commmunicate with the + LTC2631 chip. + variant -- the LTC2631 variant, such as 'LM12' or HZ8' + address -- the I2C address of the LTC2631 chip, which can be + in (0x10, 0x11, 0x12) for 'M' variants, or + in (0x10, 0x11, 0x12, 0x13, 0x20, 0x21, 0x22, 0x23, 0x30) + for 'Z' variants, + or the global address, 0x73 + (optional, default = 0x10) + """ + self.master = master + + if variant not in lookup_bits: + raise ValueError("Invalid variant {}".format(variant)) + + self._variant = Variant(variant in lookup_all_4v, + variant in lookup_all_zero_reset, + lookup_bits[variant]) + + if address not in self._variant.addresses(): + raise ValueError("Invalid address {} for variant {}".format(address, variant)) + + self.address = address + + self._output = _OutputChannel(self) + + @property + def output(self): + return self._output + + def write(self, value): + # Command codes: + # 0000, Write to input register + # 0001, Update (Power Up) DAC Register + # 0011, Write to and Update (Power Up) DAC Register + word = self._variant.kword(value) + self.master.transaction( + writing_bytes(self.address, + 0x3 << 4, # write and update DAC + (word & 0xff00)>>8, # high byte + (word & 0xff))) # low byte + + + @property + def direction(self): + return Out + + def full_scale(self): + return self._variant.full_scale() + + def reset_to_zero_p(self): + return self._variant.reset_to_zero_p() + + def powerdown(self): + # Command code: 0100, Power Down + self.master.transaction( + writing_bytes(self.address, 0x40, 0, 0)) + + # @property + # def single_ended_output(self): + # return self._single_ended_output + + # def write_single_ended(self): + # """Write the XXX-bit value of a single-ended output channel.""" + # return self.write_raw() + + # def write_raw(self): + # results = self.master.transaction(writing(self.address, 2)) + # XXX + # # results is a (single-element) list of reads; + # # each read is a two-byte array + # r = results[0] + # return r[0]*0x100 + r[1] + + +class _OutputChannel(object): + def __init__(self, bank): + self.bank = bank + + @property + def direction(self): + return Out + + def get(self): + return self._value + + def set(self, value): + self._value = value + self.bank.write(self._value) + + value = property(get, set) + + def open(self): + pass + + def close(self): + # Send the power-down command + self.bank.powerdown() + + def __enter__(self): + self.open() + return self + + def __exit__(self, *exc): + self.close() + return False diff --git a/quick2wire/parts/mcp3221.py b/quick2wire/parts/mcp3221.py new file mode 100644 index 0000000..65e65c1 --- /dev/null +++ b/quick2wire/parts/mcp3221.py @@ -0,0 +1,113 @@ +""" +API for the MCP3221 A/D converter. + +The MCP3221 chip provides a single 12-bit measurement of an input +analogue value, available through one single-ended channel. + +Applications talk to the chip via objects of the MCP3221 class. +When an MCP3221 object is created, it is passed a single I2CMaster, +through which it communicates. + +For example: + + with I2CMaster() as i2c: + adc = MCP3221(i2c) + input = adc.single_ended_input + print("{}".format(input.value)) + +The A/D signal is obtained by querying the channel's 'value' property, +which varies in the range 0.0 <= value < 1.0. + +[This module originally by Octameter Computing (8ameter.com), November 2013.] +""" + +from quick2wire.i2c import reading +from quick2wire.gpio import In + +# According to the MCP3221 documentation, the base address is 0x48 +# (in the sense that the device's 'device code' is 0x48), with an +# 'address' comprised of the following three bits, which default to +# 101. Therefore the base address is 0x48, and the default address is 0x4d. +BASE_ADDRESS = 0x48 + +class MCP3221(object): + """PI to query and control an MCP3221 A/D converter via I2C. + + For the MCP3221, "If [...] the voltage level of AIN is equal to or + less than VSS + 1/2 LSB, the resultant code will be + 000h. Additionally, if the voltage at AIN is equal to or greater + than VDD - 1.5 LSB, the output code will be FFFh." Therefore, + the full scale, corresponding to VSS+1.0*(VDD-VSS), is 1000h, + but the maximum floating-point value that can be returned is FFFh/1000h. + """ + + def __init__(self, master, address=5): + """Initialises an MCP3221. + + Parameters: + master -- the I2CMaster with which to commmunicate with the + MCP3221 chip. + address -- the I2C address of the MCP3221 chip, as a number in [0..7] + (optional, default = 5) + """ + self.master = master + self.address = BASE_ADDRESS + address + + if address < 0 or address >= 8: + raise ValueError("Invalid address {} (should be in [0..7]".format(address)) + else: + self._single_ended_input = _InputChannel(self.read_single_ended, 0x1000*1.0) + + @property + def direction(self): + return In + + @property + def single_ended_input(self): + return self._single_ended_input + + def read_single_ended(self): + """Read the 8-bit value of a single-ended input channel.""" + return self.read_raw() + + def read_raw(self): + results = self.master.transaction(reading(self.address, 2)) + # results is a (single-element) list of reads; + # each read is a two-byte array + r = results[0] + return r[0]*0x100 + r[1] + + +class _InputChannel(object): + def __init__(self, read_fn, scale): + self._read = read_fn + self._scale = scale + + @property + def direction(self): + return In + + @property + def value(self): + return self.get_raw() / self._scale + + def get_raw(self): + return self._read() + + # Expose the value as a property. The property and the underlying + # function must be distinct, since value(self) calls get_raw(). + raw_value = property(get_raw) + + # No-op implementations of Pin resource management API + + def open(self): + pass + + def close(self): + pass + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False diff --git a/quick2wire/parts/test_ltc2309.py b/quick2wire/parts/test_ltc2309.py new file mode 100644 index 0000000..314e59b --- /dev/null +++ b/quick2wire/parts/test_ltc2309.py @@ -0,0 +1,271 @@ +from quick2wire.i2c_ctypes import I2C_M_RD +from quick2wire.gpio import In +from quick2wire.parts.ltc2309 import LTC2309 +import pytest + + +# This fake I2C master class is now cut-and-pasted in four places. +# Separating this into a separate module would be desirable, but not trivial +# (because the pytest sys.path includes only the directory where +# pytest was run). That's for a later refactoring. +class FakeI2CMaster: + def __init__(self): + self._requests = [] + self._responses = [] + self._next_response = 0 + self.message_precondition = lambda m: True + + def all_messages_must(self, p): + self.message_precondition + + def clear(self): + self.__init__() + + def transaction(self, *messages): + for m in messages: + self.message_precondition(m) + + self._requests.append(messages) + + read_count = sum(bool(m.flags & I2C_M_RD) for m in messages) + if read_count == 0: + return [] + elif self._next_response < len(self._responses): + response = self._responses[self._next_response] + self._next_response += 1 + return response + else: + return [(0x00,)]*read_count + + def add_response(self, *messages): + self._responses.append(messages) + + @property + def request_count(self): + return len(self._requests) + + def request(self, n): + return self._requests[n] + + +def assert_is_approx(expected, value, delta=0.005): + assert abs(value - expected) <= delta + +def correct_message_for(adc): + def check(m): + assert m.addr == adc.address or m.addr == 0x6b + assert m.flags in (0, I2C_M_RD) + assert m.len <= 2 + + return check + + + +i2c = FakeI2CMaster() + +def is_read(m): + return bool(m.flags & I2C_M_RD) + +def is_write(m): + return not is_read(m) + +def setup_function(f): + i2c.clear() + +def create_ltc2309(*args, **kwargs): + adc = LTC2309(*args, **kwargs) + i2c.message_precondition = correct_message_for(adc) + return adc + +#### + +def test_cannot_create_with_bad_address(): + with pytest.raises(ValueError): + LTC2309(i2c, address=0) + +def test_cannot_create_with_global_address(): + with pytest.raises(ValueError): + LTC2309(i2c, address=0x6b) + +def test_can_create(): + adc = create_ltc2309(i2c) + assert adc.direction == In + +def test_can_create_different_address(): + adc = create_ltc2309(i2c, address=0x18) + assert adc.direction == In + +def test_read_single_unipolar_channel_0(): + adc = create_ltc2309(i2c) + with adc.single_ended_input(0) as input: + assert input.direction == In + + i2c.add_response(bytes([0x12, 0x30])) + + sample = input.value + + # there are two requests, the write which changes the Din, and the read + assert i2c.request_count == 2 + m1, = i2c.request(0) + assert is_write(m1) + assert m1.len == 1 + assert m1.buf[0][0] == 0x88 # 0b1000 10xx (cf data sheet Table 1) + + m2, = i2c.request(1) + assert is_read(m2) + assert m2.len == 2 # 2-byte response + + assert_is_approx(0.291, sample) # 0x123 * 1mV + +def test_read_single_channel_with_sleep(): + # as above, but do two reads, with the second one setting the sleep bit + adc = create_ltc2309(i2c) + with adc.single_ended_input(0) as input: + assert input.direction == In + + i2c.add_response(bytes([0x12, 0x30])) + i2c.add_response(bytes([0x45, 0x60])) + + sample = input.value + assert_is_approx(0.291, sample) + + input.sleep_after_conversion(True) + + sample = input.value + assert_is_approx(1.110, sample) + + # there are four requests: + # 1. the write which changes the Din for the first time + # 2. the first read request + # 3. the write which updates the Din + # 4. the second read request + assert i2c.request_count == 4 + + m, = i2c.request(0) + assert is_write(m) + assert m.len == 1 + assert m.buf[0][0] == 0x88 + + m, = i2c.request(1) + assert is_read(m) + assert m.len == 2 + + m, = i2c.request(2) + assert is_write(m) + assert m.len == 1 + assert m.buf[0][0] == 0x8c # 0x1000 1100, including set SLP bit + + m, = i2c.request(3) + assert is_read(m) + assert m.len == 2 + +def test_read_single_bipolar_channel_0(): + adc = create_ltc2309(i2c) + with adc.single_ended_input(0, bipolar=True) as input: + assert input.direction == In + + i2c.add_response(bytes([0xa0, 0x10])) + + sample = input.value + + assert i2c.request_count == 2 + m1, = i2c.request(0) + assert is_write(m1) + assert m1.len == 1 + assert m1.buf[0][0] == 0x80 # 0x1000 00xx (cf data sheet Table 1) + + m2, = i2c.request(1) + assert is_read(m2) + assert m2.len == 2 + + assert_is_approx(-1.535, sample) # (0xa01 - 0x1000) * 1mV + +def test_multiread_single_unipolar_channels(): + adc = create_ltc2309(i2c) + with adc.single_ended_input(0) as i0, adc.single_ended_input(1) as i1: + assert i0.direction == In + assert i1.direction == In + + i2c.add_response(bytes([0x12, 0x30])) + i2c.add_response(bytes([0x45, 0x60])) + i2c.add_response(bytes([0x78, 0x90])) + + sample = i0.value + # there are two requests, the write which changes the Din, and the read + assert i2c.request_count == 2 + m1, = i2c.request(0) + assert is_write(m1) + assert m1.len == 1 + assert m1.buf[0][0] == 0x88 # 0b1000 10xx (cf data sheet Table 1) + m2, = i2c.request(1) + assert is_read(m2) + assert m2.len == 2 + assert_is_approx(0.291, sample) # 0x123 * 1mV + + sample = i0.value + # just one new request this time, because we don't change the Din + assert i2c.request_count == 3 + m1, = i2c.request(2) + assert is_read(m1) + assert m1.len == 2 + assert_is_approx(1.110, sample) # 0x456 * 1mV + + sample = i1.value + # two new requests again + assert i2c.request_count == 5 + m1, = i2c.request(3) + assert is_write(m1) + assert m1.len == 1 + assert m1.buf[0][0] == 0xc8 # 0b1100 10xx (cf data sheet Table 1) + m2, = i2c.request(4) + assert is_read(m2) + assert m2.len == 2 + assert_is_approx(1.929, sample) # 0x789 * 1mV + +def test_read_differential_channel_0(): + adc = create_ltc2309(i2c) + with adc.differential_input(0) as input: + assert input.direction == In + + i2c.add_response(bytes([0x12, 0x30])) + + sample = input.value + # there is only one requests, because this Din is the default + assert i2c.request_count == 1 + m1, = i2c.request(0) + assert is_read(m1) + assert m1.len == 2 # 2-byte response + + assert_is_approx(0.291, sample) # 0x123 * 1mV + +def test_read_differential_channel_0_negated(): + # as above, but with the differential sign swapped + adc = create_ltc2309(i2c) + with adc.differential_input(0, negate=True) as input: + assert input.direction == In + + i2c.add_response(bytes([0x12, 0x30])) + + sample = input.value + # there are two requests, the write which changes the Din, and the read + assert i2c.request_count == 2 + m1, = i2c.request(0) + assert is_write(m1) + assert m1.len == 1 + assert m1.buf[0][0] == 0x40 # 0b0100 00xx (cf data sheet Table 1) + + m2, = i2c.request(1) + assert is_read(m2) + assert m2.len == 2 # 2-byte response + + assert_is_approx(0.291, sample) # 0x123 * 1mV + +def test_global_sync(): + adc = create_ltc2309(i2c) + adc.global_sync() + + assert i2c.request_count == 1 + m1, = i2c.request(0) + assert is_write(m1) + assert m1.addr == 0x6b + assert m1.len == 0 diff --git a/quick2wire/parts/test_ltc2631.py b/quick2wire/parts/test_ltc2631.py new file mode 100644 index 0000000..5d86af5 --- /dev/null +++ b/quick2wire/parts/test_ltc2631.py @@ -0,0 +1,162 @@ +from quick2wire.i2c_ctypes import I2C_M_RD +from quick2wire.gpio import Out +from quick2wire.parts.ltc2631 import LTC2631 +import pytest + + +# This fake I2C master class is now cut-and-pasted in three places. +# Separating this into a separate module would be desirable, but not trivial +# (because the pytest sys.path includes only the directory where +# pytest was run). That's for a later refactoring. +class FakeI2CMaster: + def __init__(self): + self._requests = [] + self._responses = [] + self._next_response = 0 + self.message_precondition = lambda m: True + + def all_messages_must(self, p): + self.message_precondition + + def clear(self): + self.__init__() + + def transaction(self, *messages): + for m in messages: + self.message_precondition(m) + + self._requests.append(messages) + + read_count = sum(bool(m.flags & I2C_M_RD) for m in messages) + if read_count == 0: + return [] + elif self._next_response < len(self._responses): + response = self._responses[self._next_response] + self._next_response += 1 + return response + else: + return [(0x00,)]*read_count + + def add_response(self, *messages): + self._responses.append(messages) + + @property + def request_count(self): + return len(self._requests) + + def request(self, n): + return self._requests[n] + + +def correct_message_for(dac): + def check(m): + assert m.addr == dac.address + assert m.flags not in (0, I2C_M_RD) + assert m.len == 3 + + return check + + + +i2c = FakeI2CMaster() + +def is_read(m): + return bool(m.flags & I2C_M_RD) + +def is_write(m): + return not is_read(m) + +def setup_function(f): + i2c.clear() + +def create_ltc2631(*args, **kwargs): + dac = LTC2631(*args, **kwargs) + i2c.message_precondition = correct_message_for(dac) + return dac + +def test_cannot_create_invalid_variant(): + with pytest.raises(ValueError): + LTC2631(i2c, 'foo') + +def test_cannot_create_with_bad_address(): + with pytest.raises(ValueError): + LTC2631(i2c, 'LM12', address=0x20) # M variants support 0x10, 11, 12 + +def test_can_create(): + dac = LTC2631(i2c, 'LM12') + assert dac.direction == Out + +def test_can_create_different_address(): + dac = LTC2631(i2c, 'LM12', address=0x12) + assert dac.direction == Out + +def test_can_create_global_address(): + dac = LTC2631(i2c, 'LM12', address=0x73) + assert dac.direction == Out + +def test_can_powerdown(): + dac = LTC2631(i2c, 'LM12') + + dac.powerdown() + + assert i2c.request_count == 1 + m1, = i2c.request(0) + assert is_write(m1) + assert m1.len == 3 + assert m1.buf[0][0] == 0x40 # spec requires 0b0100xxxx + assert m1.buf[1][0] == 0 # bytes 2 and 3 are don't-cares + assert m1.buf[2][0] == 0 + +def test_can_write_lm10(): + dac = LTC2631(i2c, 'LM10') + assert not dac.reset_to_zero_p() # This variant resets to mid-range + assert dac.full_scale() == 2.5 + + pin = dac.output + assert pin.direction == Out + + pin.value = 1.25 # -> 1.25/2.5*2^10 = 0x200x + + assert i2c.request_count == 1 + m1, = i2c.request(0) + assert is_write(m1) + assert m1.len == 3 + assert m1.buf[0][0] == 0x30 # 0x3x is Write and Update DAC + assert m1.buf[1][0] == 0x80 # 0x200 << 6 = 0x8000 + assert m1.buf[2][0] == 0x00 + +def test_can_write_lz12(): + dac = LTC2631(i2c, 'LZ12') + assert dac.reset_to_zero_p() # This variant resets to zero-range + assert dac.full_scale() == 2.5 + + pin = dac.output + assert pin.direction == Out + + pin.value = 0.1777 # -> 0.1777/2.5*2^12 = 0x123x + + assert i2c.request_count == 1 + m1, = i2c.request(0) + assert is_write(m1) + assert m1.len == 3 + assert m1.buf[0][0] == 0x30 # 0x3x is Write and Update DAC + assert m1.buf[1][0] == 0x12 # 0x123 << 4 = 0x1230 + assert m1.buf[2][0] == 0x30 + +def test_can_write_hz8(): + dac = LTC2631(i2c, 'HZ8') + assert dac.reset_to_zero_p() # This variant resets to zero-range + assert dac.full_scale() == 4.096 + + pin = dac.output + assert pin.direction == Out + + pin.value = 2.635 # -> 2.635/4.096*2^8 = 164.69, rounded to 0xa5x + + assert i2c.request_count == 1 + m1, = i2c.request(0) + assert is_write(m1) + assert m1.len == 3 + assert m1.buf[0][0] == 0x30 # 0x3x is Write and Update DAC + assert m1.buf[1][0] == 0xa5 # 0xa5 << 8 = 0xa500 + assert m1.buf[2][0] == 0x00 diff --git a/quick2wire/parts/test_ltc2631_2309_loopback.py b/quick2wire/parts/test_ltc2631_2309_loopback.py new file mode 100644 index 0000000..d2bc6db --- /dev/null +++ b/quick2wire/parts/test_ltc2631_2309_loopback.py @@ -0,0 +1,70 @@ +"""Loopback tests for the LTC2631 DAC and LTC2309 ADC + +Requires 1x LTC2309 and 1x LTC2631 + +Topology: + + * connect 2631 GND to 2309 CH3 + * connect 2631 Vout to 2309 CH2 + +You may need to adjust the DAC_ADDRESS, DAC_VARIANT and ADC_ADDRESS +constants below, depending on the chips available. + +""" + +from quick2wire.i2c import I2CMaster +from quick2wire.parts.ltc2631 import LTC2631 +from quick2wire.parts.ltc2309 import LTC2309 +import pytest + +DAC_ADDRESS = 0x10 # LTC2631 +DAC_VARIANT = 'HZ12' +ADC_ADDRESS = 0x08 # LTC2309 + +from time import sleep + +def setup_function(f): + global i2c + i2c = I2CMaster() + +def teardown_function(f): + i2c.close() + +def assert_is_approx(expected, actual, delta=0.02): + assert abs(actual - expected) <= delta + + +#### + +#setup_function(0) + +@pytest.mark.loopback +@pytest.mark.ltc2309 +@pytest.mark.ltc2631 +def test_single_channel(): + adc = LTC2309(i2c, address=ADC_ADDRESS) + dac = LTC2631(i2c, DAC_VARIANT, address=DAC_ADDRESS) + + with dac.output as out0, adc.single_ended_input(2) as in2, adc.single_ended_input(3) as in3: + nvals = 10 + for v in (i*3.0/nvals for i in range(nvals)): + print('v={}'.format(v)) + out0.value = v + assert_is_approx(v, in2.value) + assert_is_approx(0.0, in3.value) + +@pytest.mark.loopback +@pytest.mark.ltc2309 +@pytest.mark.ltc2631 +def test_differential_channel(): + adc = LTC2309(i2c, address=ADC_ADDRESS) + dac = LTC2631(i2c, DAC_VARIANT, address=DAC_ADDRESS) + + with dac.output as out0, adc.differential_input(1) as in1: + nvals = 10 + for v in (i*2.0/nvals for i in range(nvals, 0, -1)): + out0.value = v + # we must do an extra read here, and thus trigger a + # conversion with the new value + in1.get() + assert_is_approx(v, in1.value) diff --git a/quick2wire/parts/test_mcp3221.py b/quick2wire/parts/test_mcp3221.py new file mode 100644 index 0000000..73a25bd --- /dev/null +++ b/quick2wire/parts/test_mcp3221.py @@ -0,0 +1,98 @@ +from quick2wire.i2c_ctypes import I2C_M_RD +from quick2wire.parts.mcp3221 import MCP3221 +from quick2wire.gpio import In +import pytest + +class FakeI2CMaster: + def __init__(self): + self._requests = [] + self._responses = [] + self._next_response = 0 + self.message_precondition = lambda m: True + + def all_messages_must(self, p): + self.message_precondition + + def clear(self): + self.__init__() + + def transaction(self, *messages): + for m in messages: + self.message_precondition(m) + + self._requests.append(messages) + + read_count = sum(bool(m.flags & I2C_M_RD) for m in messages) + if read_count == 0: + return [] + elif self._next_response < len(self._responses): + response = self._responses[self._next_response] + self._next_response += 1 + return response + else: + return [(0x00,)]*read_count + + def add_response(self, *messages): + self._responses.append(messages) + + @property + def request_count(self): + return len(self._requests) + + def request(self, n): + return self._requests[n] + + +i2c = FakeI2CMaster() + +# def is_read(m): +# return bool(m.flags & I2C_M_RD) + +# def is_write(m): +# return not is_read(m) + +def assert_is_approx(expected, value, delta=0.005): + assert abs(value - expected) <= delta + +def correct_message_for(adc): + def check(m): + assert m.addr == adc.address + assert m.flags in (0, I2C_M_RD) + assert m.len == 1 or m.len == 2 + + return check + + +def setup_function(f): + i2c.clear() + +def create_mcp3221(*args, **kwargs): + adc = MCP3221(*args, **kwargs) + i2c.message_precondition = correct_message_for(adc) + return adc + +def test_cannot_be_created_with_invalid_address(): + with pytest.raises(ValueError): + MCP3221(i2c, 8) + +def test_can_read_a_single_ended_pin(): + adc = create_mcp3221(i2c, 0) + pin = adc.single_ended_input + + i2c.add_response(bytes([0x8, 0x00])) + + assert pin.direction == In + + sample = pin.value + + assert_is_approx(0.5, sample) + +def test_can_read_a_single_ended_pin_raw(): + adc = create_mcp3221(i2c, 0) + pin = adc.single_ended_input + + i2c.add_response(bytes([0x8, 0x00])) + + sample = pin.raw_value + + assert sample == 0x800 diff --git a/quick2wire/parts/test_mcp3221_loopback.py b/quick2wire/parts/test_mcp3221_loopback.py new file mode 100644 index 0000000..4151b3d --- /dev/null +++ b/quick2wire/parts/test_mcp3221_loopback.py @@ -0,0 +1,37 @@ +"""Loopback attempts for the MCP3221 + +We do nothing in this test beyond checking that we can connect to the +chip and read a sane value from it. +""" + +from quick2wire.i2c import I2CMaster +from quick2wire.parts.mcp3221 import MCP3221 +import pytest + +from time import sleep + +def setup_function(f): + global i2c + i2c = I2CMaster() + +# Only a very simple test is possible -- do we get a sane value back +# from the chip? +@pytest.mark.loopback +@pytest.mark.mcp3221 +def test_mcp3221_loopback_single_ended(): + adc = MCP3221(i2c) + input = adc.single_ended_input + v = input.value + assert v >= 0.0 + assert v < 1.0 + +def exercise_mcp3221_loopback_single_ended(): + with I2CMaster() as i2c: + adc = MCP3221(i2c) + input = adc.single_ended_input + + for i in range(10): + print("{}: {}".format(i, input.value)) + sleep(1) + +#exercise_mcp3221_loopback_single_ended()