Skip to content

Commit 197822f

Browse files
authored
Merge pull request #2 from Ke-vin-S/feature/sync-with-order
Add hosted db for production
2 parents 5e680fa + a2db478 commit 197822f

10 files changed

Lines changed: 297 additions & 11 deletions

File tree

.env.example

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,10 @@ LOGISTICS_SERVICE_PORT=8002
33
KAFKA_BROKER_URL=kafka:9092
44
IS_DOCKER=True
55
ALLOWED_HOSTS=localhost,logistics_service,api_gateway
6+
7+
ENVIRONMENT=production
8+
DB_NAME=your_supabase_db_name
9+
DB_USER=your_supabase_user
10+
DB_PASSWORD=your_supabase_password
11+
DB_HOST=db.xxxxxxxxx.supabase.co
12+
DB_PORT=5432

kafka_client/__init__.py

Whitespace-only changes.

kafka_client/consumer.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from confluent_kafka import Consumer, KafkaException
2+
import json
3+
import os
4+
import logging
5+
6+
logger = logging.getLogger(__name__)
7+
8+
KAFKA_BROKER_URL = os.getenv("KAFKA_BROKER_URL", "localhost:9092")
9+
10+
def start_consumer(topic: str, group_id: str = "default-group", auto_offset_reset: str = "latest"):
11+
consumer = Consumer({
12+
'bootstrap.servers': KAFKA_BROKER_URL,
13+
'group.id': group_id,
14+
'auto.offset.reset': auto_offset_reset,
15+
})
16+
17+
consumer.subscribe([topic])
18+
logger.info(f"Listening to topic: {topic}")
19+
20+
try:
21+
while True:
22+
msg = consumer.poll(timeout=1.0)
23+
if msg is None:
24+
continue
25+
if msg.error():
26+
raise KafkaException(msg.error())
27+
28+
value = json.loads(msg.value().decode("utf-8"))
29+
key = msg.key().decode("utf-8") if msg.key() else None
30+
handle_message(msg.topic(), key, value)
31+
except KeyboardInterrupt:
32+
print("Consumer stopped.")
33+
finally:
34+
consumer.close()
35+
36+
def handle_message(topic: str, key: str, value: dict):
37+
if topic == "shipment.status.updated":
38+
print(f"Shipment event for Order {key}: {value}")

kafka_client/producer.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from confluent_kafka import Producer
2+
import json
3+
import os
4+
import logging
5+
6+
logger = logging.getLogger(__name__)
7+
8+
KAFKA_BROKER_URL = os.getenv("KAFKA_BROKER_URL", "localhost:9092")
9+
10+
producer = Producer({
11+
'bootstrap.servers': KAFKA_BROKER_URL
12+
})
13+
14+
def delivery_report(err, msg):
15+
if err is not None:
16+
logger.error(f"Delivery failed for record {msg.key()}: {err}")
17+
else:
18+
logger.info(f"Record {msg.key()} successfully produced to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
19+
20+
def publish_event(topic: str, key: str, payload: dict):
21+
try:
22+
producer.produce(
23+
topic=topic,
24+
key=key,
25+
value=json.dumps(payload),
26+
callback=delivery_report
27+
)
28+
producer.flush() # ensures all messages are delivered before exiting
29+
except Exception as e:
30+
logger.error(f"[Kafka Error] Failed to publish to topic '{topic}': {e}")

logistics_core/settings.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,27 @@
8787
# Database
8888
# https://docs.djangoproject.com/en/5.2/ref/settings/#databases
8989

90-
DATABASES = {
91-
'default': {
92-
'ENGINE': 'django.db.backends.sqlite3',
93-
'NAME': BASE_DIR / 'db.sqlite3',
94-
}
95-
}
96-
9790

91+
ENV = os.getenv("ENVIRONMENT", "development")
92+
93+
if ENV == "production":
94+
DATABASES = {
95+
'default': {
96+
'ENGINE': 'django.db.backends.postgresql',
97+
'NAME': os.getenv('DB_NAME'),
98+
'USER': os.getenv('DB_USER'),
99+
'PASSWORD': os.getenv('DB_PASSWORD'),
100+
'HOST': os.getenv('DB_HOST'),
101+
'PORT': os.getenv('DB_PORT'),
102+
}
103+
}
104+
else:
105+
DATABASES = {
106+
'default': {
107+
'ENGINE': 'django.db.backends.sqlite3',
108+
'NAME': BASE_DIR / "db.sqlite3",
109+
}
110+
}
98111
# Password validation
99112
# https://docs.djangoproject.com/en/5.2/ref/settings/#auth-password-validators
100113

requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ sqlparse==0.5.3
2525
tzdata==2025.2
2626
uritemplate==4.1.1
2727

28-
kafka~=1.3.5
2928
requests~=2.32.3
30-
django-environ~=0.12.0
29+
django-environ~=0.12.0
30+
psycopg2-binary==2.9.10
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import os
2+
import json
3+
import time
4+
from confluent_kafka import Consumer, KafkaException
5+
6+
def consume_messages(topic, timeout=10, group_id="shipment-integration-tests"):
7+
consumer = Consumer({
8+
'bootstrap.servers': os.getenv("KAFKA_BROKER_URL", "localhost:9092"),
9+
'group.id': group_id,
10+
'auto.offset.reset': 'earliest',
11+
'enable.auto.commit': True,
12+
})
13+
14+
consumer.subscribe([topic])
15+
end_time = time.time() + timeout
16+
messages = []
17+
18+
try:
19+
while time.time() < end_time:
20+
msg = consumer.poll(timeout=1.0)
21+
if msg is None:
22+
continue
23+
if msg.error():
24+
raise KafkaException(msg.error())
25+
26+
key = msg.key().decode("utf-8") if msg.key() else None
27+
value = json.loads(msg.value().decode("utf-8"))
28+
print(f"[KAFKA] Got: {value}")
29+
messages.append(msg)
30+
31+
# optional: stop early if messages are found
32+
if messages:
33+
break
34+
finally:
35+
consumer.close()
36+
37+
return messages
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import json
2+
import unittest
3+
4+
from django.urls import reverse
5+
from rest_framework.test import APITestCase
6+
from shipments.models import Shipment
7+
from shipments.tests.kafka_test_consumer import consume_messages
8+
9+
10+
class ShipmentKafkaIntegrationTests(APITestCase):
11+
def setUp(self):
12+
self.shipment = Shipment.objects.create(
13+
shipment_id="SHIP-KAFKA-001",
14+
order_id="ORD-KAFKA-001",
15+
origin={"lat": 7.87, "lng": 80.77},
16+
destination={"lat": 6.92, "lng": 79.86},
17+
demand=5
18+
)
19+
20+
@unittest.skip("Temporarily skipping this test")
21+
def test_dispatch_and_kafka_publish(self):
22+
# Schedule the shipment
23+
url_schedule = reverse("shipment-mark-scheduled", kwargs={"pk": self.shipment.pk})
24+
self.client.post(url_schedule, data={"scheduled_time": "2025-05-17T10:00:00Z"})
25+
26+
# Dispatch the shipment
27+
url_dispatch = reverse("shipment-mark-dispatched", kwargs={"pk": self.shipment.pk})
28+
response = self.client.post(url_dispatch, data={"dispatch_time": "2025-05-17T10:15:00Z"})
29+
30+
self.assertEqual(response.status_code, 200)
31+
32+
# Consume Kafka messages
33+
messages = consume_messages("shipment.status.updated")
34+
35+
dispatched = []
36+
for msg in messages:
37+
payload = json.loads(msg.value().decode("utf-8"))
38+
print("DEBUG Kafka:", payload)
39+
if payload["order_id"] == self.shipment.order_id and payload["status"] == "dispatched":
40+
dispatched.append(payload)
41+
42+
self.assertTrue(dispatched, "Dispatched Kafka message not found.")
43+
self.assertEqual(dispatched[0]["shipment_id"], self.shipment.shipment_id)
44+
45+
@unittest.skip("Temporarily skipping this test")
46+
def test_delivered_and_kafka_publish(self):
47+
# Full flow: schedule → dispatch → in_transit → deliver
48+
self.client.post(reverse("shipment-mark-scheduled", kwargs={"pk": self.shipment.pk}))
49+
self.client.post(reverse("shipment-mark-dispatched", kwargs={"pk": self.shipment.pk}))
50+
self.client.post(reverse("shipment-mark-in-transit", kwargs={"pk": self.shipment.pk}))
51+
response = self.client.post(
52+
reverse("shipment-mark-delivered", kwargs={"pk": self.shipment.pk}),
53+
data={"delivery_time": "2025-05-17T12:00:00Z"}
54+
)
55+
self.assertEqual(response.status_code, 200)
56+
57+
# Consume Kafka messages
58+
messages = consume_messages("shipment.status.updated", timeout=15)
59+
60+
found_dispatched = False
61+
found_delivered = False
62+
63+
for msg in messages:
64+
payload = json.loads(msg.value().decode("utf-8"))
65+
print("DEBUG Kafka:", payload)
66+
if payload["order_id"] != self.shipment.order_id:
67+
continue
68+
if payload["status"] == "dispatched":
69+
found_dispatched = True
70+
elif payload["status"] == "delivered":
71+
found_delivered = True
72+
73+
self.assertTrue(found_dispatched, "Dispatched Kafka message not found")
74+
self.assertTrue(found_delivered, "Delivered Kafka message not found")
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from django.test import TestCase
2+
from django.utils import timezone
3+
from shipments.models import Shipment
4+
from django.core.exceptions import ValidationError
5+
6+
class ShipmentModelTests(TestCase):
7+
def setUp(self):
8+
self.shipment = Shipment.objects.create(
9+
shipment_id="SHIP100",
10+
order_id="ORD100",
11+
origin={"lat": 7.8731, "lng": 80.7718},
12+
destination={"lat": 6.9271, "lng": 79.8612},
13+
demand=10
14+
)
15+
16+
def test_schedule_from_pending(self):
17+
self.shipment.mark_scheduled(scheduled_time=timezone.now())
18+
self.assertEqual(self.shipment.status, 'scheduled')
19+
self.assertIsNotNone(self.shipment.scheduled_dispatch)
20+
21+
def test_dispatch_from_scheduled(self):
22+
self.shipment.mark_scheduled()
23+
self.shipment.mark_dispatched()
24+
self.assertEqual(self.shipment.status, 'dispatched')
25+
self.assertIsNotNone(self.shipment.actual_dispatch)
26+
27+
def test_invalid_dispatch_from_pending(self):
28+
with self.assertRaises(ValidationError):
29+
self.shipment.mark_dispatched()
30+
31+
def test_in_transit_from_dispatched(self):
32+
self.shipment.mark_scheduled()
33+
self.shipment.mark_dispatched()
34+
self.shipment.mark_in_transit()
35+
self.assertEqual(self.shipment.status, 'in_transit')
36+
37+
def test_delivered_from_in_transit(self):
38+
self.shipment.mark_scheduled()
39+
self.shipment.mark_dispatched()
40+
self.shipment.mark_in_transit()
41+
self.shipment.mark_delivered()
42+
self.assertEqual(self.shipment.status, 'delivered')
43+
self.assertIsNotNone(self.shipment.delivery_time)
44+
45+
def test_invalid_delivered_from_pending(self):
46+
with self.assertRaises(ValidationError):
47+
self.shipment.mark_delivered()
48+
49+
def test_failed_from_active_states(self):
50+
self.shipment.mark_failed()
51+
self.assertEqual(self.shipment.status, 'failed')
52+
53+
def test_invalid_failed_from_delivered(self):
54+
self.shipment.mark_scheduled()
55+
self.shipment.mark_dispatched()
56+
self.shipment.mark_in_transit()
57+
self.shipment.mark_delivered()
58+
with self.assertRaises(ValidationError):
59+
self.shipment.mark_failed()

shipments/views.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
from django.utils import timezone
12
from rest_framework import viewsets, status
23
from rest_framework.decorators import action
34
from rest_framework.response import Response
45
from django.core.exceptions import ValidationError
56
from django.utils.dateparse import parse_datetime
67
from django.shortcuts import get_object_or_404
78

9+
from kafka_client.producer import publish_event
810
from .models import Shipment
911
from .serializers import ShipmentSerializer
1012

@@ -48,7 +50,20 @@ def mark_scheduled(self, request, pk=None):
4850
@action(detail=True, methods=['post'])
4951
def mark_dispatched(self, request, pk=None):
5052
shipment = get_object_or_404(Shipment, pk=pk)
51-
return self.handle_transition(request, shipment, shipment.mark_dispatched, time_field='dispatch_time')
53+
response = self.handle_transition(request, shipment, shipment.mark_dispatched, time_field='dispatch_time')
54+
55+
if response.status_code == 200:
56+
publish_event(
57+
topic="shipment.status.updated",
58+
key=str(shipment.order_id),
59+
payload={
60+
"shipment_id": shipment.shipment_id,
61+
"order_id": shipment.order_id,
62+
"status": "dispatched",
63+
"timestamp": timezone.now().isoformat()
64+
}
65+
)
66+
return response
5267

5368
@action(detail=True, methods=['post'])
5469
def mark_in_transit(self, request, pk=None):
@@ -58,7 +73,20 @@ def mark_in_transit(self, request, pk=None):
5873
@action(detail=True, methods=['post'])
5974
def mark_delivered(self, request, pk=None):
6075
shipment = get_object_or_404(Shipment, pk=pk)
61-
return self.handle_transition(request, shipment, shipment.mark_delivered, time_field='delivery_time')
76+
response = self.handle_transition(request, shipment, shipment.mark_delivered, time_field='delivery_time')
77+
78+
if response.status_code == 200:
79+
publish_event(
80+
topic="shipment.status.updated",
81+
key=str(shipment.order_id),
82+
payload={
83+
"shipment_id": shipment.shipment_id,
84+
"order_id": shipment.order_id,
85+
"status": "delivered",
86+
"timestamp": timezone.now().isoformat()
87+
}
88+
)
89+
return response
6290

6391
@action(detail=True, methods=['post'])
6492
def mark_failed(self, request, pk=None):

0 commit comments

Comments
 (0)