1 use std::{ 2 collections::BTreeMap, 3 env::Vars, 4 path::PathBuf, 5 process::{Command, Stdio}, 6 sync::{Arc, RwLock}, 7 time::SystemTime, 8 }; 9 10 use chrono::{DateTime, Utc}; 11 use dadk_config::user::UserCleanLevel; 12 use log::{debug, error, info, warn}; 13 14 use crate::{ 15 context::{Action, DadkUserExecuteContext}, 16 executor::cache::CacheDir, 17 parser::{ 18 task::{CodeSource, PrebuiltSource, TaskType}, 19 task_log::{BuildStatus, InstallStatus, TaskLog}, 20 }, 21 scheduler::{SchedEntities, SchedEntity}, 22 utils::{file::FileUtils, path::abs_path}, 23 }; 24 25 use dadk_config::common::task::TaskEnv; 26 27 use self::cache::{CacheDirType, TaskDataDir}; 28 29 pub mod cache; 30 pub mod source; 31 #[cfg(test)] 32 mod tests; 33 34 lazy_static! { 35 // 全局环境变量的列表 36 pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new()); 37 } 38 39 #[derive(Debug, Clone)] 40 pub struct Executor { 41 entity: Arc<SchedEntity>, 42 action: Action, 43 local_envs: EnvMap, 44 /// 任务构建结果输出到的目录 45 build_dir: CacheDir, 46 /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径) 47 source_dir: Option<CacheDir>, 48 /// 任务数据目录 49 task_data_dir: TaskDataDir, 50 /// DragonOS sysroot的路径 51 dragonos_sysroot: PathBuf, 52 } 53 54 impl Executor { 55 /// # 创建执行器 56 /// 57 /// 用于执行一个任务 58 /// 59 /// ## 参数 60 /// 61 /// * `entity` - 任务调度实体 62 /// 63 /// ## 返回值 64 /// 65 /// * `Ok(Executor)` - 创建成功 66 /// * `Err(ExecutorError)` - 创建失败 67 pub fn new( 68 entity: Arc<SchedEntity>, 69 action: Action, 70 dragonos_sysroot: PathBuf, 71 ) -> Result<Self, ExecutorError> { 72 let local_envs = EnvMap::new(); 73 let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?; 74 let task_data_dir = TaskDataDir::new(entity.clone())?; 75 76 let source_dir = if CacheDir::need_source_cache(&entity) { 77 Some(CacheDir::new(entity.clone(), CacheDirType::Source)?) 78 } else { 79 None 80 }; 81 82 let result: Executor = Self { 83 action, 84 entity, 85 local_envs, 86 build_dir, 87 source_dir, 88 task_data_dir, 89 dragonos_sysroot, 90 }; 91 92 return Ok(result); 93 } 94 95 /// # 执行任务 96 /// 97 /// 创建执行器后,调用此方法执行任务。 98 /// 该方法会执行以下步骤: 99 /// 100 /// 1. 创建工作线程 101 /// 2. 准备环境变量 102 /// 3. 拉取数据(可选) 103 /// 4. 执行构建 104 pub fn execute(&mut self) -> Result<(), ExecutorError> { 105 info!("Execute task: {}", self.entity.task().name_version()); 106 107 let r = self.do_execute(); 108 self.save_task_data(r.clone()); 109 info!("Task {} finished", self.entity.task().name_version()); 110 return r; 111 } 112 113 /// # 保存任务数据 114 fn save_task_data(&self, r: Result<(), ExecutorError>) { 115 let mut task_log = self.task_data_dir.task_log(); 116 match self.action { 117 Action::Build => { 118 if r.is_ok() { 119 task_log.set_build_status(BuildStatus::Success); 120 } else { 121 task_log.set_build_status(BuildStatus::Failed); 122 } 123 124 task_log.set_build_time_now(); 125 } 126 127 Action::Install => { 128 if r.is_ok() { 129 task_log.set_install_status(InstallStatus::Success); 130 } else { 131 task_log.set_install_status(InstallStatus::Failed); 132 } 133 } 134 135 Action::Clean(_) => { 136 task_log.clean_build_status(); 137 task_log.clean_install_status(); 138 } 139 } 140 141 self.task_data_dir 142 .save_task_log(&task_log) 143 .expect("Failed to save task log"); 144 } 145 146 fn do_execute(&mut self) -> Result<(), ExecutorError> { 147 // 准备本地环境变量 148 self.prepare_local_env()?; 149 150 match self.action { 151 Action::Build => { 152 // 构建任务 153 self.build()?; 154 } 155 Action::Install => { 156 // 把构建结果安装到DragonOS 157 self.install()?; 158 } 159 Action::Clean(_) => { 160 // 清理构建结果 161 let r = self.clean(); 162 if let Err(e) = r { 163 error!( 164 "Failed to clean task {}: {:?}", 165 self.entity.task().name_version(), 166 e 167 ); 168 } 169 } 170 } 171 172 return Ok(()); 173 } 174 175 fn build(&mut self) -> Result<(), ExecutorError> { 176 if let Some(status) = self.task_log().build_status() { 177 if let Some(build_time) = self.task_log().build_time() { 178 let last_modified = last_modified_time(&self.entity.file_path(), build_time)?; 179 if *status == BuildStatus::Success 180 && (self.entity.task().build_once || last_modified < *build_time) 181 { 182 info!( 183 "Task {} has been built successfully, skip build.", 184 self.entity.task().name_version() 185 ); 186 return Ok(()); 187 } 188 } 189 } 190 191 return self.do_build(); 192 } 193 194 /// # 执行build操作 195 fn do_build(&mut self) -> Result<(), ExecutorError> { 196 // 确认源文件就绪 197 self.prepare_input()?; 198 199 let command: Option<Command> = self.create_command()?; 200 if let Some(cmd) = command { 201 self.run_command(cmd)?; 202 } 203 204 // 检查构建结果,如果为空,则抛出警告 205 if self.build_dir.is_empty()? { 206 warn!( 207 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?", 208 self.entity.task().name_version(), 209 ); 210 } 211 return Ok(()); 212 } 213 214 fn install(&self) -> Result<(), ExecutorError> { 215 if let Some(status) = self.task_log().install_status() { 216 if let Some(build_time) = self.task_log().build_time() { 217 let last_modified = last_modified_time(&self.entity.file_path(), build_time)?; 218 if *status == InstallStatus::Success 219 && (self.entity.task().install_once || last_modified < *build_time) 220 { 221 info!( 222 "Task {} has been installed successfully, skip install.", 223 self.entity.task().name_version() 224 ); 225 return Ok(()); 226 } 227 } 228 } 229 230 return self.do_install(); 231 } 232 233 /// # 执行安装操作,把构建结果安装到DragonOS 234 fn do_install(&self) -> Result<(), ExecutorError> { 235 let binding = self.entity.task(); 236 let in_dragonos_path = binding.install.in_dragonos_path.as_ref(); 237 // 如果没有指定安装路径,则不执行安装 238 if in_dragonos_path.is_none() { 239 return Ok(()); 240 } 241 info!("Installing task: {}", self.entity.task().name_version()); 242 let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string(); 243 244 debug!("in_dragonos_path: {}", in_dragonos_path); 245 // 去除开头的斜杠 246 { 247 let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count(); 248 in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string(); 249 } 250 // 拼接最终的安装路径 251 let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path)); 252 debug!("install_path: {:?}", install_path); 253 // 创建安装路径 254 std::fs::create_dir_all(&install_path).map_err(|e| { 255 ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string())) 256 })?; 257 258 // 拷贝构建结果到安装路径 259 let build_dir: PathBuf = self.build_dir.path.clone(); 260 FileUtils::copy_dir_all(&build_dir, &install_path) 261 .map_err(|e| ExecutorError::InstallError(e))?; 262 info!("Task {} installed.", self.entity.task().name_version()); 263 264 return Ok(()); 265 } 266 267 fn clean(&self) -> Result<(), ExecutorError> { 268 let level = if let Action::Clean(l) = self.action { 269 l 270 } else { 271 panic!( 272 "BUG: clean() called with non-clean action. executor details: {:?}", 273 self 274 ); 275 }; 276 info!( 277 "Cleaning task: {}, level={level:?}", 278 self.entity.task().name_version() 279 ); 280 281 let r: Result<(), ExecutorError> = match level { 282 UserCleanLevel::All => self.clean_all(), 283 UserCleanLevel::InSrc => self.clean_src(), 284 UserCleanLevel::Output => { 285 self.clean_target()?; 286 self.clean_cache() 287 } 288 }; 289 290 if let Err(e) = r { 291 error!( 292 "Failed to clean task: {}, error message: {:?}", 293 self.entity.task().name_version(), 294 e 295 ); 296 return Err(e); 297 } 298 299 return Ok(()); 300 } 301 302 fn clean_all(&self) -> Result<(), ExecutorError> { 303 // 在源文件目录执行清理 304 self.clean_src()?; 305 // 清理构建结果 306 self.clean_target()?; 307 // 清理缓存 308 self.clean_cache()?; 309 return Ok(()); 310 } 311 312 /// 在源文件目录执行清理 313 fn clean_src(&self) -> Result<(), ExecutorError> { 314 let cmd: Option<Command> = self.create_command()?; 315 if cmd.is_none() { 316 // 如果这里没有命令,则认为用户不需要在源文件目录执行清理 317 return Ok(()); 318 } 319 info!( 320 "{}: Cleaning in source directory: {:?}", 321 self.entity.task().name_version(), 322 self.src_work_dir() 323 ); 324 325 let cmd = cmd.unwrap(); 326 self.run_command(cmd)?; 327 return Ok(()); 328 } 329 330 /// 清理构建输出目录 331 fn clean_target(&self) -> Result<(), ExecutorError> { 332 info!( 333 "{}: Cleaning build target directory: {:?}", 334 self.entity.task().name_version(), 335 self.build_dir.path 336 ); 337 338 return self.build_dir.remove_self_recursive(); 339 } 340 341 /// 清理下载缓存 342 fn clean_cache(&self) -> Result<(), ExecutorError> { 343 let cache_dir = self.source_dir.as_ref(); 344 if cache_dir.is_none() { 345 // 如果没有缓存目录,则认为用户不需要清理缓存 346 return Ok(()); 347 } 348 info!( 349 "{}: Cleaning cache directory: {}", 350 self.entity.task().name_version(), 351 self.src_work_dir().display() 352 ); 353 return cache_dir.unwrap().remove_self_recursive(); 354 } 355 356 /// 获取源文件的工作目录 357 fn src_work_dir(&self) -> PathBuf { 358 if let Some(local_path) = self.entity.task().source_path() { 359 return local_path; 360 } 361 return self.source_dir.as_ref().unwrap().path.clone(); 362 } 363 364 fn task_log(&self) -> TaskLog { 365 return self.task_data_dir.task_log(); 366 } 367 368 /// 为任务创建命令 369 fn create_command(&self) -> Result<Option<Command>, ExecutorError> { 370 // 获取命令 371 let raw_cmd = match self.entity.task().task_type { 372 TaskType::BuildFromSource(_) => match self.action { 373 Action::Build => self.entity.task().build.build_command.clone(), 374 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 375 _ => unimplemented!( 376 "create_command: Action {:?} not supported yet.", 377 self.action 378 ), 379 }, 380 381 TaskType::InstallFromPrebuilt(_) => match self.action { 382 Action::Build => self.entity.task().build.build_command.clone(), 383 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 384 _ => unimplemented!( 385 "create_command: Action {:?} not supported yet.", 386 self.action 387 ), 388 }, 389 }; 390 391 if raw_cmd.is_none() { 392 return Ok(None); 393 } 394 395 let raw_cmd = raw_cmd.unwrap(); 396 397 let mut command = Command::new("bash"); 398 command.current_dir(self.src_work_dir()); 399 400 // 设置参数 401 command.arg("-c"); 402 command.arg(raw_cmd); 403 404 // 设置环境变量 405 let env_list = ENV_LIST.read().unwrap(); 406 for (key, value) in env_list.envs.iter() { 407 // if key.starts_with("DADK") { 408 // debug!("DADK env found: {}={}", key, value.value); 409 // } 410 command.env(key, value.value.clone()); 411 } 412 drop(env_list); 413 for (key, value) in self.local_envs.envs.iter() { 414 debug!("Local env found: {}={}", key, value.value); 415 command.env(key, value.value.clone()); 416 } 417 418 return Ok(Some(command)); 419 } 420 421 /// # 准备工作线程本地环境变量 422 fn prepare_local_env(&mut self) -> Result<(), ExecutorError> { 423 let binding = self.entity.task(); 424 let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref(); 425 426 if let Some(task_envs) = task_envs { 427 for tv in task_envs.iter() { 428 self.local_envs 429 .add(EnvVar::new(tv.key().to_string(), tv.value().to_string())); 430 } 431 } 432 433 // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里 434 self.local_envs.add(EnvVar::new( 435 "DADK_CURRENT_BUILD_DIR".to_string(), 436 self.build_dir.path.to_str().unwrap().to_string(), 437 )); 438 439 return Ok(()); 440 } 441 442 fn prepare_input(&self) -> Result<(), ExecutorError> { 443 // 拉取源文件 444 let task = self.entity.task(); 445 match &task.task_type { 446 TaskType::BuildFromSource(cs) => { 447 if self.source_dir.is_none() { 448 return Ok(()); 449 } 450 let source_dir = self.source_dir.as_ref().unwrap(); 451 match cs { 452 CodeSource::Git(git) => { 453 git.prepare(source_dir) 454 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 455 } 456 // 本地源文件,不需要拉取 457 CodeSource::Local(_) => return Ok(()), 458 // 在线压缩包,需要下载 459 CodeSource::Archive(archive) => { 460 archive 461 .download_unzip(source_dir) 462 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 463 } 464 } 465 } 466 TaskType::InstallFromPrebuilt(pb) => { 467 match pb { 468 // 本地源文件,不需要拉取 469 PrebuiltSource::Local(local_source) => { 470 let local_path = local_source.path(); 471 let target_path = &self.build_dir.path; 472 FileUtils::copy_dir_all(&local_path, &target_path) 473 .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string(); 474 return Ok(()); 475 } 476 // 在线压缩包,需要下载 477 PrebuiltSource::Archive(archive) => { 478 archive 479 .download_unzip(&self.build_dir) 480 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 481 } 482 } 483 } 484 } 485 486 return Ok(()); 487 } 488 489 fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> { 490 let mut child = command 491 .stdin(Stdio::inherit()) 492 .spawn() 493 .map_err(|e| ExecutorError::IoError(e.to_string()))?; 494 495 // 等待子进程结束 496 let r = child 497 .wait() 498 .map_err(|e| ExecutorError::IoError(e.to_string())); 499 debug!("Command finished: {:?}", r); 500 if r.is_ok() { 501 let r = r.unwrap(); 502 if r.success() { 503 return Ok(()); 504 } else { 505 // 执行失败,获取最后100行stderr输出 506 let errmsg = format!( 507 "Task {} failed, exit code = {}", 508 self.entity.task().name_version(), 509 r.code().unwrap() 510 ); 511 error!("{errmsg}"); 512 let command_opt = command.output(); 513 if command_opt.is_err() { 514 return Err(ExecutorError::TaskFailed( 515 "Failed to get command output".to_string(), 516 )); 517 } 518 let command_opt = command_opt.unwrap(); 519 let command_output = String::from_utf8_lossy(&command_opt.stderr); 520 let mut last_100_outputs = command_output 521 .lines() 522 .rev() 523 .take(100) 524 .collect::<Vec<&str>>(); 525 last_100_outputs.reverse(); 526 error!("Last 100 lines msg of stderr:"); 527 for line in last_100_outputs { 528 error!("{}", line); 529 } 530 return Err(ExecutorError::TaskFailed(errmsg)); 531 } 532 } else { 533 let errmsg = format!( 534 "Task {} failed, msg = {:?}", 535 self.entity.task().name_version(), 536 r.err().unwrap() 537 ); 538 error!("{errmsg}"); 539 return Err(ExecutorError::TaskFailed(errmsg)); 540 } 541 } 542 } 543 544 #[derive(Debug, Clone)] 545 pub struct EnvMap { 546 pub envs: BTreeMap<String, EnvVar>, 547 } 548 549 impl EnvMap { 550 pub fn new() -> Self { 551 Self { 552 envs: BTreeMap::new(), 553 } 554 } 555 556 pub fn add(&mut self, env: EnvVar) { 557 self.envs.insert(env.key.clone(), env); 558 } 559 560 #[allow(dead_code)] 561 pub fn get(&self, key: &str) -> Option<&EnvVar> { 562 self.envs.get(key) 563 } 564 565 pub fn add_vars(&mut self, vars: Vars) { 566 for (key, value) in vars { 567 self.add(EnvVar::new(key, value)); 568 } 569 } 570 } 571 572 /// # 环境变量 573 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)] 574 pub struct EnvVar { 575 pub key: String, 576 pub value: String, 577 } 578 579 impl EnvVar { 580 pub fn new(key: String, value: String) -> Self { 581 Self { key, value } 582 } 583 } 584 585 /// # 任务执行器错误枚举 586 #[allow(dead_code)] 587 #[derive(Debug, Clone)] 588 pub enum ExecutorError { 589 /// 准备执行环境错误 590 PrepareEnvError(String), 591 IoError(String), 592 /// 构建执行错误 593 TaskFailed(String), 594 /// 安装错误 595 InstallError(String), 596 /// 清理错误 597 CleanError(String), 598 } 599 600 /// # 准备全局环境变量 601 pub fn prepare_env( 602 sched_entities: &SchedEntities, 603 execute_ctx: &Arc<DadkUserExecuteContext>, 604 ) -> Result<(), ExecutorError> { 605 info!("Preparing environment variables..."); 606 let env_list = create_global_env_list(sched_entities, execute_ctx)?; 607 // 写入全局环境变量列表 608 let mut global_env_list = ENV_LIST.write().unwrap(); 609 *global_env_list = env_list; 610 return Ok(()); 611 } 612 613 /// # 创建全局环境变量列表 614 fn create_global_env_list( 615 sched_entities: &SchedEntities, 616 execute_ctx: &Arc<DadkUserExecuteContext>, 617 ) -> Result<EnvMap, ExecutorError> { 618 let mut env_list = EnvMap::new(); 619 let envs: Vars = std::env::vars(); 620 env_list.add_vars(envs); 621 622 // 为每个任务创建特定的环境变量 623 for entity in sched_entities.entities().iter() { 624 // 导出任务的构建目录环境变量 625 let build_dir = CacheDir::build_dir(entity.clone())?; 626 627 let build_dir_key = CacheDir::build_dir_env_key(&entity)?; 628 env_list.add(EnvVar::new( 629 build_dir_key, 630 build_dir.to_str().unwrap().to_string(), 631 )); 632 633 // 如果需要源码缓存目录,则导出 634 if CacheDir::need_source_cache(entity) { 635 let source_dir = CacheDir::source_dir(entity.clone())?; 636 let source_dir_key = CacheDir::source_dir_env_key(&entity)?; 637 env_list.add(EnvVar::new( 638 source_dir_key, 639 source_dir.to_str().unwrap().to_string(), 640 )); 641 } 642 } 643 644 // 创建ARCH环境变量 645 let target_arch = execute_ctx.target_arch(); 646 env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into())); 647 648 return Ok(env_list); 649 } 650 651 /// # 获取文件最后的更新时间 652 /// 653 /// ## 参数 654 /// * `path` - 文件路径 655 /// * `last_modified` - 最后的更新时间 656 /// * `build_time` - 构建时间 657 fn last_modified_time( 658 path: &PathBuf, 659 build_time: &DateTime<Utc>, 660 ) -> Result<DateTime<Utc>, ExecutorError> { 661 let metadata = path 662 .metadata() 663 .map_err(|e| ExecutorError::InstallError(e.to_string()))?; 664 665 let last_modified = if metadata.is_dir() { 666 let mut last_modified = DateTime::<Utc>::from(SystemTime::UNIX_EPOCH); 667 for r in std::fs::read_dir(path).unwrap() { 668 if let Ok(entry) = r { 669 // 忽略编译产物目录 670 if entry.file_name() == "target" { 671 continue; 672 } 673 674 let metadata = entry.metadata().unwrap(); 675 if metadata.is_dir() { 676 // 如果是子目录,则递归找改子目录下的文件最后的更新时间 677 last_modified = std::cmp::max( 678 last_modified, 679 last_modified_time(&entry.path(), build_time)?, 680 ); 681 } else { 682 // 比较文件的修改时间和last_modified,取最大值 683 last_modified = std::cmp::max( 684 last_modified, 685 DateTime::<Utc>::from(metadata.modified().unwrap()), 686 ); 687 } 688 689 // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续递归 690 if last_modified > *build_time { 691 return Ok(last_modified); 692 } 693 } 694 } 695 last_modified 696 } else { 697 DateTime::<Utc>::from(metadata.modified().unwrap()) 698 }; 699 700 return Ok(last_modified); 701 } 702