-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathcount_classes.py
More file actions
239 lines (216 loc) · 10.8 KB
/
Copy pathcount_classes.py
File metadata and controls
239 lines (216 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import pandas as pd
import regex as re
import os
import itertools
def strip_alphanumeric(s):
# Use a regular expression to remove alphanumeric characters from the start and end of the string
return re.sub(r'^\D+|\D+$', '', s)
# write string to a .txt file
def write_to_file(file_path, content):
with open(file_path, 'w', encoding='utf-8') as file:
file.write(content)
def generate_permutations(input_list):
permutations = []
for r in range(1, len(input_list) + 1):
permutations.extend(itertools.permutations(input_list, r))
return permutations
def classify_string(s: str) -> list[int]:
if s.isdigit():
return [int(s)]
elif ', ' in s:
classifications = s.split(', ')
elif ',' in s:
classifications = s.split(',')
elif ' ' in s:
classifications = s.split(' ')
else:
return []
return [int(c) for c in classifications if c.isdigit()]
def list_txt_files(directory):
txt_files = []
# Walk through the directory
for filename in os.listdir(directory):
full_path = os.path.join(directory, filename)
# Check if it's a file and has a .txt extension
if os.path.isfile(full_path) and filename.endswith('.txt'):
# Remove the extension and add to the list
txt_files.append(os.path.splitext(filename)[0])
return txt_files
def calculate_class_scores(row, run_columns, numruns):
""" Calculate class scores for each row based on occurrences in run columns """
# Get all possible classes from the 'classes' column
classes_text = row['classes']
all_classes = [cls.strip() for cls in classes_text.split('|')]
# Count occurrences of each class across all run columns
class_counts = {}
for run_col in run_columns:
run_value = row[run_col]
try:
# Parse the list string (e.g., "['all areas', 'high importance to biodiversity is emphasized']")
if isinstance(run_value, list):
for class_name in run_value:
if class_name in all_classes:
if class_name in class_counts:
# If the key exists, increase its value by 1
class_counts[class_name] += 1
else:
# If the key doesn't exist, add it with value 1
class_counts[class_name] = 1
except (ValueError, SyntaxError):
# If parsing fails, skip this entry
continue
# Create tuples with class name and score
class_scores = []
relevant_classes = []
for class_name in all_classes:
count = class_counts.get(class_name, 0)
score = count / numruns
class_scores.append((class_name, score))
if score > 0.5:
relevant_classes.append(class_name)
return class_scores, relevant_classes
source_folder_path = input("Source folder path: ")
results_folder_path = os.path.join(source_folder_path, "review")
# The selection string is the string that is common for all output folders you want to consider aggregating
# e.g. If you want to obtain the majority vote for resulting classifications in folders:
# 2025_05_13_12hour_02min_48sec
# 2025_05_13_12hour_03min_01sec
# 2025_05_13_12hour_03min_11sec
# then you would choose as selection string 2025_05_20_16 to just gather all results from folder starting their name with "2025_05_20_16"
# NB: this script assumes that answers.tsv files are sorted in order of filename ascending, then question_id ascending!!
selection_string = input("Selection string: ")
# make a list of all relevant folder names in results_folder_path
folder_names = [f for f in os.listdir(results_folder_path) if f.startswith(selection_string)]
# for each folder in folder_names
first = True
to_classify_results = {}
valid_to_classifies = list_txt_files(source_folder_path)
# loop over folder names
for counter, folder_name in enumerate(folder_names):
question_ids = []
questions = []
question_templates = []
to_classifies = []
classes = []
classifications = []
error_comments = []
answers_folder_path = os.path.join(results_folder_path, folder_name, 'answers.tsv')
df_answers = pd.read_csv(answers_folder_path, sep='\t', engine='python')
# loop over rows in aswers.tsv
for index, row in df_answers.iterrows():
question_classification = row['question_classification']
# only consider classification questions
if question_classification == "y":
question_template = row['question_template']
question_id = row['question_id']
question = row['question']
classes_string = row['classes']
to_classify = row['answer'].split("|")[0].split(':')[0]
# only take classification of the first element to classify
classification_answer, comment = (row['answer'].split("|")[0].split(':')[1].strip(), None) if ':' in row['answer'] else ("", f'Error classifications for {to_classify}')
# if classification contains a space, take only the part before the space and convert to int
classification = classify_string(classification_answer)
if classification is not None:
classification.sort()
mapping = row['classes'].split('|')
# map classification number to classification description
classification = [mapping[int(value) - 1] if int(value) <= len(mapping) else 'Classification Index out of Range' for value in classification]
else:
classification = None
# append lists for insertion into dataframe
question_ids.append(question_id)
questions.append(question)
question_templates.append(question_template)
to_classifies.append(to_classify)
classes.append(classes_string)
classifications.append(classification)
error_comments.append(comment)
if first:
df_results = pd.DataFrame({"question_id": question_ids,
"question": questions,
"question_template": question_templates,
"to_classify": to_classifies,
"classes": classes,
"run_" + str(counter + 1): classifications,
"errors_run_" + str(counter + 1): error_comments})
first = False
else:
# append the classifications to the dataframe as a new column
df_results["run_" + str(counter + 1)] = classifications
df_results["errors_run_" + str(counter + 1)] = error_comments
# determine the number of occurences per class
numruns = len(folder_names)
run_columns = [col for col in df_results.columns if col.startswith('run_')]
# Add scores per class and relevant classes to the results dataframe
class_scores_list = []
relevant_classes_list = []
# df_results["class_scores"] = [[] for _ in range(len(df_results))]
for index, row in df_results.iterrows():
# Apply the function to create the new column
class_scores, relevant_classes = calculate_class_scores(row, run_columns, numruns)
class_scores_list.append(class_scores)
relevant_classes_list.append(relevant_classes)
df_results["class_scores"] = class_scores_list
df_results["relevant_classes"] = relevant_classes_list
# write the content to file multiple_run_answers.csv
df_results.to_csv(os.path.join(results_folder_path, "multiple_run_answers.csv"), index=False)
#### LIST ALL COUNTRIES PER QUESTION AND CLASS ####
# Take file "questions.csv" from first folder in list of folders
questions_folder_path = os.path.join(results_folder_path, folder_names[0], 'questions.csv')
df_questions = pd.read_csv(questions_folder_path, sep=',')
content = ""
# loop over all questions
for _, row in df_questions.iterrows():
question = row["Question"]
content += f"question: {question}:\n"
classes = row['Classes'].split('|')
for my_class in classes:
cnt = 0
content += f"{my_class}: "
for _, row in df_results.iterrows():
if question == row["question"]:
if my_class in row["relevant_classes"]:
content += row["to_classify"] + ", "
cnt += 1
content = content[:-2] + "\n"
length_valid_to_classifies = max(len(valid_to_classifies), 1) # to avoid division by zero
percentage_occurrences = round(cnt / length_valid_to_classifies * 100, 1)
content += f"percentage of occurrences: {percentage_occurrences}%\n"
content += "\n\n"
# write the content to file multiple_run_answers_summary.txt
write_to_file(file_path=os.path.join(results_folder_path, "multiple_run_answers_summary.txt"), content=content)
# gather all combinations of classes mentioned in the answers of the countries
# approach:
# sort df_results first by question_id, then by relevant classes, then country name
# df_results = df_results.sort_values(by=["question_id", "relevant_classes", "to_classify"]).reset_index(drop=True)
content = ""
# loop over all questions
for _, row in df_questions.iterrows():
question = row["Question"]
content += f"question: {question}:\n"
# loop over all rows in df_results of this question to obtain a list of distinct relvant classes combinations
seen_combinations = set()
for _, row in df_results.iterrows():
if question == row["question"]:
relevant_classes = row["relevant_classes"]
if len(relevant_classes) > 1:
combinations_of_two = set(itertools.combinations(relevant_classes, 2))
for combination in combinations_of_two:
if combination not in seen_combinations:
seen_combinations.add(combination)
# Now count percentages of combined classes
for combination in seen_combinations:
cnt_combination = 0
for _, row in df_results.iterrows():
if question == row["question"]:
relevant_classes = row["relevant_classes"]
if len(relevant_classes) > 1:
combinations_of_two = set(itertools.combinations(relevant_classes, 2))
if combination in combinations_of_two:
cnt_combination += 1
length_valid_to_classifies = max(len(valid_to_classifies), 1) # to avoid division by zero
percentage_occurrences = round(cnt_combination / length_valid_to_classifies * 100, 1)
content += f"percentage of co-occurence combination: {combination[0]} and {combination[1]}: {percentage_occurrences}% ({cnt_combination} co-occurrences, of total {length_valid_to_classifies} occurrences)\n"
content += "\n\n"
# write the content to file multiple_run_answers_summary.txt
write_to_file(file_path=os.path.join(results_folder_path, "multiple_run_co-occurences.txt"), content=content)