-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquestion_processor.py
More file actions
1236 lines (1028 loc) · 53.7 KB
/
question_processor.py
File metadata and controls
1236 lines (1028 loc) · 53.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import re
import json
import fitz # PyMuPDF
import cv2
import numpy as np
from PIL import Image
from datetime import datetime
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, sent_tokenize
from app import app, db
from models import Question, QuestionDocument, Unit, Topic, Subject
# Set up NLTK data path
import nltk
import shutil
# Define NLTK data paths
# The resources live in an `nltk_data` directory next to this module file.
nltk_data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'nltk_data')
# Create the directory up front; exist_ok avoids an error when it already exists.
os.makedirs(nltk_data_path, exist_ok=True)
# Add our custom path to the beginning of the NLTK data path
# (index 0 puts it ahead of the entries already in the list).
nltk.data.path.insert(0, nltk_data_path)
# Function to manually load NLTK data
def load_nltk_data():
    """Make sure the punkt, stopwords and tagger resources are present.

    For every resource a marker path below ``nltk_data_path`` is probed. When
    the marker exists a "Found ..." message is printed; otherwise the resource
    is fetched with ``nltk.download`` into ``nltk_data_path``.
    """
    tokenizer_dir = os.path.join(nltk_data_path, 'tokenizers', 'punkt')
    corpus_dir = os.path.join(nltk_data_path, 'corpora', 'stopwords')
    tagger_dir = os.path.join(nltk_data_path, 'taggers', 'averaged_perceptron_tagger')

    # (resource name, path whose existence is tested, path shown in the message)
    resources = (
        ('punkt', tokenizer_dir, tokenizer_dir),
        ('stopwords', corpus_dir, corpus_dir),
        (
            'averaged_perceptron_tagger',
            # The tagger is probed through its pickle file, not its directory.
            os.path.join(tagger_dir, 'averaged_perceptron_tagger.pickle'),
            tagger_dir,
        ),
    )

    for resource, probe, shown in resources:
        if not os.path.exists(probe):
            print(f"Downloading {resource} data...")
            nltk.download(resource, download_dir=nltk_data_path, quiet=False)
            continue
        print(f"Found {resource} data at {shown}")
# Load required NLTK data
# This runs at import time: the resources are checked (and downloaded when
# missing) as soon as the module is imported.
print("Loading NLTK data...")
try:
    load_nltk_data()
    print("NLTK data loaded successfully")
except Exception as e:
    # Report the failure, then re-raise so the import itself fails.
    print(f"Error loading NLTK data: {str(e)}")
    raise
@dataclass
class ExtractedQuestion:
    """Record describing one question pulled out of a PDF page.

    Attributes:
        question_number: Label that introduced the question (e.g. "1", "a").
        question_text: Text collected for the question.
        page_number: Page number the question was found on.
        section: Section title that was current when the question was read.
        question_type: Category label assigned to the question.
        marks: Marks for the question; defaults to 1.
        has_formula: Whether formula-like content was detected.
        has_diagram: Whether a diagram reference was detected.
        metadata: Optional extra data; ``None`` until something is attached.
    """

    question_number: str
    question_text: str
    page_number: int
    section: str
    question_type: str
    marks: int = 1
    has_formula: bool = False
    has_diagram: bool = False
    metadata: Optional[Dict[str, Any]] = None
class PDFQuestionExtractor:
"""Extract questions from PDF documents with improved text and structure analysis."""
def __init__(self, pdf_path: str):
    """Open the PDF at ``pdf_path`` and reset the extraction state.

    Args:
        pdf_path: Path of the PDF file handed to ``fitz.open``.
    """
    self.pdf_path = pdf_path
    document = fitz.open(pdf_path)
    self.doc = document
    # No section header seen yet and nobody listening for progress.
    self.current_section = ""
    self.progress_callback = None
    self.total_pages = len(document)
def set_progress_callback(self, callback):
    """Register the callable that receives progress notifications.

    The callable is invoked as ``callback(current_page, total_pages, message)``:
    - current_page: page being processed (0-based)
    - total_pages: number of pages in the document
    - message: optional status text
    """
    self.progress_callback = callback
def _report_progress(self, current_page, message=None):
"""Report progress using the callback if available."""
if self.progress_callback:
self.progress_callback(current_page, self.total_pages, message)
def extract_questions(self) -> List[ExtractedQuestion]:
    """Extract all questions from the PDF, reporting progress per page.

    Every page's text is used first to refresh the current section and then
    to collect the questions found on that page.

    Returns:
        All extracted questions in page order.

    Raises:
        Exception: Any error raised while reading or parsing is logged,
            reported through the progress callback and re-raised.
    """
    app.logger.info(f"Extracting questions from: {os.path.basename(self.pdf_path)}")
    questions = []
    # Bound before the try block so the error handler can always refer to it,
    # even when the failure happens before the first page is reached.
    page_num = 0
    try:
        self._report_progress(0, "Starting question extraction...")
        page_count = len(self.doc)
        for page_num in range(page_count):
            page = self.doc[page_num]
            # Report progress for this page
            self._report_progress(
                page_num,
                f"Extracting questions from page {page_num + 1} of {self.total_pages}..."
            )
            # Get page text and update section
            text = page.get_text()
            self._update_section(text)
            # Extract questions from this page (page numbers are 1-based)
            page_questions = self._extract_questions_from_page(text, page_num + 1)
            questions.extend(page_questions)
            # Log every fifth page and the last one
            if page_num % 5 == 0 or page_num == page_count - 1:
                app.logger.info(
                    f"Processed page {page_num + 1}/{page_count} - "
                    f"Found {len(page_questions)} questions on this page, "
                    f"Total so far: {len(questions)}"
                )
        # Final progress update; clamp so an empty document reports page 0, not -1
        self._report_progress(
            max(page_count - 1, 0),
            f"Completed extraction of {len(questions)} questions from {page_count} pages"
        )
    except Exception as e:
        error_msg = f"Error extracting questions from page {page_num + 1}: {str(e)}"
        app.logger.error(error_msg, exc_info=True)
        self._report_progress(page_num, f"Error: {error_msg[:200]}")
        raise
    return questions
def _update_section(self, text: str) -> None:
"""Update current section based on section headers in text."""
section_match = re.search(r'Section\s+([A-Z]):\s*([^\n]+)', text)
if section_match:
self.current_section = section_match.group(2).strip()
def _extract_questions_from_page(self, text: str, page_num: int) -> List[ExtractedQuestion]:
    """
    Extract questions from a single page's text.

    The page is split into stripped, non-empty lines. A line that matches a
    question pattern opens a question; following lines are appended to it
    until another question starts or an end marker is seen. Candidates whose
    joined text is shorter than 10 characters are dropped as false positives.

    Args:
        text: The text content of the page
        page_num: The page number

    Returns:
        List of ExtractedQuestion objects
    """
    questions = []
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    total = len(lines)
    idx = 0
    while idx < total:
        question_match = self._match_question_pattern(lines[idx])
        # The line at idx is consumed either way (as a question start or as noise).
        idx += 1
        if not question_match:
            continue

        question_num = question_match.group(1).strip()
        question_text = [question_match.group(2).strip() if question_match.group(2) else '']

        # Collect continuation lines. The line that stops the scan is NOT
        # consumed, so the outer loop can treat it as the next question start.
        while idx < total:
            candidate = lines[idx]
            if self._match_question_pattern(candidate):
                break
            if self._is_question_end(candidate, question_text):
                break
            question_text.append(candidate)
            idx += 1

        full_text = ' '.join(question_text).strip()
        # Skip if question text is too short (likely a false positive).
        # idx must not be advanced here: it already points at the line that
        # ended the scan, and skipping it would lose the following question.
        if len(full_text) < 10:
            continue

        question = ExtractedQuestion(
            question_number=question_num,
            question_text=full_text,
            page_number=page_num,
            section=self.current_section,
            question_type=self._determine_question_type(full_text),
            marks=self._extract_marks(full_text),
            has_formula=self._contains_formula(full_text),
            has_diagram=self._contains_diagram_marker(full_text)
        )
        questions.append(question)
        # For multiple choice questions, try to pull the options out as well
        if question.question_type == "Multiple Choice":
            self._extract_multiple_choice_options(question, question_text)
    return questions
def _match_question_pattern(self, text: str) -> re.Match:
"""Match text against various question patterns."""
patterns = [
# Numbered questions (1., 2., etc.)
r'^(\d+)[\.\)\]\}\s]\s*(.*)',
# Lettered questions (a), b), etc.)
r'^\(?([a-z])\)\s*(.*)',
# Q1, Q2 or Q1:, Q2:
r'^[Qq]\s*(\d+)[\.\)\:]?\s*(.*)',
# Question 1, Problem 2, etc.
r'^(?:Question|Problem|Exercise|Task)\s*(\d+)[\.\)\: ]?\s*(.*)',
# Section-based numbering (1.1, 1.2, etc.)
r'^(\d+\.\d+)[\.\)\s]\s*(.*)',
# Bullet points with numbers or letters
r'^[•\-*]\s*(\d+|[a-z])\)?\s*(.*)'
]
for pattern in patterns:
match = re.match(pattern, text)
if match:
return match
return None
def _determine_question_type(self, text: str) -> str:
"""
Determine the type of question based on its content, structure, and keywords.
Returns one of: 'Multiple Choice', 'True/False', 'Matching', 'Fill-in-the-Blank',
'Short Answer', 'Long Answer', 'Problem Solving', 'Diagram-based', 'Essay', 'Calculation',
'Proof', 'Case Study', or 'Other'.
"""
text_lower = text.lower().strip()
# Check for multiple choice (A), B), C), etc. or (i), (ii), (iii), etc.)
if (re.search(r'\b(a|b|c|d|e)\)', text_lower) or
re.search(r'\([ivx]+\)', text_lower) or
re.search(r'\b(true|false|t|f)\b', text_lower, re.IGNORECASE)):
return "Multiple Choice"
# Check for true/false questions
if (re.search(r'\b(true|false)\b', text_lower) and
any(word in text_lower for word in ['circle', 'select', 'choose', 'tick', 'mark'])):
return "True/False"
# Check for matching questions
if (re.search(r'match\s+(?:column|the following|items?|pairs?|statements?)', text_lower) or
re.search(r'column\s+(a|i).*column\s+(b|ii)', text_lower, re.DOTALL)):
return "Matching"
# Check for fill-in-the-blank
if (re.search(r'\b(?:fill\s*in|complete|fill\s*the\s*blank)', text_lower) or
re.search(r'\b_+\b', text) or # Underscore placeholders
re.search(r'\b(?:write|provide|give|state)\s+(?:the|a)?\s*[^\n?]*\?', text_lower)):
return "Fill-in-the-Blank"
# Check for diagram-based questions
if self._contains_diagram_marker(text_lower):
return "Diagram-based"
# Check for calculation problems
if (any(word in text_lower for word in ['calculate', 'compute', 'solve for', 'find', 'determine', 'evaluate', 'simplify']) or
re.search(r'\b(?:what is|what are|how (?:much|many|long|far|fast|tall|wide|high))\b', text_lower) or
self._contains_formula(text_lower)):
return "Problem Solving"
# Check for proof questions
if (any(word in text_lower for word in ['prove', 'show that', 'demonstrate', 'verify', 'derive']) or
re.search(r'\b(?:prove|show)\s+(?:that\s+)?[A-Z]', text)):
return "Proof"
# Check for case studies
if (any(word in text_lower for word in ['case study', 'case of', 'scenario', 'situation']) or
re.search(r'\bgiven\s+(?:that\s+)?[A-Z]', text)):
return "Case Study"
# Check for essay questions
if (any(word in text_lower for word in ['discuss', 'analyze', 'critique', 'evaluate', 'justify', 'examine', 'explore', 'elaborate', 'compare and contrast']) or
len(text.split()) > 50): # Long questions are likely essays
return "Essay"
# Check for short answer
if (any(word in text_lower for word in ['what', 'when', 'where', 'who', 'which', 'why', 'how', 'name', 'list']) or
'?' in text_lower or
len(text.split()) < 30): # Short questions
return "Short Answer"
# Default to long answer for anything that doesn't fit above
return "Long Answer"
def _extract_marks(self, text: str) -> int:
"""Extract marks from question text if specified."""
marks_match = re.search(r'\((\d+)\s*(?:marks?|points?)\)', text, re.IGNORECASE)
if marks_match:
return int(marks_match.group(1))
# Check for marks at the end of the question
marks_match = re.search(r'\[(\d+)\s*(?:marks?|points?)\]', text, re.IGNORECASE)
if marks_match:
return int(marks_match.group(1))
return 1 # Default marks
def _contains_formula(self, text: str) -> bool:
"""Check if question contains mathematical formulas with enhanced detection."""
# Basic math symbols
math_symbols = r'[∑∫∂∆√∛∜∞≤≥≠≈≡±×÷∈∉⊆⊂∪∩∅]|\\[a-zA-Z]+|\^[0-9a-zA-Z{}()]+|_[0-9a-zA-Z{}()]+|\b(?:sin|cos|tan|cot|sec|csc|log|ln|exp|sqrt|integral|derivative|lim|sum|prod|int|iint|iiint)\b'
# Common formula patterns
formula_patterns = [
r'\$[^$]+\$', # LaTeX inline math
r'\\\(.*?\\\)|\\\[.*?\\\]', # LaTeX display math
r'\b(?:eq\.?|equation|formula|theorem|proof|corollary|lemma|proposition)\b',
r'[a-zA-Z]\s*[=≠≈]\s*[a-zA-Z0-9+\-*/^()]+', # Equations like x = 2y + 3
r'\d+\s*[a-zA-Zα-ωΑ-Ω]\b', # Variables with coefficients
r'[a-zA-Z]\s*[+\-*/^]\s*[a-zA-Z0-9()]', # Basic operations with variables
r'\b(?:if|then|therefore|because|since|given|let|assume|suppose|consider)\b.*?[=≠≈<>]', # Conditional math
]
# Check for any math symbols or patterns
if re.search(math_symbols, text, re.IGNORECASE):
return True
# Check for formula patterns
if any(re.search(pattern, text, re.IGNORECASE | re.DOTALL) for pattern in formula_patterns):
return True
# Check for common math notation
if re.search(r'[a-zA-Z]\s*[{}]\s*[=:]', text): # Set notation or function definitions
return True
return False
def _is_question_end(self, line: str, question_text: List[str]) -> bool:
"""
Determine if the current line indicates the end of a question.
Args:
line: The current line being processed
question_text: List of lines in the current question
Returns:
bool: True if this line indicates the end of the question
"""
# Common question endings
endings = [
r'\b(?:end\s+of\s+questions?|stop|that\s+is\s+all|no\s+more\s+questions)',
r'\b(?:total|maximum|max)\s*[\[({]?\s*\d+\s*(?:marks?|points?|pts?\b)\s*[\])}]?',
r'\b(?:page|p\.?\s*)\d+\s*(?:of|/)\s*\d+\s*$',
r'\b(?:continued\s+on\s+next\s+page|cont\.?\s*\d+)\b',
r'\b(?:section|part|chapter)\s+[A-Z0-9]+\b',
r'^\s*\*{3,}\s*$', # Lines with *** or more
r'^\s*_{3,}\s*$', # Lines with ___ or more
r'^\s*-{3,}\s*$' # Lines with --- or more
]
# Check for ending patterns
if any(re.search(pattern, line, re.IGNORECASE) for pattern in endings):
return True
# Check if this looks like the start of a new section or header
if (re.match(r'^\s*[A-Z][A-Z\s]+$', line) and # All caps line
len(line.split()) < 5 and # Short line (likely a header)
len(question_text) > 1): # Already have some question text
return True
# Check for page numbers or footers
if (re.search(r'^\s*\d+\s*$', line) or # Just a number
re.search(r'^[A-Za-z]+\s+\d+\s*$', line)): # Month Year or similar
return True
return False
def _extract_multiple_choice_options(self, question: ExtractedQuestion, question_text: List[str]) -> None:
"""
Extract multiple choice options from question text and update the question object.
Args:
question: The question object to update
question_text: List of text lines for the question
"""
options = {}
current_option = None
option_pattern = re.compile(r'^\s*([a-zA-Z]|[ivx]+\)|\d+\.)\s*(.*)')
# Process each line to find options
for line in question_text:
match = option_pattern.match(line)
if match:
option_key = match.group(1).strip().lower()
option_text = match.group(2).strip()
# Skip if this looks like part of the question text
if current_option is None and len(option_text.split()) > 5: # Too long for an option
continue
current_option = option_key
options[option_key] = option_text
elif current_option is not None:
# Continue the current option if the line is indented or starts with a space
if line.startswith((' ', '\t')) and line.strip():
options[current_option] += ' ' + line.strip()
# Update the question object with the extracted options
if options:
if not hasattr(question, 'metadata'):
question.metadata = {}
question.metadata['options'] = options
# If we have a question mark in the first line, try to separate the question from options
first_line = question_text[0] if question_text else ''
if '?' in first_line and len(question_text) > 1:
question_parts = first_line.split('?', 1)
if len(question_parts) > 1 and question_parts[1].strip():
question.question_text = question_parts[0] + '?'
# The rest might be part of the first option
first_option = question_parts[1].strip()
if first_option and not any(k in first_option.lower() for k in options.keys()):
# If we don't already have this as an option, add it
first_letter = chr(ord('a') + len(options))
options[first_letter] = first_option
def _contains_diagram_marker(self, text: str) -> bool:
"""Check if question contains diagram-related markers with enhanced detection."""
# Basic diagram indicators
diagram_indicators = [
r'\b(?:diagram|figure|draw|sketch|illustration|graph|chart|plot|image|picture|schematic|blueprint|map)\b',
r'\blabel\s*(?:the|each|all|any|every|some|these|those|following|below|above|on|in|at|for|with|of)?\s*',
r'\b(?:show|indicate|mark|identify|point out|highlight|circle|box|shade|color|colour|outline|trace|plot)\b.*\b(on|in|at|for|with|of)\b.*\b(diagram|figure|graph|chart|image|picture|drawing|illustration)',
r'\b(refer|according|see|based on|using|use|given|following|shown|displayed|illustrated|depicted|represented)\b.*\b(diagram|figure|graph|chart|image|picture|drawing|illustration)',
r'\b(diagram|figure|graph|chart|image|picture|drawing|illustration)\s*[0-9]*\s*(?:shows|showing|illustrates|depicts|represents|demonstrates|presents|displays|contains|includes)',
r'\b(?:as|like|similar to|resembling|in the style of|in the form of|in the shape of|in the pattern of)\b.*\b(diagram|figure|graph|chart|image|picture|drawing|illustration)',
r'\b(?:with|having|containing|including|featuring|showing|displaying|illustrating|depicting|representing|demonstrating|presenting)\b.*\b(diagram|figure|graph|chart|image|picture|drawing|illustration)',
]
# Check for any diagram indicators
if any(re.search(pattern, text, re.IGNORECASE) for pattern in diagram_indicators):
return True
# Check for coordinate system references
if re.search(r'\b(?:x-?axis|y-?axis|origin|coordinate\s*system|grid|axes|quadrant|abscissa|ordinate)\b', text, re.IGNORECASE):
return True
# Check for geometric shape references
if re.search(r'\b(?:point|line|segment|ray|angle|triangle|square|rectangle|circle|ellipse|polygon|polyhedron|prism|pyramid|cylinder|cone|sphere|cube|rhombus|trapezoid|parallelogram|pentagon|hexagon|octagon|dodecagon|tetrahedron|octahedron|dodecahedron|icosahedron|ellipsoid|hyperboloid|paraboloid|torus)\b', text, re.IGNORECASE):
return True
return False
def __del__(self):
"""Ensure the PDF document is properly closed."""
if hasattr(self, 'doc'):
self.doc.close()
class QuestionExtractor:
def __init__(self):
    """Set up stop words, progress state and the keyword/pattern tables."""
    self.stop_words = set(stopwords.words('english'))
    self.current_section = ""
    self.progress_callback = None
    self.total_pages = 0

    # Regular expressions for the supported question-label formats
    numbered = r'^(\d+)[\.\)\]\}\s]\s*'            # 1., 2., etc.
    lettered = r'\(?([a-z])\)\s*'                  # a), b), etc.
    roman = r'([ivx]+)[\.\)\s]\s*'                 # i., ii., etc.
    q_prefixed = r'[Qq]\s*(\d+)[\.\)\:]\s*'        # Q1, Q2 or Q1:, Q2:
    worded = r'(?:Question|Problem|Exercise|Task)\s*(\d+)[\.\)\: ]?\s*'  # Question 1, Problem 2, etc.
    sectioned = r'(\d+\.\d+)[\.\)\s]\s*'           # 1.1, 1.2, etc.
    bulleted = r'[•\-*]\s*(\d+|[a-z])\)?\s*'       # bullets with numbers or letters
    self.question_patterns = [
        numbered, lettered, roman, q_prefixed, worded, sectioned, bulleted,
    ]

    # Words that might indicate a question
    self.question_keywords = [
        'what', 'when', 'where', 'why', 'how', 'explain', 'describe',
        'calculate', 'solve', 'find', 'prove', 'show', 'determine',
        'compare', 'contrast', 'discuss', 'evaluate', 'analyze', 'justify',
    ]

    # Formula and math-related words
    self.formula_keywords = [
        'equation', 'formula', 'calculate', 'solve', 'find', 'derive', 'prove',
        'compute', 'evaluate', 'simplify', 'factor', 'expand', 'integrate',
        'differentiate', 'graph', 'plot', 'matrix', 'vector', 'theorem', 'proof',
    ]

    # Verbs grouped by the difficulty level they are listed under
    self.difficulty_indicators = dict(
        easy=['define', 'list', 'identify', 'name', 'recall', 'state', 'match'],
        medium=['explain', 'describe', 'summarize', 'classify', 'compare', 'contrast'],
        hard=['analyze', 'evaluate', 'justify', 'critique', 'design', 'formulate', 'prove'],
    )
def set_progress_callback(self, callback):
    """Register the callable that receives progress notifications.

    The callable is invoked as ``callback(current_page, total_pages, message)``:
    - current_page: page being processed
    - total_pages: number of pages in the document
    - message: optional status text
    """
    self.progress_callback = callback
def _report_progress(self, current_page, message=None):
"""Report progress using the callback if available."""
if self.progress_callback and self.total_pages > 0:
self.progress_callback(current_page, self.total_pages, message)
def process_document(self, document_id):
    """Process a document and extract questions.

    The document is looked up by id, its PDF is run through
    ``PDFQuestionExtractor`` and every extracted question is stored with
    ``save_question``. On success the document is marked 'completed' with
    the number of questions actually saved; on any error it is marked
    'failed'.

    Args:
        document_id: ID of the document to process

    Returns:
        bool: True if processing was successful, False otherwise
    """
    document = QuestionDocument.query.get(document_id)
    if not document:
        app.logger.error(f"Document {document_id} not found")
        return False
    try:
        app.logger.info(f"Starting extraction for document {document_id}")
        # Open the PDF to get total pages for progress tracking
        try:
            doc = fitz.open(document.file_path)
            try:
                self.total_pages = len(doc)
            finally:
                # Close the probe handle even if reading the length fails
                doc.close()
            app.logger.info(f"Document has {self.total_pages} pages")
        except Exception as e:
            app.logger.warning(f"Could not get total pages for document {document_id}: {str(e)}")
            self.total_pages = 0
        # Report initial progress
        self._report_progress(0, "Starting document processing...")
        # Extract questions from PDF
        extractor = PDFQuestionExtractor(document.file_path)

        # Relay the extractor's progress through this object's own callback
        def extraction_progress(page_num, total_pages, message):
            self._report_progress(page_num, message)

        extractor.set_progress_callback(extraction_progress)
        # Extract questions with progress reporting
        extracted_questions = extractor.extract_questions()
        # Page index used for all progress reports of the saving phase
        last_page = self.total_pages - 1 if self.total_pages > 0 else 0
        self._report_progress(
            last_page,
            f"Extracted {len(extracted_questions)} questions. Saving to database..."
        )
        # Save extracted questions to database
        saved_count = 0
        for eq in extracted_questions:
            try:
                metadata = getattr(eq, 'metadata', None)
                saved = self.save_question({
                    'question_number': eq.question_number,
                    'question_text': eq.question_text,
                    'page_number': eq.page_number,
                    'section': eq.section,
                    'question_type': eq.question_type,
                    'marks': eq.marks,
                    'has_formula': eq.has_formula,
                    'has_diagram': eq.has_diagram,
                    'metadata': json.dumps(metadata) if metadata is not None else None
                }, document)
            except Exception as save_error:
                app.logger.error(f"Error saving question {eq.question_number if hasattr(eq, 'question_number') else 'unknown'}: {str(save_error)}", exc_info=True)
                continue  # Continue with next question even if one fails
            if saved is None:
                # save_question logs and rolls back its own errors and signals
                # them by returning None; such questions must not be counted.
                continue
            saved_count += 1
            # Update progress every 5 questions
            if saved_count % 5 == 0:
                self._report_progress(
                    last_page,
                    f"Saved {saved_count} of {len(extracted_questions)} questions..."
                )
        # Update document status
        try:
            document.extraction_status = 'completed'
            document.total_questions = saved_count
            document.processed_at = datetime.utcnow()
            db.session.commit()
            app.logger.info(f"Successfully extracted and saved {saved_count} questions from document {document_id}")
            return True
        except Exception as commit_error:
            app.logger.error(f"Error updating document status: {str(commit_error)}", exc_info=True)
            db.session.rollback()
            raise  # Re-raise to be caught by outer exception handler
    except Exception as e:
        app.logger.error(f"Error processing document {document_id}: {str(e)}", exc_info=True)
        try:
            if document:
                document.extraction_status = 'failed'
                db.session.commit()
        except Exception as status_error:
            app.logger.error(f"Error updating document status to failed: {str(status_error)}", exc_info=True)
            db.session.rollback()
        return False
def extract_questions_from_pdf(self, pdf_path):
    """Extract the questions of a PDF file as a list of plain dictionaries.

    Any error is logged and results in an empty list.
    """
    try:
        found = PDFQuestionExtractor(pdf_path).extract_questions()
        # Convert to dictionaries for compatibility
        records = []
        for item in found:
            records.append({
                'question_number': item.question_number,
                'question_text': item.question_text,
                'page_number': item.page_number,
                'section': item.section,
                'question_type': item.question_type,
                'marks': item.marks,
                'has_formula': item.has_formula,
                'has_diagram': item.has_diagram
            })
        return records
    except Exception as e:
        app.logger.error(f"Error extracting questions from PDF {pdf_path}: {str(e)}", exc_info=True)
        return []
def save_question(self, question_data, document):
    """Store one question for ``document`` and commit it.

    Args:
        question_data: Mapping with the question fields; missing keys fall
            back to defaults.
        document: Document the question belongs to; its ``id`` is stored.

    Returns:
        The saved ``Question``, or ``None`` when saving failed (the error is
        logged and the session rolled back).
    """
    try:
        read = question_data.get
        fields = dict(
            question_number=read('question_number', ''),
            question_text=read('question_text', ''),
            page_number=read('page_number', 1),
            question_type=read('question_type', 'text'),
            marks=read('marks', 1),
            has_formula=read('has_formula', False),
            # has_diagram is stored in the has_image column
            has_image=read('has_diagram', False),
            document_id=document.id,
            created_at=datetime.utcnow(),
        )
        question = Question(**fields)
        db.session.add(question)
        db.session.commit()
        app.logger.debug(f"Saved question {question.id} for document {document.id}")
        return question
    except Exception as e:
        app.logger.error(f"Error saving question for document {document.id}: {str(e)}", exc_info=True)
        db.session.rollback()
        return None
def categorize_question(self, question, subject):
    """Assign the best matching unit and topic of ``subject`` to ``question``.

    The question text is lowercased, tokenized and stripped of stop words and
    non-alphabetic tokens, then compared with the name and description of
    every unit and topic. A unit or topic is assigned, together with its
    score as confidence, only when its score exceeds 0.1.
    """
    if not subject:
        return
    # Get all units for this subject
    units = Unit.query.filter_by(subject_id=subject.id).all()
    if not units:
        return

    # Prepare text for analysis
    words = [
        token
        for token in word_tokenize(question.question_text.lower())
        if token.isalpha() and token not in self.stop_words
    ]
    clean_text = ' '.join(words)

    top_unit, top_unit_score = None, 0
    top_topic, top_topic_score = None, 0
    for unit in units:
        unit_text = f"{unit.name} {unit.description or ''}".lower()
        unit_score = self.calculate_similarity(clean_text, unit_text)
        if unit_score > top_unit_score:
            top_unit, top_unit_score = unit, unit_score
        # Topics are ranked across all units, not per unit
        for topic in unit.topics:
            topic_text = f"{topic.name} {topic.description or ''}".lower()
            topic_score = self.calculate_similarity(clean_text, topic_text)
            if topic_score > top_topic_score:
                top_topic, top_topic_score = topic, topic_score

    # Assign only when the score is above the threshold
    if top_unit_score > 0.1:
        question.unit_id = top_unit.id
        question.unit_confidence = top_unit_score
    if top_topic_score > 0.1:
        question.topic_id = top_topic.id
        question.topic_confidence = top_topic_score
def calculate_similarity(self, text1, text2):
    """Calculate similarity between two texts using TF-IDF.

    Both texts are vectorized together with ``TfidfVectorizer`` and compared
    with cosine similarity.

    Args:
        text1: First text.
        text2: Second text.

    Returns:
        float: The cosine similarity, or 0.0 when either text is blank or
        the similarity cannot be computed.
    """
    if not text1.strip() or not text2.strip():
        return 0.0
    try:
        tfidf_matrix = TfidfVectorizer().fit_transform([text1, text2])
        similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])
        # Convert to a plain float so callers always get the same type
        return float(similarity[0][0])
    except Exception:
        # Best effort: a failed comparison counts as "no similarity".
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
        app.logger.debug("Could not compute TF-IDF similarity", exc_info=True)
        return 0.0
# NOTE(review): no `class` statement is visible between the earlier
# `__init__` of QuestionExtractor and this definition. If both belong to the
# same class, this later definition replaces the earlier one and none of the
# attributes set there would exist on new instances — confirm which class
# this initializer was meant for.
def __init__(self):
    """Create the instance without setting any attributes."""
    pass
def generate_question_paper(self, subject_id, unit_ids=None, topic_ids=None,
total_marks=100, difficulty_distribution=None):
"""Generate a question paper based on specified criteria with ResearchNest signature and watermark."""
from reportlab.lib.pagesizes import A4
from reportlab.platypus import (
SimpleDocTemplate, Paragraph, Spacer, Image,
PageBreak, Table, TableStyle, PageTemplate, Frame
)
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch, cm
from reportlab.lib import colors
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import A4
from datetime import datetime
import os
import sys
from flask import current_app
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
# Define a fallback static folder path if not in app context
try:
static_folder = current_app.static_folder
except RuntimeError:
# If we're not in an app context, use a relative path
static_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'app', 'static')
# Register font for watermark
try:
pdfmetrics.registerFont(TTFont('Roboto-Light', 'app/static/fonts/Roboto-Light.ttf'))
pdfmetrics.registerFont(TTFont('Roboto-Bold', 'app/static/fonts/Roboto-Bold.ttf'))
except:
# Fallback to default font if custom font not available
pass
# Default difficulty distribution
if not difficulty_distribution:
difficulty_distribution = {'easy': 0.3, 'medium': 0.5, 'hard': 0.2}
# Get questions based on criteria
questions = self.select_questions(
subject_id, unit_ids, topic_ids, total_marks, difficulty_distribution
)
if not questions:
return None
# Generate PDF
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'ResearchNest_QuestionPaper_{timestamp}.pdf'
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], 'question_papers', filename)
# Ensure directory exists
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# Define styles
styles = getSampleStyleSheet()
# Add custom styles
title_style = ParagraphStyle(
'Title',
parent=styles['Heading1'],
fontSize=18,
spaceAfter=12,
alignment=1, # Center aligned
fontName='Helvetica-Bold',
textColor=colors.HexColor('#2c3e50')
)
subtitle_style = ParagraphStyle(
'Subtitle',
parent=styles['Heading2'],
fontSize=14,
spaceAfter=20,
alignment=1,
fontName='Helvetica',
textColor=colors.HexColor('#34495e')
)
question_style = ParagraphStyle(
'Question',
parent=styles['Normal'],
fontSize=11,
spaceAfter=12,
leading=14,
fontName='Helvetica',
textColor=colors.HexColor('#2c3e50')
)
instruction_style = ParagraphStyle(
'Instruction',
parent=styles['Italic'],
fontSize=10,
spaceAfter=16,
leading=12,
fontName='Helvetica-Oblique',
textColor=colors.HexColor('#7f8c8d')
)
# Create a custom header and footer function with watermark
def add_header_footer(canvas, doc):
canvas.saveState()
# Add watermark
canvas.saveState()
canvas.setFont('Helvetica', 60)
# Using a very light gray color instead of alpha transparency
canvas.setFillColor(colors.HexColor('#f0f0f0'))
canvas.translate(A4[0]/2, A4[1]/2)
canvas.rotate(45)
canvas.drawCentredString(0, 0, "RESEARCHNEST")
canvas.restoreState()
# Draw header line
canvas.setStrokeColor(colors.HexColor('#3498db'))
canvas.setLineWidth(1)
canvas.line(50, A4[1] - 60, A4[0] - 50, A4[1] - 60)
# Add header
canvas.setFont('Helvetica-Bold', 10)
canvas.setFillColor(colors.HexColor('#3498db'))
canvas.drawString(50, A4[1] - 45, "RESEARCHNEST - ACADEMIC QUESTION PAPER")
# Add page number
page_num = canvas.getPageNumber()
canvas.drawRightString(A4[0] - 50, A4[1] - 45, f"Page {page_num}")
# Draw footer line
canvas.setStrokeColor(colors.HexColor('#e74c3c'))
canvas.setLineWidth(0.5)
canvas.line(50, 50, A4[0] - 50, 50)
# Add footer text
canvas.setFont('Helvetica', 8)
canvas.setFillColor(colors.HexColor('#7f8c8d'))
# Left side - Copyright notice
current_year = datetime.now().year
text = f"© {current_year} ResearchNest. All rights reserved."
canvas.drawString(50, 35, text)
# Right side - Generation timestamp
text = f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M')}"
text_width = canvas.stringWidth(text, 'Helvetica', 8)
canvas.drawString(A4[0] - 50 - text_width, 35, text)
# Add ResearchNest logo or text fallback
logo_path = os.path.join(static_folder, 'img', 'researchnest-logo.png')
logo_found = False
# Try to load the logo if it exists
if os.path.exists(logo_path):
try:
logo = Image(logo_path, width=2*cm, height=0.7*cm)
logo.drawOn(canvas, (A4[0] - 2*cm)/2, 35) # Center the logo
logo_found = True
except Exception as e:
app.logger.warning(f"Could not add logo: {str(e)}")
# If logo not found or failed to load, use text fallback
if not logo_found:
try:
canvas.setFont('Helvetica-Bold', 12)
canvas.setFillColor(colors.HexColor('#3498db'))
text = "RESEARCHNEST"
text_width = canvas.stringWidth(text, 'Helvetica-Bold', 12)
canvas.drawString((A4[0] - text_width)/2, 35, text)
except Exception as e:
app.logger.warning(f"Could not add text fallback: {str(e)}")
canvas.restoreState()
# Get subject information for document metadata
subject_name = 'General'
try:
subject = Subject.query.get(subject_id)
if subject:
subject_name = subject.name
except Exception as e:
app.logger.warning(f"Could not load subject: {str(e)}")
# Create document with custom header and footer
doc = SimpleDocTemplate(
file_path,
pagesize=A4,
rightMargin=50,
leftMargin=50,
topMargin=80, # More space for header
bottomMargin=70, # More space for footer
title=f"ResearchNest Question Paper - {timestamp}",
author="ResearchNest Platform",
subject=f"Generated Question Paper - {subject_name}",
creator="ResearchNest Platform",
producer="ResearchNest"
)
# Override build method to include header and footer on all pages
def build_with_watermark(story, **kwargs):
return SimpleDocTemplate.build(
doc,
story,
onFirstPage=add_header_footer,
onLaterPages=add_header_footer,
**kwargs
)
doc.build = build_with_watermark
# Start building the document
story = []
# Add title and subtitle
story.append(Paragraph("RESEARCHNEST", title_style))
story.append(Paragraph("Question Paper", subtitle_style))
# Add a decorative line
story.append(Spacer(1, 12))
story.append(Paragraph("<b><font color='#e74c3c'>" + "•"*50 + "</font></b>", styles['Normal']))
story.append(Spacer(1, 20))
# Add paper metadata in a table for better organization