Source code for jaypore_ci.jci

"""
The code submodule for Jaypore CI.
"""
import time
import os
from itertools import product
from collections import defaultdict
from typing import List, Union, Callable
from contextlib import contextmanager

import structlog
import pendulum

from jaypore_ci.exceptions import BadConfig
from jaypore_ci.config import const
from jaypore_ci.changelog import version_map
from jaypore_ci import remotes, executors, reporters, repos, clean
from jaypore_ci.interfaces import (
    Remote,
    Executor,
    Reporter,
    TriggerFailed,
    Status,
    Repo,
)
from jaypore_ci.logging import logger

TZ = "UTC"

__all__ = ["Pipeline", "Job"]


# All of these statuses are considered "finished" statuses
FIN_STATUSES = (Status.FAILED, Status.PASSED, Status.TIMEOUT, Status.SKIPPED)
PREFIX = "JAYPORE_"

# Check if we need to upgrade Jaypore CI
def ensure_version_is_correct() -> None:
    """
    Ensure that the version of Jaypore CI that is running, the code inside
    cicd.py, and pre-push.sh are at compatible versions.

    If versions do not match then this function will print out instructions on
    what to do in order to upgrade.

    Downgrades are not allowed; you need to re-install that specific version.
    """
    if (
        const.expected_version is not None
        and const.version is not None
        and const.expected_version != const.version
    ):
        print("Expected : ", const.expected_version)
        print("Got      : ", const.version)
        if const.version > const.expected_version:
            print(
                "Your current version is higher than the expected one. Please "
                "re-install Jaypore CI in this repo as downgrades are not "
                "supported."
            )
        if const.version < const.expected_version:
            print("--- Upgrade Instructions ---")
            for version in sorted(version_map.keys()):
                if version < const.version or version > const.expected_version:
                    continue
                for line in version_map[version]["instructions"]:
                    print(line)
            print("--- -------------------- ---")
        raise BadConfig(
            "Version mismatch between arjoonn/jci:<tag> docker container and pre-push.sh script"
        )

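# A hedged sketch (an assumption, not the actual changelog contents) of the
# shape the upgrade loop above relies on: ``version_map`` maps comparable
# version keys to dicts carrying an "instructions" list whose lines are
# printed verbatim to the user, roughly:
#
#   version_map = {
#       some_version: {"instructions": ["<upgrade step shown to the user>"]},
#   }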

class Job:  # pylint: disable=too-many-instance-attributes
    """
    This is the fundamental building block for running jobs.
    Each job goes through a lifecycle defined by
    :class:`~jaypore_ci.interfaces.Status`.

    A job is run by an :class:`~jaypore_ci.interfaces.Executor` as part of a
    :class:`~jaypore_ci.jci.Pipeline`.

    It is never created manually. The correct way to create a job is to use
    :meth:`~jaypore_ci.jci.Pipeline.job`.

    :param name: The name for the job. Names must be unique across jobs and
        stages.
    :param command: The command that we need to run for the job. It can be set
        to `None` when `is_service` is True.
    :param is_service: Is this job a service or not? Service jobs are assumed
        to be :class:`~jaypore_ci.interfaces.Status.PASSED` as long as they
        start. They are shut down when the entire pipeline has finished
        executing.
    :param pipeline: The pipeline this job is associated with.
    :param status: The :class:`~jaypore_ci.interfaces.Status` of this job.
    :param image: What docker image to use for this job.
    :param timeout: Defines how long a job is allowed to run before being
        killed and marked as :class:`~jaypore_ci.interfaces.Status.FAILED`.
    :param env: A dictionary of environment variables to pass to the docker
        run command.
    :param children: Defines which jobs depend on this job's output status.
    :param parents: Defines which jobs need to pass before this job can be
        run.
    :param stage: What stage the job belongs to. This stage name must exist so
        that we can assign jobs to it.
    :param executor_kwargs: A dictionary of keyword arguments that the
        executor can use when running a job. Different executors may use this
        in different ways, for example with the
        :class:`~jaypore_ci.executors.docker.Docker` executor this may be used
        to run jobs with `--add-host or --device
        <https://docker-py.readthedocs.io/en/stable/containers.html#docker.models.containers.ContainerCollection.run>`_ .
    """

    def __init__(
        self,
        name: str,
        command: Union[str, Callable],
        pipeline: "Pipeline",
        *,
        status: str = None,
        children: List["Job"] = None,
        parents: List["Job"] = None,
        is_service: bool = False,
        stage: str = None,
        # --- executor kwargs
        image: str = None,
        timeout: int = None,
        env: dict = None,
        executor_kwargs: dict = None,
    ):
        self.name = name
        self.command = command
        self.image = image
        self.status = status
        self.run_state = None
        self.timeout = timeout
        self.pipeline = pipeline
        self.env = env
        self.children = children if children is not None else []
        self.parents = parents if parents is not None else []
        self.is_service = is_service
        self.stage = stage
        self.executor_kwargs = executor_kwargs if executor_kwargs is not None else {}
        # --- run information
        self.logs = defaultdict(list)
        self.job_id = id(self)
        self.run_id = None
        self.run_start = None
        self.last_check = None

    def logging(self):
        """
        Returns a logging instance that has job specific information bound to
        it.
        """
        return self.pipeline.logging().bind(
            job_id=self.job_id,
            job_name=self.name,
            run_id=self.run_id,
        )

    def update_report(self) -> str:
        """
        Update the status report. Usually called when a job changes some of
        its internal state, like when logs are updated or when status has
        changed.
        """
        self.logging().debug("Update report")
        status = {
            Status.PENDING: "pending",
            Status.RUNNING: "pending",
            Status.FAILED: "failure",
            Status.PASSED: "success",
            Status.TIMEOUT: "warning",
            Status.SKIPPED: "warning",
        }[self.pipeline.get_status()]
        report = self.pipeline.reporter.render(self.pipeline)
        with open(
            "/jaypore_ci/run/jaypore_ci.status.txt", "w", encoding="utf-8"
        ) as fl:
            fl.write(report)
        self.pipeline.remote.publish(report, status)
        return report

    def trigger(self):
        """
        Trigger the job via the pipeline's executor.
        This will immediately return and will not wait for the job to finish.

        It is also idempotent. Calling this multiple times will only trigger
        the job once.
        """
        if self.status == Status.PENDING:
            self.run_start = pendulum.now(TZ)
            self.logging().info("Trigger called")
            self.status = Status.RUNNING
            if isinstance(self.command, str):
                try:
                    self.run_id = self.pipeline.executor.run(self)
                    self.logging().info("Trigger done")
                except TriggerFailed as e:
                    self.logging().error(
                        "Trigger failed",
                        error=e,
                        job_name=self.name,
                    )
                    self.status = Status.FAILED
        else:
            self.logging().info("Trigger called but job already running")
        self.check_job()

    def check_job(self, *, with_update_report=True):
        """
        This will check the status of the job.
        If `with_update_report` is False, it will not push an update to the
        remote.
        """
        if isinstance(self.command, str) and self.run_id is not None:
            self.logging().debug("Checking job run")
            self.run_state = self.pipeline.executor.get_status(self.run_id)
            self.last_check = pendulum.now(TZ)
            self.logging().debug(
                "Job run status found",
                is_running=self.run_state.is_running,
                exit_code=self.run_state.exit_code,
            )
            if self.run_state.is_running:
                self.status = Status.RUNNING if not self.is_service else Status.PASSED
            else:
                self.status = (
                    Status.PASSED if self.run_state.exit_code == 0 else Status.FAILED
                )
                self.logs["stdout"] = reporters.clean_logs(self.run_state.logs)
            if with_update_report:
                self.update_report()

    def is_complete(self) -> bool:
        """
        Is this job complete? It could have passed, failed, etc.
        We no longer need to check for updates in a complete job.
        """
        return self.status in FIN_STATUSES

    def get_env(self):
        """
        Gets the environment variables for a given job.

        Order of precedence for setting values is:

        1. Pipeline
        2. Stage
        3. Job
        """
        env = {
            k[len(PREFIX) :]: v
            for k, v in os.environ.items()
            if k.startswith(PREFIX)
        }
        env.update(self.pipeline.pipe_kwargs.get("env", {}))
        env.update(self.env)  # Includes env specified in stage kwargs AND job kwargs
        return env

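# Usage sketch for environment precedence (illustrative, not executed here;
# "LEVEL" and the job names are made-up placeholders): OS variables prefixed
# with JAYPORE_ are read first, then pipeline, stage, and job kwargs override
# them in that order.
#
#   from jaypore_ci import jci
#
#   with jci.Pipeline(env={"LEVEL": "pipeline"}) as p:
#       with p.stage("Test", env={"LEVEL": "stage"}):
#           p.job("Echo", "env | grep LEVEL", env={"LEVEL": "job"})
#   # The "Echo" job sees LEVEL=job.
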
class Pipeline:  # pylint: disable=too-many-instance-attributes
    """
    A pipeline acts as a controlling/organizing mechanism for multiple jobs.

    :param repo: Provides information about the codebase.
    :param reporter: Provides reports based on the state of the pipeline.
    :param remote: Allows us to publish reports to somewhere like gitea/email.
    :param executor: Runs the specified jobs.
    :param poll_interval: Defines how frequently (in seconds) to check the
        pipeline status and publish a report.
    """

    # We need a way to avoid actually running the examples. Something like a
    # "dry-run" option so that only the building of the config is done and it's
    # never actually run. It might be a good idea to make this an actual config
    # variable but I'm not sure if we should do that or not.
    __run_on_exit__ = True

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        repo: Repo = None,
        remote: Remote = None,
        executor: Executor = None,
        reporter: Reporter = None,
        poll_interval: int = 10,
        **kwargs,
    ) -> "Pipeline":
        self.jobs = {}
        self.services = []
        self.should_pass_called = set()
        self.repo = repo if repo is not None else repos.Git.from_env()
        self.remote = (
            remote
            if remote is not None
            else remotes.gitea.Gitea.from_env(repo=self.repo)
        )
        self.executor = executor if executor is not None else executors.docker.Docker()
        self.reporter = reporter if reporter is not None else reporters.text.Text()
        self.poll_interval = poll_interval
        self.stages = ["Pipeline"]
        self.__pipe_id__ = None
        self.executor.set_pipeline(self)
        # ---
        kwargs["image"] = kwargs.get("image", "arjoonn/jci")
        kwargs["timeout"] = kwargs.get("timeout", 15 * 60)
        kwargs["env"] = kwargs.get("env", {})
        kwargs["stage"] = "Pipeline"
        self.pipe_kwargs = kwargs
        self.stage_kwargs = None

    @property
    def pipe_id(self):
        if self.__pipe_id__ is None:
            self.__pipe_id__ = self.__get_pipe_id__()
        return self.__pipe_id__

    def __get_pipe_id__(self):
        """
        This is mainly here so that during testing we can override this and
        provide a different way to get the pipe id.
        """
        with open(
            f"/jaypore_ci/cidfiles/{self.repo.sha}", "r", encoding="utf-8"
        ) as fl:
            return fl.read().strip()

    def logging(self):
        """
        Return a logger with information about the current pipeline bound to
        it.
        """
        return logger.bind(
            **{
                **structlog.get_context(self.remote.logging()),
                **structlog.get_context(self.executor.logging()),
                "pipe_id": id(self),
            }
        )

    def __enter__(self):
        ensure_version_is_correct()
        self.executor.setup()
        self.remote.setup()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if Pipeline.__run_on_exit__:
            self.run()
            self.executor.teardown()
            self.remote.teardown()
        return False

    def get_status(self) -> Status:
        """
        Calculates a pipeline's status based on the status of its jobs.
        """
        for job in self.jobs.values():
            if job.status == Status.RUNNING:
                return Status.RUNNING
        service = None
        for service in self.services:
            service.check_job(with_update_report=False)
        if service is not None:
            service.check_job(with_update_report=False)
        has_pending = False
        for job in self.jobs.values():
            job.check_job(with_update_report=False)
            if not job.is_complete():
                has_pending = True
            else:
                if job.status != Status.PASSED:
                    return Status.FAILED
        return Status.PENDING if has_pending else Status.PASSED

    def get_status_dot(self) -> str:
        """
        Gets the status dot for the pipeline.
        """
        if self.get_status() == Status.PASSED:
            return "🟢"
        if self.get_status() == Status.FAILED:
            return "🔴"
        if self.get_status() == Status.SKIPPED:
            return "🔵"
        return "🟡"

    def job(
        self,
        name: str,
        command: str,
        *,
        depends_on: List[str] = None,
        **kwargs,
    ) -> Job:
        """
        Creates a :class:`~jaypore_ci.jci.Job` instance based on the
        pipeline/stage that it is being defined in.

        See :class:`~jaypore_ci.jci.Job` for details on what parameters can be
        passed to the job.
        """
        depends_on = [] if depends_on is None else depends_on
        depends_on = [depends_on] if isinstance(depends_on, str) else depends_on
        name = clean.name(name)
        assert name, "Name should have some value after it is cleaned"
        assert name not in self.jobs, f"{name} already defined"
        assert name not in self.stages, "Stage name cannot match a job's name"
        kwargs, job_kwargs = dict(self.pipe_kwargs), kwargs
        kwargs.update(self.stage_kwargs if self.stage_kwargs is not None else {})
        kwargs.update(job_kwargs)
        if not kwargs.get("is_service"):
            assert command, f"Command: {command}"
        job = Job(
            name=name if name is not None else " ",
            command=command,
            status=Status.PENDING,
            pipeline=self,
            children=[],
            parents=depends_on,
            **kwargs,
        )
        for parent_name in depends_on:
            assert (
                parent_name in self.jobs
            ), f"Parent job has to be defined before a child. Cannot find {parent_name}"
            parent = self.jobs[parent_name]
            assert parent.stage == job.stage, "Cannot have dependencies across stages"
        self.jobs[name] = job
        if kwargs.get("is_service"):
            self.services.append(job)
        return job

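    # Usage sketch (illustrative; "mycode" and the postgres image are made-up
    # placeholders, and this assumes the usual Jaypore CI container setup):
    # jobs in a stage can depend on earlier jobs by name, and services are
    # started alongside them.
    #
    #   from jaypore_ci import jci
    #
    #   with jci.Pipeline() as p:
    #       with p.stage("Test"):
    #           p.job("Mydb", None, is_service=True, image="postgres:15")
    #           p.job("Lint", "python3 -m pylint mycode")
    #           p.job("Test", "python3 -m pytest", depends_on=["Lint"])
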
    @classmethod
    def env_matrix(cls, **kwargs):
        """
        Return a cartesian product of all the provided kwargs.
        """
        keys = list(sorted(kwargs.keys()))
        for values in product(*[kwargs[key] for key in keys]):
            yield dict(list(zip(keys, values)))

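    # Usage sketch (illustrative; the variable names are made up): env_matrix
    # expands keyword lists into one dict per combination, which is handy for
    # creating one job per configuration, each with its own ``env=env``.
    #
    #   for env in jci.Pipeline.env_matrix(DB=["pg", "sqlite"], PY=["3.10", "3.11"]):
    #       ...  # yields 4 dicts, e.g. {"DB": "pg", "PY": "3.10"}
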
    def __ensure_duplex__(self):
        for name, job in self.jobs.items():
            for parent_name in job.parents:
                parent = self.jobs[parent_name]
                parent.children = list(sorted(set(parent.children).union(set([name]))))

    def run(self):
        """
        Run the pipeline. This is always called automatically when the context
        of the pipeline declaration finishes, so unless you are doing
        something fancy you don't need to call this manually.
        """
        self.__ensure_duplex__()
        # Run stages one by one
        job = None
        for stage in self.stages:
            # --- Trigger starting jobs
            jobs = {name: job for name, job in self.jobs.items() if job.stage == stage}
            for name in {job.name for job in jobs.values() if not job.parents}:
                jobs[name].trigger()
            # --- monitor and ensure all jobs run
            while not all(job.is_complete() for job in jobs.values()):
                for job in jobs.values():
                    job.check_job(with_update_report=False)
                    if not job.is_complete():
                        # If all dependencies are met: trigger
                        if len(job.parents) == 0 or all(
                            jobs[parent_name].is_complete()
                            and jobs[parent_name].status == Status.PASSED
                            for parent_name in job.parents
                        ):
                            job.trigger()
                        elif any(
                            jobs[parent_name].is_complete()
                            and jobs[parent_name].status != Status.PASSED
                            for parent_name in job.parents
                        ):
                            job.status = Status.SKIPPED
                    job.check_job()
                time.sleep(self.poll_interval)
            # --- has this stage passed?
            if not all(
                job.is_complete() and job.status == Status.PASSED
                for job in jobs.values()
            ):
                self.logging().error("Stage failed")
                job.update_report()
                break
        self.logging().error("Pipeline passed")
        if job is not None:
            report = job.update_report()
            self.logging().info("Report:", report=report)

    @contextmanager
    def stage(self, name, **kwargs):
        """
        A stage in a pipeline.

        Any kwargs passed to this stage are supplied to jobs created within
        this stage.
        """
        name = clean.name(name)
        assert name, "Name should have some value after it is cleaned"
        assert name not in self.jobs, "Stage name cannot match a job's name"
        assert name not in self.stages, "Stage names cannot be re-used"
        self.stages.append(name)
        kwargs["stage"] = name
        self.stage_kwargs = kwargs
        yield
        # -------------------------
        self.stage_kwargs = None
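
# End-to-end usage sketch (illustrative; the commands and script paths are
# made-up placeholders and a real cicd.py would live in the repository being
# tested): stages run in the order they are declared, and kwargs passed to a
# stage are inherited by the jobs defined inside it.
#
#   from jaypore_ci import jci
#
#   with jci.Pipeline() as p:
#       with p.stage("Build", image="arjoonn/jci"):
#           p.job("Docs", "python3 -m sphinx -b html docs build")
#       with p.stage("Deploy"):
#           p.job("Publish", "bash scripts/publish.sh")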