Coverage for jaypore_ci/jci.py: 85%
229 statements
« prev ^ index » next — coverage.py v7.2.2, created at 2023-03-30 09:04 +0000
1"""
2The code submodule for Jaypore CI.
3"""
4import time
5import os
6from itertools import product
7from collections import defaultdict
8from typing import List, Union, Callable
9from contextlib import contextmanager
11import structlog
12import pendulum
14from jaypore_ci.exceptions import BadConfig
15from jaypore_ci.config import const
16from jaypore_ci.changelog import version_map
17from jaypore_ci import remotes, executors, reporters, repos, clean
18from jaypore_ci.interfaces import (
19 Remote,
20 Executor,
21 Reporter,
22 TriggerFailed,
23 Status,
24 Repo,
25)
26from jaypore_ci.logging import logger
28TZ = "UTC"
30__all__ = ["Pipeline", "Job"]
33# All of these statuses are considered "finished" statuses
34FIN_STATUSES = (Status.FAILED, Status.PASSED, Status.TIMEOUT, Status.SKIPPED)
35PREFIX = "JAYPORE_"
# Check if we need to upgrade Jaypore CI
def ensure_version_is_correct() -> None:
    """
    Ensure that the version of Jaypore CI that is running, the code inside
    cicd.py, and pre-push.sh are at compatible versions.

    If versions do not match then this function will print out instructions on
    what to do in order to upgrade.

    Downgrades are not allowed, you need to re-install that specific version.
    """
    expected = const.expected_version
    actual = const.version
    # Nothing to do when either version is unknown or they already agree.
    if expected is None or actual is None or expected == actual:
        return
    print("Expected : ", expected)
    print("Got : ", actual)
    if actual > expected:
        print(
            "Your current version is higher than the expected one. Please "
            "re-install Jaypore CI in this repo as downgrades are not "
            "supported."
        )
    if actual < expected:
        print("--- Upgrade Instructions ---")
        # Walk the changelog in order and print instructions for every
        # version between the one we have and the one we expect.
        for version in sorted(version_map):
            if actual <= version <= expected:
                for line in version_map[version]["instructions"]:
                    print(line)
        print("--- -------------------- ---")
    raise BadConfig(
        "Version mismatch between arjoonn/jci:<tag> docker container and pre-push.sh script"
    )
class Job:  # pylint: disable=too-many-instance-attributes
    """
    This is the fundamental building block for running jobs.
    Each job goes through a lifecycle defined by
    :class:`~jaypore_ci.interfaces.Status`.

    A job is run by an :class:`~jaypore_ci.interfaces.Executor` as part of a
    :class:`~jaypore_ci.jci.Pipeline`.

    It is never created manually. The correct way to create a job is to use
    :meth:`~jaypore_ci.jci.Pipeline.job`.

    :param name: The name for the job. Names must be unique across
                 jobs and stages.
    :param command: The command that we need to run for the job. It can
                 be set to `None` when `is_service` is True.
    :param is_service: Is this job a service or not? Service jobs are
                 assumed to be
                 :class:`~jaypore_ci.interfaces.Status.PASSED` as
                 long as they start. They are shut down when the
                 entire pipeline has finished executing.
    :param pipeline: The pipeline this job is associated with.
    :param status: The :class:`~jaypore_ci.interfaces.Status` of this job.
    :param image: What docker image to use for this job.
    :param timeout: Defines how long a job is allowed to run before being
                 killed and marked as
                 :class:`~jaypore_ci.interfaces.Status.FAILED`.
    :param env: A dictionary of environment variables to pass to
                 the docker run command.
    :param children: Defines which jobs depend on this job's output
                 status.
    :param parents: Defines which jobs need to pass before this job can
                 be run.
    :param stage: What stage the job belongs to. This stage name must
                 exist so that we can assign jobs to it.
    :param executor_kwargs: A dictionary of keyword arguments that the executor
                 can use when running a job. Different executors may
                 use this in different ways, for example with the
                 :class:`~jaypore_ci.executors.docker.Docker`
                 executor this may be used to run jobs with
                 `--add-host or --device
                 <https://docker-py.readthedocs.io/en/stable/containers.html#docker.models.containers.ContainerCollection.run>`_
                 .
    """

    def __init__(
        self,
        name: str,
        command: Union[str, Callable],
        pipeline: "Pipeline",
        *,
        status: str = None,
        children: List["Job"] = None,
        parents: List["Job"] = None,
        is_service: bool = False,
        stage: str = None,
        # --- executor kwargs
        image: str = None,
        timeout: int = None,
        env: dict = None,
        executor_kwargs: dict = None,
    ):
        self.name = name
        self.command = command
        self.image = image
        self.status = status
        self.run_state = None
        self.timeout = timeout
        self.pipeline = pipeline
        # May be None when the job is built directly; get_env() guards this.
        self.env = env
        # Avoid the mutable-default pitfall: fresh lists per instance.
        self.children = children if children is not None else []
        self.parents = parents if parents is not None else []
        self.is_service = is_service
        self.stage = stage
        self.executor_kwargs = executor_kwargs if executor_kwargs is not None else {}
        # --- run information
        self.logs = defaultdict(list)
        self.job_id = id(self)  # unique per-process identifier for logging
        self.run_id = None  # set by the executor once the job is triggered
        self.run_start = None
        self.last_check = None

    def logging(self):
        """
        Returns a logging instance that has job specific information bound to
        it.
        """
        return self.pipeline.logging().bind(
            job_id=self.job_id,
            job_name=self.name,
            run_id=self.run_id,
        )

    def update_report(self) -> str:
        """
        Update the status report. Usually called when a job changes some of
        it's internal state like when logs are updated or when status has
        changed.

        Writes the rendered report to the run directory and publishes it to
        the remote, then returns the report text.
        """
        self.logging().debug("Update report")
        # Map pipeline status to the commit-status strings the remote expects.
        status = {
            Status.PENDING: "pending",
            Status.RUNNING: "pending",
            Status.FAILED: "failure",
            Status.PASSED: "success",
            Status.TIMEOUT: "warning",
            Status.SKIPPED: "warning",
        }[self.pipeline.get_status()]
        report = self.pipeline.reporter.render(self.pipeline)
        with open("/jaypore_ci/run/jaypore_ci.status.txt", "w", encoding="utf-8") as fl:
            fl.write(report)
        self.pipeline.remote.publish(report, status)
        return report

    def trigger(self):
        """
        Trigger the job via the pipeline's executor.
        This will immediately return and will not wait for the job to finish.

        It is also idempotent. Calling this multiple times will only trigger
        the job once.
        """
        if self.status == Status.PENDING:
            self.run_start = pendulum.now(TZ)
            self.logging().info("Trigger called")
            self.status = Status.RUNNING
            # NOTE(review): only string commands are executed; a Callable
            # command is marked RUNNING but never run here — confirm callable
            # commands are handled elsewhere.
            if isinstance(self.command, str):
                try:
                    self.run_id = self.pipeline.executor.run(self)
                    self.logging().info("Trigger done")
                except TriggerFailed as e:
                    self.logging().error(
                        "Trigger failed",
                        error=e,
                        job_name=self.name,
                    )
                    self.status = Status.FAILED
        else:
            self.logging().info("Trigger called but job already running")
        self.check_job()

    def check_job(self, *, with_update_report=True):
        """
        This will check the status of the job.
        If `with_update_report` is False, it will not push an update to the remote.
        """
        # Only jobs that were actually started via the executor can be polled.
        if isinstance(self.command, str) and self.run_id is not None:
            self.logging().debug("Checking job run")
            self.run_state = self.pipeline.executor.get_status(self.run_id)
            self.last_check = pendulum.now(TZ)
            self.logging().debug(
                "Job run status found",
                is_running=self.run_state.is_running,
                exit_code=self.run_state.exit_code,
            )
            if self.run_state.is_running:
                # Services count as PASSED as long as they are up.
                self.status = Status.RUNNING if not self.is_service else Status.PASSED
            else:
                self.status = (
                    Status.PASSED if self.run_state.exit_code == 0 else Status.FAILED
                )
            self.logs["stdout"] = reporters.clean_logs(self.run_state.logs)
            if with_update_report:
                self.update_report()

    def is_complete(self) -> bool:
        """
        Is this job complete? It could have passed/ failed etc.
        We no longer need to check for updates in a complete job.
        """
        return self.status in FIN_STATUSES

    def get_env(self) -> dict:
        """
        Gets the environment variables for a given job.
        Order of precedence for setting values is:

        1. Pipeline
        2. Stage
        3. Job
        """
        # Start with host environment variables that carry the JAYPORE_ prefix
        # (prefix stripped).
        env = {
            k[len(PREFIX) :]: v for k, v in os.environ.items() if k.startswith(PREFIX)
        }
        env.update(self.pipeline.pipe_kwargs.get("env", {}))
        # Fix: self.env may be None when a Job is constructed directly
        # (default is None); dict.update(None) would raise TypeError.
        env.update(self.env or {})  # Includes env specified in stage kwargs AND job kwargs
        return env
class Pipeline:  # pylint: disable=too-many-instance-attributes
    """
    A pipeline acts as a controlling/organizing mechanism for multiple jobs.

    :param repo: Provides information about the codebase.
    :param reporter: Provides reports based on the state of the pipeline.
    :param remote: Allows us to publish reports to somewhere like gitea/email.
    :param executor: Runs the specified jobs.
    :param poll_interval: Defines how frequently (in seconds) to check the
                          pipeline status and publish a report.
    """

    # We need a way to avoid actually running the examples. Something like a
    # "dry-run" option so that only the building of the config is done and it's
    # never actually run. It might be a good idea to make this an actual config
    # variable but I'm not sure if we should do that or not.
    __run_on_exit__ = True

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        repo: Repo = None,
        remote: Remote = None,
        executor: Executor = None,
        reporter: Reporter = None,
        poll_interval: int = 10,
        **kwargs,
    ) -> "Pipeline":
        self.jobs = {}
        self.services = []
        self.should_pass_called = set()
        # Each collaborator falls back to the standard environment-driven
        # default when not supplied explicitly.
        self.repo = repo if repo is not None else repos.Git.from_env()
        self.remote = (
            remote
            if remote is not None
            else remotes.gitea.Gitea.from_env(repo=self.repo)
        )
        self.executor = executor if executor is not None else executors.docker.Docker()
        self.reporter = reporter if reporter is not None else reporters.text.Text()
        self.poll_interval = poll_interval
        self.stages = ["Pipeline"]  # the implicit default stage
        self.__pipe_id__ = None
        self.executor.set_pipeline(self)
        # --- defaults inherited by every job unless overridden
        kwargs["image"] = kwargs.get("image", "arjoonn/jci")
        kwargs["timeout"] = kwargs.get("timeout", 15 * 60)
        kwargs["env"] = kwargs.get("env", {})
        kwargs["stage"] = "Pipeline"
        self.pipe_kwargs = kwargs
        self.stage_kwargs = None

    @property
    def pipe_id(self):
        # Lazily resolved and cached; see __get_pipe_id__.
        if self.__pipe_id__ is None:
            self.__pipe_id__ = self.__get_pipe_id__()
        return self.__pipe_id__

    def __get_pipe_id__(self):
        """
        This is mainly here so that during testing we can override this and
        provide a different way to get the pipe id
        """
        with open(f"/jaypore_ci/cidfiles/{self.repo.sha}", "r", encoding="utf-8") as fl:
            return fl.read().strip()

    def logging(self):
        """
        Return a logger with information about the current pipeline bound to
        it.
        """
        return logger.bind(
            **{
                **structlog.get_context(self.remote.logging()),
                **structlog.get_context(self.executor.logging()),
                "pipe_id": id(self),
            }
        )

    def __enter__(self):
        ensure_version_is_correct()
        self.executor.setup()
        self.remote.setup()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): the pipeline runs even when the `with` body raised —
        # confirm this is intentional before changing it.
        if Pipeline.__run_on_exit__:
            self.run()
            self.executor.teardown()
            self.remote.teardown()
        return False

    def get_status(self) -> Status:
        """
        Calculates a pipeline's status based on the status of it's jobs.
        """
        for job in self.jobs.values():
            if job.status == Status.RUNNING:
                return Status.RUNNING
        # Refresh service state; services never gate the pipeline result.
        # Fix: the old code re-checked the last service a second time after
        # the loop via a dead `if service is not None` branch.
        for service in self.services:
            service.check_job(with_update_report=False)
        has_pending = False
        for job in self.jobs.values():
            job.check_job(with_update_report=False)
            if not job.is_complete():
                has_pending = True
            elif job.status != Status.PASSED:
                return Status.FAILED
        return Status.PENDING if has_pending else Status.PASSED

    def get_status_dot(self) -> str:
        """
        Get's the status dot for the pipeline.
        """
        # Hoisted: get_status() polls the executor for every job, so call it
        # once instead of up to three times.
        status = self.get_status()
        if status == Status.PASSED:
            return "🟢"
        if status == Status.FAILED:
            return "🔴"
        if status == Status.SKIPPED:
            return "🔵"
        return "🟡"

    def job(
        self,
        name: str,
        command: str,
        *,
        depends_on: List[str] = None,
        **kwargs,
    ) -> Job:
        """
        Creates a :class:`~jaypore_ci.jci.Job` instance based on the
        pipeline/stage that it is being defined in. See
        :class:`~jaypore_ci.jci.Job` for details on what parameters can be
        passed to the job.
        """
        depends_on = [] if depends_on is None else depends_on
        depends_on = [depends_on] if isinstance(depends_on, str) else depends_on
        name = clean.name(name)
        assert name, "Name should have some value after it is cleaned"
        assert name not in self.jobs, f"{name} already defined"
        assert name not in self.stages, "Stage name cannot match a job's name"
        # Precedence: pipeline defaults < stage kwargs < job kwargs.
        kwargs, job_kwargs = dict(self.pipe_kwargs), kwargs
        kwargs.update(self.stage_kwargs if self.stage_kwargs is not None else {})
        kwargs.update(job_kwargs)
        if not kwargs.get("is_service"):
            assert command, f"Command: {command}"
        job = Job(
            name=name if name is not None else " ",
            command=command,
            status=Status.PENDING,
            pipeline=self,
            children=[],
            parents=depends_on,
            **kwargs,
        )
        for parent_name in depends_on:
            assert (
                parent_name in self.jobs
            ), f"Parent job has to be defined before a child. Cannot find {parent_name}"
            parent = self.jobs[parent_name]
            assert parent.stage == job.stage, "Cannot have dependencies across stages"
        self.jobs[name] = job
        if kwargs.get("is_service"):
            self.services.append(job)
        return job

    @classmethod
    def env_matrix(cls, **kwargs):
        """
        Return a cartesian product of all the provided kwargs.
        """
        keys = list(sorted(kwargs.keys()))
        for values in product(*[kwargs[key] for key in keys]):
            yield dict(list(zip(keys, values)))

    def __ensure_duplex__(self):
        # Make parent->child links mirror the declared child->parent links.
        for name, job in self.jobs.items():
            for parent_name in job.parents:
                parent = self.jobs[parent_name]
                parent.children = list(sorted(set(parent.children).union(set([name]))))

    def run(self):
        """
        Run the pipeline. This is always called automatically when the context
        of the pipeline declaration finishes and so unless you are doing
        something fancy you don't need to call this manually.
        """
        self.__ensure_duplex__()
        # Run stages one by one
        job = None
        all_stages_passed = True
        for stage in self.stages:
            # --- Trigger starting jobs
            jobs = {name: job for name, job in self.jobs.items() if job.stage == stage}
            for name in {job.name for job in jobs.values() if not job.parents}:
                jobs[name].trigger()
            # --- monitor and ensure all jobs run
            while not all(job.is_complete() for job in jobs.values()):
                for job in jobs.values():
                    job.check_job(with_update_report=False)
                    if not job.is_complete():
                        # If all dependencies are met: trigger
                        if len(job.parents) == 0 or all(
                            jobs[parent_name].is_complete()
                            and jobs[parent_name].status == Status.PASSED
                            for parent_name in job.parents
                        ):
                            job.trigger()
                        elif any(
                            jobs[parent_name].is_complete()
                            and jobs[parent_name].status != Status.PASSED
                            for parent_name in job.parents
                        ):
                            # A parent failed: this job will never run.
                            job.status = Status.SKIPPED
                    job.check_job()
                time.sleep(self.poll_interval)
            # --- has this stage passed?
            if not all(
                job.is_complete() and job.status == Status.PASSED
                for job in jobs.values()
            ):
                self.logging().error("Stage failed")
                job.update_report()
                all_stages_passed = False
                break
        if all_stages_passed:
            # Fix: success used to be logged at ERROR level and even after a
            # failed stage's `break`; it is now INFO and only on full success.
            self.logging().info("Pipeline passed")
        if job is not None:
            report = job.update_report()
            self.logging().info("Report:", report=report)

    @contextmanager
    def stage(self, name, **kwargs):
        """
        A stage in a pipeline.

        Any kwargs passed to this stage are supplied to jobs created within
        this stage.
        """
        name = clean.name(name)
        assert name, "Name should have some value after it is cleaned"
        assert name not in self.jobs, "Stage name cannot match a job's name"
        assert name not in self.stages, "Stage names cannot be re-used"
        self.stages.append(name)
        kwargs["stage"] = name
        self.stage_kwargs = kwargs
        # Fix: reset stage_kwargs even if the stage body raises, so a failed
        # stage cannot leak its kwargs into later job definitions.
        try:
            yield  # -------------------------
        finally:
            self.stage_kwargs = None