This repository was archived by the owner on Jul 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathskopeo_mirror_wrapper.py
153 lines (122 loc) · 4.54 KB
/
skopeo_mirror_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import os
import re
import requests
import subprocess
import sys
import yaml
from loguru import logger
# Runtime settings, all read from the environment with defaults.
# NOTE: `filter` shadows the built-in of the same name; the name is kept
# because other code in this module reads it.
mirror_source = os.environ.get("MIRROR_SOURCE_URL", "https://raw.githubusercontent.com/osism/sbom/main/mirrors.yaml")
mirror_dest = os.environ.get("MIRROR_DEST_URL", "registry.airgap.services.osism.tech")
filter = os.environ.get("FILTER", "")
limit = int(os.environ.get("LIMIT", "0"))
chunk_size = int(os.environ.get("CHUNK_SIZE", "0"))
chunk_number = int(os.environ.get("CHUNK_NUMBER", "0"))

# Logging: drop the handlers registered so far and install a single stdout
# sink whose lines carry a per-record "container" column.
logger.remove()
logger_format = (
    "<black>{time:YYYY-MM-DD HH:mm:ss.SSS}</black> | "
    "<level>{level: <8}</level> | "
    "<cyan>{extra[container]: <52}</cyan> | "
    "<level>{message}</level>"
)
# Default value for the "container" column, used by records that were not
# emitted through a logger bound with `container=...`.
logger.configure(extra={"container": ""})
logger.add(sys.stdout, format=logger_format)
def load_yaml() -> list:
    """Download the mirror list and return the containers to process.

    The YAML document is fetched from ``mirror_source`` and its
    ``containers`` entry is read. When both ``chunk_size`` and
    ``chunk_number`` are greater than zero, only the ``chunk_number``-th
    (1-based) slice of ``chunk_size`` entries is returned; a chunk that
    reaches past the end of the list is shortened or empty. Otherwise the
    complete list is returned.

    Returns:
        The selected entries of the ``containers`` list.

    Raises:
        requests.RequestException: the download failed, timed out, or
            returned an HTTP error status.
        yaml.YAMLError: the downloaded document is not valid YAML.
    """
    # A timeout keeps a stalled server from blocking the job forever.
    response = requests.get(mirror_source, timeout=60)
    # Fail on an HTTP error instead of trying to parse an error page.
    response.raise_for_status()

    try:
        document = yaml.safe_load(response.content)
    except yaml.YAMLError as exc:
        # Nothing useful can be done without the list: report and propagate
        # rather than continuing with an unparsed response object.
        logger.error(f"could not parse {mirror_source}: {exc}")
        raise

    all_containers = document['containers']

    # we are using chunks
    if chunk_size > 0 and chunk_number > 0:
        # subtract one so that chunk number 1 starts at index 0
        starting_point = chunk_size * (chunk_number - 1)
        # Slicing silently clamps at the end of the list.
        return all_containers[starting_point:starting_point + chunk_size]

    return all_containers
def get_tags(reg: str, org: str, img: str) -> list:
    """Collect the tag names of ``org/img`` from its source registry.

    For ``reg == "quay.io"`` the quay.io repository tag API is paged through
    and every tag entry without an ``expiration`` key is kept. For any other
    value of ``reg`` the Docker Hub repositories API is paged through and a
    tag is kept when at least one of its images has an ``os`` other than
    ``"windows"`` and its ``tag_status`` is ``"active"``.

    Paging starts at page 1 and stops at the first page that carries no tag
    entries (including an error body without the expected key).

    Args:
        reg: Registry host of the image; only ``"quay.io"`` is special-cased.
        org: Organisation / namespace of the image.
        img: Image (repository) name.

    Returns:
        The collected tag names, in the order the API returned them.

    Raises:
        requests.RequestException: a request failed or timed out.
    """
    # seconds; a timeout keeps a stalled registry from blocking the job forever
    request_timeout = 60
    tags = []
    page = 0

    if reg == "quay.io":
        # quay, page elements: 50
        api = f"https://quay.io/api/v1/repository/{org}/{img}/tag/?page="
        while True:
            page += 1
            # Decode the body once per page.
            payload = requests.get(f"{api}{page}", timeout=request_timeout).json()
            page_tags = payload.get('tags')
            # break if all pages have been scrubbed; a body without 'tags'
            # ends the listing too, matching the Docker Hub branch below
            if not page_tags:
                break
            tags.extend(
                entry['name'] for entry in page_tags if "expiration" not in entry
            )
    else:
        # docker, page elements: 10
        api = f"https://registry.hub.docker.com/v2/repositories/{org}/{img}/tags/?page="
        while True:
            page += 1
            payload = requests.get(f"{api}{page}", timeout=request_timeout).json()
            page_tags = payload.get('results')
            # break if all pages have been scrubbed (missing or empty results)
            if not page_tags:
                break
            for entry in page_tags:
                # skip windows images (traefik): a tag is only usable when
                # at least one of its images targets another OS
                if not any(image["os"] != "windows" for image in entry['images']):
                    continue
                if entry['tag_status'] == "active":
                    tags.append(entry['name'])

    return tags
def _mirrored_tags(org: str, img: str) -> set:
    """Return the tags the destination registry already lists for org/img."""
    response = requests.get(
        f"https://{mirror_dest}/v2/{org}/{img}/tags/list",
        timeout=60,
    )
    # The body may lack "tags" (e.g. unknown repository) or carry a null
    # value; both are treated as "nothing mirrored yet".
    return set(response.json().get("tags") or [])


def main():
    """Mirror the selected tags of every configured container image.

    For each entry returned by ``load_yaml()`` (expected in the form
    ``registry/organisation/image``) all source tags are listed, narrowed by
    the ``filter`` regular expression (matched at the start of the tag) and
    capped by ``limit``. Tags missing on ``mirror_dest`` are copied with
    ``skopeo copy``. ``limit`` counts every tag that passes the filter,
    including tags that turn out to be mirrored already.

    A failed copy is logged and the run continues with the next tag; when at
    least one copy failed, the process exits with status 1 after all images
    have been handled.
    """
    # Compile the filter once instead of handing the pattern to re.match
    # for every single tag.
    tag_pattern = re.compile(filter) if filter != "" else None
    failures = 0

    for container in load_yaml():
        smw_logger = logger.bind(container=container)
        smw_logger.info("fetching all tags")
        reg, org, img = container.split("/")

        # Destination tag list, requested once per image and only when the
        # first candidate tag shows up.
        already_mirrored = None
        limit_counter = 0

        for tag in get_tags(reg=reg, org=org, img=img):
            # apply filters
            if tag_pattern is not None and not tag_pattern.match(tag):
                continue

            # if limit is set, pull only the given amount of images
            if limit != 0:
                limit_counter += 1
                if limit_counter > limit:
                    break

            smw_logger.info(f"tag {tag} started")

            if already_mirrored is None:
                already_mirrored = _mirrored_tags(org, img)
            if tag in already_mirrored:
                smw_logger.success(f"tag {tag} mirrored")
                continue

            source_uri = f"docker://{reg}/{org}/{img}:{tag}"
            destination_uri = f"docker://{mirror_dest}/{org}/{img}:{tag}"
            # check=False: with check=True a non-zero exit raises
            # CalledProcessError, which made the error reporting below
            # unreachable and aborted the run on the first failed copy.
            result = subprocess.run(
                ["skopeo", "copy", source_uri, destination_uri],
                capture_output=True,
                check=False,
                encoding="UTF-8",
            )

            # != 0 also catches negative codes (process killed by a signal)
            if result.returncode != 0:
                failures += 1
                if result.stderr:
                    smw_logger.error(f"tag {tag} {result.stderr}")
                else:
                    smw_logger.critical(f"tag {tag} Return code {result.returncode}. No error message received.")
            else:
                already_mirrored.add(tag)
                smw_logger.success(f"tag {tag} mirrored")

    # Keep signalling failure to the caller (e.g. a CI job) through the exit
    # status, as the uncaught CalledProcessError did before.
    if failures:
        sys.exit(1)
# Run the mirror job only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()