1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author guonix
17 * \date Nov 2020
 * \brief Task queue implementation backed by a bthread execution queue
19 */
20
21#include "bthread_queue.h"
22#include <cstring>
23#include "common/error_code.h"
24#include "common/logger.h"
25
26namespace proxima {
27namespace be {
28namespace query {
29
30BThreadQueue::BThreadQueue() : status_(TaskQueue::Status::INITIALIZED) {}
31
32BThreadQueue::~BThreadQueue() = default;
33
34namespace {
35
36//! Runner routine with execution_queue_start, first arguments not used
37static int Execute(void *meta, bthread::TaskIterator<TaskPtr> &iter) {
38 auto queue = static_cast<BThreadQueue *>(meta);
39 if (iter.is_queue_stopped()) {
40 LOG_INFO("Bthread Queue has been stopped.");
41 return 0;
42 }
43 auto queue_id = iter ? queue->id() : 0;
44 LOG_DEBUG("Bthread Queue [%zu] start run tasks", (size_t)queue_id);
45 // invoke task run interface
46 for (; iter; ++iter) {
47 LOG_DEBUG("Task [%s] is ready to run on queue[%zu]",
48 (*iter)->name().c_str(), (size_t)queue->id());
49 (*iter)->run();
50 LOG_DEBUG("Task [%s] has been finished on queue[%zu]",
51 (*iter)->name().c_str(), (size_t)queue->id());
52 }
53 LOG_DEBUG("Bthread Queue [%zu] finished to run tasks", (size_t)queue_id);
54 return 0;
55}
56
57} // namespace
58
59uint64_t BThreadQueue::id() const {
60 return queue_id_.value;
61}
62
63int BThreadQueue::start() {
64 if (status_ == Status::INITIALIZED) {
65 if (bthread::execution_queue_start(&queue_id_, &options_, Execute, this) ==
66 0) {
67 status_ = Status::STARTED;
68 LOG_DEBUG("BThreadQueue success to start, queue_id_[%zu]",
69 (size_t)queue_id_.value);
70 return 0;
71 }
72 LOG_ERROR("Failed to start BThreadQueue, queue_id_[%zu]",
73 (size_t)queue_id_.value);
74 clear_context();
75 } else {
76 LOG_ERROR("Failed to start BThreadQueue, which has not been initialized");
77 }
78 return PROXIMA_BE_ERROR_CODE(RuntimeError);
79}
80
81int BThreadQueue::stop() {
82 if (status_ != Status::STARTED) {
83 LOG_DEBUG("Can't stop an queue, which does not started yet. queue_id_[%zu]",
84 (size_t)queue_id_.value);
85 return PROXIMA_BE_ERROR_CODE(RuntimeError);
86 }
87 int code = bthread::execution_queue_stop(queue_id_);
88 if (code == 0) {
89 LOG_INFO("BThreadQueue stopped, queue_id_[%zu]", (size_t)queue_id_.value);
90 status_ = Status::STOPPED;
91 return 0;
92 }
93
94 LOG_ERROR("Failed to stop Bthread, ret code[%d]", code);
95 return PROXIMA_BE_ERROR_CODE(RuntimeError);
96}
97
98int BThreadQueue::join() {
99 if (status_ != Status::STOPPED) {
100 LOG_ERROR("Can't join an queue, which did not stop yet, queue_id_[%zu]",
101 (size_t)queue_id_.value);
102 return PROXIMA_BE_ERROR_CODE(RuntimeError);
103 }
104 int code = bthread::execution_queue_join(queue_id_);
105 if (code == 0) {
106 status_ = Status::JOINED;
107 return 0;
108 }
109 LOG_ERROR("Bthread join failed, ret code[%d]", code);
110 return PROXIMA_BE_ERROR_CODE(RuntimeError);
111}
112
113int BThreadQueue::put(const TaskPtr &task) {
114 if (task && started()) {
115 task->status(Task::Status::SCHEDULED);
116 if (bthread::execution_queue_execute(queue_id_, task) == 0) {
117 LOG_DEBUG("Scheduled task[%s]", task->name().c_str());
118 return 0;
119 } else {
120 task->status(Task::Status::INITIALIZED);
121 LOG_ERROR("Scheduled task[%s] failed", task->name().c_str());
122 }
123 }
124 LOG_ERROR("Failed to schedule task");
125 return PROXIMA_BE_ERROR_CODE(RuntimeError);
126}
127
128bool BThreadQueue::started() const {
129 return status_ == Status::STARTED;
130}
131
132void BThreadQueue::clear_context() {
133 status_ = Status::INITIALIZED;
134 std::memset(static_cast<void *>(&queue_id_), 0, sizeof(queue_id_));
135 std::memset(static_cast<void *>(&options_), 0, sizeof(options_));
136}
137
138} // namespace query
139} // namespace be
140} // namespace proxima
141