diff options
author | Vivien Kraus <vivien@planete-kraus.eu> | 2023-04-26 20:27:54 +0200 |
---|---|---|
committer | Vivien Kraus <vivien@planete-kraus.eu> | 2023-05-10 00:01:16 +0200 |
commit | 7ae97c9c9c9bab8e8406cdc5dbc61c5c4d4603a5 (patch) | |
tree | ac00d71050ec677982c29ddedfb2fe507bf340c3 /src | |
parent | 1269e572a57daf7499f0bfcb535fb09ce4d2a02c (diff) |
Add an append-only file.
Diffstat (limited to 'src')
-rw-r--r-- | src/libdisfluid/Makefile.am | 1 | ||||
-rw-r--r-- | src/libdisfluid/disfluid-append-only-file.h | 680 | ||||
-rw-r--r-- | src/libdisfluid/disfluid-tests.h | 134 |
3 files changed, 815 insertions, 0 deletions
diff --git a/src/libdisfluid/Makefile.am b/src/libdisfluid/Makefile.am index aeba503..0354f5d 100644 --- a/src/libdisfluid/Makefile.am +++ b/src/libdisfluid/Makefile.am @@ -2,6 +2,7 @@ lib_LTLIBRARIES += %D%/libdisfluid.la %C%_libdisfluid_la_SOURCES = \ %D%/disfluid-authors.h \ + %D%/disfluid-append-only-file.h \ %D%/disfluid-cache-entry.h \ %D%/disfluid-cache-entry-key.h \ %D%/disfluid-cache-entry-hash.h \ diff --git a/src/libdisfluid/disfluid-append-only-file.h b/src/libdisfluid/disfluid-append-only-file.h new file mode 100644 index 0000000..65ad40c --- /dev/null +++ b/src/libdisfluid/disfluid-append-only-file.h @@ -0,0 +1,680 @@ +#ifndef DISFLUID_APPEND_ONLY_FILE_INCLUDED +# define DISFLUID_APPEND_ONLY_FILE_INCLUDED + +# include <config.h> +# include "string-desc.h" + +MAYBE_UNUSED static int +ao_file_prepare (int fd, const string_desc_t file_magic); + +MAYBE_UNUSED static int ao_file_lock_for_writing (int fd, size_t *top); + +MAYBE_UNUSED static int ao_file_read_top (int fd, size_t *top); + +MAYBE_UNUSED static int +ao_file_read (int fd, size_t offset, string_desc_t data); + +MAYBE_UNUSED static int +ao_file_push_data (int fd, const string_desc_t data, size_t *offset); + +MAYBE_UNUSED static int ao_file_commit_transaction (int fd); + +MAYBE_UNUSED static void ao_file_abort_transaction (int fd); + +# include <unistd.h> + +/* The file is structured in this way: + + - 16 bytes for the magic tag; + + - 1 bytes for the status: 0 -> no pending transaction, 1 -> has a + pending transaction; + + - 7 bytes reserved; + + - 8 bytes for the top before applying the transaction; + + - 8 bytes for the top after applying the transaction. + +The top is always in logical units, i.e. without counting the +header. It is the index of the last byte. */ + +/* You can lock either top offsets. You can lock both, but only if you + lock the second one and then the first one. 
*/ + +/* A transaction is considered committed in two cases: + - byte 16 is 0: the top is the first offset; + - byte 16 is 1: the top is the second offset. This is not the normal case, so in this case we give up two guarantees: that a read can proceed concurrently with a transaction being written, and that a process can read from read-only storage. */ + +/* The process to update a file is to append data, then set the second + offset, then sync the file, and then set byte 16 to 1. */ + +/* To read a file, while byte 16 is 1, apply the transaction. Once you + have 0, then you know the transaction has been applied. */ + +/* The difficulty is obviously to lock the file correctly, so that + once you have a read lock for the file, the first offset can’t + change, and once you have a write lock to add a transaction, then + the second offset is yours to update and no other transaction can + run (but the file can still be read). */ + +/* Here is how you would lock the file for reading: */ + +/* + 0. Acquire a shared lock on the first offset. + + 1. Atomically read byte 16. If it indicates that there is no + transaction, success! Otherwise, this is the abnormal case, + continue. + + 2. Release the shared lock on the first offset. + + 3. Acquire an exclusive lock on the second offset. This may block + for a long time, if a new transaction has started between steps 2 + and 3. + + 4. Read byte 16. If it is 0, go to 12 (the first offset is not + locked at this point). Otherwise, continue. + + 5. Acquire an exclusive lock on the first offset. + + 6. Read the second offset. + + 7. Write the second offset to the first slot. + + 8. Fsync. + + 9. Write 0 as byte 16. + + 10. Fsync. + + 11. Release the lock on the first offset. + + 12. Release the lock on the second offset. + + 13. Go to step 0. */ + +/* Note that this only locks the second offset in abnormal cases. */ + +/* Here is how you would lock the file for appending more data: */ + +/* 1. Acquire an exclusive lock on the second offset. + + 2. Atomically read byte 16. If it is 0, success. 
Otherwise, + continue. + + 3. Acquire an exclusive lock on the first offset. + + 4. Read the second offset. + + 5. Write the second offset to the first slot. + + 6. Fsync. + + 7. Write 0 as byte 16. + + 8. Fsync. + + 9. Release the lock on the first offset. +*/ + +/* Note that while a transaction is being computed, the first offset + is unlocked. */ + +/* To commit a transaction: + + 1. update the second offset; + + 2. sync file; + + 3. set byte 16 to 1; + + 4. Re-sync; + + 5. Acquire an exclusive lock on the first offset; + + 6. Write the second offset in the first slot; + + 7. Sync again; + + 8. Set byte 16 to 0; + + 9. Release the lock on the first offset; + + 10. Release the lock on the second offset. +*/ + +/* This scheme ensures that: + - the first offset is never read if byte 16 is 1; + - the second offset is never read if byte 16 is 0; + - the second offset is only changed if byte 16 is 0, and an exclusive lock has been acquired for it; + - the first offset is only changed if byte 16 is 1, and an exclusive lock has been acquired for it; + - a process that has a lock for the first offset will never try to lock the second offset; + - byte 16 is only set to 1 if the second offset is synced; + - byte 16 is only set to 0 if the first offset is synced; + - once the second offset is unlocked, the transaction has been fully committed; + - the first offset can’t change while the second offset is locked; + - only one transaction can be added at the same time; + - when a transaction is being added, byte 16 is 0, so reading is non-blocking. 
*/ + +static int +ao_file_read_u8 (int fd, uint8_t * number) +{ + if (read (fd, number, 1) <= 0) + { + return -1; + } + return 0; +} + +static int +ao_file_read_u64 (int fd, size_t *number) +{ + *number = 0; + for (size_t i = 0; i < 8; i++) + { + uint8_t byte; + if (ao_file_read_u8 (fd, &byte) < 0) + { + return -1; + } + *number *= 256; + *number += byte; + } + return 0; +} + +static int +ao_file_write_u8 (int fd, uint8_t number) +{ + if (write (fd, &number, 1) == -1) + { + return -1; + } + return 0; +} + +static int +ao_file_write_u64 (int fd, size_t offset) +{ + uint8_t big[8] = { 0 }; + for (size_t i = 8; i-- > 0;) + { + big[i] = offset % 256; + offset /= 256; + } + for (size_t i = 0; i < 8; i++) + { + if (ao_file_write_u8 (fd, big[i]) < 0) + { + return -1; + } + } + return 0; +} + +static int +ao_file_lock (int fd, size_t offset, size_t length, bool exclusive) +{ + struct flock lock = { + .l_type = F_RDLCK, + .l_whence = SEEK_SET, + .l_start = offset, + .l_len = length + }; + if (exclusive) + { + lock.l_type = F_WRLCK; + } + if (fcntl (fd, F_SETLKW, &lock) == -1) + { + return -1; + } + return 0; +} + +static void +ao_file_unlock (int fd, size_t offset, size_t length) +{ + struct flock lock = { + .l_type = F_UNLCK, + .l_whence = SEEK_SET, + .l_start = offset, + .l_len = length + }; + fcntl (fd, F_SETLKW, &lock); +} + +static int +ao_file_write_magic (int fd, const string_desc_t magic) +{ + assert (magic._nbytes == 16); + int error = 0; + if (lseek (fd, 0, SEEK_SET) == -1) + { + error = -1; + goto cleanup; + } + ssize_t n_written = 0; + const char *to_write = magic._data; + size_t n_to_write = magic._nbytes; + while (n_written < n_to_write) + { + n_written = write (fd, to_write, n_to_write); + if (n_written <= 0) + { + error = -1; + goto cleanup; + } + assert (n_written <= n_to_write); + to_write += n_written; + n_to_write -= n_written; + } +cleanup: + return error; +} + +static int +ao_file_read_flags (int fd, bool *has_transaction) +{ + int error = 0; + 
*has_transaction = false; + if (lseek (fd, 16, SEEK_SET) == -1) + { + error = -1; + goto cleanup; + } + uint8_t flags; + if (read (fd, &flags, 1) <= 0) + { + error = -1; + goto cleanup; + } + switch (flags) + { + case 0: + *has_transaction = false; + break; + case 1: + *has_transaction = true; + break; + default: + error = -1; + goto cleanup; + } +cleanup: + return error; +} + +static int +ao_file_write_flags (int fd, bool has_transaction) +{ + int error = 0; + if (lseek (fd, 16, SEEK_SET) == -1) + { + error = -1; + goto cleanup; + } + uint8_t flags = 0; + if (has_transaction) + { + flags = 1; + } + if (write (fd, &flags, 1) <= 0) + { + error = -1; + goto cleanup; + } +cleanup: + return error; +} + +static int +ao_file_read_offset (int fd, int which, size_t *offset) +{ + assert (which == 0 || which == 1); + size_t read_offset = 24; + if (which == 1) + { + read_offset = 32; + } + if (lseek (fd, read_offset, SEEK_SET) == -1) + { + return -1; + } + return ao_file_read_u64 (fd, offset); +} + +static int +ao_file_write_offset (int fd, int which, size_t offset) +{ + assert (which == 0 || which == 1); + size_t write_offset = 24; + if (which == 1) + { + write_offset = 32; + } + if (lseek (fd, write_offset, SEEK_SET) == -1) + { + return -1; + } + return ao_file_write_u64 (fd, offset); +} + +static int +ao_file_lock_for_writing (int fd, size_t *top) +{ + /* On success, an exclusive lock is maintained on the second + offset. 
*/ + int error = 0; + if (ao_file_lock (fd, 32, 8, true) < 0) + { + error = -1; + goto cleanup; + } + bool has_transaction = false; + if (ao_file_read_flags (fd, &has_transaction) < 0) + { + error = -1; + ao_file_unlock (fd, 32, 8); + goto cleanup; + } + if (has_transaction) + { + if (ao_file_lock (fd, 24, 8, true) < 0) + { + error = -1; + ao_file_unlock (fd, 32, 8); + goto cleanup; + } + size_t true_offset = 0; + if (ao_file_read_offset (fd, 1, &true_offset) < 0) + { + error = -1; + ao_file_unlock (fd, 24, 8); + ao_file_unlock (fd, 32, 8); + goto cleanup; + } + *top = true_offset; + if (ao_file_write_offset (fd, 0, true_offset) < 0) + { + error = -1; + ao_file_unlock (fd, 24, 8); + ao_file_unlock (fd, 32, 8); + goto cleanup; + } + if (fsync (fd) == -1) + { + error = -1; + ao_file_unlock (fd, 24, 8); + ao_file_unlock (fd, 32, 8); + goto cleanup; + } + if (ao_file_write_flags (fd, false) < 0) + { + error = -1; + ao_file_unlock (fd, 24, 8); + ao_file_unlock (fd, 32, 8); + goto cleanup; + } + if (fsync (fd) == -1) + { + error = -1; + ao_file_unlock (fd, 24, 8); + ao_file_unlock (fd, 32, 8); + goto cleanup; + } + ao_file_unlock (fd, 24, 8); + } + else + { + size_t true_offset = 0; + if (ao_file_read_offset (fd, 0, &true_offset) < 0) + { + error = -1; + ao_file_unlock (fd, 24, 8); + ao_file_unlock (fd, 32, 8); + goto cleanup; + } + *top = true_offset; + if (ao_file_write_offset (fd, 1, true_offset) < 0) + { + error = -1; + ao_file_unlock (fd, 24, 8); + ao_file_unlock (fd, 32, 8); + goto cleanup; + } + } +cleanup: + return error; +} + +static int +ao_file_try_lock_for_reading (int fd) +{ + /* Return 0 on success, -1 on error, -2 if it should be retried. 
*/ + int error = 0; + if (ao_file_lock (fd, 24, 8, false) < 0) + { + error = -1; + goto cleanup; + } + bool has_transaction = false; + if (ao_file_read_flags (fd, &has_transaction) < 0) + { + error = -1; + ao_file_unlock (fd, 24, 8); + goto cleanup; + } + if (has_transaction) + { + ao_file_unlock (fd, 24, 8); + size_t offset; + if (ao_file_lock_for_writing (fd, &offset) < 0) + { + error = -1; + goto cleanup; + } + /* The second offset is still locked. */ + ao_file_unlock (fd, 32, 8); + error = -2; + } +cleanup: + return error; +} + +static int +ao_file_lock_for_reading (int fd) +{ + int error = 0; + while ((error = ao_file_try_lock_for_reading (fd)) == -2) + ; + return error; +} + +static int +ao_file_prepare (int fd, const string_desc_t file_magic) +{ + int error = 0; + if (file_magic._nbytes != 16) + { + error = -1; + goto cleanup; + } + if (ao_file_write_magic (fd, file_magic) < 0) + { + error = -1; + goto cleanup; + } + if (ao_file_write_flags (fd, false) < 0) + { + error = -1; + goto cleanup; + } + if (ao_file_write_offset (fd, 0, 0) < 0) + { + error = -1; + goto cleanup; + } + if (ao_file_write_offset (fd, 1, 0) < 0) + { + error = -1; + goto cleanup; + } +cleanup: + return error; +} + +static int +ao_file_read_top (int fd, size_t *top) +{ + /* If called during a transaction, ao_file_lock_for_reading will + recursively lock the second offset, but since it is a process + lock, everything will be OK. 
*/ + int error = 0; + if (ao_file_lock_for_reading (fd) < 0) + { + error = -1; + goto cleanup; + } + if (ao_file_read_offset (fd, 0, top) < 0) + { + error = -1; + ao_file_unlock (fd, 24, 8); + goto cleanup; + } + ao_file_unlock (fd, 24, 8); +cleanup: + return error; +} + +static int +ao_file_read (int fd, size_t offset, string_desc_t data) +{ + int error = 0; + assert (offset >= data._nbytes); + if (lseek (fd, offset + 40 - data._nbytes, SEEK_SET) == -1) + { + error = -1; + goto cleanup; + } + ssize_t n_read = 0; + while (n_read < data._nbytes) + { + n_read = read (fd, data._data, data._nbytes); + if (n_read <= 0) + { + error = -1; + goto cleanup; + } + assert (n_read <= data._nbytes); + data._data += n_read; + data._nbytes -= n_read; + } +cleanup: + return error; +} + +static int +ao_file_commit_transaction (int fd) +{ + int error = 0; + if (fsync (fd) == -1) + { + error = -1; + goto cleanup; + } + if (ao_file_write_flags (fd, true) < 0) + { + error = -1; + goto cleanup; + } + if (fsync (fd) == -1) + { + error = -1; + goto cleanup; + } + size_t actual_top; + if (ao_file_read_offset (fd, 1, &actual_top) < 0) + { + error = -1; + goto cleanup; + } + if (ao_file_lock (fd, 24, 8, true) < 0) + { + error = -1; + goto cleanup; + } + if (ao_file_write_offset (fd, 0, actual_top) < 0) + { + error = -1; + ao_file_unlock (fd, 24, 8); + goto cleanup; + } + if (fsync (fd) == -1) + { + error = -1; + ao_file_unlock (fd, 24, 8); + goto cleanup; + } + if (ao_file_write_flags (fd, false) < 0) + { + error = -1; + goto cleanup; + } + ao_file_unlock (fd, 24, 8); + ao_file_unlock (fd, 32, 8); +cleanup: + return error; +} + +static void +ao_file_abort_transaction (int fd) +{ + ao_file_unlock (fd, 32, 8); +} + +static int +ao_file_push_data (int fd, const string_desc_t data, size_t *offset) +{ + int error = 0; + if (ao_file_read_offset (fd, 1, offset) < 0) + { + error = -1; + goto cleanup; + } + if (lseek (fd, *offset + 40, SEEK_SET) == -1) + { + error = -1; + goto cleanup; + } + size_t 
n_to_write = data._nbytes; + const char *to_write = data._data; + ssize_t n_written = 0; + while (n_written < n_to_write) + { + n_written = write (fd, to_write, n_to_write); + if (n_written == -1) + { + error = -1; + goto cleanup; + } + assert (n_written <= n_to_write); + to_write += n_written; + n_to_write -= n_written; + *offset += n_written; + } + if (ao_file_write_offset (fd, 1, *offset) < 0) + { + error = -1; + goto cleanup; + } + if (lseek (fd, *offset + 40, SEEK_SET) == -1) + { + error = -1; + goto cleanup; + } +cleanup: + return error; +} + +#endif /* not DISFLUID_APPEND_ONLY_FILE_INCLUDED */ diff --git a/src/libdisfluid/disfluid-tests.h b/src/libdisfluid/disfluid-tests.h index e77b497..50b5a9f 100644 --- a/src/libdisfluid/disfluid-tests.h +++ b/src/libdisfluid/disfluid-tests.h @@ -14,6 +14,7 @@ static inline char *run_tests (size_t *n_tests, size_t *n_errors); # include "disfluid-cache-entry-key.h" # include "disfluid-db.h" # include "string-desc.h" +# include "disfluid-append-only-file.h" # define BYTES * 1 @@ -527,6 +528,135 @@ START_TEST (test_db_trie_leaf) END_TEST /* *INDENT-ON* */ +/* *INDENT-OFF* */ +START_TEST (test_aof_recover) +/* *INDENT-ON* */ + +{ + /* Here we have a partially updated file. The top was offset 5, but + is in the process of being updated to 13. In the mean time, the + first offset is garbage. 
*/ + static const uint8_t partial_update[] = { + /* Magic: */ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, + /* Status: */ + 1, 42, 42, 42, 42, 42, 42, 42, + /* Top before: */ + 42, 42, 42, 42, 42, 42, 42, 42, + /* Top after: */ + 0, 0, 0, 0, 0, 0, 0, 13, + /* Data: */ + 72, 101, 108, 108, 111, 44, 32, 119, 111, 114, 108, 100, 33 + }; + char filename[] = "/tmp/test-partial-update-XXXXXX"; + int file = mkstemp (filename); + ck_assert_int_ge (file, 0); + size_t n_to_write = sizeof (partial_update); + const uint8_t *to_write = partial_update; + while (n_to_write > 0) + { + ssize_t n_written = write (file, to_write, n_to_write); + ck_assert_int_gt (n_written, 0); + ck_assert_int_le (n_written, n_to_write); + n_to_write -= n_written; + to_write += n_written; + } + size_t top = 42; + int error = ao_file_read_top (file, &top); + ck_assert_int_eq (error, 0); + ck_assert_int_eq (top, 13); + if (lseek (file, 0, SEEK_SET) == -1) + { + ck_assert (false); + } + static const uint8_t after_update_expected[] = { + /* Magic: */ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, + /* Status: */ + 0, 42, 42, 42, 42, 42, 42, 42, + /* Top before: */ + 0, 0, 0, 0, 0, 0, 0, 13, + /* Top after: */ + 0, 0, 0, 0, 0, 0, 0, 13, + /* Data: */ + 72, 101, 108, 108, 111, 44, 32, 119, 111, 114, 108, 100, 33 + }; + uint8_t actual_data[sizeof (after_update_expected)] = { 0 }; + size_t n_to_read = sizeof (after_update_expected); + uint8_t *read_ptr = actual_data; + while (n_to_read > 0) + { + ssize_t n_read = read (file, read_ptr, n_to_read); + ck_assert_int_gt (n_read, 0); + ck_assert_int_le (n_read, n_to_read); + n_to_read -= n_read; + read_ptr += n_read; + } + for (size_t i = 0; i < sizeof (after_update_expected); i++) + { + ck_assert_int_eq (actual_data[i], after_update_expected[i]); + } + remove (filename); + close (file); +} +/* *INDENT-OFF* */ +END_TEST +/* *INDENT-ON* */ + +/* *INDENT-OFF* */ +START_TEST (test_aof_can_read_locked_file) +/* *INDENT-ON* */ + +{ + /* Here we 
construct a file with "Hello", and lock it. We should + still be able to call ao_file_read_top. */ + char filename[] = "/tmp/test-lock-read-XXXXXX"; + int file = mkstemp (filename); + ck_assert_int_ge (file, 0); + static const char *magic_data = "disfluid test ao"; + const string_desc_t file_magic = { + ._data = (char *) magic_data, + ._nbytes = strlen (magic_data) + }; + int error = ao_file_prepare (file, file_magic); + ck_assert_int_eq (error, 0); + size_t top; + error = ao_file_lock_for_writing (file, &top); + ck_assert_int_eq (error, 0); + ck_assert_int_eq (top, 0); + static const char *hello_data = "Hello"; + const string_desc_t hello = { + ._data = (char *) hello_data, + ._nbytes = strlen (hello_data) + }; + error = ao_file_push_data (file, hello, &top); + ck_assert_int_eq (error, 0); + ck_assert_int_eq (top, 5); + error = ao_file_commit_transaction (file); + ck_assert_int_eq (error, 0); + static const char *world_data = ", world!"; + const string_desc_t world = { + ._data = (char *) world_data, + ._nbytes = strlen (world_data) + }; + error = ao_file_lock_for_writing (file, &top); + ck_assert_int_eq (error, 0); + ck_assert_int_eq (top, 5); + error = ao_file_push_data (file, world, &top); + ck_assert_int_eq (error, 0); + ck_assert_int_eq (top, 13); + /* file is still locked. 
*/ + error = ao_file_read_top (file, &top); + ck_assert_int_eq (error, 0); + ck_assert_int_eq (top, 5); + ao_file_abort_transaction (file); + remove (filename); + close (file); +} +/* *INDENT-OFF* */ +END_TEST +/* *INDENT-ON* */ + static inline char * tests_read_whole_file (int file) { @@ -581,6 +711,10 @@ run_tests (size_t *n_tests, size_t *n_errors) tcase_add_test (db, test_db_trie_inner); tcase_add_test (db, test_db_trie_leaf); suite_add_tcase (suite, db); + TCase *aof = tcase_create (_("append-only file")); + tcase_add_test (aof, test_aof_recover); + tcase_add_test (aof, test_aof_can_read_locked_file); + suite_add_tcase (suite, aof); SRunner *runner = srunner_create (suite); char log_file_name[] = "/tmp/disfluid-unit-tests-XXXXXX"; int log_file = mkstemp (log_file_name); |