#!/usr/bin/python # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import logging import os import pathlib import subprocess import warnings from dirsync import sync from git import Repo from git import SymbolicReference import otc_metadata.services data = otc_metadata.services.Services() def process_repositories(args, service): """Checkout repositories""" workdir = pathlib.Path(args.work_dir) workdir.mkdir(exist_ok=True) copy_from = None copy_to = None repo_to = None url_from = None url_to = None for repo in service["repositories"]: if repo["cloud_environments"][0] != args.cloud_environment: continue logging.debug(f"Processing repository {repo}") repo_dir = pathlib.Path(workdir, repo["type"], repo["repo"]) if repo["environment"] == args.source_environment: copy_from = repo_dir elif repo["environment"] == args.target_environment: copy_to = repo_dir else: continue checkout_exists = repo_dir.exists() repo_dir.mkdir(parents=True, exist_ok=True) if repo["type"] == "gitea": repo_url = ( f"ssh://git@gitea.eco.tsi-dev.otc-service.com:2222/" f"{repo['repo']}" ) elif repo["type"] == "github": repo_url = f"git@github.com:/{repo['repo']}" else: logging.error(f"Repository type {repo['type']} is not supported") exit(1) if not checkout_exists: git_repo = Repo.clone_from(repo_url, repo_dir, branch="main") else: logging.debug("Checkout already exists") git_repo = Repo(repo_dir) git_repo.remotes.origin.fetch() git_repo.heads.main.checkout() git_repo.remotes.origin.pull() if repo["environment"] == args.source_environment: url_from = repo_url elif repo["environment"] == args.target_environment: url_to = repo_url repo_to = git_repo logging.debug(f"Synchronizing from={url_from}, to={url_to}") if not copy_from or not copy_to: logging.warn( "Not synchronizing documents of the service. " "Target or source not known." ) for doc in data.docs_by_service_type(service["service_type"]): cloud_environment_doc_check = False for cloud_environment_doc in doc["cloud_environments"]: if cloud_environment_doc["name"] == args.cloud_environment: cloud_environment_doc_check = True break if cloud_environment_doc_check is False: continue logging.debug(f"Analyzing document {doc}") if args.document_type and doc.get("type") != args.document_type: logging.info( f"Skipping synchronizing {doc['title']} " f"due to the doc-type filter." ) continue # if doc.get("environment"): # logging.info( # f"Skipping synchronizing {doc['title']} " # f"since it is environment bound." # ) # continue branch_name = f"{args.branch_name}#{doc['type']}" remote_ref = SymbolicReference.create( repo_to, "refs/remotes/origin/%s" % branch_name ) new_branch = repo_to.create_head(branch_name, "main") remote_ref = repo_to.remotes[0].refs[branch_name] # get correct type new_branch.set_tracking_branch(remote_ref) new_branch.checkout() source_path = pathlib.Path(copy_from, doc["rst_location"]) target_path = pathlib.Path(copy_to, doc["rst_location"]) sync( source_path, target_path, 'sync', purge=True, create=True, content=True, ignore=['conf.py'] ) repo_to.index.add([doc["rst_location"]]) for obj in repo_to.index.diff(None).iter_change_type('D'): repo_to.index.remove([obj.b_path]) if len(repo_to.index.diff("HEAD")) == 0: # Nothing to commit logging.debug("No changes.") continue repo_to.index.commit( ( f"Synchronize {doc['title']}\n\n" f"Overwriting document\n" f"from: {url_from}\n" f"to: {url_to}\n\n" "Performed-by: gitea/infra/otc-metadata/tools/sync_doc_repo.py" ) ) repo_to.remotes.origin.push(new_branch) if "github" in url_to and args.open_pr_gh: subprocess.run( args=["gh", "pr", "create", "-f"], cwd=copy_to, check=True ) def main(): parser = argparse.ArgumentParser( description="Synchronize document between environments." ) parser.add_argument( "--source-environment", required=True, help="Environment to be used as a source", ) parser.add_argument( "--target-environment", required=True, help="Environment to be used as a source", ) parser.add_argument( "--service-type", required=True, help="Service to which document(s) belongs to", ) parser.add_argument( "--document-type", help=( "Type of the document to synchronize. " "All will be synchronized if not set." ), ) parser.add_argument( "--branch-name", required=True, help="Branch name to be used for synchronizing.", ) parser.add_argument( "--work-dir", required=True, help="Working directory to use for repository checkout.", ) parser.add_argument( "--open-pr-gh", action="store_true", help="Open Pull Request using `gh` utility (need to be present).", ) parser.add_argument( "--cloud-environment", required=True, default="eu_de", help="Cloud Environment. Default: eu_de", ) args = parser.parse_args() logging.basicConfig(level=logging.DEBUG) service = data.get_service_with_repo_by_service_type(service_type=args.service_type) if not service: warnings.warn(f"Service {args.service_type} was not found in metadata") os.exit(1) cloud_environment_service_check = False for cloud_environment_service in service["cloud_environments"]: if cloud_environment_service["name"] == args.cloud_environment: cloud_environment_service_check = True break if cloud_environment_service_check is False: warnings.warn(f"Service {args.service_type} has no cloud environment {args.cloud_environment}") os.exit(1) process_repositories(args, service) if __name__ == "__main__": main()