diff options
author | Mike Pavone <pavone@retrodev.com> | 2020-05-02 00:52:21 -0700 |
---|---|---|
committer | Mike Pavone <pavone@retrodev.com> | 2020-05-02 00:52:21 -0700 |
commit | 040cc6af560f470e8030569b0ab765289b11f75a (patch) | |
tree | 1f8f97eb21dd384894d35aacd464b9ca3d1a7c8b /event_log.c | |
parent | 00dd4dd72920547516ce27c7f3f3e82eb433ad87 (diff) |
Fix some netplay issues
Diffstat (limited to 'event_log.c')
-rw-r--r-- | event_log.c | 76 |
1 files changed, 47 insertions, 29 deletions
diff --git a/event_log.c b/event_log.c index 63122cf..877d910 100644 --- a/event_log.c +++ b/event_log.c @@ -124,7 +124,7 @@ static void event_header(uint8_t type, uint32_t cycle) } return; } - } else if (type == last_event_type && delta == last_delta) { + } else if (type == last_event_type && delta == last_delta && type != EVENT_FLUSH) { //make some room save_int8(&buffer, 0); //shift existing command @@ -246,6 +246,11 @@ void event_log(uint8_t type, uint32_t cycle, uint8_t size, uint8_t *payload) last = cycle; save_buffer8(&buffer, payload, size); if (listen_sock && buffer.size > 1280) { + if (multi_count) { + buffer.data[multi_start] |= multi_count - 2; + multi_count = 0; + last_event_type = 0xFF; + } flush_socket(); } } @@ -394,6 +399,40 @@ void init_event_reader_tcp(event_reader *reader, char *address, char *port) setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); } +static void read_from_socket(event_reader *reader) +{ + if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) { + reader->storage *= 2; + uint8_t *new_buf = malloc(reader->storage); + memcpy(new_buf, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); + free(reader->buffer.data); + reader->buffer.data = new_buf; + reader->buffer.size -= reader->buffer.cur_pos; + reader->buffer.cur_pos = 0; + } else if (reader->buffer.cur_pos >= reader->buffer.size/2 && reader->buffer.size >= reader->storage/2) { + memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); + reader->buffer.size -= reader->buffer.cur_pos; + reader->buffer.cur_pos = 0; + } + int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0); + if (bytes >= 0) { + reader->buffer.size += bytes; + } else if (!socket_error_is_wouldblock()) { + fatal_error("Connection closed, error = %X\n", socket_last_error()); + } +} + +void reader_ensure_data(event_reader *reader, size_t bytes) +{ + if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) { + socket_blocking(reader->socket, 1); + while (reader->buffer.size - reader->buffer.cur_pos < bytes) { + read_from_socket(reader); + } + socket_blocking(reader->socket, 0); + } +} + uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out) { if (reader->repeat_remaining) { @@ -403,34 +442,8 @@ uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out) return reader->repeat_event; } if (reader->socket) { - uint8_t blocking = 0; - if (reader->buffer.size - reader->buffer.cur_pos < 9) { - //set back to block mode - socket_blocking(reader->socket, 1); - blocking = 1; - } - if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) { - reader->storage *= 2; - uint8_t *new_buf = malloc(reader->storage); - memcpy(new_buf, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); - free(reader->buffer.data); - reader->buffer.data = new_buf; - reader->buffer.size -= reader->buffer.cur_pos; - reader->buffer.cur_pos = 0; - } else if (reader->buffer.cur_pos >= reader->buffer.size/2 && reader->buffer.size >= reader->storage/2) { - memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); - reader->buffer.size -= reader->buffer.cur_pos; - reader->buffer.cur_pos = 0; - } - int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0); - if (bytes >= 0) { - reader->buffer.size += bytes; - if (blocking && reader->buffer.size - reader->buffer.cur_pos >= 9) { - socket_blocking(reader->socket, 0); - } - } else if (!socket_error_is_wouldblock()) { - printf("Connection closed, error = %X\n", socket_last_error()); - } + read_from_socket(reader); + reader_ensure_data(reader, 1); } uint8_t header = load_int8(&reader->buffer); uint8_t ret; @@ -439,15 +452,18 @@ uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out) if ((header & 0xF0) == (EVENT_MULTI << 4)) { reader->repeat_remaining = (header & 0xF) + 1; multi_start = 1; + reader_ensure_data(reader, 1); header = load_int8(&reader->buffer); } if ((header & 0xF0) < FORMAT_3BYTE) { delta = (header & 0xF) + 16; ret = header >> 4; } else if ((header & 0xF0) == FORMAT_3BYTE) { + reader_ensure_data(reader, 2); delta = load_int16(&reader->buffer); ret = header & 0xF; } else { + reader_ensure_data(reader, 3); delta = load_int8(&reader->buffer) << 16; //sign extend 24-bit delta to 32-bit if (delta & 0x800000) { @@ -463,11 +479,13 @@ uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out) *cycle_out = reader->last_cycle + delta; reader->last_cycle = *cycle_out; if (ret == EVENT_ADJUST) { + reader_ensure_data(reader, 4); size_t old_pos = reader->buffer.cur_pos; uint32_t adjust = load_int32(&reader->buffer); reader->buffer.cur_pos = old_pos; reader->last_cycle -= adjust; } else if (ret == EVENT_STATE) { + reader_ensure_data(reader, 8); reader->last_cycle = load_int32(&reader->buffer); reader->last_word_address = load_int8(&reader->buffer) << 16; reader->last_word_address |= load_int16(&reader->buffer); |