xref: /DADK/dadk-user/src/executor/mod.rs (revision afbf76c12fc99a80f89216143547fd6a91ae86f1)
1 use std::{
2     collections::{BTreeMap, VecDeque},
3     env::Vars,
4     path::PathBuf,
5     process::{Command, Stdio},
6     sync::{Arc, RwLock},
7     time::SystemTime,
8 };
9 
10 use chrono::{DateTime, Utc};
11 use dadk_config::user::UserCleanLevel;
12 use log::{debug, error, info, warn};
13 
14 use crate::{
15     context::{Action, DadkUserExecuteContext},
16     executor::cache::CacheDir,
17     parser::{
18         task::{CodeSource, PrebuiltSource, TaskType},
19         task_log::{BuildStatus, InstallStatus, TaskLog},
20     },
21     scheduler::{SchedEntities, SchedEntity},
22     utils::{file::FileUtils, path::abs_path},
23 };
24 
25 use dadk_config::common::task::TaskEnv;
26 
27 use self::cache::{CacheDirType, TaskDataDir};
28 
29 pub mod cache;
30 pub mod source;
31 #[cfg(test)]
32 mod tests;
33 
34 lazy_static! {
35     // 全局环境变量的列表
36     pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
37 }
38 
39 #[derive(Debug, Clone)]
40 pub struct Executor {
41     entity: Arc<SchedEntity>,
42     action: Action,
43     local_envs: EnvMap,
44     /// 任务构建结果输出到的目录
45     build_dir: CacheDir,
46     /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
47     source_dir: Option<CacheDir>,
48     /// 任务数据目录
49     task_data_dir: TaskDataDir,
50     /// DragonOS sysroot的路径
51     dragonos_sysroot: PathBuf,
52 }
53 
54 impl Executor {
55     /// # 创建执行器
56     ///
57     /// 用于执行一个任务
58     ///
59     /// ## 参数
60     ///
61     /// * `entity` - 任务调度实体
62     ///
63     /// ## 返回值
64     ///
65     /// * `Ok(Executor)` - 创建成功
66     /// * `Err(ExecutorError)` - 创建失败
67     pub fn new(
68         entity: Arc<SchedEntity>,
69         action: Action,
70         dragonos_sysroot: PathBuf,
71     ) -> Result<Self, ExecutorError> {
72         let local_envs = EnvMap::new();
73         let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
74         let task_data_dir = TaskDataDir::new(entity.clone())?;
75 
76         let source_dir = if CacheDir::need_source_cache(&entity) {
77             Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
78         } else {
79             None
80         };
81 
82         let result: Executor = Self {
83             action,
84             entity,
85             local_envs,
86             build_dir,
87             source_dir,
88             task_data_dir,
89             dragonos_sysroot,
90         };
91 
92         return Ok(result);
93     }
94 
95     /// # 执行任务
96     ///
97     /// 创建执行器后,调用此方法执行任务。
98     /// 该方法会执行以下步骤:
99     ///
100     /// 1. 创建工作线程
101     /// 2. 准备环境变量
102     /// 3. 拉取数据(可选)
103     /// 4. 执行构建
104     pub fn execute(&mut self) -> Result<(), ExecutorError> {
105         info!("Execute task: {}", self.entity.task().name_version());
106 
107         let r = self.do_execute();
108         self.save_task_data(r.clone());
109         info!("Task {} finished", self.entity.task().name_version());
110         return r;
111     }
112 
113     /// # 保存任务数据
114     fn save_task_data(&self, r: Result<(), ExecutorError>) {
115         let mut task_log = self.task_data_dir.task_log();
116         match self.action {
117             Action::Build => {
118                 if r.is_ok() {
119                     task_log.set_build_status(BuildStatus::Success);
120                 } else {
121                     task_log.set_build_status(BuildStatus::Failed);
122                 }
123 
124                 task_log.set_build_time_now();
125             }
126 
127             Action::Install => {
128                 if r.is_ok() {
129                     task_log.set_install_status(InstallStatus::Success);
130                 } else {
131                     task_log.set_install_status(InstallStatus::Failed);
132                 }
133                 task_log.set_install_time_now();
134             }
135 
136             Action::Clean(_) => {
137                 task_log.clean_build_status();
138                 task_log.clean_install_status();
139             }
140         }
141 
142         self.task_data_dir
143             .save_task_log(&task_log)
144             .expect("Failed to save task log");
145     }
146 
147     fn do_execute(&mut self) -> Result<(), ExecutorError> {
148         // 准备本地环境变量
149         self.prepare_local_env()?;
150 
151         match self.action {
152             Action::Build => {
153                 // 构建任务
154                 self.build()?;
155             }
156             Action::Install => {
157                 // 把构建结果安装到DragonOS
158                 self.install()?;
159             }
160             Action::Clean(_) => {
161                 // 清理构建结果
162                 let r = self.clean();
163                 if let Err(e) = r {
164                     error!(
165                         "Failed to clean task {}: {:?}",
166                         self.entity.task().name_version(),
167                         e
168                     );
169                 }
170             }
171         }
172 
173         return Ok(());
174     }
175 
176     fn build(&mut self) -> Result<(), ExecutorError> {
177         if let Some(status) = self.task_log().build_status() {
178             if let Some(build_time) = self.task_log().build_time() {
179                 let mut last_modified = last_modified_time(&self.entity.file_path(), build_time)?;
180                 last_modified = core::cmp::max(
181                     last_modified,
182                     last_modified_time(&self.src_work_dir(), build_time)?,
183                 );
184 
185                 if *status == BuildStatus::Success
186                     && (self.entity.task().build_once || last_modified < *build_time)
187                 {
188                     info!(
189                         "Task {} has been built successfully, skip build.",
190                         self.entity.task().name_version()
191                     );
192                     return Ok(());
193                 }
194             }
195         }
196 
197         return self.do_build();
198     }
199 
200     /// # 执行build操作
201     fn do_build(&mut self) -> Result<(), ExecutorError> {
202         // 确认源文件就绪
203         self.prepare_input()?;
204 
205         let command: Option<Command> = self.create_command()?;
206         if let Some(cmd) = command {
207             self.run_command(cmd)?;
208         }
209 
210         // 检查构建结果,如果为空,则抛出警告
211         if self.build_dir.is_empty()? {
212             warn!(
213                 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?",
214                 self.entity.task().name_version(),
215             );
216         }
217         return Ok(());
218     }
219 
220     fn install(&self) -> Result<(), ExecutorError> {
221         log::trace!("dadk-user: install {}", self.entity.task().name_version());
222         if let Some(status) = self.task_log().install_status() {
223             if let Some(install_time) = self.task_log().install_time() {
224                 let last_modified = last_modified_time(&self.build_dir.path, install_time)?;
225                 let last_modified = core::cmp::max(
226                     last_modified,
227                     last_modified_time(&self.entity.file_path(), install_time)?,
228                 );
229 
230                 if *status == InstallStatus::Success
231                     && (self.entity.task().install_once || last_modified < *install_time)
232                 {
233                     info!(
234                         "install: Task {} not changed.",
235                         self.entity.task().name_version()
236                     );
237                     return Ok(());
238                 }
239             }
240         }
241         log::trace!(
242             "dadk-user: to do install {}",
243             self.entity.task().name_version()
244         );
245         return self.do_install();
246     }
247 
248     /// # 执行安装操作,把构建结果安装到DragonOS
249     fn do_install(&self) -> Result<(), ExecutorError> {
250         let binding = self.entity.task();
251         let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
252         // 如果没有指定安装路径,则不执行安装
253         if in_dragonos_path.is_none() {
254             return Ok(());
255         }
256         info!("Installing task: {}", self.entity.task().name_version());
257         let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
258 
259         debug!("in_dragonos_path: {}", in_dragonos_path);
260         // 去除开头的斜杠
261         {
262             let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
263             in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
264         }
265         // 拼接最终的安装路径
266         let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path));
267         debug!("install_path: {:?}", install_path);
268         // 创建安装路径
269         std::fs::create_dir_all(&install_path).map_err(|e| {
270             ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
271         })?;
272 
273         // 拷贝构建结果到安装路径
274         let build_dir: PathBuf = self.build_dir.path.clone();
275         FileUtils::copy_dir_all(&build_dir, &install_path)
276             .map_err(|e| ExecutorError::InstallError(e))?;
277         info!("Task {} installed.", self.entity.task().name_version());
278 
279         return Ok(());
280     }
281 
282     fn clean(&self) -> Result<(), ExecutorError> {
283         let level = if let Action::Clean(l) = self.action {
284             l
285         } else {
286             panic!(
287                 "BUG: clean() called with non-clean action. executor details: {:?}",
288                 self
289             );
290         };
291         info!(
292             "Cleaning task: {}, level={level:?}",
293             self.entity.task().name_version()
294         );
295 
296         let r: Result<(), ExecutorError> = match level {
297             UserCleanLevel::All => self.clean_all(),
298             UserCleanLevel::InSrc => self.clean_src(),
299             UserCleanLevel::Output => {
300                 self.clean_target()?;
301                 self.clean_cache()
302             }
303         };
304 
305         if let Err(e) = r {
306             error!(
307                 "Failed to clean task: {}, error message: {:?}",
308                 self.entity.task().name_version(),
309                 e
310             );
311             return Err(e);
312         }
313 
314         return Ok(());
315     }
316 
317     fn clean_all(&self) -> Result<(), ExecutorError> {
318         // 在源文件目录执行清理
319         self.clean_src()?;
320         // 清理构建结果
321         self.clean_target()?;
322         // 清理缓存
323         self.clean_cache()?;
324         return Ok(());
325     }
326 
327     /// 在源文件目录执行清理
328     fn clean_src(&self) -> Result<(), ExecutorError> {
329         let cmd: Option<Command> = self.create_command()?;
330         if cmd.is_none() {
331             // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
332             return Ok(());
333         }
334         info!(
335             "{}: Cleaning in source directory: {:?}",
336             self.entity.task().name_version(),
337             self.src_work_dir()
338         );
339 
340         let cmd = cmd.unwrap();
341         self.run_command(cmd)?;
342         return Ok(());
343     }
344 
345     /// 清理构建输出目录
346     fn clean_target(&self) -> Result<(), ExecutorError> {
347         info!(
348             "{}: Cleaning build target directory: {:?}",
349             self.entity.task().name_version(),
350             self.build_dir.path
351         );
352 
353         return self.build_dir.remove_self_recursive();
354     }
355 
356     /// 清理下载缓存
357     fn clean_cache(&self) -> Result<(), ExecutorError> {
358         let cache_dir = self.source_dir.as_ref();
359         if cache_dir.is_none() {
360             // 如果没有缓存目录,则认为用户不需要清理缓存
361             return Ok(());
362         }
363         info!(
364             "{}: Cleaning cache directory: {}",
365             self.entity.task().name_version(),
366             self.src_work_dir().display()
367         );
368         return cache_dir.unwrap().remove_self_recursive();
369     }
370 
371     /// 获取源文件的工作目录
372     fn src_work_dir(&self) -> PathBuf {
373         if let Some(local_path) = self.entity.task().source_path() {
374             return local_path;
375         }
376         return self.source_dir.as_ref().unwrap().path.clone();
377     }
378 
379     fn task_log(&self) -> TaskLog {
380         return self.task_data_dir.task_log();
381     }
382 
383     /// 为任务创建命令
384     fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
385         // 获取命令
386         let raw_cmd = match self.entity.task().task_type {
387             TaskType::BuildFromSource(_) => match self.action {
388                 Action::Build => self.entity.task().build.build_command.clone(),
389                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
390                 _ => unimplemented!(
391                     "create_command: Action {:?} not supported yet.",
392                     self.action
393                 ),
394             },
395 
396             TaskType::InstallFromPrebuilt(_) => match self.action {
397                 Action::Build => self.entity.task().build.build_command.clone(),
398                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
399                 _ => unimplemented!(
400                     "create_command: Action {:?} not supported yet.",
401                     self.action
402                 ),
403             },
404         };
405 
406         if raw_cmd.is_none() {
407             return Ok(None);
408         }
409 
410         let raw_cmd = raw_cmd.unwrap();
411 
412         let mut command = Command::new("bash");
413         command.current_dir(self.src_work_dir());
414 
415         // 设置参数
416         command.arg("-c");
417         command.arg(raw_cmd);
418 
419         // 设置环境变量
420         let env_list = ENV_LIST.read().unwrap();
421         for (key, value) in env_list.envs.iter() {
422             // if key.starts_with("DADK") {
423             //     debug!("DADK env found: {}={}", key, value.value);
424             // }
425             command.env(key, value.value.clone());
426         }
427         drop(env_list);
428         for (key, value) in self.local_envs.envs.iter() {
429             debug!("Local env found: {}={}", key, value.value);
430             command.env(key, value.value.clone());
431         }
432 
433         return Ok(Some(command));
434     }
435 
436     /// # 准备工作线程本地环境变量
437     fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
438         let binding = self.entity.task();
439         let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
440 
441         if let Some(task_envs) = task_envs {
442             for tv in task_envs.iter() {
443                 self.local_envs
444                     .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
445             }
446         }
447 
448         // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里
449         self.local_envs.add(EnvVar::new(
450             "DADK_CURRENT_BUILD_DIR".to_string(),
451             self.build_dir.path.to_str().unwrap().to_string(),
452         ));
453 
454         return Ok(());
455     }
456 
457     fn prepare_input(&self) -> Result<(), ExecutorError> {
458         // 拉取源文件
459         let task = self.entity.task();
460         match &task.task_type {
461             TaskType::BuildFromSource(cs) => {
462                 if self.source_dir.is_none() {
463                     return Ok(());
464                 }
465                 let source_dir = self.source_dir.as_ref().unwrap();
466                 match cs {
467                     CodeSource::Git(git) => {
468                         git.prepare(source_dir)
469                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
470                     }
471                     // 本地源文件,不需要拉取
472                     CodeSource::Local(_) => return Ok(()),
473                     // 在线压缩包,需要下载
474                     CodeSource::Archive(archive) => {
475                         archive
476                             .download_unzip(source_dir)
477                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
478                     }
479                 }
480             }
481             TaskType::InstallFromPrebuilt(pb) => {
482                 match pb {
483                     // 本地源文件,不需要拉取
484                     PrebuiltSource::Local(local_source) => {
485                         let local_path = local_source.path();
486                         let target_path = &self.build_dir.path;
487                         FileUtils::copy_dir_all(&local_path, &target_path)
488                             .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string();
489                         return Ok(());
490                     }
491                     // 在线压缩包,需要下载
492                     PrebuiltSource::Archive(archive) => {
493                         archive
494                             .download_unzip(&self.build_dir)
495                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
496                     }
497                 }
498             }
499         }
500 
501         return Ok(());
502     }
503 
504     fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
505         let mut child = command
506             .stdin(Stdio::inherit())
507             .spawn()
508             .map_err(|e| ExecutorError::IoError(e.to_string()))?;
509 
510         // 等待子进程结束
511         let r = child
512             .wait()
513             .map_err(|e| ExecutorError::IoError(e.to_string()));
514         debug!("Command finished: {:?}", r);
515         if r.is_ok() {
516             let r = r.unwrap();
517             if r.success() {
518                 return Ok(());
519             } else {
520                 // 执行失败,获取最后100行stderr输出
521                 let errmsg = format!(
522                     "Task {} failed, exit code = {}",
523                     self.entity.task().name_version(),
524                     r.code().unwrap()
525                 );
526                 error!("{errmsg}");
527                 let command_opt = command.output();
528                 if command_opt.is_err() {
529                     return Err(ExecutorError::TaskFailed(
530                         "Failed to get command output".to_string(),
531                     ));
532                 }
533                 let command_opt = command_opt.unwrap();
534                 let command_output = String::from_utf8_lossy(&command_opt.stderr);
535                 let mut last_100_outputs = command_output
536                     .lines()
537                     .rev()
538                     .take(100)
539                     .collect::<Vec<&str>>();
540                 last_100_outputs.reverse();
541                 error!("Last 100 lines msg of stderr:");
542                 for line in last_100_outputs {
543                     error!("{}", line);
544                 }
545                 return Err(ExecutorError::TaskFailed(errmsg));
546             }
547         } else {
548             let errmsg = format!(
549                 "Task {} failed, msg = {:?}",
550                 self.entity.task().name_version(),
551                 r.err().unwrap()
552             );
553             error!("{errmsg}");
554             return Err(ExecutorError::TaskFailed(errmsg));
555         }
556     }
557 }
558 
559 #[derive(Debug, Clone)]
560 pub struct EnvMap {
561     pub envs: BTreeMap<String, EnvVar>,
562 }
563 
564 impl EnvMap {
565     pub fn new() -> Self {
566         Self {
567             envs: BTreeMap::new(),
568         }
569     }
570 
571     pub fn add(&mut self, env: EnvVar) {
572         self.envs.insert(env.key.clone(), env);
573     }
574 
575     #[allow(dead_code)]
576     pub fn get(&self, key: &str) -> Option<&EnvVar> {
577         self.envs.get(key)
578     }
579 
580     pub fn add_vars(&mut self, vars: Vars) {
581         for (key, value) in vars {
582             self.add(EnvVar::new(key, value));
583         }
584     }
585 }
586 
587 /// # 环境变量
588 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
589 pub struct EnvVar {
590     pub key: String,
591     pub value: String,
592 }
593 
594 impl EnvVar {
595     pub fn new(key: String, value: String) -> Self {
596         Self { key, value }
597     }
598 }
599 
600 /// # 任务执行器错误枚举
601 #[allow(dead_code)]
602 #[derive(Debug, Clone)]
603 pub enum ExecutorError {
604     /// 准备执行环境错误
605     PrepareEnvError(String),
606     IoError(String),
607     /// 构建执行错误
608     TaskFailed(String),
609     /// 安装错误
610     InstallError(String),
611     /// 清理错误
612     CleanError(String),
613 }
614 
615 /// # 准备全局环境变量
616 pub fn prepare_env(
617     sched_entities: &SchedEntities,
618     execute_ctx: &Arc<DadkUserExecuteContext>,
619 ) -> Result<(), ExecutorError> {
620     info!("Preparing environment variables...");
621     let env_list = create_global_env_list(sched_entities, execute_ctx)?;
622     // 写入全局环境变量列表
623     let mut global_env_list = ENV_LIST.write().unwrap();
624     *global_env_list = env_list;
625     return Ok(());
626 }
627 
628 /// # 创建全局环境变量列表
629 fn create_global_env_list(
630     sched_entities: &SchedEntities,
631     execute_ctx: &Arc<DadkUserExecuteContext>,
632 ) -> Result<EnvMap, ExecutorError> {
633     let mut env_list = EnvMap::new();
634     let envs: Vars = std::env::vars();
635     env_list.add_vars(envs);
636 
637     // 为每个任务创建特定的环境变量
638     for entity in sched_entities.entities().iter() {
639         // 导出任务的构建目录环境变量
640         let build_dir = CacheDir::build_dir(entity.clone())?;
641 
642         let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
643         env_list.add(EnvVar::new(
644             build_dir_key,
645             build_dir.to_str().unwrap().to_string(),
646         ));
647 
648         // 如果需要源码缓存目录,则导出
649         if CacheDir::need_source_cache(entity) {
650             let source_dir = CacheDir::source_dir(entity.clone())?;
651             let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
652             env_list.add(EnvVar::new(
653                 source_dir_key,
654                 source_dir.to_str().unwrap().to_string(),
655             ));
656         }
657     }
658 
659     // 创建ARCH环境变量
660     let target_arch = execute_ctx.target_arch();
661     env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into()));
662 
663     return Ok(env_list);
664 }
665 
666 /// # 获取文件最后的更新时间
667 ///
668 /// ## 参数
669 /// * `path` - 文件路径
670 /// * `last_modified` - 最后的更新时间
671 /// * `build_time` - 构建时间
672 fn last_modified_time(
673     path: &PathBuf,
674     build_time: &DateTime<Utc>,
675 ) -> Result<DateTime<Utc>, ExecutorError> {
676     let mut queue = VecDeque::new();
677     queue.push_back(path.clone());
678 
679     let mut last_modified = DateTime::<Utc>::from(SystemTime::UNIX_EPOCH);
680 
681     while let Some(current_path) = queue.pop_front() {
682         let metadata = current_path
683             .metadata()
684             .map_err(|e| ExecutorError::InstallError(e.to_string()))?;
685 
686         if metadata.is_dir() {
687             for r in std::fs::read_dir(&current_path).unwrap() {
688                 if let Ok(entry) = r {
689                     // 忽略编译产物目录
690                     if entry.file_name() == "target" {
691                         continue;
692                     }
693 
694                     let entry_path = entry.path();
695                     let entry_metadata = entry.metadata().unwrap();
696                     // 比较文件的修改时间和last_modified,取最大值
697                     let file_modified = DateTime::<Utc>::from(entry_metadata.modified().unwrap());
698                     last_modified = std::cmp::max(last_modified, file_modified);
699 
700                     // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续搜索
701                     if last_modified > *build_time {
702                         return Ok(last_modified);
703                     }
704 
705                     if entry_metadata.is_dir() {
706                         // 如果是子目录,则将其加入队列
707                         queue.push_back(entry_path);
708                     }
709                 }
710             }
711         } else {
712             // 如果是文件,直接比较修改时间
713             let file_modified = DateTime::<Utc>::from(metadata.modified().unwrap());
714             last_modified = std::cmp::max(last_modified, file_modified);
715 
716             // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续递归
717             if last_modified > *build_time {
718                 return Ok(last_modified);
719             }
720         }
721     }
722 
723     if last_modified == DateTime::<Utc>::from(SystemTime::UNIX_EPOCH) {
724         return Err(ExecutorError::InstallError(format!(
725             "Failed to get last modified time for path: {}",
726             path.display()
727         )));
728     }
729     Ok(last_modified)
730 }
731