-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmain_batch.py
145 lines (125 loc) · 5.84 KB
/
main_batch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import csv
import argparse
import os
import json
from flow import cold_outreach_flow
def main():
    """
    Batch processing script for the Cold Outreach Opener Generator.

    Reads people from the ``--input`` CSV (required columns: ``first_name``,
    ``last_name``, ``keywords``), runs ``cold_outreach_flow`` once per person
    and writes one row per person to the ``--output`` CSV. A person whose flow
    run raises is still written, with the error text in ``opening_message``.

    Returns None in all cases; problems with the input file are reported on
    stdout and end the run early without writing an output file.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Process multiple cold outreach targets from a CSV file.')
    parser.add_argument('--input', default='input.csv', help='Input CSV file (default: input.csv)')
    parser.add_argument('--output', default='output.csv', help='Output CSV file (default: output.csv)')
    args = parser.parse_args()

    # Check if input file exists
    if not os.path.exists(args.input):
        print(f"Error: Input file '{args.input}' not found.")
        return

    # Hardcoded personalization factors
    personalization_factors = [
        {
            "name": "personal_connection",
            "description": "Check if target person has Columbia University affiliation",
            "action": "If they do, mention shared connection to Columbia"
        },
        {
            "name": "recent_promotion",
            "description": "Check if target person was recently promoted",
            "action": "If they were, congratulate them on their new role"
        },
        {
            "name": "recent_talks",
            "description": "Check if target person gave talks recently",
            "action": "If they did, mention enjoying their insights"
        }
    ]

    # Factor names drive both the placeholder columns of failed rows and the
    # column order of the output file.
    factor_names = [factor["name"] for factor in personalization_factors]
    print(factor_names)

    # Hardcoded style preference
    style = "Be concise, specific, and casual in 30 words or less. For example: 'Heard about your talk on the future of space exploration—loved your take on creating a more sustainable path for space travel.'"

    # Read input CSV
    input_data = _read_people(args.input)
    if not input_data:
        print(f"Error: No valid data found in '{args.input}'. CSV should have columns: first_name, last_name, keywords")
        return

    # Process each person; every person yields exactly one result row, so
    # `results` is never empty once we get here.
    results = []
    total = len(input_data)
    for i, person in enumerate(input_data, 1):
        print(f"\nProcessing {i}/{total}: {person['first_name']} {person['last_name']}")
        results.append(_process_person(person, personalization_factors, style, factor_names))

    # Write results to output CSV
    fieldnames = _ordered_fieldnames(results, factor_names)
    with open(args.output, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
    print(f"\nProcessing complete. Results written to '{args.output}'")


def _read_people(path):
    """Return the data rows of the CSV at *path*, or [] if a required column is missing."""
    required = {'first_name', 'last_name', 'keywords'}
    # 'utf-8-sig' reads plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise be glued onto the first header name and make the
    # required-column check fail for an otherwise valid file.
    with open(path, 'r', newline='', encoding='utf-8-sig') as csvfile:
        reader = csv.DictReader(csvfile)
        # Every DictReader row carries the header names as keys, so checking
        # the header once is equivalent to checking each row.
        # `fieldnames` is None for an empty file.
        if not required.issubset(reader.fieldnames or ()):
            return []
        return list(reader)


def _process_person(person, personalization_factors, style, factor_names):
    """Run the flow for one input row and return its output row (an error row on failure)."""
    # Prepare input data
    shared = {
        "input": {
            "first_name": person['first_name'],
            "last_name": person['last_name'],
            "keywords": person['keywords'],
            "personalization_factors": personalization_factors,
            "style": style
        }
    }

    try:
        # Run the flow; the keys read back below ("search_results",
        # "personalization", "output") are expected to be filled in by it.
        cold_outreach_flow.run(shared)

        # Extract URLs as comma-separated string
        urls = [hit["link"] for hit in shared.get("search_results", []) if "link" in hit]

        # Flatten per-factor findings into "<factor>_actionable" / "<factor>_details" columns
        personalization_data = {}
        for factor_name, details in shared.get("personalization", {}).items():
            personalization_data[factor_name + "_actionable"] = str(details.get("actionable", False))
            personalization_data[factor_name + "_details"] = details.get("details", "")

        result = {
            'first_name': person['first_name'],
            'last_name': person['last_name'],
            'keywords': person['keywords'],
            'opening_message': shared.get("output", {}).get("opening_message", ""),
            'search_results': ",".join(urls),
            **personalization_data
        }
        print(f"Generated opener: {result['opening_message']}")
        return result
    except Exception as e:
        # Deliberate best-effort boundary: one failing person must not abort
        # the batch, so the failure is recorded in that person's row instead.
        print(f"Error processing {person['first_name']} {person['last_name']}: {str(e)}")
        return {
            'first_name': person['first_name'],
            'last_name': person['last_name'],
            'keywords': person['keywords'],
            'opening_message': f"ERROR: {str(e)}",
            'search_results': "",
            # Include empty personalization fields for consistency with successful rows
            **{f"{factor}_actionable": "False" for factor in factor_names},
            **{f"{factor}_details": "" for factor in factor_names}
        }


def _ordered_fieldnames(results, factor_names):
    """Return the output header: fixed columns, known factor columns, then any other keys."""
    fieldnames = ['first_name', 'last_name', 'keywords', 'opening_message', 'search_results']

    # Union of the keys of every result row.
    all_fields = set()
    for row in results:
        all_fields.update(row)

    # Known factors keep their declared order, "_actionable" before "_details".
    for factor in factor_names:
        for suffix in ("_actionable", "_details"):
            column = f"{factor}{suffix}"
            if column in all_fields:
                fieldnames.append(column)

    # Keys produced for factors outside `factor_names` still need a column:
    # DictWriter raises ValueError on a row key missing from its fieldnames,
    # which would discard the whole batch at write time.
    fieldnames.extend(sorted(all_fields.difference(fieldnames)))
    return fieldnames
# Run the batch job only when this file is executed as a script, so that
# importing the module does not parse arguments or start processing.
if __name__ == "__main__":
    main()