xref: /DADK/dadk-user/src/executor/mod.rs (revision f60cc4eb0593781f9412d78e3277baddfff311bb)
1 use std::{
2     collections::{BTreeMap, VecDeque},
3     env::Vars,
4     path::PathBuf,
5     process::{Command, Stdio},
6     sync::{Arc, RwLock},
7     time::SystemTime,
8 };
9 
10 use chrono::{DateTime, Utc};
11 use dadk_config::user::UserCleanLevel;
12 use log::{debug, error, info, warn};
13 
14 use crate::{
15     context::{Action, DadkUserExecuteContext},
16     executor::cache::CacheDir,
17     parser::{
18         task::{CodeSource, PrebuiltSource, TaskType},
19         task_log::{BuildStatus, InstallStatus, TaskLog},
20     },
21     scheduler::{SchedEntities, SchedEntity},
22     utils::{file::FileUtils, path::abs_path},
23 };
24 
25 use dadk_config::common::task::TaskEnv;
26 
27 use self::cache::{CacheDirType, TaskDataDir};
28 
29 pub mod cache;
30 pub mod source;
31 #[cfg(test)]
32 mod tests;
33 
34 lazy_static! {
35     // 全局环境变量的列表
36     pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
37 }
38 
39 #[derive(Debug, Clone)]
40 pub struct Executor {
41     entity: Arc<SchedEntity>,
42     action: Action,
43     local_envs: EnvMap,
44     /// 任务构建结果输出到的目录
45     build_dir: CacheDir,
46     /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
47     source_dir: Option<CacheDir>,
48     /// 任务数据目录
49     task_data_dir: TaskDataDir,
50     /// DragonOS sysroot的路径
51     dragonos_sysroot: PathBuf,
52 }
53 
54 impl Executor {
55     /// # 创建执行器
56     ///
57     /// 用于执行一个任务
58     ///
59     /// ## 参数
60     ///
61     /// * `entity` - 任务调度实体
62     ///
63     /// ## 返回值
64     ///
65     /// * `Ok(Executor)` - 创建成功
66     /// * `Err(ExecutorError)` - 创建失败
67     pub fn new(
68         entity: Arc<SchedEntity>,
69         action: Action,
70         dragonos_sysroot: PathBuf,
71     ) -> Result<Self, ExecutorError> {
72         let local_envs = EnvMap::new();
73         let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
74         let task_data_dir = TaskDataDir::new(entity.clone())?;
75 
76         let source_dir = if CacheDir::need_source_cache(&entity) {
77             Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
78         } else {
79             None
80         };
81 
82         let result: Executor = Self {
83             action,
84             entity,
85             local_envs,
86             build_dir,
87             source_dir,
88             task_data_dir,
89             dragonos_sysroot,
90         };
91 
92         return Ok(result);
93     }
94 
95     /// # 执行任务
96     ///
97     /// 创建执行器后,调用此方法执行任务。
98     /// 该方法会执行以下步骤:
99     ///
100     /// 1. 创建工作线程
101     /// 2. 准备环境变量
102     /// 3. 拉取数据(可选)
103     /// 4. 执行构建
104     pub fn execute(&mut self) -> Result<(), ExecutorError> {
105         info!("Execute task: {}", self.entity.task().name_version());
106 
107         let r = self.do_execute();
108         self.save_task_data(r.clone());
109         info!("Task {} finished", self.entity.task().name_version());
110         return r;
111     }
112 
113     /// # 保存任务数据
114     fn save_task_data(&self, r: Result<(), ExecutorError>) {
115         let mut task_log = self.task_data_dir.task_log();
116         match self.action {
117             Action::Build => {
118                 if r.is_ok() {
119                     task_log.set_build_status(BuildStatus::Success);
120                 } else {
121                     task_log.set_build_status(BuildStatus::Failed);
122                 }
123 
124                 task_log.set_build_time_now();
125             }
126 
127             Action::Install => {
128                 if r.is_ok() {
129                     task_log.set_install_status(InstallStatus::Success);
130                 } else {
131                     task_log.set_install_status(InstallStatus::Failed);
132                 }
133                 task_log.set_install_time_now();
134             }
135 
136             Action::Clean(_) => {
137                 task_log.clean_build_status();
138                 task_log.clean_install_status();
139             }
140         }
141 
142         self.task_data_dir
143             .save_task_log(&task_log)
144             .expect("Failed to save task log");
145     }
146 
147     fn do_execute(&mut self) -> Result<(), ExecutorError> {
148         // 准备本地环境变量
149         self.prepare_local_env()?;
150 
151         match self.action {
152             Action::Build => {
153                 // 构建任务
154                 self.build()?;
155             }
156             Action::Install => {
157                 // 把构建结果安装到DragonOS
158                 self.install()?;
159             }
160             Action::Clean(_) => {
161                 // 清理构建结果
162                 let r = self.clean();
163                 if let Err(e) = r {
164                     error!(
165                         "Failed to clean task {}: {:?}",
166                         self.entity.task().name_version(),
167                         e
168                     );
169                 }
170             }
171         }
172 
173         return Ok(());
174     }
175 
176     fn build(&mut self) -> Result<(), ExecutorError> {
177         if let Some(status) = self.task_log().build_status() {
178             if let Some(build_time) = self.task_log().build_time() {
179                 let mut last_modified = last_modified_time(&self.entity.file_path(), build_time)?;
180                 last_modified = core::cmp::max(
181                     last_modified,
182                     last_modified_time(&self.src_work_dir(), build_time)?,
183                 );
184 
185                 if *status == BuildStatus::Success
186                     && (self.entity.task().build_once || last_modified < *build_time)
187                 {
188                     info!(
189                         "Task {} has been built successfully, skip build.",
190                         self.entity.task().name_version()
191                     );
192                     return Ok(());
193                 }
194             }
195         }
196 
197         return self.do_build();
198     }
199 
200     /// # 执行build操作
201     fn do_build(&mut self) -> Result<(), ExecutorError> {
202         // 确认源文件就绪
203         self.prepare_input()?;
204 
205         let command: Option<Command> = self.create_command()?;
206         if let Some(cmd) = command {
207             self.run_command(cmd)?;
208         }
209 
210         // 检查构建结果,如果为空,则抛出警告
211         if self.build_dir.is_empty()? {
212             warn!(
213                 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?",
214                 self.entity.task().name_version(),
215             );
216         }
217         return Ok(());
218     }
219 
220     fn install(&self) -> Result<(), ExecutorError> {
221         log::trace!("dadk-user: install {}", self.entity.task().name_version());
222         if let Some(status) = self.task_log().install_status() {
223             if let Some(install_time) = self.task_log().install_time() {
224                 let last_modified = last_modified_time(&self.build_dir.path, install_time)?;
225                 let last_modified = core::cmp::max(
226                     last_modified,
227                     last_modified_time(&self.entity.file_path(), install_time)?,
228                 );
229 
230                 if *status == InstallStatus::Success
231                     && (self.entity.task().install_once || last_modified < *install_time)
232                 {
233                     info!(
234                         "install: Task {} not changed.",
235                         self.entity.task().name_version()
236                     );
237                     return Ok(());
238                 }
239             }
240         }
241         log::trace!("dadk-user: to do install {}", self.entity.task().name_version());
242         return self.do_install();
243     }
244 
245     /// # 执行安装操作,把构建结果安装到DragonOS
246     fn do_install(&self) -> Result<(), ExecutorError> {
247         let binding = self.entity.task();
248         let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
249         // 如果没有指定安装路径,则不执行安装
250         if in_dragonos_path.is_none() {
251             return Ok(());
252         }
253         info!("Installing task: {}", self.entity.task().name_version());
254         let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
255 
256         debug!("in_dragonos_path: {}", in_dragonos_path);
257         // 去除开头的斜杠
258         {
259             let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
260             in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
261         }
262         // 拼接最终的安装路径
263         let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path));
264         debug!("install_path: {:?}", install_path);
265         // 创建安装路径
266         std::fs::create_dir_all(&install_path).map_err(|e| {
267             ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
268         })?;
269 
270         // 拷贝构建结果到安装路径
271         let build_dir: PathBuf = self.build_dir.path.clone();
272         FileUtils::copy_dir_all(&build_dir, &install_path)
273             .map_err(|e| ExecutorError::InstallError(e))?;
274         info!("Task {} installed.", self.entity.task().name_version());
275 
276         return Ok(());
277     }
278 
279     fn clean(&self) -> Result<(), ExecutorError> {
280         let level = if let Action::Clean(l) = self.action {
281             l
282         } else {
283             panic!(
284                 "BUG: clean() called with non-clean action. executor details: {:?}",
285                 self
286             );
287         };
288         info!(
289             "Cleaning task: {}, level={level:?}",
290             self.entity.task().name_version()
291         );
292 
293         let r: Result<(), ExecutorError> = match level {
294             UserCleanLevel::All => self.clean_all(),
295             UserCleanLevel::InSrc => self.clean_src(),
296             UserCleanLevel::Output => {
297                 self.clean_target()?;
298                 self.clean_cache()
299             }
300         };
301 
302         if let Err(e) = r {
303             error!(
304                 "Failed to clean task: {}, error message: {:?}",
305                 self.entity.task().name_version(),
306                 e
307             );
308             return Err(e);
309         }
310 
311         return Ok(());
312     }
313 
314     fn clean_all(&self) -> Result<(), ExecutorError> {
315         // 在源文件目录执行清理
316         self.clean_src()?;
317         // 清理构建结果
318         self.clean_target()?;
319         // 清理缓存
320         self.clean_cache()?;
321         return Ok(());
322     }
323 
324     /// 在源文件目录执行清理
325     fn clean_src(&self) -> Result<(), ExecutorError> {
326         let cmd: Option<Command> = self.create_command()?;
327         if cmd.is_none() {
328             // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
329             return Ok(());
330         }
331         info!(
332             "{}: Cleaning in source directory: {:?}",
333             self.entity.task().name_version(),
334             self.src_work_dir()
335         );
336 
337         let cmd = cmd.unwrap();
338         self.run_command(cmd)?;
339         return Ok(());
340     }
341 
342     /// 清理构建输出目录
343     fn clean_target(&self) -> Result<(), ExecutorError> {
344         info!(
345             "{}: Cleaning build target directory: {:?}",
346             self.entity.task().name_version(),
347             self.build_dir.path
348         );
349 
350         return self.build_dir.remove_self_recursive();
351     }
352 
353     /// 清理下载缓存
354     fn clean_cache(&self) -> Result<(), ExecutorError> {
355         let cache_dir = self.source_dir.as_ref();
356         if cache_dir.is_none() {
357             // 如果没有缓存目录,则认为用户不需要清理缓存
358             return Ok(());
359         }
360         info!(
361             "{}: Cleaning cache directory: {}",
362             self.entity.task().name_version(),
363             self.src_work_dir().display()
364         );
365         return cache_dir.unwrap().remove_self_recursive();
366     }
367 
368     /// 获取源文件的工作目录
369     fn src_work_dir(&self) -> PathBuf {
370         if let Some(local_path) = self.entity.task().source_path() {
371             return local_path;
372         }
373         return self.source_dir.as_ref().unwrap().path.clone();
374     }
375 
376     fn task_log(&self) -> TaskLog {
377         return self.task_data_dir.task_log();
378     }
379 
380     /// 为任务创建命令
381     fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
382         // 获取命令
383         let raw_cmd = match self.entity.task().task_type {
384             TaskType::BuildFromSource(_) => match self.action {
385                 Action::Build => self.entity.task().build.build_command.clone(),
386                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
387                 _ => unimplemented!(
388                     "create_command: Action {:?} not supported yet.",
389                     self.action
390                 ),
391             },
392 
393             TaskType::InstallFromPrebuilt(_) => match self.action {
394                 Action::Build => self.entity.task().build.build_command.clone(),
395                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
396                 _ => unimplemented!(
397                     "create_command: Action {:?} not supported yet.",
398                     self.action
399                 ),
400             },
401         };
402 
403         if raw_cmd.is_none() {
404             return Ok(None);
405         }
406 
407         let raw_cmd = raw_cmd.unwrap();
408 
409         let mut command = Command::new("bash");
410         command.current_dir(self.src_work_dir());
411 
412         // 设置参数
413         command.arg("-c");
414         command.arg(raw_cmd);
415 
416         // 设置环境变量
417         let env_list = ENV_LIST.read().unwrap();
418         for (key, value) in env_list.envs.iter() {
419             // if key.starts_with("DADK") {
420             //     debug!("DADK env found: {}={}", key, value.value);
421             // }
422             command.env(key, value.value.clone());
423         }
424         drop(env_list);
425         for (key, value) in self.local_envs.envs.iter() {
426             debug!("Local env found: {}={}", key, value.value);
427             command.env(key, value.value.clone());
428         }
429 
430         return Ok(Some(command));
431     }
432 
433     /// # 准备工作线程本地环境变量
434     fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
435         let binding = self.entity.task();
436         let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
437 
438         if let Some(task_envs) = task_envs {
439             for tv in task_envs.iter() {
440                 self.local_envs
441                     .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
442             }
443         }
444 
445         // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里
446         self.local_envs.add(EnvVar::new(
447             "DADK_CURRENT_BUILD_DIR".to_string(),
448             self.build_dir.path.to_str().unwrap().to_string(),
449         ));
450 
451         return Ok(());
452     }
453 
454     fn prepare_input(&self) -> Result<(), ExecutorError> {
455         // 拉取源文件
456         let task = self.entity.task();
457         match &task.task_type {
458             TaskType::BuildFromSource(cs) => {
459                 if self.source_dir.is_none() {
460                     return Ok(());
461                 }
462                 let source_dir = self.source_dir.as_ref().unwrap();
463                 match cs {
464                     CodeSource::Git(git) => {
465                         git.prepare(source_dir)
466                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
467                     }
468                     // 本地源文件,不需要拉取
469                     CodeSource::Local(_) => return Ok(()),
470                     // 在线压缩包,需要下载
471                     CodeSource::Archive(archive) => {
472                         archive
473                             .download_unzip(source_dir)
474                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
475                     }
476                 }
477             }
478             TaskType::InstallFromPrebuilt(pb) => {
479                 match pb {
480                     // 本地源文件,不需要拉取
481                     PrebuiltSource::Local(local_source) => {
482                         let local_path = local_source.path();
483                         let target_path = &self.build_dir.path;
484                         FileUtils::copy_dir_all(&local_path, &target_path)
485                             .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string();
486                         return Ok(());
487                     }
488                     // 在线压缩包,需要下载
489                     PrebuiltSource::Archive(archive) => {
490                         archive
491                             .download_unzip(&self.build_dir)
492                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
493                     }
494                 }
495             }
496         }
497 
498         return Ok(());
499     }
500 
501     fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
502         let mut child = command
503             .stdin(Stdio::inherit())
504             .spawn()
505             .map_err(|e| ExecutorError::IoError(e.to_string()))?;
506 
507         // 等待子进程结束
508         let r = child
509             .wait()
510             .map_err(|e| ExecutorError::IoError(e.to_string()));
511         debug!("Command finished: {:?}", r);
512         if r.is_ok() {
513             let r = r.unwrap();
514             if r.success() {
515                 return Ok(());
516             } else {
517                 // 执行失败,获取最后100行stderr输出
518                 let errmsg = format!(
519                     "Task {} failed, exit code = {}",
520                     self.entity.task().name_version(),
521                     r.code().unwrap()
522                 );
523                 error!("{errmsg}");
524                 let command_opt = command.output();
525                 if command_opt.is_err() {
526                     return Err(ExecutorError::TaskFailed(
527                         "Failed to get command output".to_string(),
528                     ));
529                 }
530                 let command_opt = command_opt.unwrap();
531                 let command_output = String::from_utf8_lossy(&command_opt.stderr);
532                 let mut last_100_outputs = command_output
533                     .lines()
534                     .rev()
535                     .take(100)
536                     .collect::<Vec<&str>>();
537                 last_100_outputs.reverse();
538                 error!("Last 100 lines msg of stderr:");
539                 for line in last_100_outputs {
540                     error!("{}", line);
541                 }
542                 return Err(ExecutorError::TaskFailed(errmsg));
543             }
544         } else {
545             let errmsg = format!(
546                 "Task {} failed, msg = {:?}",
547                 self.entity.task().name_version(),
548                 r.err().unwrap()
549             );
550             error!("{errmsg}");
551             return Err(ExecutorError::TaskFailed(errmsg));
552         }
553     }
554 }
555 
556 #[derive(Debug, Clone)]
557 pub struct EnvMap {
558     pub envs: BTreeMap<String, EnvVar>,
559 }
560 
561 impl EnvMap {
562     pub fn new() -> Self {
563         Self {
564             envs: BTreeMap::new(),
565         }
566     }
567 
568     pub fn add(&mut self, env: EnvVar) {
569         self.envs.insert(env.key.clone(), env);
570     }
571 
572     #[allow(dead_code)]
573     pub fn get(&self, key: &str) -> Option<&EnvVar> {
574         self.envs.get(key)
575     }
576 
577     pub fn add_vars(&mut self, vars: Vars) {
578         for (key, value) in vars {
579             self.add(EnvVar::new(key, value));
580         }
581     }
582 }
583 
584 /// # 环境变量
585 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
586 pub struct EnvVar {
587     pub key: String,
588     pub value: String,
589 }
590 
591 impl EnvVar {
592     pub fn new(key: String, value: String) -> Self {
593         Self { key, value }
594     }
595 }
596 
597 /// # 任务执行器错误枚举
598 #[allow(dead_code)]
599 #[derive(Debug, Clone)]
600 pub enum ExecutorError {
601     /// 准备执行环境错误
602     PrepareEnvError(String),
603     IoError(String),
604     /// 构建执行错误
605     TaskFailed(String),
606     /// 安装错误
607     InstallError(String),
608     /// 清理错误
609     CleanError(String),
610 }
611 
612 /// # 准备全局环境变量
613 pub fn prepare_env(
614     sched_entities: &SchedEntities,
615     execute_ctx: &Arc<DadkUserExecuteContext>,
616 ) -> Result<(), ExecutorError> {
617     info!("Preparing environment variables...");
618     let env_list = create_global_env_list(sched_entities, execute_ctx)?;
619     // 写入全局环境变量列表
620     let mut global_env_list = ENV_LIST.write().unwrap();
621     *global_env_list = env_list;
622     return Ok(());
623 }
624 
625 /// # 创建全局环境变量列表
626 fn create_global_env_list(
627     sched_entities: &SchedEntities,
628     execute_ctx: &Arc<DadkUserExecuteContext>,
629 ) -> Result<EnvMap, ExecutorError> {
630     let mut env_list = EnvMap::new();
631     let envs: Vars = std::env::vars();
632     env_list.add_vars(envs);
633 
634     // 为每个任务创建特定的环境变量
635     for entity in sched_entities.entities().iter() {
636         // 导出任务的构建目录环境变量
637         let build_dir = CacheDir::build_dir(entity.clone())?;
638 
639         let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
640         env_list.add(EnvVar::new(
641             build_dir_key,
642             build_dir.to_str().unwrap().to_string(),
643         ));
644 
645         // 如果需要源码缓存目录,则导出
646         if CacheDir::need_source_cache(entity) {
647             let source_dir = CacheDir::source_dir(entity.clone())?;
648             let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
649             env_list.add(EnvVar::new(
650                 source_dir_key,
651                 source_dir.to_str().unwrap().to_string(),
652             ));
653         }
654     }
655 
656     // 创建ARCH环境变量
657     let target_arch = execute_ctx.target_arch();
658     env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into()));
659 
660     return Ok(env_list);
661 }
662 
663 /// # 获取文件最后的更新时间
664 ///
665 /// ## 参数
666 /// * `path` - 文件路径
667 /// * `last_modified` - 最后的更新时间
668 /// * `build_time` - 构建时间
669 fn last_modified_time(
670     path: &PathBuf,
671     build_time: &DateTime<Utc>,
672 ) -> Result<DateTime<Utc>, ExecutorError> {
673     let mut queue = VecDeque::new();
674     queue.push_back(path.clone());
675 
676     let mut last_modified = DateTime::<Utc>::from(SystemTime::UNIX_EPOCH);
677 
678     while let Some(current_path) = queue.pop_front() {
679         let metadata = current_path
680             .metadata()
681             .map_err(|e| ExecutorError::InstallError(e.to_string()))?;
682 
683         if metadata.is_dir() {
684             for r in std::fs::read_dir(&current_path).unwrap() {
685                 if let Ok(entry) = r {
686                     // 忽略编译产物目录
687                     if entry.file_name() == "target" {
688                         continue;
689                     }
690 
691                     let entry_path = entry.path();
692                     let entry_metadata = entry.metadata().unwrap();
693                     // 比较文件的修改时间和last_modified,取最大值
694                     let file_modified = DateTime::<Utc>::from(entry_metadata.modified().unwrap());
695                     last_modified = std::cmp::max(last_modified, file_modified);
696 
697                     // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续搜索
698                     if last_modified > *build_time {
699                         return Ok(last_modified);
700                     }
701 
702                     if entry_metadata.is_dir() {
703                         // 如果是子目录,则将其加入队列
704                         queue.push_back(entry_path);
705                     }
706                 }
707             }
708         } else {
709             // 如果是文件,直接比较修改时间
710             let file_modified = DateTime::<Utc>::from(metadata.modified().unwrap());
711             last_modified = std::cmp::max(last_modified, file_modified);
712 
713             // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续递归
714             if last_modified > *build_time {
715                 return Ok(last_modified);
716             }
717         }
718     }
719 
720     if last_modified == DateTime::<Utc>::from(SystemTime::UNIX_EPOCH) {
721         return Err(ExecutorError::InstallError(format!(
722             "Failed to get last modified time for path: {}",
723             path.display()
724         )));
725     }
726     Ok(last_modified)
727 }
728