1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author guonix |
17 | * \date Nov 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include "parallel_executor.h" |
22 | #include "common/error_code.h" |
23 | #include "common/logger.h" |
24 | |
25 | namespace proxima { |
26 | namespace be { |
27 | namespace query { |
28 | |
29 | ParallelExecutor::ParallelExecutor(SchedulerPtr scheduler) |
30 | : scheduler_(std::move(scheduler)) {} |
31 | |
32 | ParallelExecutor::~ParallelExecutor() = default; |
33 | |
34 | int ParallelExecutor::execute_task(const TaskPtr &task) { |
35 | if (task) { |
36 | task->status(Task::Status::SCHEDULED); |
37 | return task->run_once(); |
38 | } |
39 | return PROXIMA_BE_ERROR_CODE(InvalidArgument); |
40 | } |
41 | |
42 | int ParallelExecutor::execute_tasks(const TaskPtrList &tasks) { |
43 | int code = 0; |
44 | if (tasks.empty()) { |
45 | return code; |
46 | } |
47 | |
48 | auto iter = tasks.begin(); |
49 | // Keep the head task, schedule others to other coroutines |
50 | while (++iter != tasks.end()) { |
51 | code = scheduler_->schedule(*iter); |
52 | // Break loop, if schedule task failed,handle schedule error in |
53 | // wait_finish function |
54 | if (code != 0) { |
55 | LOG_ERROR("Can't schedule task to run. code[%d]" , code); |
56 | break; |
57 | } |
58 | } |
59 | |
60 | // Execute first task in queue, if all others have been scheduled |
61 | if (code == 0) { |
62 | execute_task(*tasks.begin()); |
63 | } |
64 | |
65 | // Ignore return code of first task, wait finished event for all tasks. |
66 | return wait_finish(tasks); |
67 | } |
68 | |
69 | int ParallelExecutor::wait_finish(const TaskPtrList &tasks) { |
70 | int error_code = 0; |
71 | |
72 | // Wait task finished, if has been scheduled before |
73 | for (auto &task : tasks) { |
74 | if (task->status() == Task::Status::INITIALIZED) { |
75 | // Task has not been scheduled, set error code to Schedule Error, |
76 | // and continue to reclaim the resources of others |
77 | error_code = PROXIMA_BE_ERROR_CODE(ScheduleError); |
78 | continue; |
79 | } |
80 | // Try run task in current coroutine if possible before waiting it has |
81 | // been done with other coroutine. |
82 | // Optimizing performance of query, The worst case of latency with |
83 | // multi-segments query is equal to sequence-execution, which more |
84 | // helpful when Proxima BE is under extreme high load. |
85 | task->run_once(); |
86 | |
87 | task->wait_finish(); |
88 | if (error_code == 0 && task->exit_code() != 0) { |
89 | // Only save the error code of first task which failed |
90 | error_code = task->exit_code(); |
91 | } |
92 | } |
93 | |
94 | return error_code; |
95 | } |
96 | |
97 | } // namespace query |
98 | } // namespace be |
99 | } // namespace proxima |
100 | |