/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/grappler/clusters/single_machine.h"

#include <atomic>
#include <memory>

#include "tensorflow/cc/training/queue_runner.h"
#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/common_runtime/device_mgr.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
#include "tensorflow/core/grappler/clusters/utils.h"
#include "tensorflow/core/grappler/utils.h"
#include "tensorflow/core/kernels/ops_util.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/notification.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/public/session.h"

namespace tensorflow {
namespace grappler {

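// Only one single-machine cluster may be live at a time: TF variables are
// process-global, so two concurrent sessions would interfere with each other.
// This flag lets Provision() detect a second cluster.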
static std::atomic<bool> already_provisioned(false);

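// Sets up the session options for a single-machine cluster: one CPU device
// (with num_cpu_cores intra-op threads and a session-owned inter-op pool of
// the same size), num_gpus GPU devices, and an optional per-step timeout.
//
// Minimal usage sketch (assuming a GrapplerItem `item` built elsewhere):
//   SingleMachine cluster(/*timeout_s=*/60, /*num_cpu_cores=*/4,
//                         /*num_gpus=*/1);
//   TF_CHECK_OK(cluster.Provision());
//   TF_CHECK_OK(cluster.Initialize(item));
//   RunMetadata metadata;
//   TF_CHECK_OK(cluster.Run(item.graph, item.feed, item.fetch, &metadata));
//   TF_CHECK_OK(cluster.Shutdown());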
SingleMachine::SingleMachine(int timeout_s, int num_cpu_cores, int num_gpus)
    : Cluster(timeout_s), expected_init_time_s_(0), closing_(false) {
  VLOG(1) << "Number of CPU cores: " << num_cpu_cores
          << " Number of GPUs: " << num_gpus;
  thread_pool_.reset(new thread::ThreadPool(
      Env::Default(), SanitizeThreadSuffix("single_machine"), 2));

  (*options_.config.mutable_device_count())["CPU"] = 1;
  if (num_gpus > 0) {
    (*options_.config.mutable_device_count())["GPU"] = num_gpus;
  }
  CHECK_GE(num_cpu_cores, 1);
  options_.config.set_intra_op_parallelism_threads(num_cpu_cores);
  // Create a session specific thread pool to ensure the threads are reset when
  // the session is reset.
  options_.config.add_session_inter_op_thread_pool()->set_num_threads(
      num_cpu_cores);
  if (timeout_s > 0) {
    options_.config.set_operation_timeout_in_ms(timeout_s * 1000);
  }
}

SingleMachine::~SingleMachine() {
  CloseSession(false /*use_timeout*/).IgnoreError();

  // Reset the thread-pool so that there are no outstanding Session::Run(...)s
  // when we delete the session.
  thread_pool_.reset();
}

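// Creates a fresh session and enumerates the devices (CPU, GPU, and any other
// non-XLA device type) that the optimizers can subsequently plan for.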
Status SingleMachine::Provision() {
  // This is really ugly: to avoid leaking variables, we need to reset the tf
  // session every time we're done processing a grappler item. However,
  // variables are global, and therefore we can't have more than 1 session
  // alive at a time. This check detects when more than one cluster is
  // provisioned.
  if (already_provisioned) {
    return errors::Unavailable(
        "Can't provision more than one single machine cluster at a time");
  }

  TF_RETURN_IF_ERROR(ResetSession());

  std::vector<DeviceAttributes> devices;
  TF_RETURN_IF_ERROR(session_->ListDevices(&devices));
  for (const auto& dev : devices) {
    DeviceProperties attr;
    if (dev.device_type() == "CPU") {
      attr = GetLocalCPUInfo();
    } else if (dev.device_type() == "GPU") {
      DeviceNameUtils::ParsedName parsed;
      if (!DeviceNameUtils::ParseFullName(dev.name(), &parsed)) {
        return errors::InvalidArgument(
            strings::StrCat("Not able to parse GPU device name: ", dev.name()));
      }
      TfGpuId tf_gpu_id(parsed.id);
      CudaGpuId cuda_gpu_id;
      Status s = GpuIdManager::TfToCudaGpuId(tf_gpu_id, &cuda_gpu_id);
      if (!s.ok()) {
        return errors::Unavailable("Unknown TF GPU device with id ",
                                   tf_gpu_id.value(), ": ", s.ToString());
      }
      attr = GetLocalGPUInfo(cuda_gpu_id);
    } else if (dev.device_type().find("XLA") == string::npos) {
      // Filter out the fake XLA devices to avoid double counting the actual
      // hardware resources that are available.
      attr.set_type(dev.device_type());
    }
    // Overwrite the memory size since users might have requested to use only a
    // fraction of the available device memory.
    attr.set_memory_size(dev.memory_limit());
    devices_[dev.name()] = attr;
  }
  already_provisioned = true;

  // Clear highmark stats of all local allocators.
  if (cpu_allocator_stats_enabled_) {
    TF_RETURN_IF_ERROR(ClearAllocatorStats());
  }
  return Status::OK();
}

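// Records the item's initialization metadata (init ops, queue runners, and
// expected init time) so that the next Run() of a new graph can replay it.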
Status SingleMachine::Initialize(const GrapplerItem& item) {
  mutex_lock l(this->last_graph_mu_);
  if (last_graph_ != &item.graph || last_graph_id_ != item.id) {
    init_ops_ = item.init_ops;
    expected_init_time_s_ = item.expected_init_time;
    last_graph_ = nullptr;
    queue_runner_defs_ = item.queue_runners;
    last_graph_id_ = item.id;
  }
  return Status::OK();
}

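// Stops the queue runners, closes the session, and allows a new cluster to be
// provisioned.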
Status SingleMachine::Shutdown() {
  TF_RETURN_IF_ERROR(ShutdownSession());

  mutex_lock l(this->last_graph_mu_);
  last_graph_ = nullptr;
  already_provisioned = false;

  return Status::OK();
}

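// Runs the given graph. The first call with a new graph (re)creates the
// session, runs the init ops, starts the queue runners, and performs the
// configured number of warmup runs (build_cost_model_after); subsequent calls
// with the same graph go straight to a run whose costs are merged with the
// initialization and queue-runner costs.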
Status SingleMachine::Run(const GraphDef& graph_def,
                          const std::vector<std::pair<string, Tensor>>& feed,
                          const std::vector<string>& fetch,
                          RunMetadata* metadata) {
  {
    mutex_lock l(this->last_graph_mu_);
    if (last_graph_ != &graph_def) {
      TF_RETURN_IF_ERROR(ResetSession());
      TF_RETURN_IF_ERROR(session_->Create(graph_def));
      if (!init_ops_.empty()) {
        init_metadata_ = RunMetadata();
        int64 timeout_s = timeout_s_ + expected_init_time_s_;
        TF_RETURN_IF_ERROR(
            RunWithTimeout({}, init_ops_, &init_metadata_, timeout_s));
        // The compute cost for init ops is likely to be pessimistic since init
        // ops are run only once before warmup. Therefore we only keep their
        // memory costs. Iterate by reference: a by-value loop would only clear
        // the compute cost on a copy of each node.
        for (auto& node :
             *init_metadata_.mutable_cost_graph()->mutable_node()) {
          node.clear_compute_cost();
        }
        // Also clear the timeline to save memory.
        init_metadata_.clear_step_stats();
      }
      // We can have at most one hardware trace. Use it for the main graph, and
      // downgrade tracing of the queue runners to a software trace.
      RunOptions queue_options = run_options_;
      if (queue_options.trace_level() >= RunOptions::HARDWARE_TRACE) {
        queue_options.set_trace_level(RunOptions::SOFTWARE_TRACE);
      }
      for (size_t i = 0; i < queue_runner_defs_.size(); ++i) {
        std::unique_ptr<QueueRunner> queue_runner;
        TF_RETURN_IF_ERROR(QueueRunner::New(queue_runner_defs_[i],
                                            coordinator_.get(), &queue_runner));

        TF_RETURN_IF_ERROR(queue_runner->StartAndCollectCostGraph(
            session_.get(), queue_options));
        TF_RETURN_IF_ERROR(
            coordinator_->RegisterRunner(std::move(queue_runner)));
        TF_RETURN_IF_ERROR(coordinator_->GetStatus());
      }

      // Warm up TensorFlow if needed.
      for (int i = 0;
           i < options_.config.graph_options().build_cost_model_after(); ++i) {
        TF_RETURN_IF_ERROR(RunWithTimeout(feed, fetch, nullptr));
      }

      last_graph_ = &graph_def;
    }
  }

  if (metadata) {
    TF_RETURN_IF_ERROR(RunWithTimeout(feed, fetch, metadata));
    // Merge the costs of the initialization and the queue runners.
    CostGraphDef queue_costs;
    TF_RETURN_IF_ERROR(coordinator_->ExportCostGraph(&queue_costs));
    MergeCosts(metadata->mutable_cost_graph(), init_metadata_.cost_graph(),
               queue_costs);
  } else {
    return RunWithTimeout(feed, fetch, nullptr);
  }
  return Status::OK();
}

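// Toggles the collection of CPU allocator statistics; GPU allocator stats are
// always collected, so only the CPU side needs an explicit switch.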
Status SingleMachine::EnablePeakMemoryStats(bool enable) {
  EnableCPUAllocatorStats(enable);
  cpu_allocator_stats_enabled_ = enable;
  // No need to enable GPU allocator stats since they are always collected.
  return Status::OK();
}

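// Reports the peak memory usage (max_bytes_in_use) of every local device,
// keyed by device name. EnablePeakMemoryStats(true) must have been called
// first for the CPU numbers to be tracked.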
Status SingleMachine::GetPeakMemoryUsage(
    std::unordered_map<string, uint64>* device_peak_memory) const {
  // Note that cpu_allocator->TracksAllocationSizes() returning true doesn't
  // always mean that AllocatorStats are actually collected.
  if (!cpu_allocator_stats_enabled_) {
    return Status(error::INVALID_ARGUMENT,
                  "Tracking allocation for CPU is not enabled.");
  }

  const DeviceMgr* device_mgr;
  TF_RETURN_IF_ERROR(session_->LocalDeviceManager(&device_mgr));
  std::vector<Device*> devices = device_mgr->ListDevices();

  device_peak_memory->clear();
  for (Device* device : devices) {
    AllocatorStats stats;
    auto* allocator = device->GetAllocator(AllocatorAttributes());
    if (!allocator->TracksAllocationSizes()) {
      return Status(error::INVALID_ARGUMENT,
                    "Tracking allocation is not enabled.");
    }
    allocator->GetStats(&stats);
    (*device_peak_memory)[device->name()] = stats.max_bytes_in_use;
  }

  return Status::OK();
}

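// Runs the session with the cluster-wide timeout. The overload below takes an
// explicit deadline and executes Session::Run() on the background thread pool
// so that a hung run can be abandoned once the deadline expires.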
Status SingleMachine::RunWithTimeout(
    const std::vector<std::pair<string, Tensor>>& feed,
    const std::vector<string>& fetch, RunMetadata* run_metadata) {
  return RunWithTimeout(feed, fetch, run_metadata, timeout_s_);
}

Status SingleMachine::RunWithTimeout(
    const std::vector<std::pair<string, Tensor>>& feed,
    const std::vector<string>& fetch, RunMetadata* run_metadata,
    int64 timeout_s) {
  // We shouldn't be running or closing the session at this point.
  {
    mutex_lock l(close_mu_);
    CHECK(!closing_);
  }

  auto status = std::make_shared<Status>();
  auto local_metadata = std::make_shared<RunMetadata>();
  const bool executed_in_time = ExecuteWithTimeout(
      [this, status, local_metadata, feed, fetch]() {
        *status = session_->Run(run_options_, feed, {}, fetch, nullptr,
                                local_metadata.get());
      },
      timeout_s * 1000, thread_pool_.get());
  if (!executed_in_time) {
    return errors::DeadlineExceeded("Failed to run the graph after ", timeout_s,
                                    " seconds, aborting");
  } else if (run_metadata && status->ok()) {
    *run_metadata = *local_metadata;
  }
  return *status;
}

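// Stops the coordinator, waits for the queue runners to drain, and closes the
// session (which cancels any pending I/O). With use_timeout set, the whole
// teardown is bounded by the cluster timeout.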
Status SingleMachine::CloseSession(bool use_timeout) {
  if (!session_ || !thread_pool_) {
    return Status::OK();
  }

  {
    mutex_lock l(close_mu_);

    if (!closing_) {
      closing_ = true;
    }
  }

  const bool executed_in_time = ExecuteWithTimeout(
      [&]() {
        if (this->coordinator_) {
          this->coordinator_->RequestStop().IgnoreError();
          // Wait for all the runners to have closed their queues.
          while (!this->coordinator_->AllRunnersStopped()) {
            Env::Default()->SleepForMicroseconds(1000000);
          }
          // Now we can close the session. This should cancel any pending I/O
          // operation.
          this->session_->Close().IgnoreError();
          // Last but not least, we can delete the coordinator.
          this->coordinator_.reset();
        } else {
          this->session_->Close().IgnoreError();
        }

        mutex_lock l2(close_mu_);
        closing_ = false;
      },
      use_timeout ? timeout_s_ * 1000 : -1, thread_pool_.get());

  if (!executed_in_time) {
    // Let the caller know that we can't shut down the session, and therefore
    // can't process any further.
    return errors::Unavailable("Failed to close the previous session after ",
                               timeout_s_, " seconds, aborting");
  }

  return Status::OK();
}

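// Closes the session, then destroys the thread pool from a helper thread so
// that a deadlocked Session::Run() is reported as an error instead of hanging
// the caller forever.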
Status SingleMachine::ShutdownSession() {
  TF_RETURN_IF_ERROR(CloseSession(true /*use_timeout*/));

  // Delete the threadpool: this ensures that all the pending closures complete
  // before we return. Note that if TF deadlocked on us, the closures will
  // never complete, and the call to thread_pool_.reset() will never return:
  // therefore we need to delete the threadpool from a background thread.
  // That thread itself will also never complete, so the user should
  // abort the process to avoid leaking too many resources.
  auto n = std::make_shared<Notification>();
  Env::Default()->SchedClosure([this, n]() {
    thread_pool_.reset();
    n->Notify();
  });
  int64 timeout_us = 1000000ll * timeout_s_;
  const bool notified = WaitForNotificationWithTimeout(n.get(), timeout_us);
  if (!notified) {
    // Let the caller know that we can't shut down the session properly since
    // there are calls to Session::Run() still running.
    return errors::Unavailable("The session is still running graphs after ",
                               timeout_s_, " seconds");
  }

  return Status::OK();
}

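// Tears down any existing session (which, for DirectSession, also frees its
// variables) and creates a fresh session, thread pool, and coordinator.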
Status SingleMachine::ResetSession() {
  if (session_) {
    LOG(INFO) << "Cleaning up previous session";

    // Make sure the session is properly closed.
    TF_RETURN_IF_ERROR(ShutdownSession());

    // Destroying the object deletes all its variables as well. This is only
    // true for DirectSession.
    session_.reset();
  }

  LOG(INFO) << "Starting new session";

  // Create a new threadpool.
  thread_pool_.reset(new thread::ThreadPool(
      Env::Default(), SanitizeThreadSuffix("single_machine"), 2));

  session_.reset(NewSession(options_));
  if (!session_) {
    return errors::Unknown("Failed to create session");
  }
  coordinator_.reset(new Coordinator());

  return Status::OK();
}

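// Merges the initialization and queue-runner cost graphs into the main graph
// costs. Node ids in a CostGraphDef must stay unique, so imported nodes are
// shifted by a per-source id offset, and their input_info and control_input
// references are shifted by the same amount. Nodes whose names already appear
// in the main graph are skipped, since the main-graph measurements are
// preferred.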
void SingleMachine::MergeCosts(CostGraphDef* graph_costs,
                               const CostGraphDef& init_costs,
                               const CostGraphDef& queue_costs) {
  graph_costs->mutable_node()->Reserve(graph_costs->node_size() +
                                       init_costs.node_size() +
                                       queue_costs.node_size());
  std::unordered_set<string> nodes_seen;
  int queue_costs_id_offset = graph_costs->node_size();
  for (const auto& node : graph_costs->node()) {
    nodes_seen.insert(node.name());
    if (node.id() >= queue_costs_id_offset) {
      queue_costs_id_offset = node.id() + 1;
    }
  }

  int init_costs_id_offset = queue_costs_id_offset + queue_costs.node_size();
  // The costs obtained by running the main graph could be more stable than
  // the ones we get from the queue runners since the queue runners run
  // asynchronously.
  for (const auto& node : queue_costs.node()) {
    if (nodes_seen.find(node.name()) != nodes_seen.end()) {
      continue;
    }

    auto* new_node = graph_costs->add_node();
    new_node->MergeFrom(node);

    new_node->set_id(node.id() + queue_costs_id_offset);
    if (new_node->id() >= init_costs_id_offset) {
      init_costs_id_offset = new_node->id() + 1;
    }

    for (auto& input_info : *new_node->mutable_input_info()) {
      input_info.set_preceding_node(input_info.preceding_node() +
                                    queue_costs_id_offset);
    }
    for (auto& control_input : *new_node->mutable_control_input()) {
      control_input += queue_costs_id_offset;
    }
  }

  // Don't overwrite the costs with those generated during initialization since
  // they are possibly outdated.
  for (const auto& node : init_costs.node()) {
    if (nodes_seen.find(node.name()) != nodes_seen.end()) {
      continue;
    }

    auto* new_node = graph_costs->add_node();
    new_node->MergeFrom(node);

    new_node->set_id(node.id() + init_costs_id_offset);
    for (auto& input_info : *new_node->mutable_input_info()) {
      input_info.set_preceding_node(input_info.preceding_node() +
                                    init_costs_id_offset);
    }
    for (auto& control_input : *new_node->mutable_control_input()) {
      control_input += init_costs_id_offset;
    }
  }
}

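// Resets the statistics (including the high watermark) of every local device
// allocator, so that peak memory is measured from this point onward.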
Status SingleMachine::ClearAllocatorStats() const {
  // Note that cpu_allocator->TracksAllocationSizes() returning true doesn't
  // always mean that AllocatorStats are actually collected.
  if (!cpu_allocator_stats_enabled_) {
    return Status(error::INVALID_ARGUMENT,
                  "Tracking allocation for CPU is not enabled.");
  }

  const DeviceMgr* device_mgr;
  TF_RETURN_IF_ERROR(session_->LocalDeviceManager(&device_mgr));
  std::vector<Device*> devices = device_mgr->ListDevices();

  for (Device* device : devices) {
    AllocatorStats stats;
    auto* allocator = device->GetAllocator(AllocatorAttributes());
    if (!allocator->TracksAllocationSizes()) {
      return Status(error::INVALID_ARGUMENT,
                    "Tracking allocation is not enabled.");
    }
    allocator->ClearStats();
  }
  return Status::OK();
}

}  // namespace grappler
}  // namespace tensorflow