#include "common.h" #include "ipc.h" #include "pa2345.h" #include "proc.h" #include "banking.h" #include #include #include #include #include #include #include struct settings { uint8_t proc_num; uint16_t balances[MAX_PROCESS_ID]; }; int settings_init(struct settings* settings, int argc, char *argv[]) { settings->proc_num = 2; int c; while (1) { int option_index = 0; static struct option long_options[] = { {"processes", required_argument, 0, 0 }, {0, 0, 0, 0 } }; c = getopt_long(argc, argv, "p:", long_options, &option_index); if (c == -1) break; switch (c) { case 0: break; case 'p': ; char *eptr = NULL; unsigned long val = strtoul(optarg, &eptr, 10); if (*eptr) { return 1; } if (val > MAX_PROCESS_ID) { return 1; } settings->proc_num = val; default: break; } } int bal_idx = 0; while (optind < argc) { char *eptr = NULL; unsigned long val = strtoul(argv[optind++], &eptr, 10); if (*eptr) { return 1; } settings->balances[bal_idx++] = val; } return 0; } void child_transfer(FILE *log, Process *me, const Message *order_msg, balance_t *balance, BalanceHistory *hist, timestamp_t *last) { TransferOrder *order = (TransferOrder *)(order_msg->s_payload); local_id from = order->s_src; local_id to = order->s_dst; balance_t amount = order->s_amount; timestamp_t curr_time = get_lamport_time(); for (timestamp_t t = *last; t <= curr_time; ++t) { hist->s_history[t].s_balance_pending_in = hist->s_history[*last].s_balance_pending_in; hist->s_history[t].s_time = t; hist->s_history[t].s_balance = *balance; } Message msg = *order_msg; msg.s_header.s_magic = MESSAGE_MAGIC; msg.s_header.s_local_time = inc_lamport_time(); char entry[100]; if (to == me->id) { *balance += amount; snprintf(entry, 100, log_transfer_in_fmt, get_lamport_time(), me->id, amount, from); msg.s_header.s_payload_len = 0; msg.s_header.s_type = ACK; send(me, PARENT_ID, &msg); hist->s_history[get_lamport_time()].s_balance_pending_in = 0; hist->s_history[get_lamport_time()].s_time = get_lamport_time(); hist->s_history[get_lamport_time()].s_balance = *balance; *last = get_lamport_time(); } else { *balance -= amount; send(me, to, &msg); snprintf(entry, 100, log_transfer_out_fmt, get_lamport_time(), me->id, amount, to); hist->s_history[get_lamport_time()].s_balance_pending_in = amount; hist->s_history[get_lamport_time()].s_time = get_lamport_time(); hist->s_history[get_lamport_time()].s_balance = *balance; hist->s_history[get_lamport_time() + 1].s_balance_pending_in = amount; hist->s_history[get_lamport_time() + 1].s_time = get_lamport_time() + 1; hist->s_history[get_lamport_time() + 1].s_balance = *balance; hist->s_history[get_lamport_time() + 2].s_balance_pending_in = 0; hist->s_history[get_lamport_time() + 2].s_time = get_lamport_time() + 2; hist->s_history[get_lamport_time() + 2].s_balance = *balance; *last = get_lamport_time() + 2; } printf("%s", entry); fprintf(log, "%s", entry); } void child_cycle(FILE *log, Process *me, balance_t balance) { BalanceHistory hist; hist.s_id = me->id; timestamp_t last_time = inc_lamport_time(); for (timestamp_t t = 0; t <= last_time; ++t) { hist.s_history[t].s_balance_pending_in = 0; hist.s_history[t].s_time = t; hist.s_history[t].s_balance = balance; } /* STAGE I */ char entry[100]; size_t size = snprintf(entry, 100, log_started_fmt, get_lamport_time(), me->id, getpid(), getppid(), balance); printf("%s", entry); fprintf(log, "%s", entry); Message msg; msg.s_header.s_magic = MESSAGE_MAGIC; msg.s_header.s_payload_len = size; msg.s_header.s_type = STARTED; msg.s_header.s_local_time = get_lamport_time(); strncpy(msg.s_payload, entry, MAX_PAYLOAD_LEN); send_multicast(me, &msg); await_all(me, STARTED); snprintf(entry, 100, log_received_all_started_fmt, get_lamport_time(), me->id); printf("%s", entry); fprintf(log, "%s", entry); /* STAGE II */ char procs_stopped = 0; while (1) { receive_any(me, &msg); if (msg.s_header.s_type == STOP) { break; } switch (msg.s_header.s_type) { case DONE: ++procs_stopped; break; case TRANSFER: child_transfer(log, me, &msg, &balance, &hist, &last_time); break; } } /* STAGE III */ size = snprintf(entry, 100, log_done_fmt, get_lamport_time(), me->id, balance); printf("%s", entry); fprintf(log, "%s", entry); msg.s_header.s_payload_len = size; msg.s_header.s_type = DONE; msg.s_header.s_local_time = inc_lamport_time(); strncpy(msg.s_payload, entry, MAX_PAYLOAD_LEN); send_multicast(me, &msg); while (procs_stopped < me->fildes_num - 2) { receive_any(me, &msg); switch (msg.s_header.s_type) { case DONE: ++procs_stopped; break; case TRANSFER: child_transfer(log, me, &msg, &balance, &hist, &last_time); break; } } timestamp_t curr_time = get_lamport_time(); for (timestamp_t t = last_time; t <= curr_time; ++t) { hist.s_history[t].s_balance_pending_in = hist.s_history[last_time].s_balance_pending_in; hist.s_history[t].s_time = t; hist.s_history[t].s_balance = balance; } hist.s_history_len = curr_time + 1; msg.s_header.s_payload_len = sizeof(BalanceHistory); msg.s_header.s_type = BALANCE_HISTORY; msg.s_header.s_local_time = inc_lamport_time(); memcpy(msg.s_payload, &hist, sizeof(BalanceHistory)); send(me, PARENT_ID, &msg); snprintf(entry, 100, log_received_all_done_fmt, get_lamport_time(), me->id); printf("%s", entry); fprintf(log, "%s", entry); } void parent_cycle(Process *me) { await_all(me, STARTED); bank_robbery(me, me->fildes_num - 1); Message msg; msg.s_header.s_magic = MESSAGE_MAGIC; msg.s_header.s_payload_len = 0; msg.s_header.s_type = STOP; msg.s_header.s_local_time = inc_lamport_time(); send_multicast(me, &msg); await_all(me, DONE); AllHistory hist; hist.s_history_len = me->fildes_num - 1; for (uint8_t i = 1; i < me->fildes_num; ++i) { receive(me, i, &msg); if (msg.s_header.s_type != BALANCE_HISTORY) { fprintf(stderr, "Not a history\n"); } memcpy(&hist.s_history[i - 1], msg.s_payload, sizeof(BalanceHistory)); } print_history(&hist); for (uint8_t i = 0; i < me->fildes_num - 1; ++i) { int wstatus = 0; wait(&wstatus); } } int main(int argc, char *argv[]) { struct settings settings; settings_init(&settings, argc, argv); uint8_t n = settings.proc_num; FILE *pipe_log = fopen(pipes_log, "w"); FILE *event_log = fopen(events_log, "w"); Process me; proc_init(n + 1, &me, pipe_log); fclose(pipe_log); if (me.id != PARENT_ID) { child_cycle(event_log, &me, settings.balances[me.id - 1]); } else { parent_cycle(&me); } proc_destruct(&me); return 0; }