1 | /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
16 | #include <fcntl.h> |
17 | #include <poll.h> |
18 | #include <signal.h> |
19 | #include <stdlib.h> |
20 | #include <string.h> |
21 | #include <sys/types.h> |
22 | #include <sys/wait.h> |
23 | #include <memory> |
24 | #include <vector> |
25 | |
26 | #include "tensorflow/core/platform/logging.h" |
27 | #include "tensorflow/core/platform/subprocess.h" |
28 | |
29 | // 1) FYI from m3b@ about fork(): |
30 | // A danger of calling fork() (as opposed to clone() or vfork()) is that if |
31 | // many people have used pthread_atfork() to acquire locks, fork() can deadlock, |
32 | // because it's unlikely that the locking order will be correct in a large |
33 | // program where different layers are unaware of one another and using |
34 | // pthread_atfork() independently. |
35 | // |
36 | // The danger of not calling fork() is that if libc managed to use |
37 | // pthread_atfork() correctly (for example, to lock the environment), you'd |
38 | // miss out on that protection. (But as far as I can see most libc's don't get |
39 | // that right; certainly glibc doesn't seem to.) |
40 | // |
41 | // clone() or vfork() are also frustrating because clone() exists only on Linux, |
42 | // and both clone(...CLONE_VM...) and vfork() have interesting issues around |
43 | // signals being delivered after the fork and before the exec. It may be |
44 | // possible to work around the latter by blocking all signals before the fork |
45 | // and unblocking them afterwards. |
46 | // |
47 | // Fortunately, most people haven't heard of pthread_atfork(). |
48 | // |
49 | // |
50 | // 2) FYI from m3b@ about execv(): |
51 | // The execv() call implicitly uses the libc global variable environ, which was |
52 | // copied by fork(), and that copy could have raced with a setenv() call in |
53 | // another thread, since libc implementations are usually not very careful about |
54 | // this. (glibc isn't careful, for example.) |
55 | // |
56 | // If this were inside libc, we could use locks or memory barriers to avoid the |
57 | // race, but as it is, I see nothing you can do. Even if you tried to copy the |
58 | // environment before the fork(), the copying could race with other threads |
59 | // calling setenv(). The good news is that few people call setenv(). |
60 | // |
61 | // Amusingly, the standard says of fork(): "...to avoid errors, the child |
62 | // process may only execute async-signal-safe operations until such time as one |
63 | // of the exec functions is called." Notice that execve() is listed as |
64 | // async-signal-safe, but execv() is not, and the difference is just the |
65 | // handling of the environment. |
66 | |
67 | namespace tensorflow { |
68 | |
69 | SubProcess::SubProcess(int nfds) |
70 | : running_(false), pid_(-1), exec_path_(nullptr), exec_argv_(nullptr) { |
71 | // The input 'nfds' parameter is currently ignored and the internal constant |
72 | // 'kNFds' is used to support the 3 channels (stdin, stdout, stderr). |
73 | for (int i = 0; i < kNFds; i++) { |
74 | action_[i] = ACTION_CLOSE; |
75 | parent_pipe_[i] = -1; |
76 | child_pipe_[i] = -1; |
77 | } |
78 | } |
79 | |
80 | SubProcess::~SubProcess() { |
81 | mutex_lock procLock(proc_mu_); |
82 | mutex_lock dataLock(data_mu_); |
83 | pid_ = -1; |
84 | running_ = false; |
85 | FreeArgs(); |
86 | ClosePipes(); |
87 | } |
88 | |
89 | void SubProcess::FreeArgs() { |
90 | free(exec_path_); |
91 | exec_path_ = nullptr; |
92 | |
93 | if (exec_argv_) { |
94 | for (char** p = exec_argv_; *p != nullptr; p++) { |
95 | free(*p); |
96 | } |
97 | delete[] exec_argv_; |
98 | exec_argv_ = nullptr; |
99 | } |
100 | } |
101 | |
102 | void SubProcess::ClosePipes() { |
103 | for (int i = 0; i < kNFds; i++) { |
104 | if (parent_pipe_[i] >= 0) { |
105 | close(parent_pipe_[i]); |
106 | parent_pipe_[i] = -1; |
107 | } |
108 | if (child_pipe_[i] >= 0) { |
109 | close(child_pipe_[i]); |
110 | child_pipe_[i] = -1; |
111 | } |
112 | } |
113 | } |
114 | |
115 | void SubProcess::SetProgram(const string& file, |
116 | const std::vector<string>& argv) { |
117 | mutex_lock procLock(proc_mu_); |
118 | mutex_lock dataLock(data_mu_); |
119 | if (running_) { |
120 | LOG(FATAL) << "SetProgram called after the process was started." ; |
121 | return; |
122 | } |
123 | |
124 | FreeArgs(); |
125 | exec_path_ = strdup(file.c_str()); |
126 | if (exec_path_ == nullptr) { |
127 | LOG(FATAL) << "SetProgram failed to allocate file string." ; |
128 | return; |
129 | } |
130 | |
131 | int argc = argv.size(); |
132 | exec_argv_ = new char*[argc + 1]; |
133 | for (int i = 0; i < argc; i++) { |
134 | exec_argv_[i] = strdup(argv[i].c_str()); |
135 | if (exec_argv_[i] == nullptr) { |
136 | LOG(FATAL) << "SetProgram failed to allocate command argument." ; |
137 | return; |
138 | } |
139 | } |
140 | exec_argv_[argc] = nullptr; |
141 | } |
142 | |
143 | void SubProcess::SetChannelAction(Channel chan, ChannelAction action) { |
144 | mutex_lock procLock(proc_mu_); |
145 | mutex_lock dataLock(data_mu_); |
146 | if (running_) { |
147 | LOG(FATAL) << "SetChannelAction called after the process was started." ; |
148 | } else if (!chan_valid(chan)) { |
149 | LOG(FATAL) << "SetChannelAction called with invalid channel: " << chan; |
150 | } else if ((action != ACTION_CLOSE) && (action != ACTION_PIPE) && |
151 | (action != ACTION_DUPPARENT)) { |
152 | LOG(FATAL) << "SetChannelAction called with invalid action: " << action; |
153 | } else { |
154 | action_[chan] = action; |
155 | } |
156 | } |
157 | |
158 | bool SubProcess::Start() { |
159 | mutex_lock procLock(proc_mu_); |
160 | mutex_lock dataLock(data_mu_); |
161 | if (running_) { |
162 | LOG(ERROR) << "Start called after the process was started." ; |
163 | return false; |
164 | } |
165 | if ((exec_path_ == nullptr) || (exec_argv_ == nullptr)) { |
166 | LOG(ERROR) << "Start called without setting a program." ; |
167 | return false; |
168 | } |
169 | |
170 | // Create parent/child pipes for the specified channels and make the |
171 | // parent-side of the pipes non-blocking. |
172 | for (int i = 0; i < kNFds; i++) { |
173 | if (action_[i] == ACTION_PIPE) { |
174 | int pipe_fds[2]; |
175 | if (pipe(pipe_fds) < 0) { |
176 | LOG(ERROR) << "Start cannot create pipe: " << strerror(errno); |
177 | ClosePipes(); |
178 | return false; |
179 | } |
180 | // Handle the direction of the pipe (stdin vs stdout/err). |
181 | if (i == 0) { |
182 | parent_pipe_[i] = pipe_fds[1]; |
183 | child_pipe_[i] = pipe_fds[0]; |
184 | } else { |
185 | parent_pipe_[i] = pipe_fds[0]; |
186 | child_pipe_[i] = pipe_fds[1]; |
187 | } |
188 | |
189 | if (fcntl(parent_pipe_[i], F_SETFL, O_NONBLOCK) < 0) { |
190 | LOG(ERROR) << "Start cannot make pipe non-blocking: " |
191 | << strerror(errno); |
192 | ClosePipes(); |
193 | return false; |
194 | } |
195 | if (fcntl(parent_pipe_[i], F_SETFD, FD_CLOEXEC) < 0) { |
196 | LOG(ERROR) << "Start cannot make pipe close-on-exec: " |
197 | << strerror(errno); |
198 | ClosePipes(); |
199 | return false; |
200 | } |
201 | } |
202 | } |
203 | |
204 | // Start the child process and setup the file descriptors of both processes. |
205 | // See comment (1) in the header about issues with the use of fork(). |
206 | pid_ = fork(); |
207 | if (pid_ < 0) { |
208 | LOG(ERROR) << "Start cannot fork() child process: " << strerror(errno); |
209 | ClosePipes(); |
210 | return false; |
211 | } |
212 | |
213 | if (pid_ > 0) { |
214 | // Parent process: close the child-side pipes and return. |
215 | running_ = true; |
216 | for (int i = 0; i < kNFds; i++) { |
217 | if (child_pipe_[i] >= 0) { |
218 | close(child_pipe_[i]); |
219 | child_pipe_[i] = -1; |
220 | } |
221 | } |
222 | return true; |
223 | } |
224 | |
225 | // Child process: close parent-side pipes and channels marked for closing. |
226 | // For pipe channels, replace their file descriptors with the pipes. |
227 | int devnull_fd = -1; |
228 | for (int i = 0; i < kNFds; i++) { |
229 | if (parent_pipe_[i] >= 0) { |
230 | close(parent_pipe_[i]); |
231 | parent_pipe_[i] = -1; |
232 | } |
233 | |
234 | switch (action_[i]) { |
235 | case ACTION_DUPPARENT: |
236 | // Nothing to do, fork() took care of it. |
237 | break; |
238 | |
239 | case ACTION_PIPE: |
240 | while (dup2(child_pipe_[i], i) < 0) { |
241 | if (!retry(errno)) { |
242 | _exit(1); |
243 | } |
244 | } |
245 | close(child_pipe_[i]); |
246 | child_pipe_[i] = -1; |
247 | break; |
248 | |
249 | case ACTION_CLOSE: |
250 | default: |
251 | // Do not close stdin/out/err, instead redirect them to /dev/null so |
252 | // their file descriptors remain unavailable for reuse by open(), etc. |
253 | if (i <= CHAN_STDERR) { |
254 | if (devnull_fd < 0) { |
255 | while ((devnull_fd = open("/dev/null" , O_RDWR, 0)) < 0) { |
256 | if (!retry(errno)) { |
257 | _exit(1); |
258 | } |
259 | } |
260 | } |
261 | while (dup2(devnull_fd, i) < 0) { |
262 | if (!retry(errno)) { |
263 | _exit(1); |
264 | } |
265 | } |
266 | } else { |
267 | close(i); |
268 | } |
269 | break; |
270 | } |
271 | } |
272 | |
273 | if (devnull_fd >= 0) { |
274 | close(devnull_fd); |
275 | } |
276 | |
277 | // Execute the child program. |
278 | // See comment (2) in the header about issues with the use of execv(). |
279 | execv(exec_path_, exec_argv_); |
280 | _exit(1); |
281 | } |
282 | |
283 | bool SubProcess::Wait() { |
284 | int status; |
285 | return WaitInternal(&status); |
286 | } |
287 | |
288 | bool SubProcess::WaitInternal(int* status) { |
289 | // The waiter must release proc_mu_ while waiting in order for Kill() to work. |
290 | proc_mu_.lock(); |
291 | bool running = running_; |
292 | pid_t pid = pid_; |
293 | proc_mu_.unlock(); |
294 | |
295 | bool ret = false; |
296 | if (running && (pid > 1)) { |
297 | pid_t cpid; |
298 | int cstat; |
299 | bool done = false; |
300 | while (!done) { |
301 | cpid = waitpid(pid, &cstat, 0); |
302 | if ((cpid < 0) && !retry(errno)) { |
303 | done = true; |
304 | } else if ((cpid == pid) && (WIFEXITED(cstat) || WIFSIGNALED(cstat))) { |
305 | *status = cstat; |
306 | ret = true; |
307 | done = true; |
308 | } |
309 | } |
310 | } |
311 | |
312 | proc_mu_.lock(); |
313 | if ((running_ == running) && (pid_ == pid)) { |
314 | running_ = false; |
315 | pid_ = -1; |
316 | } |
317 | proc_mu_.unlock(); |
318 | return ret; |
319 | } |
320 | |
321 | bool SubProcess::Kill(int signal) { |
322 | proc_mu_.lock(); |
323 | bool running = running_; |
324 | pid_t pid = pid_; |
325 | proc_mu_.unlock(); |
326 | |
327 | bool ret = false; |
328 | if (running && (pid > 1)) { |
329 | ret = (kill(pid, signal) == 0); |
330 | } |
331 | return ret; |
332 | } |
333 | |
334 | int SubProcess::Communicate(const string* stdin_input, string* stdout_output, |
335 | string* stderr_output) { |
336 | struct pollfd fds[kNFds]; |
337 | size_t nbytes[kNFds]; |
338 | string* iobufs[kNFds]; |
339 | int fd_count = 0; |
340 | |
341 | proc_mu_.lock(); |
342 | bool running = running_; |
343 | proc_mu_.unlock(); |
344 | if (!running) { |
345 | LOG(ERROR) << "Communicate called without a running process." ; |
346 | return 1; |
347 | } |
348 | |
349 | // If SIGPIPE handling is the default action, change it to ignore SIGPIPE and |
350 | // keep it ignored, don't change it back. This is needed while communicating |
351 | // with the child process so the parent process can survive the death of the |
352 | // child process while it is writing to its stdin. If the application has |
353 | // registered a SIGPIPE handler, then let it deal with any signals generated |
354 | // by the premature death of the child process, don't overwrite its handler. |
355 | struct sigaction act; |
356 | if (sigaction(SIGPIPE, nullptr, &act) < 0) { |
357 | LOG(ERROR) << "Communicate cannot get SIGPIPE handler: " << strerror(errno); |
358 | return 1; |
359 | } |
360 | if (act.sa_handler == SIG_DFL) { |
361 | memset(&act, 0, sizeof(act)); |
362 | act.sa_handler = SIG_IGN; |
363 | sigemptyset(&act.sa_mask); |
364 | if (sigaction(SIGPIPE, &act, nullptr) < 0) { |
365 | LOG(ERROR) << "Communicate cannot ignore SIGPIPE: " << strerror(errno); |
366 | return 1; |
367 | } |
368 | } |
369 | |
370 | // Lock data_mu_ but not proc_mu_ while communicating with the child process |
371 | // in order for Kill() to be able to terminate the child from another thread. |
372 | data_mu_.lock(); |
373 | |
374 | // Initialize the poll() structures and buffer tracking. |
375 | for (int i = 0; i < kNFds; i++) { |
376 | if (action_[i] == ACTION_PIPE) { |
377 | switch (i) { |
378 | case CHAN_STDIN: |
379 | // Special case: if no data is given to send to the child process, |
380 | // close the pipe to unblock the child, and skip the file descriptor. |
381 | if (stdin_input == nullptr) { |
382 | close(parent_pipe_[i]); |
383 | parent_pipe_[i] = -1; |
384 | continue; |
385 | } |
386 | iobufs[fd_count] = const_cast<string*>(stdin_input); |
387 | break; |
388 | case CHAN_STDOUT: |
389 | iobufs[fd_count] = stdout_output; |
390 | break; |
391 | case CHAN_STDERR: |
392 | iobufs[fd_count] = stderr_output; |
393 | break; |
394 | default: |
395 | iobufs[fd_count] = nullptr; |
396 | break; |
397 | } |
398 | nbytes[fd_count] = 0; |
399 | fds[fd_count].fd = parent_pipe_[i]; |
400 | fds[fd_count].events = (i > 0) ? POLLIN : POLLOUT; |
401 | fds[fd_count].revents = 0; |
402 | fd_count++; |
403 | } |
404 | } |
405 | |
406 | // Loop communicating with the child process. |
407 | int fd_remain = fd_count; |
408 | char buf[4096]; |
409 | while (fd_remain > 0) { |
410 | int n = poll(fds, fd_count, -1); |
411 | if ((n < 0) && !retry(errno)) { |
412 | LOG(ERROR) << "Communicate cannot poll(): " << strerror(errno); |
413 | fd_remain = 0; |
414 | } else if (n == 0) { |
415 | LOG(ERROR) << "Communicate cannot poll(): timeout not possible" ; |
416 | fd_remain = 0; |
417 | } else if (n > 0) { |
418 | // Handle the pipes ready for I/O. |
419 | for (int i = 0; i < fd_count; i++) { |
420 | if ((fds[i].revents & (POLLIN | POLLHUP)) != 0) { |
421 | // Read from one of the child's outputs. |
422 | ssize_t n = read(fds[i].fd, buf, sizeof(buf)); |
423 | if (n > 0) { |
424 | if (iobufs[i] != nullptr) { |
425 | iobufs[i]->append(buf, n); |
426 | nbytes[i] += n; |
427 | } |
428 | } else if ((n == 0) || !retry(errno)) { |
429 | fds[i].fd = -1; |
430 | fd_remain--; |
431 | } |
432 | } else if ((fds[i].revents & POLLOUT) != 0) { |
433 | // Write to the child's stdin. |
434 | ssize_t n = iobufs[i]->size() - nbytes[i]; |
435 | if (n > 0) { |
436 | n = write(fds[i].fd, iobufs[i]->c_str() + nbytes[i], n); |
437 | } |
438 | if (n >= 0) { |
439 | nbytes[i] += n; |
440 | if (nbytes[i] >= iobufs[i]->size()) { |
441 | fds[i].fd = -1; |
442 | fd_remain--; |
443 | // Close the child's stdin pipe to unblock the process. |
444 | close(parent_pipe_[CHAN_STDIN]); |
445 | parent_pipe_[CHAN_STDIN] = -1; |
446 | } |
447 | } else if (!retry(errno)) { |
448 | fds[i].fd = -1; |
449 | fd_remain--; |
450 | } |
451 | } else if ((fds[i].revents & (POLLERR | POLLNVAL)) != 0) { |
452 | fds[i].fd = -1; |
453 | fd_remain--; |
454 | } |
455 | } |
456 | } |
457 | } |
458 | |
459 | data_mu_.unlock(); |
460 | |
461 | // Wait for the child process to exit and return its status. |
462 | int status; |
463 | return WaitInternal(&status) ? status : -1; |
464 | } |
465 | |
466 | std::unique_ptr<SubProcess> CreateSubProcess(const std::vector<string>& argv) { |
467 | std::unique_ptr<SubProcess> proc(new SubProcess()); |
468 | proc->SetProgram(argv[0], argv); |
469 | proc->SetChannelAction(CHAN_STDERR, ACTION_DUPPARENT); |
470 | proc->SetChannelAction(CHAN_STDOUT, ACTION_DUPPARENT); |
471 | return proc; |
472 | } |
473 | |
474 | } // namespace tensorflow |
475 | |