xref: /DADK/dadk-user/src/executor/mod.rs (revision cfb7b78ff5dff2a09cba93336ba222be89cbd3e1)
1 use std::{
2     collections::BTreeMap,
3     env::Vars,
4     path::PathBuf,
5     process::{Command, Stdio},
6     sync::{Arc, RwLock},
7     time::SystemTime,
8 };
9 
10 use chrono::{DateTime, Utc};
11 use dadk_config::user::UserCleanLevel;
12 use log::{debug, error, info, warn};
13 
14 use crate::{
15     context::{Action, DadkUserExecuteContext},
16     executor::cache::CacheDir,
17     parser::{
18         task::{CodeSource, PrebuiltSource, TaskType},
19         task_log::{BuildStatus, InstallStatus, TaskLog},
20     },
21     scheduler::{SchedEntities, SchedEntity},
22     utils::{file::FileUtils, path::abs_path},
23 };
24 
25 use dadk_config::common::task::TaskEnv;
26 
27 use self::cache::{CacheDirType, TaskDataDir};
28 
29 pub mod cache;
30 pub mod source;
31 #[cfg(test)]
32 mod tests;
33 
34 lazy_static! {
35     // 全局环境变量的列表
36     pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
37 }
38 
39 #[derive(Debug, Clone)]
40 pub struct Executor {
41     entity: Arc<SchedEntity>,
42     action: Action,
43     local_envs: EnvMap,
44     /// 任务构建结果输出到的目录
45     build_dir: CacheDir,
46     /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
47     source_dir: Option<CacheDir>,
48     /// 任务数据目录
49     task_data_dir: TaskDataDir,
50     /// DragonOS sysroot的路径
51     dragonos_sysroot: PathBuf,
52 }
53 
54 impl Executor {
55     /// # 创建执行器
56     ///
57     /// 用于执行一个任务
58     ///
59     /// ## 参数
60     ///
61     /// * `entity` - 任务调度实体
62     ///
63     /// ## 返回值
64     ///
65     /// * `Ok(Executor)` - 创建成功
66     /// * `Err(ExecutorError)` - 创建失败
67     pub fn new(
68         entity: Arc<SchedEntity>,
69         action: Action,
70         dragonos_sysroot: PathBuf,
71     ) -> Result<Self, ExecutorError> {
72         let local_envs = EnvMap::new();
73         let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
74         let task_data_dir = TaskDataDir::new(entity.clone())?;
75 
76         let source_dir = if CacheDir::need_source_cache(&entity) {
77             Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
78         } else {
79             None
80         };
81 
82         let result: Executor = Self {
83             action,
84             entity,
85             local_envs,
86             build_dir,
87             source_dir,
88             task_data_dir,
89             dragonos_sysroot,
90         };
91 
92         return Ok(result);
93     }
94 
95     /// # 执行任务
96     ///
97     /// 创建执行器后,调用此方法执行任务。
98     /// 该方法会执行以下步骤:
99     ///
100     /// 1. 创建工作线程
101     /// 2. 准备环境变量
102     /// 3. 拉取数据(可选)
103     /// 4. 执行构建
104     pub fn execute(&mut self) -> Result<(), ExecutorError> {
105         info!("Execute task: {}", self.entity.task().name_version());
106 
107         let r = self.do_execute();
108         self.save_task_data(r.clone());
109         info!("Task {} finished", self.entity.task().name_version());
110         return r;
111     }
112 
113     /// # 保存任务数据
114     fn save_task_data(&self, r: Result<(), ExecutorError>) {
115         let mut task_log = self.task_data_dir.task_log();
116         match self.action {
117             Action::Build => {
118                 if r.is_ok() {
119                     task_log.set_build_status(BuildStatus::Success);
120                 } else {
121                     task_log.set_build_status(BuildStatus::Failed);
122                 }
123 
124                 task_log.set_build_time_now();
125             }
126 
127             Action::Install => {
128                 if r.is_ok() {
129                     task_log.set_install_status(InstallStatus::Success);
130                 } else {
131                     task_log.set_install_status(InstallStatus::Failed);
132                 }
133             }
134 
135             Action::Clean(_) => {
136                 task_log.clean_build_status();
137                 task_log.clean_install_status();
138             }
139         }
140 
141         self.task_data_dir
142             .save_task_log(&task_log)
143             .expect("Failed to save task log");
144     }
145 
146     fn do_execute(&mut self) -> Result<(), ExecutorError> {
147         // 准备本地环境变量
148         self.prepare_local_env()?;
149 
150         match self.action {
151             Action::Build => {
152                 // 构建任务
153                 self.build()?;
154             }
155             Action::Install => {
156                 // 把构建结果安装到DragonOS
157                 self.install()?;
158             }
159             Action::Clean(_) => {
160                 // 清理构建结果
161                 let r = self.clean();
162                 if let Err(e) = r {
163                     error!(
164                         "Failed to clean task {}: {:?}",
165                         self.entity.task().name_version(),
166                         e
167                     );
168                 }
169             }
170         }
171 
172         return Ok(());
173     }
174 
175     fn build(&mut self) -> Result<(), ExecutorError> {
176         if let Some(status) = self.task_log().build_status() {
177             if let Some(build_time) = self.task_log().build_time() {
178                 let last_modified = last_modified_time(&self.entity.file_path(), build_time)?;
179                 if *status == BuildStatus::Success
180                     && (self.entity.task().build_once || last_modified < *build_time)
181                 {
182                     info!(
183                         "Task {} has been built successfully, skip build.",
184                         self.entity.task().name_version()
185                     );
186                     return Ok(());
187                 }
188             }
189         }
190 
191         return self.do_build();
192     }
193 
194     /// # 执行build操作
195     fn do_build(&mut self) -> Result<(), ExecutorError> {
196         // 确认源文件就绪
197         self.prepare_input()?;
198 
199         let command: Option<Command> = self.create_command()?;
200         if let Some(cmd) = command {
201             self.run_command(cmd)?;
202         }
203 
204         // 检查构建结果,如果为空,则抛出警告
205         if self.build_dir.is_empty()? {
206             warn!(
207                 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?",
208                 self.entity.task().name_version(),
209             );
210         }
211         return Ok(());
212     }
213 
214     fn install(&self) -> Result<(), ExecutorError> {
215         if let Some(status) = self.task_log().install_status() {
216             if let Some(build_time) = self.task_log().build_time() {
217                 let last_modified = last_modified_time(&self.entity.file_path(), build_time)?;
218                 if *status == InstallStatus::Success
219                     && (self.entity.task().install_once || last_modified < *build_time)
220                 {
221                     info!(
222                         "Task {} has been installed successfully, skip install.",
223                         self.entity.task().name_version()
224                     );
225                     return Ok(());
226                 }
227             }
228         }
229 
230         return self.do_install();
231     }
232 
233     /// # 执行安装操作,把构建结果安装到DragonOS
234     fn do_install(&self) -> Result<(), ExecutorError> {
235         let binding = self.entity.task();
236         let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
237         // 如果没有指定安装路径,则不执行安装
238         if in_dragonos_path.is_none() {
239             return Ok(());
240         }
241         info!("Installing task: {}", self.entity.task().name_version());
242         let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
243 
244         debug!("in_dragonos_path: {}", in_dragonos_path);
245         // 去除开头的斜杠
246         {
247             let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
248             in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
249         }
250         // 拼接最终的安装路径
251         let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path));
252         debug!("install_path: {:?}", install_path);
253         // 创建安装路径
254         std::fs::create_dir_all(&install_path).map_err(|e| {
255             ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
256         })?;
257 
258         // 拷贝构建结果到安装路径
259         let build_dir: PathBuf = self.build_dir.path.clone();
260         FileUtils::copy_dir_all(&build_dir, &install_path)
261             .map_err(|e| ExecutorError::InstallError(e))?;
262         info!("Task {} installed.", self.entity.task().name_version());
263 
264         return Ok(());
265     }
266 
267     fn clean(&self) -> Result<(), ExecutorError> {
268         let level = if let Action::Clean(l) = self.action {
269             l
270         } else {
271             panic!(
272                 "BUG: clean() called with non-clean action. executor details: {:?}",
273                 self
274             );
275         };
276         info!(
277             "Cleaning task: {}, level={level:?}",
278             self.entity.task().name_version()
279         );
280 
281         let r: Result<(), ExecutorError> = match level {
282             UserCleanLevel::All => self.clean_all(),
283             UserCleanLevel::InSrc => self.clean_src(),
284             UserCleanLevel::Output => {
285                 self.clean_target()?;
286                 self.clean_cache()
287             }
288         };
289 
290         if let Err(e) = r {
291             error!(
292                 "Failed to clean task: {}, error message: {:?}",
293                 self.entity.task().name_version(),
294                 e
295             );
296             return Err(e);
297         }
298 
299         return Ok(());
300     }
301 
302     fn clean_all(&self) -> Result<(), ExecutorError> {
303         // 在源文件目录执行清理
304         self.clean_src()?;
305         // 清理构建结果
306         self.clean_target()?;
307         // 清理缓存
308         self.clean_cache()?;
309         return Ok(());
310     }
311 
312     /// 在源文件目录执行清理
313     fn clean_src(&self) -> Result<(), ExecutorError> {
314         let cmd: Option<Command> = self.create_command()?;
315         if cmd.is_none() {
316             // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
317             return Ok(());
318         }
319         info!(
320             "{}: Cleaning in source directory: {:?}",
321             self.entity.task().name_version(),
322             self.src_work_dir()
323         );
324 
325         let cmd = cmd.unwrap();
326         self.run_command(cmd)?;
327         return Ok(());
328     }
329 
330     /// 清理构建输出目录
331     fn clean_target(&self) -> Result<(), ExecutorError> {
332         info!(
333             "{}: Cleaning build target directory: {:?}",
334             self.entity.task().name_version(),
335             self.build_dir.path
336         );
337 
338         return self.build_dir.remove_self_recursive();
339     }
340 
341     /// 清理下载缓存
342     fn clean_cache(&self) -> Result<(), ExecutorError> {
343         let cache_dir = self.source_dir.as_ref();
344         if cache_dir.is_none() {
345             // 如果没有缓存目录,则认为用户不需要清理缓存
346             return Ok(());
347         }
348         info!(
349             "{}: Cleaning cache directory: {}",
350             self.entity.task().name_version(),
351             self.src_work_dir().display()
352         );
353         return cache_dir.unwrap().remove_self_recursive();
354     }
355 
356     /// 获取源文件的工作目录
357     fn src_work_dir(&self) -> PathBuf {
358         if let Some(local_path) = self.entity.task().source_path() {
359             return local_path;
360         }
361         return self.source_dir.as_ref().unwrap().path.clone();
362     }
363 
364     fn task_log(&self) -> TaskLog {
365         return self.task_data_dir.task_log();
366     }
367 
368     /// 为任务创建命令
369     fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
370         // 获取命令
371         let raw_cmd = match self.entity.task().task_type {
372             TaskType::BuildFromSource(_) => match self.action {
373                 Action::Build => self.entity.task().build.build_command.clone(),
374                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
375                 _ => unimplemented!(
376                     "create_command: Action {:?} not supported yet.",
377                     self.action
378                 ),
379             },
380 
381             TaskType::InstallFromPrebuilt(_) => match self.action {
382                 Action::Build => self.entity.task().build.build_command.clone(),
383                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
384                 _ => unimplemented!(
385                     "create_command: Action {:?} not supported yet.",
386                     self.action
387                 ),
388             },
389         };
390 
391         if raw_cmd.is_none() {
392             return Ok(None);
393         }
394 
395         let raw_cmd = raw_cmd.unwrap();
396 
397         let mut command = Command::new("bash");
398         command.current_dir(self.src_work_dir());
399 
400         // 设置参数
401         command.arg("-c");
402         command.arg(raw_cmd);
403 
404         // 设置环境变量
405         let env_list = ENV_LIST.read().unwrap();
406         for (key, value) in env_list.envs.iter() {
407             // if key.starts_with("DADK") {
408             //     debug!("DADK env found: {}={}", key, value.value);
409             // }
410             command.env(key, value.value.clone());
411         }
412         drop(env_list);
413         for (key, value) in self.local_envs.envs.iter() {
414             debug!("Local env found: {}={}", key, value.value);
415             command.env(key, value.value.clone());
416         }
417 
418         return Ok(Some(command));
419     }
420 
421     /// # 准备工作线程本地环境变量
422     fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
423         let binding = self.entity.task();
424         let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
425 
426         if let Some(task_envs) = task_envs {
427             for tv in task_envs.iter() {
428                 self.local_envs
429                     .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
430             }
431         }
432 
433         // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里
434         self.local_envs.add(EnvVar::new(
435             "DADK_CURRENT_BUILD_DIR".to_string(),
436             self.build_dir.path.to_str().unwrap().to_string(),
437         ));
438 
439         return Ok(());
440     }
441 
442     fn prepare_input(&self) -> Result<(), ExecutorError> {
443         // 拉取源文件
444         let task = self.entity.task();
445         match &task.task_type {
446             TaskType::BuildFromSource(cs) => {
447                 if self.source_dir.is_none() {
448                     return Ok(());
449                 }
450                 let source_dir = self.source_dir.as_ref().unwrap();
451                 match cs {
452                     CodeSource::Git(git) => {
453                         git.prepare(source_dir)
454                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
455                     }
456                     // 本地源文件,不需要拉取
457                     CodeSource::Local(_) => return Ok(()),
458                     // 在线压缩包,需要下载
459                     CodeSource::Archive(archive) => {
460                         archive
461                             .download_unzip(source_dir)
462                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
463                     }
464                 }
465             }
466             TaskType::InstallFromPrebuilt(pb) => {
467                 match pb {
468                     // 本地源文件,不需要拉取
469                     PrebuiltSource::Local(local_source) => {
470                         let local_path = local_source.path();
471                         let target_path = &self.build_dir.path;
472                         FileUtils::copy_dir_all(&local_path, &target_path)
473                             .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string();
474                         return Ok(());
475                     }
476                     // 在线压缩包,需要下载
477                     PrebuiltSource::Archive(archive) => {
478                         archive
479                             .download_unzip(&self.build_dir)
480                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
481                     }
482                 }
483             }
484         }
485 
486         return Ok(());
487     }
488 
489     fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
490         let mut child = command
491             .stdin(Stdio::inherit())
492             .spawn()
493             .map_err(|e| ExecutorError::IoError(e.to_string()))?;
494 
495         // 等待子进程结束
496         let r = child
497             .wait()
498             .map_err(|e| ExecutorError::IoError(e.to_string()));
499         debug!("Command finished: {:?}", r);
500         if r.is_ok() {
501             let r = r.unwrap();
502             if r.success() {
503                 return Ok(());
504             } else {
505                 // 执行失败,获取最后100行stderr输出
506                 let errmsg = format!(
507                     "Task {} failed, exit code = {}",
508                     self.entity.task().name_version(),
509                     r.code().unwrap()
510                 );
511                 error!("{errmsg}");
512                 let command_opt = command.output();
513                 if command_opt.is_err() {
514                     return Err(ExecutorError::TaskFailed(
515                         "Failed to get command output".to_string(),
516                     ));
517                 }
518                 let command_opt = command_opt.unwrap();
519                 let command_output = String::from_utf8_lossy(&command_opt.stderr);
520                 let mut last_100_outputs = command_output
521                     .lines()
522                     .rev()
523                     .take(100)
524                     .collect::<Vec<&str>>();
525                 last_100_outputs.reverse();
526                 error!("Last 100 lines msg of stderr:");
527                 for line in last_100_outputs {
528                     error!("{}", line);
529                 }
530                 return Err(ExecutorError::TaskFailed(errmsg));
531             }
532         } else {
533             let errmsg = format!(
534                 "Task {} failed, msg = {:?}",
535                 self.entity.task().name_version(),
536                 r.err().unwrap()
537             );
538             error!("{errmsg}");
539             return Err(ExecutorError::TaskFailed(errmsg));
540         }
541     }
542 }
543 
544 #[derive(Debug, Clone)]
545 pub struct EnvMap {
546     pub envs: BTreeMap<String, EnvVar>,
547 }
548 
549 impl EnvMap {
550     pub fn new() -> Self {
551         Self {
552             envs: BTreeMap::new(),
553         }
554     }
555 
556     pub fn add(&mut self, env: EnvVar) {
557         self.envs.insert(env.key.clone(), env);
558     }
559 
560     #[allow(dead_code)]
561     pub fn get(&self, key: &str) -> Option<&EnvVar> {
562         self.envs.get(key)
563     }
564 
565     pub fn add_vars(&mut self, vars: Vars) {
566         for (key, value) in vars {
567             self.add(EnvVar::new(key, value));
568         }
569     }
570 }
571 
572 /// # 环境变量
573 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
574 pub struct EnvVar {
575     pub key: String,
576     pub value: String,
577 }
578 
579 impl EnvVar {
580     pub fn new(key: String, value: String) -> Self {
581         Self { key, value }
582     }
583 }
584 
585 /// # 任务执行器错误枚举
586 #[allow(dead_code)]
587 #[derive(Debug, Clone)]
588 pub enum ExecutorError {
589     /// 准备执行环境错误
590     PrepareEnvError(String),
591     IoError(String),
592     /// 构建执行错误
593     TaskFailed(String),
594     /// 安装错误
595     InstallError(String),
596     /// 清理错误
597     CleanError(String),
598 }
599 
600 /// # 准备全局环境变量
601 pub fn prepare_env(
602     sched_entities: &SchedEntities,
603     execute_ctx: &Arc<DadkUserExecuteContext>,
604 ) -> Result<(), ExecutorError> {
605     info!("Preparing environment variables...");
606     let env_list = create_global_env_list(sched_entities, execute_ctx)?;
607     // 写入全局环境变量列表
608     let mut global_env_list = ENV_LIST.write().unwrap();
609     *global_env_list = env_list;
610     return Ok(());
611 }
612 
613 /// # 创建全局环境变量列表
614 fn create_global_env_list(
615     sched_entities: &SchedEntities,
616     execute_ctx: &Arc<DadkUserExecuteContext>,
617 ) -> Result<EnvMap, ExecutorError> {
618     let mut env_list = EnvMap::new();
619     let envs: Vars = std::env::vars();
620     env_list.add_vars(envs);
621 
622     // 为每个任务创建特定的环境变量
623     for entity in sched_entities.entities().iter() {
624         // 导出任务的构建目录环境变量
625         let build_dir = CacheDir::build_dir(entity.clone())?;
626 
627         let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
628         env_list.add(EnvVar::new(
629             build_dir_key,
630             build_dir.to_str().unwrap().to_string(),
631         ));
632 
633         // 如果需要源码缓存目录,则导出
634         if CacheDir::need_source_cache(entity) {
635             let source_dir = CacheDir::source_dir(entity.clone())?;
636             let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
637             env_list.add(EnvVar::new(
638                 source_dir_key,
639                 source_dir.to_str().unwrap().to_string(),
640             ));
641         }
642     }
643 
644     // 创建ARCH环境变量
645     let target_arch = execute_ctx.target_arch();
646     env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into()));
647 
648     return Ok(env_list);
649 }
650 
651 /// # 获取文件最后的更新时间
652 ///
653 /// ## 参数
654 /// * `path` - 文件路径
655 /// * `last_modified` - 最后的更新时间
656 /// * `build_time` - 构建时间
657 fn last_modified_time(
658     path: &PathBuf,
659     build_time: &DateTime<Utc>,
660 ) -> Result<DateTime<Utc>, ExecutorError> {
661     let metadata = path
662         .metadata()
663         .map_err(|e| ExecutorError::InstallError(e.to_string()))?;
664 
665     let last_modified = if metadata.is_dir() {
666         let mut last_modified = DateTime::<Utc>::from(SystemTime::UNIX_EPOCH);
667         for r in std::fs::read_dir(path).unwrap() {
668             if let Ok(entry) = r {
669                 // 忽略编译产物目录
670                 if entry.file_name() == "target" {
671                     continue;
672                 }
673 
674                 let metadata = entry.metadata().unwrap();
675                 if metadata.is_dir() {
676                     // 如果是子目录,则递归找改子目录下的文件最后的更新时间
677                     last_modified = std::cmp::max(
678                         last_modified,
679                         last_modified_time(&entry.path(), build_time)?,
680                     );
681                 } else {
682                     // 比较文件的修改时间和last_modified,取最大值
683                     last_modified = std::cmp::max(
684                         last_modified,
685                         DateTime::<Utc>::from(metadata.modified().unwrap()),
686                     );
687                 }
688 
689                 // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续递归
690                 if last_modified > *build_time {
691                     return Ok(last_modified);
692                 }
693             }
694         }
695         last_modified
696     } else {
697         DateTime::<Utc>::from(metadata.modified().unwrap())
698     };
699 
700     return Ok(last_modified);
701 }
702