1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#include "ActivityProfilerController.h"
10
11#include <chrono>
12#include <functional>
13#include <thread>
14
15#include "ActivityLoggerFactory.h"
16#include "ActivityTrace.h"
17#include "CuptiActivityApi.h"
18#ifdef HAS_ROCTRACER
19#include "RoctracerActivityApi.h"
20#endif
21#include "ThreadUtil.h"
22#include "output_json.h"
23#include "output_membuf.h"
24
25#include "Logger.h"
26
27using namespace std::chrono;
28
29namespace KINETO_NAMESPACE {
30
31constexpr milliseconds kProfilerIntervalMsecs(1000);
32
33#if !USE_GOOGLE_LOG
34static std::unique_ptr<LoggerCollector>& loggerCollectorFactory() {
35 static std::unique_ptr<LoggerCollector> factory = nullptr;
36 return factory;
37}
38
39void ActivityProfilerController::setLoggerCollectorFactory(
40 std::function<std::unique_ptr<LoggerCollector>()> factory) {
41 loggerCollectorFactory() = factory();
42}
43#endif // !USE_GOOGLE_LOG
44
45ActivityProfilerController::ActivityProfilerController(
46 ConfigLoader& configLoader, bool cpuOnly)
47 : configLoader_(configLoader) {
48#ifdef HAS_ROCTRACER
49 profiler_ = std::make_unique<CuptiActivityProfiler>(
50 RoctracerActivityApi::singleton(), cpuOnly);
51#else
52 profiler_ = std::make_unique<CuptiActivityProfiler>(
53 CuptiActivityApi::singleton(), cpuOnly);
54#endif
55 configLoader_.addHandler(ConfigLoader::ConfigKind::ActivityProfiler, this);
56
57#if !USE_GOOGLE_LOG
58 if (loggerCollectorFactory()) {
59 Logger::addLoggerObserver(loggerCollectorFactory().get());
60 }
61#endif // !USE_GOOGLE_LOG
62}
63
64ActivityProfilerController::~ActivityProfilerController() {
65 configLoader_.removeHandler(
66 ConfigLoader::ConfigKind::ActivityProfiler, this);
67 if (profilerThread_) {
68 // signaling termination of the profiler loop
69 stopRunloop_ = true;
70 profilerThread_->join();
71 delete profilerThread_;
72 profilerThread_ = nullptr;
73 }
74
75#if !USE_GOOGLE_LOG
76 if (loggerCollectorFactory()) {
77 Logger::removeLoggerObserver(loggerCollectorFactory().get());
78 }
79#endif // !USE_GOOGLE_LOG
80}
81
82static ActivityLoggerFactory initLoggerFactory() {
83 ActivityLoggerFactory factory;
84 factory.addProtocol("file", [](const std::string& url) {
85 return std::unique_ptr<ActivityLogger>(new ChromeTraceLogger(url));
86 });
87 return factory;
88}
89
90static ActivityLoggerFactory& loggerFactory() {
91 static ActivityLoggerFactory factory = initLoggerFactory();
92 return factory;
93}
94
95void ActivityProfilerController::addLoggerFactory(
96 const std::string& protocol, ActivityLoggerFactory::FactoryFunc factory) {
97 loggerFactory().addProtocol(protocol, factory);
98}
99
100static std::unique_ptr<ActivityLogger> makeLogger(const Config& config) {
101 if (config.activitiesLogToMemory()) {
102 return std::make_unique<MemoryTraceLogger>(config);
103 }
104 return loggerFactory().makeLogger(config.activitiesLogUrl());
105}
106
107static std::unique_ptr<InvariantViolationsLogger>& invariantViolationsLoggerFactory() {
108 static std::unique_ptr<InvariantViolationsLogger> factory = nullptr;
109 return factory;
110}
111
112void ActivityProfilerController::setInvariantViolationsLoggerFactory(
113 const std::function<std::unique_ptr<InvariantViolationsLogger>()>& factory) {
114 invariantViolationsLoggerFactory() = factory();
115}
116
117bool ActivityProfilerController::canAcceptConfig() {
118 return !profiler_->isActive();
119}
120
121void ActivityProfilerController::acceptConfig(const Config& config) {
122 VLOG(1) << "acceptConfig";
123 if (config.activityProfilerEnabled()) {
124 scheduleTrace(config);
125 }
126}
127
128bool ActivityProfilerController::shouldActivateTimestampConfig(
129 const std::chrono::time_point<std::chrono::system_clock>& now) {
130 if (asyncRequestConfig_->hasProfileStartIteration()) {
131 return false;
132 }
133 // Note on now + kProfilerIntervalMsecs
134 // Profiler interval does not align perfectly up to startTime - warmup.
135 // Waiting until the next tick won't allow sufficient time for the profiler to warm up.
136 // So check if we are very close to the warmup time and trigger warmup.
137 if (now + kProfilerIntervalMsecs
138 >= (asyncRequestConfig_->requestTimestamp() - asyncRequestConfig_->activitiesWarmupDuration())) {
139 LOG(INFO) << "Received on-demand activity trace request by "
140 << " profile timestamp = "
141 << asyncRequestConfig_->requestTimestamp().time_since_epoch().count();
142 return true;
143 }
144 return false;
145}
146
147bool ActivityProfilerController::shouldActivateIterationConfig(
148 int64_t currentIter) {
149 if (!asyncRequestConfig_->hasProfileStartIteration()) {
150 return false;
151 }
152 auto rootIter = asyncRequestConfig_->startIterationIncludingWarmup();
153 // Keep waiting, it is not time to start yet.
154 if (currentIter < rootIter) {
155 return false;
156 }
157
158 LOG(INFO) << "Received on-demand activity trace request by "
159 " profile start iteration = "
160 << asyncRequestConfig_->profileStartIteration()
161 << ", current iteration = " << currentIter;
162 // Re-calculate the start iter if requested iteration is in the past.
163 if (currentIter > rootIter) {
164 auto newProfileStart = currentIter +
165 asyncRequestConfig_->activitiesWarmupIterations();
166 // Use Start Iteration Round Up if it is present.
167 if (asyncRequestConfig_->profileStartIterationRoundUp() > 0) {
168 // round up to nearest multiple
169 auto divisor = asyncRequestConfig_->profileStartIterationRoundUp();
170 auto rem = newProfileStart % divisor;
171 newProfileStart += ((rem == 0) ? 0 : divisor - rem);
172 LOG(INFO) << "Rounding up profiler start iteration to : " << newProfileStart;
173 asyncRequestConfig_->setProfileStartIteration(newProfileStart);
174 if (currentIter != asyncRequestConfig_->startIterationIncludingWarmup()) {
175 // Ex. Current 9, start 8, warmup 5, roundup 100. Resolves new start to 100,
176 // with warmup starting at 95. So don't start now.
177 return false;
178 }
179 } else {
180 LOG(INFO) << "Start iteration updated to : " << newProfileStart;
181 asyncRequestConfig_->setProfileStartIteration(newProfileStart);
182 }
183 }
184 return true;
185}
186
187void ActivityProfilerController::profilerLoop() {
188 setThreadName("Kineto Activity Profiler");
189 VLOG(0) << "Entering activity profiler loop";
190
191 auto now = system_clock::now();
192 auto next_wakeup_time = now + kProfilerIntervalMsecs;
193
194 while (!stopRunloop_) {
195 now = system_clock::now();
196
197 while (now < next_wakeup_time) {
198 /* sleep override */
199 std::this_thread::sleep_for(next_wakeup_time - now);
200 now = system_clock::now();
201 }
202
203 // Perform Double-checked locking to reduce overhead of taking lock.
204 if (asyncRequestConfig_ && !profiler_->isActive()) {
205 std::lock_guard<std::mutex> lock(asyncConfigLock_);
206 if (asyncRequestConfig_ && !profiler_->isActive() &&
207 shouldActivateTimestampConfig(now)) {
208 activateConfig(now);
209 }
210 }
211
212 while (next_wakeup_time < now) {
213 next_wakeup_time += kProfilerIntervalMsecs;
214 }
215
216 if (profiler_->isActive()) {
217 next_wakeup_time = profiler_->performRunLoopStep(now, next_wakeup_time);
218 VLOG(1) << "Profiler loop: "
219 << duration_cast<milliseconds>(system_clock::now() - now).count()
220 << "ms";
221 }
222 }
223
224 VLOG(0) << "Exited activity profiling loop";
225}
226
227void ActivityProfilerController::step() {
228 // Do not remove this copy to currentIter. Otherwise count is not guaranteed.
229 int64_t currentIter = ++iterationCount_;
230 VLOG(0) << "Step called , iteration = " << currentIter;
231
232 // Perform Double-checked locking to reduce overhead of taking lock.
233 if (asyncRequestConfig_ && !profiler_->isActive()) {
234 std::lock_guard<std::mutex> lock(asyncConfigLock_);
235 auto now = system_clock::now();
236 if (asyncRequestConfig_ && !profiler_->isActive() &&
237 shouldActivateIterationConfig(currentIter)) {
238 activateConfig(now);
239 }
240 }
241
242 if (profiler_->isActive()) {
243 auto now = system_clock::now();
244 auto next_wakeup_time = now + kProfilerIntervalMsecs;
245 profiler_->performRunLoopStep(now, next_wakeup_time, currentIter);
246 }
247}
248
249// This function should only be called when holding the configLock_.
250void ActivityProfilerController::activateConfig(
251 std::chrono::time_point<std::chrono::system_clock> now) {
252 logger_ = makeLogger(*asyncRequestConfig_);
253 profiler_->setLogger(logger_.get());
254 LOGGER_OBSERVER_SET_TRIGGER_ON_DEMAND();
255 profiler_->configure(*asyncRequestConfig_, now);
256 asyncRequestConfig_ = nullptr;
257}
258
259void ActivityProfilerController::scheduleTrace(const Config& config) {
260 VLOG(1) << "scheduleTrace";
261 if (profiler_->isActive()) {
262 LOG(WARNING) << "Ignored request - profiler busy";
263 return;
264 }
265 int64_t currentIter = iterationCount_;
266 if (config.hasProfileStartIteration() && currentIter < 0) {
267 LOG(WARNING) << "Ignored profile iteration count based request as "
268 << "application is not updating iteration count";
269 return;
270 }
271
272 bool newConfigScheduled = false;
273 if (!asyncRequestConfig_) {
274 std::lock_guard<std::mutex> lock(asyncConfigLock_);
275 if (!asyncRequestConfig_) {
276 asyncRequestConfig_ = config.clone();
277 newConfigScheduled = true;
278 }
279 }
280 if (!newConfigScheduled) {
281 LOG(WARNING) << "Ignored request - another profile request is pending.";
282 return;
283 }
284
285 // start a profilerLoop() thread to handle request
286 if (!profilerThread_) {
287 profilerThread_ =
288 new std::thread(&ActivityProfilerController::profilerLoop, this);
289 }
290}
291
292void ActivityProfilerController::prepareTrace(const Config& config) {
293 // Requests from ActivityProfilerApi have higher priority than
294 // requests from other sources (signal, daemon).
295 // Cancel any ongoing request and refuse new ones.
296 auto now = system_clock::now();
297 if (profiler_->isActive()) {
298 LOG(WARNING) << "Cancelling current trace request in order to start "
299 << "higher priority synchronous request";
300 if (libkineto::api().client()) {
301 libkineto::api().client()->stop();
302 }
303 profiler_->stopTrace(now);
304 profiler_->reset();
305 }
306
307 profiler_->configure(config, now);
308}
309
310void ActivityProfilerController::startTrace() {
311 UST_LOGGER_MARK_COMPLETED(kWarmUpStage);
312 profiler_->startTrace(std::chrono::system_clock::now());
313}
314
315std::unique_ptr<ActivityTraceInterface> ActivityProfilerController::stopTrace() {
316 profiler_->stopTrace(std::chrono::system_clock::now());
317 UST_LOGGER_MARK_COMPLETED(kCollectionStage);
318 auto logger = std::make_unique<MemoryTraceLogger>(profiler_->config());
319 profiler_->processTrace(*logger);
320 // Will follow up with another patch for logging URLs when ActivityTrace is moved.
321 UST_LOGGER_MARK_COMPLETED(kPostProcessingStage);
322 profiler_->reset();
323 return std::make_unique<ActivityTrace>(std::move(logger), loggerFactory());
324}
325
326void ActivityProfilerController::addMetadata(
327 const std::string& key, const std::string& value) {
328 profiler_->addMetadata(key, value);
329}
330
331void ActivityProfilerController::logInvariantViolation(
332 const std::string& profile_id,
333 const std::string& assertion,
334 const std::string& error,
335 const std::string& group_profile_id) {
336 if (invariantViolationsLoggerFactory()) {
337 invariantViolationsLoggerFactory()->logInvariantViolation(profile_id, assertion, error, group_profile_id);
338 }
339}
340
341} // namespace KINETO_NAMESPACE
342