Skip to content

Commit

Permalink
Update snapchat_dlp.py
Browse files Browse the repository at this point in the history
Fixing download thanks to

skyme5#24 (comment)
  • Loading branch information
JaKrIvMa authored Oct 4, 2024
1 parent dbbdce6 commit eac1916
Showing 1 changed file with 64 additions and 82 deletions.
146 changes: 64 additions & 82 deletions snapchat_dlp/snapchat_dlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ def __init__(
self.regexp_web_json = (
r'<script\s*id="__NEXT_DATA__"\s*type="application\/json">([^<]+)<\/script>'
)
self.reaponse_ok = requests.codes.get("ok")

def _api_response(self, username):
web_url = self.endpoint_web.format(username)
return requests.get(web_url).text
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/58.0.3029.110 Safari/537.3"
)
}
return requests.get(web_url, headers=headers)

def _web_fetch_story(self, username):
"""Download user stories from Web.
Expand All @@ -55,13 +61,18 @@ def _web_fetch_story(self, username):
APIResponseError: API Error
Returns:
(dict, dict): user_info, stories
(list, dict): stories, user_info
"""
retries = 3
timeout = 15

response = self._api_response(username)
response_json_raw = re.findall(self.regexp_web_json, response)

if response.status_code != requests.codes.ok:
if response.status_code == 404:
logger.error(f"User '{username}' not found (404).")
else:
logger.error(f"Failed to fetch data for '{username}'. Status code: {response.status_code}.")
raise APIResponseError

response_json_raw = re.findall(self.regexp_web_json, response.text)

try:
response_json = json.loads(response_json_raw[0])
Expand All @@ -72,7 +83,7 @@ def util_web_user_info(content: dict):
field_id = user_profile["$case"]
return user_profile[field_id]
else:
raise UserNotFoundError(f"Could not find Snapchat user {username}")
raise UserNotFoundError

def util_web_story(content: dict):
if "story" in content["props"]["pageProps"]:
Expand All @@ -82,87 +93,58 @@ def util_web_story(content: dict):
user_info = util_web_user_info(response_json)
stories = util_web_story(response_json)
return stories, user_info
except (IndexError, KeyError, ValueError):
except (IndexError, KeyError, ValueError) as e:
logger.error(f"Error parsing response for '{username}': {e}")
raise APIResponseError
except UserNotFoundError as e:
print(
f"User {username} not found. Check the spelling, or that it is a public profile."
)
# print(e)

def _download_media(self, media, username, snap_user):
snap_id = media["snapId"]["value"]
media_url = media["snapUrls"]["mediaUrl"]
media_type = media["snapMediaType"]
timestamp = int(media["timestampInSec"]["value"])
date_str = strf_time(timestamp, "%Y-%m-%d")

dir_name = os.path.join(self.directory_prefix, username, date_str)
os.makedirs(dir_name, exist_ok=True) # Ensure the directory exists

filename = strf_time(timestamp, "%Y-%m-%d_%H-%M-%S {} {}.{}").format(
snap_id, username, MEDIA_TYPE[media_type]
)

if self.dump_json:
filename_json = os.path.join(dir_name, f"{filename}.json")
media_json = dict(media)
media_json["snapUser"] = snap_user
dump_response(media_json, filename_json)

media_output = os.path.join(dir_name, filename)
return media_url, media_output

def download(self, username):
"""Download Snapchat Story for `username`.
Args:
username (str): Snapchat `username`
Returns:
[bool]: story downloader
"""
stories = None
snap_user = None

skip_user = False
story_download_count = 0
try:
stories, snap_user = self._web_fetch_story(username)
except TypeError as e:
print(
f"Error with above command. \n| Args: username: {str(username)} \n| stories: {stories} \n| snap_user: {snap_user} \n| self: {str(self)} \n| Error message: {str(e)}"
)
skip_user = True
pass
if len(stories) == 0 or stories is None:
if self.quiet is False:
logger.info("\033[91m{}\033[0m has no stories".format(username))
print(
"Found no stories. Possible reasons:\n - Typo in the username\n - User currently does not have any published stories\n - The profile is not a public profile"
)
skip_user = True
pass
# raise NoStoriesFound

if self.limit_story > -1:
stories = stories[0 : self.limit_story]

logger.info("[+] {} has {} stories".format(username, len(stories)))
if not skip_user:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
try:
for media in stories:
snap_id = media["snapId"]["value"]
media_url = media["snapUrls"]["mediaUrl"]
media_type = media["snapMediaType"]
timestamp = int(media["timestampInSec"]["value"])
date_str = strf_time(timestamp, "%Y-%m-%d")

dir_name = os.path.join(self.directory_prefix, username, date_str)

filename = strf_time(timestamp, "%Y-%m-%d_%H-%M-%S {} {}.{}").format(
snap_id, username, MEDIA_TYPE[media_type]
)

if self.dump_json:
filename_json = os.path.join(dir_name, filename + ".json")
media_json = dict(media)
media_json["snapUser"] = snap_user
dump_response(media_json, filename_json)

media_output = os.path.join(dir_name, filename)

# # Check if file exists
if not os.path.isfile(media_output):
story_download_count += 1

executor.submit(
download_url, media_url, media_output, self.sleep_interval
)

except KeyboardInterrupt:
executor.shutdown(wait=False)
executor.shutdown(wait=True)
logger.info(
"[✔] {}'s stories downloaded, downloaded {} this session, {} already existed".format(
username, story_download_count, len(stories) - story_download_count
)
)
except APIResponseError:
logger.error(f"Could not fetch data for '{username}'. The user may not exist or has no public stories.")
return

if not stories:
if not self.quiet:
logger.info(f"{username} has no stories.")
return

logger.info(f"[+] {username} has {len(stories)} stories.")

executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
try:
for media in stories:
media_url, media_output = self._download_media(media, username, snap_user)
executor.submit(download_url, media_url, media_output, self.sleep_interval)
except KeyboardInterrupt:
executor.shutdown(wait=False)

logger.info(f"[✔] {len(stories)} stories downloaded for {username}.")

0 comments on commit eac1916

Please sign in to comment.