diff options
Diffstat (limited to 'net/tipc')
| -rw-r--r-- | net/tipc/bcast.c | 111 | ||||
| -rw-r--r-- | net/tipc/bcast.h | 1 | ||||
| -rw-r--r-- | net/tipc/bearer.c | 8 | ||||
| -rw-r--r-- | net/tipc/bearer.h | 4 | ||||
| -rw-r--r-- | net/tipc/config.h | 1 | ||||
| -rw-r--r-- | net/tipc/discover.c | 6 | ||||
| -rw-r--r-- | net/tipc/eth_media.c | 30 | ||||
| -rw-r--r-- | net/tipc/link.c | 111 | ||||
| -rw-r--r-- | net/tipc/link.h | 1 | ||||
| -rw-r--r-- | net/tipc/name_distr.c | 35 | ||||
| -rw-r--r-- | net/tipc/net.c | 11 | ||||
| -rw-r--r-- | net/tipc/node.c | 45 | ||||
| -rw-r--r-- | net/tipc/node.h | 10 | ||||
| -rw-r--r-- | net/tipc/socket.c | 51 | ||||
| -rw-r--r-- | net/tipc/subscr.c | 3 | ||||
| -rw-r--r-- | net/tipc/subscr.h | 6 | 
16 files changed, 238 insertions, 196 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 759b318b5ff..28908f54459 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -39,6 +39,7 @@  #include "link.h"  #include "port.h"  #include "bcast.h" +#include "name_distr.h"  #define MAX_PKT_DEFAULT_MCAST 1500	/* bcast link max packet size (fixed) */ @@ -298,14 +299,9 @@ static void bclink_send_nack(struct tipc_node *n_ptr)  		msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);  		msg_set_bcast_tag(msg, tipc_own_tag); -		if (tipc_bearer_send(&bcbearer->bearer, buf, NULL)) { -			bcl->stats.sent_nacks++; -			buf_discard(buf); -		} else { -			tipc_bearer_schedule(bcl->b_ptr, bcl); -			bcl->proto_msg_queue = buf; -			bcl->stats.bearer_congs++; -		} +		tipc_bearer_send(&bcbearer->bearer, buf, NULL); +		bcl->stats.sent_nacks++; +		buf_discard(buf);  		/*  		 * Ensure we doesn't send another NACK msg to the node @@ -426,20 +422,28 @@ int tipc_bclink_send_msg(struct sk_buff *buf)  void tipc_bclink_recv_pkt(struct sk_buff *buf)  {  	struct tipc_msg *msg = buf_msg(buf); -	struct tipc_node *node = tipc_node_find(msg_prevnode(msg)); +	struct tipc_node *node;  	u32 next_in;  	u32 seqno;  	struct sk_buff *deferred; -	if (unlikely(!node || !tipc_node_is_up(node) || !node->bclink.supported || -		     (msg_mc_netid(msg) != tipc_net_id))) { -		buf_discard(buf); -		return; -	} +	/* Screen out unwanted broadcast messages */ + +	if (msg_mc_netid(msg) != tipc_net_id) +		goto exit; + +	node = tipc_node_find(msg_prevnode(msg)); +	if (unlikely(!node)) +		goto exit; + +	tipc_node_lock(node); +	if (unlikely(!node->bclink.supported)) +		goto unlock;  	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { +		if (msg_type(msg) != STATE_MSG) +			goto unlock;  		if (msg_destnode(msg) == tipc_own_addr) { -			tipc_node_lock(node);  			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));  			tipc_node_unlock(node);  			spin_lock_bh(&bc_lock); @@ -449,18 +453,18 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)  					      msg_bcgap_to(msg));  			spin_unlock_bh(&bc_lock);  		} else { +			tipc_node_unlock(node);  			tipc_bclink_peek_nack(msg_destnode(msg),  					      msg_bcast_tag(msg),  					      msg_bcgap_after(msg),  					      msg_bcgap_to(msg));  		} -		buf_discard(buf); -		return; +		goto exit;  	} -	tipc_node_lock(node); +	/* Handle in-sequence broadcast message */ +  receive: -	deferred = node->bclink.deferred_head;  	next_in = mod(node->bclink.last_in + 1);  	seqno = msg_seqno(msg); @@ -474,7 +478,10 @@ receive:  		}  		if (likely(msg_isdata(msg))) {  			tipc_node_unlock(node); -			tipc_port_recv_mcast(buf, NULL); +			if (likely(msg_mcast(msg))) +				tipc_port_recv_mcast(buf, NULL); +			else +				buf_discard(buf);  		} else if (msg_user(msg) == MSG_BUNDLER) {  			bcl->stats.recv_bundles++;  			bcl->stats.recv_bundled += msg_msgcnt(msg); @@ -487,18 +494,22 @@ receive:  				bcl->stats.recv_fragmented++;  			tipc_node_unlock(node);  			tipc_net_route_msg(buf); +		} else if (msg_user(msg) == NAME_DISTRIBUTOR) { +			tipc_node_unlock(node); +			tipc_named_recv(buf);  		} else {  			tipc_node_unlock(node); -			tipc_net_route_msg(buf); +			buf_discard(buf);  		} +		buf = NULL; +		tipc_node_lock(node); +		deferred = node->bclink.deferred_head;  		if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) { -			tipc_node_lock(node);  			buf = deferred;  			msg = buf_msg(buf);  			node->bclink.deferred_head = deferred->next;  			goto receive;  		} -		return;  	} else if (less(next_in, seqno)) {  		u32 gap_after = node->bclink.gap_after;  		u32 gap_to = node->bclink.gap_to; @@ -513,6 +524,7 @@ receive:  			else if (less(gap_after, seqno) && less(seqno, gap_to))  				node->bclink.gap_to = seqno;  		} +		buf = NULL;  		if (bclink_ack_allowed(node->bclink.nack_sync)) {  			if (gap_to != gap_after)  				bclink_send_nack(node); @@ -520,9 +532,11 @@ receive:  		}  	} else {  		bcl->stats.duplicates++; -		buf_discard(buf);  	} +unlock:  	tipc_node_unlock(node); +exit: +	buf_discard(buf);  }  u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) @@ -535,10 +549,11 @@ u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)  /**   * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer   * - * Send through as many bearers as necessary to reach all nodes - * that support TIPC multicasting. + * Send packet over as many bearers as necessary to reach all nodes + * that have joined the broadcast link.   * - * Returns 0 if packet sent successfully, non-zero if not + * Returns 0 (packet sent successfully) under all circumstances, + * since the broadcast link's pseudo-bearer never blocks   */  static int tipc_bcbearer_send(struct sk_buff *buf, @@ -547,7 +562,12 @@ static int tipc_bcbearer_send(struct sk_buff *buf,  {  	int bp_index; -	/* Prepare buffer for broadcasting (if first time trying to send it) */ +	/* +	 * Prepare broadcast link message for reliable transmission, +	 * if first time trying to send it; +	 * preparation is skipped for broadcast link protocol messages +	 * since they are sent in an unreliable manner and don't need it +	 */  	if (likely(!msg_non_seq(buf_msg(buf)))) {  		struct tipc_msg *msg; @@ -596,18 +616,12 @@ static int tipc_bcbearer_send(struct sk_buff *buf,  		}  		if (bcbearer->remains_new.count == 0) -			return 0; +			break;	/* all targets reached */  		bcbearer->remains = bcbearer->remains_new;  	} -	/* -	 * Unable to reach all targets (indicate success, since currently -	 * there isn't code in place to properly block & unblock the -	 * pseudo-bearer used by the broadcast link) -	 */ - -	return TIPC_OK; +	return 0;  }  /** @@ -667,27 +681,6 @@ void tipc_bcbearer_sort(void)  	spin_unlock_bh(&bc_lock);  } -/** - * tipc_bcbearer_push - resolve bearer congestion - * - * Forces bclink to push out any unsent packets, until all packets are gone - * or congestion reoccurs. - * No locks set when function called - */ - -void tipc_bcbearer_push(void) -{ -	struct tipc_bearer *b_ptr; - -	spin_lock_bh(&bc_lock); -	b_ptr = &bcbearer->bearer; -	if (b_ptr->blocked) { -		b_ptr->blocked = 0; -		tipc_bearer_lock_push(b_ptr); -	} -	spin_unlock_bh(&bc_lock); -} -  int tipc_bclink_stats(char *buf, const u32 buf_size)  { @@ -764,7 +757,7 @@ int tipc_bclink_init(void)  	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);  	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);  	if (!bcbearer || !bclink) { -		warn("Multicast link creation failed, no memory\n"); +		warn("Broadcast link creation failed, no memory\n");  		kfree(bcbearer);  		bcbearer = NULL;  		kfree(bclink); @@ -775,7 +768,7 @@ int tipc_bclink_init(void)  	INIT_LIST_HEAD(&bcbearer->bearer.cong_links);  	bcbearer->bearer.media = &bcbearer->media;  	bcbearer->media.send_msg = tipc_bcbearer_send; -	sprintf(bcbearer->media.name, "tipc-multicast"); +	sprintf(bcbearer->media.name, "tipc-broadcast");  	bcl = &bclink->link;  	INIT_LIST_HEAD(&bcl->waiting_ports); diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 500c97f1c85..06740da5ae6 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -101,6 +101,5 @@ int  tipc_bclink_stats(char *stats_buf, const u32 buf_size);  int  tipc_bclink_reset_stats(void);  int  tipc_bclink_set_queue_limits(u32 limit);  void tipc_bcbearer_sort(void); -void tipc_bcbearer_push(void);  #endif diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 85eba9c08ee..e2202de3d93 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -385,13 +385,9 @@ static int bearer_push(struct tipc_bearer *b_ptr)  void tipc_bearer_lock_push(struct tipc_bearer *b_ptr)  { -	int res; -  	spin_lock_bh(&b_ptr->lock); -	res = bearer_push(b_ptr); +	bearer_push(b_ptr);  	spin_unlock_bh(&b_ptr->lock); -	if (res) -		tipc_bcbearer_push();  } @@ -608,6 +604,7 @@ int tipc_block_bearer(const char *name)  	info("Blocking bearer <%s>\n", name);  	spin_lock_bh(&b_ptr->lock);  	b_ptr->blocked = 1; +	list_splice_init(&b_ptr->cong_links, &b_ptr->links);  	list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {  		struct tipc_node *n_ptr = l_ptr->owner; @@ -635,6 +632,7 @@ static void bearer_disable(struct tipc_bearer *b_ptr)  	spin_lock_bh(&b_ptr->lock);  	b_ptr->blocked = 1;  	b_ptr->media->disable_bearer(b_ptr); +	list_splice_init(&b_ptr->cong_links, &b_ptr->links);  	list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {  		tipc_link_delete(l_ptr);  	} diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index 5ad70eff1eb..d696f9e414e 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -39,8 +39,8 @@  #include "bcast.h" -#define MAX_BEARERS 8 -#define MAX_MEDIA 4 +#define MAX_BEARERS	2 +#define MAX_MEDIA	2  /*   * Identifiers of supported TIPC media types diff --git a/net/tipc/config.h b/net/tipc/config.h index 443159a166f..80da6ebc278 100644 --- a/net/tipc/config.h +++ b/net/tipc/config.h @@ -65,7 +65,6 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd,  				const void *req_tlv_area, int req_tlv_space,  				int headroom); -void tipc_cfg_link_event(u32 addr, char *name, int up);  int  tipc_cfg_init(void);  void tipc_cfg_stop(void); diff --git a/net/tipc/discover.c b/net/tipc/discover.c index 0987933155b..f2fb96e86ee 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -159,12 +159,6 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)  	}  	tipc_node_lock(n_ptr); -	/* Don't talk to neighbor during cleanup after last session */ -	if (n_ptr->cleanup_required) { -		tipc_node_unlock(n_ptr); -		return; -	} -  	link = n_ptr->links[b_ptr->identity];  	/* Create a link endpoint for this bearer, if necessary */ diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c index b69092eb95d..e728d4ce2a1 100644 --- a/net/tipc/eth_media.c +++ b/net/tipc/eth_media.c @@ -2,7 +2,7 @@   * net/tipc/eth_media.c: Ethernet bearer support for TIPC   *   * Copyright (c) 2001-2007, Ericsson AB - * Copyright (c) 2005-2007, Wind River Systems + * Copyright (c) 2005-2008, 2011, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -37,7 +37,7 @@  #include "core.h"  #include "bearer.h" -#define MAX_ETH_BEARERS		2 +#define MAX_ETH_BEARERS		MAX_BEARERS  #define ETH_LINK_PRIORITY	TIPC_DEF_LINK_PRI  #define ETH_LINK_TOLERANCE	TIPC_DEF_LINK_TOL  #define ETH_LINK_WINDOW		TIPC_DEF_LINK_WIN @@ -144,31 +144,27 @@ static int enable_bearer(struct tipc_bearer *tb_ptr)  	/* Find device with specified name */ +	read_lock(&dev_base_lock);  	for_each_netdev(&init_net, pdev) {  		if (!strncmp(pdev->name, driver_name, IFNAMSIZ)) {  			dev = pdev; +			dev_hold(dev);  			break;  		}  	} +	read_unlock(&dev_base_lock);  	if (!dev)  		return -ENODEV; -	/* Find Ethernet bearer for device (or create one) */ +	/* Create Ethernet bearer for device */ -	while ((eb_ptr != stop) && eb_ptr->dev && (eb_ptr->dev != dev)) -		eb_ptr++; -	if (eb_ptr == stop) -		return -EDQUOT; -	if (!eb_ptr->dev) { -		eb_ptr->dev = dev; -		eb_ptr->tipc_packet_type.type = htons(ETH_P_TIPC); -		eb_ptr->tipc_packet_type.dev = dev; -		eb_ptr->tipc_packet_type.func = recv_msg; -		eb_ptr->tipc_packet_type.af_packet_priv = eb_ptr; -		INIT_LIST_HEAD(&(eb_ptr->tipc_packet_type.list)); -		dev_hold(dev); -		dev_add_pack(&eb_ptr->tipc_packet_type); -	} +	eb_ptr->dev = dev; +	eb_ptr->tipc_packet_type.type = htons(ETH_P_TIPC); +	eb_ptr->tipc_packet_type.dev = dev; +	eb_ptr->tipc_packet_type.func = recv_msg; +	eb_ptr->tipc_packet_type.af_packet_priv = eb_ptr; +	INIT_LIST_HEAD(&(eb_ptr->tipc_packet_type.list)); +	dev_add_pack(&eb_ptr->tipc_packet_type);  	/* Associate TIPC bearer with Ethernet bearer */ diff --git a/net/tipc/link.c b/net/tipc/link.c index f89570c54f5..ae98a72da11 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -332,15 +332,16 @@ struct link *tipc_link_create(struct tipc_node *n_ptr,  	l_ptr->addr = peer;  	if_name = strchr(b_ptr->name, ':') + 1; -	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:", +	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",  		tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),  		tipc_node(tipc_own_addr),  		if_name,  		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); -		/* note: peer i/f is appended to link name by reset/activate */ +		/* note: peer i/f name is updated by reset/activate message */  	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));  	l_ptr->owner = n_ptr;  	l_ptr->checkpoint = 1; +	l_ptr->peer_session = INVALID_SESSION;  	l_ptr->b_ptr = b_ptr;  	link_set_supervision_props(l_ptr, b_ptr->media->tolerance);  	l_ptr->state = RESET_UNKNOWN; @@ -536,9 +537,6 @@ void tipc_link_stop(struct link *l_ptr)  	l_ptr->proto_msg_queue = NULL;  } -/* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */ -#define link_send_event(fcn, l_ptr, up) do { } while (0) -  void tipc_link_reset(struct link *l_ptr)  {  	struct sk_buff *buf; @@ -596,10 +594,6 @@ void tipc_link_reset(struct link *l_ptr)  	l_ptr->fsm_msg_cnt = 0;  	l_ptr->stale_count = 0;  	link_reset_statistics(l_ptr); - -	link_send_event(tipc_cfg_link_event, l_ptr, 0); -	if (!in_own_cluster(l_ptr->addr)) -		link_send_event(tipc_disc_link_event, l_ptr, 0);  } @@ -608,9 +602,6 @@ static void link_activate(struct link *l_ptr)  	l_ptr->next_in_no = l_ptr->stats.recv_info = 1;  	tipc_node_link_up(l_ptr->owner, l_ptr);  	tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr); -	link_send_event(tipc_cfg_link_event, l_ptr, 1); -	if (!in_own_cluster(l_ptr->addr)) -		link_send_event(tipc_disc_link_event, l_ptr, 1);  }  /** @@ -985,6 +976,51 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)  }  /* + * tipc_link_send_names - send name table entries to new neighbor + * + * Send routine for bulk delivery of name table messages when contact + * with a new neighbor occurs. No link congestion checking is performed + * because name table messages *must* be delivered. The messages must be + * small enough not to require fragmentation. + * Called without any locks held. + */ + +void tipc_link_send_names(struct list_head *message_list, u32 dest) +{ +	struct tipc_node *n_ptr; +	struct link *l_ptr; +	struct sk_buff *buf; +	struct sk_buff *temp_buf; + +	if (list_empty(message_list)) +		return; + +	read_lock_bh(&tipc_net_lock); +	n_ptr = tipc_node_find(dest); +	if (n_ptr) { +		tipc_node_lock(n_ptr); +		l_ptr = n_ptr->active_links[0]; +		if (l_ptr) { +			/* convert circular list to linear list */ +			((struct sk_buff *)message_list->prev)->next = NULL; +			link_add_chain_to_outqueue(l_ptr, +				(struct sk_buff *)message_list->next, 0); +			tipc_link_push_queue(l_ptr); +			INIT_LIST_HEAD(message_list); +		} +		tipc_node_unlock(n_ptr); +	} +	read_unlock_bh(&tipc_net_lock); + +	/* discard the messages if they couldn't be sent */ + +	list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) { +		list_del((struct list_head *)buf); +		buf_discard(buf); +	} +} + +/*   * link_send_buf_fast: Entry for data messages where the   * destination link is known and the header is complete,   * inclusive total message length. Very time critical. @@ -1031,9 +1067,6 @@ int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)  	u32 selector = msg_origport(buf_msg(buf)) & 1;  	u32 dummy; -	if (destnode == tipc_own_addr) -		return tipc_port_recv_msg(buf); -  	read_lock_bh(&tipc_net_lock);  	n_ptr = tipc_node_find(destnode);  	if (likely(n_ptr)) { @@ -1658,19 +1691,12 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)  			continue;  		} +		/* Discard unicast link messages destined for another node */ +  		if (unlikely(!msg_short(msg) &&  			     (msg_destnode(msg) != tipc_own_addr)))  			goto cont; -		/* Discard non-routeable messages destined for another node */ - -		if (unlikely(!msg_isdata(msg) && -			     (msg_destnode(msg) != tipc_own_addr))) { -			if ((msg_user(msg) != CONN_MANAGER) && -			    (msg_user(msg) != MSG_FRAGMENTER)) -				goto cont; -		} -  		/* Locate neighboring node that sent message */  		n_ptr = tipc_node_find(msg_prevnode(msg)); @@ -1678,17 +1704,24 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)  			goto cont;  		tipc_node_lock(n_ptr); -		/* Don't talk to neighbor during cleanup after last session */ +		/* Locate unicast link endpoint that should handle message */ -		if (n_ptr->cleanup_required) { +		l_ptr = n_ptr->links[b_ptr->identity]; +		if (unlikely(!l_ptr)) {  			tipc_node_unlock(n_ptr);  			goto cont;  		} -		/* Locate unicast link endpoint that should handle message */ +		/* Verify that communication with node is currently allowed */ -		l_ptr = n_ptr->links[b_ptr->identity]; -		if (unlikely(!l_ptr)) { +		if ((n_ptr->block_setup & WAIT_PEER_DOWN) && +			msg_user(msg) == LINK_PROTOCOL && +			(msg_type(msg) == RESET_MSG || +					msg_type(msg) == ACTIVATE_MSG) && +			!msg_redundant_link(msg)) +			n_ptr->block_setup &= ~WAIT_PEER_DOWN; + +		if (n_ptr->block_setup) {  			tipc_node_unlock(n_ptr);  			goto cont;  		} @@ -1923,6 +1956,12 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,  	if (link_blocked(l_ptr))  		return; + +	/* Abort non-RESET send if communication with node is prohibited */ + +	if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG)) +		return; +  	msg_set_type(msg, msg_typ);  	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);  	msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); @@ -2051,9 +2090,19 @@ static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)  	case RESET_MSG:  		if (!link_working_unknown(l_ptr) &&  		    (l_ptr->peer_session != INVALID_SESSION)) { -			if (msg_session(msg) == l_ptr->peer_session) -				break; /* duplicate: ignore */ +			if (less_eq(msg_session(msg), l_ptr->peer_session)) +				break; /* duplicate or old reset: ignore */ +		} + +		if (!msg_redundant_link(msg) && (link_working_working(l_ptr) || +				link_working_unknown(l_ptr))) { +			/* +			 * peer has lost contact -- don't allow peer's links +			 * to reactivate before we recognize loss & clean up +			 */ +			l_ptr->owner->block_setup = WAIT_NODE_DOWN;  		} +  		/* fall thru' */  	case ACTIVATE_MSG:  		/* Update link settings according other endpoint's values */ diff --git a/net/tipc/link.h b/net/tipc/link.h index 74fbecab1ea..e56cb532913 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -223,6 +223,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_s  struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space);  void tipc_link_reset(struct link *l_ptr);  int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector); +void tipc_link_send_names(struct list_head *message_list, u32 dest);  int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf);  u32 tipc_link_get_max_pkt(u32 dest, u32 selector);  int tipc_link_send_sections_fast(struct tipc_port *sender, diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index cd356e50433..b7ca1bd7b15 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -173,18 +173,40 @@ void tipc_named_withdraw(struct publication *publ)   * tipc_named_node_up - tell specified node about all publications by this node   */ -void tipc_named_node_up(unsigned long node) +void tipc_named_node_up(unsigned long nodearg)  { +	struct tipc_node *n_ptr; +	struct link *l_ptr;  	struct publication *publ;  	struct distr_item *item = NULL;  	struct sk_buff *buf = NULL; +	struct list_head message_list; +	u32 node = (u32)nodearg;  	u32 left = 0;  	u32 rest; -	u32 max_item_buf; +	u32 max_item_buf = 0; + +	/* compute maximum amount of publication data to send per message */ + +	read_lock_bh(&tipc_net_lock); +	n_ptr = tipc_node_find(node); +	if (n_ptr) { +		tipc_node_lock(n_ptr); +		l_ptr = n_ptr->active_links[0]; +		if (l_ptr) +			max_item_buf = ((l_ptr->max_pkt - INT_H_SIZE) / +				ITEM_SIZE) * ITEM_SIZE; +		tipc_node_unlock(n_ptr); +	} +	read_unlock_bh(&tipc_net_lock); +	if (!max_item_buf) +		return; + +	/* create list of publication messages, then send them as a unit */ + +	INIT_LIST_HEAD(&message_list);  	read_lock_bh(&tipc_nametbl_lock); -	max_item_buf = TIPC_MAX_USER_MSG_SIZE / ITEM_SIZE; -	max_item_buf *= ITEM_SIZE;  	rest = publ_cnt * ITEM_SIZE;  	list_for_each_entry(publ, &publ_root, local_list) { @@ -202,13 +224,14 @@ void tipc_named_node_up(unsigned long node)  		item++;  		left -= ITEM_SIZE;  		if (!left) { -			msg_set_link_selector(buf_msg(buf), node); -			tipc_link_send(buf, node, node); +			list_add_tail((struct list_head *)buf, &message_list);  			buf = NULL;  		}  	}  exit:  	read_unlock_bh(&tipc_nametbl_lock); + +	tipc_link_send_names(&message_list, (u32)node);  }  /** diff --git a/net/tipc/net.c b/net/tipc/net.c index 68b3dd63729..fafef6c3c0f 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -141,17 +141,6 @@ void tipc_net_route_msg(struct sk_buff *buf)  		return;  	msg = buf_msg(buf); -	msg_incr_reroute_cnt(msg); -	if (msg_reroute_cnt(msg) > 6) { -		if (msg_errcode(msg)) { -			buf_discard(buf); -		} else { -			tipc_reject_msg(buf, msg_destport(msg) ? -					TIPC_ERR_NO_PORT : TIPC_ERR_NO_NAME); -		} -		return; -	} -  	/* Handle message for this node */  	dnode = msg_short(msg) ? tipc_own_addr : msg_destnode(msg);  	if (tipc_in_scope(dnode, tipc_own_addr)) { diff --git a/net/tipc/node.c b/net/tipc/node.c index 2d106ef4fa4..27b4bb0cca6 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -112,6 +112,7 @@ struct tipc_node *tipc_node_create(u32 addr)  			break;  	}  	list_add_tail(&n_ptr->list, &temp_node->list); +	n_ptr->block_setup = WAIT_PEER_DOWN;  	tipc_num_nodes++; @@ -312,7 +313,7 @@ static void node_established_contact(struct tipc_node *n_ptr)  	}  } -static void node_cleanup_finished(unsigned long node_addr) +static void node_name_purge_complete(unsigned long node_addr)  {  	struct tipc_node *n_ptr; @@ -320,7 +321,7 @@ static void node_cleanup_finished(unsigned long node_addr)  	n_ptr = tipc_node_find(node_addr);  	if (n_ptr) {  		tipc_node_lock(n_ptr); -		n_ptr->cleanup_required = 0; +		n_ptr->block_setup &= ~WAIT_NAMES_GONE;  		tipc_node_unlock(n_ptr);  	}  	read_unlock_bh(&tipc_net_lock); @@ -331,28 +332,32 @@ static void node_lost_contact(struct tipc_node *n_ptr)  	char addr_string[16];  	u32 i; -	/* Clean up broadcast reception remains */ -	n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0; -	while (n_ptr->bclink.deferred_head) { -		struct sk_buff *buf = n_ptr->bclink.deferred_head; -		n_ptr->bclink.deferred_head = buf->next; -		buf_discard(buf); -	} -	if (n_ptr->bclink.defragm) { -		buf_discard(n_ptr->bclink.defragm); -		n_ptr->bclink.defragm = NULL; -	} +	info("Lost contact with %s\n", +	     tipc_addr_string_fill(addr_string, n_ptr->addr)); + +	/* Flush broadcast link info associated with lost node */  	if (n_ptr->bclink.supported) { +		n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0; +		while (n_ptr->bclink.deferred_head) { +			struct sk_buff *buf = n_ptr->bclink.deferred_head; +			n_ptr->bclink.deferred_head = buf->next; +			buf_discard(buf); +		} + +		if (n_ptr->bclink.defragm) { +			buf_discard(n_ptr->bclink.defragm); +			n_ptr->bclink.defragm = NULL; +		} + +		tipc_nmap_remove(&tipc_bcast_nmap, n_ptr->addr);  		tipc_bclink_acknowledge(n_ptr,  					mod(n_ptr->bclink.acked + 10000)); -		tipc_nmap_remove(&tipc_bcast_nmap, n_ptr->addr);  		if (n_ptr->addr < tipc_own_addr)  			tipc_own_tag--; -	} -	info("Lost contact with %s\n", -	     tipc_addr_string_fill(addr_string, n_ptr->addr)); +		n_ptr->bclink.supported = 0; +	}  	/* Abort link changeover */  	for (i = 0; i < MAX_BEARERS; i++) { @@ -367,10 +372,10 @@ static void node_lost_contact(struct tipc_node *n_ptr)  	/* Notify subscribers */  	tipc_nodesub_notify(n_ptr); -	/* Prevent re-contact with node until all cleanup is done */ +	/* Prevent re-contact with node until cleanup is done */ -	n_ptr->cleanup_required = 1; -	tipc_k_signal((Handler)node_cleanup_finished, n_ptr->addr); +	n_ptr->block_setup = WAIT_PEER_DOWN | WAIT_NAMES_GONE; +	tipc_k_signal((Handler)node_name_purge_complete, n_ptr->addr);  }  struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space) diff --git a/net/tipc/node.h b/net/tipc/node.h index 5c61afc7a0b..4f15cb40aaa 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -42,6 +42,12 @@  #include "net.h"  #include "bearer.h" +/* Flags used to block (re)establishment of contact with a neighboring node */ + +#define WAIT_PEER_DOWN	0x0001	/* wait to see that peer's links are down */ +#define WAIT_NAMES_GONE	0x0002	/* wait for peer's publications to be purged */ +#define WAIT_NODE_DOWN	0x0004	/* wait until peer node is declared down */ +  /**   * struct tipc_node - TIPC node structure   * @addr: network address of node @@ -52,7 +58,7 @@   * @active_links: pointers to active links to node   * @links: pointers to all links to node   * @working_links: number of working links to node (both active and standby) - * @cleanup_required: non-zero if cleaning up after a prior loss of contact + * @block_setup: bit mask of conditions preventing link establishment to node   * @link_cnt: number of links to node   * @permit_changeover: non-zero if node has redundant links to this system   * @bclink: broadcast-related info @@ -77,7 +83,7 @@ struct tipc_node {  	struct link *links[MAX_BEARERS];  	int link_cnt;  	int working_links; -	int cleanup_required; +	int block_setup;  	int permit_changeover;  	struct {  		int supported; diff --git a/net/tipc/socket.c b/net/tipc/socket.c index adb2eff4a10..9440a3d48ca 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -49,7 +49,7 @@ struct tipc_sock {  	struct sock sk;  	struct tipc_port *p;  	struct tipc_portid peer_name; -	long conn_timeout; +	unsigned int conn_timeout;  };  #define tipc_sk(sk) ((struct tipc_sock *)(sk)) @@ -231,7 +231,7 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,  	sock_init_data(sock, sk);  	sk->sk_backlog_rcv = backlog_rcv;  	tipc_sk(sk)->p = tp_ptr; -	tipc_sk(sk)->conn_timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT); +	tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;  	spin_unlock_bh(tp_ptr->lock); @@ -525,6 +525,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,  	struct tipc_port *tport = tipc_sk_port(sk);  	struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;  	int needs_conn; +	long timeout_val;  	int res = -EINVAL;  	if (unlikely(!dest)) @@ -564,6 +565,8 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,  		reject_rx_queue(sk);  	} +	timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); +  	do {  		if (dest->addrtype == TIPC_ADDR_NAME) {  			res = dest_name_check(dest, m); @@ -600,16 +603,14 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,  				sock->state = SS_CONNECTING;  			break;  		} -		if (m->msg_flags & MSG_DONTWAIT) { -			res = -EWOULDBLOCK; +		if (timeout_val <= 0L) { +			res = timeout_val ? timeout_val : -EWOULDBLOCK;  			break;  		}  		release_sock(sk); -		res = wait_event_interruptible(*sk_sleep(sk), -					       !tport->congested); +		timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk), +					       !tport->congested, timeout_val);  		lock_sock(sk); -		if (res) -			break;  	} while (1);  exit: @@ -636,6 +637,7 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,  	struct sock *sk = sock->sk;  	struct tipc_port *tport = tipc_sk_port(sk);  	struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name; +	long timeout_val;  	int res;  	/* Handle implied connection establishment */ @@ -650,6 +652,8 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,  	if (iocb)  		lock_sock(sk); +	timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); +  	do {  		if (unlikely(sock->state != SS_CONNECTED)) {  			if (sock->state == SS_DISCONNECTING) @@ -663,16 +667,14 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,  				total_len);  		if (likely(res != -ELINKCONG))  			break; -		if (m->msg_flags & MSG_DONTWAIT) { -			res = -EWOULDBLOCK; +		if (timeout_val <= 0L) { +			res = timeout_val ? timeout_val : -EWOULDBLOCK;  			break;  		}  		release_sock(sk); -		res = wait_event_interruptible(*sk_sleep(sk), -			(!tport->congested || !tport->connected)); +		timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk), +			(!tport->congested || !tport->connected), timeout_val);  		lock_sock(sk); -		if (res) -			break;  	} while (1);  	if (iocb) @@ -1369,7 +1371,7 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,  	struct msghdr m = {NULL,};  	struct sk_buff *buf;  	struct tipc_msg *msg; -	long timeout; +	unsigned int timeout;  	int res;  	lock_sock(sk); @@ -1434,7 +1436,8 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,  	res = wait_event_interruptible_timeout(*sk_sleep(sk),  			(!skb_queue_empty(&sk->sk_receive_queue) ||  			(sock->state != SS_CONNECTING)), -			timeout ? timeout : MAX_SCHEDULE_TIMEOUT); +			timeout ? (long)msecs_to_jiffies(timeout) +				: MAX_SCHEDULE_TIMEOUT);  	lock_sock(sk);  	if (res > 0) { @@ -1480,9 +1483,7 @@ static int listen(struct socket *sock, int len)  	lock_sock(sk); -	if (sock->state == SS_READY) -		res = -EOPNOTSUPP; -	else if (sock->state != SS_UNCONNECTED) +	if (sock->state != SS_UNCONNECTED)  		res = -EINVAL;  	else {  		sock->state = SS_LISTENING; @@ -1510,10 +1511,6 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)  	lock_sock(sk); -	if (sock->state == SS_READY) { -		res = -EOPNOTSUPP; -		goto exit; -	}  	if (sock->state != SS_LISTENING) {  		res = -EINVAL;  		goto exit; @@ -1696,7 +1693,7 @@ static int setsockopt(struct socket *sock,  		res = tipc_set_portunreturnable(tport->ref, value);  		break;  	case TIPC_CONN_TIMEOUT: -		tipc_sk(sk)->conn_timeout = msecs_to_jiffies(value); +		tipc_sk(sk)->conn_timeout = value;  		/* no need to set "res", since already 0 at this point */  		break;  	default: @@ -1752,7 +1749,7 @@ static int getsockopt(struct socket *sock,  		res = tipc_portunreturnable(tport->ref, &value);  		break;  	case TIPC_CONN_TIMEOUT: -		value = jiffies_to_msecs(tipc_sk(sk)->conn_timeout); +		value = tipc_sk(sk)->conn_timeout;  		/* no need to set "res", since already 0 at this point */  		break;  	case TIPC_NODE_RECVQ_DEPTH: @@ -1790,11 +1787,11 @@ static const struct proto_ops msg_ops = {  	.bind		= bind,  	.connect	= connect,  	.socketpair	= sock_no_socketpair, -	.accept		= accept, +	.accept		= sock_no_accept,  	.getname	= get_name,  	.poll		= poll,  	.ioctl		= sock_no_ioctl, -	.listen		= listen, +	.listen		= sock_no_listen,  	.shutdown	= shutdown,  	.setsockopt	= setsockopt,  	.getsockopt	= getsockopt, diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 6cf72686348..198371723b4 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -151,7 +151,7 @@ void tipc_subscr_report_overlap(struct subscription *sub,  	if (!must && !(sub->filter & TIPC_SUB_PORTS))  		return; -	sub->event_cb(sub, found_lower, found_upper, event, port_ref, node); +	subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);  }  /** @@ -365,7 +365,6 @@ static struct subscription *subscr_subscribe(struct tipc_subscr *s,  		subscr_terminate(subscriber);  		return NULL;  	} -	sub->event_cb = subscr_send_event;  	INIT_LIST_HEAD(&sub->nameseq_list);  	list_add(&sub->subscription_list, &subscriber->subscription_list);  	sub->server_ref = subscriber->port_ref; diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 45d89bf4d20..4b06ef6f840 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -39,16 +39,11 @@  struct subscription; -typedef void (*tipc_subscr_event) (struct subscription *sub, -				   u32 found_lower, u32 found_upper, -				   u32 event, u32 port_ref, u32 node); -  /**   * struct subscription - TIPC network topology subscription object   * @seq: name sequence associated with subscription   * @timeout: duration of subscription (in ms)   * @filter: event filtering to be done for subscription - * @event_cb: routine invoked when a subscription event is detected   * @timer: timer governing subscription duration (optional)   * @nameseq_list: adjacent subscriptions in name sequence's subscription list   * @subscription_list: adjacent subscriptions in subscriber's subscription list @@ -61,7 +56,6 @@ struct subscription {  	struct tipc_name_seq seq;  	u32 timeout;  	u32 filter; -	tipc_subscr_event event_cb;  	struct timer_list timer;  	struct list_head nameseq_list;  	struct list_head subscription_list;  |