Skip to content

Commit 12743b4

Browse files
committed
Added new annotation labels for bit_duration, high_period, and low_period.
Split 4th bit(white component) into its own annotation with options to display Hex Decimal or Percent. Added option to display first 3 bits(RGB components) in decimal format Moved several repetitive tasks to their own methods. Improved the handling of RESET conditions within the decode method, ensuring accurate annotation and timing.
1 parent 06f09f6 commit 12743b4

File tree

1 file changed

+142
-89
lines changed
  • decoders/rgb_led_clockless

1 file changed

+142
-89
lines changed

decoders/rgb_led_clockless/pd.py

Lines changed: 142 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,12 @@ class SamplerateError(Exception):
5454
class DecoderError(Exception):
5555
pass
5656

57+
# Define annotation constants for easy reference.
5758
(
58-
ANN_BIT, ANN_RESET, ANN_RGB,
59+
ANN_BIT, ANN_RESET, ANN_RGB, ANN_W,
5960
ANN_COMP_R, ANN_COMP_G, ANN_COMP_B, ANN_COMP_W,
60-
) = range(7)
61+
ANN_BIT_DURATION, ANN_HIGH_PERIOD, ANN_LOW_PERIOD
62+
) = range(11)
6163

6264
class Decoder(srd.Decoder):
6365
api_version = 3
@@ -76,15 +78,21 @@ class Decoder(srd.Decoder):
7678
('bit', 'Bit'),
7779
('reset', 'RESET'),
7880
('rgb', 'RGB'),
81+
('w', 'W'),
7982
('r', 'R'),
8083
('g', 'G'),
8184
('b', 'B'),
82-
('w', 'W'),
85+
('w_comp', 'W'),
86+
('bit_duration', 'Bit Duration'),
87+
('high_period', 'High Period'),
88+
('low_period', 'Low Period'),
8389
)
8490
annotation_rows = (
91+
('bit-timing', 'Bit Timing', (ANN_HIGH_PERIOD, ANN_LOW_PERIOD,)),
8592
('bits', 'Bits', (ANN_BIT, ANN_RESET,)),
93+
('bit-duration', 'Bit Duration', (ANN_BIT_DURATION,)),
8694
('rgb-comps', 'RGB components', (ANN_COMP_R, ANN_COMP_G, ANN_COMP_B, ANN_COMP_W,)),
87-
('rgb-vals', 'RGB values', (ANN_RGB,)),
95+
('rgb-vals', 'RGB values', (ANN_RGB, ANN_W,)),
8896
)
8997
options = (
9098
{'id': 'led_type', 'desc': 'LED Type',
@@ -96,63 +104,80 @@ class Decoder(srd.Decoder):
96104
{'id': 'is_rgbw', 'desc': 'Is this RGBW?',
97105
'default': 'False', 'values': ('True', 'False')},
98106
{'id': 'textorder', 'desc': 'Components output order (text)',
99-
'default': 'RGB[W]', 'values': ('wire', 'RGB[W]', 'RGB')},
107+
'default': 'RGB[W]', 'values': ('wire', 'RGB[W]')},
108+
{'id': 'rgb_text_format', 'desc': 'RGB Text Format',
109+
'default': 'hex', 'values': ('hex', 'decimal')},
110+
{'id': 'w_text_format', 'desc': 'W Text Format',
111+
'default': 'hex', 'values': ('hex', 'decimal', 'percentage')},
100112
)
101113

102114
def __init__(self):
103115
self.reset()
104116

105117
def reset(self):
118+
"""Reset internal state for a new decoding session."""
106119
self.samplerate = None
107120
self.bits = []
121+
self.bit_start_sample = None
122+
self.inversion_point_sample = None
123+
self.bit_end_sample = None
124+
self.is_processing_bit = False
125+
self.is_looking_for_reset = False
108126

109127
def preprocess_options(self):
128+
"""Process user options and prepare internal settings based on them."""
110129
if not self.samplerate:
111130
raise SamplerateError('Cannot decode without samplerate.')
112131

113-
# Preprocess wire order
132+
# Get the wireorder, textorder, and determine if RGBW mode is used.
114133
wireorder = self.options['wireorder'].lower()
115-
116-
# Determine if RGBW is selected
117-
is_rgbw = self.options['is_rgbw'].lower() == 'true'
134+
self.is_rgbw = self.options['is_rgbw'].lower() == 'true'
135+
textorder = self.options['textorder'].lower()
118136

119137
# Prepare wire format based on RGBW option
120138
self.wireformat = [c for c in wireorder if c in 'rgb']
121-
if is_rgbw:
139+
if self.is_rgbw:
122140
self.wireformat.append('w')
123141

124142
# Calculate the number of bits needed based on the wire format
125143
self.need_bits = len(self.wireformat) * 8
126144

127145
# Handle the output text format
128-
textorder = self.options['textorder'].lower()
129-
if textorder == 'wire':
130-
self.textformat = 'wire'
131-
elif textorder == 'rgb[w]':
132-
self.textformat = '#{r:02x}{g:02x}{b:02x}{wt:s}'
146+
self.textformat = 'wire' if textorder == 'wire' else '#{r:02x}{g:02x}{b:02x}'
147+
148+
# Determine settings based on the LED type.
149+
led_type = self.options['led_type'].lower()
150+
# Constants for bit timing and reset detection
151+
if led_type == 'ws281x':
152+
self.DUTY_CYCLE_THRESHOLD = 0.5 # 50% threshold for distinguishing bit values
153+
self.RESET_CODE_TIMING = round(self.samplerate * 50e-6)
154+
elif led_type == 'sk6812':
155+
self.DUTY_CYCLE_THRESHOLD = 0.375 # 37.5% threshold for distinguishing bit values
156+
self.RESET_CODE_TIMING = round(self.samplerate * 80e-6)
133157
else:
134-
# Default RGB format string
135-
self.textformat = '#{r:02x}{g:02x}{b:02x}'
158+
raise DecoderError(f'Unsupported LED Type: {led_type}')
159+
self.BIT_PERIOD = 1.25e-6 # 1.25 microseconds for WS281x and SK681
136160

137161
def start(self):
162+
"""Initialize decoder output and prepare options."""
138163
self.out_ann = self.register(srd.OUTPUT_ANN)
139164
self.preprocess_options() # Preprocess options when the decoding starts
140165

141166
def metadata(self, key, value):
167+
"""Receive and store metadata such as samplerate."""
142168
if key == srd.SRD_CONF_SAMPLERATE:
143169
self.samplerate = value
144170

145171
def putg(self, ss, es, cls, text):
172+
"""Helper method to output annotated data."""
146173
self.put(ss, es, self.out_ann, [cls, text])
147174

148175
def handle_bits(self):
176+
"""Process and interpret collected bits into RGB(W) values."""
149177
if len(self.bits) < self.need_bits:
150178
return
151179

152-
# Determine the start and end sample numbers for the packet
153-
ss_packet, es_packet = self.bits[0][1], self.bits[-1][2]
154-
155-
# Initialize RGB and W component values
180+
ss_rgb_packet, es_rgb_packet = self.bits[0][1], self.bits[-1][2]
156181
r, g, b, w = 0, 0, 0, None
157182
comps = []
158183

@@ -163,12 +188,19 @@ def handle_bits(self):
163188
comp_ss, comp_es = comp_bits[0][1], comp_bits[-1][2]
164189
comp_value = bitpack_msb(comp_bits, 0)
165190
comp_text = '{:02x}'.format(comp_value)
191+
192+
# Annotation class selection for the component.
166193
comp_ann = {
167194
'r': ANN_COMP_R, 'g': ANN_COMP_G,
168195
'b': ANN_COMP_B, 'w': ANN_COMP_W,
169196
}.get(c.lower(), None)
197+
198+
if comp_ann is not None:
199+
self.putg(comp_ss, comp_es, comp_ann, [comp_text])
200+
170201
comps.append((comp_ss, comp_es, comp_ann, comp_value, comp_text))
171202

203+
# Assign the value to the appropriate RGBW component.
172204
if c.lower() == 'r':
173205
r = comp_value
174206
elif c.lower() == 'g':
@@ -178,106 +210,127 @@ def handle_bits(self):
178210
elif c.lower() == 'w':
179211
w = comp_value
180212

181-
# Determine the wt (white component text) for formatting
182-
wt = '' if w is None else '{:02x}'.format(w)
183-
184213
# Format the RGB text for annotation
185214
if self.textformat == 'wire':
186-
rgb_text = '#' + ''.join([c[-1] for c in comps])
215+
rgb_text = '#' + ''.join([comp[4] for comp in comps])
216+
if rgb_text:
217+
self.putg(ss_rgb_packet, es_rgb_packet, ANN_RGB, [rgb_text])
187218
else:
188-
rgb_text = self.textformat.format(r=r, g=g, b=b, w=w, wt=wt)
189-
190-
# Annotate each component and the RGB value
191-
for ss_comp, es_comp, cls_comp, _, text_comp in comps:
192-
self.putg(ss_comp, es_comp, cls_comp, [text_comp])
193-
194-
if rgb_text:
195-
self.putg(ss_packet, es_packet, ANN_RGB, [rgb_text])
219+
# Output the RGB part.
220+
ss_rgb_end = self.bits[23][2] if len(self.bits) >= 24 else es_rgb_packet
221+
ss_w_start = self.bits[24][1] if len(self.bits) > 24 else es_rgb_packet
222+
es_w_end = self.bits[-1][2]
223+
224+
rgb_text_format = self.options['rgb_text_format']
225+
if rgb_text_format == 'hex':
226+
rgb_text = self.textformat.format(r=r, g=g, b=b)
227+
elif rgb_text_format == 'decimal':
228+
rgb_text = 'RGB({},{},{})'.format(r, g, b)
229+
230+
if rgb_text:
231+
self.putg(ss_rgb_packet, ss_rgb_end, ANN_RGB, [rgb_text])
232+
233+
# Output the W component if in RGBW mode.
234+
if self.is_rgbw and w is not None:
235+
w_text_format = self.options['w_text_format']
236+
if w_text_format == 'hex':
237+
w_text = '{:02x}'.format(w)
238+
elif w_text_format == 'decimal':
239+
w_text = str(w)
240+
elif w_text_format == 'percentage':
241+
w_text = f'{w * 100 // 255}%'
242+
243+
self.putg(ss_w_start, es_w_end, ANN_W, [w_text])
196244

197245
# Clear the bits list after processing the packet
198246
self.bits.clear()
199247

200-
def handle_bit(self, ss, es, value, ann_late=False):
248+
def handle_bit(self, bit_start_sample, bit_end_sample, bit_value, ann_late=False):
249+
"""Process a single bit and manage its annotations."""
201250
if not ann_late:
202-
text = ['{:d}'.format(value)]
203-
self.putg(ss, es, ANN_BIT, text)
251+
self.putg(bit_start_sample, bit_end_sample, ANN_BIT, ['{:d}'.format(bit_value)])
204252

205-
self.bits.append((value, ss, es))
253+
self.bits.append((bit_value, bit_start_sample, bit_end_sample))
206254
self.handle_bits()
207255

208256
if ann_late:
209-
text = ['{:d}'.format(value)]
210-
self.putg(ss, es, ANN_BIT, text)
257+
self.putg(bit_start_sample, bit_end_sample, ANN_BIT, ['{:d}'.format(bit_value)])
258+
259+
def annotate_bit_timing(self, bit_start_sample, bit_end_sample, inversion_point_sample):
260+
"""Calculate and annotate timing details for a bit."""
261+
bit_duration_us = self.convert_samples_to_time(bit_end_sample - bit_start_sample, 'us')
262+
high_period_us = self.convert_samples_to_time(inversion_point_sample - bit_start_sample, 'us')
263+
low_period_us = self.convert_samples_to_time(bit_end_sample - inversion_point_sample, 'us')
264+
265+
self.putg(bit_start_sample, bit_end_sample, ANN_BIT_DURATION, [f'{bit_duration_us:.2f} µs'])
266+
self.putg(bit_start_sample, inversion_point_sample, ANN_HIGH_PERIOD, [f'{high_period_us:.2f} µs'])
267+
self.putg(inversion_point_sample, bit_end_sample, ANN_LOW_PERIOD, [f'{low_period_us:.2f} µs'])
268+
269+
def process_bit(self, bit_start_sample, bit_end_sample, inversion_point_sample):
270+
"""Process a bit and update annotations."""
271+
period = bit_end_sample - bit_start_sample
272+
high_time = inversion_point_sample - bit_start_sample
273+
bit_value = 1 if (high_time / period) > self.DUTY_CYCLE_THRESHOLD else 0
274+
275+
self.handle_bit(bit_start_sample, bit_end_sample, bit_value)
276+
self.annotate_bit_timing(bit_start_sample, bit_end_sample, inversion_point_sample)
211277

212278
def decode(self):
213-
led_type = self.options['led_type'].lower()
214-
# Constants for bit timing and reset detection
215-
if led_type == 'ws281x':
216-
DUTY_CYCLE_THRESHOLD = 0.5 # 50% threshold for distinguishing bit values
217-
RESET_CODE_TIMING = round(self.samplerate * 50e-6)
218-
elif led_type == 'sk6812':
219-
DUTY_CYCLE_THRESHOLD = 0.375 # 37.5% threshold for distinguishing bit values
220-
RESET_CODE_TIMING = round(self.samplerate * 80e-6)
221-
else:
222-
raise DecoderError('Unsupported LED Type')
223-
BIT_PERIOD = 1.25e-6 # 1.25 microseconds for WS281x and SK6812
224-
HALF_BIT_PERIOD = int(self.samplerate * (BIT_PERIOD / 2))
279+
"""Main decoding loop to interpret the input signal."""
225280

226-
# Conditions for bit and reset detection
227281
cond_bit_starts = {0: 'r'}
228282
cond_inbit_edge = {0: 'f'}
229-
cond_reset_pulse = {'skip': RESET_CODE_TIMING + 1}
283+
cond_reset_pulse = {'skip': self.RESET_CODE_TIMING + 1}
230284
conds = [cond_bit_starts, cond_inbit_edge, cond_reset_pulse]
231285

232-
ss_bit, inv_bit, es_bit = None, None, None
286+
self.bit_start_sample, self.inversion_point_sample, self.bit_end_sample = None, None, None
233287
pin, = self.wait({0: 'l'})
234-
inv_bit = self.samplenum
235-
check_reset = False
288+
self.inversion_point_sample = self.samplenum
289+
self.is_processing_bit = False
290+
self.is_looking_for_reset = False
236291

237292
while True:
238293
pin, = self.wait(conds)
239294

240-
# Check for RESET condition
241-
if check_reset and self.matched[2]:
242-
es_bit = inv_bit
243-
ss_rst, es_rst = inv_bit, self.samplenum
244-
245-
if ss_bit and inv_bit:
246-
# Decode the last bit value
247-
duty = inv_bit - ss_bit
248-
period = self.samplerate * BIT_PERIOD
249-
thres = period * DUTY_CYCLE_THRESHOLD
250-
bit_value = 1 if duty >= thres else 0
295+
if self.is_looking_for_reset and self.matched[2]:
296+
self.bit_end_sample = self.inversion_point_sample
297+
reset_start_sample, reset_end_sample = self.inversion_point_sample, self.samplenum
251298

299+
if self.bit_start_sample and self.inversion_point_sample:
252300
# Extend the last bit's annotation to the expected end of the bit period
253-
expected_es_bit = ss_bit + int(self.samplerate * BIT_PERIOD)
254-
self.handle_bit(ss_bit, expected_es_bit, bit_value, True)
301+
expected_bit_end_sample = self.bit_start_sample + int(self.samplerate * self.BIT_PERIOD)
302+
self.process_bit(self.bit_start_sample, expected_bit_end_sample, self.inversion_point_sample)
255303

256304
# Update the start and end of the reset to be after the bit period
257-
ss_rst = expected_es_bit
258-
es_rst = expected_es_bit + RESET_CODE_TIMING
305+
reset_start_sample = expected_bit_end_sample
306+
reset_end_sample = expected_bit_end_sample + self.RESET_CODE_TIMING
259307

260308
# Annotate RESET after the extended bit period
261-
if ss_rst and es_rst:
262-
text = ['RESET', 'RST', 'R']
263-
self.putg(ss_rst, es_rst, ANN_RESET, text)
309+
if reset_start_sample and reset_end_sample:
310+
self.putg(reset_start_sample, reset_end_sample, ANN_RESET, ['RESET', 'RST', 'R'])
264311

265-
check_reset = False
312+
self.is_looking_for_reset = False
266313
self.bits.clear()
267-
ss_bit, inv_bit, es_bit = None, None, None
314+
self.bit_start_sample, self.inversion_point_sample, self.bit_end_sample = None, None, None
268315

269316
# Bit value detection logic
270-
if self.matched[0]: # Rising edge starts a bit time
271-
check_reset = False
272-
if ss_bit and inv_bit:
273-
es_bit = self.samplenum
274-
period = es_bit - ss_bit
275-
duty = inv_bit - ss_bit
276-
bit_value = 1 if (duty / period) > DUTY_CYCLE_THRESHOLD else 0
277-
self.handle_bit(ss_bit, es_bit, bit_value)
278-
ss_bit, inv_bit, es_bit = self.samplenum, None, None
279-
280-
if self.matched[1]: # Falling edge ends its high period
281-
check_reset = True
282-
inv_bit = self.samplenum
317+
if self.matched[0]: # Rising edge indicates the start of a bit
318+
self.is_processing_bit = True
319+
self.is_looking_for_reset = False
320+
321+
if self.bit_start_sample and self.inversion_point_sample:
322+
self.bit_end_sample = self.samplenum
323+
self.process_bit(self.bit_start_sample, self.bit_end_sample, self.inversion_point_sample)
324+
325+
self.bit_start_sample, self.inversion_point_sample, self.bit_end_sample = self.samplenum, None, None
326+
327+
if self.matched[1]: # Falling edge indicates the end of high period in bit
328+
self.is_looking_for_reset = True
329+
self.is_processing_bit = False
330+
self.inversion_point_sample = self.samplenum
331+
332+
def convert_samples_to_time(self, samples, unit='us'):
333+
"""Convert sample counts to time units based on the current samplerate."""
334+
factor = 1e6 if unit == 'us' else 1e3
335+
return (samples / self.samplerate) * factor
283336

0 commit comments

Comments
 (0)