#include "ipc.h"
#include "proc.h"

/* NOTE(review): the three system header names were missing from the original
 * include lines. These are the headers the code below requires (errno/EAGAIN,
 * perror/fputs, read/write) -- confirm against the original file. */
#include <errno.h>
#include <stdio.h>
#include <unistd.h>

/* Result codes of the private transfer helpers below. */
enum {
    XFER_OK = 0,
    XFER_ERRNO = -1, /* a system call failed; errno describes the reason */
    XFER_EOF = -2    /* the stream ended before all requested bytes arrived */
};

/*
 * Writes exactly `len` bytes from `buf` to `fd`.
 *
 * A single write() may transfer fewer bytes than requested, so the call is
 * repeated until everything is out. EINTR is always retried. EAGAIN is retried
 * only after part of the message has already been written: at that point the
 * message must be completed or the reader would see a truncated frame. EAGAIN
 * before the first byte is reported to the caller, as the original code did.
 *
 * Returns XFER_OK on success, XFER_ERRNO on failure.
 */
static int write_all(int fd, const char *buf, size_t len)
{
    size_t done = 0;

    while (done < len) {
        ssize_t n = write(fd, buf + done, len - done);

        if (n > 0) {
            done += (size_t)n;
            continue;
        }
        if (n == -1 && errno == EINTR) {
            continue;
        }
        if (n == -1 && errno == EAGAIN && done > 0) {
            continue;
        }
        return XFER_ERRNO;
    }
    return XFER_OK;
}

/*
 * Reads exactly `len` bytes from `fd` into `buf`, retrying on short reads,
 * EAGAIN and EINTR (i.e. it waits for the data even on a descriptor that
 * reports EAGAIN when empty).
 *
 * Returns XFER_OK on success, XFER_EOF if read() reports end of stream before
 * `len` bytes arrived, XFER_ERRNO on any other read() failure.
 */
static int read_exact(int fd, char *buf, size_t len)
{
    size_t done = 0;

    while (done < len) {
        ssize_t n = read(fd, buf + done, len - done);

        if (n > 0) {
            done += (size_t)n;
            continue;
        }
        if (n == 0) {
            return XFER_EOF;
        }
        if (errno == EAGAIN || errno == EINTR) {
            continue;
        }
        return XFER_ERRNO;
    }
    return XFER_OK;
}

/*
 * Prints a diagnostic for a failed receive. errno is meaningful only for
 * XFER_ERRNO, so end-of-stream gets its own message instead of perror().
 */
static void report_receive_failure(int rc)
{
    if (rc == XFER_EOF) {
        fputs("receiving: unexpected end of stream\n", stderr);
    } else {
        perror("receiving");
    }
}

/*
 * Completes a message whose header is already stored in `msg`: passes the
 * header's s_local_time to set_lamport_time() and then reads s_payload_len
 * payload bytes from `fd` into msg->s_payload.
 *
 * NOTE(review): s_payload_len is not checked against the capacity of
 * msg->s_payload because that capacity is not visible from this file.
 *
 * Returns 0 on success, 1 (after printing a diagnostic) on failure.
 */
static int finish_message(int fd, Message *msg)
{
    set_lamport_time(msg->s_header.s_local_time);

    uint16_t payload_len = msg->s_header.s_payload_len;
    int rc = read_exact(fd, (char *)msg->s_payload, payload_len);

    if (rc != XFER_OK) {
        report_receive_failure(rc);
        return 1;
    }
    return 0;
}

/*
 * Sends `msg` (header plus s_payload_len payload bytes) to process `dst`
 * through the write descriptor p->write_fildes[dst].
 *
 * self: pointer to the sending Process.
 * Returns 0 on success, 1 on failure (a diagnostic is printed with perror).
 */
int send(void *self, local_id dst, const Message *msg)
{
    Process *p = (Process *)self;
    size_t total = sizeof(MessageHeader) + msg->s_header.s_payload_len;

    if (write_all(p->write_fildes[dst], (const char *)msg, total) != XFER_OK) {
        perror("sending");
        return 1;
    }
    return 0;
}

/*
 * Sends `msg` to every process index in [0, p->fildes_num) except the
 * sender's own id. Stops at the first failed send.
 *
 * self: pointer to the sending Process.
 * Returns 0 if every send succeeded, 1 otherwise.
 */
int send_multicast(void *self, const Message *msg)
{
    Process *p = (Process *)self;

    for (size_t i = 0; i < p->fildes_num; ++i) {
        if (i == (size_t)p->id) {
            continue;
        }
        if (send(self, (local_id)i, msg)) {
            return 1;
        }
    }
    return 0;
}

/*
 * Receives one complete message from process `from`, waiting (by retrying on
 * EAGAIN) until the whole header and payload have arrived.
 *
 * self: pointer to the receiving Process.
 * Returns 0 on success, 1 on a read error or if the stream ends mid-message.
 */
int receive(void *self, local_id from, Message *msg)
{
    Process *p = (Process *)self;
    int fd = p->read_fildes[from];

    int rc = read_exact(fd, (char *)&msg->s_header, sizeof(MessageHeader));
    if (rc != XFER_OK) {
        report_receive_failure(rc);
        return 1;
    }
    return finish_message(fd, msg);
}

/*
 * Receives one complete message from whichever peer has data first, polling
 * every read descriptor except the receiver's own in index order until one
 * yields data.
 *
 * A descriptor whose first read returns no data, end of stream, or an error is
 * skipped and polled again on the next pass, as in the original code. Once any
 * bytes of a header have been consumed, the rest of that message is read to
 * completion so the stream stays aligned on message boundaries.
 *
 * self: pointer to the receiving Process.
 * Returns 0 on success, 1 if a message was started but could not be completed.
 */
int receive_any(void *self, Message *msg)
{
    Process *p = (Process *)self;
    char *header = (char *)&msg->s_header;

    for (;;) {
        for (size_t i = 0; i < p->fildes_num; ++i) {
            if (i == (size_t)p->id) {
                continue;
            }

            int fd = p->read_fildes[i];
            ssize_t got = read(fd, header, sizeof(MessageHeader));
            if (got <= 0) {
                continue;
            }

            /* Short header read: fetch the remainder before trusting it. */
            if ((size_t)got < sizeof(MessageHeader)) {
                int rc = read_exact(fd, header + got,
                                    sizeof(MessageHeader) - (size_t)got);
                if (rc != XFER_OK) {
                    report_receive_failure(rc);
                    return 1;
                }
            }
            return finish_message(fd, msg);
        }
    }
}