| author | Linus Torvalds <torvalds@linux-foundation.org> | 2010-10-23 11:47:02 -0700 | 
|---|---|---|
| committer | Linus Torvalds <torvalds@linux-foundation.org> | 2010-10-23 11:47:02 -0700 | 
| commit | 5f05647dd81c11a6a165ccc8f0c1370b16f3bcb0 (patch) | |
| tree | 7851ef1c93aa1aba7ef327ca4b75fd35e6d10f29 /net/rds/ib_send.c | |
| parent | 02f36038c568111ad4fc433f6fa760ff5e38fab4 (diff) | |
| parent | ec37a48d1d16c30b655ac5280209edf52a6775d4 (diff) | |
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next-2.6
* git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next-2.6: (1699 commits)
  bnx2/bnx2x: Unsupported Ethtool operations should return -EINVAL.
  vlan: Calling vlan_hwaccel_do_receive() is always valid.
  tproxy: use the interface primary IP address as a default value for --on-ip
  tproxy: added IPv6 support to the socket match
  cxgb3: function namespace cleanup
  tproxy: added IPv6 support to the TPROXY target
  tproxy: added IPv6 socket lookup function to nf_tproxy_core
  be2net: Changes to use only priority codes allowed by f/w
  tproxy: allow non-local binds of IPv6 sockets if IP_TRANSPARENT is enabled
  tproxy: added tproxy sockopt interface in the IPV6 layer
  tproxy: added udp6_lib_lookup function
  tproxy: added const specifiers to udp lookup functions
  tproxy: split off ipv6 defragmentation to a separate module
  l2tp: small cleanup
  nf_nat: restrict ICMP translation for embedded header
  can: mcp251x: fix generation of error frames
  can: mcp251x: fix endless loop in interrupt handler if CANINTF_MERRF is set
  can-raw: add msg_flags to distinguish local traffic
  9p: client code cleanup
  rds: make local functions/variables static
  ...
Fix up conflicts in net/core/dev.c, drivers/net/pcmcia/smc91c92_cs.c and
drivers/net/wireless/ath/ath9k/debug.c as per David
Diffstat (limited to 'net/rds/ib_send.c')
| -rw-r--r-- | net/rds/ib_send.c | 674 | 
1 file changed, 404 insertions, 270 deletions
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index 17fa80803ab..71f373c421b 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -36,11 +36,49 @@  #include <linux/dmapool.h>  #include "rds.h" -#include "rdma.h"  #include "ib.h" -static void rds_ib_send_rdma_complete(struct rds_message *rm, -				      int wc_status) +static char *rds_ib_wc_status_strings[] = { +#define RDS_IB_WC_STATUS_STR(foo) \ +		[IB_WC_##foo] = __stringify(IB_WC_##foo) +	RDS_IB_WC_STATUS_STR(SUCCESS), +	RDS_IB_WC_STATUS_STR(LOC_LEN_ERR), +	RDS_IB_WC_STATUS_STR(LOC_QP_OP_ERR), +	RDS_IB_WC_STATUS_STR(LOC_EEC_OP_ERR), +	RDS_IB_WC_STATUS_STR(LOC_PROT_ERR), +	RDS_IB_WC_STATUS_STR(WR_FLUSH_ERR), +	RDS_IB_WC_STATUS_STR(MW_BIND_ERR), +	RDS_IB_WC_STATUS_STR(BAD_RESP_ERR), +	RDS_IB_WC_STATUS_STR(LOC_ACCESS_ERR), +	RDS_IB_WC_STATUS_STR(REM_INV_REQ_ERR), +	RDS_IB_WC_STATUS_STR(REM_ACCESS_ERR), +	RDS_IB_WC_STATUS_STR(REM_OP_ERR), +	RDS_IB_WC_STATUS_STR(RETRY_EXC_ERR), +	RDS_IB_WC_STATUS_STR(RNR_RETRY_EXC_ERR), +	RDS_IB_WC_STATUS_STR(LOC_RDD_VIOL_ERR), +	RDS_IB_WC_STATUS_STR(REM_INV_RD_REQ_ERR), +	RDS_IB_WC_STATUS_STR(REM_ABORT_ERR), +	RDS_IB_WC_STATUS_STR(INV_EECN_ERR), +	RDS_IB_WC_STATUS_STR(INV_EEC_STATE_ERR), +	RDS_IB_WC_STATUS_STR(FATAL_ERR), +	RDS_IB_WC_STATUS_STR(RESP_TIMEOUT_ERR), +	RDS_IB_WC_STATUS_STR(GENERAL_ERR), +#undef RDS_IB_WC_STATUS_STR +}; + +char *rds_ib_wc_status_str(enum ib_wc_status status) +{ +	return rds_str_array(rds_ib_wc_status_strings, +			     ARRAY_SIZE(rds_ib_wc_status_strings), status); +} + +/* + * Convert IB-specific error message to RDS error message and call core + * completion handler. + */ +static void rds_ib_send_complete(struct rds_message *rm, +				 int wc_status, +				 void (*complete)(struct rds_message *rm, int status))  {  	int notify_status; @@ -60,69 +98,125 @@ static void rds_ib_send_rdma_complete(struct rds_message *rm,  		notify_status = RDS_RDMA_OTHER_ERROR;  		break;  	} -	rds_rdma_send_complete(rm, notify_status); +	complete(rm, notify_status); +} + +static void rds_ib_send_unmap_data(struct rds_ib_connection *ic, +				   struct rm_data_op *op, +				   int wc_status) +{ +	if (op->op_nents) +		ib_dma_unmap_sg(ic->i_cm_id->device, +				op->op_sg, op->op_nents, +				DMA_TO_DEVICE);  }  static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic, -				   struct rds_rdma_op *op) +				   struct rm_rdma_op *op, +				   int wc_status)  { -	if (op->r_mapped) { +	if (op->op_mapped) {  		ib_dma_unmap_sg(ic->i_cm_id->device, -			op->r_sg, op->r_nents, -			op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE); -		op->r_mapped = 0; +				op->op_sg, op->op_nents, +				op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE); +		op->op_mapped = 0;  	} + +	/* If the user asked for a completion notification on this +	 * message, we can implement three different semantics: +	 *  1.	Notify when we received the ACK on the RDS message +	 *	that was queued with the RDMA. This provides reliable +	 *	notification of RDMA status at the expense of a one-way +	 *	packet delay. +	 *  2.	Notify when the IB stack gives us the completion event for +	 *	the RDMA operation. +	 *  3.	Notify when the IB stack gives us the completion event for +	 *	the accompanying RDS messages. +	 * Here, we implement approach #3. To implement approach #2, +	 * we would need to take an event for the rdma WR. To implement #1, +	 * don't call rds_rdma_send_complete at all, and fall back to the notify +	 * handling in the ACK processing code. 
+	 * +	 * Note: There's no need to explicitly sync any RDMA buffers using +	 * ib_dma_sync_sg_for_cpu - the completion for the RDMA +	 * operation itself unmapped the RDMA buffers, which takes care +	 * of synching. +	 */ +	rds_ib_send_complete(container_of(op, struct rds_message, rdma), +			     wc_status, rds_rdma_send_complete); + +	if (op->op_write) +		rds_stats_add(s_send_rdma_bytes, op->op_bytes); +	else +		rds_stats_add(s_recv_rdma_bytes, op->op_bytes);  } -static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic, -			  struct rds_ib_send_work *send, -			  int wc_status) +static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic, +				     struct rm_atomic_op *op, +				     int wc_status)  { -	struct rds_message *rm = send->s_rm; - -	rdsdebug("ic %p send %p rm %p\n", ic, send, rm); +	/* unmap atomic recvbuf */ +	if (op->op_mapped) { +		ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1, +				DMA_FROM_DEVICE); +		op->op_mapped = 0; +	} -	ib_dma_unmap_sg(ic->i_cm_id->device, -		     rm->m_sg, rm->m_nents, -		     DMA_TO_DEVICE); +	rds_ib_send_complete(container_of(op, struct rds_message, atomic), +			     wc_status, rds_atomic_send_complete); -	if (rm->m_rdma_op != NULL) { -		rds_ib_send_unmap_rdma(ic, rm->m_rdma_op); +	if (op->op_type == RDS_ATOMIC_TYPE_CSWP) +		rds_ib_stats_inc(s_ib_atomic_cswp); +	else +		rds_ib_stats_inc(s_ib_atomic_fadd); +} -		/* If the user asked for a completion notification on this -		 * message, we can implement three different semantics: -		 *  1.	Notify when we received the ACK on the RDS message -		 *	that was queued with the RDMA. This provides reliable -		 *	notification of RDMA status at the expense of a one-way -		 *	packet delay. -		 *  2.	Notify when the IB stack gives us the completion event for -		 *	the RDMA operation. -		 *  3.	Notify when the IB stack gives us the completion event for -		 *	the accompanying RDS messages. -		 * Here, we implement approach #3. To implement approach #2, -		 * call rds_rdma_send_complete from the cq_handler. To implement #1, -		 * don't call rds_rdma_send_complete at all, and fall back to the notify -		 * handling in the ACK processing code. -		 * -		 * Note: There's no need to explicitly sync any RDMA buffers using -		 * ib_dma_sync_sg_for_cpu - the completion for the RDMA -		 * operation itself unmapped the RDMA buffers, which takes care -		 * of synching. -		 */ -		rds_ib_send_rdma_complete(rm, wc_status); +/* + * Unmap the resources associated with a struct send_work. + * + * Returns the rm for no good reason other than it is unobtainable + * other than by switching on wr.opcode, currently, and the caller, + * the event handler, needs it. 
+ */ +static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic, +						struct rds_ib_send_work *send, +						int wc_status) +{ +	struct rds_message *rm = NULL; -		if (rm->m_rdma_op->r_write) -			rds_stats_add(s_send_rdma_bytes, rm->m_rdma_op->r_bytes); -		else -			rds_stats_add(s_recv_rdma_bytes, rm->m_rdma_op->r_bytes); +	/* In the error case, wc.opcode sometimes contains garbage */ +	switch (send->s_wr.opcode) { +	case IB_WR_SEND: +		if (send->s_op) { +			rm = container_of(send->s_op, struct rds_message, data); +			rds_ib_send_unmap_data(ic, send->s_op, wc_status); +		} +		break; +	case IB_WR_RDMA_WRITE: +	case IB_WR_RDMA_READ: +		if (send->s_op) { +			rm = container_of(send->s_op, struct rds_message, rdma); +			rds_ib_send_unmap_rdma(ic, send->s_op, wc_status); +		} +		break; +	case IB_WR_ATOMIC_FETCH_AND_ADD: +	case IB_WR_ATOMIC_CMP_AND_SWP: +		if (send->s_op) { +			rm = container_of(send->s_op, struct rds_message, atomic); +			rds_ib_send_unmap_atomic(ic, send->s_op, wc_status); +		} +		break; +	default: +		if (printk_ratelimit()) +			printk(KERN_NOTICE +			       "RDS/IB: %s: unexpected opcode 0x%x in WR!\n", +			       __func__, send->s_wr.opcode); +		break;  	} -	/* If anyone waited for this message to get flushed out, wake -	 * them up now */ -	rds_message_unmapped(rm); +	send->s_wr.opcode = 0xdead; -	rds_message_put(rm); -	send->s_rm = NULL; +	return rm;  }  void rds_ib_send_init_ring(struct rds_ib_connection *ic) @@ -133,23 +227,18 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)  	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {  		struct ib_sge *sge; -		send->s_rm = NULL;  		send->s_op = NULL;  		send->s_wr.wr_id = i;  		send->s_wr.sg_list = send->s_sge; -		send->s_wr.num_sge = 1; -		send->s_wr.opcode = IB_WR_SEND; -		send->s_wr.send_flags = 0;  		send->s_wr.ex.imm_data = 0; -		sge = rds_ib_data_sge(ic, send->s_sge); -		sge->lkey = ic->i_mr->lkey; - -		sge = rds_ib_header_sge(ic, send->s_sge); +		sge = &send->s_sge[0];  		sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header));  		sge->length = sizeof(struct rds_header);  		sge->lkey = ic->i_mr->lkey; + +		send->s_sge[1].lkey = ic->i_mr->lkey;  	}  } @@ -159,16 +248,24 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)  	u32 i;  	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) { -		if (send->s_wr.opcode == 0xdead) -			continue; -		if (send->s_rm) -			rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR); -		if (send->s_op) -			rds_ib_send_unmap_rdma(ic, send->s_op); +		if (send->s_op && send->s_wr.opcode != 0xdead) +			rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);  	}  }  /* + * The only fast path caller always has a non-zero nr, so we don't + * bother testing nr before performing the atomic sub. + */ +static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr) +{ +	if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) && +	    waitqueue_active(&rds_ib_ring_empty_wait)) +		wake_up(&rds_ib_ring_empty_wait); +	BUG_ON(atomic_read(&ic->i_signaled_sends) < 0); +} + +/*   * The _oldest/_free ring operations here race cleanly with the alloc/unalloc   * operations performed in the send path.  
As the sender allocs and potentially   * unallocs the next free entry in the ring it doesn't alter which is @@ -178,12 +275,14 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)  {  	struct rds_connection *conn = context;  	struct rds_ib_connection *ic = conn->c_transport_data; +	struct rds_message *rm = NULL;  	struct ib_wc wc;  	struct rds_ib_send_work *send;  	u32 completed;  	u32 oldest;  	u32 i = 0;  	int ret; +	int nr_sig = 0;  	rdsdebug("cq %p conn %p\n", cq, conn);  	rds_ib_stats_inc(s_ib_tx_cq_call); @@ -192,8 +291,9 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)  		rdsdebug("ib_req_notify_cq send failed: %d\n", ret);  	while (ib_poll_cq(cq, 1, &wc) > 0) { -		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", -			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len, +		rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n", +			 (unsigned long long)wc.wr_id, wc.status, +			 rds_ib_wc_status_str(wc.status), wc.byte_len,  			 be32_to_cpu(wc.ex.imm_data));  		rds_ib_stats_inc(s_ib_tx_cq_event); @@ -210,51 +310,30 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)  		for (i = 0; i < completed; i++) {  			send = &ic->i_sends[oldest]; +			if (send->s_wr.send_flags & IB_SEND_SIGNALED) +				nr_sig++; -			/* In the error case, wc.opcode sometimes contains garbage */ -			switch (send->s_wr.opcode) { -			case IB_WR_SEND: -				if (send->s_rm) -					rds_ib_send_unmap_rm(ic, send, wc.status); -				break; -			case IB_WR_RDMA_WRITE: -			case IB_WR_RDMA_READ: -				/* Nothing to be done - the SG list will be unmapped -				 * when the SEND completes. */ -				break; -			default: -				if (printk_ratelimit()) -					printk(KERN_NOTICE -						"RDS/IB: %s: unexpected opcode 0x%x in WR!\n", -						__func__, send->s_wr.opcode); -				break; -			} +			rm = rds_ib_send_unmap_op(ic, send, wc.status); -			send->s_wr.opcode = 0xdead; -			send->s_wr.num_sge = 1;  			if (send->s_queued + HZ/2 < jiffies)  				rds_ib_stats_inc(s_ib_tx_stalled); -			/* If a RDMA operation produced an error, signal this right -			 * away. If we don't, the subsequent SEND that goes with this -			 * RDMA will be canceled with ERR_WFLUSH, and the application -			 * never learn that the RDMA failed. 
*/ -			if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) { -				struct rds_message *rm; - -				rm = rds_send_get_message(conn, send->s_op); -				if (rm) { -					if (rm->m_rdma_op) -						rds_ib_send_unmap_rdma(ic, rm->m_rdma_op); -					rds_ib_send_rdma_complete(rm, wc.status); -					rds_message_put(rm); +			if (send->s_op) { +				if (send->s_op == rm->m_final_op) { +					/* If anyone waited for this message to get flushed out, wake +					 * them up now */ +					rds_message_unmapped(rm);  				} +				rds_message_put(rm); +				send->s_op = NULL;  			}  			oldest = (oldest + 1) % ic->i_send_ring.w_nr;  		}  		rds_ib_ring_free(&ic->i_send_ring, completed); +		rds_ib_sub_signaled(ic, nr_sig); +		nr_sig = 0;  		if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||  		    test_bit(0, &conn->c_map_queued)) @@ -262,10 +341,10 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)  		/* We expect errors as the qp is drained during shutdown */  		if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) { -			rds_ib_conn_error(conn, -				"send completion on %pI4 " -				"had status %u, disconnecting and reconnecting\n", -				&conn->c_faddr, wc.status); +			rds_ib_conn_error(conn, "send completion on %pI4 had status " +					  "%u (%s), disconnecting and reconnecting\n", +					  &conn->c_faddr, wc.status, +					  rds_ib_wc_status_str(wc.status));  		}  	}  } @@ -294,7 +373,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)   * credits (see rds_ib_send_add_credits below).   *   * The RDS send code is essentially single-threaded; rds_send_xmit - * grabs c_send_lock to ensure exclusive access to the send ring. + * sets RDS_IN_XMIT to ensure exclusive access to the send ring.   * However, the ACK sending code is independent and can race with   * message SENDs.   * @@ -413,40 +492,21 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)  		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);  } -static inline void -rds_ib_xmit_populate_wr(struct rds_ib_connection *ic, -		struct rds_ib_send_work *send, unsigned int pos, -		unsigned long buffer, unsigned int length, -		int send_flags) +static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic, +					     struct rds_ib_send_work *send, +					     bool notify)  { -	struct ib_sge *sge; - -	WARN_ON(pos != send - ic->i_sends); - -	send->s_wr.send_flags = send_flags; -	send->s_wr.opcode = IB_WR_SEND; -	send->s_wr.num_sge = 2; -	send->s_wr.next = NULL; -	send->s_queued = jiffies; -	send->s_op = NULL; - -	if (length != 0) { -		sge = rds_ib_data_sge(ic, send->s_sge); -		sge->addr = buffer; -		sge->length = length; -		sge->lkey = ic->i_mr->lkey; - -		sge = rds_ib_header_sge(ic, send->s_sge); -	} else { -		/* We're sending a packet with no payload. There is only -		 * one SGE */ -		send->s_wr.num_sge = 1; -		sge = &send->s_sge[0]; +	/* +	 * We want to delay signaling completions just enough to get +	 * the batching benefits but not so much that we create dead time +	 * on the wire. 
+	 */ +	if (ic->i_unsignaled_wrs-- == 0 || notify) { +		ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; +		send->s_wr.send_flags |= IB_SEND_SIGNALED; +		return 1;  	} - -	sge->addr = ic->i_send_hdrs_dma + (pos * sizeof(struct rds_header)); -	sge->length = sizeof(struct rds_header); -	sge->lkey = ic->i_mr->lkey; +	return 0;  }  /* @@ -475,13 +535,14 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,  	u32 pos;  	u32 i;  	u32 work_alloc; -	u32 credit_alloc; +	u32 credit_alloc = 0;  	u32 posted;  	u32 adv_credits = 0;  	int send_flags = 0; -	int sent; +	int bytes_sent = 0;  	int ret;  	int flow_controlled = 0; +	int nr_sig = 0;  	BUG_ON(off % RDS_FRAG_SIZE);  	BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header)); @@ -507,14 +568,13 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,  		goto out;  	} -	credit_alloc = work_alloc;  	if (ic->i_flowctl) {  		credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT);  		adv_credits += posted;  		if (credit_alloc < work_alloc) {  			rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);  			work_alloc = credit_alloc; -			flow_controlled++; +			flow_controlled = 1;  		}  		if (work_alloc == 0) {  			set_bit(RDS_LL_SEND_FULL, &conn->c_flags); @@ -525,31 +585,25 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,  	}  	/* map the message the first time we see it */ -	if (ic->i_rm == NULL) { -		/* -		printk(KERN_NOTICE "rds_ib_xmit prep msg dport=%u flags=0x%x len=%d\n", -				be16_to_cpu(rm->m_inc.i_hdr.h_dport), -				rm->m_inc.i_hdr.h_flags, -				be32_to_cpu(rm->m_inc.i_hdr.h_len)); -		   */ -		if (rm->m_nents) { -			rm->m_count = ib_dma_map_sg(dev, -					 rm->m_sg, rm->m_nents, DMA_TO_DEVICE); -			rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->m_count); -			if (rm->m_count == 0) { +	if (!ic->i_data_op) { +		if (rm->data.op_nents) { +			rm->data.op_count = ib_dma_map_sg(dev, +							  rm->data.op_sg, +							  rm->data.op_nents, +							  DMA_TO_DEVICE); +			rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->data.op_count); +			if (rm->data.op_count == 0) {  				rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);  				rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);  				ret = -ENOMEM; /* XXX ? */  				goto out;  			}  		} else { -			rm->m_count = 0; +			rm->data.op_count = 0;  		} -		ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; -		ic->i_unsignaled_bytes = rds_ib_sysctl_max_unsig_bytes;  		rds_message_addref(rm); -		ic->i_rm = rm; +		ic->i_data_op = &rm->data;  		/* Finalize the header */  		if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags)) @@ -559,10 +613,10 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,  		/* If it has a RDMA op, tell the peer we did it. This is  		 * used by the peer to release use-once RDMA MRs. */ -		if (rm->m_rdma_op) { +		if (rm->rdma.op_active) {  			struct rds_ext_header_rdma ext_hdr; -			ext_hdr.h_rdma_rkey = cpu_to_be32(rm->m_rdma_op->r_key); +			ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey);  			rds_message_add_extension(&rm->m_inc.i_hdr,  					RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));  		} @@ -582,99 +636,77 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,  		/*  		 * Update adv_credits since we reset the ACK_REQUIRED bit.  		 
*/ -		rds_ib_send_grab_credits(ic, 0, &posted, 1, RDS_MAX_ADV_CREDIT - adv_credits); -		adv_credits += posted; -		BUG_ON(adv_credits > 255); +		if (ic->i_flowctl) { +			rds_ib_send_grab_credits(ic, 0, &posted, 1, RDS_MAX_ADV_CREDIT - adv_credits); +			adv_credits += posted; +			BUG_ON(adv_credits > 255); +		}  	} -	send = &ic->i_sends[pos]; -	first = send; -	prev = NULL; -	scat = &rm->m_sg[sg]; -	sent = 0; -	i = 0; -  	/* Sometimes you want to put a fence between an RDMA  	 * READ and the following SEND.  	 * We could either do this all the time  	 * or when requested by the user. Right now, we let  	 * the application choose.  	 */ -	if (rm->m_rdma_op && rm->m_rdma_op->r_fence) +	if (rm->rdma.op_active && rm->rdma.op_fence)  		send_flags = IB_SEND_FENCE; -	/* -	 * We could be copying the header into the unused tail of the page. -	 * That would need to be changed in the future when those pages might -	 * be mapped userspace pages or page cache pages.  So instead we always -	 * use a second sge and our long-lived ring of mapped headers.  We send -	 * the header after the data so that the data payload can be aligned on -	 * the receiver. -	 */ +	/* Each frag gets a header. Msgs may be 0 bytes */ +	send = &ic->i_sends[pos]; +	first = send; +	prev = NULL; +	scat = &ic->i_data_op->op_sg[sg]; +	i = 0; +	do { +		unsigned int len = 0; -	/* handle a 0-len message */ -	if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) { -		rds_ib_xmit_populate_wr(ic, send, pos, 0, 0, send_flags); -		goto add_header; -	} +		/* Set up the header */ +		send->s_wr.send_flags = send_flags; +		send->s_wr.opcode = IB_WR_SEND; +		send->s_wr.num_sge = 1; +		send->s_wr.next = NULL; +		send->s_queued = jiffies; +		send->s_op = NULL; -	/* if there's data reference it with a chain of work reqs */ -	for (; i < work_alloc && scat != &rm->m_sg[rm->m_count]; i++) { -		unsigned int len; +		send->s_sge[0].addr = ic->i_send_hdrs_dma +			+ (pos * sizeof(struct rds_header)); +		send->s_sge[0].length = sizeof(struct rds_header); -		send = &ic->i_sends[pos]; +		memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header)); -		len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off); -		rds_ib_xmit_populate_wr(ic, send, pos, -				ib_sg_dma_address(dev, scat) + off, len, -				send_flags); +		/* Set up the data, if present */ +		if (i < work_alloc +		    && scat != &rm->data.op_sg[rm->data.op_count]) { +			len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off); +			send->s_wr.num_sge = 2; -		/* -		 * We want to delay signaling completions just enough to get -		 * the batching benefits but not so much that we create dead time -		 * on the wire. -		 */ -		if (ic->i_unsignaled_wrs-- == 0) { -			ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; -			send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; -		} +			send->s_sge[1].addr = ib_sg_dma_address(dev, scat) + off; +			send->s_sge[1].length = len; -		ic->i_unsignaled_bytes -= len; -		if (ic->i_unsignaled_bytes <= 0) { -			ic->i_unsignaled_bytes = rds_ib_sysctl_max_unsig_bytes; -			send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; +			bytes_sent += len; +			off += len; +			if (off == ib_sg_dma_len(dev, scat)) { +				scat++; +				off = 0; +			}  		} +		rds_ib_set_wr_signal_state(ic, send, 0); +  		/*  		 * Always signal the last one if we're stopping due to flow control.  		 
*/ -		if (flow_controlled && i == (work_alloc-1)) +		if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))  			send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; +		if (send->s_wr.send_flags & IB_SEND_SIGNALED) +			nr_sig++; +  		rdsdebug("send %p wr %p num_sge %u next %p\n", send,  			 &send->s_wr, send->s_wr.num_sge, send->s_wr.next); -		sent += len; -		off += len; -		if (off == ib_sg_dma_len(dev, scat)) { -			scat++; -			off = 0; -		} - -add_header: -		/* Tack on the header after the data. The header SGE should already -		 * have been set up to point to the right header buffer. */ -		memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header)); - -		if (0) { -			struct rds_header *hdr = &ic->i_send_hdrs[pos]; - -			printk(KERN_NOTICE "send WR dport=%u flags=0x%x len=%d\n", -				be16_to_cpu(hdr->h_dport), -				hdr->h_flags, -				be32_to_cpu(hdr->h_len)); -		} -		if (adv_credits) { +		if (ic->i_flowctl && adv_credits) {  			struct rds_header *hdr = &ic->i_send_hdrs[pos];  			/* add credit and redo the header checksum */ @@ -689,20 +721,25 @@ add_header:  		prev = send;  		pos = (pos + 1) % ic->i_send_ring.w_nr; -	} +		send = &ic->i_sends[pos]; +		i++; + +	} while (i < work_alloc +		 && scat != &rm->data.op_sg[rm->data.op_count]);  	/* Account the RDS header in the number of bytes we sent, but just once.  	 * The caller has no concept of fragmentation. */  	if (hdr_off == 0) -		sent += sizeof(struct rds_header); +		bytes_sent += sizeof(struct rds_header);  	/* if we finished the message then send completion owns it */ -	if (scat == &rm->m_sg[rm->m_count]) { -		prev->s_rm = ic->i_rm; -		prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; -		ic->i_rm = NULL; +	if (scat == &rm->data.op_sg[rm->data.op_count]) { +		prev->s_op = ic->i_data_op; +		prev->s_wr.send_flags |= IB_SEND_SOLICITED; +		ic->i_data_op = NULL;  	} +	/* Put back wrs & credits we didn't use */  	if (i < work_alloc) {  		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);  		work_alloc = i; @@ -710,6 +747,9 @@ add_header:  	if (ic->i_flowctl && i < credit_alloc)  		rds_ib_send_add_credits(conn, credit_alloc - i); +	if (nr_sig) +		atomic_add(nr_sig, &ic->i_signaled_sends); +  	/* XXX need to worry about failed_wr and partial sends. */  	failed_wr = &first->s_wr;  	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); @@ -720,32 +760,127 @@ add_header:  		printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "  		       "returned %d\n", &conn->c_faddr, ret);  		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); -		if (prev->s_rm) { -			ic->i_rm = prev->s_rm; -			prev->s_rm = NULL; +		rds_ib_sub_signaled(ic, nr_sig); +		if (prev->s_op) { +			ic->i_data_op = prev->s_op; +			prev->s_op = NULL;  		}  		rds_ib_conn_error(ic->conn, "ib_post_send failed\n");  		goto out;  	} -	ret = sent; +	ret = bytes_sent;  out:  	BUG_ON(adv_credits);  	return ret;  } -int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op) +/* + * Issue atomic operation. + * A simplified version of the rdma case, we always map 1 SG, and + * only 8 bytes, for the return value from the atomic operation. 
+ */ +int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) +{ +	struct rds_ib_connection *ic = conn->c_transport_data; +	struct rds_ib_send_work *send = NULL; +	struct ib_send_wr *failed_wr; +	struct rds_ib_device *rds_ibdev; +	u32 pos; +	u32 work_alloc; +	int ret; +	int nr_sig = 0; + +	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); + +	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos); +	if (work_alloc != 1) { +		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); +		rds_ib_stats_inc(s_ib_tx_ring_full); +		ret = -ENOMEM; +		goto out; +	} + +	/* address of send request in ring */ +	send = &ic->i_sends[pos]; +	send->s_queued = jiffies; + +	if (op->op_type == RDS_ATOMIC_TYPE_CSWP) { +		send->s_wr.opcode = IB_WR_MASKED_ATOMIC_CMP_AND_SWP; +		send->s_wr.wr.atomic.compare_add = op->op_m_cswp.compare; +		send->s_wr.wr.atomic.swap = op->op_m_cswp.swap; +		send->s_wr.wr.atomic.compare_add_mask = op->op_m_cswp.compare_mask; +		send->s_wr.wr.atomic.swap_mask = op->op_m_cswp.swap_mask; +	} else { /* FADD */ +		send->s_wr.opcode = IB_WR_MASKED_ATOMIC_FETCH_AND_ADD; +		send->s_wr.wr.atomic.compare_add = op->op_m_fadd.add; +		send->s_wr.wr.atomic.swap = 0; +		send->s_wr.wr.atomic.compare_add_mask = op->op_m_fadd.nocarry_mask; +		send->s_wr.wr.atomic.swap_mask = 0; +	} +	nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify); +	send->s_wr.num_sge = 1; +	send->s_wr.next = NULL; +	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr; +	send->s_wr.wr.atomic.rkey = op->op_rkey; +	send->s_op = op; +	rds_message_addref(container_of(send->s_op, struct rds_message, atomic)); + +	/* map 8 byte retval buffer to the device */ +	ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE); +	rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret); +	if (ret != 1) { +		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); +		rds_ib_stats_inc(s_ib_tx_sg_mapping_failure); +		ret = -ENOMEM; /* XXX ? 
*/ +		goto out; +	} + +	/* Convert our struct scatterlist to struct ib_sge */ +	send->s_sge[0].addr = ib_sg_dma_address(ic->i_cm_id->device, op->op_sg); +	send->s_sge[0].length = ib_sg_dma_len(ic->i_cm_id->device, op->op_sg); +	send->s_sge[0].lkey = ic->i_mr->lkey; + +	rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr, +		 send->s_sge[0].addr, send->s_sge[0].length); + +	if (nr_sig) +		atomic_add(nr_sig, &ic->i_signaled_sends); + +	failed_wr = &send->s_wr; +	ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr); +	rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic, +		 send, &send->s_wr, ret, failed_wr); +	BUG_ON(failed_wr != &send->s_wr); +	if (ret) { +		printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 " +		       "returned %d\n", &conn->c_faddr, ret); +		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); +		rds_ib_sub_signaled(ic, nr_sig); +		goto out; +	} + +	if (unlikely(failed_wr != &send->s_wr)) { +		printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret); +		BUG_ON(failed_wr != &send->s_wr); +	} + +out: +	return ret; +} + +int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)  {  	struct rds_ib_connection *ic = conn->c_transport_data;  	struct rds_ib_send_work *send = NULL;  	struct rds_ib_send_work *first;  	struct rds_ib_send_work *prev;  	struct ib_send_wr *failed_wr; -	struct rds_ib_device *rds_ibdev;  	struct scatterlist *scat;  	unsigned long len; -	u64 remote_addr = op->r_remote_addr; +	u64 remote_addr = op->op_remote_addr; +	u32 max_sge = ic->rds_ibdev->max_sge;  	u32 pos;  	u32 work_alloc;  	u32 i; @@ -753,29 +888,28 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)  	int sent;  	int ret;  	int num_sge; +	int nr_sig = 0; -	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); - -	/* map the message the first time we see it */ -	if (!op->r_mapped) { -		op->r_count = ib_dma_map_sg(ic->i_cm_id->device, -					op->r_sg, op->r_nents, (op->r_write) ? -					DMA_TO_DEVICE : DMA_FROM_DEVICE); -		rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->r_count); -		if (op->r_count == 0) { +	/* map the op the first time we see it */ +	if (!op->op_mapped) { +		op->op_count = ib_dma_map_sg(ic->i_cm_id->device, +					     op->op_sg, op->op_nents, (op->op_write) ? +					     DMA_TO_DEVICE : DMA_FROM_DEVICE); +		rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count); +		if (op->op_count == 0) {  			rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);  			ret = -ENOMEM; /* XXX ? */  			goto out;  		} -		op->r_mapped = 1; +		op->op_mapped = 1;  	}  	/*  	 * Instead of knowing how to return a partial rdma read/write we insist that there  	 * be enough work requests to send the entire message.  	 */ -	i = ceil(op->r_count, rds_ibdev->max_sge); +	i = ceil(op->op_count, max_sge);  	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);  	if (work_alloc != i) { @@ -788,30 +922,24 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)  	send = &ic->i_sends[pos];  	first = send;  	prev = NULL; -	scat = &op->r_sg[0]; +	scat = &op->op_sg[0];  	sent = 0; -	num_sge = op->r_count; +	num_sge = op->op_count; -	for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) { +	for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) {  		send->s_wr.send_flags = 0;  		send->s_queued = jiffies; -		/* -		 * We want to delay signaling completions just enough to get -		 * the batching benefits but not so much that we create dead time on the wire. 
-		 */ -		if (ic->i_unsignaled_wrs-- == 0) { -			ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; -			send->s_wr.send_flags = IB_SEND_SIGNALED; -		} +		send->s_op = NULL; + +		nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify); -		send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ; +		send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;  		send->s_wr.wr.rdma.remote_addr = remote_addr; -		send->s_wr.wr.rdma.rkey = op->r_key; -		send->s_op = op; +		send->s_wr.wr.rdma.rkey = op->op_rkey; -		if (num_sge > rds_ibdev->max_sge) { -			send->s_wr.num_sge = rds_ibdev->max_sge; -			num_sge -= rds_ibdev->max_sge; +		if (num_sge > max_sge) { +			send->s_wr.num_sge = max_sge; +			num_sge -= max_sge;  		} else {  			send->s_wr.num_sge = num_sge;  		} @@ -821,7 +949,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)  		if (prev)  			prev->s_wr.next = &send->s_wr; -		for (j = 0; j < send->s_wr.num_sge && scat != &op->r_sg[op->r_count]; j++) { +		for (j = 0; j < send->s_wr.num_sge && scat != &op->op_sg[op->op_count]; j++) {  			len = ib_sg_dma_len(ic->i_cm_id->device, scat);  			send->s_sge[j].addr =  				 ib_sg_dma_address(ic->i_cm_id->device, scat); @@ -843,15 +971,20 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)  			send = ic->i_sends;  	} -	/* if we finished the message then send completion owns it */ -	if (scat == &op->r_sg[op->r_count]) -		prev->s_wr.send_flags = IB_SEND_SIGNALED; +	/* give a reference to the last op */ +	if (scat == &op->op_sg[op->op_count]) { +		prev->s_op = op; +		rds_message_addref(container_of(op, struct rds_message, rdma)); +	}  	if (i < work_alloc) {  		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);  		work_alloc = i;  	} +	if (nr_sig) +		atomic_add(nr_sig, &ic->i_signaled_sends); +  	failed_wr = &first->s_wr;  	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);  	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic, @@ -861,6 +994,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)  		printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "  		       "returned %d\n", &conn->c_faddr, ret);  		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); +		rds_ib_sub_signaled(ic, nr_sig);  		goto out;  	}  |
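A few standalone notes on patterns this patch leans on. First, the work-completion status strings added at the top of ib_send.c: the table is built with designated array initializers keyed by the enum value, and a local macro wrapping __stringify() keeps each entry's name and index in sync. Below is a minimal userspace sketch of the same pattern; the enum, the names, and the bounds-checked lookup are illustrative stand-ins for ib_wc_status and rds_str_array(), not the kernel definitions.

```c
#include <stdio.h>

/* Two-step stringify so macro arguments are expanded before being quoted. */
#define __stringify_1(x) #x
#define __stringify(x)   __stringify_1(x)

/* Illustrative stand-in for enum ib_wc_status. */
enum wc_status {
	WC_SUCCESS,
	WC_LOC_LEN_ERR,
	WC_WR_FLUSH_ERR,
	WC_GENERAL_ERR,
};

/* Designated initializers index the table by enum value, so each entry
 * stays attached to the right status even if the enum grows or is reordered. */
static const char *wc_status_strings[] = {
#define WC_STATUS_STR(foo) [WC_##foo] = __stringify(WC_##foo)
	WC_STATUS_STR(SUCCESS),
	WC_STATUS_STR(LOC_LEN_ERR),
	WC_STATUS_STR(WR_FLUSH_ERR),
	WC_STATUS_STR(GENERAL_ERR),
#undef WC_STATUS_STR
};

/* Bounds-checked lookup, playing the role rds_str_array() plays in the patch. */
static const char *wc_status_str(unsigned int status)
{
	if (status < sizeof(wc_status_strings) / sizeof(wc_status_strings[0]) &&
	    wc_status_strings[status])
		return wc_status_strings[status];
	return "unknown";
}

int main(void)
{
	printf("%d -> %s\n", WC_WR_FLUSH_ERR, wc_status_str(WC_WR_FLUSH_ERR));
	printf("%d -> %s\n", 42, wc_status_str(42));
	return 0;
}
```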
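Second, completion signaling. The new rds_ib_set_wr_signal_state() requests a signaled completion only on every Nth work request, or when the caller explicitly wants notification, and the callers count how many signaled WRs they posted so rds_ib_sub_signaled() can wake waiters once they all complete. A rough sketch of that countdown follows, with a fixed threshold standing in for rds_ib_sysctl_max_unsig_wrs and invented struct and function names.

```c
#include <stdbool.h>
#include <stdio.h>

#define MAX_UNSIGNALED_WRS 16	/* stand-in for rds_ib_sysctl_max_unsig_wrs */

struct conn_state {
	unsigned int unsignaled_wrs;	/* countdown until the next signaled WR */
};

/* Decide whether this work request should ask for a completion event.
 * Signaling every WR wastes completion-queue work; never signaling means
 * the send ring could only be reclaimed when an error flushes the queue. */
static bool set_wr_signal_state(struct conn_state *cs, bool notify)
{
	if (cs->unsignaled_wrs-- == 0 || notify) {
		cs->unsignaled_wrs = MAX_UNSIGNALED_WRS;
		return true;	/* caller sets IB_SEND_SIGNALED and counts it */
	}
	return false;
}

int main(void)
{
	struct conn_state cs = { .unsignaled_wrs = MAX_UNSIGNALED_WRS };
	int nr_sig = 0;		/* analogous to the nr_sig counter in the patch */

	for (int i = 0; i < 100; i++)
		if (set_wr_signal_state(&cs, false))
			nr_sig++;

	printf("signaled %d of 100 work requests\n", nr_sig);
	return 0;
}
```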
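Finally, the unmap path: rds_ib_send_unmap_op() hands back the owning struct rds_message by applying container_of() to the data, rdma, or atomic op that the send work request remembered, since the op is embedded directly in the message. The illustration below uses generic struct names rather than the RDS definitions, and a simplified container_of.

```c
#include <stddef.h>
#include <stdio.h>

/* Simplified container_of, in the spirit of the kernel macro. */
#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* Illustrative stand-ins for struct rm_data_op / struct rds_message. */
struct data_op {
	int nents;
};

struct message {
	int seq;
	struct data_op data;	/* op embedded in the message, as in struct rds_message */
};

int main(void)
{
	struct message msg = { .seq = 7, .data = { .nents = 3 } };
	struct data_op *op = &msg.data;

	/* Given only the op pointer (what the send work remembers), walk back
	 * to the enclosing message, the same trick the completion path uses. */
	struct message *owner = container_of(op, struct message, data);

	printf("seq %d, nents %d\n", owner->seq, owner->data.nents);
	return 0;
}
```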