Diffstat (limited to 'net')
-rw-r--r--  net/tipc/bcast.c  226
-rw-r--r--  net/tipc/bcast.h    2
-rw-r--r--  net/tipc/link.c    21
-rw-r--r--  net/tipc/node.c     4
-rw-r--r--  net/tipc/node.h    12
5 files changed, 100 insertions, 165 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index facc216c6a9..1f3b1607d9d 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -157,39 +157,14 @@ u32 tipc_bclink_get_last_sent(void)
 	return bcl->fsm_msg_cnt;
 }
 
-/**
- * bclink_set_gap - set gap according to contents of current deferred pkt queue
- *
- * Called with 'node' locked, bc_lock unlocked
- */
-
-static void bclink_set_gap(struct tipc_node *n_ptr)
-{
-	struct sk_buff *buf = n_ptr->bclink.deferred_head;
-
-	n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
-		mod(n_ptr->bclink.last_in);
-	if (unlikely(buf != NULL))
-		n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
-}
-
-/**
- * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
- *
- * This mechanism endeavours to prevent all nodes in network from trying
- * to ACK or NACK at the same time.
- *
- * Note: TIPC uses a different trigger to distribute ACKs than it does to
- *       distribute NACKs, but tries to use the same spacing (divide by 16).
- */
-
-static int bclink_ack_allowed(u32 n)
+static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
 {
-	return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
+	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
+						seqno : node->bclink.last_sent;
 }
 
-/**
+/*
  * tipc_bclink_retransmit_to - get most recent node to request retransmission
  *
  * Called with bc_lock locked
@@ -300,44 +275,56 @@ exit:
 	spin_unlock_bh(&bc_lock);
 }
 
-/**
- * bclink_send_ack - unicast an ACK msg
+/*
+ * tipc_bclink_update_link_state - update broadcast link state
  *
  * tipc_net_lock and node lock set
  */
 
-static void bclink_send_ack(struct tipc_node *n_ptr)
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 {
-	struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1];
+	struct sk_buff *buf;
 
-	if (l_ptr != NULL)
-		tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-}
+	/* Ignore "stale" link state info */
 
-/**
- * bclink_send_nack- broadcast a NACK msg
- *
- * tipc_net_lock and node lock set
- */
+	if (less_eq(last_sent, n_ptr->bclink.last_in))
+		return;
 
-static void bclink_send_nack(struct tipc_node *n_ptr)
-{
-	struct sk_buff *buf;
-	struct tipc_msg *msg;
+	/* Update link synchronization state; quit if in sync */
+
+	bclink_update_last_sent(n_ptr, last_sent);
+
+	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
+		return;
+
+	/* Update out-of-sync state; quit if loss is still unconfirmed */
 
-	if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to))
+	if ((++n_ptr->bclink.oos_state) == 1) {
+		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
+			return;
+		n_ptr->bclink.oos_state++;
+	}
+
+	/* Don't NACK if one has been recently sent (or seen) */
+
+	if (n_ptr->bclink.oos_state & 0x1)
 		return;
 
+	/* Send NACK */
+
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
-		msg = buf_msg(buf);
+		struct tipc_msg *msg = buf_msg(buf);
+
 		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
-			 INT_H_SIZE, n_ptr->addr);
+			      INT_H_SIZE, n_ptr->addr);
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tipc_net_id);
-		msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
-		msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
-		msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
+		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
+				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
+				 : n_ptr->bclink.last_sent);
 		msg_set_bcast_tag(msg, tipc_own_tag);
 
 		spin_lock_bh(&bc_lock);
@@ -346,96 +333,37 @@ static void bclink_send_nack(struct tipc_node *n_ptr)
 		spin_unlock_bh(&bc_lock);
 		buf_discard(buf);
 
-		/*
-		 * Ensure we doesn't send another NACK msg to the node
-		 * until 16 more deferred messages arrive from it
-		 * (i.e. helps prevent all nodes from NACK'ing at same time)
-		 */
-
-		n_ptr->bclink.nack_sync = tipc_own_tag;
+		n_ptr->bclink.oos_state++;
 	}
 }
 
-/**
- * tipc_bclink_check_gap - send a NACK if a sequence gap exists
+/*
+ * bclink_peek_nack - monitor retransmission requests sent by other nodes
  *
- * tipc_net_lock and node lock set
- */
-
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
-{
-	if (!n_ptr->bclink.supported ||
-	    less_eq(last_sent, mod(n_ptr->bclink.last_in)))
-		return;
-
-	bclink_set_gap(n_ptr);
-	if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
-		n_ptr->bclink.gap_to = last_sent;
-	bclink_send_nack(n_ptr);
-}
-
-/**
- * tipc_bclink_peek_nack - process a NACK msg meant for another node
+ * Delay any upcoming NACK by this node if another node has already
+ * requested the first message this node is going to ask for.
  *
  * Only tipc_net_lock set.
  */
 
-static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to)
+static void bclink_peek_nack(struct tipc_msg *msg)
 {
-	struct tipc_node *n_ptr = tipc_node_find(dest);
-	u32 my_after, my_to;
+	struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));
 
-	if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr)))
+	if (unlikely(!n_ptr))
 		return;
 
-	tipc_node_lock(n_ptr);
-	/*
-	 * Modify gap to suppress unnecessary NACKs from this node
-	 */
-	my_after = n_ptr->bclink.gap_after;
-	my_to = n_ptr->bclink.gap_to;
-	if (less_eq(gap_after, my_after)) {
-		if (less(my_after, gap_to) && less(gap_to, my_to))
-			n_ptr->bclink.gap_after = gap_to;
-		else if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
-	} else if (less_eq(gap_after, my_to)) {
-		if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = gap_after;
-	} else {
-		/*
-		 * Expand gap if missing bufs not in deferred queue:
-		 */
-		struct sk_buff *buf = n_ptr->bclink.deferred_head;
-		u32 prev = n_ptr->bclink.gap_to;
+	tipc_node_lock(n_ptr);
 
-		for (; buf; buf = buf->next) {
-			u32 seqno = buf_seqno(buf);
+	if (n_ptr->bclink.supported &&
+	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
+	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
+		n_ptr->bclink.oos_state = 2;
 
-			if (mod(seqno - prev) != 1) {
-				buf = NULL;
-				break;
-			}
-			if (seqno == gap_after)
-				break;
-			prev = seqno;
-		}
-		if (buf == NULL)
-			n_ptr->bclink.gap_to = gap_after;
-	}
-	/*
-	 * Some nodes may send a complementary NACK now:
-	 */
-	if (bclink_ack_allowed(sender_tag + 1)) {
-		if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
-			bclink_send_nack(n_ptr);
-			bclink_set_gap(n_ptr);
-		}
-	}
 	tipc_node_unlock(n_ptr);
 }
 
-/**
+/*
  * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
  */
 
@@ -505,10 +433,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
 			spin_unlock_bh(&bc_lock);
 		} else {
 			tipc_node_unlock(node);
-			tipc_bclink_peek_nack(msg_destnode(msg),
-					      msg_bcast_tag(msg),
-					      msg_bcgap_after(msg),
-					      msg_bcgap_to(msg));
+			bclink_peek_nack(msg);
 		}
 		goto exit;
 	}
@@ -519,16 +444,28 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
 	next_in = mod(node->bclink.last_in + 1);
 
 	if (likely(seqno == next_in)) {
+		bclink_update_last_sent(node, seqno);
 receive:
+		node->bclink.last_in = seqno;
+		node->bclink.oos_state = 0;
+
 		spin_lock_bh(&bc_lock);
 		bcl->stats.recv_info++;
-		node->bclink.last_in++;
-		bclink_set_gap(node);
-		if (unlikely(bclink_ack_allowed(seqno))) {
-			bclink_send_ack(node);
+
+		/*
+		 * Unicast an ACK periodically, ensuring that
+		 * all nodes in the cluster don't ACK at the same time
+		 */
+
+		if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
+			tipc_link_send_proto_msg(
+				node->active_links[node->addr & 1],
+				STATE_MSG, 0, 0, 0, 0, 0);
 			bcl->stats.sent_acks++;
 		}
 
+		/* Deliver message to destination */
+
 		if (likely(msg_isdata(msg))) {
 			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
@@ -567,8 +504,13 @@ receive:
 		if (unlikely(!tipc_node_is_up(node)))
 			goto unlock;
 
-		if (!node->bclink.deferred_head)
+		if (node->bclink.last_in == node->bclink.last_sent)
+			goto unlock;
+
+		if (!node->bclink.deferred_head) {
+			node->bclink.oos_state = 1;
 			goto unlock;
+		}
 
 		msg = buf_msg(node->bclink.deferred_head);
 		seqno = msg_seqno(msg);
@@ -580,31 +522,19 @@ receive:
 
 		buf = node->bclink.deferred_head;
 		node->bclink.deferred_head = buf->next;
+		node->bclink.deferred_size--;
 		goto receive;
 	}
 
 	/* Handle out-of-sequence broadcast message */
 
 	if (less(next_in, seqno)) {
-		u32 gap_after = node->bclink.gap_after;
-		u32 gap_to = node->bclink.gap_to;
-
 		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
 					       &node->bclink.deferred_tail,
 					       buf);
-		if (deferred) {
-			node->bclink.nack_sync++;
-			if (seqno == mod(gap_after + 1))
-				node->bclink.gap_after = seqno;
-			else if (less(gap_after, seqno) && less(seqno, gap_to))
-				node->bclink.gap_to = seqno;
-		}
+		node->bclink.deferred_size += deferred;
+		bclink_update_last_sent(node, seqno);
 		buf = NULL;
-		if (bclink_ack_allowed(node->bclink.nack_sync)) {
-			if (gap_to != gap_after)
-				bclink_send_nack(node);
-			bclink_set_gap(node);
-		}
 	} else
 		deferred = 0;
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index b009666c60b..5571394098f 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -96,7 +96,7 @@ int  tipc_bclink_send_msg(struct sk_buff *buf);
 void tipc_bclink_recv_pkt(struct sk_buff *buf);
 u32  tipc_bclink_get_last_sent(void);
 u32  tipc_bclink_acks_missing(struct tipc_node *n_ptr);
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 seqno);
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent);
 int  tipc_bclink_stats(char *stats_buf, const u32 buf_size);
 int  tipc_bclink_reset_stats(void);
 int  tipc_bclink_set_queue_limits(u32 limit);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 1150ba5a648..cce953723dd 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1501,14 +1501,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
 		tipc_node_lock(n_ptr);
 
 		tipc_addr_string_fill(addr_string, n_ptr->addr);
-		info("Multicast link info for %s\n", addr_string);
+		info("Broadcast link info for %s\n", addr_string);
 		info("Supportable: %d,  ", n_ptr->bclink.supportable);
 		info("Supported: %d,  ", n_ptr->bclink.supported);
 		info("Acked: %u\n", n_ptr->bclink.acked);
 		info("Last in: %u,  ", n_ptr->bclink.last_in);
-		info("Gap after: %u,  ", n_ptr->bclink.gap_after);
-		info("Gap to: %u\n", n_ptr->bclink.gap_to);
-		info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
+		info("Oos state: %u,  ", n_ptr->bclink.oos_state);
+		info("Last sent: %u\n", n_ptr->bclink.last_sent);
 
 		tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
 
@@ -1974,7 +1973,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
 
 	msg_set_type(msg, msg_typ);
 	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
-	msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
+	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
 	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
 
 	if (msg_typ == STATE_MSG) {
@@ -2133,8 +2132,12 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
 
 		/* Synchronize broadcast link info, if not done previously */
 
-		if (!tipc_node_is_up(l_ptr->owner))
-			l_ptr->owner->bclink.last_in = msg_last_bcast(msg);
+		if (!tipc_node_is_up(l_ptr->owner)) {
+			l_ptr->owner->bclink.last_sent =
+				l_ptr->owner->bclink.last_in =
+				msg_last_bcast(msg);
+			l_ptr->owner->bclink.oos_state = 0;
+		}
 
 		l_ptr->peer_session = msg_session(msg);
 		l_ptr->peer_bearer_id = msg_bearer_id(msg);
@@ -2181,7 +2184,9 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
 
 		/* Protocol message before retransmits, reduce loss risk */
 
-		tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
+		if (l_ptr->owner->bclink.supported)
+			tipc_bclink_update_link_state(l_ptr->owner,
+						      msg_last_bcast(msg));
 
 		if (rec_gap || (msg_probe(msg))) {
 			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 9196f943b83..6d8bdfd95cd 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -339,12 +339,12 @@ static void node_lost_contact(struct tipc_node *n_ptr)
 	/* Flush broadcast link info associated with lost node */
 
 	if (n_ptr->bclink.supported) {
-		n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0;
 		while (n_ptr->bclink.deferred_head) {
 			struct sk_buff *buf = n_ptr->bclink.deferred_head;
 			n_ptr->bclink.deferred_head = buf->next;
 			buf_discard(buf);
 		}
+		n_ptr->bclink.deferred_size = 0;
 
 		if (n_ptr->bclink.defragm) {
 			buf_discard(n_ptr->bclink.defragm);
@@ -450,7 +450,7 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
 
 	read_lock_bh(&tipc_net_lock);
 
-	/* Get space for all unicast links + multicast link */
+	/* Get space for all unicast links + broadcast link */
 
 	payload_size = TLV_SPACE(sizeof(link_info)) *
 		(atomic_read(&tipc_num_links) + 1);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 90689f48761..c88ce64f8a3 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -66,9 +66,9 @@
 *    @supported: non-zero if node supports TIPC b'cast capability
 *    @acked: sequence # of last outbound b'cast message acknowledged by node
 *    @last_in: sequence # of last in-sequence b'cast message received from node
- *    @gap_after: sequence # of last message not requiring a NAK request
- *    @gap_to: sequence # of last message requiring a NAK request
- *    @nack_sync: counter that determines when NAK requests should be sent
+ *    @last_sent: sequence # of last b'cast message sent by node
+ *    @oos_state: state tracker for handling OOS b'cast messages
+ *    @deferred_size: number of OOS b'cast messages in deferred queue
 *    @deferred_head: oldest OOS b'cast message received from node
 *    @deferred_tail: newest OOS b'cast message received from node
 *    @defragm: list of partially reassembled b'cast message fragments from node
@@ -91,9 +91,9 @@ struct tipc_node {
 		u8 supported;
 		u32 acked;
 		u32 last_in;
-		u32 gap_after;
-		u32 gap_to;
-		u32 nack_sync;
+		u32 last_sent;
+		u32 oos_state;
+		u32 deferred_size;
 		struct sk_buff *deferred_head;
 		struct sk_buff *deferred_tail;
 		struct sk_buff *defragm;
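
The patch replaces the old gap_after/gap_to bookkeeping with two simpler pacing rules: ACKs are staggered across the cluster by keying the every-16th-packet test to each node's own address, and NACKs are rate-limited by the new per-peer oos_state counter, whose parity records whether a NACK for the current gap was just sent (or seen from another node). The sketch below is a standalone illustration of those two rules, not part of the patch: the names bclink_state, should_ack, and should_nack are invented for the example, and only the decision logic mirrors tipc_bclink_update_link_state() and the ACK test in tipc_bclink_recv_pkt() above.

```c
#include <stdbool.h>
#include <stdint.h>

#define TIPC_MIN_LINK_WIN 16	/* same constant the patch divides by */

/* Per-peer broadcast-link state, mirroring the fields added in node.h */
struct bclink_state {
	uint32_t last_in;	/* last in-sequence seqno received */
	uint32_t last_sent;	/* highest seqno peer is known to have sent */
	uint32_t oos_state;	/* even = may NACK, odd = NACK sent/seen */
	uint32_t deferred_size;	/* out-of-sequence packets queued */
};

/*
 * ACK staggering: every node ACKs once per 16 in-sequence packets, but
 * offset by its own address, so the cluster's ACKs for a given message
 * stream arrive spread out rather than in one synchronized burst.
 */
static bool should_ack(uint32_t seqno, uint32_t own_addr)
{
	return ((seqno - own_addr) % TIPC_MIN_LINK_WIN) == 0;
}

/*
 * NACK pacing: a gap must be "confirmed" before it is NACKed - either by
 * being observed on two successive state updates, or by at least half a
 * link window of later packets piling up behind it - and an odd oos_state
 * suppresses a request for a gap that was only just NACKed.
 */
static bool should_nack(struct bclink_state *st)
{
	if (st->last_sent == st->last_in)
		return false;			/* in sync, nothing missing */

	if (++st->oos_state == 1) {
		if (st->deferred_size < TIPC_MIN_LINK_WIN / 2)
			return false;		/* loss still unconfirmed */
		st->oos_state++;		/* enough evidence to NACK */
	}

	return !(st->oos_state & 0x1);		/* odd: NACK recently sent */
}
```

Two details of the real code complete the picture: after actually transmitting a NACK it increments oos_state once more, leaving it odd so the next update is suppressed, and bclink_peek_nack() sets a peer's oos_state to 2 when another node has already requested the same first missing packet, delaying this node's duplicate request by one cycle.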