diff --git a/india_compliance/hooks.py b/india_compliance/hooks.py index bb6f6333b1..fea1ba3f6e 100644 --- a/india_compliance/hooks.py +++ b/india_compliance/hooks.py @@ -362,6 +362,9 @@ "erpnext.assets.doctype.asset.depreciation.cancel_depreciation_entries": ( "india_compliance.income_tax_india.overrides.asset_depreciation_schedule.cancel_depreciation_entries" ), + "erpnext.accounts.doctype.tax_withholding_category.tax_withholding_category.get_tax_id_for_party": ( + "india_compliance.income_tax_india.overrides.tax_withholding_category.get_tax_id_for_party" + ), } } diff --git a/india_compliance/income_tax_india/overrides/tax_withholding_category.py b/india_compliance/income_tax_india/overrides/tax_withholding_category.py index 6a2ef757d0..e500d75874 100644 --- a/india_compliance/income_tax_india/overrides/tax_withholding_category.py +++ b/india_compliance/income_tax_india/overrides/tax_withholding_category.py @@ -10,3 +10,11 @@ def _get_tax_withholding_accounts(): return set(frappe.get_all("Tax Withholding Account", pluck="account", filters={"company": company})) return frappe.cache.hget("tax_withholding_accounts", company, generator=_get_tax_withholding_accounts) + + +def get_tax_id_for_party(party_type, party): + # PAN field is only available for Customer and Supplier. + if party_type in ("Customer", "Supplier"): + return frappe.db.get_value(party_type, party, "pan") + + return "" diff --git a/india_compliance/income_tax_india/overrides/test_tax_withholding_category.py b/india_compliance/income_tax_india/overrides/test_tax_withholding_category.py new file mode 100644 index 0000000000..e882f2a45a --- /dev/null +++ b/india_compliance/income_tax_india/overrides/test_tax_withholding_category.py @@ -0,0 +1,303 @@ +import random +import string + +import frappe +from erpnext.accounts.doctype.tax_withholding_category.tax_withholding_category import ( + get_tax_id_for_party, +) +from erpnext.accounts.utils import get_fiscal_year +from frappe.tests import IntegrationTestCase +from frappe.utils import today + +from india_compliance.gst_india.utils.tests import create_purchase_invoice + +COMPANY = "_Test Indian Registered Company" +ABBR = "_TIRC" +CATEGORY = "Test PAN TDS Category" +THRESHOLD_CATEGORY = "Test PAN Threshold TDS Category" + + +class TestTaxWithholdingCategory(IntegrationTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + create_tds_setup() + + def test_returns_pan_for_supplier(self): + pan = generate_unique_pan() + supplier = create_supplier("_Test TDS Supplier With PAN", pan=pan) + result = get_tax_id_for_party("Supplier", supplier) + self.assertEqual(result, pan) + + def test_returns_pan_for_customer(self): + pan = generate_unique_pan() + customer = create_customer("_Test TDS Customer With PAN", pan=pan) + result = get_tax_id_for_party("Customer", customer) + self.assertEqual(result, pan) + + def test_returns_none_for_party_other_than_customer_or_supplier(self): + party_name = "_Test TDS Employee With PAN" + result = get_tax_id_for_party("Employee", party_name) + self.assertEqual(result, "") + + def test_tds_deducted_and_tax_id_set_as_pan(self): + pan = generate_unique_pan() + supplier = create_supplier("_Test TDS PAN Supplier", pan=pan) + frappe.db.set_value("Supplier", supplier, "tax_withholding_category", CATEGORY) + + pi = create_purchase_invoice( + supplier=supplier, + company=COMPANY, + apply_tds=1, + rate=50000, + do_not_submit=1, + ) + pi.submit() + + tds_amount = sum(d.base_tax_amount for d in pi.taxes if d.is_tax_withholding_account) + self.assertEqual(tds_amount, 5000) + + twe_rows = frappe.get_all( + "Tax Withholding Entry", + filters={"parenttype": "Purchase Invoice", "parent": pi.name}, + fields=["name", "tax_id", "party", "party_type"], + ) + self.assertTrue(twe_rows) + for row in twe_rows: + self.assertEqual(row.tax_id, pan) + + def test_threshold_considers_entries_for_parties_with_same_pan(self): + pan = generate_unique_pan() + suffix = frappe.generate_hash(length=6) + + supplier_1 = create_supplier( + f"_Test TDS Shared PAN Supplier A {suffix}", + pan=pan, + ) + supplier_2 = create_supplier( + f"_Test TDS Shared PAN Supplier B {suffix}", + pan=pan, + ) + + for supplier in (supplier_1, supplier_2): + frappe.db.set_value( + "Supplier", + supplier, + "tax_withholding_category", + THRESHOLD_CATEGORY, + ) + + pi_1 = create_purchase_invoice( + supplier=supplier_1, + company=COMPANY, + apply_tds=1, + rate=10000, + do_not_submit=1, + ) + pi_1.submit() + + pi_2 = create_purchase_invoice( + supplier=supplier_2, + company=COMPANY, + apply_tds=1, + rate=10000, + do_not_submit=1, + ) + pi_2.submit() + + pi_3 = create_purchase_invoice( + supplier=supplier_2, + company=COMPANY, + apply_tds=1, + rate=10000, + do_not_submit=1, + ) + pi_3.submit() + + tds_1 = sum(d.base_tax_amount for d in pi_1.taxes if d.is_tax_withholding_account) + tds_2 = sum(d.base_tax_amount for d in pi_2.taxes if d.is_tax_withholding_account) + tds_3 = sum(d.base_tax_amount for d in pi_3.taxes if d.is_tax_withholding_account) + + self.assertEqual(tds_1, 0) + self.assertEqual(tds_2, 0) + self.assertEqual(tds_3, 2000) + + def test_ldc_applies_for_party_with_same_pan(self): + pan = generate_unique_pan() + suffix = frappe.generate_hash(length=6) + + supplier_1 = create_supplier( + f"_Test LDC PAN Supplier A {suffix}", + pan=pan, + ) + supplier_2 = create_supplier( + f"_Test LDC PAN Supplier B {suffix}", + pan=pan, + ) + + for supplier in (supplier_1, supplier_2): + frappe.db.set_value("Supplier", supplier, "tax_withholding_category", CATEGORY) + + ldc_doc = create_lower_deduction_certificate( + supplier=supplier_1, + tax_withholding_category=CATEGORY, + tax_rate=2, + certificate_no=f"LDC-{suffix}", + limit=50000, + ) + + pi = create_purchase_invoice( + supplier=supplier_2, + company=COMPANY, + apply_tds=1, + rate=10000, + do_not_submit=1, + ) + pi.submit() + + tds_amount = sum(d.base_tax_amount for d in pi.taxes if d.is_tax_withholding_account) + self.assertEqual(tds_amount, 200) + + twe_rows = frappe.get_all( + "Tax Withholding Entry", + filters={"parenttype": "Purchase Invoice", "parent": pi.name}, + fields=["tax_id", "lower_deduction_certificate"], + ) + self.assertTrue(twe_rows) + self.assertEqual(twe_rows[0].tax_id, pan) + self.assertEqual(twe_rows[0].lower_deduction_certificate, ldc_doc.name) + + +def create_party(party_type, name, pan=None): + party = party_type.lower() + if not frappe.db.exists(party_type, name): + doc = frappe.new_doc(party_type) + doc.update( + { + f"{party}_name": name, + f"{party}_type": "Individual", + } + ) + doc.save() + + frappe.db.set_value(party_type, name, "pan", pan) + return name + + +def create_supplier(name, pan=None): + return create_party("Supplier", name, pan=pan) + + +def generate_unique_pan(): + existing_pans = frappe.get_all("Supplier", pluck="pan", filters={"pan": ("is", "set")}) + existing_pans += frappe.get_all("Customer", pluck="pan", filters={"pan": ("is", "set")}) + existing_pans = set(existing_pans) + + for _ in range(100): + letters = "".join(random.choices(string.ascii_uppercase, k=5)) + digits = "".join(random.choices(string.digits, k=4)) + suffix = random.choice(string.ascii_uppercase) + pan = f"{letters}{digits}{suffix}" + + if pan not in existing_pans: + return pan + + existing_pans.add(pan) + + raise RuntimeError("Unable to generate unique PAN") + + +def create_customer(name, pan=None): + return create_party("Customer", name, pan=pan) + + +def create_lower_deduction_certificate( + supplier, + tax_withholding_category, + tax_rate, + certificate_no, + limit, +): + fiscal_year = get_fiscal_year(today(), company=COMPANY) + doc = frappe.get_doc( + { + "doctype": "Lower Deduction Certificate", + "company": COMPANY, + "supplier": supplier, + "certificate_no": certificate_no, + "tax_withholding_category": tax_withholding_category, + "fiscal_year": fiscal_year[0], + "valid_from": fiscal_year[1], + "valid_upto": fiscal_year[2], + "rate": tax_rate, + "certificate_limit": limit, + } + ).insert(ignore_if_duplicate=True) + return doc + + +def create_tax_withholding_category(category_name, account_name, **kwargs): + fiscal_year = get_fiscal_year(today(), company=COMPANY, as_dict=True) + tax_withholding_rate = kwargs.pop("tax_withholding_rate", 10) + single_threshold = kwargs.pop("single_threshold", 0) + cumulative_threshold = kwargs.pop("cumulative_threshold", 0) + + rate_row = { + "from_date": fiscal_year.year_start_date, + "to_date": fiscal_year.year_end_date, + "tax_withholding_rate": tax_withholding_rate, + "single_threshold": single_threshold, + "cumulative_threshold": cumulative_threshold, + } + account_row = {"company": COMPANY, "account": account_name} + + if frappe.db.exists("Tax Withholding Category", category_name): + doc = frappe.get_doc("Tax Withholding Category", category_name) + + else: + doc = frappe.new_doc("Tax Withholding Category") + doc.name = category_name + + doc.update(kwargs) + doc.set("accounts", [account_row]) + doc.set("rates", [rate_row]) + doc.save() + + return doc + + +def create_account(account_name, parent_account, company): + company_abbr = frappe.get_cached_value("Company", company, "abbr") + account = frappe.db.get_value("Account", f"{account_name} - {company_abbr}") + if account: + return account + + return ( + frappe.get_doc( + { + "doctype": "Account", + "account_name": account_name, + "parent_account": parent_account, + "company": company, + } + ) + .insert() + .name + ) + + +def create_tds_setup(): + account_name = f"TDS Payable - {ABBR}" + create_account( + account_name="TDS Payable", + parent_account=f"Duties and Taxes - {ABBR}", + company=COMPANY, + ) + + create_tax_withholding_category(CATEGORY, account_name) + create_tax_withholding_category( + THRESHOLD_CATEGORY, + account_name, + disable_transaction_threshold=1, + cumulative_threshold=30000, + ) diff --git a/india_compliance/patches.txt b/india_compliance/patches.txt index 225a9111e4..7ae911894c 100644 --- a/india_compliance/patches.txt +++ b/india_compliance/patches.txt @@ -83,3 +83,5 @@ execute:from india_compliance.audit_trail.setup import create_custom_fields; cre india_compliance.patches.v15.update_itc_classification_for_sez_import_invoices india_compliance.patches.v15.set_boe_applicable_for_import_goods_purchase_invoices india_compliance.patches.post_install.update_gst_treatment_for_import_transactions +india_compliance.patches.v16.update_tax_id_for_tax_withholding_entries + diff --git a/india_compliance/patches/v16/update_tax_id_for_tax_withholding_entries.py b/india_compliance/patches/v16/update_tax_id_for_tax_withholding_entries.py new file mode 100644 index 0000000000..1922278a5b --- /dev/null +++ b/india_compliance/patches/v16/update_tax_id_for_tax_withholding_entries.py @@ -0,0 +1,31 @@ +import frappe + + +def execute(): + """ + Update tax_id in Tax Withholding Entry from PAN of linked party (Customer/Supplier) for Indian companies. + """ + indian_companies = frappe.get_all("Company", filters={"country": "India"}, pluck="name") + if not indian_companies: + return + + update_tax_id("Supplier", indian_companies) + update_tax_id("Customer", indian_companies) + + +def update_tax_id(party_type, companies): + twe = frappe.qb.DocType("Tax Withholding Entry", alias="twe") + party = frappe.qb.DocType(party_type, alias="party") + + ( + frappe.qb.update(twe) + .join(party) + .on(twe.party == party.name) + .set(twe.tax_id, party.pan) + .where(twe.party_type == party_type) + .where(twe.company.isin(companies)) + .where(party.pan.isnotnull()) + .where(party.pan != "") + .where(twe.created_by_migration == 0) + .run() + )