-
Notifications
You must be signed in to change notification settings - Fork 3.9k
fix(quantization): validate bias scale in QDQ Conv → QLinearConv fusion #28229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |
| get_tensor_consumers_and_producers, | ||
| ) | ||
|
|
||
| import onnxruntime as ort | ||
| from onnxruntime.quantization import QDQQuantizer, QuantFormat, QuantType, quantize_static, write_calibration_table | ||
| from onnxruntime.quantization.calibrate import CalibrationMethod, TensorData, TensorsData | ||
| from onnxruntime.quantization.quant_utils import quantize_nparray | ||
|
|
@@ -2224,5 +2225,182 @@ def test_quantize_with_prequantized_input(self): | |
| self.assertEqual(zp_val, zp_val_original) | ||
|
|
||
|
|
||
class TestConvBiasScaleValidation(unittest.TestCase):
    """Tests that QDQ Conv -> QLinearConv fusion is skipped when bias scale does not match
    input_scale * weight_scale, as required by the ONNX QLinearConv specification."""

    # Quantization scales shared by every test in this class. The conformant bias
    # scale for QLinearConv is exactly _X_SCALE * _W_SCALE.
    _X_SCALE = 0.05
    _W_SCALE = 0.02

    @classmethod
    def setUpClass(cls):
        cls._tmp_model_dir = tempfile.TemporaryDirectory(prefix="ort.qdq.bias_scale_validation_")
        # Note: swap with the commented line if you want to see the models in local test dir.
        cls._tmp_dir_path = cls._tmp_model_dir.name
        # cls._tmp_dir_path = "."

    @classmethod
    def tearDownClass(cls):
        cls._tmp_model_dir.cleanup()

    def build_qdq_conv_model(
        self,
        inp_shape: list[int],
        weight_shape: list[int],
        x_scale: float,
        w_scale: float,
        b_scale: float,
        x_zp: int = 128,
        w_zp: int = 0,
    ) -> onnx.ModelProto:
        """Builds a QDQ Conv model (DQ->Conv->Q) with the given quantization scales.

        The bias is encoded as int32 with the provided b_scale. When b_scale != x_scale * w_scale
        the model is non-conformant and fusion must be skipped.

        Args:
            inp_shape: Shape of the quantized (uint8) graph input.
            weight_shape: Shape of the int8 weight; weight_shape[0] is the output-channel count.
            x_scale: DequantizeLinear scale for the input.
            w_scale: DequantizeLinear scale for the weight.
            b_scale: DequantizeLinear scale for the int32 bias.
            x_zp: Zero-point for the uint8 input.
            w_zp: Zero-point for the int8 weight.

        Returns:
            A shape-inferred onnx.ModelProto (opset 21).
        """
        num_out_channels = weight_shape[0]
        np_x_scale = np.float32(x_scale)
        np_w_scale = np.float32(w_scale)
        np_b_scale = np.float32(b_scale)

        # --- Quantized input (graph input: uint8) ---
        input_q = onnx.helper.make_tensor_value_info("input_q", onnx.TensorProto.UINT8, inp_shape)

        # --- Weight: random int8 constant ---
        rng = np.random.default_rng(42)
        weight_quant_data = rng.integers(-5, 6, size=weight_shape, dtype=np.int8)
        weight_quant = onnx.numpy_helper.from_array(weight_quant_data, "weight_q")

        # --- Scales / zero-points as scalar constants ---
        x_scale_tensor = onnx.numpy_helper.from_array(np.array(np_x_scale, dtype=np.float32), "x_scale")
        x_zp_tensor = onnx.numpy_helper.from_array(np.array(x_zp, dtype=np.uint8), "x_zp")
        w_scale_tensor = onnx.numpy_helper.from_array(np.array(np_w_scale, dtype=np.float32), "w_scale")
        w_zp_tensor = onnx.numpy_helper.from_array(np.array(w_zp, dtype=np.int8), "w_zp")
        b_scale_tensor = onnx.numpy_helper.from_array(np.array(np_b_scale, dtype=np.float32), "b_scale")
        b_zp_tensor = onnx.numpy_helper.from_array(np.array(0, dtype=np.int32), "b_zp")

        # Bias as int32 (quantized with b_scale).
        bias_float = np.ones(num_out_channels, dtype=np.float32)
        bias_q_data = np.round(bias_float / np_b_scale).astype(np.int32)
        bias_q = onnx.numpy_helper.from_array(bias_q_data, "bias_q")

        # Output scale/zp for the final Q node.
        out_scale_val = np.float32(x_scale)
        out_zp_val = np.uint8(128)
        out_scale_tensor = onnx.numpy_helper.from_array(np.array(out_scale_val, dtype=np.float32), "out_scale")
        out_zp_tensor = onnx.numpy_helper.from_array(np.array(out_zp_val, dtype=np.uint8), "out_zp")

        # Nodes: DQ(input) -> | -> Conv -> Q(output)
        #        DQ(weight) -/
        #        DQ(bias) -/
        dq_input = onnx.helper.make_node(
            "DequantizeLinear", ["input_q", "x_scale", "x_zp"], ["input_f"], name="DQ_input"
        )
        dq_weight = onnx.helper.make_node(
            "DequantizeLinear", ["weight_q", "w_scale", "w_zp"], ["weight_f"], name="DQ_weight"
        )
        dq_bias = onnx.helper.make_node("DequantizeLinear", ["bias_q", "b_scale", "b_zp"], ["bias_f"], name="DQ_bias")
        conv_node = onnx.helper.make_node("Conv", ["input_f", "weight_f", "bias_f"], ["conv_out"], name="Conv0")
        q_output = onnx.helper.make_node(
            "QuantizeLinear", ["conv_out", "out_scale", "out_zp"], ["output_q"], name="Q_output"
        )

        output_q = onnx.helper.make_tensor_value_info("output_q", onnx.TensorProto.UINT8, None)

        graph = onnx.helper.make_graph(
            [dq_input, dq_weight, dq_bias, conv_node, q_output],
            "QDQConvBiasScaleTest",
            [input_q],
            [output_q],
            initializer=[
                weight_quant,
                x_scale_tensor,
                x_zp_tensor,
                w_scale_tensor,
                w_zp_tensor,
                b_scale_tensor,
                b_zp_tensor,
                bias_q,
                out_scale_tensor,
                out_zp_tensor,
            ],
        )
        opset_imports = [onnx.helper.make_opsetid("", 21)]
        model = onnx.helper.make_model(graph, opset_imports=opset_imports)
        model = onnx.shape_inference.infer_shapes(model)
        return model

    def _run_model(self, model_path: str, inputs: dict, optimize: bool) -> list:
        """Run model with the given optimization level and return outputs."""
        sess_opts = ort.SessionOptions()
        if optimize:
            sess_opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        else:
            sess_opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL
        sess = ort.InferenceSession(model_path, sess_options=sess_opts, providers=["CPUExecutionProvider"])
        return sess.run(None, inputs)

    def _check_optimized_matches_unoptimized(self, b_scale: float, model_filename: str, err_msg: str) -> None:
        """Shared pipeline for both tests: build a 1x1x4x4 QDQ Conv model with the given
        bias scale, save it, run it with and without graph optimizations, and assert that
        both sessions produce identical uint8 outputs.

        Args:
            b_scale: Bias scale to encode into the model.
            model_filename: File name for the saved model inside the temp dir.
            err_msg: Failure message for the output-equality assertion.
        """
        inp_shape = [1, 1, 4, 4]
        weight_shape = [1, 1, 1, 1]

        model = self.build_qdq_conv_model(inp_shape, weight_shape, self._X_SCALE, self._W_SCALE, b_scale)
        model_path = os.path.join(self._tmp_dir_path, model_filename)
        onnx.save_model(model, model_path)

        # Build a representative input.
        rng = np.random.default_rng(0)
        input_data = rng.integers(100, 200, size=inp_shape, dtype=np.uint8)
        inputs = {"input_q": input_data}

        out_optimized = self._run_model(model_path, inputs, optimize=True)
        out_unoptimized = self._run_model(model_path, inputs, optimize=False)

        np.testing.assert_array_equal(out_optimized[0], out_unoptimized[0], err_msg=err_msg)

    def test_mismatched_bias_scale_skips_fusion(self):
        """When bias_scale != input_scale * weight_scale, fusion must be skipped.

        The outputs of the optimized and unoptimized sessions should match, proving
        that no silent numerical corruption occurred.
        """
        # Intentionally wrong bias scale: 2x the correct value.
        wrong_b_scale = 2.0 * (self._X_SCALE * self._W_SCALE)

        # Both should produce the same uint8 output (fusion was skipped or fallback path is correct).
        self._check_optimized_matches_unoptimized(
            wrong_b_scale,
            "conv_mismatched_bias_scale.qdq.onnx",
            err_msg="Mismatched bias scale: optimized and unoptimized outputs differ, "
            "indicating silent numerical corruption from incorrect fusion.",
        )

    def test_matching_bias_scale_allows_fusion(self):
        """When bias_scale == input_scale * weight_scale, fusion should be allowed.

        Both optimized and unoptimized sessions must produce identical outputs.
        """
        correct_b_scale = self._X_SCALE * self._W_SCALE  # exactly matching

        self._check_optimized_matches_unoptimized(
            correct_b_scale,
            "conv_matching_bias_scale.qdq.onnx",
            err_msg="Matching bias scale: optimized and unoptimized outputs differ.",
        )
|
|
||
|
|
||
# Allow running this test module directly (e.g. `python test_file.py`).
if __name__ == "__main__":
    unittest.main()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`CheckConvBiasScale` derives `num_channels` from `w_scales.size()`, which is 1 for per-tensor weight scales even when the Conv has multiple output channels. This makes the loop validate only one channel, and it also rejects valid cases where `b_scale` is a per-channel vector (length C_out) but `w_scale` is scalar (all channels share the same expected bias scale). Consider deriving the channel count from the per-channel scale vector when present (e.g., use `b_scales.size()` when > 1, else `w_scales.size()`), and when `w_scales.size() == 1` compare each channel against `w_scales[0]` while allowing `b_scale` to be either scalar or length C_out.