xref: /DADK/dadk-user/src/executor/mod.rs (revision eaa67f3cf8881c221a744937c6318444b068a801)
1 use std::{
2     collections::BTreeMap,
3     env::Vars,
4     path::PathBuf,
5     process::{Command, Stdio},
6     sync::{Arc, RwLock},
7 };
8 
9 use dadk_config::user::UserCleanLevel;
10 use log::{debug, error, info, warn};
11 
12 use crate::{
13     context::{Action, DadkUserExecuteContext},
14     executor::cache::CacheDir,
15     parser::{
16         task::{CodeSource, PrebuiltSource, TaskEnv, TaskType},
17         task_log::{BuildStatus, InstallStatus, TaskLog},
18     },
19     scheduler::{SchedEntities, SchedEntity},
20     utils::{file::FileUtils, path::abs_path},
21 };
22 
23 use self::cache::{CacheDirType, TaskDataDir};
24 
25 pub mod cache;
26 pub mod source;
27 pub mod target;
28 #[cfg(test)]
29 mod tests;
30 
31 lazy_static! {
32     // 全局环境变量的列表
33     pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
34 }
35 
36 #[derive(Debug, Clone)]
37 pub struct Executor {
38     entity: Arc<SchedEntity>,
39     action: Action,
40     local_envs: EnvMap,
41     /// 任务构建结果输出到的目录
42     build_dir: CacheDir,
43     /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
44     source_dir: Option<CacheDir>,
45     /// 任务数据目录
46     task_data_dir: TaskDataDir,
47     /// DragonOS sysroot的路径
48     dragonos_sysroot: PathBuf,
49 }
50 
51 impl Executor {
52     /// # 创建执行器
53     ///
54     /// 用于执行一个任务
55     ///
56     /// ## 参数
57     ///
58     /// * `entity` - 任务调度实体
59     ///
60     /// ## 返回值
61     ///
62     /// * `Ok(Executor)` - 创建成功
63     /// * `Err(ExecutorError)` - 创建失败
64     pub fn new(
65         entity: Arc<SchedEntity>,
66         action: Action,
67         dragonos_sysroot: PathBuf,
68     ) -> Result<Self, ExecutorError> {
69         let local_envs = EnvMap::new();
70         let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
71         let task_data_dir = TaskDataDir::new(entity.clone())?;
72 
73         let source_dir = if CacheDir::need_source_cache(&entity) {
74             Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
75         } else {
76             None
77         };
78 
79         let result: Executor = Self {
80             action,
81             entity,
82             local_envs,
83             build_dir,
84             source_dir,
85             task_data_dir,
86             dragonos_sysroot,
87         };
88 
89         return Ok(result);
90     }
91 
92     /// # 执行任务
93     ///
94     /// 创建执行器后,调用此方法执行任务。
95     /// 该方法会执行以下步骤:
96     ///
97     /// 1. 创建工作线程
98     /// 2. 准备环境变量
99     /// 3. 拉取数据(可选)
100     /// 4. 执行构建
101     pub fn execute(&mut self) -> Result<(), ExecutorError> {
102         info!("Execute task: {}", self.entity.task().name_version());
103 
104         let r = self.do_execute();
105         self.save_task_data(r.clone());
106         info!("Task {} finished", self.entity.task().name_version());
107         return r;
108     }
109 
110     /// # 保存任务数据
111     fn save_task_data(&self, r: Result<(), ExecutorError>) {
112         let mut task_log = self.task_data_dir.task_log();
113         match self.action {
114             Action::Build => {
115                 if r.is_ok() {
116                     task_log.set_build_status(BuildStatus::Success);
117                 } else {
118                     task_log.set_build_status(BuildStatus::Failed);
119                 }
120 
121                 task_log.set_build_time_now();
122             }
123 
124             Action::Install => {
125                 if r.is_ok() {
126                     task_log.set_install_status(InstallStatus::Success);
127                 } else {
128                     task_log.set_install_status(InstallStatus::Failed);
129                 }
130             }
131 
132             Action::Clean(_) => {
133                 task_log.clean_build_status();
134                 task_log.clean_install_status();
135             }
136         }
137 
138         self.task_data_dir
139             .save_task_log(&task_log)
140             .expect("Failed to save task log");
141     }
142 
143     fn do_execute(&mut self) -> Result<(), ExecutorError> {
144         // 准备本地环境变量
145         self.prepare_local_env()?;
146 
147         match self.action {
148             Action::Build => {
149                 // 构建任务
150                 self.build()?;
151             }
152             Action::Install => {
153                 // 把构建结果安装到DragonOS
154                 self.install()?;
155             }
156             Action::Clean(_) => {
157                 // 清理构建结果
158                 let r = self.clean();
159                 if let Err(e) = r {
160                     error!(
161                         "Failed to clean task {}: {:?}",
162                         self.entity.task().name_version(),
163                         e
164                     );
165                 }
166             }
167         }
168 
169         return Ok(());
170     }
171 
172     /// # 执行build操作
173     fn build(&mut self) -> Result<(), ExecutorError> {
174         if let Some(status) = self.task_log().build_status() {
175             if *status == BuildStatus::Success && self.entity.task().build_once {
176                 info!(
177                     "Task {} has been built successfully, skip build.",
178                     self.entity.task().name_version()
179                 );
180                 return Ok(());
181             }
182         }
183 
184         self.mv_target_to_tmp()?;
185 
186         // 确认源文件就绪
187         self.prepare_input()?;
188 
189         let command: Option<Command> = self.create_command()?;
190         if let Some(cmd) = command {
191             self.run_command(cmd)?;
192         }
193 
194         // 检查构建结果,如果为空,则抛出警告
195         if self.build_dir.is_empty()? {
196             warn!(
197                 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?",
198                 self.entity.task().name_version(),
199             );
200         }
201         return Ok(());
202     }
203 
204     /// # 执行安装操作,把构建结果安装到DragonOS
205     fn install(&self) -> Result<(), ExecutorError> {
206         if let Some(status) = self.task_log().install_status() {
207             if *status == InstallStatus::Success && self.entity.task().install_once {
208                 info!(
209                     "Task {} has been installed successfully, skip install.",
210                     self.entity.task().name_version()
211                 );
212                 return Ok(());
213             }
214         }
215 
216         let binding = self.entity.task();
217         let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
218         // 如果没有指定安装路径,则不执行安装
219         if in_dragonos_path.is_none() {
220             return Ok(());
221         }
222         info!("Installing task: {}", self.entity.task().name_version());
223         let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
224 
225         debug!("in_dragonos_path: {}", in_dragonos_path);
226         // 去除开头的斜杠
227         {
228             let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
229             in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
230         }
231         // 拼接最终的安装路径
232         let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path));
233         debug!("install_path: {:?}", install_path);
234         // 创建安装路径
235         std::fs::create_dir_all(&install_path).map_err(|e| {
236             ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
237         })?;
238 
239         // 拷贝构建结果到安装路径
240         let build_dir: PathBuf = self.build_dir.path.clone();
241         FileUtils::copy_dir_all(&build_dir, &install_path)
242             .map_err(|e| ExecutorError::InstallError(e))?;
243         info!("Task {} installed.", self.entity.task().name_version());
244 
245         // 安装完后,删除临时target文件
246         if let Some(target) = self.entity.target() {
247             target.clean_tmpdadk()?;
248         }
249 
250         return Ok(());
251     }
252 
253     fn clean(&self) -> Result<(), ExecutorError> {
254         let level = if let Action::Clean(l) = self.action {
255             l
256         } else {
257             panic!(
258                 "BUG: clean() called with non-clean action. executor details: {:?}",
259                 self
260             );
261         };
262         info!(
263             "Cleaning task: {}, level={level:?}",
264             self.entity.task().name_version()
265         );
266 
267         let r: Result<(), ExecutorError> = match level {
268             UserCleanLevel::All => self.clean_all(),
269             UserCleanLevel::InSrc => self.clean_src(),
270             UserCleanLevel::Output => {
271                 self.clean_target()?;
272                 self.clean_cache()
273             }
274         };
275 
276         if let Err(e) = r {
277             error!(
278                 "Failed to clean task: {}, error message: {:?}",
279                 self.entity.task().name_version(),
280                 e
281             );
282             return Err(e);
283         }
284 
285         return Ok(());
286     }
287 
288     fn clean_all(&self) -> Result<(), ExecutorError> {
289         // 在源文件目录执行清理
290         self.clean_src()?;
291         // 清理构建结果
292         self.clean_target()?;
293         // 清理缓存
294         self.clean_cache()?;
295         return Ok(());
296     }
297 
298     /// 在源文件目录执行清理
299     fn clean_src(&self) -> Result<(), ExecutorError> {
300         let cmd: Option<Command> = self.create_command()?;
301         if cmd.is_none() {
302             // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
303             return Ok(());
304         }
305         info!(
306             "{}: Cleaning in source directory: {:?}",
307             self.entity.task().name_version(),
308             self.src_work_dir()
309         );
310 
311         let cmd = cmd.unwrap();
312         self.run_command(cmd)?;
313         return Ok(());
314     }
315 
316     /// 清理构建输出目录
317     fn clean_target(&self) -> Result<(), ExecutorError> {
318         info!(
319             "{}: Cleaning build target directory: {:?}",
320             self.entity.task().name_version(),
321             self.build_dir.path
322         );
323 
324         return self.build_dir.remove_self_recursive();
325     }
326 
327     /// 清理下载缓存
328     fn clean_cache(&self) -> Result<(), ExecutorError> {
329         let cache_dir = self.source_dir.as_ref();
330         if cache_dir.is_none() {
331             // 如果没有缓存目录,则认为用户不需要清理缓存
332             return Ok(());
333         }
334         info!(
335             "{}: Cleaning cache directory: {}",
336             self.entity.task().name_version(),
337             self.src_work_dir().display()
338         );
339         return cache_dir.unwrap().remove_self_recursive();
340     }
341 
342     /// 获取源文件的工作目录
343     fn src_work_dir(&self) -> PathBuf {
344         if let Some(local_path) = self.entity.task().source_path() {
345             return local_path;
346         }
347         return self.source_dir.as_ref().unwrap().path.clone();
348     }
349 
350     fn task_log(&self) -> TaskLog {
351         return self.task_data_dir.task_log();
352     }
353 
354     /// 为任务创建命令
355     fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
356         // 获取命令
357         let raw_cmd = match self.entity.task().task_type {
358             TaskType::BuildFromSource(_) => match self.action {
359                 Action::Build => self.entity.task().build.build_command.clone(),
360                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
361                 _ => unimplemented!(
362                     "create_command: Action {:?} not supported yet.",
363                     self.action
364                 ),
365             },
366 
367             TaskType::InstallFromPrebuilt(_) => match self.action {
368                 Action::Build => self.entity.task().build.build_command.clone(),
369                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
370                 _ => unimplemented!(
371                     "create_command: Action {:?} not supported yet.",
372                     self.action
373                 ),
374             },
375         };
376 
377         if raw_cmd.is_none() {
378             return Ok(None);
379         }
380 
381         let raw_cmd = raw_cmd.unwrap();
382 
383         let mut command = Command::new("bash");
384         command.current_dir(self.src_work_dir());
385 
386         // 设置参数
387         command.arg("-c");
388         command.arg(raw_cmd);
389 
390         // 设置环境变量
391         let env_list = ENV_LIST.read().unwrap();
392         for (key, value) in env_list.envs.iter() {
393             // if key.starts_with("DADK") {
394             //     debug!("DADK env found: {}={}", key, value.value);
395             // }
396             command.env(key, value.value.clone());
397         }
398         drop(env_list);
399         for (key, value) in self.local_envs.envs.iter() {
400             debug!("Local env found: {}={}", key, value.value);
401             command.env(key, value.value.clone());
402         }
403 
404         return Ok(Some(command));
405     }
406 
407     /// # 准备工作线程本地环境变量
408     fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
409         // 设置本地环境变量
410         self.prepare_target_env()?;
411 
412         let binding = self.entity.task();
413         let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
414 
415         if let Some(task_envs) = task_envs {
416             for tv in task_envs.iter() {
417                 self.local_envs
418                     .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
419             }
420         }
421 
422         // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里
423         self.local_envs.add(EnvVar::new(
424             "DADK_CURRENT_BUILD_DIR".to_string(),
425             self.build_dir.path.to_str().unwrap().to_string(),
426         ));
427 
428         return Ok(());
429     }
430 
431     fn prepare_input(&self) -> Result<(), ExecutorError> {
432         // 拉取源文件
433         let task = self.entity.task();
434         match &task.task_type {
435             TaskType::BuildFromSource(cs) => {
436                 if self.source_dir.is_none() {
437                     return Ok(());
438                 }
439                 let source_dir = self.source_dir.as_ref().unwrap();
440                 match cs {
441                     CodeSource::Git(git) => {
442                         git.prepare(source_dir)
443                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
444                     }
445                     // 本地源文件,不需要拉取
446                     CodeSource::Local(_) => return Ok(()),
447                     // 在线压缩包,需要下载
448                     CodeSource::Archive(archive) => {
449                         archive
450                             .download_unzip(source_dir)
451                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
452                     }
453                 }
454             }
455             TaskType::InstallFromPrebuilt(pb) => {
456                 match pb {
457                     // 本地源文件,不需要拉取
458                     PrebuiltSource::Local(local_source) => {
459                         let local_path = local_source.path();
460                         let target_path = &self.build_dir.path;
461                         FileUtils::copy_dir_all(&local_path, &target_path)
462                             .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string();
463                         return Ok(());
464                     }
465                     // 在线压缩包,需要下载
466                     PrebuiltSource::Archive(archive) => {
467                         archive
468                             .download_unzip(&self.build_dir)
469                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
470                     }
471                 }
472             }
473         }
474 
475         return Ok(());
476     }
477 
478     fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
479         let mut child = command
480             .stdin(Stdio::inherit())
481             .spawn()
482             .map_err(|e| ExecutorError::IoError(e.to_string()))?;
483 
484         // 等待子进程结束
485         let r = child
486             .wait()
487             .map_err(|e| ExecutorError::IoError(e.to_string()));
488         debug!("Command finished: {:?}", r);
489         if r.is_ok() {
490             let r = r.unwrap();
491             if r.success() {
492                 return Ok(());
493             } else {
494                 // 执行失败,获取最后100行stderr输出
495                 let errmsg = format!(
496                     "Task {} failed, exit code = {}",
497                     self.entity.task().name_version(),
498                     r.code().unwrap()
499                 );
500                 error!("{errmsg}");
501                 let command_opt = command.output();
502                 if command_opt.is_err() {
503                     return Err(ExecutorError::TaskFailed(
504                         "Failed to get command output".to_string(),
505                     ));
506                 }
507                 let command_opt = command_opt.unwrap();
508                 let command_output = String::from_utf8_lossy(&command_opt.stderr);
509                 let mut last_100_outputs = command_output
510                     .lines()
511                     .rev()
512                     .take(100)
513                     .collect::<Vec<&str>>();
514                 last_100_outputs.reverse();
515                 error!("Last 100 lines msg of stderr:");
516                 for line in last_100_outputs {
517                     error!("{}", line);
518                 }
519                 return Err(ExecutorError::TaskFailed(errmsg));
520             }
521         } else {
522             let errmsg = format!(
523                 "Task {} failed, msg = {:?}",
524                 self.entity.task().name_version(),
525                 r.err().unwrap()
526             );
527             error!("{errmsg}");
528             return Err(ExecutorError::TaskFailed(errmsg));
529         }
530     }
531 
532     pub fn mv_target_to_tmp(&mut self) -> Result<(), ExecutorError> {
533         if let Some(rust_target) = self.entity.task().rust_target.clone() {
534             // 将target文件拷贝至 /tmp 下对应的dadk文件的临时target文件中
535             self.entity
536                 .target()
537                 .as_ref()
538                 .unwrap()
539                 .cp_to_tmp(&rust_target)?;
540         }
541         return Ok(());
542     }
543 
544     pub fn prepare_target_env(&mut self) -> Result<(), ExecutorError> {
545         if self.entity.task().rust_target.is_some() {
546             // 如果有dadk任务有rust_target字段,需要设置DADK_RUST_TARGET_FILE环境变量,值为临时target文件路径
547             self.entity
548                 .target()
549                 .as_ref()
550                 .unwrap()
551                 .prepare_env(&mut self.local_envs);
552         }
553         return Ok(());
554     }
555 }
556 
557 #[derive(Debug, Clone)]
558 pub struct EnvMap {
559     pub envs: BTreeMap<String, EnvVar>,
560 }
561 
562 impl EnvMap {
563     pub fn new() -> Self {
564         Self {
565             envs: BTreeMap::new(),
566         }
567     }
568 
569     pub fn add(&mut self, env: EnvVar) {
570         self.envs.insert(env.key.clone(), env);
571     }
572 
573     #[allow(dead_code)]
574     pub fn get(&self, key: &str) -> Option<&EnvVar> {
575         self.envs.get(key)
576     }
577 
578     pub fn add_vars(&mut self, vars: Vars) {
579         for (key, value) in vars {
580             self.add(EnvVar::new(key, value));
581         }
582     }
583 }
584 
585 /// # 环境变量
586 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
587 pub struct EnvVar {
588     pub key: String,
589     pub value: String,
590 }
591 
592 impl EnvVar {
593     pub fn new(key: String, value: String) -> Self {
594         Self { key, value }
595     }
596 }
597 
598 /// # 任务执行器错误枚举
599 #[allow(dead_code)]
600 #[derive(Debug, Clone)]
601 pub enum ExecutorError {
602     /// 准备执行环境错误
603     PrepareEnvError(String),
604     IoError(String),
605     /// 构建执行错误
606     TaskFailed(String),
607     /// 安装错误
608     InstallError(String),
609     /// 清理错误
610     CleanError(String),
611 }
612 
613 /// # 准备全局环境变量
614 pub fn prepare_env(
615     sched_entities: &SchedEntities,
616     execute_ctx: &Arc<DadkUserExecuteContext>,
617 ) -> Result<(), ExecutorError> {
618     info!("Preparing environment variables...");
619     let env_list = create_global_env_list(sched_entities, execute_ctx)?;
620     // 写入全局环境变量列表
621     let mut global_env_list = ENV_LIST.write().unwrap();
622     *global_env_list = env_list;
623     return Ok(());
624 }
625 
626 /// # 创建全局环境变量列表
627 fn create_global_env_list(
628     sched_entities: &SchedEntities,
629     execute_ctx: &Arc<DadkUserExecuteContext>,
630 ) -> Result<EnvMap, ExecutorError> {
631     let mut env_list = EnvMap::new();
632     let envs: Vars = std::env::vars();
633     env_list.add_vars(envs);
634 
635     // 为每个任务创建特定的环境变量
636     for entity in sched_entities.entities().iter() {
637         // 导出任务的构建目录环境变量
638         let build_dir = CacheDir::build_dir(entity.clone())?;
639 
640         let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
641         env_list.add(EnvVar::new(
642             build_dir_key,
643             build_dir.to_str().unwrap().to_string(),
644         ));
645 
646         // 如果需要源码缓存目录,则导出
647         if CacheDir::need_source_cache(entity) {
648             let source_dir = CacheDir::source_dir(entity.clone())?;
649             let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
650             env_list.add(EnvVar::new(
651                 source_dir_key,
652                 source_dir.to_str().unwrap().to_string(),
653             ));
654         }
655     }
656 
657     // 创建ARCH环境变量
658     let target_arch = execute_ctx.target_arch();
659     env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into()));
660 
661     return Ok(env_list);
662 }
663