Browse Source

Implemented Child read(), write(), and writeAndShrink() with strings.


git-svn-id: https://svn.microneil.com/svn/CodeDweller/branches/adeniz_1@82 d34b734f-a00e-4b39-a726-e4eeb87269ab
adeniz_1
adeniz 9 years ago
parent
commit
8fa4e41a55
2 changed files with 192 additions and 26 deletions
  1. 120
    23
      child.cpp
  2. 72
    3
      child.hpp

+ 120
- 23
child.cpp View File

} }
ChildStream::~ChildStream() { ChildStream::~ChildStream() {
if (isRunning()) {
try {
close(); close();
} catch (std::exception &e) {
;
} }
} }
void ChildStream::close() { void ChildStream::close() {
if (isDone()) {
if (!childStarted) {
throw std::logic_error("Child process was not started "
"when close() was called");
}
return;
// Terminate the child if it's running.
if (isRunning()) {
}
std::string errorString;
bool errorOccurred = false;
#ifdef _WIN32 #ifdef _WIN32
if (!TerminateProcess(childProcess, terminateExitCode)) {
if (!TerminateProcess(childProcess, terminateExitCode)) {
errorString += (errorOccurred ? "\n" : "");
errorString += "Error terminating the child process: " +
getErrorText();
errorOccurred = true;
}
#else #else
if (kill(childPid, SIGTERM) != 0) {
if (kill(childPid, SIGKILL) != 0) {
errorString += (errorOccurred ? "\n" : "");
errorString += "Error terminating the child process: " +
getErrorText();
errorOccurred = true;
} else if (waitpid(childPid, NULL, 0) != childPid) {
errorString += (errorOccurred ? "\n" : "");
errorString += "Error waiting for the child process: " +
getErrorText();
errorOccurred = true;
}
#endif #endif
throw std::runtime_error("Error terminating the child process: " +
getErrorText());
if (errorOccurred) {
throw std::runtime_error(errorString);
}
} }
// Reset.
init();
} }
bool ChildStream::isDone() { bool ChildStream::isDone() {
} }
Child::CircularBuffer::CircularBuffer(size_t maxSize) : Child::CircularBuffer::CircularBuffer(size_t maxSize) :
buffer(maxSize),
buffer(maxSize + 1),
capacity(maxSize) { capacity(maxSize) {
iBegin = 0; iBegin = 0;
size_t Child::CircularBuffer::nFree() const { size_t Child::CircularBuffer::nFree() const {
if (iBegin <= iEnd) {
return capacity - (iEnd - iBegin);
size_t iLocalBegin = iBegin;
// Correct an invalid value.
if (iLocalBegin >= capacity + 1) {
iLocalBegin = 0;
}
if (iLocalBegin <= iEnd) {
return capacity - (iEnd - iLocalBegin);
} }
return iBegin - iEnd;
return iLocalBegin - iEnd - 1;
} }
Child::~Child() { Child::~Child() {
try { try {
close(); close();
} catch (...) {
} catch (std::exception &e) {
; ;
} }
} }
throw std::logic_error("No child process is running."); throw std::logic_error("No child process is running.");
} }
#warning Check that this is okay before locking
// The free space in the write buffer can be checked without
// locking the mutex because:
//
// 1) bufferCapacity is unchanging, and
//
// 2) nWriteBytes is only decreased by the writer thread.
//
// This means that the calculated free space can only increase by
// the action of the writer thread; the calculated free space is a
// minimum value. In the worst case, this method would return
// false unnecessarily.
if (data.size() > bufferCapacity - nWriteBytes) { if (data.size() > bufferCapacity - nWriteBytes) {
return false; return false;
} }
throw std::logic_error("No child process is running."); throw std::logic_error("No child process is running.");
} }
#warning Check that this is okay before locking
// See the comment above regarding checking the free space in the
// write buffer without locking the mutex.
size_t nFree = bufferCapacity - nWriteBytes; size_t nFree = bufferCapacity - nWriteBytes;
if (0 == nFree) { if (0 == nFree) {
data.clear(); data.clear();
#warning Check that this is okay before locking
// Whether the read circular buffer is empty can be checked
// without locking the mutex because:
//
if (readBuffer.empty()) { if (readBuffer.empty()) {
return 0; return 0;
} }
// Terminate the child if it's running. // Terminate the child if it's running.
if (isRunning()) { if (isRunning()) {
std::string errorString;
bool errorOccurred = false;
#ifdef _WIN32 #ifdef _WIN32
if (!CloseHandle(inputHandle)) {
errorString = "Error closing input from the child: " + getErrorText();
errorOccurred = true;
}
if (!CloseHandle(outputHandle)) {
errorString += (errorOccurred ? "\n" : "");
errorString += "Error closing output to the child: " + getErrorText();
errorOccurred = true;
}
if (!TerminateProcess(childProcess, terminateExitCode)) { if (!TerminateProcess(childProcess, terminateExitCode)) {
errorString += (errorOccurred ? "\n" : "");
errorString += "Error terminating the child process: " +
getErrorText();
errorOccurred = true;
}
#else #else
if (kill(childPid, SIGTERM) != 0) {
if (0 != ::close(inputFileDescriptor)) {
errorString = "Error closing input from the child: " + getErrorText();
errorOccurred = true;
}
if (0 != ::close(outputFileDescriptor)) {
errorString += (errorOccurred ? "\n" : "");
errorString += "Error closing output to the child: " + getErrorText();
errorOccurred = true;
}
if (kill(childPid, SIGKILL) != 0) {
errorString += (errorOccurred ? "\n" : "");
errorString += "Error terminating the child process: " +
getErrorText();
errorOccurred = true;
} else if (waitpid(childPid, NULL, 0) != childPid) {
errorString += (errorOccurred ? "\n" : "");
errorString += "Error waiting for the child process: " +
getErrorText();
errorOccurred = true;
}
#endif #endif
throw std::runtime_error("Error terminating the child process: " +
getErrorText());
if (errorOccurred) {
throw std::runtime_error(errorString);
} }
} }
if (threadsAreRunning) { if (threadsAreRunning) {
// Reset. // Reset.
init(); init();
} }
bool Child::isRunning() const { bool Child::isRunning() const {
errorText += getErrorText(); errorText += getErrorText();
break; break;
} else if (0 == nBytesRead) {
// EOF; child exited.
break;
} }
#endif #endif
// Copy to the shared read buffer. // Copy to the shared read buffer.
while ((nBytesRead > 0) && !stopFlag) { while ((nBytesRead > 0) && !stopFlag) {
int nBytesToPut;
int nBytesToPut, nBytesFree;
nBytesToPut = nBytesRead; nBytesToPut = nBytesRead;
#warning Check thread safety
if (nBytesToPut > readBuffer.nFree()) {
nBytesToPut = readBuffer.nFree();
// Can be called in the reader thread without locking the
// mutex.
nBytesFree = readBuffer.nFree();
if (nBytesToPut > nBytesFree) {
nBytesToPut = nBytesFree;
} }
if (nBytesToPut > 0) { if (nBytesToPut > 0) {

+ 72
- 3
child.hpp View File

communicate with the child process via non-blocking methods, and communicate with the child process via non-blocking methods, and
obtain the exit code of the child process. obtain the exit code of the child process.


The existing process can be terminated by the close() method,
and a new process spawned by the open() method.

The close() method calls TerminateProcess under Windows, and
sends SIGKILL under Linux. After sending SIGKILL, the close()
methods calls waitpid() to wait for the child process to exit.

When a child process is spawned, this class creates two threads:
One for writing to the child, and one for reading from the
child. These threads communicate with the user thread via:

<ol>

<li> A shared boolean that indicates when the threads should
stop.</li>

<li> A shared linear buffer containing data to write to the
child process.</li>

<li> A shared circular buffer containing data read from the
child process.</li>

</ol>

*/ */


class Child { class Child {


@returns true if the container is empty, false otherwise. @returns true if the container is empty, false otherwise.


This method can be invoked by the user thread without
serializing access to the object with a mutex. The
reasoning is:

1) The buffer is empty if and only if iBegin == iEnd.
2) Only user thread modifies iBegin, by extracting data
from the buffer.
This means that if iBegin == iEnd, then:
1) iBegin is a valid index, since only the user thread can
modify iBegin. The user thread maintains the validity
of iBegin.
2) iEnd is a valid index, since it equals iBegin.
3) The result iBegin == iEnd is also valid, and indicates
whether the buffer is empty.

*/ */
bool empty() const; bool empty() const;




/** Get the available space. /** Get the available space.


@returns the number of bytes that can be written to the
buffer without overwriting any existing data.
@returns a number of bytes that can be written to the buffer
without overwriting any existing data.

This method can be invoked by the reader thread without
serializing access to the object with a mutex. The reason
is that:

1) The free space depends on capacity, iBegin, and iEnd.
2) The capacity is not changed while the threads are
running, and only the reader thread modifies iEnd.
Therefore, the reader thread always sees a valid and
up-to-date value for capacity and iEnd.

3) Because the user thread modifies iBegin, iBegin might
be invalid. The only invalid value is capacity + 1, in
which case the correct value of iBegin is 0. This method
checks for the invalid value, and uses the correct
value as needed.

4) Because the user thread modifies iBegin, iBegin might
be out-of-date. Because the user thread only
increments iBegin, an out-of-date value would result in
a smaller value of the available space in the buffer.


*/ */
size_t nFree() const; size_t nFree() const;


@param[in] nBytes is the number of bytes to put. @param[in] nBytes is the number of bytes to put.


@warning The capacity of the buffer must not be exceeded;
exceeding the capacity corrupts the buffer.

*/ */
void put(char const *ptr, size_t nBytes); void put(char const *ptr, size_t nBytes);


*/ */
void nextIndex(size_t &index) const { void nextIndex(size_t &index) const {
index++; index++;
if (index >= capacity)
if (index >= capacity + 1)
index = 0; index = 0;
} }



Loading…
Cancel
Save