1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author guonix
17 * \date Nov 2020
18 * \brief
19 */
20
21#include "scheduler.h"
22#include <atomic>
23#include <thread>
24#include "common/error_code.h"
25#include "common/logger.h"
26#include "bthread_queue.h"
27
28namespace proxima {
29namespace be {
30namespace query {
31
//! Alias for a vector of task queue pointers
33using TaskQueueVector = std::vector<TaskQueuePtr>;
34
/*!
 * Trivial selector policy: every pick resolves to the first slot
 */
struct Selector {
  //! Always yields index 0; the limit argument is accepted but not consulted
  static uint32_t pick(uint32_t limit) {
    static_cast<void>(limit);
    return 0U;
  }
};
44
//! Round Robin Selector
struct RoundRobinSelector {
  //! Pick the next slot in [0, limit), advancing by one slot per call and
  //! wrapping around at limit.
  //! When limit is 0 there is no valid slot: 0 is returned and the internal
  //! counter is left untouched (a modulo by zero would be undefined behavior).
  uint32_t pick(uint32_t limit) {
    if (limit == 0) {
      return 0;
    }
    // The remainder is strictly less than limit, so narrowing is lossless.
    return static_cast<uint32_t>(count++ % limit);
  }

 private:
  //! Number of picks served so far (atomically incremented on each pick)
  std::atomic<uint64_t> count{0};
};
55
56template <typename Selector>
57class SchedulerImpl : public Scheduler {
58 public:
59 //! Constructor
60 SchedulerImpl() : concurrency_(0) {}
61
62 //! Destructor
63 ~SchedulerImpl() override = default;
64
65 public:
66 //! Dispatch task to execution queue
67 int schedule(TaskPtr task) override {
68 if (!queues_.empty()) {
69 uint32_t pos = selector_.pick(concurrency_);
70 LOG_DEBUG("Selector return[%u], task[%s]", pos, task->name().c_str());
71 return queues_[pos]->put(task);
72 }
73 return PROXIMA_BE_ERROR_CODE(UnreadyQueue);
74 }
75
76 //! Retrieve concurrency field
77 uint32_t concurrency() const override {
78 return concurrency_;
79 }
80
81 //! Set concurrency field
82 uint32_t concurrency(uint32_t concurrent) override {
83 recycle_queue();
84 concurrency_ = resize(concurrent);
85 return concurrency_;
86 }
87
88 private:
89 //! Resize queue buckets
90 uint32_t resize(uint32_t size) {
91 while (size--) {
92 TaskQueuePtr queue = TaskQueuePtr(new BThreadQueue());
93 if (queue->start() == 0) {
94 LOG_DEBUG("Success to start task execution queue");
95 queues_.emplace_back(queue);
96 }
97 }
98 return static_cast<uint32_t>(queues_.size());
99 }
100
101 void recycle_queue() {
102 for (auto &queue : queues_) {
103 queue->stop();
104 queue->join();
105 }
106 queues_.clear();
107 }
108
109 private:
110 //! Concurrency
111 uint32_t concurrency_{0};
112 //! Execution queue buckets
113 TaskQueueVector queues_{};
114 //! Selector, Load balance role
115 Selector selector_{};
116};
117
118//! Retrieve default scheduler reference
119SchedulerPtr Scheduler::Default() {
120 static SchedulerPtr kScheduler =
121 SchedulerPtr(new SchedulerImpl<RoundRobinSelector>());
122 return kScheduler;
123}
124
125//! Retrieve hardware concurrency
126uint32_t Scheduler::HostConcurrency() {
127 return std::thread::hardware_concurrency();
128}
129
130} // namespace query
131} // namespace be
132} // namespace proxima
133