diff --git a/commerce_coordinator/apps/commercetools/management/commands/create_commercetools_product.py b/commerce_coordinator/apps/commercetools/management/commands/create_commercetools_product.py new file mode 100644 index 000000000..5529eef90 --- /dev/null +++ b/commerce_coordinator/apps/commercetools/management/commands/create_commercetools_product.py @@ -0,0 +1,66 @@ +import json + +from commercetools.platform.models import ( + ProductDraft, + ProductPriceModeEnum, + ProductTypeResourceIdentifier, + ProductVariantDraft +) + +from commerce_coordinator.apps.commercetools.management.commands._ct_api_client_command import ( + CommercetoolsAPIClientCommand +) + +product_json = ''' + + + +''' + + +class Command(CommercetoolsAPIClientCommand): + help = "Create a commercetools product from a JSON string or file" + + def handle(self, *args, **options): + if product_json: + try: + product_data = json.loads(product_json) + except json.JSONDecodeError as e: + self.stderr.write(f"Invalid JSON format: {e}") + return + else: + print("\n\n\n\nNo JSON data provided.\n\n\n\n") + return + + product_type_data = product_data.get("productType") + if product_type_data: + product_type = ProductTypeResourceIdentifier(id=product_type_data["id"]) + else: + print("\n\n\n\nMissing productType data.\n\n\n\n") + return + + master_variant_data = product_data.get("masterVariant") + variants_data = product_data.get("variants", []) + + master_variant = ProductVariantDraft(**master_variant_data) + variants = [ProductVariantDraft(**variant) for variant in variants_data] + + product_draft_data = { + "key": product_data.get("key"), + "name": product_data.get("name"), + "description": product_data.get("description"), + "slug": product_data.get("slug"), + "price_mode": ProductPriceModeEnum.STANDALONE, + "publish": product_data.get("publish"), + "tax_category": product_data.get("taxCategory"), + "master_variant": master_variant, + "variants": variants, + "product_type": product_type + } + + try: + product_draft = ProductDraft(**product_draft_data) + created_product = self.ct_api_client.base_client.products.create(draft=product_draft) + print(f"\n\n\n\nSuccessfully created product with ID: {created_product.id}") + except Exception as e: + print(f"\n\n\n\nError creating product: {e}") diff --git a/commerce_coordinator/apps/commercetools/management/commands/extract_commercetools_discount_data_to_csv.py b/commerce_coordinator/apps/commercetools/management/commands/extract_commercetools_discount_data_to_csv.py new file mode 100644 index 000000000..793a1e966 --- /dev/null +++ b/commerce_coordinator/apps/commercetools/management/commands/extract_commercetools_discount_data_to_csv.py @@ -0,0 +1,99 @@ +import csv +from datetime import datetime +from typing import Dict, List + +from commerce_coordinator.apps.commercetools.http_api_client import CTCustomAPIClient +from commerce_coordinator.apps.commercetools.management.commands._timed_command import TimedCommand + + +class Command(TimedCommand): + help = "Fetch and verify discount attributes from CommerceTools" + + def handle(self, *args, **options): + try: + ct_api_client = CTCustomAPIClient() + except Exception as e: + print(f"Error initializing Commercetools API client: {e}") + return + + # Fetch discounts based on type + discounts = self.fetch_discounts(ct_api_client) + + # Write data to CSV + self.write_attributes_to_csv(discounts) + + def fetch_discounts(self, ct_api_client): + page_size = 500 + + lastId = None + should_continue = True + results = [] + while should_continue: + if lastId is None: + response = ct_api_client._make_request( + method="GET", + endpoint="discount-codes", + params={ + "limit": page_size, + "sort": "id asc", + "expand": "cartDiscounts[*]", + } + ) + else: + response = ct_api_client._make_request( + method="GET", + endpoint="discount-codes", + params={ + "limit": page_size, + "sort": "id asc", + "expand": "cartDiscounts[*]", + "where": f'id > "{lastId}"' + } + ) + if not response: + print("Failed to get discount codes with code from Commercetools.") + return None + + batch_results = response["results"] + results.extend(batch_results) + should_continue = (len(batch_results) == page_size) + + if batch_results: + lastId = batch_results[-1]["id"] + + return results + + def write_attributes_to_csv(self, discounts: List[Dict]): + if not discounts: + print(f"No discounts found.") + return + + # Dynamically extract all unique keys across all discount dictionaries + discounts_new = [] + for discount in discounts: + cart_discount = discount["cartDiscounts"][0]["obj"] + discount = { + "name": discount["name"].get('en-US', ''), + "code": discount["code"], + "validFrom": discount.get("validFrom", None), + "validUntil": discount.get("validUntil", None), + "maxApplications": discount.get("maxApplications", None), + "maxApplicationsPerCustomer": discount.get("maxApplicationsPerCustomer", None), + "cartDiscountName": cart_discount["name"].get('en-US', ''), + "cartDiscountKey": cart_discount["key"], + "discountType": cart_discount["custom"]["fields"].get('discountType'), + "category": cart_discount["custom"]["fields"].get('category'), + "channel": cart_discount["custom"]["fields"].get('channel'), + } + discounts_new.append(discount) + + # Define CSV filename with discount type and date + filename = f"discount_{datetime.now().strftime('%Y%m%d')}.csv" + + # Write to CSV + with open(filename, "w", newline="") as output_file: + dict_writer = csv.DictWriter(output_file, fieldnames=discounts_new[0].keys()) + dict_writer.writeheader() + dict_writer.writerows(discounts_new) + + print(f"\n\n\n\n\n\n\nCSV file '{filename}' written successfully with {len(discounts_new)} records.") diff --git a/commerce_coordinator/apps/commercetools/management/commands/extract_commercetools_products_to_csv.py b/commerce_coordinator/apps/commercetools/management/commands/extract_commercetools_products_to_csv.py new file mode 100644 index 000000000..72365a9c0 --- /dev/null +++ b/commerce_coordinator/apps/commercetools/management/commands/extract_commercetools_products_to_csv.py @@ -0,0 +1,140 @@ +import csv +from datetime import datetime +from enum import Enum + +from commerce_coordinator.apps.commercetools.management.commands._ct_api_client_command import ( + CommercetoolsAPIClientCommand +) + + +# Enum for product types +class ProductType(Enum): + EDX_COURSE_ENTITLEMENT = "edx_course_entitlement" + EDX_PROGRAM = "edx_program" + OC_SELF_PACED = "oc_self_paced" + EDX_COURSE = "oc_self_paced" + + +STAGE_PRODUCT_TYPE_ID_MAPPING = { + ProductType.EDX_COURSE_ENTITLEMENT.value: "12e5510c-a4d6-4301-9caf-17053e57ff71", + ProductType.EDX_PROGRAM.value: "79fb6abe-8373-4dec-a8d1-51242b1798b8", + ProductType.OC_SELF_PACED.value: "9f8ec882-043a-4225-8811-00ac5acfd580" +} + +PROD_PRODUCT_TYPE_ID_MAPPING = { + ProductType.EDX_COURSE_ENTITLEMENT.value: "9f1f189a-4d79-4eaa-9c6e-cfcb61aa779f", + ProductType.EDX_PROGRAM.value: "c6a2d629-a50e-4d88-bd01-ab05a0617eae", + ProductType.EDX_COURSE.value: "b241ac79-fee2-461d-b714-8f3c4a1c4c0e" +} + + +class Command(CommercetoolsAPIClientCommand): + help = "Fetch and verify course attributes from CommerceTools" + + def handle(self, *args, **options): + # Specify product type to fetch + product_type = ProductType.EDX_PROGRAM + + # Fetch products based on type + products = self.fetch_products(product_type) + + # Write data to CSV + self.write_attributes_to_csv(products, product_type) + + def fetch_products(self, product_type): + limit = 500 + offset = 0 + products = [] + + product_type_id = PROD_PRODUCT_TYPE_ID_MAPPING.get(product_type.value) + + while True: + products_result = self.ct_api_client.base_client.products.query( + limit=limit, + offset=offset, + where=f"productType(id=\"{product_type_id}\")" + ) + for product in products_result.results: + attributes = self.extract_product_attributes(product, product_type.value) + products.extend(attributes) + + if products_result.offset + products_result.limit >= products_result.total: + break + offset += limit + + return products + + def extract_product_attributes(self, product, product_type): + # Extract common product-level attributes + common_attributes = { + "product_type": product_type, + "product_id": product.id, + "product_key": product.key, + "published_status": product.master_data.published, + "name": product.master_data.current.name.get('en-US', ''), + "slug": product.master_data.current.slug.get('en-US', ''), + "description": ( + product.master_data.current.description.get('en-US', '') + if product.master_data.current.description + else '' + ), + "date_created": product.created_at, + "master_variant_key": product.master_data.current.master_variant.key, + "master_variant_sku": product.master_data.current.master_variant.sku, + "master_variant_image_url": ( + product.master_data.current.master_variant.images[0].url + if product.master_data.current.master_variant.images + else None + ), + } + + product_rows = [] # This will hold the product and variant rows + + # Add the master variant attributes + if len(product.master_data.current.variants) == 0: + master_variant_attributes = {attr.name: attr.value for attr in + product.master_data.current.master_variant.attributes} + product_rows.append({**common_attributes, **master_variant_attributes}) + + # Add attributes for each variant and create a separate row, including variant_key and variant_sku + for variant in product.master_data.current.variants: + variant_attributes = {attr.name: attr.value for attr in variant.attributes} + variant_row = { + **common_attributes, + "variant_key": variant.key, # Add variant_key + "variant_sku": variant.sku, # Add variant_sku + "variant_image_url": ( + variant.images[0].url + if variant.images + else None + ), + **variant_attributes, + } + # Create a new row for each variant, combining common product data with variant-specific attributes + product_rows.append(variant_row) + + return product_rows + + def write_attributes_to_csv(self, products, product_type): + if not products: + print(f"No products found for type {product_type}.") + return + + # Dynamically extract all unique keys across all product dictionaries + keys = set() + for product in products: + keys.update(product.keys()) + + # Convert keys set back to a list and sort them if you want a consistent order + keys = sorted(list(keys)) + + # Define CSV filename with product type and date + filename = f"{product_type.value}_attributes_{datetime.now().strftime('%Y%m%d')}.csv" + + # Write to CSV + with open(filename, "w", newline="") as output_file: + dict_writer = csv.DictWriter(output_file, fieldnames=keys) + dict_writer.writeheader() + dict_writer.writerows(products) + + print(f"\n\n\n\n\n\n\nCSV file '{filename}' written successfully with {len(products)} records.") diff --git a/commerce_coordinator/apps/commercetools/management/commands/update_commercetools_product.py b/commerce_coordinator/apps/commercetools/management/commands/update_commercetools_product.py new file mode 100644 index 000000000..7cf7d470a --- /dev/null +++ b/commerce_coordinator/apps/commercetools/management/commands/update_commercetools_product.py @@ -0,0 +1,71 @@ +import json + +from commerce_coordinator.apps.commercetools.http_api_client import CTCustomAPIClient +from commerce_coordinator.apps.commercetools.management.commands._ct_api_client_command import ( + CommercetoolsAPIClientCommand +) + +product_id = '21533075-1710-4cbc-88d2-a18ba4176fdd' +product_json = ''' + +{ + "version": 9, + "actions": [ + { + "action": "setDescription", + "description": { + "en-US": "

This course reflects the most current version of the PMP exam.

" + } + }, + { + "action": "publish" + } + ] + } + + +''' + + +class Command(CommercetoolsAPIClientCommand): + help = "Update a commercetools product from a JSON string or file" + + def handle(self, *args, **options): + if product_json: + try: + product_data = json.loads(product_json) + except json.JSONDecodeError as e: + self.stderr.write(f"Invalid JSON format: {e}") + return + else: + print("\n\n\n\nNo JSON data provided.\n\n\n\n") + return + + version = product_data.get("version") + actions = product_data.get("actions") + + if not product_id or not version or not actions: + print("\n\n\n\nMissing product ID, version, or actions.\n\n\n\n") + return + + # Initialize the custom commercetools client + ct_client = CTCustomAPIClient() + + # Prepare the data for updating the product + update_payload = { + "version": version, + "actions": actions + } + + # Make the request to update the product + endpoint = f"products/{product_id}" + response = ct_client._make_request( + method="POST", + endpoint=endpoint, + json=update_payload + ) + + if response: + print(f"\n\n\n\n\nSuccessfully updated product with ID: {response.get('id')}\n\n\n\n\n") + else: + print("\n\n\n\n\nError updating product.\n\n\n\n\n\n")