forked from infra/otc-metadata
193 lines
5.9 KiB
Python
193 lines
5.9 KiB
Python
#!/usr/bin/python
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import subprocess
|
|
import warnings
|
|
|
|
from git import Repo
|
|
from git import SymbolicReference
|
|
|
|
|
|
import otc_metadata.services
|
|
|
|
# Module-level handle on the service metadata. The functions in this file
# read ``data.service_dict`` (service lookup by type) and call
# ``data.docs_by_service_type()`` (documents of one service).
data = otc_metadata.services.Services()
|
|
|
|
|
|
def _repository_url(repo):
    """Return the clone URL for one repository entry of a service.

    :param repo: mapping providing at least the ``type`` and ``repo`` keys.
    :returns: SSH clone URL for ``gitea`` and ``github`` repositories.
    :raises SystemExit: with status 1 when the repository type is neither
        ``gitea`` nor ``github``.
    """
    if repo["type"] == 'gitea':
        # The repository name must be interpolated, so the ``f`` prefix
        # belongs on the literal that carries the placeholder.
        return (
            "ssh://git@gitea.eco.tsi-dev.otc-service.com:2222/"
            f"{repo['repo']}"
        )
    if repo["type"] == 'github':
        return f"git@github.com:/{repo['repo']}"
    logging.error(f"Repository type {repo['type']} is not supported")
    raise SystemExit(1)


def process_repositories(args, service):
    """Check out the source and target repositories and synchronize documents.

    Repositories of ``service`` whose ``environment`` equals
    ``args.source_environment`` or ``args.target_environment`` are cloned
    (or refreshed when a checkout already exists) below ``args.work_dir``.
    Afterwards every matching document of the service is copied from the
    source checkout into the target checkout on a dedicated branch, committed
    and pushed; optionally a GitHub pull request is opened with ``gh``.

    :param args: parsed command-line namespace; the attributes read here are
        ``work_dir``, ``source_environment``, ``target_environment``,
        ``document_type``, ``branch_name`` and ``open_pr_gh``.
    :param service: service mapping providing ``repositories`` (iterable of
        mappings with ``type``, ``repo`` and ``environment``) and
        ``service_type``.
    :returns: ``None``. Returns early, after logging a warning, when either
        the source or the target repository could not be determined.
    :raises SystemExit: with status 1 for an unsupported repository type.
    """
    workdir = pathlib.Path(args.work_dir)
    workdir.mkdir(exist_ok=True)

    copy_from = None
    copy_to = None
    repo_to = None
    url_from = None
    url_to = None

    for repo in service["repositories"]:
        logging.debug(f"Processing repository {repo}")
        is_source = repo["environment"] == args.source_environment
        is_target = repo["environment"] == args.target_environment
        if not is_source and not is_target:
            # Repositories of unrelated environments are of no interest.
            continue

        repo_dir = pathlib.Path(workdir, repo["type"], repo["repo"])
        # Remember whether a checkout was present *before* creating the
        # directory, otherwise a fresh clone could never be detected.
        checkout_exists = repo_dir.exists()
        repo_dir.mkdir(parents=True, exist_ok=True)
        repo_url = _repository_url(repo)

        if not checkout_exists:
            git_repo = Repo.clone_from(repo_url, repo_dir, branch='main')
        else:
            logging.debug("Checkout already exists")
            git_repo = Repo(repo_dir)
            git_repo.remotes.origin.fetch()
            git_repo.heads.main.checkout()
            git_repo.remotes.origin.pull()

        # The source environment takes precedence when both names coincide,
        # mirroring the if/elif order of the environment comparison.
        if is_source:
            copy_from = repo_dir
            url_from = repo_url
        else:
            copy_to = repo_dir
            url_to = repo_url
            repo_to = git_repo

    logging.debug(f"Synchronizing from={url_from}, to={url_to}")

    if not copy_from or not copy_to:
        logging.warning(
            "Not synchronizing documents of the service. "
            "Target or source not known.")
        # Without both checkouts there is nothing to copy from or to;
        # continuing would only fail on the missing paths/repository.
        return

    for doc in data.docs_by_service_type(service["service_type"]):
        logging.debug(f"Analyzing document {doc}")
        if args.document_type and doc["type"] != args.document_type:
            logging.info(
                f"Skipping synchronizing {doc['title']} "
                "due to the doc-type filter.")
            continue
        if doc.get("environment"):
            logging.info(
                f"Skipping synchronizing {doc['title']} "
                "since it is environment bound.")
            continue

        # One branch per document type, derived from the requested name.
        branch_name = f"{args.branch_name}#{doc['type']}"
        # Create the remote-tracking reference first so that it can be
        # looked up through the remote and used as the tracking branch.
        SymbolicReference.create(
            repo_to, "refs/remotes/origin/%s" % branch_name)
        new_branch = repo_to.create_head(branch_name, 'main')
        remote_ref = repo_to.remotes[0].refs[branch_name]  # get correct type
        new_branch.set_tracking_branch(remote_ref)
        new_branch.checkout()

        # Overwrite the target document tree with the source one; every
        # "conf.py" is excluded from the copy.
        shutil.copytree(
            pathlib.Path(copy_from, doc["rst_location"]),
            pathlib.Path(copy_to, doc["rst_location"]),
            ignore=lambda a, b: ["conf.py"],
            dirs_exist_ok=True
        )
        repo_to.index.add([doc["rst_location"]])
        repo_to.index.commit(
            (
                f"Synchronize {doc['title']}\n\n"
                f"Overwriting document\n"
                f"from: {url_from}\n"
                f"to: {url_to}\n\n"
                "Performed-by: gitea/infra/otc-metadata/tools/sync_doc_repo.py"
            )
        )
        repo_to.remotes.origin.push(new_branch)
        if 'github' in url_to and args.open_pr_gh:
            subprocess.run(
                args=['gh', 'pr', 'create', '-f'],
                cwd=copy_to,
                check=True
            )
|
|
|
|
|
|
def main():
    """Command-line entry point: synchronize documents between environments.

    Parses the command-line options, enables DEBUG logging, looks up the
    requested service in ``data.service_dict`` and passes it together with
    the parsed arguments to ``process_repositories``.

    :raises SystemExit: with status 1 when the requested service type is not
        present in the metadata (argparse itself exits on invalid options).
    """
    parser = argparse.ArgumentParser(
        description='Synchronize document between environments.')
    parser.add_argument(
        '--source-environment',
        required=True,
        help='Environment to be used as a source'
    )
    parser.add_argument(
        '--target-environment',
        required=True,
        help='Environment to be used as a target'
    )
    parser.add_argument(
        '--service-type',
        required=True,
        help='Service to which document(s) belongs to'
    )
    parser.add_argument(
        '--document-type',
        help=(
            'Type of the document to synchronize. '
            'All will be synchronized if not set.'
        )
    )
    parser.add_argument(
        '--branch-name',
        required=True,
        help='Branch name to be used for synchronizing.'
    )
    parser.add_argument(
        '--work-dir',
        required=True,
        help='Working directory to use for repository checkout.'
    )
    parser.add_argument(
        '--open-pr-gh',
        action='store_true',
        help='Open Pull Request using `gh` utility (need to be present).'
    )
    args = parser.parse_args()
    logging.basicConfig(level=logging.DEBUG)

    service = data.service_dict.get(args.service_type)
    if not service:
        warnings.warn(f"Service {args.service_type} was not found in metadata")
        # ``os`` has no ``exit`` function; raising SystemExit terminates the
        # script with the intended non-zero status.
        raise SystemExit(1)

    process_repositories(args, service)
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|