xref: /DADK/dadk-user/src/executor/mod.rs (revision d2ade6ef037f43d0342021730a0d8b3af7bc36c1)
1 use std::{
2     collections::BTreeMap,
3     env::Vars,
4     path::PathBuf,
5     process::{Command, Stdio},
6     sync::{Arc, RwLock},
7 };
8 
9 use log::{debug, error, info, warn};
10 
11 use crate::{
12     context::{Action, DadkUserExecuteContext},
13     executor::cache::CacheDir,
14     parser::{
15         task::{CodeSource, PrebuiltSource, TaskType},
16         task_log::{BuildStatus, InstallStatus, TaskLog},
17     },
18     scheduler::{SchedEntities, SchedEntity},
19     utils::{file::FileUtils, path::abs_path},
20 };
21 
22 use dadk_config::{common::task::TaskEnv, user::UserCleanLevel};
23 
24 use self::cache::{CacheDirType, TaskDataDir};
25 
26 pub mod cache;
27 pub mod source;
28 #[cfg(test)]
29 mod tests;
30 
31 lazy_static! {
32     // 全局环境变量的列表
33     pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
34 }
35 
36 #[derive(Debug, Clone)]
37 pub struct Executor {
38     entity: Arc<SchedEntity>,
39     action: Action,
40     local_envs: EnvMap,
41     /// 任务构建结果输出到的目录
42     build_dir: CacheDir,
43     /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
44     source_dir: Option<CacheDir>,
45     /// 任务数据目录
46     task_data_dir: TaskDataDir,
47     /// DragonOS sysroot的路径
48     dragonos_sysroot: PathBuf,
49 }
50 
51 impl Executor {
52     /// # 创建执行器
53     ///
54     /// 用于执行一个任务
55     ///
56     /// ## 参数
57     ///
58     /// * `entity` - 任务调度实体
59     ///
60     /// ## 返回值
61     ///
62     /// * `Ok(Executor)` - 创建成功
63     /// * `Err(ExecutorError)` - 创建失败
64     pub fn new(
65         entity: Arc<SchedEntity>,
66         action: Action,
67         dragonos_sysroot: PathBuf,
68     ) -> Result<Self, ExecutorError> {
69         let local_envs = EnvMap::new();
70         let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
71         let task_data_dir = TaskDataDir::new(entity.clone())?;
72 
73         let source_dir = if CacheDir::need_source_cache(&entity) {
74             Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
75         } else {
76             None
77         };
78 
79         let result: Executor = Self {
80             action,
81             entity,
82             local_envs,
83             build_dir,
84             source_dir,
85             task_data_dir,
86             dragonos_sysroot,
87         };
88 
89         return Ok(result);
90     }
91 
92     /// # 执行任务
93     ///
94     /// 创建执行器后,调用此方法执行任务。
95     /// 该方法会执行以下步骤:
96     ///
97     /// 1. 创建工作线程
98     /// 2. 准备环境变量
99     /// 3. 拉取数据(可选)
100     /// 4. 执行构建
101     pub fn execute(&mut self) -> Result<(), ExecutorError> {
102         info!("Execute task: {}", self.entity.task().name_version());
103 
104         let r = self.do_execute();
105         self.save_task_data(r.clone());
106         info!("Task {} finished", self.entity.task().name_version());
107         return r;
108     }
109 
110     /// # 保存任务数据
111     fn save_task_data(&self, r: Result<(), ExecutorError>) {
112         let mut task_log = self.task_data_dir.task_log();
113         match self.action {
114             Action::Build => {
115                 if r.is_ok() {
116                     task_log.set_build_status(BuildStatus::Success);
117                 } else {
118                     task_log.set_build_status(BuildStatus::Failed);
119                 }
120 
121                 task_log.set_build_time_now();
122             }
123 
124             Action::Install => {
125                 if r.is_ok() {
126                     task_log.set_install_status(InstallStatus::Success);
127                 } else {
128                     task_log.set_install_status(InstallStatus::Failed);
129                 }
130             }
131 
132             Action::Clean(_) => {
133                 task_log.clean_build_status();
134                 task_log.clean_install_status();
135             }
136         }
137 
138         self.task_data_dir
139             .save_task_log(&task_log)
140             .expect("Failed to save task log");
141     }
142 
143     fn do_execute(&mut self) -> Result<(), ExecutorError> {
144         // 准备本地环境变量
145         self.prepare_local_env()?;
146 
147         match self.action {
148             Action::Build => {
149                 // 构建任务
150                 self.build()?;
151             }
152             Action::Install => {
153                 // 把构建结果安装到DragonOS
154                 self.install()?;
155             }
156             Action::Clean(_) => {
157                 // 清理构建结果
158                 let r = self.clean();
159                 if let Err(e) = r {
160                     error!(
161                         "Failed to clean task {}: {:?}",
162                         self.entity.task().name_version(),
163                         e
164                     );
165                 }
166             }
167         }
168 
169         return Ok(());
170     }
171 
172     /// # 执行build操作
173     fn build(&mut self) -> Result<(), ExecutorError> {
174         if let Some(status) = self.task_log().build_status() {
175             if *status == BuildStatus::Success && self.entity.task().build_once {
176                 info!(
177                     "Task {} has been built successfully, skip build.",
178                     self.entity.task().name_version()
179                 );
180                 return Ok(());
181             }
182         }
183 
184         // 确认源文件就绪
185         self.prepare_input()?;
186 
187         let command: Option<Command> = self.create_command()?;
188         if let Some(cmd) = command {
189             self.run_command(cmd)?;
190         }
191 
192         // 检查构建结果,如果为空,则抛出警告
193         if self.build_dir.is_empty()? {
194             warn!(
195                 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?",
196                 self.entity.task().name_version(),
197             );
198         }
199         return Ok(());
200     }
201 
202     /// # 执行安装操作,把构建结果安装到DragonOS
203     fn install(&self) -> Result<(), ExecutorError> {
204         if let Some(status) = self.task_log().install_status() {
205             if *status == InstallStatus::Success && self.entity.task().install_once {
206                 info!(
207                     "Task {} has been installed successfully, skip install.",
208                     self.entity.task().name_version()
209                 );
210                 return Ok(());
211             }
212         }
213 
214         let binding = self.entity.task();
215         let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
216         // 如果没有指定安装路径,则不执行安装
217         if in_dragonos_path.is_none() {
218             return Ok(());
219         }
220         info!("Installing task: {}", self.entity.task().name_version());
221         let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
222 
223         debug!("in_dragonos_path: {}", in_dragonos_path);
224         // 去除开头的斜杠
225         {
226             let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
227             in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
228         }
229         // 拼接最终的安装路径
230         let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path));
231         debug!("install_path: {:?}", install_path);
232         // 创建安装路径
233         std::fs::create_dir_all(&install_path).map_err(|e| {
234             ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
235         })?;
236 
237         // 拷贝构建结果到安装路径
238         let build_dir: PathBuf = self.build_dir.path.clone();
239         FileUtils::copy_dir_all(&build_dir, &install_path)
240             .map_err(|e| ExecutorError::InstallError(e))?;
241         info!("Task {} installed.", self.entity.task().name_version());
242 
243         return Ok(());
244     }
245 
246     fn clean(&self) -> Result<(), ExecutorError> {
247         let level = if let Action::Clean(l) = self.action {
248             l
249         } else {
250             panic!(
251                 "BUG: clean() called with non-clean action. executor details: {:?}",
252                 self
253             );
254         };
255         info!(
256             "Cleaning task: {}, level={level:?}",
257             self.entity.task().name_version()
258         );
259 
260         let r: Result<(), ExecutorError> = match level {
261             UserCleanLevel::All => self.clean_all(),
262             UserCleanLevel::InSrc => self.clean_src(),
263             UserCleanLevel::Output => {
264                 self.clean_target()?;
265                 self.clean_cache()
266             }
267         };
268 
269         if let Err(e) = r {
270             error!(
271                 "Failed to clean task: {}, error message: {:?}",
272                 self.entity.task().name_version(),
273                 e
274             );
275             return Err(e);
276         }
277 
278         return Ok(());
279     }
280 
281     fn clean_all(&self) -> Result<(), ExecutorError> {
282         // 在源文件目录执行清理
283         self.clean_src()?;
284         // 清理构建结果
285         self.clean_target()?;
286         // 清理缓存
287         self.clean_cache()?;
288         return Ok(());
289     }
290 
291     /// 在源文件目录执行清理
292     fn clean_src(&self) -> Result<(), ExecutorError> {
293         let cmd: Option<Command> = self.create_command()?;
294         if cmd.is_none() {
295             // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
296             return Ok(());
297         }
298         info!(
299             "{}: Cleaning in source directory: {:?}",
300             self.entity.task().name_version(),
301             self.src_work_dir()
302         );
303 
304         let cmd = cmd.unwrap();
305         self.run_command(cmd)?;
306         return Ok(());
307     }
308 
309     /// 清理构建输出目录
310     fn clean_target(&self) -> Result<(), ExecutorError> {
311         info!(
312             "{}: Cleaning build target directory: {:?}",
313             self.entity.task().name_version(),
314             self.build_dir.path
315         );
316 
317         return self.build_dir.remove_self_recursive();
318     }
319 
320     /// 清理下载缓存
321     fn clean_cache(&self) -> Result<(), ExecutorError> {
322         let cache_dir = self.source_dir.as_ref();
323         if cache_dir.is_none() {
324             // 如果没有缓存目录,则认为用户不需要清理缓存
325             return Ok(());
326         }
327         info!(
328             "{}: Cleaning cache directory: {}",
329             self.entity.task().name_version(),
330             self.src_work_dir().display()
331         );
332         return cache_dir.unwrap().remove_self_recursive();
333     }
334 
335     /// 获取源文件的工作目录
336     fn src_work_dir(&self) -> PathBuf {
337         if let Some(local_path) = self.entity.task().source_path() {
338             return local_path;
339         }
340         return self.source_dir.as_ref().unwrap().path.clone();
341     }
342 
343     fn task_log(&self) -> TaskLog {
344         return self.task_data_dir.task_log();
345     }
346 
347     /// 为任务创建命令
348     fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
349         // 获取命令
350         let raw_cmd = match self.entity.task().task_type {
351             TaskType::BuildFromSource(_) => match self.action {
352                 Action::Build => self.entity.task().build.build_command.clone(),
353                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
354                 _ => unimplemented!(
355                     "create_command: Action {:?} not supported yet.",
356                     self.action
357                 ),
358             },
359 
360             TaskType::InstallFromPrebuilt(_) => match self.action {
361                 Action::Build => self.entity.task().build.build_command.clone(),
362                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
363                 _ => unimplemented!(
364                     "create_command: Action {:?} not supported yet.",
365                     self.action
366                 ),
367             },
368         };
369 
370         if raw_cmd.is_none() {
371             return Ok(None);
372         }
373 
374         let raw_cmd = raw_cmd.unwrap();
375 
376         let mut command = Command::new("bash");
377         command.current_dir(self.src_work_dir());
378 
379         // 设置参数
380         command.arg("-c");
381         command.arg(raw_cmd);
382 
383         // 设置环境变量
384         let env_list = ENV_LIST.read().unwrap();
385         for (key, value) in env_list.envs.iter() {
386             // if key.starts_with("DADK") {
387             //     debug!("DADK env found: {}={}", key, value.value);
388             // }
389             command.env(key, value.value.clone());
390         }
391         drop(env_list);
392         for (key, value) in self.local_envs.envs.iter() {
393             debug!("Local env found: {}={}", key, value.value);
394             command.env(key, value.value.clone());
395         }
396 
397         return Ok(Some(command));
398     }
399 
400     /// # 准备工作线程本地环境变量
401     fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
402         let binding = self.entity.task();
403         let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
404 
405         if let Some(task_envs) = task_envs {
406             for tv in task_envs.iter() {
407                 self.local_envs
408                     .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
409             }
410         }
411 
412         // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里
413         self.local_envs.add(EnvVar::new(
414             "DADK_CURRENT_BUILD_DIR".to_string(),
415             self.build_dir.path.to_str().unwrap().to_string(),
416         ));
417 
418         return Ok(());
419     }
420 
421     fn prepare_input(&self) -> Result<(), ExecutorError> {
422         // 拉取源文件
423         let task = self.entity.task();
424         match &task.task_type {
425             TaskType::BuildFromSource(cs) => {
426                 if self.source_dir.is_none() {
427                     return Ok(());
428                 }
429                 let source_dir = self.source_dir.as_ref().unwrap();
430                 match cs {
431                     CodeSource::Git(git) => {
432                         git.prepare(source_dir)
433                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
434                     }
435                     // 本地源文件,不需要拉取
436                     CodeSource::Local(_) => return Ok(()),
437                     // 在线压缩包,需要下载
438                     CodeSource::Archive(archive) => {
439                         archive
440                             .download_unzip(source_dir)
441                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
442                     }
443                 }
444             }
445             TaskType::InstallFromPrebuilt(pb) => {
446                 match pb {
447                     // 本地源文件,不需要拉取
448                     PrebuiltSource::Local(local_source) => {
449                         let local_path = local_source.path();
450                         let target_path = &self.build_dir.path;
451                         FileUtils::copy_dir_all(&local_path, &target_path)
452                             .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string();
453                         return Ok(());
454                     }
455                     // 在线压缩包,需要下载
456                     PrebuiltSource::Archive(archive) => {
457                         archive
458                             .download_unzip(&self.build_dir)
459                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
460                     }
461                 }
462             }
463         }
464 
465         return Ok(());
466     }
467 
468     fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
469         let mut child = command
470             .stdin(Stdio::inherit())
471             .spawn()
472             .map_err(|e| ExecutorError::IoError(e.to_string()))?;
473 
474         // 等待子进程结束
475         let r = child
476             .wait()
477             .map_err(|e| ExecutorError::IoError(e.to_string()));
478         debug!("Command finished: {:?}", r);
479         if r.is_ok() {
480             let r = r.unwrap();
481             if r.success() {
482                 return Ok(());
483             } else {
484                 // 执行失败,获取最后100行stderr输出
485                 let errmsg = format!(
486                     "Task {} failed, exit code = {}",
487                     self.entity.task().name_version(),
488                     r.code().unwrap()
489                 );
490                 error!("{errmsg}");
491                 let command_opt = command.output();
492                 if command_opt.is_err() {
493                     return Err(ExecutorError::TaskFailed(
494                         "Failed to get command output".to_string(),
495                     ));
496                 }
497                 let command_opt = command_opt.unwrap();
498                 let command_output = String::from_utf8_lossy(&command_opt.stderr);
499                 let mut last_100_outputs = command_output
500                     .lines()
501                     .rev()
502                     .take(100)
503                     .collect::<Vec<&str>>();
504                 last_100_outputs.reverse();
505                 error!("Last 100 lines msg of stderr:");
506                 for line in last_100_outputs {
507                     error!("{}", line);
508                 }
509                 return Err(ExecutorError::TaskFailed(errmsg));
510             }
511         } else {
512             let errmsg = format!(
513                 "Task {} failed, msg = {:?}",
514                 self.entity.task().name_version(),
515                 r.err().unwrap()
516             );
517             error!("{errmsg}");
518             return Err(ExecutorError::TaskFailed(errmsg));
519         }
520     }
521 }
522 
523 #[derive(Debug, Clone)]
524 pub struct EnvMap {
525     pub envs: BTreeMap<String, EnvVar>,
526 }
527 
528 impl EnvMap {
529     pub fn new() -> Self {
530         Self {
531             envs: BTreeMap::new(),
532         }
533     }
534 
535     pub fn add(&mut self, env: EnvVar) {
536         self.envs.insert(env.key.clone(), env);
537     }
538 
539     #[allow(dead_code)]
540     pub fn get(&self, key: &str) -> Option<&EnvVar> {
541         self.envs.get(key)
542     }
543 
544     pub fn add_vars(&mut self, vars: Vars) {
545         for (key, value) in vars {
546             self.add(EnvVar::new(key, value));
547         }
548     }
549 }
550 
551 /// # 环境变量
552 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
553 pub struct EnvVar {
554     pub key: String,
555     pub value: String,
556 }
557 
558 impl EnvVar {
559     pub fn new(key: String, value: String) -> Self {
560         Self { key, value }
561     }
562 }
563 
564 /// # 任务执行器错误枚举
565 #[allow(dead_code)]
566 #[derive(Debug, Clone)]
567 pub enum ExecutorError {
568     /// 准备执行环境错误
569     PrepareEnvError(String),
570     IoError(String),
571     /// 构建执行错误
572     TaskFailed(String),
573     /// 安装错误
574     InstallError(String),
575     /// 清理错误
576     CleanError(String),
577 }
578 
579 /// # 准备全局环境变量
580 pub fn prepare_env(
581     sched_entities: &SchedEntities,
582     execute_ctx: &Arc<DadkUserExecuteContext>,
583 ) -> Result<(), ExecutorError> {
584     info!("Preparing environment variables...");
585     let env_list = create_global_env_list(sched_entities, execute_ctx)?;
586     // 写入全局环境变量列表
587     let mut global_env_list = ENV_LIST.write().unwrap();
588     *global_env_list = env_list;
589     return Ok(());
590 }
591 
592 /// # 创建全局环境变量列表
593 fn create_global_env_list(
594     sched_entities: &SchedEntities,
595     execute_ctx: &Arc<DadkUserExecuteContext>,
596 ) -> Result<EnvMap, ExecutorError> {
597     let mut env_list = EnvMap::new();
598     let envs: Vars = std::env::vars();
599     env_list.add_vars(envs);
600 
601     // 为每个任务创建特定的环境变量
602     for entity in sched_entities.entities().iter() {
603         // 导出任务的构建目录环境变量
604         let build_dir = CacheDir::build_dir(entity.clone())?;
605 
606         let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
607         env_list.add(EnvVar::new(
608             build_dir_key,
609             build_dir.to_str().unwrap().to_string(),
610         ));
611 
612         // 如果需要源码缓存目录,则导出
613         if CacheDir::need_source_cache(entity) {
614             let source_dir = CacheDir::source_dir(entity.clone())?;
615             let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
616             env_list.add(EnvVar::new(
617                 source_dir_key,
618                 source_dir.to_str().unwrap().to_string(),
619             ));
620         }
621     }
622 
623     // 创建ARCH环境变量
624     let target_arch = execute_ctx.target_arch();
625     env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into()));
626 
627     return Ok(env_list);
628 }
629