1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author guonix |
17 | * \date Nov 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include "bthread_queue.h" |
22 | #include <cstring> |
23 | #include "common/error_code.h" |
24 | #include "common/logger.h" |
25 | |
26 | namespace proxima { |
27 | namespace be { |
28 | namespace query { |
29 | |
30 | BThreadQueue::BThreadQueue() : status_(TaskQueue::Status::INITIALIZED) {} |
31 | |
32 | BThreadQueue::~BThreadQueue() = default; |
33 | |
34 | namespace { |
35 | |
36 | //! Runner routine with execution_queue_start, first arguments not used |
37 | static int Execute(void *meta, bthread::TaskIterator<TaskPtr> &iter) { |
38 | auto queue = static_cast<BThreadQueue *>(meta); |
39 | if (iter.is_queue_stopped()) { |
40 | LOG_INFO("Bthread Queue has been stopped." ); |
41 | return 0; |
42 | } |
43 | auto queue_id = iter ? queue->id() : 0; |
44 | LOG_DEBUG("Bthread Queue [%zu] start run tasks" , (size_t)queue_id); |
45 | // invoke task run interface |
46 | for (; iter; ++iter) { |
47 | LOG_DEBUG("Task [%s] is ready to run on queue[%zu]" , |
48 | (*iter)->name().c_str(), (size_t)queue->id()); |
49 | (*iter)->run(); |
50 | LOG_DEBUG("Task [%s] has been finished on queue[%zu]" , |
51 | (*iter)->name().c_str(), (size_t)queue->id()); |
52 | } |
53 | LOG_DEBUG("Bthread Queue [%zu] finished to run tasks" , (size_t)queue_id); |
54 | return 0; |
55 | } |
56 | |
57 | } // namespace |
58 | |
59 | uint64_t BThreadQueue::id() const { |
60 | return queue_id_.value; |
61 | } |
62 | |
63 | int BThreadQueue::start() { |
64 | if (status_ == Status::INITIALIZED) { |
65 | if (bthread::execution_queue_start(&queue_id_, &options_, Execute, this) == |
66 | 0) { |
67 | status_ = Status::STARTED; |
68 | LOG_DEBUG("BThreadQueue success to start, queue_id_[%zu]" , |
69 | (size_t)queue_id_.value); |
70 | return 0; |
71 | } |
72 | LOG_ERROR("Failed to start BThreadQueue, queue_id_[%zu]" , |
73 | (size_t)queue_id_.value); |
74 | clear_context(); |
75 | } else { |
76 | LOG_ERROR("Failed to start BThreadQueue, which has not been initialized" ); |
77 | } |
78 | return PROXIMA_BE_ERROR_CODE(RuntimeError); |
79 | } |
80 | |
81 | int BThreadQueue::stop() { |
82 | if (status_ != Status::STARTED) { |
83 | LOG_DEBUG("Can't stop an queue, which does not started yet. queue_id_[%zu]" , |
84 | (size_t)queue_id_.value); |
85 | return PROXIMA_BE_ERROR_CODE(RuntimeError); |
86 | } |
87 | int code = bthread::execution_queue_stop(queue_id_); |
88 | if (code == 0) { |
89 | LOG_INFO("BThreadQueue stopped, queue_id_[%zu]" , (size_t)queue_id_.value); |
90 | status_ = Status::STOPPED; |
91 | return 0; |
92 | } |
93 | |
94 | LOG_ERROR("Failed to stop Bthread, ret code[%d]" , code); |
95 | return PROXIMA_BE_ERROR_CODE(RuntimeError); |
96 | } |
97 | |
98 | int BThreadQueue::join() { |
99 | if (status_ != Status::STOPPED) { |
100 | LOG_ERROR("Can't join an queue, which did not stop yet, queue_id_[%zu]" , |
101 | (size_t)queue_id_.value); |
102 | return PROXIMA_BE_ERROR_CODE(RuntimeError); |
103 | } |
104 | int code = bthread::execution_queue_join(queue_id_); |
105 | if (code == 0) { |
106 | status_ = Status::JOINED; |
107 | return 0; |
108 | } |
109 | LOG_ERROR("Bthread join failed, ret code[%d]" , code); |
110 | return PROXIMA_BE_ERROR_CODE(RuntimeError); |
111 | } |
112 | |
113 | int BThreadQueue::put(const TaskPtr &task) { |
114 | if (task && started()) { |
115 | task->status(Task::Status::SCHEDULED); |
116 | if (bthread::execution_queue_execute(queue_id_, task) == 0) { |
117 | LOG_DEBUG("Scheduled task[%s]" , task->name().c_str()); |
118 | return 0; |
119 | } else { |
120 | task->status(Task::Status::INITIALIZED); |
121 | LOG_ERROR("Scheduled task[%s] failed" , task->name().c_str()); |
122 | } |
123 | } |
124 | LOG_ERROR("Failed to schedule task" ); |
125 | return PROXIMA_BE_ERROR_CODE(RuntimeError); |
126 | } |
127 | |
128 | bool BThreadQueue::started() const { |
129 | return status_ == Status::STARTED; |
130 | } |
131 | |
132 | void BThreadQueue::clear_context() { |
133 | status_ = Status::INITIALIZED; |
134 | std::memset(static_cast<void *>(&queue_id_), 0, sizeof(queue_id_)); |
135 | std::memset(static_cast<void *>(&options_), 0, sizeof(options_)); |
136 | } |
137 | |
138 | } // namespace query |
139 | } // namespace be |
140 | } // namespace proxima |
141 | |