diff --git a/ecommerce_integrations/ecommerce_integrations/doctype/ecommerce_item/ecommerce_item.py b/ecommerce_integrations/ecommerce_integrations/doctype/ecommerce_item/ecommerce_item.py index 81985d80c..b814e4d59 100644 --- a/ecommerce_integrations/ecommerce_integrations/doctype/ecommerce_item/ecommerce_item.py +++ b/ecommerce_integrations/ecommerce_integrations/doctype/ecommerce_item/ecommerce_item.py @@ -7,6 +7,8 @@ from frappe.model.document import Document from frappe.utils import cstr, get_datetime, now +from ecommerce_integrations.shopify.connection import temp_shopify_session + class EcommerceItem(Document): erpnext_item_code: str # item_code in ERPNext @@ -23,6 +25,46 @@ def validate(self): def before_insert(self): self.check_unique_constraints() + # handles deletion of item both from shopify and erpnext + @temp_shopify_session + def on_trash(self): + from ecommerce_integrations.shopify.product import delete_from_shopify + + frappe.logger().info( + f"[EcommerceItem:on_trash] Triggered for {self.name} | Integration: {self.integration}" + ) + + sales_order = frappe.db.exists("Sales Order Item", {"item_code": self.erpnext_item_code}) + sales_invoice = frappe.db.exists("Sales Invoice Item", {"item_code": self.erpnext_item_code}) + + if not (sales_order or sales_invoice): + if self.integration and self.integration.lower() == "shopify" and self.integration_item_code: + frappe.logger().info( + f"[Shopify] Preparing to delete. has_variants={self.has_variants}, " + f"variant_id={self.variant_id}, integration_item_code={self.integration_item_code}" + ) + + product_gid = f"gid://shopify/Product/{self.integration_item_code}" + result = delete_from_shopify(product_id=product_gid) + + if self.erpnext_item_code and frappe.db.exists("Item", self.erpnext_item_code): + item_doc = frappe.get_doc("Item", self.erpnext_item_code) + if not item_doc.disabled: + item_doc.db_set("disabled", 1) + frappe.logger().info( + f"[ERPNext] Item '{self.erpnext_item_code}' disabled (instead of deleted)." + ) + else: + frappe.logger().info(f"[ERPNext] Item '{self.erpnext_item_code}' already disabled.") + + frappe.logger().info(f"[Shopify] Deleted Product {product_gid} | Response: {result}") + else: + frappe.logger().info( + f"[Shopify] Skipped — no integration or integration_item_code for {self.name}" + ) + else: + frappe.throw(_("Item Cannot be Deleted — linked with Sales Order or Invoice.")) + def check_unique_constraints(self) -> None: filters = [] @@ -64,7 +106,10 @@ def is_synced( integration: shopify, integration_item_code: TSHIRT """ - filter = {"integration": integration, "integration_item_code": integration_item_code} + filter = { + "integration": integration, + "integration_item_code": integration_item_code, + } if variant_id: filter.update({"variant_id": variant_id}) @@ -87,7 +132,10 @@ def get_erpnext_item_code( variant_id: str | None = None, has_variants: int | None = 0, ) -> str | None: - filters = {"integration": integration, "integration_item_code": integration_item_code} + filters = { + "integration": integration, + "integration_item_code": integration_item_code, + } if variant_id: filters.update({"variant_id": variant_id}) elif has_variants: @@ -111,11 +159,16 @@ def get_erpnext_item( item_code = None if sku: item_code = frappe.db.get_value( - "Ecommerce Item", {"sku": sku, "integration": integration}, fieldname="erpnext_item_code" + "Ecommerce Item", + {"sku": sku, "integration": integration}, + fieldname="erpnext_item_code", ) if not item_code: item_code = get_erpnext_item_code( - integration, integration_item_code, variant_id=variant_id, has_variants=has_variants + integration, + integration_item_code, + variant_id=variant_id, + has_variants=has_variants, ) if item_code: diff --git a/ecommerce_integrations/shopify/connection.py b/ecommerce_integrations/shopify/connection.py index 4a4c7c86d..d751527f8 100644 --- a/ecommerce_integrations/shopify/connection.py +++ b/ecommerce_integrations/shopify/connection.py @@ -6,8 +6,7 @@ import frappe from frappe import _ -from shopify.resources import Webhook -from shopify.session import Session +from shopify import GraphQL, Session from ecommerce_integrations.shopify.constants import ( API_VERSION, @@ -29,7 +28,11 @@ def wrapper(*args, **kwargs): setting = frappe.get_doc(SETTING_DOCTYPE) if setting.is_enabled(): - auth_details = (setting.shopify_url, API_VERSION, setting.get_password("password")) + auth_details = ( + setting.shopify_url, + API_VERSION, + setting.get_password("password"), + ) with Session.temp(*auth_details): return func(*args, **kwargs) @@ -37,44 +40,166 @@ def wrapper(*args, **kwargs): return wrapper -def register_webhooks(shopify_url: str, password: str) -> list[Webhook]: - """Register required webhooks with shopify and return registered webhooks.""" +def register_webhooks(shopify_url: str, password: str) -> list[dict]: + """Register required webhooks using Shopify GraphQL API.""" + new_webhooks = [] - # clear all stale webhooks matching current site url before registering new ones + # Remove old webhooks unregister_webhooks(shopify_url, password) + mutation = """ + mutation webhookSubscriptionCreate($topic: WebhookSubscriptionTopic!, $callbackUrl: URL!) { + webhookSubscriptionCreate( + topic: $topic + webhookSubscription: { format: JSON, callbackUrl: $callbackUrl } + ) { + webhookSubscription { + id + topic + endpoint { + __typename + ... on WebhookHttpEndpoint { + callbackUrl + } + } + } + userErrors { + field + message + } + } + } + """ + with Session.temp(shopify_url, API_VERSION, password): for topic in WEBHOOK_EVENTS: - webhook = Webhook.create({"topic": topic, "address": get_callback_url(), "format": "json"}) + # Ensure JSON object result + raw = GraphQL().execute( + mutation, + { + "topic": topic, + "callbackUrl": get_callback_url(), + }, + ) + + try: + result = json.loads(raw) if isinstance(raw, str) else raw + except Exception: + create_shopify_log(status="Error", message="Invalid GraphQL response", response_data=raw) + continue + + # Core nodes + root = result.get("data") + errors = result.get("errors") + + # If `data` is None => fatal GraphQL error + if root is None: + msg = errors[0].get("message") if errors else "Unknown Shopify GraphQL error" + create_shopify_log( + status="Error", + message=msg, + response_data=result, + exception=errors, + ) + continue - if webhook.is_valid(): - new_webhooks.append(webhook) - else: + create_node = root.get("webhookSubscriptionCreate") + + if not create_node: + create_shopify_log( + status="Error", + message="Missing webhookSubscriptionCreate", + response_data=result, + ) + continue + + # User errors + user_errors = create_node.get("userErrors") or [] + if user_errors: + msg = user_errors[0].get("message") create_shopify_log( status="Error", - response_data=webhook.to_dict(), - exception=webhook.errors.full_messages(), + message=msg, + response_data=result, + exception=user_errors, ) + continue + webhook = create_node.get("webhookSubscription") + if webhook: + new_webhooks.append(webhook) + query = """ + query { + webhookSubscriptionsCount { + count + precision + } + } + """ + response = GraphQL().execute(query) + create_shopify_log( + status="Success", message="Webhooks added to current url", response_data=response + ) return new_webhooks def unregister_webhooks(shopify_url: str, password: str) -> None: - """Unregister all webhooks from shopify that correspond to current site url.""" - url = get_current_domain_name() + """Unregister all GraphQL webhooks for the current site URL.""" + + query = """ + { + webhookSubscriptions(first: 250) { + edges { + node { + id + endpoint { + __typename + ... on WebhookHttpEndpoint { + callbackUrl + } + } + } + } + } + } + """ + + delete_mutation = """ + mutation webhookSubscriptionDelete($id: ID!) { + webhookSubscriptionDelete(id: $id) { + deletedWebhookSubscriptionId + userErrors { + field + message + } + } + } + """ with Session.temp(shopify_url, API_VERSION, password): - for webhook in Webhook.find(): - if url in webhook.address: - webhook.destroy() + result_raw = GraphQL().execute(query) + + try: + result = json.loads(result_raw) if isinstance(result_raw, str) else result_raw + except Exception: + frappe.log_error(f"Invalid GraphQL response: {result_raw}", "Shopify Unregister Webhooks") + return + + edges = result.get("data", {}).get("webhookSubscriptions", {}).get("edges", []) + + for edge in edges: + node = edge.get("node", {}) + webhook_id = node.get("id") + if webhook_id: + with Session.temp(shopify_url, API_VERSION, password): + response = GraphQL().execute(delete_mutation, {"id": webhook_id}) + create_shopify_log( + status="Success", message="Webhook deleted for the current url", response_data=response + ) def get_current_domain_name() -> str: - """Get current site domain name. E.g. test.erpnext.com - - If developer_mode is enabled and localtunnel_url is set in site config then domain is set to localtunnel_url. - """ if frappe.conf.developer_mode and frappe.conf.localtunnel_url: return frappe.conf.localtunnel_url else: @@ -99,16 +224,15 @@ def store_request_data() -> None: _validate_request(frappe.request, hmac_header) data = json.loads(frappe.request.data) + event = frappe.request.headers.get("X-Shopify-Topic") process_request(data, event) def process_request(data, event): - # create log log = create_shopify_log(method=EVENT_MAPPER[event], request_data=data) - # enqueue backround job frappe.enqueue( method=EVENT_MAPPER[event], queue="short", @@ -121,9 +245,11 @@ def process_request(data, event): def _validate_request(req, hmac_header): settings = frappe.get_doc(SETTING_DOCTYPE) secret_key = settings.shared_secret + raw_body = req.get_data() - sig = base64.b64encode(hmac.new(secret_key.encode("utf8"), req.data, hashlib.sha256).digest()) + computed_hmac = base64.b64encode( + hmac.new(secret_key.encode("utf-8"), raw_body, hashlib.sha256).digest() + ).decode() - if sig != bytes(hmac_header.encode()): - create_shopify_log(status="Error", request_data=req.data) + if not hmac.compare_digest(computed_hmac, hmac_header): frappe.throw(_("Unverified Webhook Data")) diff --git a/ecommerce_integrations/shopify/constants.py b/ecommerce_integrations/shopify/constants.py index 47720e032..489eb30f2 100644 --- a/ecommerce_integrations/shopify/constants.py +++ b/ecommerce_integrations/shopify/constants.py @@ -6,14 +6,17 @@ SETTING_DOCTYPE = "Shopify Setting" OLD_SETTINGS_DOCTYPE = "Shopify Settings" -API_VERSION = "2024-01" +API_VERSION = "2025-04" WEBHOOK_EVENTS = [ - "orders/create", - "orders/paid", - "orders/fulfilled", - "orders/cancelled", - "orders/partially_fulfilled", + "ORDERS_CANCELLED", + "ORDERS_CREATE", + "ORDERS_FULFILLED", + "ORDERS_PAID", + "ORDERS_PARTIALLY_FULFILLED", + "PRODUCTS_CREATE", + "RETURNS_APPROVE", + "REFUNDS_CREATE", ] EVENT_MAPPER = { @@ -22,10 +25,11 @@ "orders/fulfilled": "ecommerce_integrations.shopify.fulfillment.prepare_delivery_note", "orders/cancelled": "ecommerce_integrations.shopify.order.cancel_order", "orders/partially_fulfilled": "ecommerce_integrations.shopify.fulfillment.prepare_delivery_note", + "products/create": "ecommerce_integrations.shopify.product.create_item", + "returns/approve": "ecommerce_integrations.shopify.return.process_shopify_return", + "refunds/create": "ecommerce_integrations.shopify.return.process_invoice_return", } -SHOPIFY_VARIANTS_ATTR_LIST = ["option1", "option2", "option3"] - # custom fields CUSTOMER_ID_FIELD = "shopify_customer_id" @@ -37,6 +41,14 @@ ADDRESS_ID_FIELD = "shopify_address_id" ORDER_ITEM_DISCOUNT_FIELD = "shopify_item_discount" ITEM_SELLING_RATE_FIELD = "shopify_selling_rate" +SHOPIFY_LINE_ITEM_ID_FIELD = "shopify_line_item_id" +SHOPIFY_RETURN_ID_FIELD = "shopify_return_id" + # ERPNext already defines the default UOMs from Shopify but names are different -WEIGHT_TO_ERPNEXT_UOM_MAP = {"kg": "Kg", "g": "Gram", "oz": "Ounce", "lb": "Pound"} +WEIGHT_TO_ERPNEXT_UOM_MAP = { + "KILOGRAMS": "Kg", + "GRAMS": "Gram", + "POUNDS": "Lb", + "OUNCES": "Oz", +} diff --git a/ecommerce_integrations/shopify/customer.py b/ecommerce_integrations/shopify/customer.py index 3a0ee952f..d5d786696 100644 --- a/ecommerce_integrations/shopify/customer.py +++ b/ecommerce_integrations/shopify/customer.py @@ -20,24 +20,27 @@ def __init__(self, customer_id: str): def sync_customer(self, customer: dict[str, Any]) -> None: """Create Customer in ERPNext using shopify's Customer dict.""" - customer_name = cstr(customer.get("first_name")) + " " + cstr(customer.get("last_name")) if len(customer_name.strip()) == 0: customer_name = customer.get("email") - customer_group = self.setting.customer_group super().sync_customer(customer_name, customer_group) - - billing_address = customer.get("billing_address", {}) or customer.get("default_address") + billing_address = customer.get("billing_address", {}) or customer.get("defaultAddress") shipping_address = customer.get("shipping_address", {}) if billing_address: self.create_customer_address( - customer_name, billing_address, address_type="Billing", email=customer.get("email") + customer_name, + billing_address, + address_type="Billing", + email=customer.get("email"), ) if shipping_address: self.create_customer_address( - customer_name, shipping_address, address_type="Shipping", email=customer.get("email") + customer_name, + shipping_address, + address_type="Shipping", + email=customer.get("email"), ) self.create_customer_contact(customer) @@ -54,12 +57,10 @@ def create_customer_address( super().create_customer_address(address_fields) def update_existing_addresses(self, customer): - billing_address = customer.get("billing_address", {}) or customer.get("default_address") + billing_address = customer.get("billing_address", {}) or customer.get("defaultAddress") shipping_address = customer.get("shipping_address", {}) - customer_name = cstr(customer.get("first_name")) + " " + cstr(customer.get("last_name")) email = customer.get("email") - if billing_address: self._update_existing_address(customer_name, billing_address, "Billing", email) if shipping_address: @@ -76,12 +77,14 @@ def _update_existing_address( if not old_address: self.create_customer_address(customer_name, shopify_address, address_type, email) + else: exclude_in_update = ["address_title", "address_type"] new_values = _map_address_fields(shopify_address, customer_name, address_type, email) old_address.update({k: v for k, v in new_values.items() if k not in exclude_in_update}) old_address.flags.ignore_mandatory = True + old_address.save() def create_customer_contact(self, shopify_customer: dict[str, Any]) -> None: diff --git a/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/__init__.py b/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/shopify_bulk_sync_progress.js b/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/shopify_bulk_sync_progress.js new file mode 100644 index 000000000..37c13a7b7 --- /dev/null +++ b/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/shopify_bulk_sync_progress.js @@ -0,0 +1,8 @@ +// Copyright (c) 2026, Frappe and contributors +// For license information, please see license.txt + +// frappe.ui.form.on("Shopify Bulk Sync Progress", { +// refresh(frm) { + +// }, +// }); diff --git a/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/shopify_bulk_sync_progress.json b/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/shopify_bulk_sync_progress.json new file mode 100644 index 000000000..5db2ee9f3 --- /dev/null +++ b/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/shopify_bulk_sync_progress.json @@ -0,0 +1,52 @@ +{ + "actions": [], + "allow_rename": 1, + "creation": "2026-01-12 13:04:01.580094", + "default_view": "List", + "doctype": "DocType", + "editable_grid": 1, + "engine": "InnoDB", + "field_order": [ + "bulk_id", + "last_synced_product_id" + ], + "fields": [ + { + "fieldname": "bulk_id", + "fieldtype": "Data", + "label": "Bulk ID" + }, + { + "fieldname": "last_synced_product_id", + "fieldtype": "Data", + "label": "Last Synced Product ID" + } + ], + "grid_page_length": 50, + "index_web_pages_for_search": 1, + "links": [], + "modified": "2026-01-12 13:05:17.198209", + "modified_by": "Administrator", + "module": "shopify", + "name": "Shopify Bulk Sync Progress", + "owner": "Administrator", + "permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "Administrator", + "share": 1, + "write": 1 + } + ], + "row_format": "Dynamic", + "rows_threshold_for_grid_search": 20, + "sort_field": "modified", + "sort_order": "DESC", + "states": [] +} \ No newline at end of file diff --git a/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/shopify_bulk_sync_progress.py b/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/shopify_bulk_sync_progress.py new file mode 100644 index 000000000..a6779af03 --- /dev/null +++ b/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/shopify_bulk_sync_progress.py @@ -0,0 +1,9 @@ +# Copyright (c) 2026, Frappe and contributors +# For license information, please see license.txt + +# import frappe +from frappe.model.document import Document + + +class ShopifyBulkSyncProgress(Document): + pass diff --git a/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/test_shopify_bulk_sync_progress.py b/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/test_shopify_bulk_sync_progress.py new file mode 100644 index 000000000..6d084fc4f --- /dev/null +++ b/ecommerce_integrations/shopify/doctype/shopify_bulk_sync_progress/test_shopify_bulk_sync_progress.py @@ -0,0 +1,9 @@ +# Copyright (c) 2026, Frappe and Contributors +# See license.txt + +# import frappe +from frappe.tests.utils import FrappeTestCase + + +class TestShopifyBulkSyncProgress(FrappeTestCase): + pass diff --git a/ecommerce_integrations/shopify/doctype/shopify_setting/shopify_setting.js b/ecommerce_integrations/shopify/doctype/shopify_setting/shopify_setting.js index 5995c8409..b06bcb600 100644 --- a/ecommerce_integrations/shopify/doctype/shopify_setting/shopify_setting.js +++ b/ecommerce_integrations/shopify/doctype/shopify_setting/shopify_setting.js @@ -1,6 +1,3 @@ -// Copyright (c) 2021, Frappe and contributors -// For license information, please see LICENSE - frappe.provide("ecommerce_integrations.shopify.shopify_setting"); frappe.ui.form.on("Shopify Setting", { @@ -15,6 +12,24 @@ frappe.ui.form.on("Shopify Setting", { }); }, + sync_old_orders: function (frm) { + if (frm.doc.sync_old_orders) { + frm.set_value("sync_old_orders", 1); + frm.save().then(() => { + frappe.call({ + doc: frm.doc, + method: "sync_old_orders_in_shopify", + freeze: true, + freeze_message: __("Syncing old orders from Shopify..."), + callback: function (r) { + if (!r.exc) { + frappe.msgprint(__("Synced Orders")); + } + }, + }); + }); + } + }, fetch_shopify_locations: function (frm) { frappe.call({ doc: frm.doc, diff --git a/ecommerce_integrations/shopify/doctype/shopify_setting/shopify_setting.py b/ecommerce_integrations/shopify/doctype/shopify_setting/shopify_setting.py index dc974e70e..59c30d97d 100644 --- a/ecommerce_integrations/shopify/doctype/shopify_setting/shopify_setting.py +++ b/ecommerce_integrations/shopify/doctype/shopify_setting/shopify_setting.py @@ -1,12 +1,10 @@ -# Copyright (c) 2021, Frappe and contributors -# For license information, please see LICENSE +import json import frappe from frappe import _ from frappe.custom.doctype.custom_field.custom_field import create_custom_fields from frappe.utils import get_datetime -from shopify.collection import PaginatedIterator -from shopify.resources import Location +from shopify import GraphQL from ecommerce_integrations.controllers.setting import ( ERPNextWarehouse, @@ -23,8 +21,13 @@ ORDER_ITEM_DISCOUNT_FIELD, ORDER_NUMBER_FIELD, ORDER_STATUS_FIELD, + SHOPIFY_LINE_ITEM_ID_FIELD, + SHOPIFY_RETURN_ID_FIELD, SUPPLIER_ID_FIELD, ) +from ecommerce_integrations.shopify.order import ( + sync_old_orders as sync_old_shopify_orders, +) from ecommerce_integrations.shopify.utils import ( ensure_old_connector_is_disabled, migrate_from_old_connector, @@ -52,7 +55,7 @@ def on_update(self): migrate_from_old_connector() def _handle_webhooks(self): - if self.is_enabled() and not self.webhooks: + if self.is_enabled(): new_webhooks = connection.register_webhooks(self.shopify_url, self.get_password("password")) if not new_webhooks: @@ -62,7 +65,10 @@ def _handle_webhooks(self): frappe.throw(msg) for webhook in new_webhooks: - self.append("webhooks", {"webhook_id": webhook.id, "method": webhook.topic}) + self.append( + "webhooks", + {"webhook_id": webhook.get("id").split("/")[-1], "method": webhook.get("topic")}, + ) elif not self.is_enabled(): connection.unregister_webhooks(self.shopify_url, self.get_password("password")) @@ -81,26 +87,79 @@ def _initalize_default_values(self): @frappe.whitelist() @connection.temp_shopify_session def update_location_table(self): - """Fetch locations from shopify and add it to child table so user can - map it with correct ERPNext warehouse.""" + """Fetch locations from Shopify using GraphQL and add it to child table + so user can map it with correct ERPNext warehouse.""" self.shopify_warehouse_mapping = [] - for locations in PaginatedIterator(Location.find()): - for location in locations: + + # GraphQL query to fetch locations + query = """ + query($cursor: String) { + locations(first: 50, includeLegacy: true, after: $cursor) { + edges { + node { + id + name + legacyResourceId + isActive + isFulfillmentService + } + } + pageInfo { + hasNextPage + endCursor + } + } + } + """ + + has_next_page = True + cursor = None + + while has_next_page: + # Execute GraphQL query + variables = {"cursor": cursor} if cursor else {} + response = GraphQL().execute(query, variables=variables) + result = json.loads(response) + + # Process locations + locations_data = result.get("data", {}).get("locations", {}) + + for edge in locations_data.get("edges", []): + location = edge.get("node", {}) + + location_id = location.get("legacyResourceId") or location.get("id").split("/")[-1] + self.append( "shopify_warehouse_mapping", - {"shopify_location_id": location.id, "shopify_location_name": location.name}, + { + "shopify_location_id": location_id, + "shopify_location_name": location.get("name"), + }, ) + # Check if there are more pages + page_info = locations_data.get("pageInfo", {}) + has_next_page = page_info.get("hasNextPage", False) + cursor = page_info.get("endCursor") + + @frappe.whitelist() + def sync_old_orders_in_shopify(self): + sync_old_shopify_orders() + def get_erpnext_warehouses(self) -> list[ERPNextWarehouse]: return [wh_map.erpnext_warehouse for wh_map in self.shopify_warehouse_mapping] - def get_erpnext_to_integration_wh_mapping(self) -> dict[ERPNextWarehouse, IntegrationWarehouse]: + def get_erpnext_to_integration_wh_mapping( + self, + ) -> dict[ERPNextWarehouse, IntegrationWarehouse]: return { wh_map.erpnext_warehouse: wh_map.shopify_location_id for wh_map in self.shopify_warehouse_mapping } - def get_integration_to_erpnext_wh_mapping(self) -> dict[IntegrationWarehouse, ERPNextWarehouse]: + def get_integration_to_erpnext_wh_mapping( + self, + ) -> dict[IntegrationWarehouse, ERPNextWarehouse]: return { wh_map.shopify_location_id: wh_map.erpnext_warehouse for wh_map in self.shopify_warehouse_mapping } @@ -180,6 +239,14 @@ def setup_custom_fields(): insert_after="discount_and_margin", read_only=1, ), + dict( + fieldname=SHOPIFY_LINE_ITEM_ID_FIELD, + label="Shopify Line Item Id", + fieldtype="Data", + insert_after=ORDER_ITEM_DISCOUNT_FIELD, + read_only=1, + print_hide=1, + ), ], "Delivery Note": [ dict( @@ -214,6 +281,22 @@ def setup_custom_fields(): read_only=1, print_hide=1, ), + dict( + fieldname=SHOPIFY_LINE_ITEM_ID_FIELD, + label="Shopify Return Id", + fieldtype="Small Text", + insert_after=FULLFILLMENT_ID_FIELD, + read_only=1, + print_hide=1, + ), + dict( + fieldname=SHOPIFY_RETURN_ID_FIELD, + label="Shopify Return Id", + fieldtype="Small Text", + insert_after=ORDER_ID_FIELD, + read_only=1, + print_hide=1, + ), ], "Sales Invoice": [ dict( @@ -240,6 +323,14 @@ def setup_custom_fields(): read_only=1, print_hide=1, ), + dict( + fieldname=SHOPIFY_RETURN_ID_FIELD, + label="Shopify Return Id", + fieldtype="Small Text", + insert_after=ORDER_ID_FIELD, + read_only=1, + print_hide=1, + ), ], } diff --git a/ecommerce_integrations/shopify/fulfillment.py b/ecommerce_integrations/shopify/fulfillment.py index 5ffc0ebae..8cf7b4881 100644 --- a/ecommerce_integrations/shopify/fulfillment.py +++ b/ecommerce_integrations/shopify/fulfillment.py @@ -9,6 +9,7 @@ ORDER_ID_FIELD, ORDER_NUMBER_FIELD, SETTING_DOCTYPE, + SHOPIFY_LINE_ITEM_ID_FIELD, ) from ecommerce_integrations.shopify.order import get_sales_order from ecommerce_integrations.shopify.utils import create_shopify_log @@ -18,7 +19,6 @@ def prepare_delivery_note(payload, request_id=None): frappe.set_user("Administrator") setting = frappe.get_doc(SETTING_DOCTYPE) frappe.flags.request_id = request_id - order = payload try: @@ -27,7 +27,10 @@ def prepare_delivery_note(payload, request_id=None): create_delivery_note(order, setting, sales_order) create_shopify_log(status="Success") else: - create_shopify_log(status="Invalid", message="Sales Order not found for syncing delivery note.") + create_shopify_log( + status="Invalid", + message="Sales Order not found for syncing delivery note.", + ) except Exception as e: create_shopify_log(status="Error", exception=e, rollback=True) @@ -60,7 +63,6 @@ def create_delivery_note(shopify_order, setting, so): def get_fulfillment_items(dn_items, fulfillment_items, location_id=None): - # local import to avoid circular imports from ecommerce_integrations.shopify.product import get_item_code fulfillment_items = deepcopy(fulfillment_items) @@ -72,7 +74,7 @@ def get_fulfillment_items(dn_items, fulfillment_items, location_id=None): final_items = [] def find_matching_fullfilement_item(dn_item): - nonlocal fulfillment_items + fulfillment_items for item in fulfillment_items: if get_item_code(item) == dn_item.item_code: @@ -81,6 +83,13 @@ def find_matching_fullfilement_item(dn_item): for dn_item in dn_items: if shopify_item := find_matching_fullfilement_item(dn_item): - final_items.append(dn_item.update({"qty": shopify_item.get("quantity"), "warehouse": warehouse})) + dn_item.qty = shopify_item.get("quantity") + dn_item.warehouse = warehouse + setattr( + dn_item, + SHOPIFY_LINE_ITEM_ID_FIELD, + str(shopify_item.get("id")), + ) + final_items.append(dn_item) return final_items diff --git a/ecommerce_integrations/shopify/inventory.py b/ecommerce_integrations/shopify/inventory.py index 526107dd3..eaf9d53b0 100644 --- a/ecommerce_integrations/shopify/inventory.py +++ b/ecommerce_integrations/shopify/inventory.py @@ -1,9 +1,10 @@ +import json from collections import Counter import frappe from frappe.utils import cint, create_batch, now from pyactiveresource.connection import ResourceNotFound -from shopify.resources import InventoryLevel, Variant +from shopify import GraphQL from ecommerce_integrations.controllers.inventory import ( get_inventory_levels, @@ -17,6 +18,7 @@ def update_inventory_on_shopify() -> None: """Upload stock levels from ERPNext to Shopify. + This is Called by scheduler on configured interval. """ @@ -44,17 +46,117 @@ def upload_inventory_data_to_shopify(inventory_levels, warehous_map) -> None: d.shopify_location_id = warehous_map[d.warehouse] try: - variant = Variant.find(d.variant_id) - inventory_id = variant.inventory_item_id - - InventoryLevel.set( - location_id=d.shopify_location_id, - inventory_item_id=inventory_id, - # shopify doesn't support fractional quantity - available=cint(d.actual_qty) - cint(d.reserved_qty), + # GraphQL query to get inventory item ID from variant + variant_query = """ + query($id: ID!) { + productVariant(id: $id) { + id + inventoryItem { + id + legacyResourceId + } + } + } + """ + + variant_gid = f"gid://shopify/ProductVariant/{d.variant_id}" + variant_response = GraphQL().execute(variant_query, variables={"id": variant_gid}) + variant_result = json.loads(variant_response) + + # Check if variant exists + variant_data = variant_result.get("data", {}).get("productVariant") + if not variant_data: + raise ResourceNotFound("Variant not found") + inventory_item_gid = variant_data.get("inventoryItem", {}).get("id") + + location_gid = f"gid://shopify/Location/{d.shopify_location_id}" + + activate_mutation = """ + mutation($inventoryItemId: ID!, $locationId: ID!) { + inventoryActivate( + inventoryItemId: $inventoryItemId + locationId: $locationId + ) { + inventoryLevel { + id + } + userErrors { + field + message + } + } + } + """ + + activate_response = GraphQL().execute( + activate_mutation, + variables={ + "inventoryItemId": inventory_item_gid, + "locationId": location_gid, + }, ) + activate_result = json.loads(activate_response) + + # Check activation errors + activate_errors = ( + activate_result.get("data", {}).get("inventoryActivate", {}).get("userErrors", []) + ) + if activate_errors: + pass + + # GraphQL mutation to set inventory level + inventory_mutation = """ + mutation($inventoryItemId: ID!, $locationId: ID!, $available: Int!) { + inventorySetQuantities( + input: { + reason: "correction" + name: "available" + ignoreCompareQuantity: true + quantities: [ + { + inventoryItemId: $inventoryItemId + locationId: $locationId + quantity: $available + } + ] + } + ) { + inventoryAdjustmentGroup { + id + reason + } + userErrors { + field + message + } + } + } + """ + + available_qty = cint(d.actual_qty) - cint(d.reserved_qty) + + mutation_response = GraphQL().execute( + inventory_mutation, + variables={ + "inventoryItemId": inventory_item_gid, + "locationId": location_gid, + "available": available_qty, + }, + ) + mutation_result = json.loads(mutation_response) + + # Check for errors + user_errors = ( + mutation_result.get("data", {}).get("inventorySetQuantities", {}).get("userErrors", []) + ) + + if user_errors: + error_messages = [err.get("message") for err in user_errors] + raise Exception("; ".join(error_messages)) + update_inventory_sync_status(d.ecom_item, time=synced_on) d.status = "Success" + except ResourceNotFound: # Variant or location is deleted, mark as last synced and ignore. update_inventory_sync_status(d.ecom_item, time=synced_on) @@ -63,6 +165,8 @@ def upload_inventory_data_to_shopify(inventory_levels, warehous_map) -> None: d.status = "Failed" d.failure_reason = str(e) + # Commit is required here to persist each inventory update independently + # during bulk Shopify sync to prevent data loss on partial failures frappe.db.commit() _log_inventory_update_status(inventory_sync_batch) diff --git a/ecommerce_integrations/shopify/invoice.py b/ecommerce_integrations/shopify/invoice.py index f841cb416..61d1ecb70 100644 --- a/ecommerce_integrations/shopify/invoice.py +++ b/ecommerce_integrations/shopify/invoice.py @@ -1,5 +1,6 @@ import frappe from erpnext.selling.doctype.sales_order.sales_order import make_sales_invoice +from frappe import _ from frappe.utils import cint, cstr, getdate, nowdate from ecommerce_integrations.shopify.constants import ( @@ -11,7 +12,7 @@ def prepare_sales_invoice(payload, request_id=None): - from ecommerce_integrations.shopify.order import get_sales_order + from ecommerce_integrations.shopify.order import create_sales_order, get_sales_order order = payload @@ -25,7 +26,15 @@ def prepare_sales_invoice(payload, request_id=None): create_sales_invoice(order, setting, sales_order) create_shopify_log(status="Success") else: - create_shopify_log(status="Invalid", message="Sales Order not found for syncing sales invoice.") + sales_order = create_sales_order(order, setting) + if sales_order: + create_sales_invoice(order, setting, sales_order) + create_shopify_log(status="Success") + else: + create_shopify_log( + status="Invalid", message="Sales Order could not be created for syncing sales invoice." + ) + except Exception as e: create_shopify_log(status="Error", exception=e, rollback=True) @@ -47,7 +56,12 @@ def create_sales_invoice(shopify_order, setting, so): sales_invoice.due_date = posting_date sales_invoice.naming_series = setting.sales_invoice_series or "SI-Shopify-" sales_invoice.flags.ignore_mandatory = True - set_cost_center(sales_invoice.items, setting.cost_center) + set_cost_center( + sales_invoice.items, + setting.cost_center, + sales_invoice.company, + setting, + ) sales_invoice.insert(ignore_mandatory=True) sales_invoice.submit() if sales_invoice.grand_total > 0: @@ -57,9 +71,31 @@ def create_sales_invoice(shopify_order, setting, so): sales_invoice.add_comment(text=f"Order Note: {shopify_order.get('note')}") -def set_cost_center(items, cost_center): +def set_cost_center(items, cost_center, company, setting=None): for item in items: - item.cost_center = cost_center + # Cost Center + if cost_center and not item.cost_center: + item.cost_center = cost_center + # Income Account resolution + if not item.income_account: + item.income_account = frappe.db.get_value( + "Item Default", + { + "parent": item.item_code, + "company": company, + }, + "income_account", + ) + + if not item.income_account: + item.income_account = frappe.db.get_value( + "Item Group", + item.item_group, + "default_income_account", + ) + + if not item.income_account: + frappe.throw(_(f"Income Account not found for Item {item.item_code}")) def make_payament_entry_against_sales_invoice(doc, setting, posting_date=None): diff --git a/ecommerce_integrations/shopify/order.py b/ecommerce_integrations/shopify/order.py index 0570d035b..abe60e2b2 100644 --- a/ecommerce_integrations/shopify/order.py +++ b/ecommerce_integrations/shopify/order.py @@ -2,10 +2,10 @@ from typing import Literal, Optional import frappe +import pytz from frappe import _ from frappe.utils import cint, cstr, flt, get_datetime, getdate, nowdate -from shopify.collection import PaginatedIterator -from shopify.resources import Order +from shopify import GraphQL from ecommerce_integrations.shopify.connection import temp_shopify_session from ecommerce_integrations.shopify.constants import ( @@ -16,9 +16,13 @@ ORDER_NUMBER_FIELD, ORDER_STATUS_FIELD, SETTING_DOCTYPE, + SHOPIFY_LINE_ITEM_ID_FIELD, ) from ecommerce_integrations.shopify.customer import ShopifyCustomer -from ecommerce_integrations.shopify.product import create_items_if_not_exist, get_item_code +from ecommerce_integrations.shopify.product import ( + create_items_if_not_exist, + get_item_code, +) from ecommerce_integrations.shopify.utils import create_shopify_log from ecommerce_integrations.utils.price_list import get_dummy_price_list from ecommerce_integrations.utils.taxation import get_dummy_tax_category @@ -48,10 +52,10 @@ def sync_sales_order(payload, request_id=None): customer.sync_customer(customer=shopify_customer) else: customer.update_existing_addresses(shopify_customer) - create_items_if_not_exist(order) setting = frappe.get_doc(SETTING_DOCTYPE) + create_order(order, setting) except Exception as e: create_shopify_log(status="Error", exception=e, rollback=True) @@ -66,7 +70,7 @@ def create_order(order, setting, company=None): so = create_sales_order(order, setting, company) if so: - if order.get("financial_status") == "paid": + if order.get("financial_status") == "PAID": create_sales_invoice(order, setting, so) if order.get("fulfillments"): @@ -94,7 +98,7 @@ def create_sales_order(shopify_order, setting, company=None): "Following items exists in the shopify order but relevant records were" " not found in the shopify Product master" ) - product_not_exists = [] # TODO: fix missing items + product_not_exists = [] # fix missing items message += "\n" + ", ".join(product_not_exists) create_shopify_log(status="Error", exception=message, rollback=True) @@ -124,6 +128,7 @@ def create_sales_order(shopify_order, setting, company=None): so.update({"company": company, "status": "Draft"}) so.flags.ignore_mandatory = True so.flags.shopiy_order_json = json.dumps(shopify_order) + normalize_item_wise_tax_detail(so.taxes) so.save(ignore_permissions=True) so.submit() @@ -145,7 +150,10 @@ def get_order_items(order_items, setting, delivery_date, taxes_inclusive): if not shopify_item.get("product_exists"): all_product_exists = False product_not_exists.append( - {"title": shopify_item.get("title"), ORDER_ID_FIELD: shopify_item.get("id")} + { + "title": shopify_item.get("title"), + ORDER_ID_FIELD: shopify_item.get("id"), + } ) continue @@ -163,6 +171,7 @@ def get_order_items(order_items, setting, delivery_date, taxes_inclusive): ORDER_ITEM_DISCOUNT_FIELD: ( _get_total_discount(shopify_item) / cint(shopify_item.get("quantity")) ), + SHOPIFY_LINE_ITEM_ID_FIELD: str(shopify_item.get("id")), } ) else: @@ -182,7 +191,7 @@ def _get_item_price(line_item, taxes_inclusive: bool) -> float: return price - (total_discount / qty) total_taxes = 0.0 - for tax in line_item.get("tax_lines"): + for tax in line_item.get("tax_lines", []): total_taxes += flt(tax.get("price")) return price - (total_taxes + total_discount) / qty @@ -193,70 +202,129 @@ def _get_total_discount(line_item) -> float: return sum(flt(discount.get("amount")) for discount in discount_allocations) +def consolidate_order_taxes(taxes): + """ + Consolidate taxes by account_head. + Always returns a list of dicts and keeps item_wise_tax_detail as a dict here. + The caller must stringify item_wise_tax_detail before inserting into ERPNext. + """ + tax_account_wise_data = {} + for tax in taxes or []: + account_head = tax.get("account_head") + if not account_head: + # skip malformed rows + continue + + if account_head not in tax_account_wise_data: + tax_account_wise_data[account_head] = { + "charge_type": "Actual", + "account_head": account_head, + "description": tax.get("description"), + "cost_center": tax.get("cost_center"), + "included_in_print_rate": tax.get("included_in_print_rate", 0), + "dont_recompute_tax": tax.get("dont_recompute_tax", 1), + "tax_amount": 0.0, + "item_wise_tax_detail": {}, + } + + entry = tax_account_wise_data[account_head] + entry["tax_amount"] = flt(entry.get("tax_amount", 0.0)) + flt(tax.get("tax_amount", 0.0)) + + # Merge item_wise_tax_detail if present (accept dict or JSON-string) + item_detail = tax.get("item_wise_tax_detail") or {} + if isinstance(item_detail, str): + try: + item_detail = json.loads(item_detail) + except Exception: + item_detail = {} + if isinstance(item_detail, dict): + entry["item_wise_tax_detail"].update(item_detail) + + # return as list (ERPNext expects an iterable of dicts) + return list(tax_account_wise_data.values()) + + def get_order_taxes(shopify_order, setting, items): + """ + Build taxes list for an order. + + IMPORTANT: ERPNext expects tax.item_wise_tax_detail to be a JSON STRING when it + reads tax rows. So we keep item_wise_tax_detail as dict while building, then + stringify all rows at the end. + """ taxes = [] - line_items = shopify_order.get("line_items") + line_items = shopify_order.get("line_items") or [] for line_item in line_items: item_code = get_item_code(line_item) - for tax in line_item.get("tax_lines"): + for tax in line_item.get("tax_lines") or []: taxes.append( { "charge_type": "Actual", "account_head": get_tax_account_head(tax, charge_type="sales_tax"), "description": ( get_tax_account_description(tax) - or f"{tax.get('title')} - {tax.get('rate') * 100.0:.2f}%" + or f"{tax.get('title')} - {flt(tax.get('rate')) * 100.0:.2f}%" ), - "tax_amount": tax.get("price"), + "tax_amount": flt(tax.get("price")), "included_in_print_rate": 0, "cost_center": setting.cost_center, - "item_wise_tax_detail": {item_code: [flt(tax.get("rate")) * 100, flt(tax.get("price"))]}, + "item_wise_tax_detail": { + item_code: { + "rate": flt(tax.get("rate")) * 100, + "tax_amount": flt(tax.get("price")), + } + }, "dont_recompute_tax": 1, } ) + # Update taxes with shipping lines (this function should append dicts similarly) update_taxes_with_shipping_lines( taxes, - shopify_order.get("shipping_lines"), + shopify_order.get("shipping_lines") or [], setting, items, - taxes_inclusive=shopify_order.get("taxes_included"), + taxes_inclusive=bool(shopify_order.get("taxes_included")), ) + # Consolidate if requested (returns list) if cint(setting.consolidate_taxes): taxes = consolidate_order_taxes(taxes) - for row in taxes: - tax_detail = row.get("item_wise_tax_detail") - if isinstance(tax_detail, dict): - row["item_wise_tax_detail"] = json.dumps(tax_detail) + # ensure every row has item_wise_tax_detail as JSON string --- + normalized = [] + for row in taxes or []: + r = dict(row) + + detail = r.get("item_wise_tax_detail") + + if isinstance(detail, dict): + r["item_wise_tax_detail"] = json.dumps(detail) + elif isinstance(detail, list): + converted = {} + for idx, entry in enumerate(detail): + if isinstance(entry, dict): + key = entry.get("item_code") or entry.get("name") or f"row_{idx+1}" + converted[key] = entry + else: + converted[f"row_{idx+1}"] = {"tax": entry} + r["item_wise_tax_detail"] = json.dumps(converted) + elif isinstance(detail, str): + try: + loaded = json.loads(detail) + if isinstance(loaded, dict): + r["item_wise_tax_detail"] = json.dumps(loaded) + else: + r["item_wise_tax_detail"] = "{}" + except Exception: + r["item_wise_tax_detail"] = "{}" + else: + r["item_wise_tax_detail"] = "{}" - return taxes + normalized.append(r) - -def consolidate_order_taxes(taxes): - tax_account_wise_data = {} - for tax in taxes: - account_head = tax["account_head"] - tax_account_wise_data.setdefault( - account_head, - { - "charge_type": "Actual", - "account_head": account_head, - "description": tax.get("description"), - "cost_center": tax.get("cost_center"), - "included_in_print_rate": 0, - "dont_recompute_tax": 1, - "tax_amount": 0, - "item_wise_tax_detail": {}, - }, - ) - tax_account_wise_data[account_head]["tax_amount"] += flt(tax.get("tax_amount")) - if tax.get("item_wise_tax_detail"): - tax_account_wise_data[account_head]["item_wise_tax_detail"].update(tax["item_wise_tax_detail"]) - - return tax_account_wise_data.values() + return normalized def get_tax_account_head(tax, charge_type: Literal["shipping", "sales_tax"] | None = None): @@ -310,7 +378,7 @@ def update_taxes_with_shipping_lines(taxes, shipping_lines, setting, items, taxe { "item_code": setting.shipping_item, "rate": shipping_charge_amount, - "delivery_date": items[-1]["delivery_date"] if items else nowdate(), + "delivery_date": (items[-1]["delivery_date"] if items else nowdate()), "qty": 1, "stock_uom": "Nos", "warehouse": setting.warehouse, @@ -339,16 +407,53 @@ def update_taxes_with_shipping_lines(taxes, shipping_lines, setting, items, taxe ), "tax_amount": tax["price"], "cost_center": setting.cost_center, - "item_wise_tax_detail": { - setting.shipping_item: [flt(tax.get("rate")) * 100, flt(tax.get("price"))] - } - if shipping_as_item - else {}, + "item_wise_tax_detail": ( + { + setting.shipping_item: { + "rate": flt(tax.get("rate")) * 100, + "tax_amount": flt(tax.get("price")), + } + } + if shipping_as_item + else {} + ), "dont_recompute_tax": 1, } ) +def normalize_item_wise_tax_detail(taxes): + import json + + for row in taxes: + val = row.get("item_wise_tax_detail") + + if isinstance(val, dict): + row.set("item_wise_tax_detail", json.dumps(val)) + continue + + if isinstance(val, list): + row.set("item_wise_tax_detail", "{}") + continue + + if val is None: + row.set("item_wise_tax_detail", "{}") + continue + + if isinstance(val, str): + try: + loaded = json.loads(val) + if isinstance(loaded, dict): + row.set("item_wise_tax_detail", json.dumps(loaded)) + else: + row.set("item_wise_tax_detail", "{}") + except Exception: + row.set("item_wise_tax_detail", "{}") + continue + + row.set("item_wise_tax_detail", "{}") + + def get_sales_order(order_id): """Get ERPNext sales order using shopify order id.""" sales_order = frappe.db.get_value("Sales Order", filters={ORDER_ID_FIELD: order_id}) @@ -410,26 +515,372 @@ def sync_old_orders(): for order in orders: log = create_shopify_log( - method=EVENT_MAPPER["orders/create"], request_data=json.dumps(order), make_new=True + method=EVENT_MAPPER["orders/create"], + request_data=json.dumps(order), + make_new=True, ) sync_sales_order(order, request_id=log.name) shopify_setting = frappe.get_doc(SETTING_DOCTYPE) shopify_setting.sync_old_orders = 0 - shopify_setting.save() -def _fetch_old_orders(from_time, to_time): - """Fetch all shopify orders in specified range and return an iterator on fetched orders.""" +def _fetch_old_orders(from_time, to_time, limit=50): + frappe.set_user("Administrator") + frappe.logger().info("Fetching old orders from Shopify...") + + from_time = get_datetime(from_time).astimezone(pytz.UTC).isoformat() + to_time = get_datetime(to_time).astimezone(pytz.UTC).isoformat() + + query = """ + query GetOrdersByDateRange($query: String!, $limit: Int!, $cursor: String) { + orders(first: $limit, query: $query, after: $cursor) { + edges { + cursor + node { + id + name + createdAt + updatedAt + processedAt + cancelledAt + closedAt + confirmed + test + displayFinancialStatus + displayFulfillmentStatus + taxesIncluded + currencyCode + note + tags + cancelReason + totalWeight + totalPriceSet { + presentmentMoney { + amount + currencyCode + } + } + totalDiscountsSet { + presentmentMoney { + amount + currencyCode + } + } + currentSubtotalPriceSet { + presentmentMoney { + amount + currencyCode + } + } + totalOutstandingSet { + presentmentMoney { + amount + currencyCode + } + } + totalShippingPriceSet { + presentmentMoney { + amount + currencyCode + } + } + totalTipReceived { + amount + currencyCode + } + customer { + id + email + firstName + lastName + phone + state + taxExempt + taxExemptions + emailMarketingConsent { + consentUpdatedAt + marketingOptInLevel + marketingState + } + smsMarketingConsent { + consentCollectedFrom + consentUpdatedAt + marketingOptInLevel + marketingState + } + defaultAddress { + id + firstName + lastName + company + address1 + address2 + city + province + country + zip + phone + provinceCode + countryCodeV2 + } + } + billingAddress { + firstName + lastName + company + address1 + address2 + city + province + country + zip + phone + name + provinceCode + countryCodeV2 + } + shippingAddress { + firstName + lastName + company + address1 + address2 + city + province + country + zip + phone + name + provinceCode + countryCodeV2 + } + shippingLines(first: 10) { + edges { + node { + id + title + code + carrierIdentifier + discountedPriceSet { + presentmentMoney { + amount + currencyCode + } + } + } + } + } + lineItems(first: 50) { + edges { + node { + id + name + quantity + sku + vendor + taxable + currentQuantity + fulfillableQuantity + variant { + id + title + inventoryItem { + id + tracked + } + } + product { + id + } + discountAllocations { + allocatedAmountSet { + presentmentMoney { + amount + currencyCode + } + } + } + totalDiscountSet { + presentmentMoney { + amount + currencyCode + } + } + fulfillmentService { + serviceName + type + } + } + } + } + fulfillments(first: 10) { + id + status + createdAt + updatedAt + trackingInfo { + company + number + url + } + fulfillmentLineItems(first: 50) { + edges { + node { + id + quantity + lineItem { + id + name + } + } + } + } + } + } + } + pageInfo { + hasNextPage + endCursor + } + } + } + """ + + search_query = f'createdAt:>="{from_time}" AND createdAt:<="{to_time}"' + + cursor = None + has_next_page = True + total_orders = 0 + + while has_next_page: + frappe.logger().info(f"Querying Shopify with cursor: {cursor}") + variables = {"query": search_query, "limit": limit, "cursor": cursor} + response = json.loads(GraphQL().execute(query, variables)) + + if not response: + frappe.logger().error("Empty response from Shopify GraphQL API.") + break + + if "errors" in response: + frappe.log_error(json.dumps(response["errors"], indent=2), "Shopify Order Fetch Error") + break + + orders_data = response.get("data", {}).get("orders", {}) + for edge in orders_data.get("edges", []): + node = edge["node"] + customer = node.get("customer") or {} + customer_id = customer.get("id") + if customer_id: + customer_id = int(customer_id.split("/")[-1]) + else: + customer_id = None + + def money_amount(obj): + return obj.get("presentmentMoney", {}).get("amount", "0.0") if obj else "0.0" + + normalized_order = { + "id": int(node["id"].split("/")[-1]), + "admin_graphql_api_id": node["id"], + "name": node.get("name"), + "order_number": int(node.get("name", "#0").replace("#", "")), + "email": customer.get("email"), + "phone": customer.get("phone"), + "currency": node.get("currencyCode"), + "financial_status": node.get("displayFinancialStatus"), + "fulfillment_status": node.get("displayFulfillmentStatus"), + "total_price": money_amount(node.get("totalPriceSet")), + "subtotal_price": money_amount(node.get("currentSubtotalPriceSet")), + "total_discounts": money_amount(node.get("totalDiscountsSet")), + "total_tax": money_amount(node.get("totalTaxSet")), + "total_weight": node.get("totalWeight"), + "taxes_included": node.get("taxesIncluded"), + "confirmed": node.get("confirmed"), + "test": node.get("test"), + "created_at": node.get("createdAt"), + "updated_at": node.get("updatedAt"), + "processed_at": node.get("processedAt"), + "cancelled_at": node.get("cancelledAt"), + "closed_at": node.get("closedAt"), + "source_name": node.get("sourceName", None), + "tags": node.get("tags", []), + "note": node.get("note"), + "billing_address": node.get("billingAddress", {}), + "shipping_address": node.get("shippingAddress", {}), + "customer": { + "id": customer_id, + "first_name": customer.get("firstName"), + "last_name": customer.get("lastName"), + "email": customer.get("email"), + "phone": customer.get("phone"), + "tags": customer.get("tags", ""), + "tax_exempt": customer.get("taxExempt", False), + "currency": node.get("currencyCode"), + "default_address": customer.get("defaultAddress", {}), + }, + "line_items": [], + "fulfillments": [], + "shipping_lines": [], + "discount_applications": [], + "payment_terms": None, + "refunds": [], + } + line_items = (node or {}).get("lineItems", {}) or {} + edges = line_items.get("edges", []) or [] - from_time = get_datetime(from_time).astimezone().isoformat() - to_time = get_datetime(to_time).astimezone().isoformat() - orders_iterator = PaginatedIterator( - Order.find(created_at_min=from_time, created_at_max=to_time, limit=250) - ) + # Normalize line items (CRASH-PROOF VERSION) + for li_edge in edges: + # skip empty or invalid edges + if not li_edge or not isinstance(li_edge, dict): + continue + + li = li_edge.get("node") + if not li or not isinstance(li, dict): + continue + + # product and variant can be null from Shopify + product_obj = li.get("product") or {} + variant_obj = li.get("variant") or {} + fulfillment_obj = li.get("fulfillmentService") or {} + + # Safe product_id + variant_id extraction + product_gid = product_obj.get("id") + product_id = int(product_gid.split("/")[-1]) if product_gid else None + + variant_gid = variant_obj.get("id") + variant_id = int(variant_gid.split("/")[-1]) if variant_gid else None + + normalized_order["line_items"].append( + { + "id": (int(li.get("id", "").split("/")[-1]) if li.get("id") else None), + "admin_graphql_api_id": li.get("id"), + "name": li.get("name"), + "quantity": li.get("quantity"), + "sku": li.get("sku"), + "vendor": li.get("vendor"), + "taxable": li.get("taxable"), + "current_quantity": li.get("currentQuantity"), + "fulfillable_quantity": li.get("fulfillableQuantity"), + # Safe fulfillment service + "fulfillment_service": fulfillment_obj.get("serviceName"), + # SAFE fields + "product_exists": bool(product_id), + "variant_id": variant_id, + "variant_title": variant_obj.get("title"), + "product_id": product_id, + # discount allocations + "discount_allocations": li.get("discountAllocations", []), + "total_discount": money_amount(li.get("totalDiscountSet")), + } + ) + + # Normalize line items + + total_orders += 1 + yield normalized_order + + page_info = orders_data.get("pageInfo", {}) + has_next_page = page_info.get("hasNextPage") + cursor = page_info.get("endCursor") - for orders in orders_iterator: - for order in orders: - # Using generator instead of fetching all at once is better for - # avoiding rate limits and reducing resource usage. - yield order.to_dict() + frappe.logger().info(f"Finished fetching {total_orders} orders.") diff --git a/ecommerce_integrations/shopify/page/shopify_import_products/shopify_import_products.js b/ecommerce_integrations/shopify/page/shopify_import_products/shopify_import_products.js index 14852668a..5740c8232 100644 --- a/ecommerce_integrations/shopify/page/shopify_import_products/shopify_import_products.js +++ b/ecommerce_integrations/shopify/page/shopify_import_products/shopify_import_products.js @@ -33,6 +33,7 @@ shopify.ProductImporter = class { const jobs = await frappe.db.get_list("RQ Job", { filters: { status: ("in", ("queued", "started")) }, }); + this.syncRunning = jobs.find( (job) => job.job_name == "shopify.job.sync.all.products", @@ -131,6 +132,7 @@ shopify.ProductImporter = class { }, { name: "Name", + align: "left", editable: false, focusable: false, }, @@ -159,16 +161,23 @@ shopify.ProductImporter = class { this.wrapper.find(".shopify-datatable-footer").show(); } - async fetchShopifyProducts(from_ = null) { + async fetchShopifyProducts(cursor = null, direction = "next") { try { const { - message: { products, nextUrl, prevUrl }, + message: { products, nextCursor, prevCursor, pageInfo }, } = await frappe.call({ method: "ecommerce_integrations.shopify.page.shopify_import_products.shopify_import_products.get_shopify_products", - args: { from_ }, + args: { cursor, direction }, }); - this.nextUrl = nextUrl; - this.prevUrl = prevUrl; + this.nextURL = nextCursor; + this.prevURL = prevCursor; + + this.hasNextPage = pageInfo?.hasNextPage ?? false; + this.hasPreviousPage = pageInfo?.hasPreviousPage ?? false; + + // Enable/disable buttons based on page info + $(".btn-next").prop("disabled", !this.hasNextPage); + $(".btn-prev").prop("disabled", !this.hasPreviousPage); const shopifyProducts = products.map((product) => ({ // 'Image': product.image && product.image.src && ``, @@ -284,12 +293,50 @@ shopify.ProductImporter = class { $(".btn-paginate").prop("disabled", true); this.shopifyProductTable.showToastMessage("Loading..."); + const isNext = _this.hasClass("btn-next"); + const cursor = isNext ? this.nextURL : this.prevURL; + + // Prevent invalid navigation + if (isNext && !this.hasNextPage) { + this.shopifyProductTable.clearToastMessage(); + frappe.show_alert({ + message: __("No more next pages available."), + indicator: "orange", + }); + $(".btn-next").prop("disabled", true); + $(".btn-paginate").prop("disabled", false); + return; + } + + if (!isNext && !this.hasPreviousPage) { + this.shopifyProductTable.clearToastMessage(); + frappe.show_alert({ + message: __("No more previous pages available."), + indicator: "orange", + }); + $(".btn-prev").prop("disabled", true); + $(".btn-paginate").prop("disabled", false); + return; + } + + if (!cursor) { + this.shopifyProductTable.clearToastMessage(); + frappe.show_alert({ + message: __("No valid cursor available."), + indicator: "orange", + }); + $(".btn-paginate").prop("disabled", false); + return; + } + + // Fetch products const newProducts = await this.fetchShopifyProducts( - _this.hasClass("btn-next") ? this.nextUrl : this.prevUrl, + cursor, + isNext ? "next" : "prev", ); - this.shopifyProductTable.refresh(newProducts); + // Re-enable buttons $(".btn-paginate").prop("disabled", false); this.shopifyProductTable.clearToastMessage(); } @@ -326,8 +373,9 @@ shopify.ProductImporter = class { _log.append(message); _log.scrollTop(_log[0].scrollHeight); - if (synced) + if (synced) { this.updateSyncedCount(_syncedCounter, _erpnextCounter); + } if (done) { frappe.realtime.off("shopify.key.sync.all.products"); @@ -343,12 +391,12 @@ shopify.ProductImporter = class { const btn = $("#btn-sync-all"); const _toggleClass = (d) => (d ? "btn-success" : "btn-primary"); - const _toggleText = () => (disable ? "Syncing..." : "Sync Products"); + const _toggleText = (d) => (d ? "Syncing..." : "Sync Products"); btn.prop("disabled", disable) .addClass(_toggleClass(disable)) .removeClass(_toggleClass(!disable)) - .text(_toggleText()); + .text(_toggleText(disable)); } updateSyncedCount(_syncedCounter, _erpnextCounter) { diff --git a/ecommerce_integrations/shopify/page/shopify_import_products/shopify_import_products.py b/ecommerce_integrations/shopify/page/shopify_import_products/shopify_import_products.py index e30a102e9..46f712680 100644 --- a/ecommerce_integrations/shopify/page/shopify_import_products/shopify_import_products.py +++ b/ecommerce_integrations/shopify/page/shopify_import_products/shopify_import_products.py @@ -1,59 +1,176 @@ +import datetime +import json +import os +import time from time import process_time +from unittest import result import frappe +import requests +from frappe import _ from frappe.exceptions import UniqueValidationError -from shopify.resources import Product +from shopify import GraphQL -from ecommerce_integrations.ecommerce_integrations.doctype.ecommerce_item import ecommerce_item +from ecommerce_integrations.ecommerce_integrations.doctype.ecommerce_item import ( + ecommerce_item, +) from ecommerce_integrations.shopify.connection import temp_shopify_session from ecommerce_integrations.shopify.constants import MODULE_NAME from ecommerce_integrations.shopify.product import ShopifyProduct +from ecommerce_integrations.shopify.utils import create_shopify_log # constants SYNC_JOB_NAME = "shopify.job.sync.all.products" REALTIME_KEY = "shopify.key.sync.all.products" +TEMP_DIR = frappe.get_site_path("private", "temp") +os.makedirs(TEMP_DIR, exist_ok=True) @frappe.whitelist() -def get_shopify_products(from_=None): - shopify_products = fetch_all_products(from_) +def get_shopify_products(cursor=None, direction="next"): + shopify_products = fetch_all_products(cursor=cursor, direction=direction) return shopify_products -def fetch_all_products(from_=None): - # format shopify collection for datatable +def fetch_all_products(cursor=None, direction="next"): + """Fetch paginated Shopify products.""" - collection = _fetch_products_from_shopify(from_) + response = _fetch_products_from_shopify(cursor=cursor, direction=direction) + products_data = response.get("products", []) + page_info = response.get("pageInfo", {}) products = [] - for product in collection: - d = product.to_dict() - d["synced"] = is_synced(product.id) - products.append(d) - next_url = None - if collection.has_next_page(): - next_url = collection.next_page_url - - prev_url = None - if collection.has_previous_page(): - prev_url = collection.previous_page_url + for product in products_data: + product["synced"] = is_synced(product["id"]) + products.append(product) return { "products": products, - "nextUrl": next_url, - "prevUrl": prev_url, + "nextCursor": page_info.get("endCursor"), + "prevCursor": page_info.get("startCursor"), + "pageInfo": { + "hasNextPage": page_info.get("hasNextPage", False), + "hasPreviousPage": page_info.get("hasPreviousPage", False), + }, } @temp_shopify_session -def _fetch_products_from_shopify(from_=None, limit=20): - if from_: - collection = Product.find(from_=from_) +def _fetch_products_from_shopify(cursor=None, direction="next", limit=20): + """ + Fetch products from Shopify with bidirectional pagination (forward/backward). + + Args: + cursor (str): Cursor for pagination. + direction (str): 'next' for forward, 'prev' for backward pagination. + limit (int): Number of products per page. + + Returns: + dict: { + "products": [...], + "pageInfo": { + "hasNextPage": bool, + "hasPreviousPage": bool, + "startCursor": str, + "endCursor": str + } + } + """ + + if direction == "prev": + query = """ + query ($last: Int!, $before: String) { + products(last: $last, before: $before) { + edges { + cursor + node { + id + title + variants(first: 100) { + edges { + node { + id + title + sku + } + } + } + } + } + pageInfo { + hasNextPage + hasPreviousPage + startCursor + endCursor + } + } + } + """ + variables = {"last": limit, "before": cursor if cursor else None} else: - collection = Product.find(limit=limit) + query = """ + query ($first: Int!, $after: String) { + products(first: $first, after: $after) { + edges { + cursor + node { + id + title + variants(first: 100) { + edges { + node { + id + title + sku + } + } + } + } + } + pageInfo { + hasNextPage + hasPreviousPage + startCursor + endCursor + } + } + } + """ + variables = {"first": limit, "after": cursor if cursor else None} + + response = GraphQL().execute(query, variables=variables) + response_dict = json.loads(response) + products_data = response_dict.get("data", {}).get("products", {}) + + edges = products_data.get("edges", []) + products = [] + + for edge in edges: + node = edge.get("node", {}) + + product_id = node.get("id", "").split("/")[-1] - return collection + variants = [] + for v in node.get("variants", {}).get("edges", []): + variant_node = v.get("node", {}) + variant_id = variant_node.get("id", "").split("/")[-1] + variants.append( + { + "id": variant_id, + "title": variant_node.get("title"), + "sku": variant_node.get("sku"), + } + ) + + products.append({"id": product_id, "title": node.get("title"), "variants": variants}) + + page_info = products_data.get("pageInfo", {}) + + return { + "products": products, + "pageInfo": page_info, + } @frappe.whitelist() @@ -75,7 +192,18 @@ def get_product_count(): @temp_shopify_session def get_shopify_product_count(): - return Product.count() + query = """ + { + productsCount { + count + } + } + """ + response = GraphQL().execute(query) + response_dict = json.loads(response) + data = response_dict.get("data", {}).get("productsCount", {}) + count = data.get("count", 0) + return count @frappe.whitelist() @@ -85,8 +213,10 @@ def sync_product(product): shopify_product.sync_product() return True + except Exception: frappe.db.rollback() + return False @@ -97,18 +227,60 @@ def resync_product(product): @temp_shopify_session def _resync_product(product): + """ + Resync a specific Shopify product using GraphQL. + Automatically cleans gid:// IDs and fetches product + variants via GraphQL. + """ + savepoint = "shopify_resync_product" + try: - item = Product.find(product) + product_id = str(product).split("/")[-1] + + query = """ + query ($id: ID!) { + product(id: $id) { + id + title + variants(first: 100) { + edges { + node { + id + title + } + } + } + } + } + """ + + variables = {"id": f"gid://shopify/Product/{product_id}"} + + response = GraphQL().execute(query, variables=variables) + response_dict = json.loads(response) + product_data = response_dict.get("data", {}).get("product") + + if not product_data: + publish(f"❌ Product {product_id} not found in Shopify.", error=True) + return False frappe.db.savepoint(savepoint) - for variant in item.variants: - shopify_product = ShopifyProduct(product, variant_id=variant.id) + + for variant_edge in product_data.get("variants", {}).get("edges", []): + variant_node = variant_edge.get("node", {}) + variant_id = variant_node.get("id", "").split("/")[-1] + + shopify_product = ShopifyProduct(product_id, variant_id=variant_id) shopify_product.sync_product() + publish(f"✅ Synced variant {variant_id} for product {product_id}", synced=True) + return True - except Exception: + + except Exception as e: + frappe.log_error(f"Shopify Resync Error: {e}", "Shopify Product Resync") frappe.db.rollback(save_point=savepoint) + publish(f"❌ Error resyncing product {product}: {e}", error=True) return False @@ -118,14 +290,297 @@ def is_synced(product): @frappe.whitelist() def import_all_products(): + """Entry point: decide between realtime or bulk sync""" + counts = get_product_count() + total_shopify = counts.get("shopifyCount", 0) + publish(f"Starting import of {total_shopify} products from Shopify...") + + if total_shopify > 2000: + start_bulk_import() + + else: + frappe.enqueue( + queue_sync_all_products, + queue="long", + job_name=SYNC_JOB_NAME, + key=REALTIME_KEY, + timeout=360, + ) + # Run directly (faster) for small stores + + +def start_bulk_import(): + """Start a bulk import and monitor it""" + publish("⚡ Starting Shopify bulk operation...") + start_bulk_product_job() + frappe.enqueue( - queue_sync_all_products, + monitor_bulk_job, queue="long", job_name=SYNC_JOB_NAME, key=REALTIME_KEY, + is_async=False, + timeout=8600000, ) +@temp_shopify_session +def start_bulk_product_job(): + """Start a Shopify bulk operation to fetch all products""" + query = """ + mutation { + bulkOperationRunQuery( + query: \""" + { + products { + edges { + node { + id + title + + } + } + } + } + \""" + ) { + bulkOperation { + id + status + } + userErrors { + field + message + } + } + } + """ + response = json.loads(GraphQL().execute(query)) + if response.get("data", {}).get("bulkOperationRunQuery", {}).get("userErrors"): + frappe.throw( + _("Error while executing bulkOperation:", response["data"]["bulkOperationRunQuery"]["userErrors"]) + ) + return response + + +@temp_shopify_session +def check_bulk_status(): + """Check current bulk operation status""" + query = """ + { + currentBulkOperation { + id + status + errorCode + completedAt + objectCount + fileSize + url + } + } + """ + response = json.loads(GraphQL().execute(query)) + return response.get("data", {}).get("currentBulkOperation") + + +def monitor_bulk_job(**kwargs): + """Monitor Shopify bulk job until completion""" + publish("⏳ Waiting for Shopify bulk job to complete...") + + max_attempts = 120 + attempt = 0 + + while attempt < max_attempts: + info = check_bulk_status() + if not info: + publish("⚠️ No active bulk operation found.", error=True) + create_shopify_log( + status="Error", + message="No active bulk operation found.", + method="monitor_bulk_job", + ) + return + + status = info.get("status") + count = int(info.get("objectCount", 0)) + + if status == "COMPLETED": + publish(f"✅ Bulk job completed! Processing {count} products...") + bulk_id = info.get("id") + url = info.get("url") + local_file = download_bulk_file(url, bulk_id) + + # Process the file in batches + synced, failed = process_bulk_file_from_disk(local_file, bulk_id) + publish(f"🎉 Bulk sync completed. Synced: {synced}, Failed: {failed}") + create_shopify_log( + status="Success", + message="Bulk sync completed", + method="monitor_bulk_job", + ) + + # Clean up local file + if os.path.exists(local_file): + os.remove(local_file) + + return + + elif status in ("FAILED", "CANCELED", "CANCELING"): + publish(f"❌ Bulk job failed: {info.get('errorCode')}", error=True) + create_shopify_log( + status="Error", + message=f"Bulk job failed: {info.get('errorCode')}", + method="monitor_bulk_job", + ) + return + + elif status == "RUNNING": + publish(f"⏳ Processing... ({count} objects so far)", br=False) + create_shopify_log( + status="In Progress", + message=f"Bulk job in progress... ({count} objects processed)", + method="monitor_bulk_job", + ) + + # Adaptive polling — check faster for small jobs + delay = 1 if count < 2000 else 5 + time.sleep(delay) + attempt += 1 + + else: + time.sleep(2) + attempt += 1 + + publish("⏱️ Timeout: bulk job did not complete in expected time.", error=True) + + +BATCH_SIZE = 200 # Adjust based on your ERPNext worker limits + + +def download_bulk_file(url, bulk_id): + """ + Download Shopify bulk JSONL file to local temporary storage. + + """ + + safe_bulk_id = bulk_id.replace(":", "_").replace("/", "_") + + local_file = os.path.join(TEMP_DIR, f"shopify_bulk_{safe_bulk_id}.jsonl") + + create_shopify_log( + status="In Progress", + message=f"Downloading bulk file from url: {url}", + method="download_bulk_file", + ) + with requests.get(url, stream=True, timeout=300) as response: + response.raise_for_status() + with open(local_file, "wb") as f: + for chunk in response.iter_content(chunk_size=1024 * 1024): + if chunk: + f.write(chunk) + publish("✅ Bulk file downloaded successfully.") + create_shopify_log( + status="completed", + message=f"Downloaded bulk file from url: {url}", + method="download_bulk_file", + ) + + return local_file + + +def process_bulk_file_from_disk(file_path, bulk_id): + """ + Process Shopify bulk JSONL file in batches with checkpoints. + """ + # Ensure checkpoint doc exists + if bulk_id and not frappe.db.exists("Shopify Bulk Sync Progress", {"bulk_id": bulk_id}): + frappe.get_doc( + { + "doctype": "Shopify Bulk Sync Progress", + "bulk_id": bulk_id, + "last_synced_product_id": None, + } + ).insert(ignore_permissions=True) + + last_synced_id = frappe.db.get_value( + "Shopify Bulk Sync Progress", {"bulk_id": bulk_id}, "last_synced_product_id" + ) + skip = bool(last_synced_id) + batch = [] + synced_count = 0 + failed_count = 0 + seen_products = set() + + with open(file_path, encoding="utf-8") as f: + for line in f: + if not line.strip(): + continue + product = json.loads(line) + product_id = product["id"].split("/")[-1] + + # Skip already synced until checkpoint + if skip: + if product_id != last_synced_id: + skip = False + else: + continue + continue + + if product_id in seen_products or is_synced(product_id): + continue + seen_products.add(product_id) + + batch.append(product) + + if len(batch) >= BATCH_SIZE: + b_synced, b_failed, last_id = process_batch(batch, bulk_id) + synced_count += b_synced + failed_count += b_failed + batch = [] + + # Process remaining products + if batch: + b_synced, b_failed, last_id = process_batch(batch, bulk_id) + synced_count += b_synced + failed_count += b_failed + + return synced_count, failed_count + + +def process_batch(batch, bulk_id=None): + """Sync a batch of products and update checkpoint.""" + synced_count = 0 + failed_count = 0 + last_synced_id = None + + for product in batch: + try: + product_id = product["id"].split("/")[-1] + shopify_product = ShopifyProduct(product_id) + shopify_product.sync_product() + synced_count += 1 + last_synced_id = product_id + + except Exception as e: + failed_count += 1 + frappe.log_error( + message=f"Product {product_id} sync failed: {e}", + title="Shopify Bulk Sync Error", + ) + + if all([bulk_id, last_synced_id]): + frappe.db.set_value( + "Shopify Bulk Sync Progress", + {"bulk_id": bulk_id}, + "last_synced_product_id", + last_synced_id, + ) + # Commit DB and update checkpoint after each batch + frappe.db.commit() + + return synced_count, failed_count, last_synced_id + + def queue_sync_all_products(*args, **kwargs): start_time = process_time() @@ -133,43 +588,56 @@ def queue_sync_all_products(*args, **kwargs): publish("Syncing all products...") if counts["shopifyCount"] < counts["syncedCount"]: - publish("⚠ Shopify has less products than ERPNext.") + publish("⚠ Shopify has fewer products than ERPNext.") - _sync = True - collection = _fetch_products_from_shopify(limit=100) savepoint = "shopify_product_sync" - while _sync: - for product in collection: + cursor = None + has_next_page = True + + while has_next_page: + collection = _fetch_products_from_shopify(cursor=cursor, direction="next", limit=100) + products = collection.get("products", []) + page_info = collection.get("pageInfo", {}) + + for product in products: try: - publish(f"Syncing product {product.id}", br=False) + publish(f"Syncing product {product['id']}", br=False) frappe.db.savepoint(savepoint) - if is_synced(product.id): - publish(f"Product {product.id} already synced. Skipping...") + + if is_synced(product["id"]): + publish(f"Product {product['id']} already synced. Skipping...") continue - shopify_product = ShopifyProduct(product.id) + shopify_product = ShopifyProduct(product["id"]) shopify_product.sync_product() - publish(f"✅ Synced Product {product.id}", synced=True) + publish(f"✅ Synced Product {product['id']}", synced=True) except UniqueValidationError as e: - publish(f"❌ Error Syncing Product {product.id} : {e!s}", error=True) + publish(f"❌ Error Syncing Product {product['id']} : {e!s}", error=True) frappe.db.rollback(save_point=savepoint) continue except Exception as e: - publish(f"❌ Error Syncing Product {product.id} : {e!s}", error=True) + publish(f"❌ Error Syncing Product {product['id']} : {e!s}", error=True) frappe.db.rollback(save_point=savepoint) continue - if collection.has_next_page(): - frappe.db.commit() # prevents too many write request error - collection = _fetch_products_from_shopify(from_=collection.next_page_url) - else: - _sync = False + # Commit after processing each Shopify page to persist progress + # before fetching the next page + frappe.db.commit() + + has_next_page = page_info.get("hasNextPage", False) + cursor = page_info.get("endCursor") if has_next_page else None end_time = process_time() - publish(f"🎉 Done in {end_time - start_time}s", done=True) + publish(f"🎉 Done in {end_time - start_time:.2f}s", done=True) + create_shopify_log( + status="Success", + message=f"Completed syncing all products in {end_time - start_time:.2f}s", + method="queue_sync_all_products", + ) + return True diff --git a/ecommerce_integrations/shopify/product.py b/ecommerce_integrations/shopify/product.py index 92c31f467..a34e7c149 100644 --- a/ecommerce_integrations/shopify/product.py +++ b/ecommerce_integrations/shopify/product.py @@ -1,18 +1,20 @@ +import json from typing import Optional import frappe from frappe import _, msgprint -from frappe.utils import cint, cstr +from frappe.utils import cint, cstr, flt from frappe.utils.nestedset import get_root_of -from shopify.resources import Product, Variant +from shopify import GraphQL -from ecommerce_integrations.ecommerce_integrations.doctype.ecommerce_item import ecommerce_item +from ecommerce_integrations.ecommerce_integrations.doctype.ecommerce_item import ( + ecommerce_item, +) from ecommerce_integrations.shopify.connection import temp_shopify_session from ecommerce_integrations.shopify.constants import ( ITEM_SELLING_RATE_FIELD, MODULE_NAME, SETTING_DOCTYPE, - SHOPIFY_VARIANTS_ATTR_LIST, SUPPLIER_ID_FIELD, WEIGHT_TO_ERPNEXT_UOM_MAP, ) @@ -53,11 +55,10 @@ def get_erpnext_item(self): has_variants=self.has_variants, ) - @temp_shopify_session def sync_product(self): if not self.is_synced(): - shopify_product = Product.find(self.product_id) - product_dict = shopify_product.to_dict() + product_dict = self.fetch_shopify_product(self.product_id) + self._make_item(product_dict) def _make_item(self, product_dict): @@ -109,7 +110,6 @@ def _create_attribute(self, product_dict): "numeric_values": item_attr.get("numeric_values"), } ) - return attribute def _set_new_attribute_values(self, item_attr, values): @@ -118,83 +118,208 @@ def _set_new_attribute_values(self, item_attr, values): (d.abbr.lower() == attr_value.lower() or d.attribute_value.lower() == attr_value.lower()) for d in item_attr.item_attribute_values ): - item_attr.append("item_attribute_values", {"attribute_value": attr_value, "abbr": attr_value}) + item_attr.append( + "item_attribute_values", + {"attribute_value": attr_value, "abbr": attr_value}, + ) def _create_item(self, product_dict, warehouse, has_variant=0, attributes=None, variant_of=None): + item_code = cstr(product_dict.get("id")) + + # HSN code handling + hsn_code = str(product_dict.get("metafield") or "").strip() + if not hsn_code.isdigit() or len(hsn_code) not in (6, 8): + hsn_code = "999713" + price = flt(product_dict.get("price") or 0) + stock_qty = flt(product_dict.get("stock_qty") or 0) + + if not has_variant and not variant_of: + # For Single product without varaint + first_variant = product_dict["variants"][0] + price = flt(first_variant.get("price") or 0) + stock_qty = flt(first_variant.get("stock_qty") or 0) + item_dict = { "variant_of": variant_of, "is_stock_item": 1, - "item_code": cstr(product_dict.get("item_code")) or cstr(product_dict.get("id")), - "item_name": product_dict.get("title", "").strip(), + "item_code": item_code, + "item_name": (product_dict.get("title") or "").strip(), "description": product_dict.get("body_html") or product_dict.get("title"), "item_group": self._get_item_group(product_dict.get("product_type")), "has_variants": has_variant, "attributes": attributes or [], - "stock_uom": product_dict.get("uom") or _("Nos"), "sku": product_dict.get("sku") or _get_sku(product_dict), "default_warehouse": warehouse, "image": _get_item_image(product_dict), - "weight_uom": WEIGHT_TO_ERPNEXT_UOM_MAP[product_dict.get("weight_unit")], + "weight_uom": WEIGHT_TO_ERPNEXT_UOM_MAP.get(product_dict.get("weight_unit")), "weight_per_unit": product_dict.get("weight"), "default_supplier": self._get_supplier(product_dict), + "gst_hsn_code": hsn_code, } - integration_item_code = product_dict["id"] # shopify product_id - variant_id = product_dict.get("variant_id", "") # shopify variant_id if has variants - sku = item_dict["sku"] - - if not _match_sku_and_link_item( - item_dict, integration_item_code, variant_id, variant_of=variant_of, has_variant=has_variant - ): - ecommerce_item.create_ecommerce_item( - MODULE_NAME, - integration_item_code, - item_dict, - variant_id=variant_id, - sku=sku, - variant_of=variant_of, - has_variants=has_variant, + is_template = has_variant and not variant_of + # update stock and price when it is not an template + if not is_template: + if stock_qty: + item_dict["opening_stock"] = stock_qty + item_dict["valuation_rate"] = price + + if price: + item_dict["standard_rate"] = price + item_dict["shopify_selling_rate"] = price + + # Clean and normalize attributes + cleaned = [ + attr + for attr in attributes or [] + if attr.get("attribute", "").lower() not in ("default", "default title") + ] + normalized_attributes = [] + for attr in cleaned: + if not attr.get("attribute"): + continue + if not attr.get("attribute_value") and attr.get("attribute"): + attr["attribute_value"] = ( + frappe.db.get_value( + "Item Attribute Value", {"parent": attr["attribute"]}, "attribute_value" + ) + or "" + ) + normalized_attributes.append( + { + "attribute": attr["attribute"], + "attribute_value": attr["attribute_value"], + "doctype": "Item Variant Attribute", + } ) + item_dict["attributes"] = normalized_attributes + + if variant_of and not normalized_attributes: + item_dict["has_variants"] = 0 + item_dict["variant_of"] = None + + try: + integration_item_code = product_dict["id"] + variant_id = product_dict.get("variant_id", "") + sku = item_dict["sku"] + + if not _match_sku_and_link_item( + item_dict, integration_item_code, variant_id, variant_of=variant_of, has_variant=has_variant + ): + ecommerce_item.create_ecommerce_item( + MODULE_NAME, + integration_item_code, + item_dict, + variant_id=variant_id, + sku=sku, + variant_of=variant_of, + has_variants=has_variant, + ) + + except Exception: + frappe.log_error(frappe.get_traceback(), f"Shopify Item Creation Failed: {item_code}") + def _create_item_variants(self, product_dict, warehouse, attributes): template_item = ecommerce_item.get_erpnext_item( MODULE_NAME, integration_item_code=product_dict.get("id"), has_variants=1 ) - if template_item: - for variant in product_dict.get("variants"): + if not template_item: + return + + template_item_code = template_item.name + + for variant in product_dict.get("variants", []): + try: + variant_id = variant.get("id") + # Varaint is updated it it exsist + if frappe.db.exists("Item", variant_id): + frappe.db.set_value( + "Item", + variant_id, + { + "standard_rate": flt(variant.get("price")), + "valuation_rate": flt(variant.get("price")), + "shopify_selling_rate": flt(variant.get("price")), + }, + ) + + if flt(variant.get("stock_qty")): + frappe.db.set_value( + "Item", variant_id, "opening_stock", flt(variant.get("stock_qty")) + ) + + continue + + # Build variant attributes + variant_attributes = [] + variant_index = product_dict["variants"].index(variant) + for _, option in enumerate(product_dict.get("options", []), start=1): + option_name = option.get("name") + if variant_index < len(option.get("values", [])): + raw_value = option["values"][variant_index] + if option_name and raw_value: + attribute_value = self._get_attribute_value(raw_value, {"attribute": option_name}) + variant_attributes.append( + {"attribute": option_name, "attribute_value": attribute_value} + ) + shopify_item_variant = { - "id": product_dict.get("id"), - "variant_id": variant.get("id"), - "item_code": variant.get("id"), - "title": product_dict.get("title", "").strip() + "-" + variant.get("title"), + "id": variant_id, + "variant_id": variant_id, + "variant_of": template_item_code, + "item_code": variant_id, + "title": variant.get("title") or product_dict.get("title"), "product_type": product_dict.get("product_type"), "sku": variant.get("sku"), "uom": template_item.stock_uom or _("Nos"), - "item_price": variant.get("price"), + "price": flt(variant.get("price") or product_dict.get("item_price") or 0), + "stock_qty": flt(variant.get("stock_qty") or 0), "weight_unit": variant.get("weight_unit"), "weight": variant.get("weight"), + "body_html": product_dict.get("body_html"), } - for i, variant_attr in enumerate(SHOPIFY_VARIANTS_ATTR_LIST): - if variant.get(variant_attr): - attributes[i].update( - { - "attribute_value": self._get_attribute_value( - variant.get(variant_attr), attributes[i] - ) - } - ) - self._create_item(shopify_item_variant, warehouse, 0, attributes, template_item.name) + self._create_item( + shopify_item_variant, + warehouse, + has_variant=0, + attributes=variant_attributes, + variant_of=template_item_code, + ) + except Exception: + frappe.log_error( + frappe.get_traceback(), f"Shopify Variant Creation Failed: {variant.get('id')}" + ) def _get_attribute_value(self, variant_attr_val, attribute): - attribute_value = frappe.db.sql( - """select attribute_value from `tabItem Attribute Value` - where parent = %s and (abbr = %s or attribute_value = %s)""", - (attribute["attribute"], variant_attr_val, variant_attr_val), - as_list=1, - ) - return attribute_value[0][0] if len(attribute_value) > 0 else cint(variant_attr_val) + av = frappe.qb.DocType("Item Attribute Value") + + # Check if attribute value exists + attribute_value = ( + frappe.qb.from_(av) + .select(av.attribute_value) + .where( + (av.parent == attribute["attribute"]) + & ((av.abbr == variant_attr_val) | (av.attribute_value == variant_attr_val)) + ) + ).run(as_list=True) + + if attribute_value: + return str(attribute_value[0][0]) + else: + # Create missing attribute value in ERPNext + new_val = frappe.get_doc( + { + "doctype": "Item Attribute Value", + "parent": attribute["attribute"], + "parenttype": "Item Attribute", + "parentfield": "attribute_values", + "attribute_value": str(variant_attr_val), + } + ).insert(ignore_permissions=True) + return str(new_val.attribute_value) def _get_item_group(self, product_type=None): parent_item_group = get_root_of("Item Group") @@ -215,37 +340,186 @@ def _get_item_group(self, product_type=None): return item_group.name def _get_supplier(self, product_dict): - if product_dict.get("vendor"): - supplier = frappe.db.sql( - f"""select name from tabSupplier - where name = %s or {SUPPLIER_ID_FIELD} = %s """, - (product_dict.get("vendor"), product_dict.get("vendor").lower()), - as_list=1, - ) - - if supplier: - return product_dict.get("vendor") - supplier = frappe.get_doc( - { - "doctype": "Supplier", - "supplier_name": product_dict.get("vendor"), - SUPPLIER_ID_FIELD: product_dict.get("vendor").lower(), - "supplier_group": self._get_supplier_group(), - } - ).insert() - return supplier.name - else: + if not product_dict.get("vendor"): return "" + vendor_name = product_dict.get("vendor") + vendor_id = vendor_name.lower() + exs = frappe.qb.DocType("Supplier") + existing_supplier = ( + frappe.qb.from_(exs) + .select(exs.name) + .where((exs.name == vendor_name) | (exs[SUPPLIER_ID_FIELD] == vendor_id)) + ).run(as_dict=True) + + if existing_supplier: + return existing_supplier[0]["name"] + + supplier = frappe.get_doc( + { + "doctype": "Supplier", + "supplier_name": vendor_name, + SUPPLIER_ID_FIELD: vendor_id, + "supplier_group": self._get_supplier_group(), + } + ).insert(ignore_permissions=True) + + return supplier.name + def _get_supplier_group(self): - supplier_group = frappe.db.get_value("Supplier Group", _("Shopify Supplier")) + group_name = "Shopify Supplier" + + supplier_group = frappe.db.get_value("Supplier Group", {"name": group_name}, "name") + if not supplier_group: - supplier_group = frappe.get_doc( - {"doctype": "Supplier Group", "supplier_group_name": _("Shopify Supplier")} - ).insert() - return supplier_group.name + supplier_group = ( + frappe.get_doc({"doctype": "Supplier Group", "supplier_group_name": group_name}) + .insert(ignore_permissions=True) + .name + ) + return supplier_group + @temp_shopify_session + def fetch_shopify_product(self, product_id: str) -> dict: + """Fetch shopify product using GraphQL API to get all details including variants and options.""" + if "/" in product_id: + product_id = product_id.split("/")[-1] + if not product_id.startswith("gid://"): + product_id = f"gid://shopify/Product/{product_id}" + query = """ + query ($id: ID!) { + product(id: $id) { + id + title + descriptionHtml + productType + vendor + featuredMedia { + ... on MediaImage { + id + image { + url + } + } + } + options(first: 10) { + name + values + } + metafield(namespace: "custom", key: "hsn_code") { + value + } + variants(first: 50) { + edges { + node { + id + title + sku + price + inventoryQuantity + inventoryItem { + id + tracked + measurement { + weight { + value + unit + } + } + inventoryLevels(first: 10) { + edges { + node { + id + quantities(names: ["available"]) { + name + quantity + } + location { + id + name + legacyResourceId + } + } + } + } + } + } + } + } + } + } + + """ + variables = {"id": product_id} + response = GraphQL().execute(query, variables) + + if isinstance(response, str): + data = json.loads(response) + else: + data = response + + if "errors" in data: + frappe.log_error( + json.dumps(data["errors"], indent=2), + "Shopify GraphQL Product Fetch Error", + ) + + product = data.get("data", {}).get("product", {}) + + if not product: + frappe.throw(_("No product data found in Shopify GraphQL response , Product may be deleted")) + + normalized = { + "id": product.get("id").split("/")[-1], + "title": product.get("title"), + "body_html": product.get("descriptionHtml"), + "product_type": product.get("productType"), + "vendor": product.get("vendor"), + "image": { + "src": ( + product.get("featuredMedia", {}).get("image", {}).get("url") + if product.get("featuredMedia") + else None + ) + }, + "options": [ + {"name": opt.get("name"), "values": opt.get("values", [])} + for opt in product.get("options", []) + ], + "metafield": (product.get("metafield", {}).get("value") if product.get("metafield") else None), + "variants": [], + } + for edge in product.get("variants", {}).get("edges", []): + node = edge.get("node", {}) + total_stock = 0 + inventory_levels = node.get("inventoryItem", {}).get("inventoryLevels", {}).get("edges", []) + + for level in inventory_levels: + quantities = level.get("node", {}).get("quantities", []) + for qty in quantities: + if qty.get("name") == "available": + total_stock += qty.get("quantity", 0) + normalized["variants"].append( + { + "id": node.get("id").split("/")[-1], + "title": node.get("title"), + "sku": node.get("sku"), + "price": node.get("price"), + "weight": node.get("inventoryItem", {}) + .get("measurement", {}) + .get("weight", {}) + .get("value"), + "weight_unit": node.get("inventoryItem", {}) + .get("measurement", {}) + .get("weight", {}) + .get("unit"), + "stock_qty": total_stock, + } + ) + + return normalized + def _add_weight_details(product_dict): variants = product_dict.get("variants") @@ -276,7 +550,7 @@ def _match_sku_and_link_item(item_dict, product_id, variant_id, variant_of=None, Returns true if matched and linked. """ - sku = item_dict["sku"] + sku = item_dict.get("sku") if not sku or variant_of or has_variant: return False @@ -294,7 +568,6 @@ def _match_sku_and_link_item(item_dict, product_id, variant_id, variant_of=None, "sku": sku, } ) - ecommerce_item.insert() return True except Exception: @@ -328,15 +601,252 @@ def get_item_code(shopify_item): return item.item_code +def delete_from_shopify(product_id: str | None = None, variant_id: str | None = None) -> dict | None: + """ + Delete a Shopify product using GraphQL. + Note:This doesn't support deleting individual variants, only full products. + """ + if not product_id and not variant_id: + return None + + def ensure_gid(value: str) -> str: + if not value: + return None + if value.startswith("gid://"): + return value + return f"gid://shopify/Product/{value}" + + gid = ensure_gid(product_id or variant_id) + + mutation = """ + mutation productDelete($input: ProductDeleteInput!) { + productDelete(input: $input) { + deletedProductId + userErrors { + field + message + } + } + } + """ + + variables = {"input": {"id": gid}} + + try: + raw = GraphQL().execute(mutation, variables) + except Exception: + frappe.log_error( + f"Shopify GraphQL execution failed:\n{frappe.get_traceback()}", + "Shopify GraphQL Error", + ) + raise + + if isinstance(raw, str): + try: + data = json.loads(raw) + except Exception: + frappe.log_error(f"Invalid JSON response from Shopify: {raw}", "Shopify GraphQL Error") + raise + else: + data = raw + + if "errors" in data: + frappe.log_error(json.dumps(data["errors"], indent=2), "Shopify GraphQL Errors") + raise Exception(f"Shopify GraphQL errors: {data['errors']}") + + result = data.get("data", {}).get("productDelete") + if not result: + frappe.log_error(json.dumps(data, indent=2), "Shopify Delete Unexpected Response") + raise Exception("Unexpected response from Shopify during delete.") + + user_errors = result.get("userErrors") or [] + if user_errors: + msgs = ", ".join([ue.get("message", str(ue)) for ue in user_errors]) + frappe.log_error(json.dumps(user_errors, indent=2), "Shopify Delete User Errors") + raise Exception(f"Shopify Delete Error: {msgs}") + + frappe.logger().info(f"Shopify Delete Success: {json.dumps(result, indent=2)}") + return result + + @temp_shopify_session -def upload_erpnext_item(doc, method=None): - """This hook is called when inserting new or updating existing `Item`. +def shopify_graphql_product_mutation(action: str, product_data: dict) -> dict: + """ + Create or update Shopify product using new GraphQL `productSet` mutation. + Handles both single and multi-variant products. + """ + + import json + + from shopify import GraphQL + + # --- Key fixes for productSet mutation --- + # 1. For single variant products (default variant), use productOptions with "Title" + # 2. For variant optionValues, use "optionName" and "name" (not "value") + # 3. Don't include "options" field in productSet input - use "productOptions" instead + + if product_data.get("variants"): + # For single variant products with default "Title" option + if len(product_data["variants"]) == 1: + variant = product_data["variants"][0] - New items are pushed to shopify and changes to existing items are - updated depending on what is configured in "Shopify Setting" doctype. + # Set up productOptions for single variant + if "productOptions" not in product_data: + product_data["productOptions"] = [ + { + "name": "Title", + "position": 1, + "values": [{"name": "Default Title"}], + } + ] + + if "optionValues" not in variant or not variant["optionValues"]: + variant["optionValues"] = [{"optionName": "Title", "name": "Default Title"}] + else: + for opt_val in variant["optionValues"]: + if "value" in opt_val: + opt_val["name"] = opt_val.pop("value") + if "name" in opt_val and "optionName" not in opt_val: + opt_val["optionName"] = opt_val.pop("name") + opt_val["name"] = opt_val.get("name", "Default Title") + + if "options" in product_data: + del product_data["options"] + + # --- productSet mutation for both create and update --- + mutation = """ + mutation ProductSet($productSet: ProductSetInput!, $synchronous: Boolean!) { + productSet(synchronous: $synchronous, input: $productSet) { + product { + id + title + descriptionHtml + productType + status + vendor + metafields(first: 10, namespace: "custom") { + edges { + node { + key + value + } + } + } + variants(first: 50) { + nodes { + id + title + sku + price + inventoryItem { + id + tracked + inventoryLevels(first: 5) { + nodes { + quantities(names: ["available"]) { + name + quantity + } + location { + id + name + } + } + } + } + } + } + } + userErrors { + field + message + code + } + } + } """ - template_item = item = doc # alias for readability - # a new item recieved from ecommerce_integrations is being inserted + + # --- Variables --- + variables = {"synchronous": True, "productSet": product_data} + + # --- Execute GraphQL mutation --- + response = GraphQL().execute(mutation, variables) + + if isinstance(response, str): + data = json.loads(response) + else: + data = response + + # --- Handle top-level GraphQL errors --- + if "errors" in data: + frappe.log_error( + json.dumps(data["errors"], indent=2), + f"Shopify GraphQL Product {action.title()} Error", + ) + + result = data.get("data", {}).get("productSet", {}) + user_errors = result.get("userErrors") + + if user_errors: + frappe.log_error(json.dumps(data, indent=2), f"Shopify GraphQL {action.title()} Raw Response") + + product = result.get("product", {}) + if not product: + frappe.throw(_(f"No product returned from Shopify after {action}")) + + normalized = { + "id": product.get("id").split("/")[-1] if product.get("id") else None, + "title": product.get("title"), + "body_html": product.get("descriptionHtml"), + "product_type": product.get("productType"), + "status": product.get("status"), + "vendor": product.get("vendor"), + "metafields": {}, + "variants": [], + } + + for edge in product.get("metafields", {}).get("edges", []): + node = edge.get("node", {}) + normalized["metafields"][node.get("key")] = node.get("value") + + for node in product.get("variants", {}).get("nodes", []): + inventory = node.get("inventoryItem", {}) + tracked = inventory.get("tracked") + inventory_levels = [] + + for inv_node in inventory.get("inventoryLevels", {}).get("nodes", []): + inventory_levels.append( + { + "location_id": inv_node.get("location", {}).get("id"), + "location_name": inv_node.get("location", {}).get("name"), + "available": next( + ( + q.get("quantity") + for q in inv_node.get("quantities", []) + if q.get("name") == "available" + ), + None, + ), + } + ) + + normalized["variants"].append( + { + "id": node.get("id").split("/")[-1] if node.get("id") else None, + "title": node.get("title"), + "sku": node.get("sku"), + "price": node.get("price"), + "inventory_item_id": inventory.get("id"), + "tracked": tracked, + "inventory_levels": inventory_levels, + } + ) + return normalized + + +def upload_erpnext_item(doc, method=None): + template_item = item = doc + if item.flags.from_integration: return @@ -348,15 +858,13 @@ def upload_erpnext_item(doc, method=None): if frappe.flags.in_import: return - if item.has_variants: - return - + # In GraphQL flow, allow templates (has_variants=1) so variants can be generated if len(item.attributes) > 3: - msgprint(_("Template items/Items with 4 or more attributes can not be uploaded to Shopify.")) + frappe.msgprint(_("Template items/Items with 4 or more attributes can not be uploaded to Shopify.")) return - if doc.variant_of and not setting.upload_variants_as_items: - msgprint(_("Enable variant sync in setting to upload item to Shopify.")) + if item.variant_of and not setting.upload_variants_as_items: + frappe.msgprint(_("Enable variant sync in setting to upload item to Shopify.")) return if item.variant_of: @@ -370,12 +878,10 @@ def upload_erpnext_item(doc, method=None): is_new_product = not bool(product_id) if is_new_product: - product = Product() - product.published = False - product.status = "active" if setting.sync_new_item_as_active else "draft" + product_data = map_erpnext_item_to_shopify(erpnext_item=template_item) + product = shopify_graphql_product_mutation("create", product_data) - map_erpnext_item_to_shopify(shopify_product=product, erpnext_item=template_item) - is_successful = product.save() + is_successful = bool(product) if is_successful: update_default_variant_properties( @@ -384,45 +890,24 @@ def upload_erpnext_item(doc, method=None): price=template_item.get(ITEM_SELLING_RATE_FIELD), is_stock_item=template_item.is_stock_item, ) - if item.variant_of: - product.options = [] - product.variants = [] - variant_attributes = { - "title": template_item.item_name, - "sku": item.item_code, - "price": item.get(ITEM_SELLING_RATE_FIELD), - } - max_index_range = min(3, len(template_item.attributes)) - for i in range(0, max_index_range): - attr = template_item.attributes[i] - product.options.append( - { - "name": attr.attribute, - "values": frappe.db.get_all( - "Item Attribute Value", {"parent": attr.attribute}, pluck="attribute_value" - ), - } - ) - try: - variant_attributes[f"option{i+1}"] = item.attributes[i].attribute_value - except IndexError: - frappe.throw( - _("Shopify Error: Missing value for attribute {}").format(attr.attribute) - ) - product.variants.append(Variant(variant_attributes)) - - product.save() # push variant + # Create Ecommerce Item records ecom_items = list(set([item, template_item])) + for d in ecom_items: + first_variant = (product.get("variants") or [{}])[0] + + variant_id = "" if d.has_variants else cstr(first_variant.get("id") or "") + sku = "" if d.has_variants else cstr(first_variant.get("sku") or "") + ecom_item = frappe.get_doc( { "doctype": "Ecommerce Item", "erpnext_item_code": d.name, "integration": MODULE_NAME, - "integration_item_code": str(product.id), - "variant_id": "" if d.has_variants else str(product.variants[0].id), - "sku": "" if d.has_variants else str(product.variants[0].sku), + "integration_item_code": cstr(product.get("id")), + "variant_id": variant_id, + "sku": sku, "has_variants": d.has_variants, "variant_of": d.variant_of, } @@ -430,141 +915,567 @@ def upload_erpnext_item(doc, method=None): ecom_item.insert() write_upload_log(status=is_successful, product=product, item=item) + elif setting.update_shopify_item_on_update: - product = Product.find(product_id) - if product: + product_data = map_erpnext_item_to_shopify(erpnext_item=template_item) + product_data["id"] = f"gid://shopify/Product/{product_id}" + + product = shopify_graphql_product_mutation("update", product_data) + is_successful = bool(product) + + if is_successful: + # Push back returned Shopify data → ERPNext map_erpnext_item_to_shopify(shopify_product=product, erpnext_item=template_item) + + # If THIS ITEM is NOT a variant → update default variant if not item.variant_of: update_default_variant_properties( product, is_stock_item=template_item.is_stock_item, price=item.get(ITEM_SELLING_RATE_FIELD), ) + + # If THIS ITEM IS A VARIANT → update variant-level attributes else: - variant_attributes = {"sku": item.item_code, "price": item.get(ITEM_SELLING_RATE_FIELD)} - product.options = [] + variant_attributes = { + "sku": item.item_code, + "price": item.get(ITEM_SELLING_RATE_FIELD), + } + + # Build Shopify options (max 3) + product["options"] = [] max_index_range = min(3, len(template_item.attributes)) + for i in range(0, max_index_range): attr = template_item.attributes[i] - product.options.append( + product["options"].append( { "name": attr.attribute, "values": frappe.db.get_all( - "Item Attribute Value", {"parent": attr.attribute}, pluck="attribute_value" + "Item Attribute Value", + {"parent": attr.attribute}, + pluck="attribute_value", ), } ) + try: variant_attributes[f"option{i+1}"] = item.attributes[i].attribute_value except IndexError: frappe.throw( _("Shopify Error: Missing value for attribute {}").format(attr.attribute) ) - product.variants.append(Variant(variant_attributes)) - is_successful = product.save() - if is_successful and item.variant_of: map_erpnext_variant_to_shopify_variant(product, item, variant_attributes) - write_upload_log(status=is_successful, product=product, item=item, action="Updated") + write_upload_log(status=is_successful, product=product, item=item, action="Updated") + + +@temp_shopify_session +def map_erpnext_variant_to_shopify_variant(shopify_product, erpnext_item, variant_attributes): + """Maps variant and updates price + stock in Shopify.""" + + graphql = GraphQL() + + stock_qty = cint(erpnext_item.opening_stock) + + default_warehouse = frappe.db.get_single_value("Shopify Setting", "warehouse") + location_id = get_shopify_location_id(default_warehouse) + + shopify_variants = getattr(shopify_product, "variants", None) or shopify_product.get("variants", []) + + if not shopify_variants: + frappe.msgprint(_("No variants found in Shopify product")) + return None + + target_sku = variant_attributes.get("sku") + target_variant_id = None + inventory_item_id = None + for v in shopify_variants: + sku = v.get("sku") if isinstance(v, dict) else v.sku + if sku == target_sku: + target_variant_id = v.get("id") if isinstance(v, dict) else v.id + inventory_item_id = v.get("inventory_item_id") if isinstance(v, dict) else v.inventory_item_id -def map_erpnext_variant_to_shopify_variant(shopify_product: Product, erpnext_item, variant_attributes): - variant_product_id = frappe.db.get_value( + if not target_variant_id: + frappe.log_error( + message=f"Could not find variant in Shopify for SKU: {target_sku}", + title="Shopify Variant Not Found", + ) + return None + + price_mutation = """ + mutation updateVariantPrice($productId: ID!, $variants: [ProductVariantsBulkInput!]!) { + productVariantsBulkUpdate(productId: $productId, variants: $variants){ + productVariants { + id + price + } + userErrors { + field + message + } + } + } + """ + price_variables = { + "productId": f"gid://shopify/Product/{shopify_product.get('id')}", + "variants": [ + { + "id": f"gid://shopify/ProductVariant/{target_variant_id}", + "price": flt(variant_attributes.get("price")), + } + ], + } + + price_response = graphql.execute(price_mutation, price_variables) + if isinstance(price_response, str): + price_response = json.loads(price_response) + + if "errors" in price_response: + frappe.log_error(json.dumps(price_response["errors"], indent=2), "Shopify Price Update Error") + + if inventory_item_id: + stock_mutation = """ + mutation inventorySetQuantities($input: InventorySetQuantitiesInput!) { + inventorySetQuantities(input: $input) { + inventoryAdjustmentGroup { + createdAt + reason + referenceDocumentUri + changes { + name + delta + quantityAfterChange + } + } + userErrors { + code + field + message + } + } + } + """ + + stock_variables = { + "input": { + "ignoreCompareQuantity": True, + "name": "available", + "reason": "correction", + "quantities": [ + { + "inventoryItemId": inventory_item_id, + "locationId": f"gid://shopify/Location/{location_id}", + "quantity": stock_qty, + } + ], + } + } + + stock_response = graphql.execute(stock_mutation, stock_variables) + + if isinstance(stock_response, str): + stock_response = json.loads(stock_response) + stock_response = graphql.execute(stock_mutation, stock_variables) + + if isinstance(stock_response, str): + stock_response = json.loads(stock_response) + + if "errors" in stock_response: + frappe.log_error(json.dumps(stock_response["errors"], indent=2), "Shopify Stock Update Error") + + existing = frappe.db.get_value( "Ecommerce Item", - {"erpnext_item_code": erpnext_item.name, "integration": MODULE_NAME}, - "integration_item_code", + { + "erpnext_item_code": erpnext_item.name, + "integration": MODULE_NAME, + "sku": target_sku, + }, + "name", ) - if not variant_product_id: - for variant in shopify_product.variants: - if ( - variant.option1 == variant_attributes.get("option1") - and variant.option2 == variant_attributes.get("option2") - and variant.option3 == variant_attributes.get("option3") - ): - variant_product_id = str(variant.id) - if not frappe.flags.in_test: - frappe.get_doc( - { - "doctype": "Ecommerce Item", - "erpnext_item_code": erpnext_item.name, - "integration": MODULE_NAME, - "integration_item_code": str(shopify_product.id), - "variant_id": variant_product_id, - "sku": str(variant.sku), - "variant_of": erpnext_item.variant_of, - } - ).insert() - break - if not variant_product_id: - msgprint(_("Shopify: Couldn't sync item variant.")) - return variant_product_id + if existing: + # Update existing mapping + frappe.db.set_value( + "Ecommerce Item", + existing, + { + "variant_id": target_variant_id, + "integration_item_code": shopify_product.get("id"), + }, + ) + else: + # Insert new mapping + frappe.get_doc( + { + "doctype": "Ecommerce Item", + "erpnext_item_code": erpnext_item.name, + "integration": MODULE_NAME, + "integration_item_code": shopify_product.get("id"), + "variant_id": target_variant_id, + "sku": target_sku, + "variant_of": erpnext_item.variant_of, + } + ).insert(ignore_permissions=True) + + return target_variant_id + + +def map_erpnext_item_to_shopify(erpnext_item, shopify_product=None): + """Map ERPNext Item fields to Shopify GraphQL `productSet` mutation input structure.""" + + # ---- Base Product Info ---- + product_data = { + "title": erpnext_item.item_name, + "descriptionHtml": f"

{erpnext_item.description or erpnext_item.item_name}

", + "productType": erpnext_item.item_group or "All Item Groups", + "vendor": erpnext_item.brand or "Default Vendor", + "status": "DRAFT" if erpnext_item.disabled else "ACTIVE", + "metafields": [ + { + "namespace": "custom", + "key": "hsn_code", + "value": str(erpnext_item.gst_hsn_code or ""), + "type": "number_integer", + } + ], + } + + # ---- Detect Variant Attributes ---- + attributes = getattr(erpnext_item, "attributes", []) + has_variants = bool(erpnext_item.has_variants) + + product_options = [] + attribute_values_map = {} + + if has_variants: + for idx, attr in enumerate(attributes, start=1): + # Ensure attribute behaves like an object + attribute_name = getattr(attr, "attribute", None) or attr.get("attribute") + attribute_value = getattr(attr, "attribute_value", None) or attr.get("attribute_value") + + # Use the value from the attribute itself if present, fallback to DB + if attribute_value: + values = [attribute_value] + else: + values = frappe.db.get_all( + "Item Attribute Value", + filters={"parent": attribute_name}, + pluck="attribute_value", + ) + + if values: + product_options.append( + { + "name": attribute_name, + "position": idx, + "values": [{"name": v} for v in values], + } + ) + attribute_values_map[attribute_name] = values + else: + product_options = [ + { + "name": "Title", + "position": 1, + "values": [{"name": "Default Title"}], + } + ] + + product_data["productOptions"] = product_options + # ---- Generate Variants ---- + variants = [] + + if has_variants and attribute_values_map: + import itertools + + attribute_names = list(attribute_values_map.keys()) + attribute_value_lists = [attribute_values_map[name] for name in attribute_names] + + for combination in itertools.product(*attribute_value_lists): + variant = { + "sku": f"{erpnext_item.item_code}-{'-'.join(combination)}", + "price": str(erpnext_item.get(ITEM_SELLING_RATE_FIELD) or "0"), + "optionValues": [], + } + + for attr_name, attr_value in zip(attribute_names, combination, strict=False): + variant["optionValues"].append( + { + "optionName": attr_name, + "name": attr_value, + } + ) -def map_erpnext_item_to_shopify(shopify_product: Product, erpnext_item): - """Map erpnext fields to shopify, called both when updating and creating new products.""" + if erpnext_item.weight_per_unit: + uom = get_shopify_weight_uom(erpnext_item.weight_uom) + variant["weight"] = erpnext_item.weight_per_unit + variant["weightUnit"] = uom - shopify_product.title = erpnext_item.item_name - shopify_product.body_html = erpnext_item.description - shopify_product.product_type = erpnext_item.item_group + default_warehouse = frappe.db.get_single_value("Shopify Setting", "warehouse") + shopify_location_id = get_shopify_location_id(default_warehouse) + if shopify_location_id and cint(erpnext_item.opening_stock or 0) > 0: + variant["inventoryQuantities"] = [ + { + "locationId": f"gid://shopify/Location/{shopify_location_id}", + "name": "available", + "quantity": cint(erpnext_item.opening_stock or 0), + } + ] - if erpnext_item.weight_uom in WEIGHT_TO_ERPNEXT_UOM_MAP.values(): - # reverse lookup for key - uom = get_shopify_weight_uom(erpnext_weight_uom=erpnext_item.weight_uom) - shopify_product.weight = erpnext_item.weight_per_unit - shopify_product.weight_unit = uom + variants.append(variant) + else: + base_variant = { + "sku": erpnext_item.item_code, + "price": str(erpnext_item.get(ITEM_SELLING_RATE_FIELD) or "0"), + "optionValues": [ + { + "optionName": "Title", + "name": "Default Title", + } + ], + } - if erpnext_item.disabled: - shopify_product.status = "draft" - shopify_product.published = False - msgprint(_("Status of linked Shopify product is changed to Draft.")) + if erpnext_item.weight_per_unit: + uom = get_shopify_weight_uom(erpnext_item.weight_uom) + base_variant["weight"] = erpnext_item.weight_per_unit + base_variant["weightUnit"] = uom + + default_warehouse = frappe.db.get_single_value("Shopify Setting", "warehouse") + shopify_location_id = get_shopify_location_id(default_warehouse) + if shopify_location_id and cint(erpnext_item.opening_stock or 0) > 0: + base_variant["inventoryQuantities"] = [ + { + "locationId": f"gid://shopify/Location/{shopify_location_id}", + "name": "available", + "quantity": cint(erpnext_item.opening_stock or 0), + } + ] + + variants.append(base_variant) + + product_data["variants"] = variants + + if shopify_product: + product_data["id"] = ( + shopify_product.get("id") + if isinstance(shopify_product, dict) + else getattr(shopify_product, "id", None) + ) + + return product_data + + +def get_shopify_location_id(erpnext_warehouse: str | None = None) -> str | None: + """ + Fetch the Shopify Location ID from the Shopify Setting doctype. + If `erpnext_warehouse` is provided, map it using the child table. + Otherwise, return the first available location. + """ + try: + shopify_setting = frappe.get_single("Shopify Setting") + mappings = shopify_setting.get("shopify_warehouse_mapping") or [] + + if erpnext_warehouse: + for mapping in mappings: + if mapping.erpnext_warehouse == erpnext_warehouse: + return mapping.shopify_location_id + + # Fallback to first location if available + if mappings: + return mappings[0].shopify_location_id + + frappe.log_error( + "No Shopify Location ID found in Shopify Setting", + "get_shopify_location_id", + ) + return None + + except Exception: + frappe.log_error(f"Error in get_shopify_location_id: {frappe.get_traceback()} ") + return None def get_shopify_weight_uom(erpnext_weight_uom: str) -> str: - for shopify_uom, erpnext_uom in WEIGHT_TO_ERPNEXT_UOM_MAP.items(): - if erpnext_uom == erpnext_weight_uom: - return shopify_uom + """Return Shopify weight unit name (e.g. 'KILOGRAMS') for a given ERPNext UOM. + Handles both map shapes (SHOPIFY->ERPNext or ERPNext->SHOPIFY), is case-insensitive, + and falls back to sensible defaults/synonyms. + """ + if not erpnext_weight_uom: + return "GRAMS" + + key = cstr(erpnext_weight_uom).strip().lower() + + # If constants are SHOPIFY -> ERPNext, build reverse map: erpnext_lower -> shopify_name + reverse_map = { + erpnext_uom.lower(): shopify_uom for shopify_uom, erpnext_uom in WEIGHT_TO_ERPNEXT_UOM_MAP.items() + } + if key in reverse_map: + return reverse_map[key] + + # If constants are ERPNext -> SHOPIFY, check direct mapping + direct_map = {k.lower(): v for k, v in WEIGHT_TO_ERPNEXT_UOM_MAP.items()} + if key in direct_map: + return direct_map[key] + + # fallbacks (erpnext uom -> shopify name) + synonyms = { + "kg": "KILOGRAMS", + "kilogram": "KILOGRAMS", + "kilograms": "KILOGRAMS", + "g": "GRAMS", + "gram": "GRAMS", + "grams": "GRAMS", + "oz": "OUNCES", + "ounce": "OUNCES", + "ounces": "OUNCES", + "lb": "POUNDS", + "lbs": "POUNDS", + "pound": "POUNDS", + "pounds": "POUNDS", + } + + return synonyms.get(key, "GRAMS") def update_default_variant_properties( - shopify_product: Product, + shopify_product: dict, is_stock_item: bool, sku: str | None = None, price: float | None = None, ): - """Shopify creates default variant upon saving the product. + """Update default variant properties for Shopify products (GraphQL + REST compatible). - Some item properties are supposed to be updated on the default variant. - Input: saved shopify_product, sku and price + Handles both: + - REST API format: product["variants"] -> [ { ...variant... } ] + - GraphQL format: product["variants"]["edges"] -> [ { "node": {...variant...} } ] """ - default_variant: Variant = shopify_product.variants[0] - # this will create Inventory item and qty will be updated by scheduled job. - if is_stock_item: - default_variant.inventory_management = "shopify" + # Determine variant source + variants = shopify_product.get("variants") + default_variant = {} + + # ---- GraphQL Format ---- + if isinstance(variants, dict) and "edges" in variants: + edges = variants.get("edges", []) + if edges and isinstance(edges[0], dict): + default_variant = edges[0].get("node", {}) or {} + if not default_variant: + frappe.log_error( + f"No default variant found for Shopify product: {shopify_product}", + "update_default_variant_properties", + ) + return + + # ---- Apply Updates ---- + if sku: + default_variant["sku"] = sku if price is not None: - default_variant.price = price - if sku is not None: - default_variant.sku = sku + # In GraphQL structure, this is often nested under node or variant input + default_variant["price"] = float(price) + + if is_stock_item: + # REST uses "inventory_management" = "shopify" + # GraphQL uses "tracked" = true + if "inventory_management" in default_variant: + default_variant["inventory_management"] = "shopify" + else: + default_variant["tracked"] = True + else: + # If not stock item, untrack in GraphQL context + default_variant["tracked"] = False + + return default_variant + + +def create_item(payload, request_id=None): + frappe.set_user("Administrator") + frappe.flags.request_id = request_id + if not payload: + data = frappe.request.get_json() + + product_id = payload.get("id") + if not product_id: + frappe.log_error("Shopify Product Webhook: Missing product ID", str(data)) + return + try: + sp = ShopifyProduct(product_id=product_id) + sp.sync_product() + except Exception: + frappe.log_error(f"Shopify Product Sync Failed ({product_id})", frappe.get_traceback()) -def write_upload_log(status: bool, product: Product, item, action="Created") -> None: + +def write_upload_log(status: bool, product, item, action="Created") -> None: + """Log upload results for Shopify product sync (JSON-safe).""" + + # --- STEP 1: Extract raw product data safely --- + if hasattr(product, "to_dict"): + raw = product.to_dict() + elif isinstance(product, dict): + raw = product + else: + raw = {"raw": str(product)} + + def safe_default(o): + return str(o) + + try: + product_json = json.dumps(raw, default=safe_default) + product_data = json.loads(product_json) + except Exception: + # absolute last fallback + product_data = {"raw": str(raw)} + + # --- STEP 3: Handle the ERROR case --- if not status: + error_messages = "" + + # Shopify REST API errors + if hasattr(product, "errors"): + try: + error_messages = ", ".join(product.errors.full_messages()) + except Exception: + error_messages = str(product.errors) + + # GraphQL / dict errors + elif isinstance(product, dict) and "errors" in product: + try: + error_messages = json.dumps(product["errors"], indent=2) + except Exception: + error_messages = str(product["errors"]) + msg = _("Failed to upload item to Shopify") + "
" - msg += _("Shopify reported errors:") + " " + ", ".join(product.errors.full_messages()) - msgprint(msg, title="Note", indicator="orange") + if error_messages: + msg += _("Shopify reported errors:") + " " + error_messages + + frappe.msgprint(msg, title="Note", indicator="orange") create_shopify_log( status="Error", - request_data=product.to_dict(), + request_data=product_data, message=msg, method="upload_erpnext_item", ) - else: - create_shopify_log( - status="Success", - request_data=product.to_dict(), - message=f"{action} Item: {item.name}, shopify product: {product.id}", - method="upload_erpnext_item", - ) + return + + # --- STEP 4: Handle the SUCCESS case --- + product_id = None + + # Shopify REST (object) + if hasattr(product, "id"): + product_id = getattr(product, "id", None) + + # Shopify GraphQL (dict) + if not product_id: + product_id = product_data.get("id") or "Unknown" + + create_shopify_log( + status="Success", + request_data=product_data, + message=f"{action} Item: {item.name}, shopify product: {product_id}", + method="upload_erpnext_item", + ) diff --git a/ecommerce_integrations/shopify/return.py b/ecommerce_integrations/shopify/return.py new file mode 100644 index 000000000..c3051270d --- /dev/null +++ b/ecommerce_integrations/shopify/return.py @@ -0,0 +1,312 @@ +from copy import deepcopy + +import frappe +from erpnext.controllers.sales_and_purchase_return import make_return_doc +from frappe.utils import cint + +from ecommerce_integrations.shopify.constants import ( + ORDER_ID_FIELD, + SHOPIFY_LINE_ITEM_ID_FIELD, + SHOPIFY_RETURN_ID_FIELD, +) +from ecommerce_integrations.shopify.utils import create_shopify_log + + +def process_shopify_return(payload, request_id=None): + """ + Entry point for Shopify returns webhooks + Handles: open | approved | closed + """ + frappe.set_user("Administrator") + frappe.flags.request_id = request_id + + try: + create_shopify_log( + status="Debug", + message=f"Received payload: {frappe.as_json(payload)}", + ) + return_status = payload.get("status") + shopify_return_id = payload.get("id") + + order_id = payload.get("order_id") or (payload.get("order") or {}).get("id") + + if not shopify_return_id or not order_id: + create_shopify_log( + status="Invalid", + message=payload, + ) + return + + if return_status == "open": + _create_return_delivery_note( + payload, + shopify_return_id, + order_id, + ) + return + if return_status == "closed": + create_shopify_log( + status="Ignored", + message=f"Return {shopify_return_id} is closed", + ) + return + + create_shopify_log( + status="Ignored", + message=f"Unknown return status: {return_status}", + ) + + except Exception as e: + create_shopify_log( + status="Error", + exception=e, + rollback=True, + ) + + +def _create_return_delivery_note(payload, shopify_return_id, order_id): + """ + Creates ERPNext Delivery Note Return + """ + + if frappe.db.get_value( + "Delivery Note", + {"shopify_return_id": shopify_return_id}, + "name", + ): + create_shopify_log( + status="Ignored", + message=f"Return {shopify_return_id} already processed", + ) + return + + dn_name = frappe.db.get_value( + "Delivery Note", + { + ORDER_ID_FIELD: order_id, + "docstatus": 1, + }, + "name", + ) + + if not dn_name: + create_shopify_log( + status="Invalid", + message="Original Delivery Note not found", + ) + return + + original_dn = frappe.get_doc("Delivery Note", dn_name) + + return_dn = make_return_doc("Delivery Note", original_dn.name) + + map_return_items( + return_dn, + payload.get("return_line_items") or [], + ) + + if not return_dn.items: + create_shopify_log( + status="Invalid", + message="Approved return has no returnable items", + ) + return + + return_dn.shopify_return_id = shopify_return_id + return_dn.flags.ignore_mandatory = True + + return_dn.save() + return_dn.submit() + + create_shopify_log( + status="Success", + message=f"Return Delivery Note created: {return_dn.name}", + ) + + +def map_return_items(return_dn, return_line_items): + """ + Map Shopify return_line_items → ERPNext Delivery Note items + """ + + # Make a deepcopy to avoid mutating the payload + return_line_items = deepcopy(return_line_items) + + for r_item in return_line_items: + fulfillment_line_item = r_item.get("fulfillment_line_item") or {} + line_item = fulfillment_line_item.get("line_item") or {} + + shopify_line_item_id = str(line_item.get("id")) + return_qty = cint(r_item.get("quantity")) + + if not shopify_line_item_id or return_qty <= 0: + continue + + for dn_item in return_dn.items: + # Skip items with no Sales Order detail + if not dn_item.so_detail: + continue + + # Get Shopify line item ID from Sales Order Item + so_shopify_line_item_id = frappe.db.get_value( + "Sales Order Item", + dn_item.so_detail, + SHOPIFY_LINE_ITEM_ID_FIELD, + ) + + if str(so_shopify_line_item_id or "") != shopify_line_item_id: + continue + + # Determine allowed return quantity + allowed_qty = abs(dn_item.qty) if dn_item.qty < 0 else dn_item.qty + if allowed_qty <= 0: + continue + + # Set the return quantity (negative for return) + dn_item.qty = -min(return_qty, allowed_qty) + dn_item.stock_qty = dn_item.qty * (dn_item.conversion_factor or 1) + break # Found match, go to next Shopify line item + + # Remove items with qty 0 to avoid ERPNext error + return_dn.items = [d for d in return_dn.items if d.qty != 0] + + +def map_return_si_items(return_si, return_line_items): + """ + Map Shopify return_line_items → ERPNext Sales Invoice items + """ + + return_line_items = deepcopy(return_line_items) + + for r_item in return_line_items: + fulfillment_line_item = r_item.get("fulfillment_line_item") or {} + line_item = fulfillment_line_item.get("line_item") or {} + + shopify_line_item_id = str(line_item.get("id")) + return_qty = cint(r_item.get("quantity")) + + if not shopify_line_item_id or return_qty <= 0: + continue + + for si_item in return_si.items: + if not si_item.so_detail: + continue + + so_shopify_line_item_id = frappe.db.get_value( + "Sales Order Item", + si_item.so_detail, + SHOPIFY_LINE_ITEM_ID_FIELD, + ) + + if str(so_shopify_line_item_id or "") != shopify_line_item_id: + continue + + allowed_qty = abs(si_item.qty) if si_item.qty < 0 else si_item.qty + if allowed_qty <= 0: + continue + + si_item.qty = -min(return_qty, allowed_qty) + break + + # Remove zero-qty items + return_si.items = [d for d in return_si.items if d.qty != 0] + + +def process_invoice_return(payload, request_id=None): + """ + Entry point for Shopify returns webhooks + Handles: open | approved | closed + """ + frappe.set_user("Administrator") + frappe.flags.request_id = request_id + + try: + create_shopify_log( + status="Debug", + message=f"Received payload: {frappe.as_json(payload)}", + ) + shopify_return_id = payload.get("id") + + order_id = payload.get("order_id") or (payload.get("order") or {}).get("id") + + if not shopify_return_id or not order_id: + create_shopify_log( + status="Invalid", + message=payload, + ) + return + + create_return_sales_invoice( + payload, + shopify_return_id, + order_id, + ) + + except Exception as e: + create_shopify_log( + status="Error", + exception=e, + rollback=True, + ) + + +def create_return_sales_invoice(payload, shopify_return_id, order_id): + """ + Creates ERPNext Sales Invoice Return (Credit Note) + """ + + # Prevent duplicate credit notes + if frappe.db.get_value( + "Sales Invoice", + {"shopify_return_id": shopify_return_id}, + "name", + ): + create_shopify_log( + status="Ignored", + message=f"Sales Invoice return already created for {shopify_return_id}", + ) + return + + si_name = frappe.db.get_value( + "Sales Invoice", + { + ORDER_ID_FIELD: order_id, + "docstatus": 1, + "is_return": 0, + }, + "name", + ) + + if not si_name: + create_shopify_log( + status="Invalid", + message="Original Sales Invoice not found", + ) + return + + original_si = frappe.get_doc("Sales Invoice", si_name) + + return_si = make_return_doc("Sales Invoice", original_si.name) + + map_return_si_items( + return_si, + payload.get("return_line_items") or [], + ) + + if not return_si.items: + create_shopify_log( + status="Invalid", + message="Approved return has no returnable invoice items", + ) + return + + return_si.shopify_return_id = str(shopify_return_id) + return_si.flags.ignore_mandatory = True + + return_si.save() + return_si.submit() + + create_shopify_log( + status="Success", + message=f"Return Sales Invoice created: {return_si.name}", + )