Skip to content

Commit

Permalink
networking.c/writeToClient: handle WSAEWOULDBLOCK
Browse files Browse the repository at this point in the history
- fixed writeToClient() after failed tests
- adjusted tests to match the corresponding upstream antirez/redis commit (the commit hash was garbled by e-mail obfuscation in this page scrape)
  • Loading branch information
tporadowski committed Aug 1, 2018
1 parent 155111f commit bdcf80e
Show file tree
Hide file tree
Showing 30 changed files with 767 additions and 128 deletions.
14 changes: 11 additions & 3 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,14 @@ int writeToClient(int fd, client *c, int handler_installed) {
server.el, c, c->buf, NULL);
if (result == SOCKET_ERROR && errno != WSA_IO_PENDING) {
nwritten = -1;

//[tporadowski/#11] we may be bursting data too fast; treat this as a retry (EAGAIN)
// so that the client is put back into the sending queue
if (errno == WSAEWOULDBLOCK) {
serverLog(LL_DEBUG, "writeToClient: will try again (EAGAIN) due to WSAEWOULDBLOCK");
errno = EAGAIN;
}

break;
}
#else
Expand Down Expand Up @@ -1002,7 +1010,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
}
}
}
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
Expand All @@ -1014,7 +1022,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory)) break;
}
}
server.stat_net_output_bytes += totwritten;
if (nwritten == -1) {
if (errno == EAGAIN) {
Expand Down Expand Up @@ -1070,7 +1078,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
}
#endif
return C_OK;
}
}

/* Write event handler. Just send data to the client. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
Expand Down
1 change: 1 addition & 0 deletions tests/assets/default.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Redis configuration for testing.

#always-show-logo yes
notify-keyspace-events KEA
daemonize no
pidfile /var/run/redis.pid
Expand Down
11 changes: 8 additions & 3 deletions tests/cluster/tests/04-resharding.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ array set content {}
set tribpid {}

test "Cluster consistency during live resharding" {
set ele 0
for {set j 0} {$j < $numops} {incr j} {
# Trigger the resharding once we execute half the ops.
if {$tribpid ne {} &&
Expand Down Expand Up @@ -87,7 +88,7 @@ test "Cluster consistency during live resharding" {
# Write random data to random list.
set listid [randomInt $numkeys]
set key "key:$listid"
set ele [randomValue]
incr ele
# We write both with Lua scripts and with plain commands.
# This way we are able to stress Lua -> Redis command invocation
# as well, that has tests to prevent Lua to write into wrong
Expand Down Expand Up @@ -116,7 +117,9 @@ test "Cluster consistency during live resharding" {
test "Verify $numkeys keys for consistency with logical content" {
# Check that the Redis Cluster content matches our logical content.
foreach {key value} [array get content] {
assert {[$cluster lrange $key 0 -1] eq $value}
if {[$cluster lrange $key 0 -1] ne $value} {
fail "Key $key expected to hold '$value' but actual content is [$cluster lrange $key 0 -1]"
}
}
}

Expand All @@ -134,7 +137,9 @@ test "Cluster should eventually be up again" {
test "Verify $numkeys keys after the crash & restart" {
# Check that the Redis Cluster content matches our logical content.
foreach {key value} [array get content] {
assert {[$cluster lrange $key 0 -1] eq $value}
if {[$cluster lrange $key 0 -1] ne $value} {
fail "Key $key expected to hold '$value' but actual content is [$cluster lrange $key 0 -1]"
}
}
}

Expand Down
5 changes: 2 additions & 3 deletions tests/integration/aof.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ tags {"aof"} {
assert_equal 1 [is_alive $srv]
}

test "Truncated AOF loaded: we expect foo to be equal to 6 now" {
assert_equal 1 [is_alive $srv]
set client [redis [dict get $srv host] [dict get $srv port]]

set client [redis [dict get $srv host] [dict get $srv port]]
test "Truncated AOF loaded: we expect foo to be equal to 6 now" {
assert {[$client get foo] eq "6"}
}
}
Expand Down
78 changes: 78 additions & 0 deletions tests/integration/psync2-reg.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Issue 3899 regression test.
# We create a chain of three instances: master -> slave -> slave2
# and continuously break the link while traffic is generated by
# redis-benchmark. At the end we check that the data is the same
# everywhere.
# NOTE(review): despite the "chain" wording above, both slaves below are
# attached directly to instance #0 — confirm the intended topology.

start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
# Config
set debug_msg 0 ; # Enable additional debug messages

set no_exit 0 ; # Do not exit at end of the test

set duration 20 ; # Total test seconds

# Cache a client handle plus host/port for each of the three servers.
# NOTE(review): the negative srv index addresses the enclosing
# start_server contexts — semantics come from the test harness; confirm.
for {set j 0} {$j < 3} {incr j} {
set R($j) [srv [expr 0-$j] client]
set R_host($j) [srv [expr 0-$j] host]
set R_port($j) [srv [expr 0-$j] port]
if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"}
}

# Setup the replication and backlog parameters
test "PSYNC2 #3899 regression: setup" {
$R(1) slaveof $R_host(0) $R_port(0)
$R(2) slaveof $R_host(0) $R_port(0)
$R(0) set foo bar
wait_for_condition 50 1000 {
[$R(1) dbsize] == 1 && [$R(2) dbsize] == 1
} else {
fail "Slaves not replicating from master"
}
$R(0) config set repl-backlog-size 10mb
$R(1) config set repl-backlog-size 10mb
}

# Drive background write traffic against the master with redis-benchmark
# while the loop below randomly severs replication links for $duration
# seconds.
set cycle_start_time [clock milliseconds]
set bench_pid [exec src/redis-benchmark -p $R_port(0) -n 10000000 -r 1000 incr __rand_int__ > /dev/null &]
while 1 {
set elapsed [expr {[clock milliseconds]-$cycle_start_time}]
if {$elapsed > $duration*1000} break
# Each ~100ms iteration has an independent 5% chance per slave of
# killing its link to the master, forcing a PSYNC attempt on reconnect.
if {rand() < .05} {
test "PSYNC2 #3899 regression: kill first slave" {
$R(1) client kill type master
}
}
if {rand() < .05} {
test "PSYNC2 #3899 regression: kill chained slave" {
$R(2) client kill type master
}
}
after 100
}
exec kill -9 $bench_pid

# Debug-only: poll and print the three digests while waiting for the
# instances to converge.
if {$debug_msg} {
for {set j 0} {$j < 100} {incr j} {
if {
[$R(0) debug digest] == [$R(1) debug digest] &&
[$R(1) debug digest] == [$R(2) debug digest]
} break
puts [$R(0) debug digest]
puts [$R(1) debug digest]
puts [$R(2) debug digest]
after 1000
}
}

# Final check: all three data sets must converge to the same digest.
test "PSYNC2 #3899 regression: verify consistency" {
wait_for_condition 50 1000 {
([$R(0) debug digest] eq [$R(1) debug digest]) &&
([$R(1) debug digest] eq [$R(2) debug digest])
} else {
fail "The three instances have different data sets"
}
}
}}}
182 changes: 182 additions & 0 deletions tests/integration/psync2.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# PSYNC2 stress test: five instances are repeatedly reshuffled into a
# random master/slave layout while writes flow, verifying that partial
# resynchronization keeps every replica consistent and that no extra
# full syncs are performed.
start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
start_server {} {
start_server {} {
set master_id 0 ; # Current master
set start_time [clock seconds] ; # Test start time
set counter_value 0 ; # Current value of the Redis counter "x"

# Config
set debug_msg 0 ; # Enable additional debug messages

set no_exit 0; ; # Do not exit at end of the test

set duration 20 ; # Total test seconds

set genload 1 ; # Load master with writes at every cycle

set genload_time 5000 ; # Writes duration time in ms

set disconnect 1 ; # Break replication link between random
# master and slave instances while the
# master is loaded with writes.

set disconnect_period 1000 ; # Disconnect repl link every N ms.

# Cache a client handle plus host/port for each of the five servers.
for {set j 0} {$j < 5} {incr j} {
set R($j) [srv [expr 0-$j] client]
set R_host($j) [srv [expr 0-$j] host]
set R_port($j) [srv [expr 0-$j] port]
if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"}
}

set cycle 1
while {([clock seconds]-$start_time) < $duration} {
test "PSYNC2: --- CYCLE $cycle ---" {
incr cycle
}

# Create a random replication layout.
# Start with switching master (this simulates a failover).

# 1) Select the new master.
set master_id [randomInt 5]
set used [list $master_id]
test "PSYNC2: \[NEW LAYOUT\] Set #$master_id as master" {
$R($master_id) slaveof no one
if {$counter_value == 0} {
$R($master_id) set x $counter_value
}
}

# 2) Attach all the slaves to a random instance
# Each remaining instance replicates from a randomly chosen member of
# the already-attached set, so arbitrary replication trees can form.
while {[llength $used] != 5} {
while 1 {
set slave_id [randomInt 5]
if {[lsearch -exact $used $slave_id] == -1} break
}
set rand [randomInt [llength $used]]
set mid [lindex $used $rand]
set master_host $R_host($mid)
set master_port $R_port($mid)

test "PSYNC2: Set #$slave_id to replicate from #$mid" {
$R($slave_id) slaveof $master_host $master_port
}
lappend used $slave_id
}

# 3) Increment the counter and wait for all the instances
# to converge.
test "PSYNC2: cluster is consistent after failover" {
$R($master_id) incr x; incr counter_value
for {set j 0} {$j < 5} {incr j} {
wait_for_condition 50 1000 {
[$R($j) get x] == $counter_value
} else {
fail "Instance #$j x variable is inconsistent"
}
}
}

# 4) Generate load while breaking the connection of random
# slave-master pairs.
test "PSYNC2: generate load while killing replication links" {
set t [clock milliseconds]
set next_break [expr {$t+$disconnect_period}]
while {[clock milliseconds]-$t < $genload_time} {
if {$genload} {
$R($master_id) incr x; incr counter_value
}
# NOTE(review): "==" only fires if the loop happens to sample this
# exact millisecond; ">=" would make the periodic break reliable —
# confirm against the upstream version of this test.
if {[clock milliseconds] == $next_break} {
set next_break \
[expr {[clock milliseconds]+$disconnect_period}]
set slave_id [randomInt 5]
if {$disconnect} {
$R($slave_id) client kill type master
if {$debug_msg} {
puts "+++ Breaking link for slave #$slave_id"
}
}
}
}
}

# 5) Increment the counter and wait for all the instances
set x [$R($master_id) get x]
test "PSYNC2: cluster is consistent after load (x = $x)" {
for {set j 0} {$j < 5} {incr j} {
wait_for_condition 50 1000 {
[$R($j) get x] == $counter_value
} else {
fail "Instance #$j x variable is inconsistent"
}
}
}

# Put down the old master so that it cannot generate more
# replication stream, this way in the next master switch, the time at
# which we move slaves away is not important, each will have full
# history (otherwise PINGs will make certain slaves have more history),
# and sometimes a full resync will be needed.
$R($master_id) slaveof 127.0.0.1 0 ;# We use port zero to make it fail.

# Debug-only: dump per-instance replication state (sync counters,
# replication IDs/offsets and backlog window).
if {$debug_msg} {
for {set j 0} {$j < 5} {incr j} {
puts "$j: sync_full: [status $R($j) sync_full]"
puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
puts "---"
}
}

# Only the four initial attachments should have required a full sync;
# every later reconfiguration is expected to be a partial resync.
test "PSYNC2: total sum of full synchronizations is exactly 4" {
set sum 0
for {set j 0} {$j < 5} {incr j} {
incr sum [status $R($j) sync_full]
}
assert {$sum == 4}
}
}

# Restore a single-master layout with the other four instances as its
# direct slaves, as required by the restart-based test below.
test "PSYNC2: Bring the master back again for next test" {
$R($master_id) slaveof no one
set master_host $R_host($master_id)
set master_port $R_port($master_id)
for {set j 0} {$j < 5} {incr j} {
if {$j == $master_id} continue
$R($j) slaveof $master_host $master_port
}

# Wait for slaves to sync
wait_for_condition 50 1000 {
[status $R($master_id) connected_slaves] == 4
} else {
fail "Slave not reconnecting"
}
}

# Restart one slave and verify that the RDB aux fields let it partially
# resync: the master's full-sync counter must not move.
test "PSYNC2: Partial resync after restart using RDB aux fields" {
# Pick a random slave
# NOTE(review): not actually random — deterministically the instance
# after the master; the comment above overstates it.
set slave_id [expr {($master_id+1)%5}]
set sync_count [status $R($master_id) sync_full]
catch {
$R($slave_id) config rewrite
$R($slave_id) debug restart
}
wait_for_condition 50 1000 {
[status $R($master_id) connected_slaves] == 4
} else {
fail "Slave not reconnecting"
}
set new_sync_count [status $R($master_id) sync_full]
assert {$sync_count == $new_sync_count}
}

# Debug aid: keep the servers alive indefinitely for manual inspection.
if {$no_exit} {
while 1 { puts -nonewline .; flush stdout; after 1000}
}

}}}}}
8 changes: 3 additions & 5 deletions tests/integration/rdb.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ proc start_server_and_kill_it {overrides code} {
kill_server $srv
}

if { $::tcl_platform(platform) != "windows" } {
# Make the RDB file unreadable
file attributes [file join $server_path dump.rdb] -permissions 0222

Expand All @@ -67,7 +66,7 @@ if {!$isroot} {
test {Server should not start if RDB file can't be open} {
wait_for_condition 50 100 {
[string match {*Fatal error loading*} \
[exec tail -n1 < [dict get $srv stdout]]]
[exec tail -1 < [dict get $srv stdout]]]
} else {
fail "Server started even if RDB was unreadable!"
}
Expand All @@ -90,11 +89,10 @@ close $fd
start_server_and_kill_it [list "dir" $server_path] {
test {Server should not start if RDB is corrupted} {
wait_for_condition 50 100 {
[string match {*RDB checksum*} \
[exec tail -n10 < [dict get $srv stdout]]]
[string match {*CRC error*} \
[exec tail -10 < [dict get $srv stdout]]]
} else {
fail "Server started even if RDB was corrupted!"
}
}
}
}
Loading

0 comments on commit bdcf80e

Please sign in to comment.