diff options
author | Sage Weil <sage@inktank.com> | 2013-03-25 08:47:40 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-05-01 21:17:09 -0700 |
commit | 3a23083bda56850a1dc0e1c6d270b1f5dc789f07 (patch) | |
tree | ddd52f6a7c8da4334d68ae3e28825d89155d29a0 /net | |
parent | 022f3e2ee2354599faccf5a764a5a24a5dd194c9 (diff) |
libceph: implement RECONNECT_SEQ feature
This is an old protocol extension that allows the client and server to
avoid resending old messages after a reconnect (following a socket error).
Instead, the exchange their sequence numbers during the handshake. This
avoids sending a bunch of useless data over the socket.
It has been supported in the server code since v0.22 (Sep 2010).
Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Alex Elder <elder@inktank.com>
Diffstat (limited to 'net')
-rw-r--r-- | net/ceph/messenger.c | 43 |
1 files changed, 38 insertions, 5 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 997daccf973a..e8491db43f5e 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1247,6 +1247,24 @@ static void prepare_write_ack(struct ceph_connection *con) } /* + * Prepare to share the seq during handshake + */ +static void prepare_write_seq(struct ceph_connection *con) +{ + dout("prepare_write_seq %p %llu -> %llu\n", con, + con->in_seq_acked, con->in_seq); + con->in_seq_acked = con->in_seq; + + con_out_kvec_reset(con); + + con->out_temp_ack = cpu_to_le64(con->in_seq_acked); + con_out_kvec_add(con, sizeof (con->out_temp_ack), + &con->out_temp_ack); + + con_flag_set(con, CON_FLAG_WRITE_PENDING); +} + +/* * Prepare to write keepalive byte. */ static void prepare_write_keepalive(struct ceph_connection *con) @@ -1582,6 +1600,13 @@ static void prepare_read_ack(struct ceph_connection *con) con->in_base_pos = 0; } +static void prepare_read_seq(struct ceph_connection *con) +{ + dout("prepare_read_seq %p\n", con); + con->in_base_pos = 0; + con->in_tag = CEPH_MSGR_TAG_SEQ; +} + static void prepare_read_tag(struct ceph_connection *con) { dout("prepare_read_tag %p\n", con); @@ -2059,6 +2084,7 @@ static int process_connect(struct ceph_connection *con) prepare_read_connect(con); break; + case CEPH_MSGR_TAG_SEQ: case CEPH_MSGR_TAG_READY: if (req_feat & ~server_feat) { pr_err("%s%lld %s protocol feature mismatch," @@ -2089,7 +2115,12 @@ static int process_connect(struct ceph_connection *con) con->delay = 0; /* reset backoff memory */ - prepare_read_tag(con); + if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { + prepare_write_seq(con); + prepare_read_seq(con); + } else { + prepare_read_tag(con); + } break; case CEPH_MSGR_TAG_WAIT: @@ -2123,7 +2154,6 @@ static int read_partial_ack(struct ceph_connection *con) return read_partial(con, end, size, &con->in_temp_ack); } - /* * We can finally discard anything that's been acked. */ @@ -2148,8 +2178,6 @@ static void process_ack(struct ceph_connection *con) } - - static int read_partial_message_section(struct ceph_connection *con, struct kvec *section, unsigned int sec_len, u32 *crc) @@ -2672,7 +2700,12 @@ more: prepare_read_tag(con); goto more; } - if (con->in_tag == CEPH_MSGR_TAG_ACK) { + if (con->in_tag == CEPH_MSGR_TAG_ACK || + con->in_tag == CEPH_MSGR_TAG_SEQ) { + /* + * the final handshake seq exchange is semantically + * equivalent to an ACK + */ ret = read_partial_ack(con); if (ret <= 0) goto out; |