Coverage for jaypore_ci/executors/docker.py: 83%
108 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-30 09:04 +0000
1"""
2A docker executor for Jaypore CI.
3"""
4from copy import deepcopy
6import pendulum
7import docker
8from rich import print as rprint
9from tqdm import tqdm
11from jaypore_ci import clean
12from jaypore_ci.interfaces import Executor, TriggerFailed, JobStatus
13from jaypore_ci.logging import logger
class Docker(Executor):
    """
    Runs jobs via docker, talking to the daemon through the `Python docker
    sdk <https://docker-py.readthedocs.io/en/stable/client.html>`_.

    Using this executor will:
        - Create a separate network for each run
        - Run jobs as part of the network
        - Clean up all jobs when the pipeline exits.
    """

    def __init__(self):
        """Create docker clients; no pipeline is bound until set_pipeline()."""
        super().__init__()
        self.pipe_id = None  # Set when a pipeline is attached
        self.pipeline = None  # The currently attached pipeline
        self.docker = docker.from_env()  # High-level SDK client
        self.client = docker.APIClient()  # Low-level API client (inspect calls)
        self.__execution_order__ = []  # [(job name, container id, action), ...]
35 def logging(self):
36 """
37 Returns a logging instance that has executor specific
38 information bound to it.
39 """
40 return logger.bind(pipe_id=self.pipe_id, network_name=self.get_net())
42 def set_pipeline(self, pipeline):
43 """
44 Set executor's pipeline to the given one.
46 This will clean up old networks and create new ones.
47 """
48 if self.pipe_id is not None: 48 ↛ 49line 48 didn't jump to line 49, because the condition on line 48 was never true
49 self.delete_network()
50 self.delete_all_jobs()
51 self.pipe_id = pipeline.pipe_id
52 self.pipeline = pipeline
53 self.create_network()
55 def teardown(self):
56 self.delete_network()
57 self.delete_all_jobs()
59 def setup(self):
60 self.delete_old_containers()
62 def delete_old_containers(self):
63 a_week_back = pendulum.now().subtract(days=7)
64 pipe_ids_removed = set()
65 for container in tqdm(
66 self.docker.containers.list(filters={"status": "exited"}),
67 desc="Removing jobs older than a week",
68 ):
69 if "jayporeci_" not in container.name: 69 ↛ 70line 69 didn't jump to line 70, because the condition on line 69 was never true
70 continue
71 if "__job__" in container.name: 71 ↛ 75line 71 didn't jump to line 75, because the condition on line 71 was never false
72 pipe_ids_removed.add(
73 container.name.split("__job__")[1].split("__", 1)[0]
74 )
75 finished_at = pendulum.parse(container.attrs["State"]["FinishedAt"])
76 if finished_at <= a_week_back: 76 ↛ 77line 76 didn't jump to line 77, because the condition on line 76 was never true
77 container.remove(v=True)
78 for network in tqdm(
79 self.docker.networks.list(
80 names=[self.get_net(pipe_id=pipe_id) for pipe_id in pipe_ids_removed]
81 ),
82 desc="Removing related networks",
83 ):
84 network.remove()
86 def get_net(self, *, pipe_id=None):
87 """
88 Return a network name based on what the curent pipeline is.
89 """
90 pipe_id = pipe_id if pipe_id is not None else self.pipe_id
91 return f"jayporeci__net__{pipe_id}" if pipe_id is not None else None
93 def create_network(self):
94 """
95 Will create a docker network.
97 If it fails to do so in 3 attempts it will raise an
98 exception and fail.
99 """
100 assert self.pipe_id is not None, "Cannot create network if pipe is not set"
101 for _ in range(3): 101 ↛ 111line 101 didn't jump to line 111, because the loop on line 101 didn't complete
102 if len(self.docker.networks.list(names=[self.get_net()])) != 0:
103 self.logging().info("Found network", network_name=self.get_net())
104 return
105 self.logging().info(
106 "Create network",
107 subprocess=self.docker.networks.create(
108 name=self.get_net(), driver="bridge"
109 ),
110 )
111 raise TriggerFailed("Cannot create network")
113 def delete_all_jobs(self):
114 """
115 Deletes all jobs associated with the pipeline for this
116 executor.
118 It will stop any jobs that are still running.
119 """
120 assert self.pipe_id is not None, "Cannot delete jobs if pipe is not set"
121 job = None
122 for job in self.pipeline.jobs.values():
123 if job.run_id is not None and not job.run_id.startswith("pyrun_"): 123 ↛ 122line 123 didn't jump to line 122, because the condition on line 123 was never false
124 container = self.docker.containers.get(job.run_id)
125 container.stop(timeout=1)
126 self.logging().info("Stop job:", run_id=job.run_id)
127 job.check_job(with_update_report=False)
128 if job is not None:
129 job.check_job()
130 self.logging().info("All jobs stopped")
132 def delete_network(self):
133 """
134 Delete the network for this executor.
135 """
136 assert self.pipe_id is not None, "Cannot delete network if pipe is not set"
137 try:
138 net = self.docker.networks.get(self.get_net())
139 net.remove()
140 except docker.errors.NotFound:
141 self.logging().error("Delete network: Not found", netid=self.get_net())
143 def get_job_name(self, job, tail=False):
144 """
145 Generates a clean job name slug.
146 """
147 name = clean.name(job.name)
148 if tail:
149 return name
150 return f"jayporeci__job__{self.pipe_id}__{name}"
152 def run(self, job: "Job") -> str:
153 """
154 Run the given job and return a docker container ID.
155 In case something goes wrong it will raise TriggerFailed
156 """
157 assert self.pipe_id is not None, "Cannot run job if pipe id is not set"
158 ex_kwargs = deepcopy(job.executor_kwargs)
159 env = job.get_env()
160 env.update(ex_kwargs.pop("environment", {}))
161 trigger = {
162 "detach": True,
163 "environment": env,
164 "volumes": list(
165 set(
166 [
167 "/var/run/docker.sock:/var/run/docker.sock",
168 "/usr/bin/docker:/usr/bin/docker:ro",
169 "/tmp/jayporeci__cidfiles:/jaypore_ci/cidfiles:ro",
170 f"/tmp/jayporeci__src__{self.pipeline.remote.sha}:/jaypore_ci/run",
171 ]
172 + (ex_kwargs.pop("volumes", []))
173 )
174 ),
175 "name": self.get_job_name(job),
176 "network": self.get_net(),
177 "image": job.image,
178 "command": job.command if not job.is_service else None,
179 }
180 for key, value in ex_kwargs.items(): 180 ↛ 181line 180 didn't jump to line 181, because the loop on line 180 never started
181 if key in trigger:
182 self.logging().warning(
183 f"Overwriting existing value of `{key}` for job trigger.",
184 old_value=trigger[key],
185 new_value=value,
186 )
187 trigger[key] = value
188 if not job.is_service: 188 ↛ 190line 188 didn't jump to line 190, because the condition on line 188 was never false
189 trigger["working_dir"] = "/jaypore_ci/run"
190 if not job.is_service: 190 ↛ 192line 190 didn't jump to line 192, because the condition on line 190 was never false
191 assert job.command
192 rprint(trigger)
193 try:
194 container = self.docker.containers.run(**trigger)
195 self.__execution_order__.append(
196 (self.get_job_name(job, tail=True), container.id, "Run")
197 )
198 return container.id
199 except docker.errors.APIError as e:
200 self.logging().exception(e)
201 raise TriggerFailed(e) from e
203 def get_status(self, run_id: str) -> JobStatus:
204 """
205 Given a run_id, it will get the status for that run.
206 """
207 inspect = self.client.inspect_container(run_id)
208 status = JobStatus(
209 is_running=inspect["State"]["Running"],
210 exit_code=int(inspect["State"]["ExitCode"]),
211 logs="",
212 started_at=pendulum.parse(inspect["State"]["StartedAt"]),
213 finished_at=pendulum.parse(inspect["State"]["FinishedAt"])
214 if inspect["State"]["FinishedAt"] != "0001-01-01T00:00:00Z"
215 else None,
216 )
217 # --- logs
218 self.logging().debug("Check status", status=status)
219 logs = self.docker.containers.get(run_id).logs().decode()
220 return status._replace(logs=logs)
222 def get_execution_order(self):
223 return {name: i for i, (name, *_) in enumerate(self.__execution_order__)}