diff options
Diffstat (limited to 'net/ceph/messenger.c')
| -rw-r--r-- | net/ceph/messenger.c | 71 | 
1 files changed, 54 insertions, 17 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 35b36b86d76..05f357828a2 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -336,7 +336,6 @@ static void reset_connection(struct ceph_connection *con)
 		ceph_msg_put(con->out_msg);
 		con->out_msg = NULL;
 	}
-	con->out_keepalive_pending = false;
 	con->in_seq = 0;
 	con->in_seq_acked = 0;
 }
@@ -1248,8 +1247,6 @@ static int process_connect(struct ceph_connection *con)
 		     con->auth_retry);
 		if (con->auth_retry == 2) {
 			con->error_msg = "connect authorization failure";
-			reset_connection(con);
-			set_bit(CLOSED, &con->state);
 			return -1;
 		}
 		con->auth_retry = 1;
@@ -1715,14 +1712,6 @@ more:
 
 	/* open the socket first? */
 	if (con->sock == NULL) {
-		/*
-		 * if we were STANDBY and are reconnecting _this_
-		 * connection, bump connect_seq now.  Always bump
-		 * global_seq.
-		 */
-		if (test_and_clear_bit(STANDBY, &con->state))
-			con->connect_seq++;
-
 		prepare_write_banner(msgr, con);
 		prepare_write_connect(msgr, con, 1);
 		prepare_read_banner(con);
@@ -1951,7 +1940,24 @@ static void con_work(struct work_struct *work)
 						   work.work);
 
 	mutex_lock(&con->mutex);
+	if (test_and_clear_bit(BACKOFF, &con->state)) {
+		dout("con_work %p backing off\n", con);
+		if (queue_delayed_work(ceph_msgr_wq, &con->work,
+				       round_jiffies_relative(con->delay))) {
+			dout("con_work %p backoff %lu\n", con, con->delay);
+			mutex_unlock(&con->mutex);
+			return;
+		} else {
+			con->ops->put(con);
+			dout("con_work %p FAILED to back off %lu\n", con,
+			     con->delay);
+		}
+	}
 
+	if (test_bit(STANDBY, &con->state)) {
+		dout("con_work %p STANDBY\n", con);
+		goto done;
+	}
 	if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
 		dout("con_work CLOSED\n");
 		con_close_socket(con);
@@ -2008,10 +2014,12 @@ static void ceph_fault(struct ceph_connection *con)
 	/* Requeue anything that hasn't been acked */
 	list_splice_init(&con->out_sent, &con->out_queue);
 
-	/* If there are no messages in the queue, place the connection
-	 * in a STANDBY state (i.e., don't try to reconnect just yet). */
-	if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
-		dout("fault setting STANDBY\n");
+	/* If there are no messages queued or keepalive pending, place
+	 * the connection in a STANDBY state */
+	if (list_empty(&con->out_queue) &&
+	    !test_bit(KEEPALIVE_PENDING, &con->state)) {
+		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
+		clear_bit(WRITE_PENDING, &con->state);
 		set_bit(STANDBY, &con->state);
 	} else {
 		/* retry after a delay. */
@@ -2019,11 +2027,24 @@ static void ceph_fault(struct ceph_connection *con)
 			con->delay = BASE_DELAY_INTERVAL;
 		else if (con->delay < MAX_DELAY_INTERVAL)
 			con->delay *= 2;
-		dout("fault queueing %p delay %lu\n", con, con->delay);
 		con->ops->get(con);
 		if (queue_delayed_work(ceph_msgr_wq, &con->work,
-				       round_jiffies_relative(con->delay)) == 0)
+				       round_jiffies_relative(con->delay))) {
+			dout("fault queued %p delay %lu\n", con, con->delay);
+		} else {
 			con->ops->put(con);
+			dout("fault failed to queue %p delay %lu, backoff\n",
+			     con, con->delay);
+			/*
+			 * In many cases we see a socket state change
+			 * while con_work is running and end up
+			 * queuing (non-delayed) work, such that we
+			 * can't backoff with a delay.  Set a flag so
+			 * that when con_work restarts we schedule the
+			 * delay then.
+			 */
+			set_bit(BACKOFF, &con->state);
+		}
 	}
 
 out_unlock:
@@ -2094,6 +2115,19 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
 }
 EXPORT_SYMBOL(ceph_messenger_destroy);
 
+static void clear_standby(struct ceph_connection *con)
+{
+	/* come back from STANDBY? */
+	if (test_and_clear_bit(STANDBY, &con->state)) {
+		mutex_lock(&con->mutex);
+		dout("clear_standby %p and ++connect_seq\n", con);
+		con->connect_seq++;
+		WARN_ON(test_bit(WRITE_PENDING, &con->state));
+		WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
+		mutex_unlock(&con->mutex);
+	}
+}
+
 /*
  * Queue up an outgoing message on the given connection.
  */
@@ -2126,6 +2160,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 
 	/* if there wasn't anything waiting to send before, queue
 	 * new work */
+	clear_standby(con);
 	if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
 		queue_con(con);
 }
@@ -2191,6 +2226,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
  */
 void ceph_con_keepalive(struct ceph_connection *con)
 {
+	dout("con_keepalive %p\n", con);
+	clear_standby(con);
 	if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
 	    test_and_set_bit(WRITE_PENDING, &con->state) == 0)
 		queue_con(con);