summaryrefslogtreecommitdiff
path: root/event_log.c
diff options
context:
space:
mode:
authorMike Pavone <pavone@retrodev.com>2020-05-02 17:33:23 -0700
committerMike Pavone <pavone@retrodev.com>2020-05-02 17:33:23 -0700
commitfe4f53ea6416afa821c02f2c15a9e7c78eff1eba (patch)
tree7dcd0037fa89b8378b96c3cb3508fb117da64b5e /event_log.c
parent040cc6af560f470e8030569b0ab765289b11f75a (diff)
Use zlib to compress event log streams
Diffstat (limited to 'event_log.c')
-rw-r--r--event_log.c335
1 files changed, 262 insertions, 73 deletions
diff --git a/event_log.c b/event_log.c
index 877d910..99bf536 100644
--- a/event_log.c
+++ b/event_log.c
@@ -15,6 +15,7 @@
#include "util.h"
#include "blastem.h"
#include "saves.h"
+#include "zlib/zlib.h"
enum {
CMD_GAMEPAD_DOWN,
@@ -24,9 +25,47 @@ enum {
static uint8_t active, fully_active;
static FILE *event_file;
static serialize_buffer buffer;
+static uint8_t *compressed;
+static size_t compressed_storage;
+static z_stream output_stream;
+static uint32_t last;
+
+static void event_log_common_init(void)
+{
+ init_serialize(&buffer);
+ compressed_storage = 128*1024;
+ compressed = malloc(compressed_storage);
+ deflateInit(&output_stream, 9);
+ output_stream.avail_out = compressed_storage;
+ output_stream.next_out = compressed;
+ output_stream.avail_in = 0;
+ output_stream.next_in = buffer.data;
+ last = 0;
+ active = 1;
+}
+
+static uint8_t multi_count;
+static size_t multi_start;
+static void finish_multi(void)
+{
+ buffer.data[multi_start] |= multi_count - 2;
+ multi_count = 0;
+}
+
+static void file_finish(void)
+{
+ fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
+ output_stream.next_out = compressed;
+ output_stream.avail_out = compressed_storage;
+ int result = deflate(&output_stream, Z_FINISH);
+ if (Z_STREAM_END != result) {
+ fatal_error("Final deflate call returned %d\n", result);
+ }
+ fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
+ fclose(event_file);
+}
static const char el_ident[] = "BLSTEL\x02\x00";
-static uint32_t last;
void event_log_file(char *fname)
{
event_file = fopen(fname, "wb");
@@ -35,9 +74,9 @@ void event_log_file(char *fname)
return;
}
fwrite(el_ident, 1, sizeof(el_ident) - 1, event_file);
- init_serialize(&buffer);
- active = fully_active = 1;
- last = 0;
+ event_log_common_init();
+ fully_active = 1;
+ atexit(file_finish);
}
static int listen_sock, remotes[7];
@@ -70,7 +109,7 @@ void event_log_tcp(char *address, char *port)
goto cleanup_address;
}
socket_blocking(listen_sock, 0);
- active = 1;
+ event_log_common_init();
cleanup_address:
freeaddrinfo(result);
}
@@ -90,12 +129,15 @@ void event_system_start(system_type stype, vid_std video_std, char *name)
}
save_int8(&buffer, name_len);
save_buffer8(&buffer, name, strlen(name));
- if (!fully_active) {
+ if (listen_sock) {
system_start = malloc(buffer.size);
system_start_size = buffer.size;
memcpy(system_start, buffer.data, buffer.size);
- buffer.size = 0;
+ } else {
+ //system start header is never compressed, so write to file immediately
+ fwrite(buffer.data, 1, buffer.size, event_file);
}
+ buffer.size = 0;
}
//header formats
@@ -106,21 +148,17 @@ void event_system_start(system_type stype, vid_std video_std, char *name)
#define FORMAT_4BYTE 0xF0
static uint8_t last_event_type = 0xFF;
static uint32_t last_delta;
-static uint8_t multi_count;
-static size_t multi_start;
static void event_header(uint8_t type, uint32_t cycle)
{
uint32_t delta = cycle - last;
if (multi_count) {
if (type != last_event_type || delta != last_delta) {
- buffer.data[multi_start] |= multi_count - 2;
- multi_count = 0;
+ finish_multi();
} else {
++multi_count;
if (multi_count == 17) {
- buffer.data[multi_start] |= multi_count - 2;
+ finish_multi();
last_event_type = 0xFF;
- multi_count = 0;
}
return;
}
@@ -159,7 +197,7 @@ void event_cycle_adjust(uint32_t cycle, uint32_t deduction)
save_int32(&buffer, deduction);
}
-static size_t remote_send_progress[7];
+static uint8_t *remote_send_progress[7];
static uint8_t remote_needs_state[7];
static void flush_socket(void)
{
@@ -174,11 +212,11 @@ static void flush_socket(void)
current_system->save_state = EVENTLOG_SLOT + 1;
}
}
- size_t min_progress = 0;
+ uint8_t *min_progress = compressed;
for (int i = 0; i < num_remotes; i++) {
int sent = 1;
if (remote_needs_state[i]) {
- remote_send_progress[i] = buffer.size;
+ remote_send_progress[i] = output_stream.next_out;
} else {
uint8_t buffer[1500];
int bytes = recv(remotes[i], buffer, sizeof(buffer), 0);
@@ -210,9 +248,9 @@ static void flush_socket(void)
}
}
}
- while (sent && buffer.size - remote_send_progress[i])
+ while (sent && output_stream.next_out > remote_send_progress[i])
{
- sent = send(remotes[i], buffer.data + remote_send_progress[i], buffer.size - remote_send_progress[i], 0);
+ sent = send(remotes[i], remote_send_progress[i], output_stream.next_out - remote_send_progress[i], 0);
if (sent >= 0) {
remote_send_progress[i] += sent;
} else if (socket_error_is_wouldblock()) {
@@ -229,11 +267,12 @@ static void flush_socket(void)
}
}
}
- if (min_progress == buffer.size) {
- buffer.size = 0;
- memset(remote_send_progress, 0, sizeof(remote_send_progress));
- multi_count = 0;
- last_event_type = 0xFF;
+ if (min_progress == output_stream.next_out) {
+ output_stream.next_out = compressed;
+ output_stream.avail_out = compressed_storage;
+ for (int i = 0; i < num_remotes; i++) {
+ remote_send_progress[i] = compressed;
+ }
}
}
@@ -245,13 +284,26 @@ void event_log(uint8_t type, uint32_t cycle, uint8_t size, uint8_t *payload)
event_header(type, cycle);
last = cycle;
save_buffer8(&buffer, payload, size);
- if (listen_sock && buffer.size > 1280) {
- if (multi_count) {
- buffer.data[multi_start] |= multi_count - 2;
- multi_count = 0;
- last_event_type = 0xFF;
+ if (!multi_count) {
+ last_event_type = 0xFF;
+ output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data);
+ int result = deflate(&output_stream, Z_NO_FLUSH);
+ if (result != Z_OK) {
+ fatal_error("deflate returned %d\n", result);
+ }
+ if (listen_sock) {
+ if ((output_stream.next_out - compressed) > 1280 || !output_stream.avail_out) {
+ flush_socket();
+ }
+ } else if (!output_stream.avail_out) {
+ fwrite(compressed, 1, compressed_storage, event_file);
+ output_stream.next_out = compressed;
+ output_stream.avail_out = compressed_storage;
+ }
+ if (!output_stream.avail_in) {
+ buffer.size = 0;
+ output_stream.next_in = buffer.data;
}
- flush_socket();
}
}
@@ -300,6 +352,39 @@ static size_t send_all(int sock, uint8_t *data, size_t size, int flags)
return total;
}
+void deflate_flush(uint8_t full)
+{
+ output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data);
+ while (output_stream.avail_in)
+ {
+ if (!output_stream.avail_out) {
+ size_t old_storage = compressed_storage;
+ uint8_t *old_compressed = compressed;
+ compressed_storage *= 2;
+ compressed = realloc(compressed, compressed_storage);
+ output_stream.next_out = compressed + old_storage;
+ output_stream.avail_out = old_storage;
+ for (int i = 0; i < num_remotes; i++) {
+ if (!remote_needs_state[i]) {
+ remote_send_progress[i] = compressed + (remote_send_progress[i] - old_compressed);
+ }
+ }
+ }
+ int result = deflate(&output_stream, full ? Z_FINISH : Z_SYNC_FLUSH);
+ if (result != (full ? Z_STREAM_END : Z_OK)) {
+ fatal_error("deflate returned %d\n", result);
+ }
+ if (full) {
+ result = deflateReset(&output_stream);
+ if (result != Z_OK) {
+ fatal_error("deflateReset returned %d\n", result);
+ }
+ }
+ }
+ output_stream.next_in = buffer.data;
+ buffer.size = 0;
+}
+
void event_state(uint32_t cycle, serialize_buffer *state)
{
if (!fully_active) {
@@ -311,20 +396,12 @@ void event_state(uint32_t cycle, serialize_buffer *state)
last_byte_address >> 8, last_byte_address,
state->size >> 16, state->size >> 8, state->size
};
+ uint8_t sent_system_start = 0;
for (int i = 0; i < num_remotes; i++)
{
if (remote_needs_state[i]) {
- if(
- send_all(remotes[i], system_start, system_start_size, 0) == system_start_size
- && send_all(remotes[i], header, sizeof(header), 0) == sizeof(header)
- && send_all(remotes[i], state->data, state->size, 0) == state->size
- ) {
- remote_send_progress[i] = buffer.size;
- remote_needs_state[i] = 0;
- socket_blocking(remotes[i], 0);
- int flag = 1;
- setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
- fully_active = 1;
+ if (send_all(remotes[i], system_start, system_start_size, 0) == system_start_size) {
+ sent_system_start = 1;
} else {
socket_close(remotes[i]);
remotes[i] = remotes[num_remotes-1];
@@ -335,6 +412,41 @@ void event_state(uint32_t cycle, serialize_buffer *state)
}
}
}
+ if (sent_system_start) {
+ if (fully_active) {
+ if (multi_count) {
+ finish_multi();
+ }
+ //full flush is needed so new and old clients can share a stream
+ deflate_flush(1);
+ }
+ save_buffer8(&buffer, header, sizeof(header));
+ save_buffer8(&buffer, state->data, state->size);
+ size_t old_compressed_size = output_stream.next_out - compressed;
+ deflate_flush(1);
+ size_t state_size = output_stream.next_out - compressed - old_compressed_size;
+ for (int i = 0; i < num_remotes; i++) {
+ if (remote_needs_state[i]) {
+ if (send_all(remotes[i], compressed + old_compressed_size, state_size, 0) == state_size) {
+ remote_send_progress[i] = compressed + old_compressed_size;
+ remote_needs_state[i] = 0;
+ socket_blocking(remotes[i], 0);
+ int flag = 1;
+ setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
+ fully_active = 1;
+ } else {
+ socket_close(remotes[i]);
+ remotes[i] = remotes[num_remotes-1];
+ remote_send_progress[i] = remote_send_progress[num_remotes-1];
+ remote_needs_state[i] = remote_needs_state[num_remotes-1];
+ num_remotes--;
+ i--;
+ }
+ }
+ }
+ output_stream.next_out = compressed + old_compressed_size;
+ output_stream.avail_out = compressed_storage - old_compressed_size;
+ }
}
void event_flush(uint32_t cycle)
@@ -343,25 +455,55 @@ void event_flush(uint32_t cycle)
return;
}
if (fully_active) {
- event_log(EVENT_FLUSH, cycle, 0, NULL);
+ event_header(EVENT_FLUSH, cycle);
+ last = cycle;
+
+ deflate_flush(0);
}
if (event_file) {
- fwrite(buffer.data, 1, buffer.size, event_file);
+ fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
fflush(event_file);
- buffer.size = 0;
- multi_count = 0;
- last_event_type = 0xFF;
+ output_stream.next_out = compressed;
+ output_stream.avail_out = compressed_storage;
} else if (listen_sock) {
flush_socket();
}
}
+static void init_event_reader_common(event_reader *reader)
+{
+ reader->last_cycle = 0;
+ reader->repeat_event = 0xFF;
+ reader->storage = 512 * 1024;
+ init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage);
+ reader->buffer.size = 0;
+ memset(&reader->input_stream, 0, sizeof(reader->input_stream));
+
+}
+
void init_event_reader(event_reader *reader, uint8_t *data, size_t size)
{
reader->socket = 0;
reader->last_cycle = 0;
reader->repeat_event = 0xFF;
- init_deserialize(&reader->buffer, data, size);
+ init_event_reader_common(reader);
+ uint8_t name_len = data[1];
+ reader->buffer.size = name_len + 2;
+ memcpy(reader->buffer.data, data, reader->buffer.size);
+ reader->input_stream.next_in = data + reader->buffer.size;
+ reader->input_stream.avail_in = size - reader->buffer.size;
+
+ int result = inflateInit(&reader->input_stream);
+ if (Z_OK != result) {
+ fatal_error("inflateInit returned %d\n", result);
+ }
+ reader->input_stream.next_out = reader->buffer.data + reader->buffer.size;
+ reader->input_stream.avail_out = reader->storage - reader->buffer.size;
+ result = inflate(&reader->input_stream, Z_NO_FLUSH);
+ if (Z_OK != result && Z_STREAM_END != result) {
+ fatal_error("inflate returned %d\n", result);
+ }
+ reader->buffer.size = reader->input_stream.next_out - reader->buffer.data;
}
void init_event_reader_tcp(event_reader *reader, char *address, char *port)
@@ -382,10 +524,10 @@ void init_event_reader_tcp(event_reader *reader, char *address, char *port)
fatal_error("Failed to connect to %s:%s for event log stream\n", address, port);
}
- reader->storage = 512 * 1024;
- reader->last_cycle = 0;
- init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage);
- reader->buffer.size = 0;
+ init_event_reader_common(reader);
+ reader->socket_buffer_size = 256 * 1024;
+ reader->socket_buffer = malloc(reader->socket_buffer_size);
+
while(reader->buffer.size < 3 || reader->buffer.size < 3 + reader->buffer.data[2])
{
int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
@@ -394,6 +536,21 @@ void init_event_reader_tcp(event_reader *reader, char *address, char *port)
}
reader->buffer.size += bytes;
}
+ size_t init_msg_len = 3 + reader->buffer.data[2];
+ memcpy(reader->socket_buffer, reader->buffer.data + init_msg_len, reader->buffer.size - init_msg_len);
+ reader->input_stream.next_in = reader->socket_buffer;
+ reader->input_stream.avail_in = reader->buffer.size - init_msg_len;
+ reader->buffer.size = init_msg_len;
+ int res = inflateInit(&reader->input_stream);
+ if (Z_OK != res) {
+ fatal_error("inflateInit returned %d\n", res);
+ }
+ reader->input_stream.next_out = reader->buffer.data + init_msg_len;
+ reader->input_stream.avail_out = reader->storage - init_msg_len;
+ res = inflate(&reader->input_stream, Z_NO_FLUSH);
+ if (Z_OK != res && Z_BUF_ERROR != res) {
+ fatal_error("inflate returned %d in init_event_reader_tcp\n", res);
+ }
socket_blocking(reader->socket, 0);
int flag = 1;
setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
@@ -401,35 +558,70 @@ void init_event_reader_tcp(event_reader *reader, char *address, char *port)
static void read_from_socket(event_reader *reader)
{
- if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) {
- reader->storage *= 2;
- uint8_t *new_buf = malloc(reader->storage);
- memcpy(new_buf, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
- free(reader->buffer.data);
- reader->buffer.data = new_buf;
- reader->buffer.size -= reader->buffer.cur_pos;
- reader->buffer.cur_pos = 0;
- } else if (reader->buffer.cur_pos >= reader->buffer.size/2 && reader->buffer.size >= reader->storage/2) {
- memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
- reader->buffer.size -= reader->buffer.cur_pos;
- reader->buffer.cur_pos = 0;
- }
- int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
+ if (reader->socket_buffer_size - reader->input_stream.avail_in < 128 * 1024) {
+ reader->socket_buffer_size *= 2;
+ uint8_t *new_buf = malloc(reader->socket_buffer_size);
+ memcpy(new_buf, reader->input_stream.next_in, reader->input_stream.avail_in);
+ free(reader->socket_buffer);
+ reader->socket_buffer = new_buf;
+ reader->input_stream.next_in = new_buf;
+ } else if (
+ reader->input_stream.next_in - reader->socket_buffer >= reader->input_stream.avail_in
+ && reader->input_stream.next_in - reader->socket_buffer + reader->input_stream.avail_in >= reader->socket_buffer_size/2
+ ) {
+ memmove(reader->socket_buffer, reader->input_stream.next_in, reader->input_stream.avail_in);
+ reader->input_stream.next_in = reader->socket_buffer;
+ }
+ uint8_t *space_start = reader->input_stream.next_in + reader->input_stream.avail_in;
+ size_t space = (reader->socket_buffer + reader->socket_buffer_size) - space_start;
+ int bytes = recv(reader->socket, space_start, space, 0);
if (bytes >= 0) {
- reader->buffer.size += bytes;
+ reader->input_stream.avail_in += bytes;
} else if (!socket_error_is_wouldblock()) {
fatal_error("Connection closed, error = %X\n", socket_last_error());
}
}
+static void inflate_flush(event_reader *reader)
+{
+ if (reader->buffer.cur_pos > reader->storage / 2) {
+ memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
+ reader->buffer.size -= reader->buffer.cur_pos;
+ reader->buffer.cur_pos = 0;
+ reader->input_stream.next_out = reader->buffer.data + reader->buffer.size;
+ reader->input_stream.avail_out = reader->storage - reader->buffer.size;
+ }
+ int result = inflate(&reader->input_stream, Z_SYNC_FLUSH);
+ if (Z_OK != result && Z_STREAM_END != result) {
+ fatal_error("inflate returned %d\n", result);
+ }
+ reader->buffer.size = reader->input_stream.next_out - reader->buffer.data;
+ if (result == Z_STREAM_END && (reader->socket || reader->input_stream.avail_in)) {
+ inflateReset(&reader->input_stream);
+ if (reader->input_stream.avail_in) {
+ inflate_flush(reader);
+ }
+ }
+
+}
+
void reader_ensure_data(event_reader *reader, size_t bytes)
{
- if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) {
- socket_blocking(reader->socket, 1);
- while (reader->buffer.size - reader->buffer.cur_pos < bytes) {
+ if (reader->buffer.size - reader->buffer.cur_pos < bytes) {
+ if (reader->socket) {
read_from_socket(reader);
}
- socket_blocking(reader->socket, 0);
+ if (reader->input_stream.avail_in) {
+ inflate_flush(reader);
+ }
+ if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) {
+ socket_blocking(reader->socket, 1);
+ while (reader->buffer.size - reader->buffer.cur_pos < bytes) {
+ read_from_socket(reader);
+ inflate_flush(reader);
+ }
+ socket_blocking(reader->socket, 0);
+ }
}
}
@@ -441,10 +633,7 @@ uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out)
reader->last_cycle = *cycle_out;
return reader->repeat_event;
}
- if (reader->socket) {
- read_from_socket(reader);
- reader_ensure_data(reader, 1);
- }
+ reader_ensure_data(reader, 1);
uint8_t header = load_int8(&reader->buffer);
uint8_t ret;
uint32_t delta;