1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author guonix
17 * \date Nov 2020
18 * \brief
19 */
20
21#include "parallel_executor.h"
22#include "common/error_code.h"
23#include "common/logger.h"
24
25namespace proxima {
26namespace be {
27namespace query {
28
29ParallelExecutor::ParallelExecutor(SchedulerPtr scheduler)
30 : scheduler_(std::move(scheduler)) {}
31
32ParallelExecutor::~ParallelExecutor() = default;
33
34int ParallelExecutor::execute_task(const TaskPtr &task) {
35 if (task) {
36 task->status(Task::Status::SCHEDULED);
37 return task->run_once();
38 }
39 return PROXIMA_BE_ERROR_CODE(InvalidArgument);
40}
41
42int ParallelExecutor::execute_tasks(const TaskPtrList &tasks) {
43 int code = 0;
44 if (tasks.empty()) {
45 return code;
46 }
47
48 auto iter = tasks.begin();
49 // Keep the head task, schedule others to other coroutines
50 while (++iter != tasks.end()) {
51 code = scheduler_->schedule(*iter);
52 // Break loop, if schedule task failed,handle schedule error in
53 // wait_finish function
54 if (code != 0) {
55 LOG_ERROR("Can't schedule task to run. code[%d]", code);
56 break;
57 }
58 }
59
60 // Execute first task in queue, if all others have been scheduled
61 if (code == 0) {
62 execute_task(*tasks.begin());
63 }
64
65 // Ignore return code of first task, wait finished event for all tasks.
66 return wait_finish(tasks);
67}
68
69int ParallelExecutor::wait_finish(const TaskPtrList &tasks) {
70 int error_code = 0;
71
72 // Wait task finished, if has been scheduled before
73 for (auto &task : tasks) {
74 if (task->status() == Task::Status::INITIALIZED) {
75 // Task has not been scheduled, set error code to Schedule Error,
76 // and continue to reclaim the resources of others
77 error_code = PROXIMA_BE_ERROR_CODE(ScheduleError);
78 continue;
79 }
80 // Try run task in current coroutine if possible before waiting it has
81 // been done with other coroutine.
82 // Optimizing performance of query, The worst case of latency with
83 // multi-segments query is equal to sequence-execution, which more
84 // helpful when Proxima BE is under extreme high load.
85 task->run_once();
86
87 task->wait_finish();
88 if (error_code == 0 && task->exit_code() != 0) {
89 // Only save the error code of first task which failed
90 error_code = task->exit_code();
91 }
92 }
93
94 return error_code;
95}
96
97} // namespace query
98} // namespace be
99} // namespace proxima
100