You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
This repo is archived. You can view files and clone it, but cannot push or open issues/pull-requests.

279 lines
9.8 KiB
Python

#!/usr/bin/python3
import subprocess
import json
import sys
import requests
import os
from pathlib import Path
# Absolute directory containing this script, with a trailing slash so that
# other paths can be built by appending to it.
WORKING_DIR = os.path.dirname(os.path.realpath(__file__)) + '/'

# Load the list of images to check. The encoding is pinned to UTF-8 (the
# standard JSON encoding) instead of relying on the platform's locale default.
with open(WORKING_DIR + 'services.json', encoding='utf-8') as f:
    services_file = json.load(f)
#import sys
#data = json.loads(sys.argv[1])
#print(data[sys.argv[2]])
##### EDIT THIS SECTION #####

# Image prefix for all local images
# ex.: local/drawio
LOCAL_PREFIX = "local"

# Directory where scripts used to restart services are stored.
# The scripts must be named "<service_name>.sh" (without the "local/" part)
RESTART_SCRIPTS_DIR = "restart/"

# Directory where scripts used to build local images are stored.
# The scripts must be named "<service_name>.sh" (without the "local/" part)
BUILD_SCRIPTS_DIR = "build/"

##### DO NOT EDIT BELOW #####

# Endpoint used to get auth tokens.
# The single %s is replaced by "repo/name" (ex.: "library/alpine").
AUTH_TOKEN_ADDRESS = (
    'https://auth.docker.io/token'
    '?service=registry.docker.io'
    '&scope=repository:%s:pull'
)

# Endpoint used to get image digests.
# First %s: "repo/name". Second %s: the target tag (ex.: "latest").
IMAGE_DIGEST_ADDRESS = 'https://index.docker.io/v2/%s/manifests/%s'

# Headers used for the "get image digests" request.
# The %s in "Authorization" is replaced by the auth token.
IMAGE_DIGEST_HEADERS = {
    'Authorization': 'Bearer %s',
    'Accept': (
        'application/vnd.docker.distribution.manifest.list.v2+json, '
        'application/vnd.oci.image.index.v1+json'
    ),
}

# Docker commands, as argument lists. They contain no placeholder: the image
# reference (or fingerprint) is appended as the last argument before running.
DOCKER_GETID = [
    "docker", "inspect", "--format", "{{index .RepoDigests 0}}",
]
DOCKER_GETID_LOCAL = [
    "docker", "inspect", "--format", "{{.Id}}",
]
DOCKER_GETLAYERS = [
    "docker", "inspect", "--format", "{{.RootFS.Layers}}",
]
DOCKER_PULL = ["docker", "pull"]
DOCKER_REMOVE = ["docker", "rmi"]
class c:
    """ANSI escape sequences used to colour and style terminal output."""

    # Foreground colours
    SOFTWARE = "\033[35m"   # magenta
    UPDATE = "\033[93m"     # bright yellow
    SUCCESS = "\033[92m"    # bright green
    OK = "\033[94m"         # bright blue
    FAIL = "\033[91m"       # bright red

    # Reset all attributes back to the terminal default
    ENDC = "\033[0m"

    # Text attributes
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
class t:
    """User-facing messages printed by the script."""

    # Update detection
    UPDATE_CHECK = "Checking for updates..."
    FP_CHECK = "Local image, comparing layers with the parent image..."
    DH_CHECK = "DockerHub image, getting hashes from the Docker API..."
    UPDATE_ASK = "Update found. Proceed? [y/N] "
    UPDATE_SKIP = "Skipping update."
    UPDATE_CHILDREN = "Children found, updating."
    NO_UPDATE = "No update found."

    # Fingerprint lookups
    LOCALVER_GET = "Getting information on the current image..."
    TOKEN_GET = "Getting API token..."
    DHVER_GET = "Checking for a new version using the token..."

    # Pulling remote images
    PULLING = "Pulling newer image..."
    PULLED = "Pull complete."
    PULL_FAILED = "Pull failed. Please try it yourself."

    # Building local images
    BUILDING = "Building child image from Dockerfile..."
    BUILT = "Building complete."
    BUILD_FAILED = "Building failed. Please try it yourself."

    # Removing the previous image
    DELETING = "Trying to delete the old image..."
    DELETED = "Old image deleted."
    DELETE_FAILED = "Failed to delete the old image. Dependent child images?"

    # Service restart
    SRV_RESTARTING = "Restarting the service..."
    SRV_RESTARTED = "Service restarted."

    # Miscellaneous
    ARGS_ERROR = "Error : Invalid argument"
    # NOTE(review): OCI_DETECTED is not referenced by any visible code.
    OCI_DETECTED = "OCI image detected, retrying request..."

    # Text printed for -h / --help
    HELP_MSG = (
        "Usage: ./update-images.py [Option(s)]\n"
        "Available options:\n"
        "-h --help => print this help message\n"
        "-y --assume-yes => does not ask for confirmation when an image needs to be updated"
    )
# Startup banner, printed in bold before anything else happens
print(c.BOLD, "--- 42l's docker update script v2 ---", c.ENDC, sep="")
def print_sw(sw, msg, color):
    """Print *msg* prefixed with the image name *sw* in square brackets.

    The "[sw]" prefix is printed bold in the SOFTWARE colour; the message
    itself uses the escape sequence given as *color*.
    """
    prefix = c.SOFTWARE + c.BOLD + "[" + sw + "]" + c.ENDC
    body = color + msg + c.ENDC
    print(prefix + " " + body)
def ask_sw(sw, msg, color):
    """Prompt the user with *msg*, prefixed like print_sw, and read the answer.

    Returns True only when the answer is exactly "y" or "Y"; anything else,
    including an empty answer, returns False.
    """
    prefix = c.SOFTWARE + c.BOLD + "[" + sw + "]" + c.ENDC
    prompt = prefix + " " + color + msg + c.ENDC
    answer = input(prompt)
    return answer.lower() == 'y'
def pull_image(s):
    """Pull the image "name:tag" described by *s* with ``docker pull``.

    Docker's standard output is captured and discarded. Returns the exit
    status of the docker command, not a digest.
    """
    print_sw(s["name"], t.PULLING, c.OK)
    # DOCKER_PULL is left untouched; the image reference goes last
    command = DOCKER_PULL + [get_name(s)]
    completed = subprocess.run(command, stdout=subprocess.PIPE)
    return completed.returncode
def remove_image(s, fingerprint):
    """Remove an image with ``docker rmi`` and return the command's exit status.

    A local image (name containing LOCAL_PREFIX + "/") is removed by
    *fingerprint* alone. Any other image is removed as
    "name:tag@fingerprint".
    """
    if LOCAL_PREFIX + "/" in s["name"]:
        # Local image: the fingerprint is the local image hash
        target = fingerprint
    else:
        # Remote image: the fingerprint is the remote content digest
        target = s["name"] + ":" + s["tag"] + "@" + fingerprint
    outcome = subprocess.run(DOCKER_REMOVE + [target], stdout=subprocess.PIPE)
    return outcome.returncode
def call_script(s, directory):
    """Run the helper script that belongs to the image described by *s*.

    The script path is ``directory + <short name> + ".sh"``, where the short
    name is the second '/'-separated component of ``s["name"]`` ("drawio"
    for "local/drawio"). A name without any '/' is used as is, rather than
    raising IndexError.

    Returns the script's exit status, or 1 when the script file does not
    exist.
    """
    parts = s["name"].split('/')
    short_name = parts[1] if len(parts) > 1 else parts[0]
    script = Path(directory + short_name + ".sh")
    if not script.is_file():
        return 1
    # Pass a plain string so this also works where Path arguments are not accepted
    return subprocess.call([str(script)])
def get_name(s):
    """Return the full image reference "name:tag" for the image described by *s*."""
    return ":".join((s["name"], s["tag"]))
def lookup_new(s, timeout=30):
    """Return the digest the registry currently reports for "name:tag".

    Two HTTP requests are made: one to AUTH_TOKEN_ADDRESS for a pull token,
    then one to IMAGE_DIGEST_ADDRESS whose "docker-content-digest" response
    header is returned.

    Parameters:
        s: image description; reads s["name"] ("repo/name") and s["tag"].
        timeout: seconds given to each HTTP request, so an unresponsive
            server cannot hang the script forever.

    Raises:
        requests.RequestException: network failure, timeout or HTTP error
            status on either request.
        KeyError: the token response has no "token" entry, or the digest
            response has no "docker-content-digest" header.
    """
    print_sw(s["name"], t.TOKEN_GET, c.OK)
    # Request a token
    token_r = requests.get(AUTH_TOKEN_ADDRESS % s["name"], timeout=timeout)
    token_r.raise_for_status()
    token_payload = token_r.json()
    if "token" not in token_payload:
        raise KeyError("no token in the auth response for " + s["name"])
    tok = token_payload["token"]

    print_sw(s["name"], t.DHVER_GET, c.OK)
    imdig_address = IMAGE_DIGEST_ADDRESS % (s["name"], s["tag"])
    # Work on a copy so the shared header template keeps its %s placeholder
    imdig_headers = IMAGE_DIGEST_HEADERS.copy()
    imdig_headers["Authorization"] = imdig_headers["Authorization"] % tok
    digest_r = requests.get(imdig_address, headers=imdig_headers, timeout=timeout)
    digest_r.raise_for_status()
    digest = digest_r.headers.get("docker-content-digest")
    if digest is None:
        raise KeyError("no docker-content-digest header in the response for "
                       + get_name(s))
    return digest
def lookup_current(s):
    """Return the fingerprint of the image "name:tag" as it exists on this host.

    For a local image (name containing LOCAL_PREFIX + "/") this is the image
    Id printed by ``docker inspect``. That line has no "@", so it is returned
    whole; previously it was looked up after an "@" and always came back
    empty. For any other image it is the part after "@" of the first repo
    digest ("name@sha256:...").

    Returns "" when docker printed nothing usable, e.g. because the image
    does not exist locally.
    """
    print_sw(s["name"], t.LOCALVER_GET, c.OK)
    is_local = LOCAL_PREFIX + "/" in s["name"]
    # Local image hash for a local image, remote content digest otherwise
    if is_local:
        cmd = DOCKER_GETID_LOCAL.copy()
    else:
        cmd = DOCKER_GETID.copy()
    cmd.append(get_name(s))
    response = subprocess.run(cmd, stdout=subprocess.PIPE)
    try:
        lines = response.stdout.decode('utf-8').splitlines()
    except UnicodeDecodeError:
        return ""
    if not lines:
        return ""
    last_line = lines[-1]
    if is_local:
        return last_line
    # Docker responds with something like (image name)@sha256:digesthash
    parts = last_line.split("@")
    return parts[1] if len(parts) > 1 else ""
def lookup_layers(tag):
    """Return the layer list of image *tag* as a list of strings.

    Runs the DOCKER_GETLAYERS inspect command on *tag*, takes the last line
    of its output, removes every '[' and ']' and splits the rest on single
    spaces.
    """
    inspected = subprocess.run(DOCKER_GETLAYERS + [tag], stdout=subprocess.PIPE)
    last_line = inspected.stdout.decode('utf-8').splitlines()[-1]
    # A line returned by splitlines() holds no newline, so only the brackets
    # need to be dropped.
    without_brackets = last_line.translate({ord('['): None, ord(']'): None})
    return without_brackets.split(' ')
def check_err(s, result, success_str, fail_str):
    """Report the outcome of a step and abort the script if it failed.

    *result* is an exit status: 0 prints *success_str*; anything else prints
    *fail_str* and exits the process with status 1.
    """
    if result != 0:
        print_sw(s["name"], fail_str, c.FAIL)
        sys.exit(1)
    print_sw(s["name"], success_str, c.SUCCESS)
def browse(services, parent):
    """Check every image in *services* and update those that are outdated.

    Recursive: once an image has been updated, its "children" are processed
    with that image as their parent.

    Parameters:
        services: list of image descriptions. Each one provides "name" and
            "tag", and optionally "service" (truthy: run the restart script
            after the update) and "children" (list of dependent images).
        parent: description of the image the entries of *services* are built
            from, or None for the top-level list.

    Reads the module-level flag ``assume_yes`` to decide whether to ask for
    confirmation. Exits the process with status 1 when a build, pull or
    delete step fails, or when a local image has no parent to compare with.
    """
    for s in services:
        name = s["name"]
        is_local = LOCAL_PREFIX + "/" in name
        print_sw(name, t.UPDATE_CHECK, c.OK)
        # Fingerprint of the image before any update; used at the end to
        # delete the old image.
        dig_current = lookup_current(s)

        if is_local:
            # It's a local image
            print_sw(name, t.FP_CHECK, c.OK)
            if parent is None:
                # Without a parent there is nothing to compare layers with;
                # fail with an explanation instead of a TypeError.
                print_sw(name, "Local image has no parent image to compare "
                         "with. Check services.json.", c.FAIL)
                sys.exit(1)
            # Check the parent's layers
            parent_last_layer = lookup_layers(get_name(parent))[-1]
            layers_current = lookup_layers(get_name(s))
            # If the current image's layers contain the parent's last layer
            # then it *probably* didn't get updated
            if parent_last_layer in layers_current:
                print_sw(name, t.NO_UPDATE, c.OK)
                continue
        else:
            # It's a dockerhub image: get the fingerprint from remote
            print_sw(name, t.DH_CHECK, c.OK)
            dig_new = lookup_new(s)
            # Same hashes -> no update is needed
            if dig_new == dig_current:
                print_sw(name, t.NO_UPDATE, c.OK)
                continue

        # An update exists; skip it unless confirmed
        if not assume_yes and not ask_sw(name, t.UPDATE_ASK, c.UPDATE):
            print_sw(name, t.UPDATE_SKIP, c.OK)
            continue

        # Proceed to update: local images are built, remote ones are pulled
        if is_local:
            print_sw(name, t.BUILDING, c.OK)
            result = call_script(s, WORKING_DIR + BUILD_SCRIPTS_DIR)
            check_err(s, result, t.BUILT, t.BUILD_FAILED)
        else:
            result = pull_image(s)
            check_err(s, result, t.PULLED, t.PULL_FAILED)

        # If the image is associated with a service, restart it
        if s.get("service"):
            print_sw(name, t.SRV_RESTARTING, c.OK)
            restart_res = call_script(s, WORKING_DIR + RESTART_SCRIPTS_DIR)
            if restart_res == 0:
                print_sw(name, t.SRV_RESTARTED, c.OK)
            else:
                # Report the failure truthfully but keep going, so the
                # remaining images are still processed.
                print_sw(name, "Service restart failed. Please try it "
                         "yourself.", c.FAIL)

        # Now updating children
        if s.get("children"):
            print_sw(name, t.UPDATE_CHILDREN, c.OK)
            browse(s["children"], s)

        # Delete the old image, only if there was one
        if dig_current.startswith("sha256:"):
            print_sw(name, t.DELETING, c.OK)
            del_res = remove_image(s, dig_current)
            check_err(s, del_res, t.DELETED, t.DELETE_FAILED)
# For debugging purposes
#s = dict()
#s["name"] = "python/3-slim"
#s["tag"] = "latest"
#services_file = [s]
# Handle arguments. assume_yes is read by browse() to skip the confirmation
# prompt; any unknown argument aborts with an error.
assume_yes = False
for arg in sys.argv[1:]:
    if arg in ("-y", "--assume-yes"):
        assume_yes = True
        continue
    if arg in ("-h", "--help"):
        print(t.HELP_MSG)
        sys.exit(0)
    print(c.FAIL + c.BOLD + t.ARGS_ERROR + c.ENDC)
    sys.exit(1)

# Walk the whole image tree; top-level images have no parent
browse(services_file, None)