#include #include #include #include #include #include #include #include "common.h" #include "ipc.h" #include "pa1.h" int childCount; FILE* pointer_events_log; FILE* pointer_pipes_log; int procID; pid_t parentPid; typedef struct { int in; int out; } Pipes; Pipes* pointerPipes = NULL; typedef struct { int fromID; int toID; } Data; void openLogsFile() { pointer_events_log = fopen(events_log, "w+b"); pointer_pipes_log = fopen(pipes_log, "w+b"); } void logWritePipeCreate(int pipeIn, int pipeOut) { char * const line = "Pipe CREATE (in %d out %d)\n"; fprintf(pointer_pipes_log, line, pipeIn, pipeOut); fflush(pointer_pipes_log); } void createPipe(int id) { //2 числа для канала. Для чтения и для записи. int fd[2]; //Создаем канал pipe(fd); //После создания канала, заполняем нашу структуру связями вход/выход pointerPipes[id].in = fd[0]; pointerPipes[id].out = fd[1]; } //pCount = количество потомков + родитель void createPipes(int pCount) { //Считаем количество каналов int count = childCount * (childCount + 1); //Выделяем память под массив каналов pointerPipes = (Pipes*)malloc(sizeof(Pipes)*count); //Проходим по всем каналам for (int i=0; i= fromID) toID = toID + 1; if (fromID == id) { closeExcessPipeIn(i); } else if (toID == id) { closeExcessPipeOut(i); } else { closeExcessPipeIn(i); closeExcessPipeOut(i); } } } void closePipes() { int count = childCount * (childCount+1); for (int i=0; ifrom) to = to-1; return from*childCount+to; } void logWritePipeWrite(int fd, int pID, int dataBytes, const char* payload) { char * const line = "Pipe %d write data. Process - %3d. Bytes - %d. Message: %s\n"; fprintf(pointer_pipes_log, line, fd, pID, dataBytes, payload); fflush(pointer_pipes_log); } int pipeWrite(int fd, const Message * msg) { if(fd==0) return -1; if(msg==NULL) return -2; write(fd,&msg->s_header,sizeof(MessageHeader)); write(fd,msg->s_payload,msg->s_header.s_payload_len); logWritePipeWrite(fd, procID, sizeof(MessageHeader)+msg->s_header.s_payload_len, msg->s_payload); return 0; } void logWritePipeRead(int fd, int pID, int dataBytes, const char* payload) { char * const line = "Pipe %d read data. Process - %d. Bytes - %d. Message: %s\n"; fprintf(pointer_pipes_log, line, fd, pID, dataBytes, payload); fflush(pointer_pipes_log); } int pipeRead(int fd, Message* msg) { if(fd==0) return -1; if(msg==NULL) return -2; read(fd,&msg->s_header,sizeof(MessageHeader)); read(fd,msg->s_payload,msg->s_header.s_payload_len); logWritePipeRead(fd, procID, sizeof(MessageHeader)+msg->s_header.s_payload_len, msg->s_payload); return 0; } int send(void * self, local_id dst, const Message * msg) { Data* data = (Data*)self; int pipe = pointerPipes[calculatePipeNumber(data->fromID,dst)].out; return pipeWrite(pipe,msg); } int send_multicast(void * self, const Message * msg) { Data* data = (Data*)self; for(int i=0; ifromID) { int pipe = pointerPipes[calculatePipeNumber(data->fromID,i)].out; pipeWrite(pipe,msg); } } return 0; } int receive(void * self, local_id from, Message * msg) { Data* data = (Data*)self; int pipe = pointerPipes[calculatePipeNumber(from,data->fromID)].in; return pipeRead(pipe,msg); } int receive_any(void * self, Message * msg) { Data* data = (Data*)self; do { for(int i=0; ifromID) { int pipe = pointerPipes[calculatePipeNumber(i,data->fromID)].in; if(pipeRead(pipe,msg)==0) { data->toID = i; return 0; } } } } while(1); return -1; } void logWriteProcStart(int selfID, pid_t pid) { fprintf(pointer_events_log,log_started_fmt,selfID,pid,parentPid); fflush(pointer_events_log); } void logWriteProcDone(int selfID) { fprintf(pointer_events_log,log_done_fmt,selfID); fflush(pointer_events_log); } Message getMessageTemplate() { Message msg; msg.s_header.s_magic = MESSAGE_MAGIC; msg.s_header.s_local_time = time(NULL); return msg; } void waitAllChildProcReceiveMsg(MessageType type, int selfID) { Message msg; Data data; data.fromID = selfID; for(int i=0; i0); } shutdown(); return 0; }