#define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include "common.h" #include "ipc.h" #include "pa2345.h" #include "banking.h" int childCount; int procID; FILE* pointer_events_log; FILE* pointer_pipes_log; pid_t parentPid; typedef struct { int in; int out; } Pipes; Pipes* pointerPipes = NULL; typedef struct { int fromID; int toID; } Data; int* pointerBalance; BalanceHistory* pointerBalanceHistory = NULL; timestamp_t lamport_time = 0; enum { MAX_LEN_LOG_LINE = 5000 }; static const char * const log_pipe_read_fmt = "Pipe %d read data. Process - %d. Bytes - %lu. Message: %s\n"; static const char * const log_pipe_write_fmt = "Pipe %d write data. Process - %d. Bytes - %lu. Message: %s\n"; static const char * const log_pipe_close_fmt = "Pipe %d closed. Process - %d\n"; static const char * const log_pipe_create_fmt = "Pipe create (in %d out %d)\n"; void openLogsFile() { pointer_events_log = fopen(events_log, "w+b"); pointer_pipes_log = fopen(pipes_log, "w+b"); } void pipesLogWrite(char* line) { fprintf(pointer_pipes_log, "%s", line); fflush(pointer_pipes_log); } void eventsLogWrite(char* line) { fprintf(pointer_events_log,"%s",line); fflush(pointer_events_log); } void incTime() { lamport_time++; } timestamp_t getTime() { return lamport_time; } void compareTime(timestamp_t t) { if(t > lamport_time) { lamport_time = t; } incTime(); } void createPipe(int id) { //2 числа для канала. Для чтения и для записи. int fd[2]; //Создаем канал pipe2(fd,O_NONBLOCK); //После создания канала, заполняем нашу структуру связями вход/выход pointerPipes[id].in = fd[0]; pointerPipes[id].out = fd[1]; } //pCount = количество потомков + родитель void createPipes(int pCount) { //Считаем количество каналов int count = childCount * (childCount + 1); //Выделяем память под массив каналов pointerPipes = (Pipes*)malloc(sizeof(Pipes)*count); //Проходим по всем каналам for (int i=0; i= fromID) toID = toID + 1; if (fromID == id) { closeExcessPipeIn(i); } else if (toID == id) { closeExcessPipeOut(i); } else { closeExcessPipeIn(i); closeExcessPipeOut(i); } } } void closePipes() { int count = childCount * (childCount+1); for (int i=0; ifrom) to = to-1; return from*childCount+to; } int pipeWrite(int fd, const Message * msg) { if(fd==0) return -1; write(fd,&msg->s_header,sizeof(MessageHeader)); write(fd,msg->s_payload,msg->s_header.s_payload_len); char line[MAX_LEN_LOG_LINE]; sprintf(line,log_pipe_write_fmt,fd, procID, sizeof(MessageHeader)+msg->s_header.s_payload_len, msg->s_payload); pipesLogWrite(line); return 0; } int pipeRead(int fd, Message* msg, int recAny) { if(fd==0) return -1; int flag; do { flag = read(fd,&msg->s_header,sizeof(MessageHeader)); if(recAny == 0 && flag < 0) { return -1; } } while(flag < 0); flag = 0; do { flag = read(fd,msg->s_payload,msg->s_header.s_payload_len); } while(flag < 0); char line[MAX_LEN_LOG_LINE]; sprintf(line,log_pipe_read_fmt,fd,procID, sizeof(MessageHeader)+msg->s_header.s_payload_len, msg->s_payload); pipesLogWrite(line); return 0; } int send(void * self, local_id dst, const Message * msg) { Data* data = (Data*)self; int pipe = pointerPipes[calculatePipeNumber(data->fromID,dst)].out; return pipeWrite(pipe,msg); } int send_multicast(void * self, const Message * msg) { Data* data = (Data*)self; for(int i=0; ifromID) { int pipe = pointerPipes[calculatePipeNumber(data->fromID,i)].out; pipeWrite(pipe,msg); } } return 0; } int receive(void * self, local_id from, Message * msg) { Data* data = (Data*)self; int pipe = pointerPipes[calculatePipeNumber(from,data->fromID)].in; int res = pipeRead(pipe,msg,1); if(res==0) { compareTime(msg->s_header.s_local_time); } return res; } int receive_any(void * self, Message * msg) { Data* data = (Data*)self; for(int i=0; ifromID) { int pipe = pointerPipes[calculatePipeNumber(i,data->fromID)].in; if(pipeRead(pipe,msg,0)==0) { data->toID = i; compareTime(msg->s_header.s_local_time); return 0; } } } return -1; } Message getMessageTemplate() { Message msg; msg.s_header.s_magic = MESSAGE_MAGIC; msg.s_header.s_local_time = getTime(); return msg; } void waitAllMessage(MessageType type, int selfID) { Message msg; Data data; data.fromID = selfID; for(int i=1; is_history_len; i++) { if(pointerBalanceHistory->s_history[i].s_time == currentTime) { index = i; break; } } //Если нашли if(index > 0 && index < pointerBalanceHistory->s_history_len) { pointerBalanceHistory->s_history[index].s_time = currentTime; pointerBalanceHistory->s_history[index].s_balance_pending_in = pending_in; pointerBalanceHistory->s_history[index].s_balance = amount; } //Если не нашли, значит нужно добавить else { if(pointerBalanceHistory->s_history[pointerBalanceHistory->s_history_len-1].s_time < currentTime - 1) { for (timestamp_t t= pointerBalanceHistory->s_history[pointerBalanceHistory->s_history_len-1].s_time+1; ts_history[pointerBalanceHistory->s_history_len-1].s_balance, pointerBalanceHistory->s_history[pointerBalanceHistory->s_history_len-1].s_balance_pending_in); } } pointerBalanceHistory->s_history[pointerBalanceHistory->s_history_len].s_time = currentTime; pointerBalanceHistory->s_history[pointerBalanceHistory->s_history_len].s_balance_pending_in = pending_in; pointerBalanceHistory->s_history[pointerBalanceHistory->s_history_len].s_balance = amount; pointerBalanceHistory->s_history_len++; } } void Start(pid_t pid, int selfID) { //?? incTime(); char line[MAX_LEN_LOG_LINE]; sprintf(line,log_started_fmt,getTime(),selfID,pid,parentPid, pointerBalance[selfID]); eventsLogWrite(line); //Формирование сообщения Message msg = getMessageTemplate(); sprintf(msg.s_payload,log_started_fmt,getTime(),selfID,pid,parentPid, pointerBalance[selfID]); msg.s_header.s_type = STARTED; msg.s_header.s_payload_len = strlen(msg.s_payload)+1; Data data; data.fromID = selfID; //Отправление сообщения о старте всем остальным процессам send_multicast(&data,&msg); //Получение сообщения STARTED всеми процессами, кроме текущего и родителя waitAllMessage(STARTED,selfID); //Выделяем память под структуру с историей баланса pointerBalanceHistory = (BalanceHistory*)malloc(sizeof(BalanceHistory)); pointerBalanceHistory->s_id = selfID; pointerBalanceHistory->s_history_len = 0; timestamp_t zeroTime = 0; //Запись в историю значения баланса при старте t = 0 writeCurrentBalance(zeroTime,pointerBalance[selfID],0); char line2[MAX_LEN_LOG_LINE]; sprintf(line2,log_received_all_started_fmt,getTime(),selfID); eventsLogWrite(line); } void clear(void* object, size_t size) { for(int i=0; is_history_len) + sizeof(pointerBalanceHistory->s_history_len) + sizeof(pointerBalanceHistory->s_id); //Запись истории изменения баланса в сообщение for(int i =0; i0); } void startSystem() { //pid_t - встроенный тип. Позволяет получать "ид процесса" pid_t forkPid; for (int i = 1; i < childCount + 1; i++) { //Создание потомка. Весь код после fork выполняется и в родителе, и в потомке forkPid = fork(); //Если текущий процесс потомок if (forkPid == 0) { procID = i; childWork(i); break; } } if(forkPid != 0) { parentWork(); } } void shutdown() { //Освобождаем все каналы freePipes(); //Закрываем логи fclose(pointer_events_log); fclose(pointer_pipes_log); } int main(int argc, char *argv[]) { //Получение числа потомков if (argc <3 || strcmp(argv[1], "-p") !=0) return 1; childCount = atoi(argv[2]); pointerBalance = (int*)malloc(sizeof(int) * childCount + 1); pointerBalance[0] = 0; for (int i=1; i<=childCount; i++) { pointerBalance[i] = atoi(argv[i+2]); } preparationForStart(); startSystem(); shutdown(); return 0; }