//------------------------------------------------------------------------ // Filename: // merge-thread.cpp // PROGRAM DESCRIPTION // Implementation of merge sorting thread and master thread //------------------------------------------------------------------------ #include #include "ThreadClass.h" #include "merge-thread.h" // external data variable extern MergeThread *ppMergeThread[MAX_THREADS]; extern AsynOneToOneChannel *ppChannel[MAX_CHANNELS]; // const data variable int END_OF_DATA = -1; // end of input flag //------------------------------------------------------------------------ // MergeThread::MergeThread() // constructor //------------------------------------------------------------------------ MergeThread::MergeThread(int threadID) { ThreadName.seekp(0, ios::beg); ThreadName << "MergeThread" << threadID << ends; Index = UserDefinedThreadID = threadID; cout << "Merge thread " << Index << " has been created" << endl; } //------------------------------------------------------------------------ // MergeThread::MergeThread() // destructor //------------------------------------------------------------------------ MergeThread::~MergeThread() { } //------------------------------------------------------------------------ // MergeThread::ThreadFunc() //------------------------------------------------------------------------ void MergeThread::ThreadFunc() { Thread::ThreadFunc(); int InputID1 = 2 * Index; // input channel1's ID int InputID2 = 2 * Index - 1; // input channel2's ID int OutputID = Index - 1; // output channel's ID int in1, in2; // receive the first number from each input channel ppChannel[InputID1]->Receive(&in1, sizeof(int)); ppChannel[InputID2]->Receive(&in2, sizeof(int)); // terminate until a END_OF_DATA is received while (in1 != END_OF_DATA && in2 != END_OF_DATA) { if (in1 < in2) { ppChannel[OutputID]->Send(&in1, sizeof(int)); // send the smaller one ppChannel[InputID1]->Receive(&in1, sizeof(int)); // get the next } else { ppChannel[OutputID]->Send(&in2, sizeof(int)); // send the smaller one ppChannel[InputID2]->Receive(&in2, sizeof(int)); // get the next } } // send the remaining data to the output if (in1 == END_OF_DATA) { // is END_OF_DATA from input channel 1? while (in2 != END_OF_DATA) { // copy input 1's data to output ppChannel[OutputID]->Send(&in2, sizeof(int)); ppChannel[InputID2]->Receive(&in2, sizeof(int)); } } else { while(in1 != END_OF_DATA) { // copy input 2's data to output ppChannel[OutputID]->Send(&in1, sizeof(int)); ppChannel[InputID1]->Receive(&in1, sizeof(int)); } } ppChannel[OutputID]->Send(&END_OF_DATA, sizeof(int)); // send END_OF_DATA Exit(); } //------------------------------------------------------------------------ // MasterThread::MasterThread() // constructor //------------------------------------------------------------------------ MasterThread::MasterThread(int threadID, int numberOfData) { UserDefinedThreadID = threadID; NumberOfData = numberOfData; ThreadName.seekp(0, ios::beg); ThreadName << "Master" << ends; cout << "The master thread has been created" << endl; } //------------------------------------------------------------------------ // MasterThread::ThreadFunc() //------------------------------------------------------------------------ void MasterThread::ThreadFunc() { Thread::ThreadFunc(); int channelID; int input; cout << "Please enter " << NumberOfData << " non-negative integers: " << endl; for (int i = 1; i <= NumberOfData; i++) { cin >> input; channelID = NumberOfData - 2 + i; // feed one input data to each first level channel // followed by END_OF_DATA ppChannel[channelID]->Send(&input, sizeof(int)); ppChannel[channelID]->Send(&END_OF_DATA, sizeof(int)); } Exit(); } //------------------------------------------------------------------------ // CollectorThread::CollectorThread() // constructor //------------------------------------------------------------------------ CollectorThread::CollectorThread(int threadID) { UserDefinedThreadID = threadID; ThreadName.seekp(0, ios::beg); ThreadName << "Collector" << ends; cout << "The Collector thread has been created" << endl; } //------------------------------------------------------------------------ // CollectorThread::ThreadFunc() //------------------------------------------------------------------------ void CollectorThread::ThreadFunc() { Thread::ThreadFunc(); int channelID = 0; int input; ppChannel[channelID]->Receive(&input, sizeof(int)); cout << "Final sorted array is:" << endl; while (input != END_OF_DATA) { // receive data from the last output channel cout << input << " "; ppChannel[channelID]->Receive(&input, sizeof(int)); } Exit(); } // end of merge-Thread.cpp