// ------------------------------------------------------------------------------- // ConcurrentMentor: Distibuted Computation & Visualization System // (c)2001-2002 Michigan Technological University // Filename: // Channel.h // Program Description: // System header file // class interface definition for SynOneToOneChannel, AsynOneToOneChannel and // VectorClock class // ------------------------------------------------------------------------------- #ifndef _MTUCHANNEL_H #define _MTUCHANNEL_H //-------------------------------------------------------------------------------- // OS dependent header files //-------------------------------------------------------------------------------- #include "Connection.h" #ifdef _WIN32 #include #include #include #elif defined(_LINUX_) || defined(_SOLARIS_) #include #include #include #include #include #include #include #include //#include #include #include #endif //-------------------------------------------------------------------------------- // macro defines //-------------------------------------------------------------------------------- #define DEFUALT_RELIABILITY 1.0 // default asynchronous channel reliability //-------------------------------------------------------------------------------- // data types //-------------------------------------------------------------------------------- typedef void* ChannelMessage_t; // messgae type typedef float (*RELIABILITY_FUNC)(void); // user specified reliability function type #ifdef _WIN32 typedef SOCKET HANDLE1; #else typedef int HANDLE1; #endif // ------------------------------------------------------------------------------- // function prototypes // ------------------------------------------------------------------------------- void getMyId(int *myid); // get process id void getNumProcs(int *numprocesss); // get total number of user processes // ------------------------------------------------------------------------------- // SynOneToOneChannel class definition // ------------------------------------------------------------------------------- class SynOneToOneChannel { public: SynOneToOneChannel(int destID, int myID, char *channelname = 0); ~SynOneToOneChannel(); //blocking send receive int Send(ChannelMessage_t message, size_t message_size, bool visual = false, int linenumber = 0, char *filename = NULL); int Receive(ChannelMessage_t message, size_t message_size, bool visual = false, int linenumber = 0, char *filename = NULL); // is message available for reading bool IsMessageAvailable(); int getHandle(){return handle;} void Dump(); private: int myid; // current process ID int destid; // communication counterpart's id strstream channelName; // channal name unsigned int channelID; // channal ID HANDLE1 handle; // communication handle }; // ------------------------------------------------------------------------------- // AsynOneToOneChannel class definition // ------------------------------------------------------------------------------- class AsynOneToOneChannel { public: AsynOneToOneChannel(int destID, int myID, char *channelname = 0, float reliability = DEFUALT_RELIABILITY); AsynOneToOneChannel(int destID, int myID, float (* ReliabilityFunc)(void), char *channelname = 0); ~AsynOneToOneChannel(); //non-blocking send and receive int Send(ChannelMessage_t message, size_t message_size, bool visual = false, int linenumber = 0, char *filename = NULL); int Receive(ChannelMessage_t message, size_t message_size, bool visual = false, int linenumber = 0, char *filename = NULL); int NonVisual_Send(ChannelMessage_t message, size_t message_size){ return Send(message, message_size,false,0,NULL);} int NonVisual_Receive(ChannelMessage_t message, size_t message_size) { return Receive(message, message_size, false,0,NULL);} //no-blocking receive int NonBlockingRecv(ChannelMessage_t message, size_t message_size, bool visual = false, int linenumber = 0, char *filename = 0); int NonVisual_NonBlockingRecv(ChannelMessage_t message, size_t message_size) { return NonBlockingRecv(message, message_size, false, 0, NULL); } //receive available polling bool IsMessageAvailable(); int getHandle(){return handle;} private: int myid; int destid; // communication counterpart's id float reliability; // channel reliability, message loss rate RELIABILITY_FUNC relibility_func; strstream channelName; unsigned int channelID; HANDLE1 handle; // communication handle }; //************************************************************************************************** // class Monitor // ----- to monitor the availability of the channels for reciving message //*************************************************************************************************** #define MTU_INFINITE -1 #define MTU_ZERO 0 class Monitor { public: // constructor Monitor(); //destructor ~Monitor(); //select the ready channels from *channels[] in milliseconds int SelectReadReadyChannels(AsynOneToOneChannel *channels[], int num_channels, int milliseconds); int SelectReadReadyChannels(AsynOneToOneChannel channels[], int num_channels, int milliseconds); int SelectReadReadyChannels(SynOneToOneChannel *channels[], int num_channels, int milliseconds); int SelectReadReadyChannels(SynOneToOneChannel channels[], int num_channels, int milliseconds); bool ChannelReadReady(AsynOneToOneChannel *channel, int milliseconds); bool ChannelReadReady(SynOneToOneChannel *channel, int milliseconds); bool ChannelReadReady(AsynOneToOneChannel channel, int milliseconds); bool ChannelReadReady(SynOneToOneChannel channel, int milliseconds); int NumReadyChannels(){return num_ready;} int *ReadyChannelsIndex(){return readychannels;} private: int *readychannels; int num_ready; }; // ------------------------------------------------------------------------------- // VectorTime class definition // ------------------------------------------------------------------------------- class VectorClock { public: // constructor and destructor VectorClock(); ~VectorClock(); // copy constructor and copy assignment VectorClock(const VectorClock&); VectorClock& operator=(const VectorClock&); // subsript "[]" operator overloading, // user can access/change vector clock by using "Vectorclock[i]" int& operator[](int& i); // Dump current vector clock to stdout void Print(void) const; private: int *pVectorTime; }; //************************************************************************************ // console class for distributed output // *********************************************************************************** class Console { public: Console(); ~Console(); void output(const char *fmt, ...); void writeout(char *buf, size_t size); private: VectorClock *vectorTime; char serverIP[128]; int serverPort; char hostname[128]; CUDPConnection *connection; }; // ------------------------------------------------------------------------------- // method macro defines // ------------------------------------------------------------------------------- #ifndef _CHANNEL_CPP #define Send(X, Y) Send(X, Y, true, __LINE__, __FILE__) #define Receive(X, Y) Receive(X, Y, true, __LINE__, __FILE__) #define NonBlockingRecv(X,Y) NonBlockingRecv(X, Y, true, __LINE__, __FILE__) #endif #endif // Channel.h