// \file child.cpp // // Copyright (C) 2014 MicroNeil Research Corporation. // // This program is part of the MicroNeil Research Open Library Project. For // more information go to http://www.microneil.com/OpenLibrary/index.html // // This program is free software; you can redistribute it and/or modify it // under the terms of the GNU General Public License as published by the // Free Software Foundation; either version 2 of the License, or (at your // option) any later version. // // This program is distributed in the hope that it will be useful, but WITHOUT // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for // more details. // // You should have received a copy of the GNU General Public License along with // this program; if not, write to the Free Software Foundation, Inc., 59 Temple // Place, Suite 330, Boston, MA 02111-1307 USA //============================================================================== #ifndef _WIN32 #include #include #include #include #include #include #include #endif #include #include #include "CodeDweller/timing.hpp" #include "CodeDweller/child.hpp" namespace CodeDweller { ChildStream::ChildStream(std::vector const &args, size_t bufSize) : childStreambuf(bufSize), std::iostream(&childStreambuf), cmdArgs(args) { init(); run(); } ChildStream::ChildStream(std::string const &childpath, size_t bufSize) : childStreambuf(bufSize), std::iostream(&childStreambuf) { cmdArgs.push_back(childpath); init(); run(); } ChildStream::ChildStream(size_t bufSize) : childStreambuf(bufSize), std::iostream(&childStreambuf) { init(); } ChildStream::~ChildStream() { try { close(); } catch (std::exception &e) { ; } } void ChildStream::init() { childStarted = false; childExited = false; exitCodeObtainedFlag = false; exitCode = 0; } void ChildStream::open(std::vector const &args) { cmdArgs = args; init(); run(); } void ChildStream::open(std::string const &childpath) { cmdArgs.clear(); cmdArgs.push_back(childpath); init(); run(); } size_t ChildStream::numBytesAvailable() const { return childStreambuf.numBytesAvailable(); } bool ChildStream::isRunning() const { return childStarted && !childExited; } void ChildStream::run() { if (childStarted) { throw std::logic_error("Child process was active when " "run() was called"); } if (cmdArgs.empty()) { throw std::invalid_argument("A child executable must be specified."); } #ifdef _WIN32 // Set the bInheritHandle flag so pipe handles are inherited. SECURITY_ATTRIBUTES securityAttributes; securityAttributes.nLength = sizeof(SECURITY_ATTRIBUTES); securityAttributes.bInheritHandle = true; securityAttributes.lpSecurityDescriptor = NULL; // Create a pipe for the child process's STDOUT. HANDLE childStdOutAtChild; HANDLE childStdOutAtParent; HANDLE childStdInAtChild; HANDLE childStdInAtParent; int bufferSize = 0; if (!CreatePipe(&childStdOutAtParent, &childStdOutAtChild, &securityAttributes, bufferSize)) { throw std::runtime_error("Error from CreatePipe for stdout: " + getErrorText()); } // Ensure the read handle to the pipe for STDOUT is not inherited. int inheritFlag = 0; if (!SetHandleInformation(childStdOutAtParent, HANDLE_FLAG_INHERIT, inheritFlag) ) { throw std::runtime_error("Error from GetHandleInformation for stdout: " + getErrorText()); } // Create a pipe for the child process's STDIN. if (! CreatePipe(&childStdInAtChild, &childStdInAtParent, &securityAttributes, bufferSize)) { throw std::runtime_error("Error from CreatePipe for stdin: " + getErrorText()); } // Ensure the write handle to the pipe for STDIN is not inherited. if (!SetHandleInformation(childStdInAtParent, HANDLE_FLAG_INHERIT, inheritFlag)) { throw std::runtime_error("Error from GetHandleInformation for stdin: " + getErrorText()); } // Set up members of the PROCESS_INFORMATION structure. PROCESS_INFORMATION processInfo; std::fill((char *) &processInfo, ((char *) &processInfo) + sizeof(PROCESS_INFORMATION), 0); // Set up members of the STARTUPINFO structure. This structure // specifies the STDIN and STDOUT handles for redirection. STARTUPINFO startInfo; std::fill((char *) &startInfo, ((char *) &startInfo) + sizeof(STARTUPINFO), 0); startInfo.cb = sizeof(STARTUPINFO); startInfo.hStdError = childStdOutAtChild; startInfo.hStdOutput = childStdOutAtChild; startInfo.hStdInput = childStdInAtChild; startInfo.dwFlags |= STARTF_USESTDHANDLES; // Assemble the command line. std::string cmdline; if (cmdArgs.size() == 1) { cmdline = cmdArgs[0]; } else { // Append all but last command-line arguments. for (size_t i = 0; i < cmdArgs.size() - 1; i++) { cmdline += cmdArgs[i] + " "; } cmdline += cmdArgs.back(); // Append last command-line argument. } // Create the child process. bool status; status = CreateProcess(NULL, (char *) cmdline.c_str(), NULL, // process security attributes NULL, // primary thread security attributes true, // handles are inherited 0, // creation flags NULL, // use parent's environment NULL, // use parent's current directory &startInfo, // STARTUPINFO pointer &processInfo); // receives PROCESS_INFORMATION // If an error occurs, exit the application. if (!status ) { throw std::runtime_error("Error from CreateProcess with " "command line \"" + cmdline + "\": " + getErrorText()); } // Provide the stream buffers with the handles for communicating // with the child process. childStreambuf.setInputHandle(childStdOutAtParent); childStreambuf.setOutputHandle(childStdInAtParent); // Save the handles to the child process and its primary thread. childProcess = processInfo.hProcess; childThread = processInfo.hThread; // Close the child's end of the pipes. if (!CloseHandle(childStdOutAtChild)) { throw std::runtime_error("Error closing the child process " "stdout handle: " + getErrorText()); } if (!CloseHandle(childStdInAtChild)) { throw std::runtime_error("Error closing the child process " "stdin handle: " + getErrorText()); } #else // Create the pipes for the stdin and stdout. int childStdInPipe[2]; int childStdOutPipe[2]; if (pipe(childStdInPipe) != 0) { throw std::runtime_error("Error creating pipe for stdin: " + getErrorText()); } if (pipe(childStdOutPipe) != 0) { ::close(childStdInPipe[0]); ::close(childStdInPipe[1]); throw std::runtime_error("Error creating pipe for stdout: " + getErrorText()); } // Create the child process. childPid = fork(); if (-1 == childPid) { for (int i = 0; i < 2; i++) { ::close(childStdInPipe[i]); ::close(childStdOutPipe[i]); } throw std::runtime_error("Error creating child process: " + getErrorText()); } if (0 == childPid) { // The child executes this. Redirect stdin. if (dup2(childStdInPipe[0], STDIN_FILENO) == -1) { std::string errMsg; // Send message to parent. errMsg = "Error redirecting stdin in the child: " + getErrorText(); ::write(childStdOutPipe[1], errMsg.data(), errMsg.size()); exit(-1); } // Redirect stdout. if (dup2(childStdOutPipe[1], STDOUT_FILENO) == -1) { std::string errMsg; // Send message to parent. errMsg = "Error redirecting stdout in the child: " + getErrorText(); ::write(childStdOutPipe[1], errMsg.data(), errMsg.size()); exit(-1); } // Close pipes. if ( (::close(childStdInPipe[0]) != 0) || (::close(childStdInPipe[1]) != 0) || (::close(childStdOutPipe[0]) != 0) || (::close(childStdOutPipe[1]) != 0) ) { std::string errMsg; // Send message to parent. errMsg = "Error closing the pipes in the child: " + getErrorText(); ::write(STDOUT_FILENO, errMsg.data(), errMsg.size()); exit(-1); } // Prepare the arguments. std::vector execvArgv; for (auto &arg : cmdArgs) { execvArgv.push_back(arg.c_str()); } execvArgv.push_back((char *) NULL); // Run the child process image. (void) execv(execvArgv[0], (char * const *) &(execvArgv[0])); // Error from exec. std::string errMsg; // Send message to parent. errMsg = "Error from exec: " + getErrorText(); ::write(STDOUT_FILENO, errMsg.data(), errMsg.size()); exit(-1); } // Provide the stream buffers with the file descriptors for // communicating with the child process. childStreambuf.setInputFileDescriptor(childStdOutPipe[0]); childStreambuf.setOutputFileDescriptor(childStdInPipe[1]); // Close the child's end of the pipes. if ( (::close(childStdInPipe[0]) != 0) || (::close(childStdOutPipe[1]) != 0) ) { std::string errMsg; throw std::runtime_error("Error closing child's end of pipes in " "the parent: " + getErrorText()); } #endif childStarted = true; } void ChildStream::close() { if (!childStarted) { throw std::logic_error("Child process was not started " "when close() was called"); } // Terminate the child if it's running. if (isRunning()) { std::string errorString; bool errorOccurred = false; #ifdef _WIN32 // Ignore errors. Reason: Terminating a proces that doesn't // exist (e.g. has exited) gives an error. (void) TerminateProcess(childProcess, terminateExitCode); #else if (kill(childPid, SIGKILL) != 0) { errorString += (errorOccurred ? "\n" : ""); errorString += "Error terminating the child process: " + getErrorText(); errorOccurred = true; } else if (waitpid(childPid, NULL, 0) != childPid) { errorString += (errorOccurred ? "\n" : ""); errorString += "Error waiting for the child process: " + getErrorText(); errorOccurred = true; } #endif if (errorOccurred) { throw std::runtime_error(errorString); } } // Reset. init(); } bool ChildStream::isDone() { if (childExited) { return true; } if (!childStarted) { throw std::logic_error("Child process was not started " "when isDone() was called"); } int result; #ifdef _WIN32 if (!GetExitCodeProcess(childProcess, (LPDWORD) &result)) { throw std::runtime_error("Error checking status of child process: " + getErrorText()); } if (STILL_ACTIVE == result) { return false; } // Child process has exited. Save the exit code. exitCode = result; exitCodeObtainedFlag = true; #else int status = 0; result = waitpid(childPid, &status, WNOHANG); if (-1 == result) { throw std::runtime_error("Error checking status of child process: " + getErrorText()); } else if (0 == result) { // Child is still running. return false; } if (WIFEXITED(status)) { // Child exited normally. exitCode = WEXITSTATUS(status); exitCodeObtainedFlag = true; } #endif childExited = true; return true; } int32_t ChildStream::result() { if (exitCodeObtainedFlag) { return exitCode; } // Check whether the process is running, and get the exit code. if (!isDone()) { throw std::logic_error("Child process was still running " "when result() was called"); } // Child process has exited. if (!exitCodeObtainedFlag) { // Exit code is not available. throw std::runtime_error("Child process has exited but the exit " "code is not available"); } return exitCode; } std::string ChildStream::getErrorText() { #ifdef _WIN32 LPVOID winMsgBuf; DWORD lastError = GetLastError(); FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, lastError, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (char *) &winMsgBuf, 0, NULL ); std::string errMsg((char *) winMsgBuf); LocalFree(winMsgBuf); return errMsg; #else return strerror(errno); #endif } ChildStream::ChildStreambuf::ChildStreambuf(std::size_t bufSize) : #ifdef _WIN32 inputHandle(0), outputHandle(0), #else inputFileDescriptor(-1), outputFileDescriptor(-1), #endif bufferSize(bufSize), readBuffer(bufferSize + 1), writeBuffer(bufferSize + 1) { // Read buffer initialization. char *end = &(readBuffer.front()) + readBuffer.size(); // Indicate to underflow that underflow has not been called. setg(end, end, end); // Write buffer initialization. char *base = &(writeBuffer.front()); // Indicate to overflow that overflow has not been called. setp(base, base + writeBuffer.size() - 1); } #ifdef _WIN32 void ChildStream::ChildStreambuf::setInputHandle(HANDLE inHandle) { inputHandle = inHandle; } #else void ChildStream::ChildStreambuf::setInputFileDescriptor(int inFd) { inputFileDescriptor = inFd; } #endif size_t ChildStream::ChildStreambuf::numBytesAvailable() const { size_t nBytesAvailable = egptr() - gptr(); #ifdef _WIN32 DWORD lpTotalBytesAvail; if (!PeekNamedPipe(inputHandle, NULL, 0, NULL, &lpTotalBytesAvail, NULL)) { throw std::runtime_error("Error from PeekNamedPipe: " + getErrorText()); } if (lpTotalBytesAvail > 0) { nBytesAvailable++; } #else fd_set readFd; int retVal; struct timeval timeout = {0, 0}; FD_ZERO(&readFd); FD_SET(inputFileDescriptor, &readFd); // Check if input is available. retVal = select(inputFileDescriptor + 1, &readFd, NULL, NULL, &timeout); if (-1 == retVal) { throw std::runtime_error("Error from select(): " + getErrorText()); } else if (retVal > 0) { nBytesAvailable++; } #endif return nBytesAvailable; } std::streambuf::int_type ChildStream::ChildStreambuf::underflow() { // Check for empty buffer. if (gptr() < egptr()) { // Not empty. return traits_type::to_int_type(*gptr()); } // Need to fill the buffer. char *base = &(readBuffer.front()); char *start = base; // Check whether this is the first fill. if (eback() == base) { // Not the first fill. Copy one putback character. *(eback()) = *(egptr() - 1); start++; } // start points to the start of the buffer. Fill buffer. #ifdef _WIN32 DWORD nBytesRead; if (!ReadFile(inputHandle, start, readBuffer.size() - (start - base), &nBytesRead, NULL)) { return traits_type::eof(); } #else ssize_t nBytesRead; nBytesRead = ::read(inputFileDescriptor, start, readBuffer.size() - (start - base)); if (-1 == nBytesRead) { return traits_type::eof(); } #endif // Check for EOF. if (0 == nBytesRead) { return traits_type::eof(); } // Update buffer pointers. setg(base, start, start + nBytesRead); return traits_type::to_int_type(*gptr()); } #ifdef _WIN32 void ChildStream::ChildStreambuf::setOutputHandle(HANDLE outHandle) { outputHandle = outHandle; } #else void ChildStream::ChildStreambuf::setOutputFileDescriptor(int outFd) { outputFileDescriptor = outFd; } #endif void ChildStream::ChildStreambuf::flushBuffer() { // Write. std::ptrdiff_t nBytes = pptr() - pbase(); #ifdef _WIN32 DWORD nBytesWritten; if (!WriteFile(outputHandle, pbase(), nBytes, &nBytesWritten, NULL)) { // Clear the output buffer. pbump(-nBytes); throw std::runtime_error("Error writing to child process: " + getErrorText()); } #else ssize_t nBytesWritten; nBytesWritten = ::write(outputFileDescriptor, pbase(), nBytes); #endif // Clear the output buffer. pbump(-nBytes); if (nBytes != nBytesWritten) { throw std::runtime_error("Not all data was written to to child " "process: " + getErrorText()); } return; } std::streambuf::int_type ChildStream::ChildStreambuf::overflow(int_type ch) { // Check whether we're writing EOF. if (traits_type::eof() != ch) { // Not writing EOF. *(pptr()) = ch; pbump(1); // Write. flushBuffer(); // Success. return ch; } return traits_type::eof(); } int ChildStream::ChildStreambuf::sync() { flushBuffer(); // Throws exception on failure. // Success. return 1; } Child::CircularBuffer::CircularBuffer(size_t maxSize) : buffer(maxSize + 1), capacity(maxSize) { iBegin = 0; iEnd = 0; } bool Child::CircularBuffer::empty() const { return (iBegin == iEnd); } size_t Child::CircularBuffer::nUsed() const { return capacity - nFree(); } void Child::CircularBuffer::clear() { iBegin = 0; iEnd = 0; } size_t Child::CircularBuffer::nFree() const { size_t iLocalBegin = iBegin; // Correct an invalid value. if (iLocalBegin >= capacity + 1) { iLocalBegin = 0; } if (iLocalBegin <= iEnd) { return capacity - (iEnd - iLocalBegin); } return iLocalBegin - iEnd - 1; } void Child::CircularBuffer::put(char const *ptr, size_t nBytes) { for (size_t i = 0; i < nBytes; i++) { buffer[iEnd] = *ptr; ptr++; nextIndex(iEnd); } } void Child::CircularBuffer::getAndErase(std::string &buf, size_t nBytes) { if ( (0 == nBytes) || (nBytes > nUsed()) ) { nBytes = nUsed(); } buf.clear(); if (buf.capacity() < nBytes) { buf.reserve(nBytes); } for (size_t i = 0; i < nBytes; i++) { buf.push_back(buffer[iBegin]); nextIndex(iBegin); } } Child::Child(std::vector const &args, size_t bufSize, std::uint16_t nominalAboveMinPollTime_ms, std::uint16_t deltaPollTime_ms) : bufferCapacity(bufSize), readBuffer(bufferCapacity), writeBuffer(bufferCapacity), nominalPollTime_ms(nominalAboveMinPollTime_ms + CodeDweller::MinimumSleeperTime), maximumPollTime_ms(nominalPollTime_ms + deltaPollTime_ms) { init(); open(args); } Child::Child(std::string const &childpath, size_t bufSize, std::uint16_t nominalAboveMinPollTime_ms, std::uint16_t deltaPollTime_ms) : bufferCapacity(bufSize), readBuffer(bufferCapacity), writeBuffer(bufferCapacity), nominalPollTime_ms(nominalAboveMinPollTime_ms + CodeDweller::MinimumSleeperTime), maximumPollTime_ms(nominalPollTime_ms + deltaPollTime_ms) { init(); open(childpath); } Child::Child(size_t bufSize, std::uint16_t nominalAboveMinPollTime_ms, std::uint16_t deltaPollTime_ms) : bufferCapacity(bufSize), readBuffer(bufferCapacity), writeBuffer(bufferCapacity), nominalPollTime_ms(nominalAboveMinPollTime_ms + CodeDweller::MinimumSleeperTime), maximumPollTime_ms(nominalPollTime_ms + deltaPollTime_ms) { init(); } Child::~Child() { try { close(); } catch (std::exception &e) { ; } } void Child::open(std::vector const &args) { cmdArgs = args; if (isRunning()) { throw std::logic_error("The child process was already active."); } init(); run(); } void Child::open(std::string const &childpath) { if (isRunning()) { throw std::logic_error("The child process was already active."); } cmdArgs.clear(); cmdArgs.push_back(childpath); init(); run(); } void Child::init() { childStarted = false; childExited = false; exitCodeObtainedFlag = false; exitCode = 0; readBuffer.clear(); nWriteBytes = 0; nTransmitBytes = 0; threadsAreRunning = false; stopFlag = true; errorText.clear(); } bool Child::write(std::string const &data) { if (!isRunning()) { throw std::logic_error("No child process is running."); } // The free space in the write buffer can be checked without // locking the mutex because: // // 1) bufferCapacity is unchanging, and // // 2) nWriteBytes is only decreased by the writer thread. // // This means that the calculated free space can only increase by // the action of the writer thread; the calculated free space is a // minimum value. In the worst case, this method would return // false unnecessarily. if (data.size() > bufferCapacity - nWriteBytes) { return false; } std::lock_guard lock(writeBufferMutex); std::copy(data.data(), data.data() + data.size(), &(writeBuffer[nWriteBytes])); nWriteBytes += data.size(); return true; } size_t Child::writeAndShrink(std::string &data) { if (!isRunning()) { throw std::logic_error("No child process is running."); } // See the comment above regarding checking the free space in the // write buffer without locking the mutex. size_t nFree = bufferCapacity - nWriteBytes; if (0 == nFree) { return 0; } std::lock_guard lock(writeBufferMutex); size_t nBytesToCopy = data.size(); if (nBytesToCopy > nFree) { nBytesToCopy = nFree; } std::copy(data.data(), data.data() + nBytesToCopy, &(writeBuffer[nWriteBytes])); nWriteBytes += nBytesToCopy; data.erase(0, nBytesToCopy); return nBytesToCopy; } bool Child::isFinishedWriting() const { return ( (0 == nWriteBytes) && (0 == nTransmitBytes) ); } size_t Child::read(std::string &data, size_t nBytes) { if (!isRunning()) { throw std::logic_error("No child process is running."); } data.clear(); // Whether the read circular buffer is empty can be checked // without locking the mutex because: // if (readBuffer.empty()) { return 0; } std::lock_guard lock(readBufferMutex); size_t nBytesToRead = nBytes; if (nBytesToRead > readBuffer.nUsed()) { nBytesToRead = readBuffer.nUsed(); } readBuffer.getAndErase(data, nBytesToRead); return data.size(); } void Child::close() { if (!childStarted) { throw std::logic_error("Child process was not started " "when close() was called"); } // Stop the reader and writer threads. Note: None of the error // conditions that cause an exception to be thrown by join() // can ever occur. stopFlag = true; // Terminate the child if it's running. if (isRunning()) { std::string errorString; bool errorOccurred = false; #ifdef _WIN32 // Ignore errors. Reason: Terminating a proces that doesn't // exist (e.g. has exited) gives an error. (void) TerminateProcess(childProcess, terminateExitCode); if (CloseHandle(inputHandle) == 0) { errorString = "Error closing input from the child: " + getErrorText(); errorOccurred = true; } if (CloseHandle(outputHandle) == 0) { errorString += (errorOccurred ? "\n" : ""); errorString += "Error closing output to the child: " + getErrorText(); errorOccurred = true; } #else if (0 != ::close(inputFileDescriptor)) { errorString = "Error closing input from the child: " + getErrorText(); errorOccurred = true; } if (0 != ::close(outputFileDescriptor)) { errorString += (errorOccurred ? "\n" : ""); errorString += "Error closing output to the child: " + getErrorText(); errorOccurred = true; } if (kill(childPid, SIGKILL) != 0) { errorString += (errorOccurred ? "\n" : ""); errorString += "Error terminating the child process: " + getErrorText(); errorOccurred = true; } else if (waitpid(childPid, NULL, 0) != childPid) { errorString += (errorOccurred ? "\n" : ""); errorString += "Error waiting for the child process: " + getErrorText(); errorOccurred = true; } #endif if (errorOccurred) { throw std::runtime_error(errorString); } } if (threadsAreRunning) { readerThread.join(); writerThread.join(); threadsAreRunning = false; } // Reset. init(); } bool Child::isRunning() const { return childStarted && !childExited; } bool Child::errorOccurred(std::string &errorDescription) const { errorDescription = errorText; return !errorDescription.empty(); } void Child::run() { if (childStarted) { throw std::logic_error("Child process was active when " "run() was called"); } if (cmdArgs.empty()) { throw std::invalid_argument("A child executable must be specified."); } #ifdef _WIN32 // Set the bInheritHandle flag so pipe handles are inherited. SECURITY_ATTRIBUTES securityAttributes; securityAttributes.nLength = sizeof(SECURITY_ATTRIBUTES); securityAttributes.bInheritHandle = true; securityAttributes.lpSecurityDescriptor = NULL; // Create a pipe for the child process's STDOUT. HANDLE childStdOutAtChild; HANDLE childStdOutAtParent; HANDLE childStdInAtChild; HANDLE childStdInAtParent; int bufferSize = 0; if (!CreatePipe(&childStdOutAtParent, &childStdOutAtChild, &securityAttributes, bufferSize)) { throw std::runtime_error("Error from CreatePipe for stdout: " + getErrorText()); } // Ensure the read handle to the pipe for STDOUT is not inherited. int inheritFlag = 0; if (!SetHandleInformation(childStdOutAtParent, HANDLE_FLAG_INHERIT, inheritFlag) ) { throw std::runtime_error("Error from GetHandleInformation for stdout: " + getErrorText()); } // Create a pipe for the child process's STDIN. if (! CreatePipe(&childStdInAtChild, &childStdInAtParent, &securityAttributes, bufferSize)) { throw std::runtime_error("Error from CreatePipe for stdin: " + getErrorText()); } // Ensure the write handle to the pipe for STDIN is not inherited. if (!SetHandleInformation(childStdInAtParent, HANDLE_FLAG_INHERIT, inheritFlag)) { throw std::runtime_error("Error from GetHandleInformation for stdin: " + getErrorText()); } // Set up members of the PROCESS_INFORMATION structure. PROCESS_INFORMATION processInfo; std::fill((char *) &processInfo, ((char *) &processInfo) + sizeof(PROCESS_INFORMATION), 0); // Set up members of the STARTUPINFO structure. This structure // specifies the STDIN and STDOUT handles for redirection. STARTUPINFO startInfo; std::fill((char *) &startInfo, ((char *) &startInfo) + sizeof(STARTUPINFO), 0); startInfo.cb = sizeof(STARTUPINFO); startInfo.hStdError = childStdOutAtChild; startInfo.hStdOutput = childStdOutAtChild; startInfo.hStdInput = childStdInAtChild; startInfo.dwFlags |= STARTF_USESTDHANDLES; // Assemble the command line. std::string cmdline; if (cmdArgs.size() == 1) { cmdline = cmdArgs[0]; } else { // Append all but last command-line arguments. for (size_t i = 0; i < cmdArgs.size() - 1; i++) { cmdline += cmdArgs[i] + " "; } cmdline += cmdArgs.back(); // Append last command-line argument. } // Create the child process. bool status; status = CreateProcess(NULL, (char *) cmdline.c_str(), NULL, // process security attributes NULL, // primary thread security attributes true, // handles are inherited 0, // creation flags NULL, // use parent's environment NULL, // use parent's current directory &startInfo, // STARTUPINFO pointer &processInfo); // receives PROCESS_INFORMATION // If an error occurs, exit the application. if (!status ) { throw std::runtime_error("Error from CreateProcess with " "command line \"" + cmdline + "\": " + getErrorText()); } // Provide the stream buffers with the handles for communicating // with the child process. inputHandle = childStdOutAtParent; outputHandle = childStdInAtParent; // Save the handles to the child process and its primary thread. childProcess = processInfo.hProcess; childThread = processInfo.hThread; // Close the child's end of the pipes. if (!CloseHandle(childStdOutAtChild)) { throw std::runtime_error("Error closing the child process " "stdout handle: " + getErrorText()); } if (!CloseHandle(childStdInAtChild)) { throw std::runtime_error("Error closing the child process " "stdin handle: " + getErrorText()); } #else // Create the pipes for the stdin and stdout. int childStdInPipe[2]; int childStdOutPipe[2]; if (pipe(childStdInPipe) != 0) { throw std::runtime_error("Error creating pipe for stdin: " + getErrorText()); } if (pipe(childStdOutPipe) != 0) { ::close(childStdInPipe[0]); ::close(childStdInPipe[1]); throw std::runtime_error("Error creating pipe for stdout: " + getErrorText()); } // Create the child process. childPid = fork(); if (-1 == childPid) { for (int i = 0; i < 2; i++) { ::close(childStdInPipe[i]); ::close(childStdOutPipe[i]); } throw std::runtime_error("Error creating child process: " + getErrorText()); } if (0 == childPid) { // The child executes this. Redirect stdin. if (dup2(childStdInPipe[0], STDIN_FILENO) == -1) { std::string errMsg; // Send message to parent. errMsg = "Error redirecting stdin in the child: " + getErrorText(); ::write(childStdOutPipe[1], errMsg.data(), errMsg.size()); exit(-1); } // Redirect stdout. if (dup2(childStdOutPipe[1], STDOUT_FILENO) == -1) { std::string errMsg; // Send message to parent. errMsg = "Error redirecting stdout in the child: " + getErrorText(); ::write(childStdOutPipe[1], errMsg.data(), errMsg.size()); exit(-1); } // Close pipes. if ( (::close(childStdInPipe[0]) != 0) || (::close(childStdInPipe[1]) != 0) || (::close(childStdOutPipe[0]) != 0) || (::close(childStdOutPipe[1]) != 0) ) { std::string errMsg; // Send message to parent. errMsg = "Error closing the pipes in the child: " + getErrorText(); ::write(STDOUT_FILENO, errMsg.data(), errMsg.size()); exit(-1); } // Prepare the arguments. std::vector execvArgv; for (auto &arg : cmdArgs) { execvArgv.push_back(arg.c_str()); } execvArgv.push_back((char *) NULL); // Run the child process image. (void) execv(execvArgv[0], (char * const *) &(execvArgv[0])); // Error from exec. std::string errMsg; // Send message to parent. errMsg = "Error from exec: " + getErrorText(); ::write(STDOUT_FILENO, errMsg.data(), errMsg.size()); exit(-1); } // Provide the stream buffers with the file descriptors for // communicating with the child process. inputFileDescriptor = childStdOutPipe[0]; outputFileDescriptor = childStdInPipe[1]; // Close the child's end of the pipes. if ( (::close(childStdInPipe[0]) != 0) || (::close(childStdOutPipe[1]) != 0) ) { std::string errMsg; throw std::runtime_error("Error closing child's end of pipes in " "the parent: " + getErrorText()); } #endif childStarted = true; // Start the reader and writer threads. stopFlag = false; try { std::thread readerTemp(&Child::readFromChild, this); readerThread = std::move(readerTemp); } catch (std::exception &e) { throw std::runtime_error("Error starting reader thread: " + getErrorText()); } try { std::thread writerTemp(&Child::writeToChild, this); writerThread = std::move(writerTemp); } catch (std::exception &e) { stopFlag = true; readerThread.join(); throw std::runtime_error("Error starting writer thread: " + getErrorText()); } threadsAreRunning = true; } bool Child::isDone() { if (childExited) { return true; } if (!childStarted) { throw std::logic_error("Child process was not started " "when isDone() was called"); } int result; #ifdef _WIN32 if (!GetExitCodeProcess(childProcess, (LPDWORD) &result)) { throw std::runtime_error("Error checking status of child process: " + getErrorText()); } if (STILL_ACTIVE == result) { return false; } // Child process has exited. Save the exit code. exitCode = result; exitCodeObtainedFlag = true; #else int status = 0; result = waitpid(childPid, &status, WNOHANG); if (-1 == result) { throw std::runtime_error("Error checking status of child process: " + getErrorText()); } else if (0 == result) { // Child is still running. return false; } if (WIFEXITED(status)) { // Child exited normally. exitCode = WEXITSTATUS(status); exitCodeObtainedFlag = true; } #endif childExited = true; // Stop threads. stopFlag = true; readerThread.join(); writerThread.join(); threadsAreRunning = false; return true; } int32_t Child::result() { if (exitCodeObtainedFlag) { return exitCode; } // Check whether the process is running, and get the exit code. if (!isDone()) { throw std::logic_error("Child process was still running " "when result() was called"); } // Child process has exited. if (!exitCodeObtainedFlag) { // Exit code is not available. throw std::runtime_error("Child process has exited but the exit " "code is not available"); } return exitCode; } std::string Child::getErrorText() { #ifdef _WIN32 LPVOID winMsgBuf; DWORD lastError = GetLastError(); FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, lastError, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (char *) &winMsgBuf, 0, NULL ); std::string errMsg((char *) winMsgBuf); LocalFree(winMsgBuf); return errMsg; #else return strerror(errno); #endif } void Child::readFromChild() { std::vector rxBuf(bufferCapacity); CodeDweller::PollTimer pollTimer(nominalPollTime_ms, maximumPollTime_ms); auto sleepTime = std::chrono::milliseconds(maximumPollTime_ms); while (!stopFlag) { char *bufferPtr; bufferPtr = &(rxBuf[0]); // Blocking read from the child. #ifdef _WIN32 DWORD nBytesRead; if (!ReadFile(inputHandle, bufferPtr, bufferCapacity, &nBytesRead, NULL)) { if (stopFlag) { break; } // Broken pipe occurs when the child exits; this is not // necessarily an error condition. if (GetLastError() != ERROR_BROKEN_PIPE) { errorText = "Error reading from the child process: "; errorText += getErrorText(); } break; } #else ssize_t nBytesRead; nBytesRead = ::read(inputFileDescriptor, bufferPtr, bufferCapacity); if (-1 == nBytesRead) { if (stopFlag) { break; } errorText = "Error reading from the child process: "; errorText += getErrorText(); break; } else if (0 == nBytesRead) { // EOF; child exited. break; } #endif // Copy to the shared read buffer. while ((nBytesRead > 0) && !stopFlag) { int nBytesToPut, nBytesFree; nBytesToPut = nBytesRead; // Can be called in the reader thread without locking the // mutex. nBytesFree = readBuffer.nFree(); if (nBytesToPut > nBytesFree) { nBytesToPut = nBytesFree; } if (nBytesToPut > 0) { std::lock_guard lock(readBufferMutex); readBuffer.put(bufferPtr, nBytesToPut); bufferPtr += nBytesToPut; nBytesRead -= nBytesToPut; pollTimer.reset(); } else { pollTimer.pause(); } } } } void Child::writeToChild() { std::vector localWriteBuffer(bufferCapacity); size_t nLocalWriteBytes; CodeDweller::PollTimer pollTimer(nominalPollTime_ms, maximumPollTime_ms); auto sleepTime = std::chrono::milliseconds(maximumPollTime_ms); while (!stopFlag) { char *bufferPtr; // Poll for data in the shared write buffer. while ((0 == nWriteBytes) && !stopFlag) { pollTimer.pause(); } if (stopFlag) { goto exit; } // Copy from the shared write buffer. { std::lock_guard lock(writeBufferMutex); localWriteBuffer.swap(writeBuffer); nLocalWriteBytes = nWriteBytes; nWriteBytes = 0; } if (stopFlag) { goto exit; } pollTimer.reset(); // Blocking write to the child. bufferPtr = &(localWriteBuffer[0]); while (nLocalWriteBytes > 0) { #ifdef _WIN32 DWORD nBytesWritten; if (!WriteFile(outputHandle, bufferPtr, nLocalWriteBytes, &nBytesWritten, NULL)) { if (stopFlag) { goto exit; } errorText = "Error writing to the child process: "; errorText += getErrorText(); goto exit; } if (stopFlag) { goto exit; } #else ssize_t nBytesWritten; nBytesWritten = ::write(outputFileDescriptor, bufferPtr, nLocalWriteBytes); if (stopFlag) { goto exit; } if (-1 == nBytesWritten) { if (ENOSPC != errno) { // Some error other than no space. errorText = "Error writing to the child process: "; errorText += getErrorText(); goto exit; } } #endif nLocalWriteBytes -= nBytesWritten; bufferPtr += nBytesWritten; } } exit: return; } }