diff options
Diffstat (limited to 'net/ceph/messenger.c')
| -rw-r--r-- | net/ceph/messenger.c | 82 | 
1 files changed, 63 insertions, 19 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e15a82ccc05..78b55f49de7 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -76,7 +76,8 @@ const char *ceph_pr_addr(const struct sockaddr_storage *ss)  		break;  	default: -		sprintf(s, "(unknown sockaddr family %d)", (int)ss->ss_family); +		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)", +			 (int)ss->ss_family);  	}  	return s; @@ -598,7 +599,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)   * Connection negotiation.   */ -static void prepare_connect_authorizer(struct ceph_connection *con) +static int prepare_connect_authorizer(struct ceph_connection *con)  {  	void *auth_buf;  	int auth_len = 0; @@ -612,13 +613,20 @@ static void prepare_connect_authorizer(struct ceph_connection *con)  					 con->auth_retry);  	mutex_lock(&con->mutex); +	if (test_bit(CLOSED, &con->state) || +	    test_bit(OPENING, &con->state)) +		return -EAGAIN; +  	con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);  	con->out_connect.authorizer_len = cpu_to_le32(auth_len); -	con->out_kvec[con->out_kvec_left].iov_base = auth_buf; -	con->out_kvec[con->out_kvec_left].iov_len = auth_len; -	con->out_kvec_left++; -	con->out_kvec_bytes += auth_len; +	if (auth_len) { +		con->out_kvec[con->out_kvec_left].iov_base = auth_buf; +		con->out_kvec[con->out_kvec_left].iov_len = auth_len; +		con->out_kvec_left++; +		con->out_kvec_bytes += auth_len; +	} +	return 0;  }  /* @@ -640,9 +648,9 @@ static void prepare_write_banner(struct ceph_messenger *msgr,  	set_bit(WRITE_PENDING, &con->state);  } -static void prepare_write_connect(struct ceph_messenger *msgr, -				  struct ceph_connection *con, -				  int after_banner) +static int prepare_write_connect(struct ceph_messenger *msgr, +				 struct ceph_connection *con, +				 int after_banner)  {  	unsigned global_seq = get_global_seq(con->msgr, 0);  	int proto; @@ -683,7 +691,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,  	con->out_more = 0;  	set_bit(WRITE_PENDING, &con->state); -	prepare_connect_authorizer(con); +	return prepare_connect_authorizer(con);  } @@ -1065,8 +1073,10 @@ static void addr_set_port(struct sockaddr_storage *ss, int p)  	switch (ss->ss_family) {  	case AF_INET:  		((struct sockaddr_in *)ss)->sin_port = htons(p); +		break;  	case AF_INET6:  		((struct sockaddr_in6 *)ss)->sin6_port = htons(p); +		break;  	}  } @@ -1216,6 +1226,7 @@ static int process_connect(struct ceph_connection *con)  	u64 sup_feat = con->msgr->supported_features;  	u64 req_feat = con->msgr->required_features;  	u64 server_feat = le64_to_cpu(con->in_reply.features); +	int ret;  	dout("process_connect on %p tag %d\n", con, (int)con->in_tag); @@ -1250,7 +1261,9 @@ static int process_connect(struct ceph_connection *con)  			return -1;  		}  		con->auth_retry = 1; -		prepare_write_connect(con->msgr, con, 0); +		ret = prepare_write_connect(con->msgr, con, 0); +		if (ret < 0) +			return ret;  		prepare_read_connect(con);  		break; @@ -1277,6 +1290,9 @@ static int process_connect(struct ceph_connection *con)  		if (con->ops->peer_reset)  			con->ops->peer_reset(con);  		mutex_lock(&con->mutex); +		if (test_bit(CLOSED, &con->state) || +		    test_bit(OPENING, &con->state)) +			return -EAGAIN;  		break;  	case CEPH_MSGR_TAG_RETRY_SESSION: @@ -1341,7 +1357,9 @@ static int process_connect(struct ceph_connection *con)  		 * to WAIT.  This shouldn't happen if we are the  		 * client.  		 */ -		pr_err("process_connect peer connecting WAIT\n"); +		pr_err("process_connect got WAIT as client\n"); +		con->error_msg = "protocol error, got WAIT as client"; +		return -1;  	default:  		pr_err("connect protocol error, will retry\n"); @@ -1810,6 +1828,17 @@ static int try_read(struct ceph_connection *con)  more:  	dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,  	     con->in_base_pos); + +	/* +	 * process_connect and process_message drop and re-take +	 * con->mutex.  make sure we handle a racing close or reopen. +	 */ +	if (test_bit(CLOSED, &con->state) || +	    test_bit(OPENING, &con->state)) { +		ret = -EAGAIN; +		goto out; +	} +  	if (test_bit(CONNECTING, &con->state)) {  		if (!test_bit(NEGOTIATING, &con->state)) {  			dout("try_read connecting\n"); @@ -1938,8 +1967,10 @@ static void con_work(struct work_struct *work)  {  	struct ceph_connection *con = container_of(work, struct ceph_connection,  						   work.work); +	int ret;  	mutex_lock(&con->mutex); +restart:  	if (test_and_clear_bit(BACKOFF, &con->state)) {  		dout("con_work %p backing off\n", con);  		if (queue_delayed_work(ceph_msgr_wq, &con->work, @@ -1969,18 +2000,31 @@ static void con_work(struct work_struct *work)  		con_close_socket(con);  	} -	if (test_and_clear_bit(SOCK_CLOSED, &con->state) || -	    try_read(con) < 0 || -	    try_write(con) < 0) { -		mutex_unlock(&con->mutex); -		ceph_fault(con);     /* error/fault path */ -		goto done_unlocked; -	} +	if (test_and_clear_bit(SOCK_CLOSED, &con->state)) +		goto fault; + +	ret = try_read(con); +	if (ret == -EAGAIN) +		goto restart; +	if (ret < 0) +		goto fault; + +	ret = try_write(con); +	if (ret == -EAGAIN) +		goto restart; +	if (ret < 0) +		goto fault;  done:  	mutex_unlock(&con->mutex);  done_unlocked:  	con->ops->put(con); +	return; + +fault: +	mutex_unlock(&con->mutex); +	ceph_fault(con);     /* error/fault path */ +	goto done_unlocked;  }  |