xref: /DADK/dadk-user/src/executor/mod.rs (revision 5129c63bfab60a54627def29dc1b53c722677e4e)
1 use std::{
2     collections::BTreeMap,
3     env::Vars,
4     path::PathBuf,
5     process::{Command, Stdio},
6     sync::{Arc, RwLock},
7 };
8 
9 use log::{debug, error, info, warn};
10 
11 use crate::{
12     console::{clean::CleanLevel, Action},
13     context::DadkUserExecuteContext,
14     executor::cache::CacheDir,
15     parser::{
16         task::{CodeSource, PrebuiltSource, TaskEnv, TaskType},
17         task_log::{BuildStatus, InstallStatus, TaskLog},
18     },
19     scheduler::{SchedEntities, SchedEntity},
20     utils::file::FileUtils,
21 };
22 
23 use self::cache::{CacheDirType, TaskDataDir};
24 
25 pub mod cache;
26 pub mod source;
27 pub mod target;
28 #[cfg(test)]
29 mod tests;
30 
31 lazy_static! {
32     // 全局环境变量的列表
33     pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
34 }
35 
36 #[derive(Debug, Clone)]
37 pub struct Executor {
38     entity: Arc<SchedEntity>,
39     action: Action,
40     local_envs: EnvMap,
41     /// 任务构建结果输出到的目录
42     build_dir: CacheDir,
43     /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
44     source_dir: Option<CacheDir>,
45     /// 任务数据目录
46     task_data_dir: TaskDataDir,
47     /// DragonOS sysroot的路径
48     dragonos_sysroot: PathBuf,
49 }
50 
51 impl Executor {
52     /// # 创建执行器
53     ///
54     /// 用于执行一个任务
55     ///
56     /// ## 参数
57     ///
58     /// * `entity` - 任务调度实体
59     ///
60     /// ## 返回值
61     ///
62     /// * `Ok(Executor)` - 创建成功
63     /// * `Err(ExecutorError)` - 创建失败
64     pub fn new(
65         entity: Arc<SchedEntity>,
66         action: Action,
67         dragonos_sysroot: PathBuf,
68     ) -> Result<Self, ExecutorError> {
69         let local_envs = EnvMap::new();
70         let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
71         let task_data_dir = TaskDataDir::new(entity.clone())?;
72 
73         let source_dir = if CacheDir::need_source_cache(&entity) {
74             Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
75         } else {
76             None
77         };
78 
79         let result: Executor = Self {
80             action,
81             entity,
82             local_envs,
83             build_dir,
84             source_dir,
85             task_data_dir,
86             dragonos_sysroot,
87         };
88 
89         return Ok(result);
90     }
91 
92     /// # 执行任务
93     ///
94     /// 创建执行器后,调用此方法执行任务。
95     /// 该方法会执行以下步骤:
96     ///
97     /// 1. 创建工作线程
98     /// 2. 准备环境变量
99     /// 3. 拉取数据(可选)
100     /// 4. 执行构建
101     pub fn execute(&mut self) -> Result<(), ExecutorError> {
102         info!("Execute task: {}", self.entity.task().name_version());
103 
104         let r = self.do_execute();
105         self.save_task_data(r.clone());
106         info!("Task {} finished", self.entity.task().name_version());
107         return r;
108     }
109 
110     /// # 保存任务数据
111     fn save_task_data(&self, r: Result<(), ExecutorError>) {
112         let mut task_log = self.task_data_dir.task_log();
113         match self.action {
114             Action::Build => {
115                 if r.is_ok() {
116                     task_log.set_build_status(BuildStatus::Success);
117                 } else {
118                     task_log.set_build_status(BuildStatus::Failed);
119                 }
120 
121                 task_log.set_build_time_now();
122             }
123 
124             Action::Install => {
125                 if r.is_ok() {
126                     task_log.set_install_status(InstallStatus::Success);
127                 } else {
128                     task_log.set_install_status(InstallStatus::Failed);
129                 }
130             }
131 
132             Action::Clean(_) => {
133                 task_log.clean_build_status();
134                 task_log.clean_install_status();
135             }
136 
137             _ => {}
138         }
139 
140         self.task_data_dir
141             .save_task_log(&task_log)
142             .expect("Failed to save task log");
143     }
144 
145     fn do_execute(&mut self) -> Result<(), ExecutorError> {
146         // 准备本地环境变量
147         self.prepare_local_env()?;
148 
149         match self.action {
150             Action::Build => {
151                 // 构建任务
152                 self.build()?;
153             }
154             Action::Install => {
155                 // 把构建结果安装到DragonOS
156                 self.install()?;
157             }
158             Action::Clean(_) => {
159                 // 清理构建结果
160                 let r = self.clean();
161                 if let Err(e) = r {
162                     error!(
163                         "Failed to clean task {}: {:?}",
164                         self.entity.task().name_version(),
165                         e
166                     );
167                 }
168             }
169             _ => {
170                 error!("Unsupported action: {:?}", self.action);
171             }
172         }
173 
174         return Ok(());
175     }
176 
177     /// # 执行build操作
178     fn build(&mut self) -> Result<(), ExecutorError> {
179         if let Some(status) = self.task_log().build_status() {
180             if *status == BuildStatus::Success && self.entity.task().build_once {
181                 info!(
182                     "Task {} has been built successfully, skip build.",
183                     self.entity.task().name_version()
184                 );
185                 return Ok(());
186             }
187         }
188 
189         self.mv_target_to_tmp()?;
190 
191         // 确认源文件就绪
192         self.prepare_input()?;
193 
194         let command: Option<Command> = self.create_command()?;
195         if let Some(cmd) = command {
196             self.run_command(cmd)?;
197         }
198 
199         // 检查构建结果,如果为空,则抛出警告
200         if self.build_dir.is_empty()? {
201             warn!(
202                 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?",
203                 self.entity.task().name_version(),
204             );
205         }
206         return Ok(());
207     }
208 
209     /// # 执行安装操作,把构建结果安装到DragonOS
210     fn install(&self) -> Result<(), ExecutorError> {
211         if let Some(status) = self.task_log().install_status() {
212             if *status == InstallStatus::Success && self.entity.task().install_once {
213                 info!(
214                     "Task {} has been installed successfully, skip install.",
215                     self.entity.task().name_version()
216                 );
217                 return Ok(());
218             }
219         }
220 
221         let binding = self.entity.task();
222         let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
223         // 如果没有指定安装路径,则不执行安装
224         if in_dragonos_path.is_none() {
225             return Ok(());
226         }
227         info!("Installing task: {}", self.entity.task().name_version());
228         let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
229 
230         debug!("in_dragonos_path: {}", in_dragonos_path);
231         // 去除开头的斜杠
232         {
233             let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
234             in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
235         }
236         // 拼接最终的安装路径
237         let install_path = self.dragonos_sysroot.join(in_dragonos_path);
238         debug!("install_path: {:?}", install_path);
239         // 创建安装路径
240         std::fs::create_dir_all(&install_path).map_err(|e| {
241             ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
242         })?;
243 
244         // 拷贝构建结果到安装路径
245         let build_dir: PathBuf = self.build_dir.path.clone();
246         FileUtils::copy_dir_all(&build_dir, &install_path)
247             .map_err(|e| ExecutorError::InstallError(e))?;
248         info!("Task {} installed.", self.entity.task().name_version());
249 
250         // 安装完后,删除临时target文件
251         if let Some(target) = self.entity.target() {
252             target.clean_tmpdadk()?;
253         }
254 
255         return Ok(());
256     }
257 
258     fn clean(&self) -> Result<(), ExecutorError> {
259         let level = if let Action::Clean(l) = self.action {
260             l.level
261         } else {
262             panic!(
263                 "BUG: clean() called with non-clean action. executor details: {:?}",
264                 self
265             );
266         };
267         info!(
268             "Cleaning task: {}, level={level}",
269             self.entity.task().name_version()
270         );
271 
272         let r: Result<(), ExecutorError> = match level {
273             CleanLevel::All => self.clean_all(),
274             CleanLevel::Src => self.clean_src(),
275             CleanLevel::Target => self.clean_target(),
276             CleanLevel::Cache => self.clean_cache(),
277         };
278 
279         if let Err(e) = r {
280             error!(
281                 "Failed to clean task: {}, error message: {:?}",
282                 self.entity.task().name_version(),
283                 e
284             );
285             return Err(e);
286         }
287 
288         return Ok(());
289     }
290 
291     fn clean_all(&self) -> Result<(), ExecutorError> {
292         // 在源文件目录执行清理
293         self.clean_src()?;
294         // 清理构建结果
295         self.clean_target()?;
296         // 清理缓存
297         self.clean_cache()?;
298         return Ok(());
299     }
300 
301     /// 在源文件目录执行清理
302     fn clean_src(&self) -> Result<(), ExecutorError> {
303         let cmd: Option<Command> = self.create_command()?;
304         if cmd.is_none() {
305             // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
306             return Ok(());
307         }
308         info!(
309             "{}: Cleaning in source directory: {:?}",
310             self.entity.task().name_version(),
311             self.src_work_dir()
312         );
313 
314         let cmd = cmd.unwrap();
315         self.run_command(cmd)?;
316         return Ok(());
317     }
318 
319     /// 清理构建输出目录
320     fn clean_target(&self) -> Result<(), ExecutorError> {
321         info!(
322             "{}: Cleaning build target directory: {:?}",
323             self.entity.task().name_version(),
324             self.build_dir.path
325         );
326 
327         return self.build_dir.remove_self_recursive();
328     }
329 
330     /// 清理下载缓存
331     fn clean_cache(&self) -> Result<(), ExecutorError> {
332         let cache_dir = self.source_dir.as_ref();
333         if cache_dir.is_none() {
334             // 如果没有缓存目录,则认为用户不需要清理缓存
335             return Ok(());
336         }
337         info!(
338             "{}: Cleaning cache directory: {}",
339             self.entity.task().name_version(),
340             self.src_work_dir().display()
341         );
342         return cache_dir.unwrap().remove_self_recursive();
343     }
344 
345     /// 获取源文件的工作目录
346     fn src_work_dir(&self) -> PathBuf {
347         if let Some(local_path) = self.entity.task().source_path() {
348             return local_path;
349         }
350         return self.source_dir.as_ref().unwrap().path.clone();
351     }
352 
353     fn task_log(&self) -> TaskLog {
354         return self.task_data_dir.task_log();
355     }
356 
357     /// 为任务创建命令
358     fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
359         // 获取命令
360         let raw_cmd = match self.entity.task().task_type {
361             TaskType::BuildFromSource(_) => match self.action {
362                 Action::Build => self.entity.task().build.build_command.clone(),
363                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
364                 _ => unimplemented!(
365                     "create_command: Action {:?} not supported yet.",
366                     self.action
367                 ),
368             },
369 
370             TaskType::InstallFromPrebuilt(_) => match self.action {
371                 Action::Build => self.entity.task().build.build_command.clone(),
372                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
373                 _ => unimplemented!(
374                     "create_command: Action {:?} not supported yet.",
375                     self.action
376                 ),
377             },
378         };
379 
380         if raw_cmd.is_none() {
381             return Ok(None);
382         }
383 
384         let raw_cmd = raw_cmd.unwrap();
385 
386         let mut command = Command::new("bash");
387         command.current_dir(self.src_work_dir());
388 
389         // 设置参数
390         command.arg("-c");
391         command.arg(raw_cmd);
392 
393         // 设置环境变量
394         let env_list = ENV_LIST.read().unwrap();
395         for (key, value) in env_list.envs.iter() {
396             // if key.starts_with("DADK") {
397             //     debug!("DADK env found: {}={}", key, value.value);
398             // }
399             command.env(key, value.value.clone());
400         }
401         drop(env_list);
402         for (key, value) in self.local_envs.envs.iter() {
403             debug!("Local env found: {}={}", key, value.value);
404             command.env(key, value.value.clone());
405         }
406 
407         return Ok(Some(command));
408     }
409 
410     /// # 准备工作线程本地环境变量
411     fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
412         // 设置本地环境变量
413         self.prepare_target_env()?;
414 
415         let binding = self.entity.task();
416         let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
417 
418         if let Some(task_envs) = task_envs {
419             for tv in task_envs.iter() {
420                 self.local_envs
421                     .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
422             }
423         }
424 
425         // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里
426         self.local_envs.add(EnvVar::new(
427             "DADK_CURRENT_BUILD_DIR".to_string(),
428             self.build_dir.path.to_str().unwrap().to_string(),
429         ));
430 
431         return Ok(());
432     }
433 
434     fn prepare_input(&self) -> Result<(), ExecutorError> {
435         // 拉取源文件
436         let task = self.entity.task();
437         match &task.task_type {
438             TaskType::BuildFromSource(cs) => {
439                 if self.source_dir.is_none() {
440                     return Ok(());
441                 }
442                 let source_dir = self.source_dir.as_ref().unwrap();
443                 match cs {
444                     CodeSource::Git(git) => {
445                         git.prepare(source_dir)
446                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
447                     }
448                     // 本地源文件,不需要拉取
449                     CodeSource::Local(_) => return Ok(()),
450                     // 在线压缩包,需要下载
451                     CodeSource::Archive(archive) => {
452                         archive
453                             .download_unzip(source_dir)
454                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
455                     }
456                 }
457             }
458             TaskType::InstallFromPrebuilt(pb) => {
459                 match pb {
460                     // 本地源文件,不需要拉取
461                     PrebuiltSource::Local(local_source) => {
462                         let local_path = local_source.path();
463                         let target_path = &self.build_dir.path;
464                         FileUtils::copy_dir_all(&local_path, &target_path)
465                             .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string();
466                         return Ok(());
467                     }
468                     // 在线压缩包,需要下载
469                     PrebuiltSource::Archive(archive) => {
470                         archive
471                             .download_unzip(&self.build_dir)
472                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
473                     }
474                 }
475             }
476         }
477 
478         return Ok(());
479     }
480 
481     fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
482         let mut child = command
483             .stdin(Stdio::inherit())
484             .spawn()
485             .map_err(|e| ExecutorError::IoError(e.to_string()))?;
486 
487         // 等待子进程结束
488         let r = child
489             .wait()
490             .map_err(|e| ExecutorError::IoError(e.to_string()));
491         debug!("Command finished: {:?}", r);
492         if r.is_ok() {
493             let r = r.unwrap();
494             if r.success() {
495                 return Ok(());
496             } else {
497                 // 执行失败,获取最后100行stderr输出
498                 let errmsg = format!(
499                     "Task {} failed, exit code = {}",
500                     self.entity.task().name_version(),
501                     r.code().unwrap()
502                 );
503                 error!("{errmsg}");
504                 let command_opt = command.output();
505                 if command_opt.is_err() {
506                     return Err(ExecutorError::TaskFailed(
507                         "Failed to get command output".to_string(),
508                     ));
509                 }
510                 let command_opt = command_opt.unwrap();
511                 let command_output = String::from_utf8_lossy(&command_opt.stderr);
512                 let mut last_100_outputs = command_output
513                     .lines()
514                     .rev()
515                     .take(100)
516                     .collect::<Vec<&str>>();
517                 last_100_outputs.reverse();
518                 error!("Last 100 lines msg of stderr:");
519                 for line in last_100_outputs {
520                     error!("{}", line);
521                 }
522                 return Err(ExecutorError::TaskFailed(errmsg));
523             }
524         } else {
525             let errmsg = format!(
526                 "Task {} failed, msg = {:?}",
527                 self.entity.task().name_version(),
528                 r.err().unwrap()
529             );
530             error!("{errmsg}");
531             return Err(ExecutorError::TaskFailed(errmsg));
532         }
533     }
534 
535     pub fn mv_target_to_tmp(&mut self) -> Result<(), ExecutorError> {
536         if let Some(rust_target) = self.entity.task().rust_target.clone() {
537             // 将target文件拷贝至 /tmp 下对应的dadk文件的临时target文件中
538             self.entity
539                 .target()
540                 .as_ref()
541                 .unwrap()
542                 .cp_to_tmp(&rust_target)?;
543         }
544         return Ok(());
545     }
546 
547     pub fn prepare_target_env(&mut self) -> Result<(), ExecutorError> {
548         if self.entity.task().rust_target.is_some() {
549             // 如果有dadk任务有rust_target字段,需要设置DADK_RUST_TARGET_FILE环境变量,值为临时target文件路径
550             self.entity
551                 .target()
552                 .as_ref()
553                 .unwrap()
554                 .prepare_env(&mut self.local_envs);
555         }
556         return Ok(());
557     }
558 }
559 
560 #[derive(Debug, Clone)]
561 pub struct EnvMap {
562     pub envs: BTreeMap<String, EnvVar>,
563 }
564 
565 impl EnvMap {
566     pub fn new() -> Self {
567         Self {
568             envs: BTreeMap::new(),
569         }
570     }
571 
572     pub fn add(&mut self, env: EnvVar) {
573         self.envs.insert(env.key.clone(), env);
574     }
575 
576     #[allow(dead_code)]
577     pub fn get(&self, key: &str) -> Option<&EnvVar> {
578         self.envs.get(key)
579     }
580 
581     pub fn add_vars(&mut self, vars: Vars) {
582         for (key, value) in vars {
583             self.add(EnvVar::new(key, value));
584         }
585     }
586 }
587 
588 /// # 环境变量
589 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
590 pub struct EnvVar {
591     pub key: String,
592     pub value: String,
593 }
594 
595 impl EnvVar {
596     pub fn new(key: String, value: String) -> Self {
597         Self { key, value }
598     }
599 }
600 
601 /// # 任务执行器错误枚举
602 #[allow(dead_code)]
603 #[derive(Debug, Clone)]
604 pub enum ExecutorError {
605     /// 准备执行环境错误
606     PrepareEnvError(String),
607     IoError(String),
608     /// 构建执行错误
609     TaskFailed(String),
610     /// 安装错误
611     InstallError(String),
612     /// 清理错误
613     CleanError(String),
614 }
615 
616 /// # 准备全局环境变量
617 pub fn prepare_env(
618     sched_entities: &SchedEntities,
619     execute_ctx: &Arc<DadkUserExecuteContext>,
620 ) -> Result<(), ExecutorError> {
621     info!("Preparing environment variables...");
622     let env_list = create_global_env_list(sched_entities, execute_ctx)?;
623     // 写入全局环境变量列表
624     let mut global_env_list = ENV_LIST.write().unwrap();
625     *global_env_list = env_list;
626     return Ok(());
627 }
628 
629 /// # 创建全局环境变量列表
630 fn create_global_env_list(
631     sched_entities: &SchedEntities,
632     execute_ctx: &Arc<DadkUserExecuteContext>,
633 ) -> Result<EnvMap, ExecutorError> {
634     let mut env_list = EnvMap::new();
635     let envs: Vars = std::env::vars();
636     env_list.add_vars(envs);
637 
638     // 为每个任务创建特定的环境变量
639     for entity in sched_entities.entities().iter() {
640         // 导出任务的构建目录环境变量
641         let build_dir = CacheDir::build_dir(entity.clone())?;
642 
643         let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
644         env_list.add(EnvVar::new(
645             build_dir_key,
646             build_dir.to_str().unwrap().to_string(),
647         ));
648 
649         // 如果需要源码缓存目录,则导出
650         if CacheDir::need_source_cache(entity) {
651             let source_dir = CacheDir::source_dir(entity.clone())?;
652             let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
653             env_list.add(EnvVar::new(
654                 source_dir_key,
655                 source_dir.to_str().unwrap().to_string(),
656             ));
657         }
658     }
659 
660     // 创建ARCH环境变量
661     let target_arch = execute_ctx.target_arch();
662     env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into()));
663 
664     return Ok(env_list);
665 }
666