Projects 2, 3, and 4: Distributed Key-Value Database
Achieve consistent replication in a hostile network
Project 3 is due at 11:59pm on Tuesday, December 3, 2024.
Project 4 is due at 11:59pm on Tuesday, December 10, 2024.
Slip days can NOT be used for Project 4. All projects must be turned in no later than 11:59pm on Tuesday, December 10, 2024.
Description
In these projects, you will build a (relatively) simple, distributed, replicated key-value datastore. A key-value datastore is a very simple type of database that supports two API calls from clients: put(key, value) and get(key). The former API allows a client application to store a key-value pair in the database, while the latter API allows a client to retrieve a previously stored value by supplying its key. Real-world examples of distributed key-value datastores include memcached, Redis, DynamoDB, etc.
Of course, it would be simple to build a key-value store if it were a single process. However, your system must be replicated and support strong consistency guarantees. Thus, you will be implementing a simplified version of the Raft consensus protocol. Your datastore will be run multiple times, in parallel, and it will use the Raft protocol to maintain consensus among the replicas.
Your datastore will be tested for both correctness and performance. We will provide a simulator for your datastore that will simulate clients who execute put() and get() commands, as well as an unreliable network that can drop packets or make hosts unavailable. Part of your grade will come from the overhead your system has (i.e., fewer packets will result in a higher score), while another part will depend on the speed at which your datastore answers client queries (i.e., what is the query latency).
Your Program
For these projects, you will submit source code for one program named 4730kvstore that implements your replicated datastore. You may use any language of your choice, and we will give you basic starter code in Python. Keep in mind that you are writing a program that will be run multiple times, in parallel, to form a distributed system.
If you use C or any other compiled language, your executable should be named 4730kvstore. If you use an interpreted language, your script must be called 4730kvstore, must include an appropriate shebang, and must be marked as executable. If you use a virtual machine-based language (like Java or C#), you may need to write a brief Bash shell script, named 4730kvstore, that conforms to the input syntax given below and then launches your program using whatever incantations are necessary. For example, if you write your solution in Java, your Bash script might resemble
#!/bin/bash
exec java -jar 4730kvstore.jar "$@"
Or, if you use Python, your script might start with
#!/usr/bin/env python3
from bar import foo
and should be marked executable.
Language and Libraries
You can write your code in whatever language you choose, as long as your code compiles and runs on Gradescope. Do not use libraries that are disallowed for these projects. Similarly, your code must compile and run on the command line. You may use IDEs (e.g., Eclipse) during development, but do not submit to Gradescope without a Makefile. Make sure your code has no dependencies on your IDE. We provide starter code in Python; you are welcome to use this, but if you decide to use another language, you will need to port the starter code to your language yourself.
You may not use libraries or modules that implement consensus protocols. This includes any library that implements Raft, Paxos, Viewstamped Replication, or similar protocols. Obviously, you cannot use any libraries or software packages that implement a replicated key-value datastore. For example, your program cannot be a thin wrapper around memcached, etc. You may use libraries or modules that implement local database storage (e.g., SQLite, BerkeleyDB, LevelDB) if you want to use them for persistent storage within each replica. If you have any questions about whether a particular library or module is allowed, post on Piazza.
Starter Code
Very basic starter code for the assignment in Python is available on the Khoury GitHub server. The included starter code and simulator are written in Python 3, and should be able to run on any system with a relatively modern version of Python installed (e.g., Python 3.6 or greater). To get started, you should create a copy of the starter code on your local machine (or on the Khoury Linux machines if you wish):
git clone git@github.khoury.northeastern.edu:cs4730/raft-starter-code.git
The starter code provides a bare-bones implementation of a datastore that simply connects to the LAN and broadcasts a “no-op” message once every second. You may use this code as a basis for your development if you wish, but it is strongly recommended that you do not do so unless you are comfortable with Python.
Testing Your Code
To evaluate your replicated datastore, we have provided a simulated test environment. The simulator will create an emulated network and all necessary sockets, execute several copies of your datastore with the appropriate command line arguments, route messages between the datastore replicas, and generate requests from clients. You will not need to modify the simulator, although you may look at its source code if you wish.
The capabilities of the run script are as follows:
$ ./run --help
usage: run [-h] [--replica REPLICA] [--silence] [--config_directory CONFIG_DIRECTORY] test
positional arguments:
  test                  Path to a test config file, or "all" to run all tests.
optional arguments:
  -h, --help            show this help message and exit
  --replica REPLICA, -r REPLICA
                        Fully qualified path to your replica program (Default: ./4730kvstore)
  --silence, -s         Pipe stdout and stderr of replicas to /dev/null. (Default: False)
  --config_directory CONFIG_DIRECTORY, -c CONFIG_DIRECTORY
                        Path to a directory containing test configs. Only used when running "all" tests. (Default: ./configs/)
By default, the simulator expects your replica program to be named 4730kvstore and for it to be in the same directory as the simulator. This default can be overridden using the --replica command line argument, which we have provided as a convenience to aid you in testing. Note that when we grade, your replica program must be named 4730kvstore and we will not use the --replica option.
To execute a single test in the simulator, invoke run with a single command line argument: the path to the desired JSON test configuration.
$ ./run <path-to-test>
As a convenience, if you want the simulator to run all test cases, the last argument may be replaced with “all” (no quotes). In this case, the simulator will assume that the test configuration files are available in a folder named ./configs/, although this default may be overridden using the optional command line parameter --config_directory.
$ ./run all
The simulator has one additional optional command line parameter. The simulator executes copies of your replica during each test. If you want to suppress the console output of your replicas, i.e., what they print to STDOUT and STDERR, pass the --silence option to the simulator.
Config File Format
The configuration file(s) that you pass to run contain a number of parameters that control the simulation. We have provided a number of test configurations in the ./configs/ folder. Your replicas do not need to read, modify, or parse these configurations; all configuration is handled by the simulator.
Each configuration file is formatted in JSON and has the following elements:
- lifetime (Required): The number of seconds the simulation should run for. Must be at least 5.
- replicas (Required): The number of replicas to execute, i.e., copies of your 4730kvstore program. Must be at least 3.
- requests (Required): The number of get() and put() requests to randomly generate from clients.
- mix (Optional): Float between 0 and 1 representing the fraction of client queries that are get()s. Defaults to 0.8.
- wait (Optional): The number of seconds to wait before sending any client requests. Defaults to 2 seconds.
- end_wait (Optional): The number of seconds to wait at the end of the simulation before measuring performance. Defaults to 2 seconds.
- seed (Optional): The random seed to choose. If not specified, a random value is chosen. Setting this value will allow for a semi-reproducible set of clients and requests.
- drops (Optional): Float between 0 and 1 representing the fraction of messages between replicas to drop. Defaults to 0.
- events (Optional): A list of events that will occur during the simulation. Each event has a type and a time when it will trigger:
  - type (Required): The type of event. Valid types are:
    - kill_non_leader: crash-fail a random non-leader replica.
    - kill_leader: crash-fail the current leader.
    - part_easy: partition the network such that the leader has a quorum.
    - part_hard: partition the network such that the leader does not have a quorum.
    - part_end: remove all network partitions.
  - time (Required): The timestamp, in seconds, when the event should occur.
- tests (Required): Information about how to test the replicas for performance and correctness:
  - maximum_get_fail_fraction (Optional): Float between 0 and 1 specifying the maximum fraction of get() requests that may fail. Defaults to 0.5.
  - maximum_put_fail_fraction (Optional): Float between 0 and 1 specifying the maximum fraction of put() requests that may fail. Defaults to 0.5.
  - maximum_get_generation_fail_fraction (Optional): Float between 0 and 1 specifying the maximum fraction of get() requests that may fail to be generated (due to insufficient put() requests succeeding). Defaults to 0.1.
  - maximum_appends_batched_fraction (Optional): Float between 0 and 1 specifying the maximum fraction of append messages that may be batched together. Defaults to 0.5.
  - benchmarks (Required): Sets thresholds across four categories for assessing the performance of the replicas. In each category, three thresholds are specified, which separate the extra credit, full credit, partial credit, and no credit performance ranges:
    - total_msgs: how many total messages were sent between the replicas (lower is better).
    - failures: how many fail messages were sent to clients AND how many client queries were unanswered (lower is better).
    - duplicates: how many duplicate responses were returned to clients, e.g., the same get() or put() was answered more than once (lower is better).
    - median_latency: the median latency of answering client requests (lower is better).
For example, a simple configuration with no events and a read-heavy workload might look like the following:
{
  "lifetime": 30,
  "replicas": 5,
  "requests": 500,
  "mix": 0.9,
  "tests": {
    "benchmarks": {
      "total_msgs": [1200, 3000, 5000],
      "failures": [0, 1, 2],
      "duplicates": [0, 2, 5],
      "median_latency": [0.0004, 0.002, 0.05]
    }
  }
}
and a more complex configuration with events and a lossy network might be:
{
  "lifetime": 30,
  "replicas": 5,
  "requests": 300,
  "mix": 0.2,
  "drops": 0.15,
  "end_wait": 5,
  "events": [{"type": "kill_leader", "time": 8},
             {"type": "kill_leader", "time": 16}],
  "tests": {
    "benchmarks": {
      "total_msgs": [1000, 3000, 4000],
      "failures": [1, 10, 100],
      "duplicates": [0, 2, 10],
      "median_latency": [0.00015, 0.005, 0.05]
    }
  }
}
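Your replicas never read these files. However, if you write extra configurations of your own, a small checker can catch mistakes before a run. The following is a hypothetical helper; `load_config` is not part of the simulator. It applies the defaults listed above. It also assumes that a missing `events` key means "no events" and that all four benchmark categories must be present, as they are in both examples.

```python
import json
from typing import Any, Dict

# Documented defaults for optional top-level keys ("seed" has none: it is random).
TOP_LEVEL_DEFAULTS = {"mix": 0.8, "wait": 2, "end_wait": 2, "drops": 0}
# Documented defaults for optional keys inside "tests".
TEST_DEFAULTS = {
    "maximum_get_fail_fraction": 0.5,
    "maximum_put_fail_fraction": 0.5,
    "maximum_get_generation_fail_fraction": 0.1,
    "maximum_appends_batched_fraction": 0.5,
}
VALID_EVENTS = {"kill_non_leader", "kill_leader", "part_easy", "part_hard", "part_end"}
BENCHMARKS = ("total_msgs", "failures", "duplicates", "median_latency")


def load_config(text: str) -> Dict[str, Any]:
    """Parse a test config, check documented constraints, and fill in defaults."""
    cfg = json.loads(text)
    for key in ("lifetime", "replicas", "requests", "tests"):
        if key not in cfg:
            raise ValueError("missing required key: " + key)
    if cfg["lifetime"] < 5:
        raise ValueError("lifetime must be at least 5")
    if cfg["replicas"] < 3:
        raise ValueError("replicas must be at least 3")
    for key, value in TOP_LEVEL_DEFAULTS.items():
        cfg.setdefault(key, value)
    cfg.setdefault("events", [])  # assumption: no events if the key is absent
    for event in cfg["events"]:
        if event.get("type") not in VALID_EVENTS or "time" not in event:
            raise ValueError("malformed event: {}".format(event))
    tests = cfg["tests"]
    for key, value in TEST_DEFAULTS.items():
        tests.setdefault(key, value)
    benchmarks = tests.get("benchmarks")
    if benchmarks is None:
        raise ValueError("tests.benchmarks is required")
    for name in BENCHMARKS:
        if len(benchmarks.get(name, [])) != 3:
            raise ValueError("benchmark {} needs exactly three thresholds".format(name))
    return cfg
```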
run Output, Single Test Configuration
The run script will output any errors it encounters during the simulation, including malformed messages, messages to unknown destinations, replicas that unexpectedly quit, etc. Once the simulation completes, run prints (1) some statistics about your datastore’s performance and behavior, (2) whether your datastore passed the correctness checks, and if not, why not, and (3) how your datastore fared on the performance benchmarks. Note that performance is assessed only if the datastore passes the correctness checks.
Here is an example of the simulator’s output when a datastore fails the correctness checks:
$ ./run config.json
...
# Simulation Finished
## Useful Information and Statistics
Leaders: FFFF 0001 FFFF 0003
Replicas that died/were killed: 0/2
Total messages sent: 6370
Total messages dropped: 183
Total client get()/put() requests: 60/40
Total duplicate responses: 3
Total unanswered get()/put() requests: 33/3
Total redirects: 19
Total get()/put() failures: 15/31
Total get() with incorrect response: 7
## Correctness Checks
Error: >0 incorrect responses to get()
Error: insufficient get() requests answered (33 > 60 * 0.50)
## Correctness Checks Failed, Skipping Performance Tests
Ideally, you would like all get() and put() requests to succeed without failing and to have low latency. Obviously, if your system is returning incorrect values to get() requests, then your datastore has consistency issues. Furthermore, you would like the total number of packets to be as low as possible, i.e., the overhead of your datastore on the network should be low.
Here is another example when the correctness checks pass; notice the performance results are now printed:
$ ./run config.json
...
# Simulation Finished
## Useful Information and Statistics
Leaders: FFFF 0001 FFFF 0003
Replicas that died/were killed: 0/2
Total messages sent: 6370
Total messages dropped: 183
Total client get()/put() requests: 60/40
Total duplicate responses: 3
Total unanswered get()/put() requests: 0/3
Total redirects: 19
Total get()/put() failures: 15/31
Total get() with incorrect response: 0
## Correctness Checks
All correctness tests passed
## Performance Tests
## <test metric>: <your score> <benchmark score>, <test result>
Total Messages Between Replicas: 6370 >= 1000, Failed
Total Failures and Unanswered Requests: 49 < 60, Passed
Duplicate Responses to Clients: 3 < 4, Partial credit, needs improvement
Median Response Latency to Clients: 0.0001 < 0.0002, Bonus!
In this case, the performance results of the datastore are mixed. This implementation has extremely low median latency and is earning bonus points, and the number of failures/unanswered requests is acceptable. However, the datastore could be improved by sending fewer duplicate responses to clients and many fewer messages overall.
run Output, All Test Configurations
Additionally, run can be run in “all” mode, which tests your replica with all available test cases. “all” mode is equivalent to what we will use when we grade your submission. If your replica fails when the simulator is run in “all” mode, you can be assured that your replica will fare poorly when run under the grading script. To run the simulator in “all” mode, simply execute:
$ ./run all
Basic tests (5 replicas, 30 seconds, 100 requests):
No drops, no failures, 80% read [PASS] Performance Tiers: 3 1 2 0
No drops, no failures, 60% read [PASS] Performance Tiers: 2 1 2 0
No drops, no failures, 40% read [PASS] Performance Tiers: 2 1 1 0
No drops, no failures, 20% read [PASS] Performance Tiers: 3 2 2 1
Unreliable network tests (5 replicas, 30 seconds, 150 requests):
10% drops, no failures, 80% read [FAIL]
...
This will run your replica on a number of test configurations, and will output whether your program performs sufficiently in each case. Note that the performance information is only printed if your replica passes the correctness checks for a given test. The performance tier numbers correspond to Bonus (0), Passed (1), Needs Improvement (2), and Failed (3) with respect to total messages, failures/unanswered, duplicates, and median latency, respectively.
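The following hypothetical helper estimates which tier a run would land in for a given benchmark. It assumes the three thresholds are in ascending order. It also assumes that a score exactly equal to a threshold earns the better tier; the simulator's own comparison may be strict, so treat its printed output as authoritative.

```python
from bisect import bisect_left
from typing import Dict, List, Sequence

TIER_NAMES = ["Bonus", "Passed", "Needs Improvement", "Failed"]
ORDER = ("total_msgs", "failures", "duplicates", "median_latency")


def tier(score: float, thresholds: Sequence[float]) -> int:
    """Map a lower-is-better score onto tiers 0 (Bonus) through 3 (Failed)."""
    # bisect_left counts how many thresholds are strictly below the score,
    # so a score equal to a threshold stays in the better tier (assumption).
    return bisect_left(list(thresholds), score)


def tiers(results: Dict[str, float], benchmarks: Dict[str, List[float]]) -> List[int]:
    """Tier numbers in the same order that run prints them."""
    return [tier(results[name], benchmarks[name]) for name in ORDER]
```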
Message Format
To simplify development, instead of using real packet formats, we will be sending our data across the wire in JSON (many languages have utilities to encode and decode JSON, and you are welcome to use these libraries). All messages must be encoded as a dictionary and they must include the following four keys (at a minimum):
- src: The ID of the source of the message.
- dst: The ID of the destination of the message.
- leader: The ID of the leader, or "FFFF" if the leader’s ID is unknown.
- type: The type of the message.
The simulator uses src and dst instead of IP addresses in order to route and deliver messages. Furthermore, the simulator supports multicast: if dst is set to "FFFF", the message will be delivered to all replicas (use multicast sparingly, since it is expensive). The leader field is the ID of the replica that the sender of the message believes is the leader. All messages must include the leader field so that the simulator can learn which replica is the leader (otherwise, the simulator would have no way of determining this information).
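A small helper can guarantee that every outgoing message carries the four required keys. The names `make_msg`, `encode`, and `decode` are hypothetical. UTF-8 encoding of the JSON text is an assumption consistent with the JSON wire format described above.

```python
import json
from typing import Any, Dict

BROADCAST = "FFFF"  # multicast destination, and also "leader unknown"
REQUIRED_KEYS = ("src", "dst", "leader", "type")


def make_msg(src: str, dst: str, leader: str, msg_type: str, **extra: Any) -> Dict[str, Any]:
    """Build a message dict with the four required keys plus any custom keys."""
    msg = {"src": src, "dst": dst, "leader": leader, "type": msg_type}
    msg.update(extra)  # e.g. MID, key, value, or Raft-specific fields
    return msg


def encode(msg: Dict[str, Any]) -> bytes:
    """Serialize to JSON bytes, refusing messages that lack a required key."""
    missing = [k for k in REQUIRED_KEYS if k not in msg]
    if missing:
        raise ValueError("message missing required keys: {}".format(missing))
    return json.dumps(msg).encode("utf-8")


def decode(data: bytes) -> Dict[str, Any]:
    return json.loads(data.decode("utf-8"))
```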
type describes the type of the message. You may define custom types in order to implement your datastore (and you may add custom keys to the message dictionary in these cases). However, there are several message types that your replicas must support in order to handle requests from clients:
- get: get() messages are read requests from clients. They have the following format:
  {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "get", "MID": "<a unique string>", "key": "<some key>"}
  Your replicas may respond with an ok message, which includes the corresponding value:
  {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "ok", "MID": "<a unique string>", "value": "<value of the key>"}
  Or your replicas may respond with a fail message, in which case the client should retry the get():
  {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "fail", "MID": "<a unique string>"}
  If the client issues a get() for a key that does not exist (i.e., it was never put()), your datastore should return an ok message with an empty value (i.e., an empty string).
- put: put() messages are write requests from clients. They have the following format:
  {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "put", "MID": "<a unique string>", "key": "<some key>", "value": "<value of the key>"}
  Your replicas may respond with an ok message if the write was successful:
  {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "ok", "MID": "<a unique string>"}
  Or your replicas may respond with a fail message, in which case the client should retry the put():
  {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "fail", "MID": "<a unique string>"}
- redirect: If the client sends any message (get() or put()) to a replica that is not the leader, that replica should respond with a redirect:
  {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "redirect", "MID": "<a unique string>"}
  In this case, the client will retry the request by sending it to the specified leader.
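The reply rules above can be sketched as a single function. This handler is simplified and hypothetical; `handle_client` and the `store` dict are illustrative names. The leader answers directly from its dictionary, which matches the early development stages but not a finished Raft leader. A finished leader must wait until a put() is committed by a quorum before replying ok. Replying fail when no leader is known is a design choice here, not a stated requirement.

```python
from typing import Any, Dict

UNKNOWN = "FFFF"  # "leader unknown"


def handle_client(msg: Dict[str, Any], my_id: str, leader: str,
                  store: Dict[str, str]) -> Dict[str, Any]:
    """Build the reply to a client get()/put() (simplified sketch)."""
    # Every reply goes back to the client and echoes the request's MID.
    reply = {"src": my_id, "dst": msg["src"], "leader": leader, "MID": msg["MID"]}
    if leader == UNKNOWN:
        # No known leader (e.g., mid-election): a redirect would be useless.
        reply["type"] = "fail"
    elif leader != my_id:
        # Not the leader: the "leader" field tells the client where to retry.
        reply["type"] = "redirect"
    elif msg["type"] == "get":
        # Keys that were never put() read back as the empty string.
        reply.update(type="ok", value=store.get(msg["key"], ""))
    elif msg["type"] == "put":
        # Simplification: a real Raft leader replies only after a quorum commits.
        store[msg["key"]] = msg["value"]
        reply["type"] = "ok"
    else:
        reply["type"] = "fail"
    return reply
```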
Note that in all of the above cases, the MID in a request must match the MID in the response. For example, the following would be a legal series of requests and responses, where 001A is a client and 0000 and 0001 are replicas:
Request 1 {"src": "001A", "dst": "0001", "leader": "FFFF",
"type": "get", "MID": "4D61ACF83027", "key": "name"}
Response 1 {"src": "0001", "dst": "001A", "leader": "0000",
"type": "redirect", "MID": "4D61ACF83027"}
Request 2 {"src": "001A", "dst": "0000", "leader": "0000",
"type": "get", "MID": "9AB4CE50023", "key": "name"}
Response 2 {"src": "0000", "dst": "001A", "leader": "0000", "type": "ok",
"MID": "9AB4CE50023", "value": "Alden Jackson"}
Again, you will need to develop additional, custom message types in order to implement the Raft consensus protocol. As long as your messages include the four minimum required fields (src, dst, leader, type), the simulator will ensure that your messages are delivered.
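As an example of a custom message type, here is a sketch of a vote request and the voter-side decision from Sections 5.2 and 5.4.1 of the Raft paper. The type names `request_vote` and `vote` are hypothetical choices. So are the keys `term`, `last_log_index`, `last_log_term`, and `granted`. The simulator only mandates `src`, `dst`, `leader`, and `type`.

```python
from typing import Any, Dict, List, Optional, Tuple


def request_vote(src: str, term: int, last_log_index: int, last_log_term: int) -> Dict[str, Any]:
    """Candidate's vote request, multicast to all replicas (hypothetical format)."""
    return {"src": src, "dst": "FFFF", "leader": "FFFF", "type": "request_vote",
            "term": term, "last_log_index": last_log_index,
            "last_log_term": last_log_term}


class VoterState:
    def __init__(self) -> None:
        self.current_term = 0
        self.voted_for: Optional[str] = None
        self.log: List[Tuple[int, Any]] = []  # (term, command); index i is log[i - 1]

    def handle_request_vote(self, my_id: str, leader: str, msg: Dict[str, Any]) -> Dict[str, Any]:
        # A higher term means this replica is behind: adopt it and clear the old vote.
        if msg["term"] > self.current_term:
            self.current_term = msg["term"]
            self.voted_for = None
        my_last_term = self.log[-1][0] if self.log else 0
        my_last_index = len(self.log)
        # Section 5.4.1: the candidate's log must be at least as up to date as ours
        # (compare last log terms first, then log lengths).
        up_to_date = (msg["last_log_term"], msg["last_log_index"]) >= (my_last_term, my_last_index)
        granted = (msg["term"] == self.current_term
                   and self.voted_for in (None, msg["src"])
                   and up_to_date)
        if granted:
            self.voted_for = msg["src"]  # a full replica would also reset its election timer
        return {"src": my_id, "dst": msg["src"], "leader": leader, "type": "vote",
                "term": self.current_term, "granted": granted}
```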
Command Line Specification
The command line syntax for your 4730kvstore program is given below. The simulator will pass parameters to each replica representing (1) the UDP port number the replica should connect to, (2) the ID of the replica, and (3) the IDs of all other replicas in the system. The syntax for launching your datastore is therefore:
./4730kvstore <UDP port> <your ID> <ID of second replica> [<ID of third replica> ...]
The UDP port is the port number on the localhost that you should send UDP packets to in order to communicate with your replicas. For simplicity, all replica IDs are unique four-digit hexadecimal numbers (e.g., 0AA1 or F29A). You will use these IDs as the src and dst in your messages. Clients will also be assigned unique IDs by the simulator.
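Here is a sketch of parsing this command line with Python's standard `argparse` module. `parse_args` and `quorum_size` are hypothetical names. The quorum computation assumes the cluster consists of exactly this replica plus the listed IDs.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse: 4730kvstore <UDP port> <your ID> <ID of second replica> [...]"""
    parser = argparse.ArgumentParser(prog="4730kvstore")
    parser.add_argument("port", type=int, help="simulator's UDP port on localhost")
    parser.add_argument("id", help="this replica's four-digit hexadecimal ID")
    parser.add_argument("others", nargs="+", help="IDs of the other replicas")
    return parser.parse_args(argv)


def quorum_size(args: argparse.Namespace) -> int:
    """Majority of the whole cluster: this replica plus every listed peer."""
    cluster = len(args.others) + 1
    return cluster // 2 + 1
```

For example, with five replicas, `parse_args(["41234", "0000", "0001", "0002", "0003", "0004"])` yields `port == 41234`, `id == "0000"`, and a quorum of 3.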
Connecting to the LAN
You will be using local UDP sockets to emulate a LAN. Each of your replicas will connect to a single UDP socket (the way a server would connect to a single Ethernet cable). You do not need to be intimately familiar with how UDP sockets work, but essentially they are objects that you can read or write. However, rather than sending and receiving packets over the internet, the packets are instead passed between programs on the local machine. In other words, this is how your program will send and receive data from our simulator, which is just another program running locally on the machine. You should constantly be reading from your sockets to make sure you receive all messages (they will be buffered if you don’t read immediately).
In order to let the simulator know your replica is alive, upon startup you should send a special hello message to the broadcast address ("FFFF"). The starter code does this:
```
{"src": "<ID>", "dst": "FFFF", "leader": "FFFF", "type": "hello", "MID": "<a unique string>"}
```
We encourage you to write your code in an event-driven style using select() or poll(). This will keep your code single-threaded and will make debugging your code significantly easier. Alternatively, you can implement your datastore in a threaded or asynchronous model, but expect it to be significantly more difficult to debug.
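Here is a minimal sketch of that event-driven style, using `select.select()` on a single UDP socket. The `Replica` class is hypothetical and is not the starter code. Following the description above, it assumes the replica binds its own socket to an OS-chosen port on localhost and sends every message to the simulator's port; the simulator then routes each message by `dst`.

```python
import json
import select
import socket
from typing import Any, Dict, Optional

BROADCAST = "FFFF"


class Replica:
    def __init__(self, port: int, my_id: str) -> None:
        self.port = port  # the simulator's UDP port on localhost
        self.id = my_id
        self.leader = BROADCAST  # leader unknown at startup
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(("localhost", 0))  # let the OS pick this replica's port

    def send(self, msg: Dict[str, Any]) -> None:
        self.sock.sendto(json.dumps(msg).encode("utf-8"), ("localhost", self.port))

    def say_hello(self, mid: str) -> None:
        self.send({"src": self.id, "dst": BROADCAST, "leader": self.leader,
                   "type": "hello", "MID": mid})

    def poll_once(self, timeout: float) -> Optional[Dict[str, Any]]:
        """Wait up to `timeout` seconds for one message; None means nothing arrived."""
        readable, _, _ = select.select([self.sock], [], [], timeout)
        if not readable:
            return None
        data, _addr = self.sock.recvfrom(65535)
        return json.loads(data.decode("utf-8"))
```

In a full replica, the main loop would call `poll_once()` with a timeout equal to the time remaining until the next election or heartbeat deadline. It would then handle any message returned and check those timers. Everything stays on one thread, which is what makes this style easy to debug.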
Datastore Requirements and Assumptions
The goal of your system is to accept put()s from clients and retrieve the corresponding data when a get() is issued. To ensure that data is not lost when a process crashes, all data from clients must be replicated, which then raises the dueling issues of how to maintain consistency and achieve high-availability. To meet these goals, your datastore will implement the Raft consensus protocol. Ultimately, your datastore should achieve the following two goals:
- Consistency - clients should always receive correct answers to get() requests.
- Availability - clients should be able to execute put() and get() requests at any time with low latency (i.e., your system should execute requests quickly).
Raft is a complicated protocol, and real-world datastores are extremely complicated artifacts. To simplify the projects, there are several things you do not need to implement:
- True persistence - you do not need to write client updates to disk, or worry about committing data to permanent storage. All of the data from clients and the log of updates can live in memory.
- Garbage collection - Raft maintains a log of all updates. In a real system, this log periodically needs to be garbage collected, since it cannot grow infinitely long. However, your system will not be running for long periods of time, and therefore you do not need to worry about garbage collection.
- Restarts - in a real system, replicas might fail for a while then come back online, necessitating snapshots and reconciliation. However, you may assume that replicas in the simulator will crash-fail, i.e., they will die completely and never return.
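Because nothing has to survive a crash or be compacted, the log and the state machine can be plain in-memory structures. Here is a minimal sketch, assuming each log entry records one put() as `(term, key, value)`. `StateMachine` and its methods are hypothetical names. The fields `commit_index` and `last_applied` follow the Raft paper's commitIndex and lastApplied.

```python
from typing import Dict, List, Tuple

Entry = Tuple[int, str, str]  # (term, key, value): one put() per log entry


class StateMachine:
    """In-memory log plus key/value dict; no persistence or log compaction."""

    def __init__(self) -> None:
        self.log: List[Entry] = []   # Raft index i is stored at self.log[i - 1]
        self.commit_index = 0        # highest index known to be committed
        self.last_applied = 0        # highest index applied to self.data
        self.data: Dict[str, str] = {}

    def append(self, term: int, key: str, value: str) -> int:
        self.log.append((term, key, value))
        return len(self.log)  # 1-based index of the new entry

    def commit_up_to(self, index: int) -> List[int]:
        """Advance commit_index and apply newly committed entries, in log order."""
        self.commit_index = max(self.commit_index, min(index, len(self.log)))
        applied = []
        while self.last_applied < self.commit_index:
            self.last_applied += 1
            _term, key, value = self.log[self.last_applied - 1]
            self.data[key] = value
            applied.append(self.last_applied)
        return applied

    def get(self, key: str) -> str:
        return self.data.get(key, "")  # never-put keys read as ""
```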
Implementing Raft
The Raft paper is specifically designed to be easy to read. To implement the protocol you should definitely start by reading the paper. Additional papers and resources are available on the Raft GitHub. I suggest the following series of steps to begin working on your datastore implementation:
1. Add basic support for responding to client get() and put() requests. At this point, you can respond to all requests with a “type”: “fail” message.
2. Implement the Raft election protocol (Section 5.2 of the Raft paper); add the ability to respond to get() and put() requests with “type”: “redirect” messages.
3. Add a timeout to detect leader failures (i.e., if you don’t hear from the leader in X milliseconds…) and make sure that the new election proceeds correctly.
4. Implement a basic, empty version of the AppendEntries RPC call that doesn’t replicate any data, but acts as a keepalive message from the leader to other replicas to prevent unnecessary elections.
5. Implement the transaction log and the “state machine” (i.e., a dictionary containing the key/value pairs from clients, Section 5.3). Don’t bother replicating the transactions; just ensure that the leader is able to correctly answer get() and put() requests.
6. Improve your AppendEntries RPC call to actually send data to replicas. Ensure that updates are only committed when a quorum is in agreement.
7. Add support for retrying failed commits and test it by experimenting with lossy network simulations.
8. If you haven’t already, modify the leader election to support the additional restrictions in Section 5.4.1; test your implementation on lossy networks with failed leaders.
9. Implement the subtle commit restriction given in Section 5.4.2.
10. Improve your AppendEntries RPC call to implement batching, i.e., a single AppendEntries may send multiple outstanding log entries to a given replica.
11. Test, test, test, and test some more ;)
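The heartbeat, replication, and batching steps above all meet in the follower's AppendEntries handler. The sketch below follows the five receiver rules in Figure 2 of the Raft paper. The message keys (`term`, `prev_log_index`, `prev_log_term`, `entries`, `leader_commit`) are hypothetical names for a custom message type. The sketch also assumes the caller has already adopted any higher term carried by the message.

```python
from typing import Any, Dict, List, Tuple

Entry = Tuple[int, str, str]  # (term, key, value)


def handle_append_entries(log: List[Entry], commit_index: int, current_term: int,
                          msg: Dict[str, Any]) -> Tuple[bool, int]:
    """Follower side of AppendEntries; mutates `log`, returns (success, commit_index).

    An empty `entries` list doubles as the leader's heartbeat.
    """
    if msg["term"] < current_term:
        return False, commit_index  # reject a stale leader
    prev_index, prev_term = msg["prev_log_index"], msg["prev_log_term"]
    # Consistency check: we must already hold the entry this batch builds on.
    if prev_index > len(log) or (prev_index > 0 and log[prev_index - 1][0] != prev_term):
        return False, commit_index  # leader will back up and retry
    # Append the batch, truncating only at the first conflicting entry, so a
    # delayed or duplicated message cannot erase newer entries we already hold.
    for offset, entry in enumerate(msg["entries"]):
        index = prev_index + 1 + offset
        if index <= len(log):
            if log[index - 1][0] == entry[0]:
                continue  # already have this entry
            del log[index - 1:]
        log.append(tuple(entry))  # JSON delivers entries as lists
    if msg["leader_commit"] > commit_index:
        last_new = prev_index + len(msg["entries"])
        commit_index = max(commit_index, min(msg["leader_commit"], last_new))
    return True, commit_index
```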
Step 6 will probably require the most time in terms of writing code and debugging, since it is the crux of the algorithm. Implementing steps 7-10 is necessary to ensure correctness of the protocol, but shouldn’t be too difficult.
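On the leader, retrying failed commits and the Section 5.4.2 commit restriction come down to per-follower bookkeeping. This sketch uses the same assumptions as a follower that stores `(term, key, value)` tuples. `LeaderProgress` and its methods are hypothetical names, while `next_index` and `match_index` follow the Raft paper's nextIndex and matchIndex. A failed reply backs `next_index` up by one entry, and a successful reply advances `match_index`. The leader commits by counting replicas only for entries from its current term.

```python
from typing import Dict, List, Tuple

Entry = Tuple[int, str, str]  # (term, key, value)


class LeaderProgress:
    def __init__(self, peers: List[str], last_index: int) -> None:
        self.next_index: Dict[str, int] = {p: last_index + 1 for p in peers}
        self.match_index: Dict[str, int] = {p: 0 for p in peers}

    def on_reply(self, peer: str, success: bool, last_sent_index: int) -> None:
        """Record a follower's AppendEntries reply."""
        if success:
            self.match_index[peer] = max(self.match_index[peer], last_sent_index)
            self.next_index[peer] = self.match_index[peer] + 1
        else:
            # Consistency check failed: back up one entry and resend from there.
            self.next_index[peer] = max(1, self.next_index[peer] - 1)

    def advance_commit(self, log: List[Entry], commit_index: int, current_term: int) -> int:
        """Highest index stored on a majority, subject to Section 5.4.2."""
        cluster = len(self.match_index) + 1  # peers plus the leader itself
        for n in range(len(log), commit_index, -1):
            # Terms never decrease along the log, so once we reach an entry from
            # an earlier term, no lower index can be committed by counting replicas.
            if log[n - 1][0] != current_term:
                break
            replicas = 1 + sum(1 for m in self.match_index.values() if m >= n)
            if replicas > cluster // 2:
                return n  # committing n implicitly commits everything before it
        return commit_index
```

Backing up one entry per failed reply is the simplest retry policy. On a lossy network with long divergent logs, it can cost many round trips. The Raft paper sketches an optimization in Section 5.3 in which a rejecting follower reports the term of the conflicting entry.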
Grading
The grading in this project will be broken down as follows:
Item | P2 | P3 | P4 |
---|---|---|---|
Program correctness: | 85 | 85 | 60 |
Performance: | 20 | ||
Style and documentation: | 15 | 15 | 15 |
The final grading in this project will be based on the number of test configurations that your replica successfully completes. More weight will be given to more difficult configurations. At a minimum, your code must pass the test suite without errors or crashes, and it must obey the requirements specified above. All student code will be scanned by plagiarism detection software to ensure that students are not copying code from the internet or each other.
Project Submissions
This is a very challenging series of projects. In order to ensure that you are making sufficient progress, each project increases the functionality of the distributed system.
Project 2
For Project 2, your 4730kvstore program needs to pass three of the test cases: simple-1, simple-2, and crash-1. Only correctness will be evaluated.
You should submit on Gradescope to the Project 2 project. Be sure to indicate who your teammate is, otherwise, they will not get any credit!
Project 3
For Project 3, your 4730kvstore program will be able to pass all of the non-performance test cases, which is every test except the advanced-* tests. As with Project 2, only correctness will be evaluated.
You should submit your Project 3 on Gradescope to the Project 3 project. Be sure to indicate who your teammate is, otherwise, they will not get any credit!
Final project: Project 4
For Project 4, your 4730kvstore program will be able to pass all the test cases, including the advanced-* tests. Correctness and performance will be evaluated for a subset of the tests.
You should submit your (thoroughly documented) code, a Makefile, and a plain-text (no Word or PDF) README.md file. In this file, you should describe your high-level approach, the challenges you faced, a list of properties/features of your design that you think are good, and an overview of how you tested your code.
You should submit your final version on Gradescope to the Project 4 project. Be sure to indicate who your teammate is, otherwise, they will not get any credit!