Update Linux to v5.4.2

Change-Id: Idf6911045d9d382da2cfe01b1edff026404ac8fd
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 8bf19e1..8ed0377 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -1,8 +1,7 @@
 # SPDX-License-Identifier: GPL-2.0
 obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
 
-rpcrdma-y := transport.o rpc_rdma.o verbs.o \
-	fmr_ops.o frwr_ops.o \
+rpcrdma-y := transport.o rpc_rdma.o verbs.o frwr_ops.o \
 	svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
 	svc_rdma_sendto.o svc_rdma_recvfrom.o svc_rdma_rw.o \
 	module.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 90adeff..b458bf5 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -5,7 +5,6 @@
  * Support for backward direction RPCs on RPC/RDMA.
  */
 
-#include <linux/module.h>
 #include <linux/sunrpc/xprt.h>
 #include <linux/sunrpc/svc.h>
 #include <linux/sunrpc/svc_xprt.h>
@@ -20,59 +19,6 @@
 
 #undef RPCRDMA_BACKCHANNEL_DEBUG
 
-static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
-				 struct rpc_rqst *rqst)
-{
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-
-	spin_lock(&buf->rb_reqslock);
-	list_del(&req->rl_all);
-	spin_unlock(&buf->rb_reqslock);
-
-	rpcrdma_destroy_req(req);
-}
-
-static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
-				 unsigned int count)
-{
-	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-	struct rpc_rqst *rqst;
-	unsigned int i;
-
-	for (i = 0; i < (count << 1); i++) {
-		struct rpcrdma_regbuf *rb;
-		struct rpcrdma_req *req;
-		size_t size;
-
-		req = rpcrdma_create_req(r_xprt);
-		if (IS_ERR(req))
-			return PTR_ERR(req);
-		rqst = &req->rl_slot;
-
-		rqst->rq_xprt = xprt;
-		INIT_LIST_HEAD(&rqst->rq_list);
-		INIT_LIST_HEAD(&rqst->rq_bc_list);
-		__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
-		spin_lock_bh(&xprt->bc_pa_lock);
-		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
-		spin_unlock_bh(&xprt->bc_pa_lock);
-
-		size = r_xprt->rx_data.inline_rsize;
-		rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
-		if (IS_ERR(rb))
-			goto out_fail;
-		req->rl_sendbuf = rb;
-		xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base,
-			     min_t(size_t, size, PAGE_SIZE));
-	}
-	return 0;
-
-out_fail:
-	rpcrdma_bc_free_rqst(r_xprt, rqst);
-	return -ENOMEM;
-}
-
 /**
  * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
  * @xprt: transport associated with these backchannel resources
@@ -83,55 +29,10 @@
 int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
-	int rc;
 
-	/* The backchannel reply path returns each rpc_rqst to the
-	 * bc_pa_list _after_ the reply is sent. If the server is
-	 * faster than the client, it can send another backward
-	 * direction request before the rpc_rqst is returned to the
-	 * list. The client rejects the request in this case.
-	 *
-	 * Twice as many rpc_rqsts are prepared to ensure there is
-	 * always an rpc_rqst available as soon as a reply is sent.
-	 */
-	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
-		goto out_err;
-
-	rc = rpcrdma_bc_setup_reqs(r_xprt, reqs);
-	if (rc)
-		goto out_free;
-
-	r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
-	request_module("svcrdma");
+	r_xprt->rx_buf.rb_bc_srv_max_requests = RPCRDMA_BACKWARD_WRS >> 1;
 	trace_xprtrdma_cb_setup(r_xprt, reqs);
 	return 0;
-
-out_free:
-	xprt_rdma_bc_destroy(xprt, reqs);
-
-out_err:
-	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
-	return -ENOMEM;
-}
-
-/**
- * xprt_rdma_bc_up - Create transport endpoint for backchannel service
- * @serv: server endpoint
- * @net: network namespace
- *
- * The "xprt" is an implied argument: it supplies the name of the
- * backchannel transport class.
- *
- * Returns zero on success, negative errno on failure
- */
-int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
-{
-	int ret;
-
-	ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
-	if (ret < 0)
-		return ret;
-	return 0;
 }
 
 /**
@@ -143,14 +44,19 @@
 size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 	size_t maxmsg;
 
-	maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
+	maxmsg = min_t(unsigned int, ep->rep_inline_send, ep->rep_inline_recv);
 	maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
 	return maxmsg - RPCRDMA_HDRLEN_MIN;
 }
 
+unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
+{
+	return RPCRDMA_BACKWARD_WRS >> 1;
+}
+
 static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
@@ -159,7 +65,7 @@
 
 	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
 	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
-			req->rl_rdmabuf->rg_base);
+			rdmab_data(req->rl_rdmabuf), rqst);
 
 	p = xdr_reserve_space(&req->rl_stream, 28);
 	if (unlikely(!p))
@@ -194,18 +100,21 @@
  */
 int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
 {
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	int rc;
 
-	if (!xprt_connected(rqst->rq_xprt))
-		goto drop_connection;
+	if (!xprt_connected(xprt))
+		return -ENOTCONN;
+
+	if (!xprt_request_get_cong(xprt, rqst))
+		return -EBADSLT;
 
 	rc = rpcrdma_bc_marshal_reply(rqst);
 	if (rc < 0)
 		goto failed_marshal;
 
-	rpcrdma_post_recvs(r_xprt, true);
 	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 		goto drop_connection;
 	return 0;
@@ -214,7 +123,7 @@
 	if (rc != -ENOTCONN)
 		return rc;
 drop_connection:
-	xprt_disconnect_done(rqst->rq_xprt);
+	xprt_rdma_close(xprt);
 	return -ENOTCONN;
 }
 
@@ -225,19 +134,18 @@
  */
 void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
 {
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpc_rqst *rqst, *tmp;
 
-	spin_lock_bh(&xprt->bc_pa_lock);
+	spin_lock(&xprt->bc_pa_lock);
 	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
 		list_del(&rqst->rq_bc_pa_list);
-		spin_unlock_bh(&xprt->bc_pa_lock);
+		spin_unlock(&xprt->bc_pa_lock);
 
-		rpcrdma_bc_free_rqst(r_xprt, rqst);
+		rpcrdma_req_destroy(rpcr_to_rdmar(rqst));
 
-		spin_lock_bh(&xprt->bc_pa_lock);
+		spin_lock(&xprt->bc_pa_lock);
 	}
-	spin_unlock_bh(&xprt->bc_pa_lock);
+	spin_unlock(&xprt->bc_pa_lock);
 }
 
 /**
@@ -249,15 +157,50 @@
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 
-	dprintk("RPC:       %s: freeing rqst %p (req %p)\n",
-		__func__, rqst, req);
-
 	rpcrdma_recv_buffer_put(req->rl_reply);
 	req->rl_reply = NULL;
 
-	spin_lock_bh(&xprt->bc_pa_lock);
+	spin_lock(&xprt->bc_pa_lock);
 	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
-	spin_unlock_bh(&xprt->bc_pa_lock);
+	spin_unlock(&xprt->bc_pa_lock);
+	xprt_put(xprt);
+}
+
+static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
+{
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+	struct rpcrdma_req *req;
+	struct rpc_rqst *rqst;
+	size_t size;
+
+	spin_lock(&xprt->bc_pa_lock);
+	rqst = list_first_entry_or_null(&xprt->bc_pa_list, struct rpc_rqst,
+					rq_bc_pa_list);
+	if (!rqst)
+		goto create_req;
+	list_del(&rqst->rq_bc_pa_list);
+	spin_unlock(&xprt->bc_pa_lock);
+	return rqst;
+
+create_req:
+	spin_unlock(&xprt->bc_pa_lock);
+
+	/* Set a limit to prevent a remote from overrunning our resources.
+	 */
+	if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS)
+		return NULL;
+
+	size = min_t(size_t, r_xprt->rx_ep.rep_inline_recv, PAGE_SIZE);
+	req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
+	if (!req)
+		return NULL;
+
+	xprt->bc_alloc_count++;
+	rqst = &req->rl_slot;
+	rqst->rq_xprt = xprt;
+	__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+	xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
+	return rqst;
 }
 
 /**
@@ -291,20 +234,11 @@
 	pr_info("RPC:       %s: %*ph\n", __func__, size, p);
 #endif
 
-	/* Grab a free bc rqst */
-	spin_lock(&xprt->bc_pa_lock);
-	if (list_empty(&xprt->bc_pa_list)) {
-		spin_unlock(&xprt->bc_pa_lock);
+	rqst = rpcrdma_bc_rqst_get(r_xprt);
+	if (!rqst)
 		goto out_overflow;
-	}
-	rqst = list_first_entry(&xprt->bc_pa_list,
-				struct rpc_rqst, rq_bc_pa_list);
-	list_del(&rqst->rq_bc_pa_list);
-	spin_unlock(&xprt->bc_pa_lock);
 
-	/* Prepare rqst */
 	rqst->rq_reply_bytes_recvd = 0;
-	rqst->rq_bytes_sent = 0;
 	rqst->rq_xid = *p;
 
 	rqst->rq_private_buf.len = size;
@@ -326,6 +260,7 @@
 
 	/* Queue rqst for ULP's callback service */
 	bc_serv = xprt->bc_serv;
+	xprt_get(xprt);
 	spin_lock(&bc_serv->sv_cb_lock);
 	list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
 	spin_unlock(&bc_serv->sv_cb_lock);
@@ -337,7 +272,7 @@
 
 out_overflow:
 	pr_warn("RPC/RDMA backchannel overflow\n");
-	xprt_disconnect_done(xprt);
+	xprt_force_disconnect(xprt);
 	/* This receive buffer gets reposted automatically
 	 * when the connection is re-established.
 	 */
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
deleted file mode 100644
index 0f7c465..0000000
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ /dev/null
@@ -1,348 +0,0 @@
-// SPDX-License-Identifier: GPL-2.0
-/*
- * Copyright (c) 2015, 2017 Oracle.  All rights reserved.
- * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
- */
-
-/* Lightweight memory registration using Fast Memory Regions (FMR).
- * Referred to sometimes as MTHCAFMR mode.
- *
- * FMR uses synchronous memory registration and deregistration.
- * FMR registration is known to be fast, but FMR deregistration
- * can take tens of usecs to complete.
- */
-
-/* Normal operation
- *
- * A Memory Region is prepared for RDMA READ or WRITE using the
- * ib_map_phys_fmr verb (fmr_op_map). When the RDMA operation is
- * finished, the Memory Region is unmapped using the ib_unmap_fmr
- * verb (fmr_op_unmap).
- */
-
-#include <linux/sunrpc/svc_rdma.h>
-
-#include "xprt_rdma.h"
-#include <trace/events/rpcrdma.h>
-
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-# define RPCDBG_FACILITY	RPCDBG_TRANS
-#endif
-
-/* Maximum scatter/gather per FMR */
-#define RPCRDMA_MAX_FMR_SGES	(64)
-
-/* Access mode of externally registered pages */
-enum {
-	RPCRDMA_FMR_ACCESS_FLAGS	= IB_ACCESS_REMOTE_WRITE |
-					  IB_ACCESS_REMOTE_READ,
-};
-
-bool
-fmr_is_supported(struct rpcrdma_ia *ia)
-{
-	if (!ia->ri_device->alloc_fmr) {
-		pr_info("rpcrdma: 'fmr' mode is not supported by device %s\n",
-			ia->ri_device->name);
-		return false;
-	}
-	return true;
-}
-
-static int
-fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
-{
-	static struct ib_fmr_attr fmr_attr = {
-		.max_pages	= RPCRDMA_MAX_FMR_SGES,
-		.max_maps	= 1,
-		.page_shift	= PAGE_SHIFT
-	};
-
-	mr->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES,
-				       sizeof(u64), GFP_KERNEL);
-	if (!mr->fmr.fm_physaddrs)
-		goto out_free;
-
-	mr->mr_sg = kcalloc(RPCRDMA_MAX_FMR_SGES,
-			    sizeof(*mr->mr_sg), GFP_KERNEL);
-	if (!mr->mr_sg)
-		goto out_free;
-
-	sg_init_table(mr->mr_sg, RPCRDMA_MAX_FMR_SGES);
-
-	mr->fmr.fm_mr = ib_alloc_fmr(ia->ri_pd, RPCRDMA_FMR_ACCESS_FLAGS,
-				     &fmr_attr);
-	if (IS_ERR(mr->fmr.fm_mr))
-		goto out_fmr_err;
-
-	INIT_LIST_HEAD(&mr->mr_list);
-	return 0;
-
-out_fmr_err:
-	dprintk("RPC:       %s: ib_alloc_fmr returned %ld\n", __func__,
-		PTR_ERR(mr->fmr.fm_mr));
-
-out_free:
-	kfree(mr->mr_sg);
-	kfree(mr->fmr.fm_physaddrs);
-	return -ENOMEM;
-}
-
-static int
-__fmr_unmap(struct rpcrdma_mr *mr)
-{
-	LIST_HEAD(l);
-	int rc;
-
-	list_add(&mr->fmr.fm_mr->list, &l);
-	rc = ib_unmap_fmr(&l);
-	list_del(&mr->fmr.fm_mr->list);
-	return rc;
-}
-
-static void
-fmr_op_release_mr(struct rpcrdma_mr *mr)
-{
-	LIST_HEAD(unmap_list);
-	int rc;
-
-	kfree(mr->fmr.fm_physaddrs);
-	kfree(mr->mr_sg);
-
-	/* In case this one was left mapped, try to unmap it
-	 * to prevent dealloc_fmr from failing with EBUSY
-	 */
-	rc = __fmr_unmap(mr);
-	if (rc)
-		pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n",
-		       mr, rc);
-
-	rc = ib_dealloc_fmr(mr->fmr.fm_mr);
-	if (rc)
-		pr_err("rpcrdma: final ib_dealloc_fmr for %p returned %i\n",
-		       mr, rc);
-
-	kfree(mr);
-}
-
-/* Reset of a single FMR.
- */
-static void
-fmr_op_recover_mr(struct rpcrdma_mr *mr)
-{
-	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
-	int rc;
-
-	/* ORDER: invalidate first */
-	rc = __fmr_unmap(mr);
-	if (rc)
-		goto out_release;
-
-	/* ORDER: then DMA unmap */
-	rpcrdma_mr_unmap_and_put(mr);
-
-	r_xprt->rx_stats.mrs_recovered++;
-	return;
-
-out_release:
-	pr_err("rpcrdma: FMR reset failed (%d), %p released\n", rc, mr);
-	r_xprt->rx_stats.mrs_orphaned++;
-
-	trace_xprtrdma_dma_unmap(mr);
-	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
-			mr->mr_sg, mr->mr_nents, mr->mr_dir);
-
-	spin_lock(&r_xprt->rx_buf.rb_mrlock);
-	list_del(&mr->mr_all);
-	spin_unlock(&r_xprt->rx_buf.rb_mrlock);
-
-	fmr_op_release_mr(mr);
-}
-
-/* On success, sets:
- *	ep->rep_attr.cap.max_send_wr
- *	ep->rep_attr.cap.max_recv_wr
- *	cdata->max_requests
- *	ia->ri_max_segs
- */
-static int
-fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
-	    struct rpcrdma_create_data_internal *cdata)
-{
-	int max_qp_wr;
-
-	max_qp_wr = ia->ri_device->attrs.max_qp_wr;
-	max_qp_wr -= RPCRDMA_BACKWARD_WRS;
-	max_qp_wr -= 1;
-	if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
-		return -ENOMEM;
-	if (cdata->max_requests > max_qp_wr)
-		cdata->max_requests = max_qp_wr;
-	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
-	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
-	ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
-	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
-	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
-	ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
-
-	ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
-				RPCRDMA_MAX_FMR_SGES);
-	return 0;
-}
-
-/* FMR mode conveys up to 64 pages of payload per chunk segment.
- */
-static size_t
-fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
-{
-	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-		     RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES);
-}
-
-/* Use the ib_map_phys_fmr() verb to register a memory region
- * for remote access via RDMA READ or RDMA WRITE.
- */
-static struct rpcrdma_mr_seg *
-fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
-	   int nsegs, bool writing, struct rpcrdma_mr **out)
-{
-	struct rpcrdma_mr_seg *seg1 = seg;
-	int len, pageoff, i, rc;
-	struct rpcrdma_mr *mr;
-	u64 *dma_pages;
-
-	mr = rpcrdma_mr_get(r_xprt);
-	if (!mr)
-		return ERR_PTR(-EAGAIN);
-
-	pageoff = offset_in_page(seg1->mr_offset);
-	seg1->mr_offset -= pageoff;	/* start of page */
-	seg1->mr_len += pageoff;
-	len = -pageoff;
-	if (nsegs > RPCRDMA_MAX_FMR_SGES)
-		nsegs = RPCRDMA_MAX_FMR_SGES;
-	for (i = 0; i < nsegs;) {
-		if (seg->mr_page)
-			sg_set_page(&mr->mr_sg[i],
-				    seg->mr_page,
-				    seg->mr_len,
-				    offset_in_page(seg->mr_offset));
-		else
-			sg_set_buf(&mr->mr_sg[i], seg->mr_offset,
-				   seg->mr_len);
-		len += seg->mr_len;
-		++seg;
-		++i;
-		/* Check for holes */
-		if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
-		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
-			break;
-	}
-	mr->mr_dir = rpcrdma_data_dir(writing);
-
-	mr->mr_nents = ib_dma_map_sg(r_xprt->rx_ia.ri_device,
-				     mr->mr_sg, i, mr->mr_dir);
-	if (!mr->mr_nents)
-		goto out_dmamap_err;
-	trace_xprtrdma_dma_map(mr);
-
-	for (i = 0, dma_pages = mr->fmr.fm_physaddrs; i < mr->mr_nents; i++)
-		dma_pages[i] = sg_dma_address(&mr->mr_sg[i]);
-	rc = ib_map_phys_fmr(mr->fmr.fm_mr, dma_pages, mr->mr_nents,
-			     dma_pages[0]);
-	if (rc)
-		goto out_maperr;
-
-	mr->mr_handle = mr->fmr.fm_mr->rkey;
-	mr->mr_length = len;
-	mr->mr_offset = dma_pages[0] + pageoff;
-
-	*out = mr;
-	return seg;
-
-out_dmamap_err:
-	pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
-	       mr->mr_sg, i);
-	rpcrdma_mr_put(mr);
-	return ERR_PTR(-EIO);
-
-out_maperr:
-	pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
-	       len, (unsigned long long)dma_pages[0],
-	       pageoff, mr->mr_nents, rc);
-	rpcrdma_mr_unmap_and_put(mr);
-	return ERR_PTR(-EIO);
-}
-
-/* Post Send WR containing the RPC Call message.
- */
-static int
-fmr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
-{
-	return ib_post_send(ia->ri_id->qp, &req->rl_sendctx->sc_wr, NULL);
-}
-
-/* Invalidate all memory regions that were registered for "req".
- *
- * Sleeps until it is safe for the host CPU to access the
- * previously mapped memory regions.
- *
- * Caller ensures that @mrs is not empty before the call. This
- * function empties the list.
- */
-static void
-fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
-{
-	struct rpcrdma_mr *mr;
-	LIST_HEAD(unmap_list);
-	int rc;
-
-	/* ORDER: Invalidate all of the req's MRs first
-	 *
-	 * ib_unmap_fmr() is slow, so use a single call instead
-	 * of one call per mapped FMR.
-	 */
-	list_for_each_entry(mr, mrs, mr_list) {
-		dprintk("RPC:       %s: unmapping fmr %p\n",
-			__func__, &mr->fmr);
-		trace_xprtrdma_localinv(mr);
-		list_add_tail(&mr->fmr.fm_mr->list, &unmap_list);
-	}
-	r_xprt->rx_stats.local_inv_needed++;
-	rc = ib_unmap_fmr(&unmap_list);
-	if (rc)
-		goto out_reset;
-
-	/* ORDER: Now DMA unmap all of the req's MRs, and return
-	 * them to the free MW list.
-	 */
-	while (!list_empty(mrs)) {
-		mr = rpcrdma_mr_pop(mrs);
-		list_del(&mr->fmr.fm_mr->list);
-		rpcrdma_mr_unmap_and_put(mr);
-	}
-
-	return;
-
-out_reset:
-	pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);
-
-	while (!list_empty(mrs)) {
-		mr = rpcrdma_mr_pop(mrs);
-		list_del(&mr->fmr.fm_mr->list);
-		fmr_op_recover_mr(mr);
-	}
-}
-
-const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
-	.ro_map				= fmr_op_map,
-	.ro_send			= fmr_op_send,
-	.ro_unmap_sync			= fmr_op_unmap_sync,
-	.ro_recover_mr			= fmr_op_recover_mr,
-	.ro_open			= fmr_op_open,
-	.ro_maxpages			= fmr_op_maxpages,
-	.ro_init_mr			= fmr_op_init_mr,
-	.ro_release_mr			= fmr_op_release_mr,
-	.ro_displayname			= "fmr",
-	.ro_send_w_inv_ok		= 0,
-};
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 1bb00dd..30065a2 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -7,67 +7,37 @@
 /* Lightweight memory registration using Fast Registration Work
  * Requests (FRWR).
  *
- * FRWR features ordered asynchronous registration and deregistration
- * of arbitrarily sized memory regions. This is the fastest and safest
+ * FRWR features ordered asynchronous registration and invalidation
+ * of arbitrarily-sized memory regions. This is the fastest and safest
  * but most complex memory registration mode.
  */
 
 /* Normal operation
  *
- * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
- * Work Request (frwr_op_map). When the RDMA operation is finished, this
+ * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
+ * Work Request (frwr_map). When the RDMA operation is finished, this
  * Memory Region is invalidated using a LOCAL_INV Work Request
- * (frwr_op_unmap_sync).
+ * (frwr_unmap_async and frwr_unmap_sync).
  *
- * Typically these Work Requests are not signaled, and neither are RDMA
- * SEND Work Requests (with the exception of signaling occasionally to
- * prevent provider work queue overflows). This greatly reduces HCA
+ * Typically FAST_REG Work Requests are not signaled, and neither are
+ * RDMA Send Work Requests (with the exception of signaling occasionally
+ * to prevent provider work queue overflows). This greatly reduces HCA
  * interrupt workload.
- *
- * As an optimization, frwr_op_unmap marks MRs INVALID before the
- * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
- * rb_mrs immediately so that no work (like managing a linked list
- * under a spinlock) is needed in the completion upcall.
- *
- * But this means that frwr_op_map() can occasionally encounter an MR
- * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
- * ordering prevents a subsequent FAST_REG WR from executing against
- * that MR while it is still being invalidated.
  */
 
 /* Transport recovery
  *
- * ->op_map and the transport connect worker cannot run at the same
- * time, but ->op_unmap can fire while the transport connect worker
- * is running. Thus MR recovery is handled in ->op_map, to guarantee
- * that recovered MRs are owned by a sending RPC, and not one where
- * ->op_unmap could fire at the same time transport reconnect is
- * being done.
+ * frwr_map and frwr_unmap_* cannot run at the same time the transport
+ * connect worker is running. The connect worker holds the transport
+ * send lock, just as ->send_request does. This prevents frwr_map and
+ * the connect worker from running concurrently. When a connection is
+ * closed, the Receive completion queue is drained before allowing
+ * the connect worker to get control. This prevents frwr_unmap and the
+ * connect worker from running concurrently.
  *
- * When the underlying transport disconnects, MRs are left in one of
- * four states:
- *
- * INVALID:	The MR was not in use before the QP entered ERROR state.
- *
- * VALID:	The MR was registered before the QP entered ERROR state.
- *
- * FLUSHED_FR:	The MR was being registered when the QP entered ERROR
- *		state, and the pending WR was flushed.
- *
- * FLUSHED_LI:	The MR was being invalidated when the QP entered ERROR
- *		state, and the pending WR was flushed.
- *
- * When frwr_op_map encounters FLUSHED and VALID MRs, they are recovered
- * with ib_dereg_mr and then are re-initialized. Because MR recovery
- * allocates fresh resources, it is deferred to a workqueue, and the
- * recovered MRs are placed back on the rb_mrs list when recovery is
- * complete. frwr_op_map allocates another MR for the current RPC while
- * the broken MR is reset.
- *
- * To ensure that frwr_op_map doesn't encounter an MR that is marked
- * INVALID but that is about to be flushed due to a previous transport
- * disconnect, the transport connect worker attempts to drain all
- * pending send queue WRs before the transport is reconnected.
+ * When the underlying transport disconnects, MRs that are in flight
+ * are flushed and are likely unusable. Thus all flushed MRs are
+ * destroyed. New MRs are created on demand.
  */
 
 #include <linux/sunrpc/rpc_rdma.h>
@@ -80,10 +50,15 @@
 # define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
-bool
-frwr_is_supported(struct rpcrdma_ia *ia)
+/**
+ * frwr_is_supported - Check if device supports FRWR
+ * @device: RDMA device to check
+ *
+ * Returns true if device supports FRWR, otherwise false
+ */
+bool frwr_is_supported(struct ib_device *device)
 {
-	struct ib_device_attr *attrs = &ia->ri_device->attrs;
+	struct ib_device_attr *attrs = &device->attrs;
 
 	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
 		goto out_not_supported;
@@ -93,142 +68,172 @@
 
 out_not_supported:
 	pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
-		ia->ri_device->name);
+		device->name);
 	return false;
 }
 
-static int
-frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
-{
-	unsigned int depth = ia->ri_max_frwr_depth;
-	struct rpcrdma_frwr *frwr = &mr->frwr;
-	int rc;
-
-	frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
-	if (IS_ERR(frwr->fr_mr))
-		goto out_mr_err;
-
-	mr->mr_sg = kcalloc(depth, sizeof(*mr->mr_sg), GFP_KERNEL);
-	if (!mr->mr_sg)
-		goto out_list_err;
-
-	INIT_LIST_HEAD(&mr->mr_list);
-	sg_init_table(mr->mr_sg, depth);
-	init_completion(&frwr->fr_linv_done);
-	return 0;
-
-out_mr_err:
-	rc = PTR_ERR(frwr->fr_mr);
-	dprintk("RPC:       %s: ib_alloc_mr status %i\n",
-		__func__, rc);
-	return rc;
-
-out_list_err:
-	rc = -ENOMEM;
-	dprintk("RPC:       %s: sg allocation failure\n",
-		__func__);
-	ib_dereg_mr(frwr->fr_mr);
-	return rc;
-}
-
-static void
-frwr_op_release_mr(struct rpcrdma_mr *mr)
+/**
+ * frwr_release_mr - Destroy one MR
+ * @mr: MR allocated by frwr_init_mr
+ *
+ */
+void frwr_release_mr(struct rpcrdma_mr *mr)
 {
 	int rc;
 
 	rc = ib_dereg_mr(mr->frwr.fr_mr);
 	if (rc)
-		pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n",
-		       mr, rc);
+		trace_xprtrdma_frwr_dereg(mr, rc);
 	kfree(mr->mr_sg);
 	kfree(mr);
 }
 
-static int
-__frwr_mr_reset(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
+static void frwr_mr_recycle(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
 {
-	struct rpcrdma_frwr *frwr = &mr->frwr;
-	int rc;
+	trace_xprtrdma_mr_recycle(mr);
 
-	rc = ib_dereg_mr(frwr->fr_mr);
-	if (rc) {
-		pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
-			rc, mr);
-		return rc;
+	if (mr->mr_dir != DMA_NONE) {
+		trace_xprtrdma_mr_unmap(mr);
+		ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
+				mr->mr_sg, mr->mr_nents, mr->mr_dir);
+		mr->mr_dir = DMA_NONE;
 	}
 
-	frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype,
-				  ia->ri_max_frwr_depth);
-	if (IS_ERR(frwr->fr_mr)) {
-		pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
-			PTR_ERR(frwr->fr_mr), mr);
-		return PTR_ERR(frwr->fr_mr);
-	}
+	spin_lock(&r_xprt->rx_buf.rb_lock);
+	list_del(&mr->mr_all);
+	r_xprt->rx_stats.mrs_recycled++;
+	spin_unlock(&r_xprt->rx_buf.rb_lock);
 
-	dprintk("RPC:       %s: recovered FRWR %p\n", __func__, frwr);
-	frwr->fr_state = FRWR_IS_INVALID;
-	return 0;
+	frwr_release_mr(mr);
 }
 
-/* Reset of a single FRWR. Generate a fresh rkey by replacing the MR.
+/* MRs are dynamically allocated, so simply clean up and release the MR.
+ * A replacement MR will subsequently be allocated on demand.
  */
 static void
-frwr_op_recover_mr(struct rpcrdma_mr *mr)
+frwr_mr_recycle_worker(struct work_struct *work)
 {
-	enum rpcrdma_frwr_state state = mr->frwr.fr_state;
-	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	int rc;
+	struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr,
+					     mr_recycle);
 
-	rc = __frwr_mr_reset(ia, mr);
-	if (state != FRWR_FLUSHED_LI) {
-		trace_xprtrdma_dma_unmap(mr);
-		ib_dma_unmap_sg(ia->ri_device,
-				mr->mr_sg, mr->mr_nents, mr->mr_dir);
-	}
-	if (rc)
-		goto out_release;
-
-	rpcrdma_mr_put(mr);
-	r_xprt->rx_stats.mrs_recovered++;
-	return;
-
-out_release:
-	pr_err("rpcrdma: FRWR reset failed %d, %p released\n", rc, mr);
-	r_xprt->rx_stats.mrs_orphaned++;
-
-	spin_lock(&r_xprt->rx_buf.rb_mrlock);
-	list_del(&mr->mr_all);
-	spin_unlock(&r_xprt->rx_buf.rb_mrlock);
-
-	frwr_op_release_mr(mr);
+	frwr_mr_recycle(mr->mr_xprt, mr);
 }
 
-/* On success, sets:
+/* frwr_recycle - Discard MRs
+ * @req: request to reset
+ *
+ * Used after a reconnect. These MRs could be in flight; we can't
+ * tell. The safe thing to do is to release them.
+ */
+void frwr_recycle(struct rpcrdma_req *req)
+{
+	struct rpcrdma_mr *mr;
+
+	while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
+		frwr_mr_recycle(mr->mr_xprt, mr);
+}
+
+/* frwr_reset - Place MRs back on the free list
+ * @req: request to reset
+ *
+ * Used after a failed marshal. For FRWR, this means the MRs
+ * don't have to be fully released and recreated.
+ *
+ * NB: This is safe only as long as none of @req's MRs are
+ * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
+ * Work Request.
+ */
+void frwr_reset(struct rpcrdma_req *req)
+{
+	struct rpcrdma_mr *mr;
+
+	while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
+		rpcrdma_mr_put(mr);
+}
+
+/**
+ * frwr_init_mr - Initialize one MR
+ * @ia: interface adapter
+ * @mr: generic MR to prepare for FRWR
+ *
+ * Returns zero if successful. Otherwise a negative errno
+ * is returned.
+ */
+int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
+{
+	unsigned int depth = ia->ri_max_frwr_depth;
+	struct scatterlist *sg;
+	struct ib_mr *frmr;
+	int rc;
+
+	/* NB: ib_alloc_mr and device drivers typically allocate
+	 *     memory with GFP_KERNEL.
+	 */
+	frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
+	if (IS_ERR(frmr))
+		goto out_mr_err;
+
+	sg = kcalloc(depth, sizeof(*sg), GFP_NOFS);
+	if (!sg)
+		goto out_list_err;
+
+	mr->frwr.fr_mr = frmr;
+	mr->mr_dir = DMA_NONE;
+	INIT_LIST_HEAD(&mr->mr_list);
+	INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
+	init_completion(&mr->frwr.fr_linv_done);
+
+	sg_init_table(sg, depth);
+	mr->mr_sg = sg;
+	return 0;
+
+out_mr_err:
+	rc = PTR_ERR(frmr);
+	trace_xprtrdma_frwr_alloc(mr, rc);
+	return rc;
+
+out_list_err:
+	ib_dereg_mr(frmr);
+	return -ENOMEM;
+}
+
+/**
+ * frwr_open - Prepare an endpoint for use with FRWR
+ * @ia: interface adapter this endpoint will use
+ * @ep: endpoint to prepare
+ *
+ * On success, sets:
  *	ep->rep_attr.cap.max_send_wr
  *	ep->rep_attr.cap.max_recv_wr
- *	cdata->max_requests
+ *	ep->rep_max_requests
  *	ia->ri_max_segs
  *
  * And these FRWR-related fields:
  *	ia->ri_max_frwr_depth
  *	ia->ri_mrtype
+ *
+ * On failure, a negative errno is returned.
  */
-static int
-frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
-	     struct rpcrdma_create_data_internal *cdata)
+int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
 {
-	struct ib_device_attr *attrs = &ia->ri_device->attrs;
+	struct ib_device_attr *attrs = &ia->ri_id->device->attrs;
 	int max_qp_wr, depth, delta;
 
 	ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
 	if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
 		ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;
 
-	ia->ri_max_frwr_depth =
-			min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-			      attrs->max_fast_reg_page_list_len);
-	dprintk("RPC:       %s: device's max FR page list len = %u\n",
+	/* Quirk: Some devices advertise a large max_fast_reg_page_list_len
+	 * capability, but perform optimally when the MRs are not larger
+	 * than a page.
+	 */
+	if (attrs->max_sge_rd > 1)
+		ia->ri_max_frwr_depth = attrs->max_sge_rd;
+	else
+		ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
+	if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
+		ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;
+	dprintk("RPC:       %s: max FR page list depth = %u\n",
 		__func__, ia->ri_max_frwr_depth);
 
 	/* Add room for frwr register and invalidate WRs.
@@ -253,145 +258,78 @@
 		} while (delta > 0);
 	}
 
-	max_qp_wr = ia->ri_device->attrs.max_qp_wr;
+	max_qp_wr = ia->ri_id->device->attrs.max_qp_wr;
 	max_qp_wr -= RPCRDMA_BACKWARD_WRS;
 	max_qp_wr -= 1;
 	if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
 		return -ENOMEM;
-	if (cdata->max_requests > max_qp_wr)
-		cdata->max_requests = max_qp_wr;
-	ep->rep_attr.cap.max_send_wr = cdata->max_requests * depth;
+	if (ep->rep_max_requests > max_qp_wr)
+		ep->rep_max_requests = max_qp_wr;
+	ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
 	if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
-		cdata->max_requests = max_qp_wr / depth;
-		if (!cdata->max_requests)
+		ep->rep_max_requests = max_qp_wr / depth;
+		if (!ep->rep_max_requests)
 			return -EINVAL;
-		ep->rep_attr.cap.max_send_wr = cdata->max_requests *
-					       depth;
+		ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
 	}
 	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
 	ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
-	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+	ep->rep_attr.cap.max_recv_wr = ep->rep_max_requests;
 	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
 	ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
 
-	ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
-				ia->ri_max_frwr_depth);
+	ia->ri_max_segs =
+		DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
+	/* Reply chunks require segments for head and tail buffers */
+	ia->ri_max_segs += 2;
+	if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)
+		ia->ri_max_segs = RPCRDMA_MAX_HDR_SEGS;
 	return 0;
 }
 
-/* FRWR mode conveys a list of pages per chunk segment. The
+/**
+ * frwr_maxpages - Compute size of largest payload
+ * @r_xprt: transport
+ *
+ * Returns maximum size of an RPC message, in pages.
+ *
+ * FRWR mode conveys a list of pages per chunk segment. The
  * maximum length of that list is the FRWR page list depth.
  */
-static size_t
-frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
+size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
 	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-		     RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frwr_depth);
-}
-
-static void
-__frwr_sendcompletion_flush(struct ib_wc *wc, const char *wr)
-{
-	if (wc->status != IB_WC_WR_FLUSH_ERR)
-		pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
-		       wr, ib_wc_status_msg(wc->status),
-		       wc->status, wc->vendor_err);
+		     (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth);
 }
 
 /**
- * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
- * @cq:	completion queue (ignored)
- * @wc:	completed WR
+ * frwr_map - Register a memory region
+ * @r_xprt: controlling transport
+ * @seg: memory region co-ordinates
+ * @nsegs: number of segments remaining
+ * @writing: true when RDMA Write will be used
+ * @xid: XID of RPC using the registered memory
+ * @mr: MR to fill in
  *
- */
-static void
-frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
-{
-	struct ib_cqe *cqe = wc->wr_cqe;
-	struct rpcrdma_frwr *frwr =
-			container_of(cqe, struct rpcrdma_frwr, fr_cqe);
-
-	/* WARNING: Only wr_cqe and status are reliable at this point */
-	if (wc->status != IB_WC_SUCCESS) {
-		frwr->fr_state = FRWR_FLUSHED_FR;
-		__frwr_sendcompletion_flush(wc, "fastreg");
-	}
-	trace_xprtrdma_wc_fastreg(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC
- * @cq:	completion queue (ignored)
- * @wc:	completed WR
- *
- */
-static void
-frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
-{
-	struct ib_cqe *cqe = wc->wr_cqe;
-	struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
-						 fr_cqe);
-
-	/* WARNING: Only wr_cqe and status are reliable at this point */
-	if (wc->status != IB_WC_SUCCESS) {
-		frwr->fr_state = FRWR_FLUSHED_LI;
-		__frwr_sendcompletion_flush(wc, "localinv");
-	}
-	trace_xprtrdma_wc_li(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC
- * @cq:	completion queue (ignored)
- * @wc:	completed WR
- *
- * Awaken anyone waiting for an MR to finish being fenced.
- */
-static void
-frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
-{
-	struct ib_cqe *cqe = wc->wr_cqe;
-	struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
-						 fr_cqe);
-
-	/* WARNING: Only wr_cqe and status are reliable at this point */
-	if (wc->status != IB_WC_SUCCESS) {
-		frwr->fr_state = FRWR_FLUSHED_LI;
-		__frwr_sendcompletion_flush(wc, "localinv");
-	}
-	complete(&frwr->fr_linv_done);
-	trace_xprtrdma_wc_li_wake(wc, frwr);
-}
-
-/* Post a REG_MR Work Request to register a memory region
+ * Prepare a REG_MR Work Request to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
+ *
+ * Returns the next segment or a negative errno pointer.
+ * On success, @mr is filled in.
  */
-static struct rpcrdma_mr_seg *
-frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
-	    int nsegs, bool writing, struct rpcrdma_mr **out)
+struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
+				struct rpcrdma_mr_seg *seg,
+				int nsegs, bool writing, __be32 xid,
+				struct rpcrdma_mr *mr)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
-	struct rpcrdma_frwr *frwr;
-	struct rpcrdma_mr *mr;
-	struct ib_mr *ibmr;
 	struct ib_reg_wr *reg_wr;
+	struct ib_mr *ibmr;
 	int i, n;
 	u8 key;
 
-	mr = NULL;
-	do {
-		if (mr)
-			rpcrdma_mr_defer_recovery(mr);
-		mr = rpcrdma_mr_get(r_xprt);
-		if (!mr)
-			return ERR_PTR(-EAGAIN);
-	} while (mr->frwr.fr_state != FRWR_IS_INVALID);
-	frwr = &mr->frwr;
-	frwr->fr_state = FRWR_IS_VALID;
-
 	if (nsegs > ia->ri_max_frwr_depth)
 		nsegs = ia->ri_max_frwr_depth;
 	for (i = 0; i < nsegs;) {
@@ -406,7 +344,7 @@
 
 		++seg;
 		++i;
-		if (holes_ok)
+		if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS)
 			continue;
 		if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
@@ -414,20 +352,22 @@
 	}
 	mr->mr_dir = rpcrdma_data_dir(writing);
 
-	mr->mr_nents = ib_dma_map_sg(ia->ri_device, mr->mr_sg, i, mr->mr_dir);
+	mr->mr_nents =
+		ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, i, mr->mr_dir);
 	if (!mr->mr_nents)
 		goto out_dmamap_err;
-	trace_xprtrdma_dma_map(mr);
 
-	ibmr = frwr->fr_mr;
+	ibmr = mr->frwr.fr_mr;
 	n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
 	if (unlikely(n != mr->mr_nents))
 		goto out_mapmr_err;
 
+	ibmr->iova &= 0x00000000ffffffff;
+	ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
 	key = (u8)(ibmr->rkey & 0x000000FF);
 	ib_update_fast_reg_key(ibmr, ++key);
 
-	reg_wr = &frwr->fr_regwr;
+	reg_wr = &mr->frwr.fr_regwr;
 	reg_wr->mr = ibmr;
 	reg_wr->key = ibmr->rkey;
 	reg_wr->access = writing ?
@@ -437,32 +377,49 @@
 	mr->mr_handle = ibmr->rkey;
 	mr->mr_length = ibmr->length;
 	mr->mr_offset = ibmr->iova;
+	trace_xprtrdma_mr_map(mr);
 
-	*out = mr;
 	return seg;
 
 out_dmamap_err:
-	pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
-	       mr->mr_sg, i);
-	frwr->fr_state = FRWR_IS_INVALID;
-	rpcrdma_mr_put(mr);
+	mr->mr_dir = DMA_NONE;
+	trace_xprtrdma_frwr_sgerr(mr, i);
 	return ERR_PTR(-EIO);
 
 out_mapmr_err:
-	pr_err("rpcrdma: failed to map mr %p (%d/%d)\n",
-	       frwr->fr_mr, n, mr->mr_nents);
-	rpcrdma_mr_defer_recovery(mr);
+	trace_xprtrdma_frwr_maperr(mr, n);
 	return ERR_PTR(-EIO);
 }
 
-/* Post Send WR containing the RPC Call message.
+/**
+ * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
  *
- * For FRMR, chain any FastReg WRs to the Send WR. Only a
+ */
+static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct rpcrdma_frwr *frwr =
+		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	trace_xprtrdma_wc_fastreg(wc, frwr);
+	/* The MR will get recycled when the associated req is retransmitted */
+}
+
+/**
+ * frwr_send - post Send WR containing the RPC Call message
+ * @ia: interface adapter
+ * @req: Prepared RPC Call
+ *
+ * For FRWR, chain any FastReg WRs to the Send WR. Only a
  * single ib_post_send call is needed to register memory
  * and then post the Send WR.
+ *
+ * Returns the result of ib_post_send.
  */
-static int
-frwr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 {
 	struct ib_send_wr *post_wr;
 	struct rpcrdma_mr *mr;
@@ -484,45 +441,94 @@
 	}
 
 	/* If ib_post_send fails, the next ->send_request for
-	 * @req will queue these MWs for recovery.
+	 * @req will queue these MRs for recovery.
 	 */
 	return ib_post_send(ia->ri_id->qp, post_wr, NULL);
 }
 
-/* Handle a remotely invalidated mr on the @mrs list
+/**
+ * frwr_reminv - handle a remotely invalidated mr on the @mrs list
+ * @rep: Received reply
+ * @mrs: list of MRs to check
+ *
  */
-static void
-frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
+void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
 {
 	struct rpcrdma_mr *mr;
 
 	list_for_each_entry(mr, mrs, mr_list)
 		if (mr->mr_handle == rep->rr_inv_rkey) {
 			list_del_init(&mr->mr_list);
-			trace_xprtrdma_remoteinv(mr);
-			mr->frwr.fr_state = FRWR_IS_INVALID;
-			rpcrdma_mr_unmap_and_put(mr);
+			trace_xprtrdma_mr_remoteinv(mr);
+			rpcrdma_mr_put(mr);
 			break;	/* only one invalidated MR per RPC */
 		}
 }
 
-/* Invalidate all memory regions that were registered for "req".
+static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
+{
+	if (wc->status != IB_WC_SUCCESS)
+		rpcrdma_mr_recycle(mr);
+	else
+		rpcrdma_mr_put(mr);
+}
+
+/**
+ * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
  *
- * Sleeps until it is safe for the host CPU to access the
- * previously mapped memory regions.
- *
- * Caller ensures that @mrs is not empty before the call. This
- * function empties the list.
  */
-static void
-frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
+static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct rpcrdma_frwr *frwr =
+		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	trace_xprtrdma_wc_li(wc, frwr);
+	__frwr_release_mr(wc, mr);
+}
+
+/**
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
+ * Awaken anyone waiting for an MR to finish being fenced.
+ */
+static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct rpcrdma_frwr *frwr =
+		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	trace_xprtrdma_wc_li_wake(wc, frwr);
+	__frwr_release_mr(wc, mr);
+	complete(&frwr->fr_linv_done);
+}
+
+/**
+ * frwr_unmap_sync - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
+ *
+ * Sleeps until it is safe for the host CPU to access the previously mapped
+ * memory regions. This guarantees that registered MRs are properly fenced
+ * from the server before the RPC consumer accesses the data in them. It
+ * also ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
+ */
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
 	struct ib_send_wr *first, **prev, *last;
 	const struct ib_send_wr *bad_wr;
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_frwr *frwr;
 	struct rpcrdma_mr *mr;
-	int count, rc;
+	int rc;
 
 	/* ORDER: Invalidate all of the MRs first
 	 *
@@ -530,33 +536,31 @@
 	 * a single ib_post_send() call.
 	 */
 	frwr = NULL;
-	count = 0;
 	prev = &first;
-	list_for_each_entry(mr, mrs, mr_list) {
-		mr->frwr.fr_state = FRWR_IS_INVALID;
+	while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
+
+		trace_xprtrdma_mr_localinv(mr);
+		r_xprt->rx_stats.local_inv_needed++;
 
 		frwr = &mr->frwr;
-		trace_xprtrdma_localinv(mr);
-
 		frwr->fr_cqe.done = frwr_wc_localinv;
 		last = &frwr->fr_invwr;
-		memset(last, 0, sizeof(*last));
+		last->next = NULL;
 		last->wr_cqe = &frwr->fr_cqe;
+		last->sg_list = NULL;
+		last->num_sge = 0;
 		last->opcode = IB_WR_LOCAL_INV;
+		last->send_flags = IB_SEND_SIGNALED;
 		last->ex.invalidate_rkey = mr->mr_handle;
-		count++;
 
 		*prev = last;
 		prev = &last->next;
 	}
-	if (!frwr)
-		goto unmap;
 
 	/* Strong send queue ordering guarantees that when the
 	 * last WR in the chain completes, all WRs in the chain
 	 * are complete.
 	 */
-	last->send_flags = IB_SEND_SIGNALED;
 	frwr->fr_cqe.done = frwr_wc_localinv_wake;
 	reinit_completion(&frwr->fr_linv_done);
 
@@ -564,52 +568,128 @@
 	 * replaces the QP. The RPC reply handler won't call us
 	 * unless ri_id->qp is a valid pointer.
 	 */
-	r_xprt->rx_stats.local_inv_needed++;
 	bad_wr = NULL;
-	rc = ib_post_send(ia->ri_id->qp, first, &bad_wr);
+	rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+	trace_xprtrdma_post_send(req, rc);
+
+	/* The final LOCAL_INV WR in the chain is supposed to
+	 * do the wake. If it was never posted, the wake will
+	 * not happen, so don't wait in that case.
+	 */
 	if (bad_wr != first)
 		wait_for_completion(&frwr->fr_linv_done);
-	if (rc)
-		goto reset_mrs;
+	if (!rc)
+		return;
 
-	/* ORDER: Now DMA unmap all of the MRs, and return
-	 * them to the free MR list.
-	 */
-unmap:
-	while (!list_empty(mrs)) {
-		mr = rpcrdma_mr_pop(mrs);
-		rpcrdma_mr_unmap_and_put(mr);
-	}
-	return;
-
-reset_mrs:
-	pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc);
-
-	/* Find and reset the MRs in the LOCAL_INV WRs that did not
-	 * get posted.
+	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
 	 */
 	while (bad_wr) {
 		frwr = container_of(bad_wr, struct rpcrdma_frwr,
 				    fr_invwr);
 		mr = container_of(frwr, struct rpcrdma_mr, frwr);
-
-		__frwr_mr_reset(ia, mr);
-
 		bad_wr = bad_wr->next;
+
+		list_del_init(&mr->mr_list);
+		rpcrdma_mr_recycle(mr);
 	}
-	goto unmap;
 }
 
-const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
-	.ro_map				= frwr_op_map,
-	.ro_send			= frwr_op_send,
-	.ro_reminv			= frwr_op_reminv,
-	.ro_unmap_sync			= frwr_op_unmap_sync,
-	.ro_recover_mr			= frwr_op_recover_mr,
-	.ro_open			= frwr_op_open,
-	.ro_maxpages			= frwr_op_maxpages,
-	.ro_init_mr			= frwr_op_init_mr,
-	.ro_release_mr			= frwr_op_release_mr,
-	.ro_displayname			= "frwr",
-	.ro_send_w_inv_ok		= RPCRDMA_CMP_F_SND_W_INV_OK,
-};
+/**
+ * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
+ */
+static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct rpcrdma_frwr *frwr =
+		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+	struct rpcrdma_rep *rep = mr->mr_req->rl_reply;
+
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	trace_xprtrdma_wc_li_done(wc, frwr);
+	__frwr_release_mr(wc, mr);
+
+	/* Ensure @rep is loaded before __frwr_release_mr */
+	smp_rmb();
+	rpcrdma_complete_rqst(rep);
+}
+
+/**
+ * frwr_unmap_async - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
+ *
+ * This guarantees that registered MRs are properly fenced from the
+ * server before the RPC consumer accesses the data in them. It also
+ * ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
+ */
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+	struct ib_send_wr *first, *last, **prev;
+	const struct ib_send_wr *bad_wr;
+	struct rpcrdma_frwr *frwr;
+	struct rpcrdma_mr *mr;
+	int rc;
+
+	/* Chain the LOCAL_INV Work Requests and post them with
+	 * a single ib_post_send() call.
+	 */
+	frwr = NULL;
+	prev = &first;
+	while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
+
+		trace_xprtrdma_mr_localinv(mr);
+		r_xprt->rx_stats.local_inv_needed++;
+
+		frwr = &mr->frwr;
+		frwr->fr_cqe.done = frwr_wc_localinv;
+		last = &frwr->fr_invwr;
+		last->next = NULL;
+		last->wr_cqe = &frwr->fr_cqe;
+		last->sg_list = NULL;
+		last->num_sge = 0;
+		last->opcode = IB_WR_LOCAL_INV;
+		last->send_flags = IB_SEND_SIGNALED;
+		last->ex.invalidate_rkey = mr->mr_handle;
+
+		*prev = last;
+		prev = &last->next;
+	}
+
+	/* Strong send queue ordering guarantees that when the
+	 * last WR in the chain completes, all WRs in the chain
+	 * are complete. The last completion will wake up the
+	 * RPC waiter.
+	 */
+	frwr->fr_cqe.done = frwr_wc_localinv_done;
+
+	/* Transport disconnect drains the receive CQ before it
+	 * replaces the QP. The RPC reply handler won't call us
+	 * unless ri_id->qp is a valid pointer.
+	 */
+	bad_wr = NULL;
+	rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+	trace_xprtrdma_post_send(req, rc);
+	if (!rc)
+		return;
+
+	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
+	 */
+	while (bad_wr) {
+		frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
+		mr = container_of(frwr, struct rpcrdma_mr, frwr);
+		bad_wr = bad_wr->next;
+
+		rpcrdma_mr_recycle(mr);
+	}
+
+	/* The final LOCAL_INV WR in the chain is supposed to
+	 * do the wake. If it was never posted, the wake will
+	 * not happen, so wake here in that case.
+	 */
+	rpcrdma_complete_rqst(req->rl_reply);
+}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c8ae983..b86b5fd 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -71,7 +71,6 @@
 	size = RPCRDMA_HDRLEN_MIN;
 
 	/* Maximum Read list size */
-	maxsegs += 2;	/* segment for head and tail buffers */
 	size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
 
 	/* Minimal Read chunk size */
@@ -97,7 +96,6 @@
 	size = RPCRDMA_HDRLEN_MIN;
 
 	/* Maximum Write list size */
-	maxsegs += 2;	/* segment for head and tail buffers */
 	size = sizeof(__be32);		/* segment count */
 	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
 	size += sizeof(__be32);	/* list discriminator */
@@ -107,16 +105,23 @@
 	return size;
 }
 
+/**
+ * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
+ * @r_xprt: transport instance to initialize
+ *
+ * The max_inline fields contain the maximum size of an RPC message
+ * so the marshaling code doesn't have to repeat this calculation
+ * for every RPC.
+ */
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
 {
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	unsigned int maxsegs = ia->ri_max_segs;
+	unsigned int maxsegs = r_xprt->rx_ia.ri_max_segs;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 
-	ia->ri_max_inline_write = cdata->inline_wsize -
-				  rpcrdma_max_call_header_size(maxsegs);
-	ia->ri_max_inline_read = cdata->inline_rsize -
-				 rpcrdma_max_reply_header_size(maxsegs);
+	ep->rep_max_inline_send =
+		ep->rep_inline_send - rpcrdma_max_call_header_size(maxsegs);
+	ep->rep_max_inline_recv =
+		ep->rep_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
 }
 
 /* The client can send a request inline as long as the RPCRDMA header
@@ -133,7 +138,7 @@
 	struct xdr_buf *xdr = &rqst->rq_snd_buf;
 	unsigned int count, remaining, offset;
 
-	if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
+	if (xdr->len > r_xprt->rx_ep.rep_max_inline_send)
 		return false;
 
 	if (xdr->page_len) {
@@ -161,9 +166,21 @@
 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 				   struct rpc_rqst *rqst)
 {
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep.rep_max_inline_recv;
+}
 
-	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
+/* The client is required to provide a Reply chunk if the maximum
+ * size of the non-payload part of the RPC Reply is larger than
+ * the inline threshold.
+ */
+static bool
+rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
+			  const struct rpc_rqst *rqst)
+{
+	const struct xdr_buf *buf = &rqst->rq_rcv_buf;
+
+	return (buf->head[0].iov_len + buf->tail[0].iov_len) <
+		r_xprt->rx_ep.rep_max_inline_recv;
 }
 
 /* Split @vec on page boundaries into SGEs. FMR registers pages, not
@@ -220,11 +237,12 @@
 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
 	page_base = offset_in_page(xdrbuf->page_base);
 	while (len) {
-		if (unlikely(!*ppages)) {
-			/* XXX: Certain upper layer operations do
-			 *	not provide receive buffer pages.
-			 */
-			*ppages = alloc_page(GFP_ATOMIC);
+		/* ACL likes to be lazy in allocating pages - ACLs
+		 * are small by default but can get huge.
+		 */
+		if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) {
+			if (!*ppages)
+				*ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
 			if (!*ppages)
 				return -ENOBUFS;
 		}
@@ -324,6 +342,32 @@
 	return 0;
 }
 
+static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
+						 struct rpcrdma_req *req,
+						 struct rpcrdma_mr_seg *seg,
+						 int nsegs, bool writing,
+						 struct rpcrdma_mr **mr)
+{
+	*mr = rpcrdma_mr_pop(&req->rl_free_mrs);
+	if (!*mr) {
+		*mr = rpcrdma_mr_get(r_xprt);
+		if (!*mr)
+			goto out_getmr_err;
+		trace_xprtrdma_mr_get(req);
+		(*mr)->mr_req = req;
+	}
+
+	rpcrdma_mr_push(*mr, &req->rl_registered);
+	return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
+
+out_getmr_err:
+	trace_xprtrdma_nomrs(req);
+	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
+	if (r_xprt->rx_ep.rep_connected != -ENODEV)
+		schedule_work(&r_xprt->rx_buf.rb_refresh_worker);
+	return ERR_PTR(-EAGAIN);
+}
+
 /* Register and XDR encode the Read list. Supports encoding a list of read
  * segments that belong to a single read chunk.
  *
@@ -338,9 +382,10 @@
  *
  * Only a single @pos value is currently supported.
  */
-static noinline int
-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-			 struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
+static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+				    struct rpcrdma_req *req,
+				    struct rpc_rqst *rqst,
+				    enum rpcrdma_chunktype rtype)
 {
 	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
@@ -348,6 +393,9 @@
 	unsigned int pos;
 	int nsegs;
 
+	if (rtype == rpcrdma_noch)
+		goto done;
+
 	pos = rqst->rq_snd_buf.head[0].iov_len;
 	if (rtype == rpcrdma_areadch)
 		pos = 0;
@@ -358,21 +406,20 @@
 		return nsegs;
 
 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   false, &mr);
+		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
-		rpcrdma_mr_push(mr, &req->rl_registered);
 
 		if (encode_read_segment(xdr, mr, pos) < 0)
 			return -EMSGSIZE;
 
-		trace_xprtrdma_read_chunk(rqst->rq_task, pos, mr, nsegs);
+		trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
 		r_xprt->rx_stats.read_chunk_count++;
 		nsegs -= mr->mr_nents;
 	} while (nsegs);
 
-	return 0;
+done:
+	return encode_item_not_present(xdr);
 }
 
 /* Register and XDR encode the Write list. Supports encoding a list
@@ -390,9 +437,10 @@
  *
  * Only a single Write chunk is currently supported.
  */
-static noinline int
-rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-			  struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
+				     struct rpcrdma_req *req,
+				     struct rpc_rqst *rqst,
+				     enum rpcrdma_chunktype wtype)
 {
 	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
@@ -400,6 +448,9 @@
 	int nsegs, nchunks;
 	__be32 *segcount;
 
+	if (wtype != rpcrdma_writech)
+		goto done;
+
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
 				     rqst->rq_rcv_buf.head[0].iov_len,
@@ -416,16 +467,14 @@
 
 	nchunks = 0;
 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   true, &mr);
+		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
-		rpcrdma_mr_push(mr, &req->rl_registered);
 
 		if (encode_rdma_segment(xdr, mr) < 0)
 			return -EMSGSIZE;
 
-		trace_xprtrdma_write_chunk(rqst->rq_task, mr, nsegs);
+		trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
 		r_xprt->rx_stats.write_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 		nchunks++;
@@ -435,7 +484,8 @@
 	/* Update count of segments in this Write chunk */
 	*segcount = cpu_to_be32(nchunks);
 
-	return 0;
+done:
+	return encode_item_not_present(xdr);
 }
 
 /* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -450,9 +500,10 @@
  * Returns zero on success, or a negative errno if a failure occurred.
  * @xdr is advanced to the next position in the stream.
  */
-static noinline int
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-			   struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+				      struct rpcrdma_req *req,
+				      struct rpc_rqst *rqst,
+				      enum rpcrdma_chunktype wtype)
 {
 	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
@@ -460,6 +511,9 @@
 	int nsegs, nchunks;
 	__be32 *segcount;
 
+	if (wtype != rpcrdma_replych)
+		return encode_item_not_present(xdr);
+
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
 	if (nsegs < 0)
@@ -474,16 +528,14 @@
 
 	nchunks = 0;
 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   true, &mr);
+		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
-		rpcrdma_mr_push(mr, &req->rl_registered);
 
 		if (encode_rdma_segment(xdr, mr) < 0)
 			return -EMSGSIZE;
 
-		trace_xprtrdma_reply_chunk(rqst->rq_task, mr, nsegs);
+		trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
 		r_xprt->rx_stats.reply_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 		nchunks++;
@@ -496,51 +548,57 @@
 	return 0;
 }
 
+static void rpcrdma_sendctx_done(struct kref *kref)
+{
+	struct rpcrdma_req *req =
+		container_of(kref, struct rpcrdma_req, rl_kref);
+	struct rpcrdma_rep *rep = req->rl_reply;
+
+	rpcrdma_complete_rqst(rep);
+	rep->rr_rxprt->rx_stats.reply_waits_for_send++;
+}
+
 /**
- * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
+ * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
  * @sc: sendctx containing SGEs to unmap
  *
  */
-void
-rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
+void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 {
-	struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
 	struct ib_sge *sge;
-	unsigned int count;
+
+	if (!sc->sc_unmap_count)
+		return;
 
 	/* The first two SGEs contain the transport header and
 	 * the inline buffer. These are always left mapped so
 	 * they can be cheaply re-used.
 	 */
-	sge = &sc->sc_sges[2];
-	for (count = sc->sc_unmap_count; count; ++sge, --count)
-		ib_dma_unmap_page(ia->ri_device,
-				  sge->addr, sge->length, DMA_TO_DEVICE);
+	for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
+	     ++sge, --sc->sc_unmap_count)
+		ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
+				  DMA_TO_DEVICE);
 
-	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
-		smp_mb__after_atomic();
-		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
-	}
+	kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
 }
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
  */
-static bool
-rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
-			u32 len)
+static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
+				    struct rpcrdma_req *req, u32 len)
 {
 	struct rpcrdma_sendctx *sc = req->rl_sendctx;
 	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
 	struct ib_sge *sge = sc->sc_sges;
 
-	if (!rpcrdma_dma_map_regbuf(ia, rb))
+	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 		goto out_regbuf;
 	sge->addr = rdmab_addr(rb);
 	sge->length = len;
 	sge->lkey = rdmab_lkey(rb);
 
-	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
-				      sge->length, DMA_TO_DEVICE);
+	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
+				      DMA_TO_DEVICE);
 	sc->sc_wr.num_sge++;
 	return true;
 
@@ -552,23 +610,23 @@
 /* Prepare the Send SGEs. The head and tail iovec, and each entry
  * in the page list, gets its own SGE.
  */
-static bool
-rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
-			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
+				     struct rpcrdma_req *req,
+				     struct xdr_buf *xdr,
+				     enum rpcrdma_chunktype rtype)
 {
 	struct rpcrdma_sendctx *sc = req->rl_sendctx;
 	unsigned int sge_no, page_base, len, remaining;
 	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
-	struct ib_device *device = ia->ri_device;
 	struct ib_sge *sge = sc->sc_sges;
-	u32 lkey = ia->ri_pd->local_dma_lkey;
 	struct page *page, **ppages;
 
 	/* The head iovec is straightforward, as it is already
 	 * DMA-mapped. Sync the content that has changed.
 	 */
-	if (!rpcrdma_dma_map_regbuf(ia, rb))
+	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 		goto out_regbuf;
+	sc->sc_device = rdmab_device(rb);
 	sge_no = 1;
 	sge[sge_no].addr = rdmab_addr(rb);
 	sge[sge_no].length = xdr->head[0].iov_len;
@@ -615,13 +673,14 @@
 				goto out_mapping_overflow;
 
 			len = min_t(u32, PAGE_SIZE - page_base, remaining);
-			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
-							   page_base, len,
-							   DMA_TO_DEVICE);
-			if (ib_dma_mapping_error(device, sge[sge_no].addr))
+			sge[sge_no].addr =
+				ib_dma_map_page(rdmab_device(rb), *ppages,
+						page_base, len, DMA_TO_DEVICE);
+			if (ib_dma_mapping_error(rdmab_device(rb),
+						 sge[sge_no].addr))
 				goto out_mapping_err;
 			sge[sge_no].length = len;
-			sge[sge_no].lkey = lkey;
+			sge[sge_no].lkey = rdmab_lkey(rb);
 
 			sc->sc_unmap_count++;
 			ppages++;
@@ -642,20 +701,20 @@
 
 map_tail:
 		sge_no++;
-		sge[sge_no].addr = ib_dma_map_page(device, page,
-						   page_base, len,
-						   DMA_TO_DEVICE);
-		if (ib_dma_mapping_error(device, sge[sge_no].addr))
+		sge[sge_no].addr =
+			ib_dma_map_page(rdmab_device(rb), page, page_base, len,
+					DMA_TO_DEVICE);
+		if (ib_dma_mapping_error(rdmab_device(rb), sge[sge_no].addr))
 			goto out_mapping_err;
 		sge[sge_no].length = len;
-		sge[sge_no].lkey = lkey;
+		sge[sge_no].lkey = rdmab_lkey(rb);
 		sc->sc_unmap_count++;
 	}
 
 out:
 	sc->sc_wr.num_sge += sge_no;
 	if (sc->sc_unmap_count)
-		__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+		kref_get(&req->rl_kref);
 	return true;
 
 out_regbuf:
@@ -663,13 +722,13 @@
 	return false;
 
 out_mapping_overflow:
-	rpcrdma_unmap_sendctx(sc);
+	rpcrdma_sendctx_unmap(sc);
 	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
 	return false;
 
 out_mapping_err:
-	rpcrdma_unmap_sendctx(sc);
-	pr_err("rpcrdma: Send mapping error\n");
+	rpcrdma_sendctx_unmap(sc);
+	trace_xprtrdma_dma_maperr(sge[sge_no].addr);
 	return false;
 }
 
@@ -688,22 +747,28 @@
 			  struct rpcrdma_req *req, u32 hdrlen,
 			  struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 {
-	req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
+	int ret;
+
+	ret = -EAGAIN;
+	req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
 	if (!req->rl_sendctx)
-		return -EAGAIN;
+		goto err;
 	req->rl_sendctx->sc_wr.num_sge = 0;
 	req->rl_sendctx->sc_unmap_count = 0;
 	req->rl_sendctx->sc_req = req;
-	__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+	kref_init(&req->rl_kref);
 
-	if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
-		return -EIO;
-
+	ret = -EIO;
+	if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
+		goto err;
 	if (rtype != rpcrdma_areadch)
-		if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
-			return -EIO;
-
+		if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
+			goto err;
 	return 0;
+
+err:
+	trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
+	return ret;
 }
 
 /**
@@ -736,8 +801,8 @@
 	int ret;
 
 	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
-	xdr_init_encode(xdr, &req->rl_hdrbuf,
-			req->rl_rdmabuf->rg_base);
+	xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
+			rqst);
 
 	/* Fixed header fields */
 	ret = -EMSGSIZE;
@@ -766,7 +831,8 @@
 	 */
 	if (rpcrdma_results_inline(r_xprt, rqst))
 		wtype = rpcrdma_noch;
-	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
+	else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
+		 rpcrdma_nonpayload_inline(r_xprt, rqst))
 		wtype = rpcrdma_writech;
 	else
 		wtype = rpcrdma_replych;
@@ -801,12 +867,7 @@
 	 * chunks. Very likely the connection has been replaced,
 	 * so these registrations are invalid and unusable.
 	 */
-	while (unlikely(!list_empty(&req->rl_registered))) {
-		struct rpcrdma_mr *mr;
-
-		mr = rpcrdma_mr_pop(&req->rl_registered);
-		rpcrdma_mr_defer_recovery(mr);
-	}
+	frwr_recycle(req);
 
 	/* This implementation supports the following combinations
 	 * of chunk lists in one RPC-over-RDMA Call message:
@@ -830,49 +891,28 @@
 	 * send a Call message with a Position Zero Read chunk and a
 	 * regular Read chunk at the same time.
 	 */
-	if (rtype != rpcrdma_noch) {
-		ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
-		if (ret)
-			goto out_err;
-	}
-	ret = encode_item_not_present(xdr);
+	ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
+	if (ret)
+		goto out_err;
+	ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
+	if (ret)
+		goto out_err;
+	ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
 	if (ret)
 		goto out_err;
 
-	if (wtype == rpcrdma_writech) {
-		ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
-		if (ret)
-			goto out_err;
-	}
-	ret = encode_item_not_present(xdr);
-	if (ret)
-		goto out_err;
-
-	if (wtype != rpcrdma_replych)
-		ret = encode_item_not_present(xdr);
-	else
-		ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
-	if (ret)
-		goto out_err;
-
-	trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
-
-	ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
+	ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
 					&rqst->rq_snd_buf, rtype);
 	if (ret)
 		goto out_err;
+
+	trace_xprtrdma_marshal(req, rtype, wtype);
 	return 0;
 
 out_err:
-	switch (ret) {
-	case -EAGAIN:
-		xprt_wait_for_buffer_space(rqst->rq_task, NULL);
-		break;
-	case -ENOBUFS:
-		break;
-	default:
-		r_xprt->rx_stats.failed_marshal_count++;
-	}
+	trace_xprtrdma_marshal_failed(rqst, ret);
+	r_xprt->rx_stats.failed_marshal_count++;
+	frwr_reset(req);
 	return ret;
 }
 
@@ -1190,17 +1230,20 @@
 		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
 		if (!p)
 			break;
-		dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
-			rqst->rq_task->tk_pid, __func__,
-			be32_to_cpup(p), be32_to_cpu(*(p + 1)));
+		dprintk("RPC:       %s: server reports "
+			"version error (%u-%u), xid %08x\n", __func__,
+			be32_to_cpup(p), be32_to_cpu(*(p + 1)),
+			be32_to_cpu(rep->rr_xid));
 		break;
 	case err_chunk:
-		dprintk("RPC: %5u: %s: server reports header decoding error\n",
-			rqst->rq_task->tk_pid, __func__);
+		dprintk("RPC:       %s: server reports "
+			"header decoding error, xid %08x\n", __func__,
+			be32_to_cpu(rep->rr_xid));
 		break;
 	default:
-		dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
-			rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
+		dprintk("RPC:       %s: server reports "
+			"unrecognized error %d, xid %08x\n", __func__,
+			be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
 	}
 
 	r_xprt->rx_stats.bad_reply_count++;
@@ -1216,11 +1259,8 @@
 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpc_rqst *rqst = rep->rr_rqst;
-	unsigned long cwnd;
 	int status;
 
-	xprt->reestablish_timeout = 0;
-
 	switch (rep->rr_proc) {
 	case rdma_msg:
 		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
@@ -1238,15 +1278,10 @@
 		goto out_badheader;
 
 out:
-	spin_lock(&xprt->recv_lock);
-	cwnd = xprt->cwnd;
-	xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
-	if (xprt->cwnd > cwnd)
-		xprt_release_rqst_cong(rqst->rq_task);
-
+	spin_lock(&xprt->queue_lock);
 	xprt_complete_rqst(rqst->rq_task, status);
 	xprt_unpin_rqst(rqst);
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	return;
 
 /* If the incoming reply terminated a pending RPC, the next
@@ -1256,56 +1291,20 @@
 out_badheader:
 	trace_xprtrdma_reply_hdr(rep);
 	r_xprt->rx_stats.bad_reply_count++;
-	status = -EIO;
 	goto out;
 }
 
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+static void rpcrdma_reply_done(struct kref *kref)
 {
-	/* Invalidate and unmap the data payloads before waking
-	 * the waiting application. This guarantees the memory
-	 * regions are properly fenced from the server before the
-	 * application accesses the data. It also ensures proper
-	 * send flow control: waking the next RPC waits until this
-	 * RPC has relinquished all its Send Queue entries.
-	 */
-	if (!list_empty(&req->rl_registered))
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
-						    &req->rl_registered);
+	struct rpcrdma_req *req =
+		container_of(kref, struct rpcrdma_req, rl_kref);
 
-	/* Ensure that any DMA mapped pages associated with
-	 * the Send of the RPC Call have been unmapped before
-	 * allowing the RPC to complete. This protects argument
-	 * memory not controlled by the RPC client from being
-	 * re-used before we're done with it.
-	 */
-	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
-		r_xprt->rx_stats.reply_waits_for_send++;
-		out_of_line_wait_on_bit(&req->rl_flags,
-					RPCRDMA_REQ_F_TX_RESOURCES,
-					bit_wait,
-					TASK_UNINTERRUPTIBLE);
-	}
+	rpcrdma_complete_rqst(req->rl_reply);
 }
 
-/* Reply handling runs in the poll worker thread. Anything that
- * might wait is deferred to a separate workqueue.
- */
-void rpcrdma_deferred_completion(struct work_struct *work)
-{
-	struct rpcrdma_rep *rep =
-			container_of(work, struct rpcrdma_rep, rr_work);
-	struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
-	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-
-	trace_xprtrdma_defer_cmp(rep);
-	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
-		r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered);
-	rpcrdma_release_rqst(r_xprt, req);
-	rpcrdma_complete_rqst(rep);
-}
-
-/* Process received RPC/RDMA messages.
+/**
+ * rpcrdma_reply_handler - Process received RPC/RDMA messages
+ * @rep: Incoming rpcrdma_rep object to process
  *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
@@ -1320,14 +1319,15 @@
 	u32 credits;
 	__be32 *p;
 
-	--buf->rb_posted_receives;
-
-	if (rep->rr_hdrbuf.head[0].iov_len == 0)
-		goto out_badstatus;
+	/* Any data means we had a useful conversation, so
+	 * then we don't need to delay the next reconnect.
+	 */
+	if (xprt->reestablish_timeout)
+		xprt->reestablish_timeout = 0;
 
 	/* Fixed transport header fields */
 	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
-			rep->rr_hdrbuf.head[0].iov_base);
+			rep->rr_hdrbuf.head[0].iov_base, NULL);
 	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
 	if (unlikely(!p))
 		goto out_shortreply;
@@ -1345,51 +1345,55 @@
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
 	if (!rqst)
 		goto out_norqst;
 	xprt_pin_rqst(rqst);
+	spin_unlock(&xprt->queue_lock);
 
 	if (credits == 0)
 		credits = 1;	/* don't deadlock */
 	else if (credits > buf->rb_max_requests)
 		credits = buf->rb_max_requests;
-	buf->rb_credits = credits;
-
-	spin_unlock(&xprt->recv_lock);
+	if (buf->rb_credits != credits) {
+		spin_lock(&xprt->transport_lock);
+		buf->rb_credits = credits;
+		xprt->cwnd = credits << RPC_CWNDSHIFT;
+		spin_unlock(&xprt->transport_lock);
+	}
 
 	req = rpcr_to_rdmar(rqst);
+	if (req->rl_reply) {
+		trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
+		rpcrdma_recv_buffer_put(req->rl_reply);
+	}
 	req->rl_reply = rep;
 	rep->rr_rqst = rqst;
-	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 
 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
 
-	rpcrdma_post_recvs(r_xprt, false);
-	queue_work(rpcrdma_receive_wq, &rep->rr_work);
+	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
+		frwr_reminv(rep, &req->rl_registered);
+	if (!list_empty(&req->rl_registered))
+		frwr_unmap_async(r_xprt, req);
+		/* LocalInv completion will complete the RPC */
+	else
+		kref_put(&req->rl_kref, rpcrdma_reply_done);
 	return;
 
 out_badversion:
 	trace_xprtrdma_reply_vers(rep);
-	goto repost;
+	goto out;
 
-/* The RPC transaction has already been terminated, or the header
- * is corrupt.
- */
 out_norqst:
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	trace_xprtrdma_reply_rqst(rep);
-	goto repost;
+	goto out;
 
 out_shortreply:
 	trace_xprtrdma_reply_short(rep);
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
- */
-repost:
-	rpcrdma_post_recvs(r_xprt, false);
-out_badstatus:
+out:
 	rpcrdma_recv_buffer_put(rep);
 }
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 134bef6..97bca50 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -73,8 +73,6 @@
 atomic_t rdma_stat_sq_poll;
 atomic_t rdma_stat_sq_prod;
 
-struct workqueue_struct *svc_rdma_wq;
-
 /*
  * This function implements reading and resetting an atomic_t stat
  * variable through read/write to a proc file. Any write to the file
@@ -230,14 +228,10 @@
 void svc_rdma_cleanup(void)
 {
 	dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n");
-	destroy_workqueue(svc_rdma_wq);
 	if (svcrdma_table_header) {
 		unregister_sysctl_table(svcrdma_table_header);
 		svcrdma_table_header = NULL;
 	}
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-	svc_unreg_xprt_class(&svc_rdma_bc_class);
-#endif
 	svc_unreg_xprt_class(&svc_rdma_class);
 }
 
@@ -249,18 +243,11 @@
 	dprintk("\tmax_bc_requests  : %u\n", svcrdma_max_bc_requests);
 	dprintk("\tmax_inline       : %d\n", svcrdma_max_req_size);
 
-	svc_rdma_wq = alloc_workqueue("svc_rdma", 0, 0);
-	if (!svc_rdma_wq)
-		return -ENOMEM;
-
 	if (!svcrdma_table_header)
 		svcrdma_table_header =
 			register_sysctl_table(svcrdma_root_table);
 
 	/* Register RDMA with the SVC transport switch */
 	svc_reg_xprt_class(&svc_rdma_class);
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-	svc_reg_xprt_class(&svc_rdma_bc_class);
-#endif
 	return 0;
 }
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index b982766..d1fcc41 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -5,8 +5,6 @@
  * Support for backward direction RPCs on RPC/RDMA (server-side).
  */
 
-#include <linux/module.h>
-
 #include <linux/sunrpc/svc_rdma.h>
 
 #include "xprt_rdma.h"
@@ -32,7 +30,6 @@
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct kvec *dst, *src = &rcvbuf->head[0];
 	struct rpc_rqst *req;
-	unsigned long cwnd;
 	u32 credits;
 	size_t len;
 	__be32 xid;
@@ -56,7 +53,7 @@
 	if (src->iov_len < 24)
 		goto out_shortreply;
 
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	req = xprt_lookup_rqst(xprt, xid);
 	if (!req)
 		goto out_notfound;
@@ -66,6 +63,8 @@
 	if (dst->iov_len < len)
 		goto out_unlock;
 	memcpy(dst->iov_base, p, len);
+	xprt_pin_rqst(req);
+	spin_unlock(&xprt->queue_lock);
 
 	credits = be32_to_cpup(rdma_resp + 2);
 	if (credits == 0)
@@ -73,20 +72,18 @@
 	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
 		credits = r_xprt->rx_buf.rb_bc_max_requests;
 
-	spin_lock_bh(&xprt->transport_lock);
-	cwnd = xprt->cwnd;
+	spin_lock(&xprt->transport_lock);
 	xprt->cwnd = credits << RPC_CWNDSHIFT;
-	if (xprt->cwnd > cwnd)
-		xprt_release_rqst_cong(req->rq_task);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 
-
+	spin_lock(&xprt->queue_lock);
 	ret = 0;
 	xprt_complete_rqst(req->rq_task, rcvbuf->len);
+	xprt_unpin_rqst(req);
 	rcvbuf->len = 0;
 
 out_unlock:
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 out:
 	return ret;
 
@@ -203,11 +200,10 @@
 		svc_rdma_send_ctxt_put(rdma, ctxt);
 		goto drop_connection;
 	}
-	return rc;
+	return 0;
 
 drop_connection:
 	dprintk("svcrdma: failed to send bc call\n");
-	xprt_disconnect_done(xprt);
 	return -ENOTCONN;
 }
 
@@ -215,9 +211,8 @@
  * connection.
  */
 static int
-xprt_rdma_bc_send_request(struct rpc_task *task)
+xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
 {
-	struct rpc_rqst *rqst = task->tk_rqstp;
 	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
 	struct svcxprt_rdma *rdma;
 	int ret;
@@ -225,17 +220,15 @@
 	dprintk("svcrdma: sending bc call with xid: %08x\n",
 		be32_to_cpu(rqst->rq_xid));
 
-	if (!mutex_trylock(&sxprt->xpt_mutex)) {
-		rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
-		if (!mutex_trylock(&sxprt->xpt_mutex))
-			return -EAGAIN;
-		rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
-	}
+	mutex_lock(&sxprt->xpt_mutex);
 
 	ret = -ENOTCONN;
 	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
-	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
+	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags)) {
 		ret = rpcrdma_bc_send_request(rdma, rqst);
+		if (ret == -ENOTCONN)
+			svc_close_xprt(sxprt);
+	}
 
 	mutex_unlock(&sxprt->xpt_mutex);
 
@@ -257,7 +250,6 @@
 	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
 
 	xprt_free(xprt);
-	module_put(THIS_MODULE);
 }
 
 static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
@@ -269,7 +261,7 @@
 	.buf_alloc		= xprt_rdma_bc_allocate,
 	.buf_free		= xprt_rdma_bc_free,
 	.send_request		= xprt_rdma_bc_send_request,
-	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
+	.wait_for_reply_request	= xprt_wait_for_reply_request_def,
 	.close			= xprt_rdma_bc_close,
 	.destroy		= xprt_rdma_bc_put,
 	.print_stats		= xprt_rdma_print_stats
@@ -312,7 +304,6 @@
 	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 
 	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
-	xprt->tsh_size = 0;
 	xprt->ops = &xprt_rdma_bc_procs;
 
 	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
@@ -329,20 +320,9 @@
 	args->bc_xprt->xpt_bc_xprt = xprt;
 	xprt->bc_xprt = args->bc_xprt;
 
-	if (!try_module_get(THIS_MODULE))
-		goto out_fail;
-
 	/* Final put for backchannel xprt is in __svc_rdma_free */
 	xprt_get(xprt);
 	return xprt;
-
-out_fail:
-	xprt_rdma_free_addresses(xprt);
-	args->bc_xprt->xpt_bc_xprt = NULL;
-	args->bc_xprt->xpt_bc_xps = NULL;
-	xprt_put(xprt);
-	xprt_free(xprt);
-	return ERR_PTR(-EINVAL);
 }
 
 struct xprt_class xprt_rdma_bc = {
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index b24d5b8..96bccd3 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -172,9 +172,10 @@
 void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
 {
 	struct svc_rdma_recv_ctxt *ctxt;
+	struct llist_node *node;
 
-	while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts))) {
-		list_del(&ctxt->rc_list);
+	while ((node = llist_del_first(&rdma->sc_recv_ctxts))) {
+		ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
 		svc_rdma_recv_ctxt_destroy(rdma, ctxt);
 	}
 }
@@ -183,21 +184,18 @@
 svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
 {
 	struct svc_rdma_recv_ctxt *ctxt;
+	struct llist_node *node;
 
-	spin_lock(&rdma->sc_recv_lock);
-	ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts);
-	if (!ctxt)
+	node = llist_del_first(&rdma->sc_recv_ctxts);
+	if (!node)
 		goto out_empty;
-	list_del(&ctxt->rc_list);
-	spin_unlock(&rdma->sc_recv_lock);
+	ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
 
 out:
 	ctxt->rc_page_count = 0;
 	return ctxt;
 
 out_empty:
-	spin_unlock(&rdma->sc_recv_lock);
-
 	ctxt = svc_rdma_recv_ctxt_alloc(rdma);
 	if (!ctxt)
 		return NULL;
@@ -218,11 +216,9 @@
 	for (i = 0; i < ctxt->rc_page_count; i++)
 		put_page(ctxt->rc_pages[i]);
 
-	if (!ctxt->rc_temp) {
-		spin_lock(&rdma->sc_recv_lock);
-		list_add(&ctxt->rc_list, &rdma->sc_recv_ctxts);
-		spin_unlock(&rdma->sc_recv_lock);
-	} else
+	if (!ctxt->rc_temp)
+		llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
+	else
 		svc_rdma_recv_ctxt_destroy(rdma, ctxt);
 }
 
@@ -272,11 +268,8 @@
 			return false;
 		ctxt->rc_temp = true;
 		ret = __svc_rdma_post_recv(rdma, ctxt);
-		if (ret) {
-			pr_err("svcrdma: failure posting recv buffers: %d\n",
-			       ret);
+		if (ret)
 			return false;
-		}
 	}
 	return true;
 }
@@ -314,17 +307,14 @@
 
 	spin_lock(&rdma->sc_rq_dto_lock);
 	list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q);
-	spin_unlock(&rdma->sc_rq_dto_lock);
+	/* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
 	set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+	spin_unlock(&rdma->sc_rq_dto_lock);
 	if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
 		svc_xprt_enqueue(&rdma->sc_xprt);
 	goto out;
 
 flushed:
-	if (wc->status != IB_WC_WR_FLUSH_ERR)
-		pr_err("svcrdma: Recv: %s (%u/0x%x)\n",
-		       ib_wc_status_msg(wc->status),
-		       wc->status, wc->vendor_err);
 post_err:
 	svc_rdma_recv_ctxt_put(rdma, ctxt);
 	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
@@ -485,6 +475,68 @@
 	return p;
 }
 
+/* RPC-over-RDMA Version One private extension: Remote Invalidation.
+ * Responder's choice: requester signals it can handle Send With
+ * Invalidate, and responder chooses one R_key to invalidate.
+ *
+ * If there is exactly one distinct R_key in the received transport
+ * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero.
+ *
+ * Perform this operation while the received transport header is
+ * still in the CPU cache.
+ */
+static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
+				  struct svc_rdma_recv_ctxt *ctxt)
+{
+	__be32 inv_rkey, *p;
+	u32 i, segcount;
+
+	ctxt->rc_inv_rkey = 0;	/* every early return below means "none" */
+
+	if (!rdma->sc_snd_w_inv)	/* presumably: peer did not ask */
+		return;
+
+	inv_rkey = xdr_zero;	/* xdr_zero doubles as "no R_key seen yet" */
+	p = ctxt->rc_recv_buf;
+	p += rpcrdma_fixed_maxsz;	/* step over the fixed header words */
+
+	/* Read list: a non-zero word announces another entry */
+	while (*p++ != xdr_zero) {
+		p++;	/* position */
+		if (inv_rkey == xdr_zero)
+			inv_rkey = *p;
+		else if (inv_rkey != *p)
+			return;	/* second distinct R_key: give up */
+		p += 4;	/* advance 4 words, to the next discriminator */
+	}
+
+	/* Write list: each entry is a segment count plus its segments */
+	while (*p++ != xdr_zero) {
+		segcount = be32_to_cpup(p++);
+		for (i = 0; i < segcount; i++) {
+			if (inv_rkey == xdr_zero)
+				inv_rkey = *p;
+			else if (inv_rkey != *p)
+				return;	/* second distinct R_key */
+			p += 4;	/* advance 4 words, to the next segment */
+		}
+	}
+
+	/* Reply chunk: at most one, same layout as a Write list entry */
+	if (*p++ != xdr_zero) {
+		segcount = be32_to_cpup(p++);
+		for (i = 0; i < segcount; i++) {
+			if (inv_rkey == xdr_zero)
+				inv_rkey = *p;
+			else if (inv_rkey != *p)
+				return;	/* second distinct R_key */
+			p += 4;	/* advance 4 words, to the next segment */
+		}
+	}
+
+	ctxt->rc_inv_rkey = be32_to_cpu(inv_rkey);	/* kept in CPU order */
+}
+
 /* On entry, xdr->head[0].iov_base points to first byte in the
  * RPC-over-RDMA header.
  *
@@ -746,6 +798,7 @@
 		svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
 		return ret;
 	}
+	svc_rdma_get_inv_rkey(rdma_xprt, ctxt);
 
 	p += rpcrdma_fixed_maxsz;
 	if (*p != xdr_zero)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index dc19517..48fe3b1 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -64,8 +64,7 @@
 		spin_unlock(&rdma->sc_rw_ctxt_lock);
 	} else {
 		spin_unlock(&rdma->sc_rw_ctxt_lock);
-		ctxt = kmalloc(sizeof(*ctxt) +
-			       SG_CHUNK_SIZE * sizeof(struct scatterlist),
+		ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
 			       GFP_KERNEL);
 		if (!ctxt)
 			goto out;
@@ -74,7 +73,8 @@
 
 	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
 	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
-				   ctxt->rw_sg_table.sgl)) {
+				   ctxt->rw_sg_table.sgl,
+				   SG_CHUNK_SIZE)) {
 		kfree(ctxt);
 		ctxt = NULL;
 	}
@@ -85,7 +85,7 @@
 static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
 				 struct svc_rdma_rw_ctxt *ctxt)
 {
-	sg_free_table_chained(&ctxt->rw_sg_table, true);
+	sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);
 
 	spin_lock(&rdma->sc_rw_ctxt_lock);
 	list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
@@ -213,13 +213,8 @@
 	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 	wake_up(&rdma->sc_send_wait);
 
-	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+	if (unlikely(wc->status != IB_WC_SUCCESS))
 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
-	}
 
 	svc_rdma_write_info_free(info);
 }
@@ -278,18 +273,15 @@
 
 	if (unlikely(wc->status != IB_WC_SUCCESS)) {
 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
 		svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt);
 	} else {
 		spin_lock(&rdma->sc_rq_dto_lock);
 		list_add_tail(&info->ri_readctxt->rc_list,
 			      &rdma->sc_read_complete_q);
+		/* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
+		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
 		spin_unlock(&rdma->sc_rq_dto_lock);
 
-		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
 		svc_xprt_enqueue(&rdma->sc_xprt);
 	}
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 8602a5f..6fdba72 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -272,10 +272,6 @@
 	if (unlikely(wc->status != IB_WC_SUCCESS)) {
 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 		svc_xprt_enqueue(&rdma->sc_xprt);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: Send: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
 	}
 
 	svc_xprt_put(&rdma->sc_xprt);
@@ -484,32 +480,6 @@
 		*reply = NULL;
 }
 
-/* RPC-over-RDMA Version One private extension: Remote Invalidation.
- * Responder's choice: requester signals it can handle Send With
- * Invalidate, and responder chooses one rkey to invalidate.
- *
- * Find a candidate rkey to invalidate when sending a reply.  Picks the
- * first R_key it finds in the chunk lists.
- *
- * Returns zero if RPC's chunk lists are empty.
- */
-static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
-				 __be32 *wr_lst, __be32 *rp_ch)
-{
-	__be32 *p;
-
-	p = rdma_argp + rpcrdma_fixed_maxsz;
-	if (*p != xdr_zero)
-		p += 2;
-	else if (wr_lst && be32_to_cpup(wr_lst + 1))
-		p = wr_lst + 2;
-	else if (rp_ch && be32_to_cpup(rp_ch + 1))
-		p = rp_ch + 2;
-	else
-		return 0;
-	return be32_to_cpup(p);
-}
-
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
 				 struct svc_rdma_send_ctxt *ctxt,
 				 struct page *page,
@@ -563,6 +533,99 @@
 				      DMA_TO_DEVICE);
 }
 
+/* Return true if @xdr has more elements than the device can
+ * transmit in a single RDMA Send, in which case the reply has to
+ * be copied into a bounce buffer. Pages count only without @wr_lst.
+ */
+static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
+				    struct xdr_buf *xdr,
+				    __be32 *wr_lst)
+{
+	int elements;
+
+	/* xdr->head always takes one element */
+	elements = 1;
+
+	/* xdr->pages: one element per page the payload spans */
+	if (!wr_lst) {
+		unsigned int remaining;
+		unsigned long pageoff;
+
+		pageoff = xdr->page_base & ~PAGE_MASK;
+		remaining = xdr->page_len;
+		while (remaining) {
+			++elements;
+			remaining -= min_t(u32, PAGE_SIZE - pageoff,
+					   remaining);
+			pageoff = 0;	/* only the first page starts mid-page */
+		}
+	}
+
+	/* xdr->tail counts only when it is not empty */
+	if (xdr->tail[0].iov_len)
+		++elements;
+
+	/* assume 1 SGE is needed for the transport header, hence ">=" */
+	return elements >= rdma->sc_max_send_sges;
+}
+
+/* The device is not capable of sending the reply directly.
+ * Copy the head, the page payload (only when there is no @wr_lst)
+ * and the tail of @xdr into the transport header buffer. Returns 0.
+ */
+static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
+				      struct svc_rdma_send_ctxt *ctxt,
+				      struct xdr_buf *xdr, __be32 *wr_lst)
+{
+	unsigned char *dst, *tailbase;
+	unsigned int taillen;
+
+	dst = ctxt->sc_xprt_buf;
+	dst += ctxt->sc_sges[0].length;	/* append behind what SGE 0 holds */
+
+	memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
+	dst += xdr->head[0].iov_len;
+
+	tailbase = xdr->tail[0].iov_base;
+	taillen = xdr->tail[0].iov_len;
+	if (wr_lst) {
+		u32 xdrpad;
+
+		xdrpad = xdr_padsize(xdr->page_len);
+		if (taillen && xdrpad) {
+			tailbase += xdrpad;	/* skip the pad for the pages */
+			taillen -= xdrpad;
+		}
+	} else {
+		unsigned int len, remaining;
+		unsigned long pageoff;
+		struct page **ppages;
+
+		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+		pageoff = xdr->page_base & ~PAGE_MASK;
+		remaining = xdr->page_len;
+		while (remaining) {
+			len = min_t(u32, PAGE_SIZE - pageoff, remaining);
+			/* Copy from pageoff, then step to the next page */
+			memcpy(dst, page_address(*ppages++) + pageoff, len);
+			remaining -= len;
+			dst += len;
+			pageoff = 0;	/* later pages start at offset 0 */
+		}
+	}
+
+	if (taillen)
+		memcpy(dst, tailbase, taillen);
+
+	ctxt->sc_sges[0].length += xdr->len;
+	ib_dma_sync_single_for_device(rdma->sc_pd->device,
+				      ctxt->sc_sges[0].addr,
+				      ctxt->sc_sges[0].length,
+				      DMA_TO_DEVICE);
+
+	return 0;
+}
+
 /* svc_rdma_map_reply_msg - Map the buffer holding RPC message
  * @rdma: controlling transport
  * @ctxt: send_ctxt for the Send WR
@@ -585,8 +648,10 @@
 	u32 xdr_pad;
 	int ret;
 
-	if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges)
-		return -EIO;
+	if (svc_rdma_pull_up_needed(rdma, xdr, wr_lst))
+		return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
+
+	++ctxt->sc_cur_sge_no;
 	ret = svc_rdma_dma_map_buf(rdma, ctxt,
 				   xdr->head[0].iov_base,
 				   xdr->head[0].iov_len);
@@ -617,8 +682,7 @@
 	while (remaining) {
 		len = min_t(u32, PAGE_SIZE - page_off, remaining);
 
-		if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges)
-			return -EIO;
+		++ctxt->sc_cur_sge_no;
 		ret = svc_rdma_dma_map_page(rdma, ctxt, *ppages++,
 					    page_off, len);
 		if (ret < 0)
@@ -632,8 +696,7 @@
 	len = xdr->tail[0].iov_len;
 tail:
 	if (len) {
-		if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges)
-			return -EIO;
+		++ctxt->sc_cur_sge_no;
 		ret = svc_rdma_dma_map_buf(rdma, ctxt, base, len);
 		if (ret < 0)
 			return ret;
@@ -672,7 +735,7 @@
  *
  * RDMA Send is the last step of transmitting an RPC reply. Pages
  * involved in the earlier RDMA Writes are here transferred out
- * of the rqstp and into the ctxt's page array. These pages are
+ * of the rqstp and into the sctxt's page array. These pages are
  * DMA unmapped by each Write completion, but the subsequent Send
  * completion finally releases these pages.
  *
@@ -680,32 +743,31 @@
  * - The Reply's transport header will never be larger than a page.
  */
 static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
-				   struct svc_rdma_send_ctxt *ctxt,
-				   __be32 *rdma_argp,
+				   struct svc_rdma_send_ctxt *sctxt,
+				   struct svc_rdma_recv_ctxt *rctxt,
 				   struct svc_rqst *rqstp,
 				   __be32 *wr_lst, __be32 *rp_ch)
 {
 	int ret;
 
 	if (!rp_ch) {
-		ret = svc_rdma_map_reply_msg(rdma, ctxt,
+		ret = svc_rdma_map_reply_msg(rdma, sctxt,
 					     &rqstp->rq_res, wr_lst);
 		if (ret < 0)
 			return ret;
 	}
 
-	svc_rdma_save_io_pages(rqstp, ctxt);
+	svc_rdma_save_io_pages(rqstp, sctxt);
 
-	ctxt->sc_send_wr.opcode = IB_WR_SEND;
-	if (rdma->sc_snd_w_inv) {
-		ctxt->sc_send_wr.ex.invalidate_rkey =
-			svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
-		if (ctxt->sc_send_wr.ex.invalidate_rkey)
-			ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+	if (rctxt->rc_inv_rkey) {
+		sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+		sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey;
+	} else {
+		sctxt->sc_send_wr.opcode = IB_WR_SEND;
 	}
 	dprintk("svcrdma: posting Send WR with %u sge(s)\n",
-		ctxt->sc_send_wr.num_sge);
-	return svc_rdma_send(rdma, &ctxt->sc_send_wr);
+		sctxt->sc_send_wr.num_sge);
+	return svc_rdma_send(rdma, &sctxt->sc_send_wr);
 }
 
 /* Given the client-provided Write and Reply chunks, the server was not
@@ -741,10 +803,6 @@
 	return 0;
 }
 
-void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
-{
-}
-
 /**
  * svc_rdma_sendto - Transmit an RPC reply
  * @rqstp: processed RPC request, reply XDR already in ::rq_res
@@ -809,7 +867,7 @@
 	}
 
 	svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
-	ret = svc_rdma_send_reply_msg(rdma, sctxt, rdma_argp, rqstp,
+	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp,
 				      wr_lst, rp_ch);
 	if (ret < 0)
 		goto err1;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 2848caf..145a361 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -85,7 +85,6 @@
 	.xpo_release_rqst = svc_rdma_release_rqst,
 	.xpo_detach = svc_rdma_detach,
 	.xpo_free = svc_rdma_free,
-	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
 	.xpo_has_wspace = svc_rdma_has_wspace,
 	.xpo_accept = svc_rdma_accept,
 	.xpo_secure_port = svc_rdma_secure_port,
@@ -100,64 +99,6 @@
 	.xcl_ident = XPRT_TRANSPORT_RDMA,
 };
 
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
-					   struct sockaddr *, int, int);
-static void svc_rdma_bc_detach(struct svc_xprt *);
-static void svc_rdma_bc_free(struct svc_xprt *);
-
-static const struct svc_xprt_ops svc_rdma_bc_ops = {
-	.xpo_create = svc_rdma_bc_create,
-	.xpo_detach = svc_rdma_bc_detach,
-	.xpo_free = svc_rdma_bc_free,
-	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
-	.xpo_secure_port = svc_rdma_secure_port,
-};
-
-struct svc_xprt_class svc_rdma_bc_class = {
-	.xcl_name = "rdma-bc",
-	.xcl_owner = THIS_MODULE,
-	.xcl_ops = &svc_rdma_bc_ops,
-	.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
-};
-
-static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
-					   struct net *net,
-					   struct sockaddr *sa, int salen,
-					   int flags)
-{
-	struct svcxprt_rdma *cma_xprt;
-	struct svc_xprt *xprt;
-
-	cma_xprt = svc_rdma_create_xprt(serv, net);
-	if (!cma_xprt)
-		return ERR_PTR(-ENOMEM);
-	xprt = &cma_xprt->sc_xprt;
-
-	svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
-	set_bit(XPT_CONG_CTRL, &xprt->xpt_flags);
-	serv->sv_bc_xprt = xprt;
-
-	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
-	return xprt;
-}
-
-static void svc_rdma_bc_detach(struct svc_xprt *xprt)
-{
-	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
-}
-
-static void svc_rdma_bc_free(struct svc_xprt *xprt)
-{
-	struct svcxprt_rdma *rdma =
-		container_of(xprt, struct svcxprt_rdma, sc_xprt);
-
-	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
-	if (xprt)
-		kfree(rdma);
-}
-#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
-
 /* QP event handler */
 static void qp_event_handler(struct ib_event *event, void *context)
 {
@@ -199,14 +140,13 @@
 	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts);
-	INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts);
+	init_llist_head(&cma_xprt->sc_recv_ctxts);
 	INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
 	init_waitqueue_head(&cma_xprt->sc_send_wait);
 
 	spin_lock_init(&cma_xprt->sc_lock);
 	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
 	spin_lock_init(&cma_xprt->sc_send_lock);
-	spin_lock_init(&cma_xprt->sc_recv_lock);
 	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
 
 	/*
@@ -270,9 +210,14 @@
 	/* Save client advertised inbound read limit for use later in accept. */
 	newxprt->sc_ord = param->initiator_depth;
 
-	/* Set the local and remote addresses in the transport */
 	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
 	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+	/* The remote port is arbitrary and not under the control of the
+	 * client ULP. Set it to a fixed value so that the DRC continues
+	 * to be effective after a reconnect.
+	 */
+	rpc_set_port((struct sockaddr *)&newxprt->sc_xprt.xpt_remote, 0);
+
 	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
 	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
 
@@ -280,9 +225,9 @@
 	 * Enqueue the new transport on the accept queue of the listening
 	 * transport
 	 */
-	spin_lock_bh(&listen_xprt->sc_lock);
+	spin_lock(&listen_xprt->sc_lock);
 	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
-	spin_unlock_bh(&listen_xprt->sc_lock);
+	spin_unlock(&listen_xprt->sc_lock);
 
 	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
 	svc_xprt_enqueue(&listen_xprt->sc_xprt);
@@ -449,13 +394,13 @@
 	struct ib_qp_init_attr qp_attr;
 	unsigned int ctxts, rq_depth;
 	struct ib_device *dev;
-	struct sockaddr *sap;
 	int ret = 0;
+	RPC_IFDEBUG(struct sockaddr *sap);
 
 	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	clear_bit(XPT_CONN, &xprt->xpt_flags);
 	/* Get the next entry off the accept list */
-	spin_lock_bh(&listen_rdma->sc_lock);
+	spin_lock(&listen_rdma->sc_lock);
 	if (!list_empty(&listen_rdma->sc_accept_q)) {
 		newxprt = list_entry(listen_rdma->sc_accept_q.next,
 				     struct svcxprt_rdma, sc_accept_q);
@@ -463,7 +408,7 @@
 	}
 	if (!list_empty(&listen_rdma->sc_accept_q))
 		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
-	spin_unlock_bh(&listen_rdma->sc_lock);
+	spin_unlock(&listen_rdma->sc_lock);
 	if (!newxprt)
 		return NULL;
 
@@ -475,13 +420,12 @@
 
 	/* Qualify the transport resource defaults with the
 	 * capabilities of this particular device */
-	newxprt->sc_max_send_sges = dev->attrs.max_send_sge;
-	/* transport hdr, head iovec, one page list entry, tail iovec */
-	if (newxprt->sc_max_send_sges < 4) {
-		pr_err("svcrdma: too few Send SGEs available (%d)\n",
-		       newxprt->sc_max_send_sges);
-		goto errout;
-	}
+	/* Transport header, head iovec, tail iovec */
+	newxprt->sc_max_send_sges = 3;
+	/* Add one SGE per page list entry */
+	newxprt->sc_max_send_sges += (svcrdma_max_req_size / PAGE_SIZE) + 1;
+	if (newxprt->sc_max_send_sges > dev->attrs.max_send_sge)
+		newxprt->sc_max_send_sges = dev->attrs.max_send_sge;
 	newxprt->sc_max_req_size = svcrdma_max_req_size;
 	newxprt->sc_max_requests = svcrdma_max_requests;
 	newxprt->sc_max_bc_requests = svcrdma_max_bc_requests;
@@ -509,14 +453,14 @@
 		dprintk("svcrdma: error creating PD for connect request\n");
 		goto errout;
 	}
-	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
-					0, IB_POLL_WORKQUEUE);
+	newxprt->sc_sq_cq = ib_alloc_cq_any(dev, newxprt, newxprt->sc_sq_depth,
+					    IB_POLL_WORKQUEUE);
 	if (IS_ERR(newxprt->sc_sq_cq)) {
 		dprintk("svcrdma: error creating SQ CQ for connect request\n");
 		goto errout;
 	}
-	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, rq_depth,
-					0, IB_POLL_WORKQUEUE);
+	newxprt->sc_rq_cq =
+		ib_alloc_cq_any(dev, newxprt, rq_depth, IB_POLL_WORKQUEUE);
 	if (IS_ERR(newxprt->sc_rq_cq)) {
 		dprintk("svcrdma: error creating RQ CQ for connect request\n");
 		goto errout;
@@ -585,6 +529,7 @@
 	if (ret)
 		goto errout;
 
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 	dprintk("svcrdma: new connection %p accepted:\n", newxprt);
 	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
 	dprintk("    local address   : %pIS:%u\n", sap, rpc_get_port(sap));
@@ -595,6 +540,7 @@
 	dprintk("    rdma_rw_ctxs    : %d\n", ctxts);
 	dprintk("    max_requests    : %d\n", newxprt->sc_max_requests);
 	dprintk("    ord             : %d\n", conn_param.initiator_depth);
+#endif
 
 	trace_svcrdma_xprt_accept(&newxprt->sc_xprt);
 	return &newxprt->sc_xprt;
@@ -648,11 +594,6 @@
 	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
 		ib_drain_qp(rdma->sc_qp);
 
-	/* We should only be called from kref_put */
-	if (kref_read(&xprt->xpt_ref) != 0)
-		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
-		       kref_read(&xprt->xpt_ref));
-
 	svc_rdma_flush_recv_queues(rdma);
 
 	/* Final put of backchannel client transport */
@@ -688,8 +629,9 @@
 {
 	struct svcxprt_rdma *rdma =
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
 	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
-	queue_work(svc_rdma_wq, &rdma->sc_work);
+	schedule_work(&rdma->sc_work);
 }
 
 static int svc_rdma_has_wspace(struct svc_xprt *xprt)
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 98cbc7b..160558b 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -68,9 +68,9 @@
  * tunables
  */
 
-static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
+unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
 unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
-static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
+unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
 unsigned int xprt_rdma_memreg_strategy		= RPCRDMA_FRWR;
 int xprt_rdma_pad_optimize;
 
@@ -80,7 +80,6 @@
 static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
 static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
 static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
-static unsigned int zero;
 static unsigned int max_padding = PAGE_SIZE;
 static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
 static unsigned int max_memreg = RPCRDMA_LAST - 1;
@@ -122,7 +121,7 @@
 		.maxlen		= sizeof(unsigned int),
 		.mode		= 0644,
 		.proc_handler	= proc_dointvec_minmax,
-		.extra1		= &zero,
+		.extra1		= SYSCTL_ZERO,
 		.extra2		= &max_padding,
 	},
 	{
@@ -225,82 +224,70 @@
 		}
 }
 
-void
-rpcrdma_conn_func(struct rpcrdma_ep *ep)
-{
-	schedule_delayed_work(&ep->rep_connect_worker, 0);
-}
-
-void
-rpcrdma_connect_worker(struct work_struct *work)
-{
-	struct rpcrdma_ep *ep =
-		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
-	struct rpcrdma_xprt *r_xprt =
-		container_of(ep, struct rpcrdma_xprt, rx_ep);
-	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-
-	spin_lock_bh(&xprt->transport_lock);
-	if (ep->rep_connected > 0) {
-		if (!xprt_test_and_set_connected(xprt))
-			xprt_wake_pending_tasks(xprt, 0);
-	} else {
-		if (xprt_test_and_clear_connected(xprt))
-			xprt_wake_pending_tasks(xprt, -ENOTCONN);
-	}
-	spin_unlock_bh(&xprt->transport_lock);
-}
-
+/**
+ * xprt_rdma_connect_worker - establish connection in the background
+ * @work: worker thread context
+ *
+ * Requester holds the xprt's send lock to prevent activity on this
+ * transport while a fresh connection is being established. RPC tasks
+ * sleep on the xprt's pending queue waiting for connect to complete.
+ */
 static void
 xprt_rdma_connect_worker(struct work_struct *work)
 {
 	struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
 						   rx_connect_worker.work);
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-	int rc = 0;
-
-	xprt_clear_connected(xprt);
+	int rc;
 
 	rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
-	if (rc)
-		xprt_wake_pending_tasks(xprt, rc);
-
 	xprt_clear_connecting(xprt);
+	if (r_xprt->rx_ep.rep_connected > 0) {
+		if (!xprt_test_and_set_connected(xprt)) {
+			xprt->stat.connect_count++;
+			xprt->stat.connect_time += (long)jiffies -
+						   xprt->stat.connect_start;
+			xprt_wake_pending_tasks(xprt, -EAGAIN);
+		}
+	} else {
+		if (xprt_test_and_clear_connected(xprt))
+			xprt_wake_pending_tasks(xprt, rc);
+	}
 }
 
+/**
+ * xprt_rdma_inject_disconnect - inject a connection fault
+ * @xprt: transport context
+ *
+ * If @xprt is connected, disconnect it to simulate spurious connection
+ * loss.
+ */
 static void
 xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
 {
-	struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt,
-						   rx_xprt);
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 
-	trace_xprtrdma_inject_dsc(r_xprt);
+	trace_xprtrdma_op_inject_dsc(r_xprt);
 	rdma_disconnect(r_xprt->rx_ia.ri_id);
 }
 
-/*
- * xprt_rdma_destroy
+/**
+ * xprt_rdma_destroy - Full tear down of transport
+ * @xprt: doomed transport context
  *
- * Destroy the xprt.
- * Free all memory associated with the object, including its own.
- * NOTE: none of the *destroy methods free memory for their top-level
- * objects, even though they may have allocated it (they do free
- * private memory). It's up to the caller to handle it. In this
- * case (RDMA transport), all structure memory is inlined with the
- * struct rpcrdma_xprt.
+ * Caller guarantees there will be no more calls to us with
+ * this @xprt.
  */
 static void
 xprt_rdma_destroy(struct rpc_xprt *xprt)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 
-	trace_xprtrdma_destroy(r_xprt);
+	trace_xprtrdma_op_destroy(r_xprt);
 
 	cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
 
-	xprt_clear_connected(xprt);
-
-	rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
+	rpcrdma_ep_destroy(r_xprt);
 	rpcrdma_buffer_destroy(&r_xprt->rx_buf);
 	rpcrdma_ia_close(&r_xprt->rx_ia);
 
@@ -310,6 +297,7 @@
 	module_put(THIS_MODULE);
 }
 
+/* 60 second timeout, no retries */
 static const struct rpc_timeout xprt_rdma_default_timeout = {
 	.to_initval = 60 * HZ,
 	.to_maxval = 60 * HZ,
@@ -323,33 +311,26 @@
 static struct rpc_xprt *
 xprt_setup_rdma(struct xprt_create *args)
 {
-	struct rpcrdma_create_data_internal cdata;
 	struct rpc_xprt *xprt;
 	struct rpcrdma_xprt *new_xprt;
-	struct rpcrdma_ep *new_ep;
 	struct sockaddr *sap;
 	int rc;
 
-	if (args->addrlen > sizeof(xprt->addr)) {
-		dprintk("RPC:       %s: address too large\n", __func__);
+	if (args->addrlen > sizeof(xprt->addr))
 		return ERR_PTR(-EBADF);
-	}
 
 	xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0, 0);
-	if (xprt == NULL) {
-		dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
-			__func__);
+	if (!xprt)
 		return ERR_PTR(-ENOMEM);
-	}
 
-	/* 60 second timeout, no retries */
 	xprt->timeout = &xprt_rdma_default_timeout;
+	xprt->connect_timeout = xprt->timeout->to_initval;
+	xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
 	xprt->bind_timeout = RPCRDMA_BIND_TO;
 	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 
 	xprt->resvport = 0;		/* privileged port not needed */
-	xprt->tsh_size = 0;		/* RPC-RDMA handles framing */
 	xprt->ops = &xprt_rdma_procs;
 
 	/*
@@ -367,40 +348,12 @@
 		xprt_set_bound(xprt);
 	xprt_rdma_format_addresses(xprt, sap);
 
-	cdata.max_requests = xprt_rdma_slot_table_entries;
-
-	cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
-	cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
-
-	cdata.inline_wsize = xprt_rdma_max_inline_write;
-	if (cdata.inline_wsize > cdata.wsize)
-		cdata.inline_wsize = cdata.wsize;
-
-	cdata.inline_rsize = xprt_rdma_max_inline_read;
-	if (cdata.inline_rsize > cdata.rsize)
-		cdata.inline_rsize = cdata.rsize;
-
-	/*
-	 * Create new transport instance, which includes initialized
-	 *  o ia
-	 *  o endpoint
-	 *  o buffers
-	 */
-
 	new_xprt = rpcx_to_rdmax(xprt);
-
 	rc = rpcrdma_ia_open(new_xprt);
 	if (rc)
 		goto out1;
 
-	/*
-	 * initialize and create ep
-	 */
-	new_xprt->rx_data = cdata;
-	new_ep = &new_xprt->rx_ep;
-
-	rc = rpcrdma_ep_create(&new_xprt->rx_ep,
-				&new_xprt->rx_ia, &new_xprt->rx_data);
+	rc = rpcrdma_ep_create(new_xprt);
 	if (rc)
 		goto out2;
 
@@ -411,7 +364,7 @@
 	INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
 			  xprt_rdma_connect_worker);
 
-	xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt);
+	xprt->max_payload = frwr_maxpages(new_xprt);
 	if (xprt->max_payload == 0)
 		goto out4;
 	xprt->max_payload <<= PAGE_SHIFT;
@@ -431,42 +384,45 @@
 	rpcrdma_buffer_destroy(&new_xprt->rx_buf);
 	rc = -ENODEV;
 out3:
-	rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
+	rpcrdma_ep_destroy(new_xprt);
 out2:
 	rpcrdma_ia_close(&new_xprt->rx_ia);
 out1:
-	trace_xprtrdma_destroy(new_xprt);
+	trace_xprtrdma_op_destroy(new_xprt);
 	xprt_rdma_free_addresses(xprt);
 	xprt_free(xprt);
 	return ERR_PTR(rc);
 }
 
 /**
- * xprt_rdma_close - Close down RDMA connection
- * @xprt: generic transport to be closed
+ * xprt_rdma_close - close a transport connection
+ * @xprt: transport context
  *
- * Called during transport shutdown reconnect, or device
- * removal. Caller holds the transport's write lock.
+ * Called during autoclose or device removal.
+ *
+ * Caller holds @xprt's send lock to prevent activity on this
+ * transport while the connection is torn down.
  */
-static void
-xprt_rdma_close(struct rpc_xprt *xprt)
+void xprt_rdma_close(struct rpc_xprt *xprt)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	dprintk("RPC:       %s: closing xprt %p\n", __func__, xprt);
+	might_sleep();
+
+	trace_xprtrdma_op_close(r_xprt);
+
+	/* Prevent marshaling and sending of new requests */
+	xprt_clear_connected(xprt);
 
 	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
-		xprt_clear_connected(xprt);
 		rpcrdma_ia_remove(ia);
-		return;
+		goto out;
 	}
+
 	if (ep->rep_connected == -ENODEV)
 		return;
-	if (ep->rep_connected > 0)
-		xprt->reestablish_timeout = 0;
-	xprt_disconnect_done(xprt);
 	rpcrdma_ep_disconnect(ep, ia);
 
 	/* Prepare @xprt for the next connection by reinitializing
@@ -474,6 +430,11 @@
 	 */
 	r_xprt->rx_buf.rb_credits = 1;
 	xprt->cwnd = RPC_CWNDSHIFT;
+
+out:
+	xprt->reestablish_timeout = 0;
+	++xprt->connect_cookie;
+	xprt_disconnect_done(xprt);
 }
 
 /**
@@ -525,25 +486,65 @@
 	xprt_force_disconnect(xprt);
 }
 
+/**
+ * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection
+ * @xprt: controlling transport instance
+ * @connect_timeout: reconnect timeout after client disconnects
+ * @reconnect_timeout: reconnect timeout after server disconnects
+ *
+ */
+static void xprt_rdma_set_connect_timeout(struct rpc_xprt *xprt,
+					  unsigned long connect_timeout,
+					  unsigned long reconnect_timeout)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+	trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout);
+
+	spin_lock(&xprt->transport_lock);
+
+	if (connect_timeout < xprt->connect_timeout) {
+		struct rpc_timeout to;
+		unsigned long initval;
+
+		to = *xprt->timeout;
+		initval = connect_timeout;
+		if (initval < RPCRDMA_INIT_REEST_TO << 1)
+			initval = RPCRDMA_INIT_REEST_TO << 1;
+		to.to_initval = initval;
+		to.to_maxval = initval;
+		r_xprt->rx_timeout = to;
+		xprt->timeout = &r_xprt->rx_timeout;
+		xprt->connect_timeout = connect_timeout;
+	}
+
+	if (reconnect_timeout < xprt->max_reconnect_timeout)
+		xprt->max_reconnect_timeout = reconnect_timeout;
+
+	spin_unlock(&xprt->transport_lock);
+}
+
+/**
+ * xprt_rdma_connect - schedule an attempt to reconnect
+ * @xprt: transport state
+ * @task: RPC scheduler context (unused)
+ *
+ */
 static void
 xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	unsigned long delay;
 
+	trace_xprtrdma_op_connect(r_xprt);
+
+	delay = 0;
 	if (r_xprt->rx_ep.rep_connected != 0) {
-		/* Reconnect */
-		schedule_delayed_work(&r_xprt->rx_connect_worker,
-				      xprt->reestablish_timeout);
-		xprt->reestablish_timeout <<= 1;
-		if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
-			xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
-		else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
-			xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
-	} else {
-		schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
-		if (!RPC_IS_ASYNC(task))
-			flush_delayed_work(&r_xprt->rx_connect_worker);
+		delay = xprt_reconnect_delay(xprt);
+		xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
 	}
+	queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
+			   delay);
 }
 
 /**
@@ -569,6 +570,7 @@
 	return;
 
 out_sleep:
+	set_bit(XPRT_CONGESTED, &xprt->state);
 	rpc_sleep_on(&xprt->backlog, task, NULL);
 	task->tk_status = -EAGAIN;
 }
@@ -582,57 +584,24 @@
 static void
 xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
 {
+	struct rpcrdma_xprt *r_xprt =
+		container_of(xprt, struct rpcrdma_xprt, rx_xprt);
+
 	memset(rqst, 0, sizeof(*rqst));
-	rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
-	rpc_wake_up_next(&xprt->backlog);
+	rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
+	if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
+		clear_bit(XPRT_CONGESTED, &xprt->state);
 }
 
-static bool
-rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-		    size_t size, gfp_t flags)
+static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
+				 struct rpcrdma_regbuf *rb, size_t size,
+				 gfp_t flags)
 {
-	struct rpcrdma_regbuf *rb;
-
-	if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
-		return true;
-
-	rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
-	if (IS_ERR(rb))
-		return false;
-
-	rpcrdma_free_regbuf(req->rl_sendbuf);
-	r_xprt->rx_stats.hardway_register_count += size;
-	req->rl_sendbuf = rb;
-	return true;
-}
-
-/* The rq_rcv_buf is used only if a Reply chunk is necessary.
- * The decision to use a Reply chunk is made later in
- * rpcrdma_marshal_req. This buffer is registered at that time.
- *
- * Otherwise, the associated RPC Reply arrives in a separate
- * Receive buffer, arbitrarily chosen by the HCA. The buffer
- * allocated here for the RPC Reply is not utilized in that
- * case. See rpcrdma_inline_fixup.
- *
- * A regbuf is used here to remember the buffer size.
- */
-static bool
-rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-		    size_t size, gfp_t flags)
-{
-	struct rpcrdma_regbuf *rb;
-
-	if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
-		return true;
-
-	rb = rpcrdma_alloc_regbuf(size, DMA_NONE, flags);
-	if (IS_ERR(rb))
-		return false;
-
-	rpcrdma_free_regbuf(req->rl_recvbuf);
-	r_xprt->rx_stats.hardway_register_count += size;
-	req->rl_recvbuf = rb;
+	if (unlikely(rdmab_length(rb) < size)) {
+		if (!rpcrdma_regbuf_realloc(rb, size, flags))
+			return false;
+		r_xprt->rx_stats.hardway_register_count += size;
+	}
 	return true;
 }
 
@@ -644,13 +613,6 @@
  *        0:	Success; rq_buffer points to RPC buffer to use
  *   ENOMEM:	Out of memory, call again later
  *      EIO:	A permanent error occurred, do not retry
- *
- * The RDMA allocate/free functions need the task structure as a place
- * to hide the struct rpcrdma_req, which is necessary for the actual
- * send/recv sequence.
- *
- * xprt_rdma_allocate provides buffers that are already mapped for
- * DMA, and a local DMA lkey is provided for each.
  */
 static int
 xprt_rdma_allocate(struct rpc_task *task)
@@ -664,18 +626,20 @@
 	if (RPC_IS_SWAPPER(task))
 		flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
 
-	if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
+	if (!rpcrdma_check_regbuf(r_xprt, req->rl_sendbuf, rqst->rq_callsize,
+				  flags))
 		goto out_fail;
-	if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
+	if (!rpcrdma_check_regbuf(r_xprt, req->rl_recvbuf, rqst->rq_rcvsize,
+				  flags))
 		goto out_fail;
 
-	rqst->rq_buffer = req->rl_sendbuf->rg_base;
-	rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
-	trace_xprtrdma_allocate(task, req);
+	rqst->rq_buffer = rdmab_data(req->rl_sendbuf);
+	rqst->rq_rbuffer = rdmab_data(req->rl_recvbuf);
+	trace_xprtrdma_op_allocate(task, req);
 	return 0;
 
 out_fail:
-	trace_xprtrdma_allocate(task, NULL);
+	trace_xprtrdma_op_allocate(task, NULL);
 	return -ENOMEM;
 }
 
@@ -692,14 +656,21 @@
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 
-	if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
-		rpcrdma_release_rqst(r_xprt, req);
-	trace_xprtrdma_rpc_done(task, req);
+	trace_xprtrdma_op_free(task, req);
+
+	if (!list_empty(&req->rl_registered))
+		frwr_unmap_sync(r_xprt, req);
+
+	/* XXX: If the RPC is completing because of a signal and
+	 * not because a reply was received, we ought to ensure
+	 * that the Send completion has fired, so that memory
+	 * involved with the Send is not still visible to the NIC.
+	 */
 }
 
 /**
  * xprt_rdma_send_request - marshal and send an RPC request
- * @task: RPC task with an RPC message in rq_snd_buf
+ * @rqst: RPC message in rq_snd_buf
  *
  * Caller holds the transport's write lock.
  *
@@ -708,13 +679,14 @@
  *	%-ENOTCONN if the caller should reconnect and call again
  *	%-EAGAIN if the caller should call again
  *	%-ENOBUFS if the caller should call again after a delay
- *	%-EIO if a permanent error occurred and the request was not
- *		sent. Do not try to send this message again.
+ *	%-EMSGSIZE if encoding ran out of buffer space. The request
+ *		was not sent. Do not try to send this message again.
+ *	%-EIO if an I/O error occurred. The request was not sent.
+ *		Do not try to send this message again.
  */
 static int
-xprt_rdma_send_request(struct rpc_task *task)
+xprt_rdma_send_request(struct rpc_rqst *rqst)
 {
-	struct rpc_rqst *rqst = task->tk_rqstp;
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
@@ -726,7 +698,10 @@
 #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
 
 	if (!xprt_connected(xprt))
-		goto drop_connection;
+		return -ENOTCONN;
+
+	if (!xprt_request_get_cong(xprt, rqst))
+		return -EBADSLT;
 
 	rc = rpcrdma_marshal_req(r_xprt, rqst);
 	if (rc < 0)
@@ -737,17 +712,15 @@
 		goto drop_connection;
 	rqst->rq_xtime = ktime_get();
 
-	__set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 		goto drop_connection;
 
 	rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
-	rqst->rq_bytes_sent = 0;
 
 	/* An RPC with no reply will throw off credit accounting,
 	 * so drop the connection to reset the credit grant.
 	 */
-	if (!rpc_reply_expected(task))
+	if (!rpc_reply_expected(rqst->rq_task))
 		goto drop_connection;
 	return 0;
 
@@ -755,8 +728,8 @@
 	if (rc != -ENOTCONN)
 		return rc;
 drop_connection:
-	xprt_disconnect_done(xprt);
-	return -ENOTCONN;	/* implies disconnect */
+	xprt_rdma_close(xprt);
+	return -ENOTCONN;
 }
 
 void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
@@ -772,7 +745,7 @@
 		   0,	/* need a local port? */
 		   xprt->stat.bind_count,
 		   xprt->stat.connect_count,
-		   xprt->stat.connect_time,
+		   xprt->stat.connect_time / HZ,
 		   idle_time,
 		   xprt->stat.sends,
 		   xprt->stat.recvs,
@@ -792,7 +765,7 @@
 		   r_xprt->rx_stats.bad_reply_count,
 		   r_xprt->rx_stats.nomsg_call_count);
 	seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
-		   r_xprt->rx_stats.mrs_recovered,
+		   r_xprt->rx_stats.mrs_recycled,
 		   r_xprt->rx_stats.mrs_orphaned,
 		   r_xprt->rx_stats.mrs_allocated,
 		   r_xprt->rx_stats.local_inv_needed,
@@ -821,7 +794,7 @@
 	.alloc_slot		= xprt_rdma_alloc_slot,
 	.free_slot		= xprt_rdma_free_slot,
 	.release_request	= xprt_release_rqst_cong,       /* ditto */
-	.set_retrans_timeout	= xprt_set_retrans_timeout_def, /* ditto */
+	.wait_for_reply_request	= xprt_wait_for_reply_request_def, /* ditto */
 	.timer			= xprt_rdma_timer,
 	.rpcbind		= rpcb_getport_async,	/* sunrpc/rpcb_clnt.c */
 	.set_port		= xprt_rdma_set_port,
@@ -831,14 +804,15 @@
 	.send_request		= xprt_rdma_send_request,
 	.close			= xprt_rdma_close,
 	.destroy		= xprt_rdma_destroy,
+	.set_connect_timeout	= xprt_rdma_set_connect_timeout,
 	.print_stats		= xprt_rdma_print_stats,
 	.enable_swap		= xprt_rdma_enable_swap,
 	.disable_swap		= xprt_rdma_disable_swap,
 	.inject_disconnect	= xprt_rdma_inject_disconnect,
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	.bc_setup		= xprt_rdma_bc_setup,
-	.bc_up			= xprt_rdma_bc_up,
 	.bc_maxpayload		= xprt_rdma_bc_maxpayload,
+	.bc_num_slots		= xprt_rdma_bc_max_slots,
 	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
 	.bc_destroy		= xprt_rdma_bc_destroy,
 #endif
@@ -854,58 +828,31 @@
 
 void xprt_rdma_cleanup(void)
 {
-	int rc;
-
-	dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 	if (sunrpc_table_header) {
 		unregister_sysctl_table(sunrpc_table_header);
 		sunrpc_table_header = NULL;
 	}
 #endif
-	rc = xprt_unregister_transport(&xprt_rdma);
-	if (rc)
-		dprintk("RPC:       %s: xprt_unregister returned %i\n",
-			__func__, rc);
 
-	rpcrdma_destroy_wq();
-
-	rc = xprt_unregister_transport(&xprt_rdma_bc);
-	if (rc)
-		dprintk("RPC:       %s: xprt_unregister(bc) returned %i\n",
-			__func__, rc);
+	xprt_unregister_transport(&xprt_rdma);
+	xprt_unregister_transport(&xprt_rdma_bc);
 }
 
 int xprt_rdma_init(void)
 {
 	int rc;
 
-	rc = rpcrdma_alloc_wq();
+	rc = xprt_register_transport(&xprt_rdma);
 	if (rc)
 		return rc;
 
-	rc = xprt_register_transport(&xprt_rdma);
-	if (rc) {
-		rpcrdma_destroy_wq();
-		return rc;
-	}
-
 	rc = xprt_register_transport(&xprt_rdma_bc);
 	if (rc) {
 		xprt_unregister_transport(&xprt_rdma);
-		rpcrdma_destroy_wq();
 		return rc;
 	}
 
-	dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
-
-	dprintk("Defaults:\n");
-	dprintk("\tSlots %d\n"
-		"\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
-		xprt_rdma_slot_table_entries,
-		xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
-	dprintk("\tPadding 0\n\tMemreg %d\n", xprt_rdma_memreg_strategy);
-
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 	if (!sunrpc_table_header)
 		sunrpc_table_header = register_sysctl_table(sunrpc_table);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 956a5ea..3a90753 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -53,6 +53,7 @@
 #include <linux/slab.h>
 #include <linux/sunrpc/addr.h>
 #include <linux/sunrpc/svc_rdma.h>
+#include <linux/log2.h>
 
 #include <asm-generic/barrier.h>
 #include <asm/bitops.h>
@@ -74,56 +75,52 @@
  * internal functions
  */
 static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
-static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
-static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
+static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
+static struct rpcrdma_regbuf *
+rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
+		     gfp_t flags);
+static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
+static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
+static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
+/* Wait for outstanding transport work to finish. ib_drain_qp
+ * handles the drains in the wrong order for us, so open code
+ * them here.
+ */
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 {
-	struct workqueue_struct *recv_wq;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	recv_wq = alloc_workqueue("xprtrdma_receive",
-				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
-				  0);
-	if (!recv_wq)
-		return -ENOMEM;
+	/* Flush Receives, then wait for deferred Reply work
+	 * to complete.
+	 */
+	ib_drain_rq(ia->ri_id->qp);
 
-	rpcrdma_receive_wq = recv_wq;
-	return 0;
+	/* Deferred Reply processing might have scheduled
+	 * local invalidations.
+	 */
+	ib_drain_sq(ia->ri_id->qp);
 }
 
-void
-rpcrdma_destroy_wq(void)
-{
-	struct workqueue_struct *wq;
-
-	if (rpcrdma_receive_wq) {
-		wq = rpcrdma_receive_wq;
-		rpcrdma_receive_wq = NULL;
-		destroy_workqueue(wq);
-	}
-}
-
+/**
+ * rpcrdma_qp_event_handler - Handle one QP event (error notification)
+ * @event: details of the event
+ * @context: ep that owns QP where event occurred
+ *
+ * Called from the RDMA provider (device driver) possibly in an interrupt
+ * context.
+ */
 static void
-rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
+rpcrdma_qp_event_handler(struct ib_event *event, void *context)
 {
 	struct rpcrdma_ep *ep = context;
 	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
 						   rx_ep);
 
-	trace_xprtrdma_qp_error(r_xprt, event);
-	pr_err("rpcrdma: %s on device %s ep %p\n",
-	       ib_event_msg(event->event), event->device->name, context);
-
-	if (ep->rep_connected == 1) {
-		ep->rep_connected = -EIO;
-		rpcrdma_conn_func(ep);
-		wake_up_all(&ep->rep_connect_wait);
-	}
+	trace_xprtrdma_qp_event(r_xprt, event);
 }
 
 /**
@@ -141,11 +138,6 @@
 
 	/* WARNING: Only wr_cqe and status are reliable at this point */
 	trace_xprtrdma_wc_send(sc, wc);
-	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
-		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
-		       ib_wc_status_msg(wc->status),
-		       wc->status, wc->vendor_err);
-
 	rpcrdma_sendctx_put_locked(sc);
 }
 
@@ -161,11 +153,13 @@
 	struct ib_cqe *cqe = wc->wr_cqe;
 	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 					       rr_cqe);
+	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 
-	/* WARNING: Only wr_id and status are reliable at this point */
+	/* WARNING: Only wr_cqe and status are reliable at this point */
 	trace_xprtrdma_wc_receive(wc);
+	--r_xprt->rx_ep.rep_receive_count;
 	if (wc->status != IB_WC_SUCCESS)
-		goto out_fail;
+		goto out_flushed;
 
 	/* status == SUCCESS means all fields in wc are trustworthy */
 	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
@@ -176,24 +170,18 @@
 				   rdmab_addr(rep->rr_rdmabuf),
 				   wc->byte_len, DMA_FROM_DEVICE);
 
-out_schedule:
+	rpcrdma_post_recvs(r_xprt, false);
 	rpcrdma_reply_handler(rep);
 	return;
 
-out_fail:
-	if (wc->status != IB_WC_WR_FLUSH_ERR)
-		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
-		       ib_wc_status_msg(wc->status),
-		       wc->status, wc->vendor_err);
-	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
-	goto out_schedule;
+out_flushed:
+	rpcrdma_recv_buffer_put(rep);
 }
 
 static void
 rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
 			       struct rdma_conn_param *param)
 {
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
 	const struct rpcrdma_connect_private *pmsg = param->private_data;
 	unsigned int rsize, wsize;
 
@@ -210,89 +198,96 @@
 		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
 	}
 
-	if (rsize < cdata->inline_rsize)
-		cdata->inline_rsize = rsize;
-	if (wsize < cdata->inline_wsize)
-		cdata->inline_wsize = wsize;
-	dprintk("RPC:       %s: max send %u, max recv %u\n",
-		__func__, cdata->inline_wsize, cdata->inline_rsize);
+	if (rsize < r_xprt->rx_ep.rep_inline_recv)
+		r_xprt->rx_ep.rep_inline_recv = rsize;
+	if (wsize < r_xprt->rx_ep.rep_inline_send)
+		r_xprt->rx_ep.rep_inline_send = wsize;
+	dprintk("RPC:       %s: max send %u, max recv %u\n", __func__,
+		r_xprt->rx_ep.rep_inline_send,
+		r_xprt->rx_ep.rep_inline_recv);
 	rpcrdma_set_max_header_sizes(r_xprt);
 }
 
+/**
+ * rpcrdma_cm_event_handler - Handle RDMA CM events
+ * @id: rdma_cm_id on which an event has occurred
+ * @event: details of the event
+ *
+ * Called with @id's mutex held. Returns 1 if caller should
+ * destroy @id, otherwise 0.
+ */
 static int
-rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
+rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 {
-	struct rpcrdma_xprt *xprt = id->context;
-	struct rpcrdma_ia *ia = &xprt->rx_ia;
-	struct rpcrdma_ep *ep = &xprt->rx_ep;
-	int connstate = 0;
+	struct rpcrdma_xprt *r_xprt = id->context;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 
-	trace_xprtrdma_conn_upcall(xprt, event);
+	might_sleep();
+
+	trace_xprtrdma_cm_event(r_xprt, event);
 	switch (event->event) {
 	case RDMA_CM_EVENT_ADDR_RESOLVED:
 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
 		ia->ri_async_rc = 0;
 		complete(&ia->ri_done);
-		break;
+		return 0;
 	case RDMA_CM_EVENT_ADDR_ERROR:
 		ia->ri_async_rc = -EPROTO;
 		complete(&ia->ri_done);
-		break;
+		return 0;
 	case RDMA_CM_EVENT_ROUTE_ERROR:
 		ia->ri_async_rc = -ENETUNREACH;
 		complete(&ia->ri_done);
-		break;
+		return 0;
 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 		pr_info("rpcrdma: removing device %s for %s:%s\n",
-			ia->ri_device->name,
-			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt));
+			ia->ri_id->device->name,
+			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
 #endif
 		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
 		ep->rep_connected = -ENODEV;
-		xprt_force_disconnect(&xprt->rx_xprt);
+		xprt_force_disconnect(xprt);
 		wait_for_completion(&ia->ri_remove_done);
 
 		ia->ri_id = NULL;
-		ia->ri_device = NULL;
 		/* Return 1 to ensure the core destroys the id. */
 		return 1;
 	case RDMA_CM_EVENT_ESTABLISHED:
-		++xprt->rx_xprt.connect_cookie;
-		connstate = 1;
-		rpcrdma_update_connect_private(xprt, &event->param.conn);
-		goto connected;
+		++xprt->connect_cookie;
+		ep->rep_connected = 1;
+		rpcrdma_update_connect_private(r_xprt, &event->param.conn);
+		wake_up_all(&ep->rep_connect_wait);
+		break;
 	case RDMA_CM_EVENT_CONNECT_ERROR:
-		connstate = -ENOTCONN;
-		goto connected;
+		ep->rep_connected = -ENOTCONN;
+		goto disconnected;
 	case RDMA_CM_EVENT_UNREACHABLE:
-		connstate = -ENETUNREACH;
-		goto connected;
+		ep->rep_connected = -ENETUNREACH;
+		goto disconnected;
 	case RDMA_CM_EVENT_REJECTED:
 		dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
-			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
+			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
 			rdma_reject_msg(id, event->status));
-		connstate = -ECONNREFUSED;
+		ep->rep_connected = -ECONNREFUSED;
 		if (event->status == IB_CM_REJ_STALE_CONN)
-			connstate = -EAGAIN;
-		goto connected;
+			ep->rep_connected = -EAGAIN;
+		goto disconnected;
 	case RDMA_CM_EVENT_DISCONNECTED:
-		++xprt->rx_xprt.connect_cookie;
-		connstate = -ECONNABORTED;
-connected:
-		ep->rep_connected = connstate;
-		rpcrdma_conn_func(ep);
+		ep->rep_connected = -ECONNABORTED;
+disconnected:
+		xprt_force_disconnect(xprt);
 		wake_up_all(&ep->rep_connect_wait);
-		/*FALLTHROUGH*/
+		break;
 	default:
-		dprintk("RPC:       %s: %s:%s on %s/%s (ep 0x%p): %s\n",
-			__func__,
-			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
-			ia->ri_device->name, ia->ri_ops->ro_displayname,
-			ep, rdma_event_msg(event->event));
 		break;
 	}
 
+	dprintk("RPC:       %s: %s:%s on %s/frwr: %s\n", __func__,
+		rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
+		ia->ri_id->device->name, rdma_event_msg(event->event));
 	return 0;
 }
 
@@ -308,24 +303,17 @@
 	init_completion(&ia->ri_done);
 	init_completion(&ia->ri_remove_done);
 
-	id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_conn_upcall,
+	id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
 			    xprt, RDMA_PS_TCP, IB_QPT_RC);
-	if (IS_ERR(id)) {
-		rc = PTR_ERR(id);
-		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
-			__func__, rc);
+	if (IS_ERR(id))
 		return id;
-	}
 
 	ia->ri_async_rc = -ETIMEDOUT;
 	rc = rdma_resolve_addr(id, NULL,
 			       (struct sockaddr *)&xprt->rx_xprt.addr,
 			       RDMA_RESOLVE_TIMEOUT);
-	if (rc) {
-		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
-			__func__, rc);
+	if (rc)
 		goto out;
-	}
 	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
 	if (rc < 0) {
 		trace_xprtrdma_conn_tout(xprt);
@@ -338,11 +326,8 @@
 
 	ia->ri_async_rc = -ETIMEDOUT;
 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
-	if (rc) {
-		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
-			__func__, rc);
+	if (rc)
 		goto out;
-	}
 	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
 	if (rc < 0) {
 		trace_xprtrdma_conn_tout(xprt);
@@ -381,9 +366,8 @@
 		rc = PTR_ERR(ia->ri_id);
 		goto out_err;
 	}
-	ia->ri_device = ia->ri_id->device;
 
-	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
+	ia->ri_pd = ib_alloc_pd(ia->ri_id->device, 0);
 	if (IS_ERR(ia->ri_pd)) {
 		rc = PTR_ERR(ia->ri_pd);
 		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
@@ -392,20 +376,12 @@
 
 	switch (xprt_rdma_memreg_strategy) {
 	case RPCRDMA_FRWR:
-		if (frwr_is_supported(ia)) {
-			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
+		if (frwr_is_supported(ia->ri_id->device))
 			break;
-		}
-		/*FALLTHROUGH*/
-	case RPCRDMA_MTHCAFMR:
-		if (fmr_is_supported(ia)) {
-			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
-			break;
-		}
 		/*FALLTHROUGH*/
 	default:
 		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
-		       ia->ri_device->name, xprt_rdma_memreg_strategy);
+		       ia->ri_id->device->name, xprt_rdma_memreg_strategy);
 		rc = -EINVAL;
 		goto out_err;
 	}
@@ -432,9 +408,8 @@
 	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_req *req;
-	struct rpcrdma_rep *rep;
 
-	cancel_delayed_work_sync(&buf->rb_refresh_worker);
+	cancel_work_sync(&buf->rb_refresh_worker);
 
 	/* This is similar to rpcrdma_ep_destroy, but:
 	 * - Don't cancel the connect worker.
@@ -444,7 +419,7 @@
 	 *   connection is already gone.
 	 */
 	if (ia->ri_id->qp) {
-		ib_drain_qp(ia->ri_id->qp);
+		rpcrdma_xprt_drain(r_xprt);
 		rdma_destroy_qp(ia->ri_id);
 		ia->ri_id->qp = NULL;
 	}
@@ -456,12 +431,11 @@
 	/* The ULP is responsible for ensuring all DMA
 	 * mappings and MRs are gone.
 	 */
-	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
-		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
+	rpcrdma_reps_destroy(buf);
 	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
-		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
-		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
-		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
+		rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
+		rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
+		rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
 	}
 	rpcrdma_mrs_destroy(buf);
 	ib_dealloc_pd(ia->ri_pd);
@@ -487,7 +461,6 @@
 		rdma_destroy_id(ia->ri_id);
 	}
 	ia->ri_id = NULL;
-	ia->ri_device = NULL;
 
 	/* If the pd is still busy, xprtrdma missed freeing a resource */
 	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
@@ -495,19 +468,26 @@
 	ia->ri_pd = NULL;
 }
 
-/*
- * Create unconnected endpoint.
+/**
+ * rpcrdma_ep_create - Create unconnected endpoint
+ * @r_xprt: transport to instantiate
+ *
+ * Returns zero on success, or a negative errno.
  */
-int
-rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
-		  struct rpcrdma_create_data_internal *cdata)
+int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
 {
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
 	struct ib_cq *sendcq, *recvcq;
 	unsigned int max_sge;
 	int rc;
 
-	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_send_sge,
+	ep->rep_max_requests = xprt_rdma_slot_table_entries;
+	ep->rep_inline_send = xprt_rdma_max_inline_write;
+	ep->rep_inline_recv = xprt_rdma_max_inline_read;
+
+	max_sge = min_t(unsigned int, ia->ri_id->device->attrs.max_send_sge,
 			RPCRDMA_MAX_SEND_SGES);
 	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
 		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
@@ -515,11 +495,11 @@
 	}
 	ia->ri_max_send_sges = max_sge;
 
-	rc = ia->ri_ops->ro_open(ia, ep, cdata);
+	rc = frwr_open(ia, ep);
 	if (rc)
 		return rc;
 
-	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
+	ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
 	ep->rep_attr.qp_context = ep;
 	ep->rep_attr.srq = NULL;
 	ep->rep_attr.cap.max_send_sge = max_sge;
@@ -537,30 +517,24 @@
 		ep->rep_attr.cap.max_send_sge,
 		ep->rep_attr.cap.max_recv_sge);
 
-	/* set trigger for requesting send completion */
-	ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
-				   cdata->max_requests >> 2);
+	ep->rep_send_batch = ep->rep_max_requests >> 3;
 	ep->rep_send_count = ep->rep_send_batch;
 	init_waitqueue_head(&ep->rep_connect_wait);
-	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
+	ep->rep_receive_count = 0;
 
-	sendcq = ib_alloc_cq(ia->ri_device, NULL,
-			     ep->rep_attr.cap.max_send_wr + 1,
-			     1, IB_POLL_WORKQUEUE);
+	sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
+				 ep->rep_attr.cap.max_send_wr + 1,
+				 IB_POLL_WORKQUEUE);
 	if (IS_ERR(sendcq)) {
 		rc = PTR_ERR(sendcq);
-		dprintk("RPC:       %s: failed to create send CQ: %i\n",
-			__func__, rc);
 		goto out1;
 	}
 
-	recvcq = ib_alloc_cq(ia->ri_device, NULL,
-			     ep->rep_attr.cap.max_recv_wr + 1,
-			     0, IB_POLL_WORKQUEUE);
+	recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
+				 ep->rep_attr.cap.max_recv_wr + 1,
+				 IB_POLL_WORKQUEUE);
 	if (IS_ERR(recvcq)) {
 		rc = PTR_ERR(recvcq);
-		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
-			__func__, rc);
 		goto out2;
 	}
 
@@ -573,16 +547,16 @@
 	/* Prepare RDMA-CM private message */
 	pmsg->cp_magic = rpcrdma_cmp_magic;
 	pmsg->cp_version = RPCRDMA_CMP_VERSION;
-	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
-	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
-	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
+	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
+	pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->rep_inline_send);
+	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->rep_inline_recv);
 	ep->rep_remote_cma.private_data = pmsg;
 	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
 
 	/* Client offers RDMA Read but does not initiate */
 	ep->rep_remote_cma.initiator_depth = 0;
 	ep->rep_remote_cma.responder_resources =
-		min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);
+		min_t(int, U8_MAX, ia->ri_id->device->attrs.max_qp_rd_atom);
 
 	/* Limit transport retries so client can detect server
 	 * GID changes quickly. RPC layer handles re-establishing
@@ -605,17 +579,15 @@
 	return rc;
 }
 
-/*
- * rpcrdma_ep_destroy
+/**
+ * rpcrdma_ep_destroy - Disconnect and destroy endpoint.
+ * @r_xprt: transport instance to shut down
  *
- * Disconnect and destroy endpoint. After this, the only
- * valid operations on the ep are to free it (if dynamically
- * allocated) or re-create it.
  */
-void
-rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
 {
-	cancel_delayed_work_sync(&ep->rep_connect_worker);
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
 	if (ia->ri_id && ia->ri_id->qp) {
 		rpcrdma_ep_disconnect(ep, ia);
@@ -633,10 +605,10 @@
  * Unlike a normal reconnection, a fresh PD and a new set
  * of MRs and buffers is needed.
  */
-static int
-rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
-			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
+				    struct ib_qp_init_attr *qp_init_attr)
 {
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	int rc, err;
 
 	trace_xprtrdma_reinsert(r_xprt);
@@ -646,14 +618,14 @@
 		goto out1;
 
 	rc = -ENOMEM;
-	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
+	err = rpcrdma_ep_create(r_xprt);
 	if (err) {
 		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
 		goto out2;
 	}
 
 	rc = -ENETUNREACH;
-	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+	err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
 	if (err) {
 		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
 		goto out3;
@@ -663,23 +635,23 @@
 	return 0;
 
 out3:
-	rpcrdma_ep_destroy(ep, ia);
+	rpcrdma_ep_destroy(r_xprt);
 out2:
 	rpcrdma_ia_close(ia);
 out1:
 	return rc;
 }
 
-static int
-rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
-		     struct rpcrdma_ia *ia)
+static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
+				struct ib_qp_init_attr *qp_init_attr)
 {
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rdma_cm_id *id, *old;
 	int err, rc;
 
 	trace_xprtrdma_reconnect(r_xprt);
 
-	rpcrdma_ep_disconnect(ep, ia);
+	rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
 
 	rc = -EHOSTUNREACH;
 	id = rpcrdma_create_id(r_xprt, ia);
@@ -696,17 +668,14 @@
 	 */
 	old = id;
 	rc = -ENETUNREACH;
-	if (ia->ri_device != id->device) {
+	if (ia->ri_id->device != id->device) {
 		pr_err("rpcrdma: can't reconnect on different device!\n");
 		goto out_destroy;
 	}
 
-	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
-	if (err) {
-		dprintk("RPC:       %s: rdma_create_qp returned %d\n",
-			__func__, err);
+	err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
+	if (err)
 		goto out_destroy;
-	}
 
 	/* Atomically replace the transport's ID and QP. */
 	rc = 0;
@@ -728,41 +697,43 @@
 {
 	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
 						   rx_ia);
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+	struct ib_qp_init_attr qp_init_attr;
 	int rc;
 
 retry:
+	memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
 	switch (ep->rep_connected) {
 	case 0:
 		dprintk("RPC:       %s: connecting...\n", __func__);
-		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
 		if (rc) {
-			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
-				__func__, rc);
 			rc = -ENETUNREACH;
 			goto out_noupdate;
 		}
 		break;
 	case -ENODEV:
-		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
+		rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
 		if (rc)
 			goto out_noupdate;
 		break;
 	default:
-		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
+		rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
 		if (rc)
 			goto out;
 	}
 
 	ep->rep_connected = 0;
+	xprt_clear_connected(xprt);
+
 	rpcrdma_post_recvs(r_xprt, true);
 
 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
-	if (rc) {
-		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
-				__func__, rc);
+	if (rc)
 		goto out;
-	}
 
+	if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
+		xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
 	if (ep->rep_connected <= 0) {
 		if (ep->rep_connected == -EAGAIN)
@@ -781,8 +752,10 @@
 	return rc;
 }
 
-/*
- * rpcrdma_ep_disconnect
+/**
+ * rpcrdma_ep_disconnect - Disconnect underlying transport
+ * @ep: endpoint to disconnect
+ * @ia: associated interface adapter
  *
  * This is separate from destroy to facilitate the ability
  * to reconnect without recreating the endpoint.
@@ -793,19 +766,20 @@
 void
 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
+	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
+						   rx_ep);
 	int rc;
 
+	/* returns without wait if ID is not connected */
 	rc = rdma_disconnect(ia->ri_id);
 	if (!rc)
-		/* returns without wait if not connected */
 		wait_event_interruptible(ep->rep_connect_wait,
 							ep->rep_connected != 1);
 	else
 		ep->rep_connected = rc;
-	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
-					       rx_ep), rc);
+	trace_xprtrdma_disconnect(r_xprt, rc);
 
-	ib_drain_qp(ia->ri_id->qp);
+	rpcrdma_xprt_drain(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -822,8 +796,8 @@
  */
 
 /* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
- * queue activity, and ib_drain_qp has flushed all remaining Send
- * requests.
+ * queue activity, and rpcrdma_xprt_drain has flushed all remaining
+ * Send requests.
  */
 static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
 {
@@ -838,8 +812,7 @@
 {
 	struct rpcrdma_sendctx *sc;
 
-	sc = kzalloc(sizeof(*sc) +
-		     ia->ri_max_send_sges * sizeof(struct ib_sge),
+	sc = kzalloc(struct_size(sc, sc_sges, ia->ri_max_send_sges),
 		     GFP_KERNEL);
 	if (!sc)
 		return NULL;
@@ -872,18 +845,13 @@
 	for (i = 0; i <= buf->rb_sc_last; i++) {
 		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
 		if (!sc)
-			goto out_destroy;
+			return -ENOMEM;
 
 		sc->sc_xprt = r_xprt;
 		buf->rb_sc_ctxs[i] = sc;
 	}
-	buf->rb_flags = 0;
 
 	return 0;
-
-out_destroy:
-	rpcrdma_sendctxs_destroy(buf);
-	return -ENOMEM;
 }
 
 /* The sendctx queue is not guaranteed to have a size that is a
@@ -898,20 +866,20 @@
 
 /**
  * rpcrdma_sendctx_get_locked - Acquire a send context
- * @buf: transport buffers from which to acquire an unused context
+ * @r_xprt: controlling transport instance
  *
  * Returns pointer to a free send completion context; or NULL if
  * the queue is empty.
  *
  * Usage: Called to acquire an SGE array before preparing a Send WR.
  *
- * The caller serializes calls to this function (per rpcrdma_buffer),
- * and provides an effective memory barrier that flushes the new value
+ * The caller serializes calls to this function (per transport), and
+ * provides an effective memory barrier that flushes the new value
  * of rb_sc_head.
  */
-struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
 {
-	struct rpcrdma_xprt *r_xprt;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_sendctx *sc;
 	unsigned long next_head;
 
@@ -935,8 +903,7 @@
 	 * completions recently. This is a sign the Send Queue is
 	 * backing up. Cause the caller to pause and try again.
 	 */
-	set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
-	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
+	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
 	r_xprt->rx_stats.empty_sendctx_q++;
 	return NULL;
 }
@@ -948,7 +915,7 @@
  * Usage: Called from Send completion to return a sendctxt
  * to the queue.
  *
- * The caller serializes calls to this function (per rpcrdma_buffer).
+ * The caller serializes calls to this function (per transport).
  */
 static void
 rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
@@ -956,7 +923,7 @@
 	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
 	unsigned long next_tail;
 
-	/* Unmap SGEs of previously completed by unsignaled
+	/* Unmap SGEs of previously completed but unsignaled
 	 * Sends by walking up the queue until @sc is found.
 	 */
 	next_tail = buf->rb_sc_tail;
@@ -964,50 +931,14 @@
 		next_tail = rpcrdma_sendctx_next(buf, next_tail);
 
 		/* ORDER: item must be accessed _before_ tail is updated */
-		rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
+		rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
 
 	} while (buf->rb_sc_ctxs[next_tail] != sc);
 
 	/* Paired with READ_ONCE */
 	smp_store_release(&buf->rb_sc_tail, next_tail);
 
-	if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
-		smp_mb__after_atomic();
-		xprt_write_space(&sc->sc_xprt->rx_xprt);
-	}
-}
-
-static void
-rpcrdma_mr_recovery_worker(struct work_struct *work)
-{
-	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
-						  rb_recovery_worker.work);
-	struct rpcrdma_mr *mr;
-
-	spin_lock(&buf->rb_recovery_lock);
-	while (!list_empty(&buf->rb_stale_mrs)) {
-		mr = rpcrdma_mr_pop(&buf->rb_stale_mrs);
-		spin_unlock(&buf->rb_recovery_lock);
-
-		trace_xprtrdma_recover_mr(mr);
-		mr->mr_xprt->rx_ia.ri_ops->ro_recover_mr(mr);
-
-		spin_lock(&buf->rb_recovery_lock);
-	}
-	spin_unlock(&buf->rb_recovery_lock);
-}
-
-void
-rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr)
-{
-	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-
-	spin_lock(&buf->rb_recovery_lock);
-	rpcrdma_mr_push(mr, &buf->rb_stale_mrs);
-	spin_unlock(&buf->rb_recovery_lock);
-
-	schedule_delayed_work(&buf->rb_recovery_worker, 0);
+	xprt_write_space(&sc->sc_xprt->rx_xprt);
 }
 
 static void
@@ -1016,18 +947,16 @@
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	unsigned int count;
-	LIST_HEAD(free);
-	LIST_HEAD(all);
 
-	for (count = 0; count < 3; count++) {
+	for (count = 0; count < ia->ri_max_segs; count++) {
 		struct rpcrdma_mr *mr;
 		int rc;
 
-		mr = kzalloc(sizeof(*mr), GFP_KERNEL);
+		mr = kzalloc(sizeof(*mr), GFP_NOFS);
 		if (!mr)
 			break;
 
-		rc = ia->ri_ops->ro_init_mr(ia, mr);
+		rc = frwr_init_mr(ia, mr);
 		if (rc) {
 			kfree(mr);
 			break;
@@ -1035,143 +964,185 @@
 
 		mr->mr_xprt = r_xprt;
 
-		list_add(&mr->mr_list, &free);
-		list_add(&mr->mr_all, &all);
+		spin_lock(&buf->rb_lock);
+		list_add(&mr->mr_list, &buf->rb_mrs);
+		list_add(&mr->mr_all, &buf->rb_all_mrs);
+		spin_unlock(&buf->rb_lock);
 	}
 
-	spin_lock(&buf->rb_mrlock);
-	list_splice(&free, &buf->rb_mrs);
-	list_splice(&all, &buf->rb_all);
 	r_xprt->rx_stats.mrs_allocated += count;
-	spin_unlock(&buf->rb_mrlock);
 	trace_xprtrdma_createmrs(r_xprt, count);
-
-	xprt_write_space(&r_xprt->rx_xprt);
 }
 
 static void
 rpcrdma_mr_refresh_worker(struct work_struct *work)
 {
 	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
-						  rb_refresh_worker.work);
+						  rb_refresh_worker);
 	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
 						   rx_buf);
 
 	rpcrdma_mrs_create(r_xprt);
+	xprt_write_space(&r_xprt->rx_xprt);
 }
 
-struct rpcrdma_req *
-rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
+/**
+ * rpcrdma_req_create - Allocate an rpcrdma_req object
+ * @r_xprt: controlling r_xprt
+ * @size: initial size, in bytes, of send and receive buffers
+ * @flags: GFP flags passed to memory allocators
+ *
+ * Returns an allocated and fully initialized rpcrdma_req or NULL.
+ */
+struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
+				       gfp_t flags)
 {
 	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
 	struct rpcrdma_regbuf *rb;
 	struct rpcrdma_req *req;
+	size_t maxhdrsize;
 
-	req = kzalloc(sizeof(*req), GFP_KERNEL);
+	req = kzalloc(sizeof(*req), flags);
 	if (req == NULL)
-		return ERR_PTR(-ENOMEM);
+		goto out1;
 
-	rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
-				  DMA_TO_DEVICE, GFP_KERNEL);
-	if (IS_ERR(rb)) {
-		kfree(req);
-		return ERR_PTR(-ENOMEM);
-	}
+	/* Compute maximum header buffer size in bytes */
+	maxhdrsize = rpcrdma_fixed_maxsz + 3 +
+		     r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz;
+	maxhdrsize *= sizeof(__be32);
+	rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
+				  DMA_TO_DEVICE, flags);
+	if (!rb)
+		goto out2;
 	req->rl_rdmabuf = rb;
-	xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
-	req->rl_buffer = buffer;
-	INIT_LIST_HEAD(&req->rl_registered);
+	xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
 
-	spin_lock(&buffer->rb_reqslock);
+	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
+	if (!req->rl_sendbuf)
+		goto out3;
+
+	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
+	if (!req->rl_recvbuf)
+		goto out4;
+
+	INIT_LIST_HEAD(&req->rl_free_mrs);
+	INIT_LIST_HEAD(&req->rl_registered);
+	spin_lock(&buffer->rb_lock);
 	list_add(&req->rl_all, &buffer->rb_allreqs);
-	spin_unlock(&buffer->rb_reqslock);
+	spin_unlock(&buffer->rb_lock);
 	return req;
+
+out4:
+	kfree(req->rl_sendbuf);
+out3:
+	kfree(req->rl_rdmabuf);
+out2:
+	kfree(req);
+out1:
+	return NULL;
 }
 
-static int
-rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
+static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
+					      bool temp)
 {
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_rep *rep;
-	int rc;
 
-	rc = -ENOMEM;
 	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
 	if (rep == NULL)
 		goto out;
 
-	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
+	rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep.rep_inline_recv,
 					       DMA_FROM_DEVICE, GFP_KERNEL);
-	if (IS_ERR(rep->rr_rdmabuf)) {
-		rc = PTR_ERR(rep->rr_rdmabuf);
+	if (!rep->rr_rdmabuf)
 		goto out_free;
-	}
-	xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
-		     rdmab_length(rep->rr_rdmabuf));
 
+	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
+		     rdmab_length(rep->rr_rdmabuf));
 	rep->rr_cqe.done = rpcrdma_wc_receive;
 	rep->rr_rxprt = r_xprt;
-	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
 	rep->rr_recv_wr.next = NULL;
 	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
 	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
 	rep->rr_recv_wr.num_sge = 1;
 	rep->rr_temp = temp;
-
-	spin_lock(&buf->rb_lock);
-	list_add(&rep->rr_list, &buf->rb_recv_bufs);
-	spin_unlock(&buf->rb_lock);
-	return 0;
+	return rep;
 
 out_free:
 	kfree(rep);
 out:
-	dprintk("RPC:       %s: reply buffer %d alloc failed\n",
-		__func__, rc);
-	return rc;
+	return NULL;
 }
 
-int
-rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+{
+	rpcrdma_regbuf_free(rep->rr_rdmabuf);
+	kfree(rep);
+}
+
+static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
+{
+	struct llist_node *node;
+
+	/* Calls to llist_del_first are required to be serialized */
+	node = llist_del_first(&buf->rb_free_reps);
+	if (!node)
+		return NULL;
+	return llist_entry(node, struct rpcrdma_rep, rr_node);
+}
+
+static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
+			    struct rpcrdma_rep *rep)
+{
+	if (!rep->rr_temp)
+		llist_add(&rep->rr_node, &buf->rb_free_reps);
+	else
+		rpcrdma_rep_destroy(rep);
+}
+
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
+{
+	struct rpcrdma_rep *rep;
+
+	while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
+		rpcrdma_rep_destroy(rep);
+}
+
+/**
+ * rpcrdma_buffer_create - Create initial set of req/rep objects
+ * @r_xprt: transport instance to (re)initialize
+ *
+ * Returns zero on success, otherwise a negative errno.
+ */
+int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	int i, rc;
 
-	buf->rb_max_requests = r_xprt->rx_data.max_requests;
+	buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
 	buf->rb_bc_srv_max_requests = 0;
-	spin_lock_init(&buf->rb_mrlock);
 	spin_lock_init(&buf->rb_lock);
-	spin_lock_init(&buf->rb_recovery_lock);
 	INIT_LIST_HEAD(&buf->rb_mrs);
-	INIT_LIST_HEAD(&buf->rb_all);
-	INIT_LIST_HEAD(&buf->rb_stale_mrs);
-	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
-			  rpcrdma_mr_refresh_worker);
-	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
-			  rpcrdma_mr_recovery_worker);
+	INIT_LIST_HEAD(&buf->rb_all_mrs);
+	INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
 
 	rpcrdma_mrs_create(r_xprt);
 
 	INIT_LIST_HEAD(&buf->rb_send_bufs);
 	INIT_LIST_HEAD(&buf->rb_allreqs);
-	spin_lock_init(&buf->rb_reqslock);
+
+	rc = -ENOMEM;
 	for (i = 0; i < buf->rb_max_requests; i++) {
 		struct rpcrdma_req *req;
 
-		req = rpcrdma_create_req(r_xprt);
-		if (IS_ERR(req)) {
-			dprintk("RPC:       %s: request buffer %d alloc"
-				" failed\n", __func__, i);
-			rc = PTR_ERR(req);
+		req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
+					 GFP_KERNEL);
+		if (!req)
 			goto out;
-		}
 		list_add(&req->rl_list, &buf->rb_send_bufs);
 	}
 
 	buf->rb_credits = 1;
-	buf->rb_posted_receives = 0;
-	INIT_LIST_HEAD(&buf->rb_recv_bufs);
+	init_llist_head(&buf->rb_free_reps);
 
 	rc = rpcrdma_sendctxs_create(r_xprt);
 	if (rc)
@@ -1183,19 +1154,23 @@
 	return rc;
 }
 
-static void
-rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
+/**
+ * rpcrdma_req_destroy - Destroy an rpcrdma_req object
+ * @req: unused object to be destroyed
+ *
+ * This function assumes that the caller prevents concurrent device
+ * unload and transport tear-down.
+ */
+void rpcrdma_req_destroy(struct rpcrdma_req *req)
 {
-	rpcrdma_free_regbuf(rep->rr_rdmabuf);
-	kfree(rep);
-}
+	list_del(&req->rl_all);
 
-void
-rpcrdma_destroy_req(struct rpcrdma_req *req)
-{
-	rpcrdma_free_regbuf(req->rl_recvbuf);
-	rpcrdma_free_regbuf(req->rl_sendbuf);
-	rpcrdma_free_regbuf(req->rl_rdmabuf);
+	while (!list_empty(&req->rl_free_mrs))
+		rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
+
+	rpcrdma_regbuf_free(req->rl_recvbuf);
+	rpcrdma_regbuf_free(req->rl_sendbuf);
+	rpcrdma_regbuf_free(req->rl_rdmabuf);
 	kfree(req);
 }
 
@@ -1204,62 +1179,49 @@
 {
 	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
 						   rx_buf);
-	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
 	struct rpcrdma_mr *mr;
 	unsigned int count;
 
 	count = 0;
-	spin_lock(&buf->rb_mrlock);
-	while (!list_empty(&buf->rb_all)) {
-		mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
+	spin_lock(&buf->rb_lock);
+	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
+					      struct rpcrdma_mr,
+					      mr_all)) != NULL) {
 		list_del(&mr->mr_all);
+		spin_unlock(&buf->rb_lock);
 
-		spin_unlock(&buf->rb_mrlock);
-
-		/* Ensure MW is not on any rl_registered list */
-		if (!list_empty(&mr->mr_list))
-			list_del(&mr->mr_list);
-
-		ia->ri_ops->ro_release_mr(mr);
+		frwr_release_mr(mr);
 		count++;
-		spin_lock(&buf->rb_mrlock);
+		spin_lock(&buf->rb_lock);
 	}
-	spin_unlock(&buf->rb_mrlock);
+	spin_unlock(&buf->rb_lock);
 	r_xprt->rx_stats.mrs_allocated = 0;
-
-	dprintk("RPC:       %s: released %u MRs\n", __func__, count);
 }
 
+/**
+ * rpcrdma_buffer_destroy - Release all hw resources
+ * @buf: root control block for resources
+ *
+ * ORDERING: relies on a prior rpcrdma_xprt_drain :
+ * - No more Send or Receive completions can occur
+ * - All MRs, reps, and reqs are returned to their free lists
+ */
 void
 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
-	cancel_delayed_work_sync(&buf->rb_recovery_worker);
-	cancel_delayed_work_sync(&buf->rb_refresh_worker);
+	cancel_work_sync(&buf->rb_refresh_worker);
 
 	rpcrdma_sendctxs_destroy(buf);
+	rpcrdma_reps_destroy(buf);
 
-	while (!list_empty(&buf->rb_recv_bufs)) {
-		struct rpcrdma_rep *rep;
-
-		rep = list_first_entry(&buf->rb_recv_bufs,
-				       struct rpcrdma_rep, rr_list);
-		list_del(&rep->rr_list);
-		rpcrdma_destroy_rep(rep);
-	}
-
-	spin_lock(&buf->rb_reqslock);
-	while (!list_empty(&buf->rb_allreqs)) {
+	while (!list_empty(&buf->rb_send_bufs)) {
 		struct rpcrdma_req *req;
 
-		req = list_first_entry(&buf->rb_allreqs,
-				       struct rpcrdma_req, rl_all);
-		list_del(&req->rl_all);
-
-		spin_unlock(&buf->rb_reqslock);
-		rpcrdma_destroy_req(req);
-		spin_lock(&buf->rb_reqslock);
+		req = list_first_entry(&buf->rb_send_bufs,
+				       struct rpcrdma_req, rl_list);
+		list_del(&req->rl_list);
+		rpcrdma_req_destroy(req);
 	}
-	spin_unlock(&buf->rb_reqslock);
 
 	rpcrdma_mrs_destroy(buf);
 }
@@ -1275,61 +1237,42 @@
 rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-	struct rpcrdma_mr *mr = NULL;
+	struct rpcrdma_mr *mr;
 
-	spin_lock(&buf->rb_mrlock);
-	if (!list_empty(&buf->rb_mrs))
-		mr = rpcrdma_mr_pop(&buf->rb_mrs);
-	spin_unlock(&buf->rb_mrlock);
-
-	if (!mr)
-		goto out_nomrs;
+	spin_lock(&buf->rb_lock);
+	mr = rpcrdma_mr_pop(&buf->rb_mrs);
+	spin_unlock(&buf->rb_lock);
 	return mr;
-
-out_nomrs:
-	trace_xprtrdma_nomrs(r_xprt);
-	if (r_xprt->rx_ep.rep_connected != -ENODEV)
-		schedule_delayed_work(&buf->rb_refresh_worker, 0);
-
-	/* Allow the reply handler and refresh worker to run */
-	cond_resched();
-
-	return NULL;
-}
-
-static void
-__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
-{
-	spin_lock(&buf->rb_mrlock);
-	rpcrdma_mr_push(mr, &buf->rb_mrs);
-	spin_unlock(&buf->rb_mrlock);
 }
 
 /**
- * rpcrdma_mr_put - Release an rpcrdma_mr object
- * @mr: object to release
+ * rpcrdma_mr_put - DMA unmap an MR and release it
+ * @mr: MR to release
  *
  */
-void
-rpcrdma_mr_put(struct rpcrdma_mr *mr)
-{
-	__rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
-}
-
-/**
- * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
- * @mr: object to release
- *
- */
-void
-rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
+void rpcrdma_mr_put(struct rpcrdma_mr *mr)
 {
 	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
 
-	trace_xprtrdma_dma_unmap(mr);
-	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
-			mr->mr_sg, mr->mr_nents, mr->mr_dir);
-	__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
+	if (mr->mr_dir != DMA_NONE) {
+		trace_xprtrdma_mr_unmap(mr);
+		ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
+				mr->mr_sg, mr->mr_nents, mr->mr_dir);
+		mr->mr_dir = DMA_NONE;
+	}
+
+	rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
+}
+
+static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
+{
+	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+	mr->mr_req = NULL;
+	spin_lock(&buf->rb_lock);
+	rpcrdma_mr_push(mr, &buf->rb_mrs);
+	spin_unlock(&buf->rb_lock);
 }
 
 /**
@@ -1354,105 +1297,112 @@
 
 /**
  * rpcrdma_buffer_put - Put request/reply buffers back into pool
+ * @buffers: buffer pool
  * @req: object to return
  *
  */
-void
-rpcrdma_buffer_put(struct rpcrdma_req *req)
+void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
 {
-	struct rpcrdma_buffer *buffers = req->rl_buffer;
-	struct rpcrdma_rep *rep = req->rl_reply;
-
+	if (req->rl_reply)
+		rpcrdma_rep_put(buffers, req->rl_reply);
 	req->rl_reply = NULL;
 
 	spin_lock(&buffers->rb_lock);
 	list_add(&req->rl_list, &buffers->rb_send_bufs);
-	if (rep) {
-		if (!rep->rr_temp) {
-			list_add(&rep->rr_list, &buffers->rb_recv_bufs);
-			rep = NULL;
-		}
-	}
 	spin_unlock(&buffers->rb_lock);
-	if (rep)
-		rpcrdma_destroy_rep(rep);
-}
-
-/*
- * Put reply buffers back into pool when not attached to
- * request. This happens in error conditions.
- */
-void
-rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
-{
-	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-
-	if (!rep->rr_temp) {
-		spin_lock(&buffers->rb_lock);
-		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
-		spin_unlock(&buffers->rb_lock);
-	} else {
-		rpcrdma_destroy_rep(rep);
-	}
 }
 
 /**
- * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
- * @size: size of buffer to be allocated, in bytes
- * @direction: direction of data movement
- * @flags: GFP flags
+ * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
+ * @rep: rep to release
  *
- * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
- * can be persistently DMA-mapped for I/O.
+ * Used after error conditions.
+ */
+void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+{
+	rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
+}
+
+/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
  *
  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
  * receiving the payload of RDMA RECV operations. During Long Calls
- * or Replies they may be registered externally via ro_map.
+ * or Replies they may be registered externally via frwr_map.
  */
-struct rpcrdma_regbuf *
-rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
+static struct rpcrdma_regbuf *
+rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
 		     gfp_t flags)
 {
 	struct rpcrdma_regbuf *rb;
 
-	rb = kmalloc(sizeof(*rb) + size, flags);
-	if (rb == NULL)
-		return ERR_PTR(-ENOMEM);
+	rb = kmalloc(sizeof(*rb), flags);
+	if (!rb)
+		return NULL;
+	rb->rg_data = kmalloc(size, flags);
+	if (!rb->rg_data) {
+		kfree(rb);
+		return NULL;
+	}
 
 	rb->rg_device = NULL;
 	rb->rg_direction = direction;
 	rb->rg_iov.length = size;
-
 	return rb;
 }
 
 /**
- * __rpcrdma_map_regbuf - DMA-map a regbuf
- * @ia: controlling rpcrdma_ia
- * @rb: regbuf to be mapped
+ * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
+ * @rb: regbuf to reallocate
+ * @size: size of buffer to be allocated, in bytes
+ * @flags: GFP flags
+ *
+ * Returns true if reallocation was successful. If false is
+ * returned, @rb is left untouched.
  */
-bool
-__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
 {
-	struct ib_device *device = ia->ri_device;
+	void *buf;
+
+	buf = kmalloc(size, flags);
+	if (!buf)
+		return false;
+
+	rpcrdma_regbuf_dma_unmap(rb);
+	kfree(rb->rg_data);
+
+	rb->rg_data = buf;
+	rb->rg_iov.length = size;
+	return true;
+}
+
+/**
+ * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
+ * @r_xprt: controlling transport instance
+ * @rb: regbuf to be mapped
+ *
+ * Returns true if the buffer is now DMA mapped to @r_xprt's device.
+ */
+bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
+			      struct rpcrdma_regbuf *rb)
+{
+	struct ib_device *device = r_xprt->rx_ia.ri_id->device;
 
 	if (rb->rg_direction == DMA_NONE)
 		return false;
 
-	rb->rg_iov.addr = ib_dma_map_single(device,
-					    (void *)rb->rg_base,
-					    rdmab_length(rb),
-					    rb->rg_direction);
-	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
+	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
+					    rdmab_length(rb), rb->rg_direction);
+	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
+		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
 		return false;
+	}
 
 	rb->rg_device = device;
-	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
+	rb->rg_iov.lkey = r_xprt->rx_ia.ri_pd->local_dma_lkey;
 	return true;
 }
 
-static void
-rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
+static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
 {
 	if (!rb)
 		return;
@@ -1460,26 +1410,27 @@
 	if (!rpcrdma_regbuf_is_mapped(rb))
 		return;
 
-	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
-			    rdmab_length(rb), rb->rg_direction);
+	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
+			    rb->rg_direction);
 	rb->rg_device = NULL;
 }
 
-/**
- * rpcrdma_free_regbuf - deregister and free registered buffer
- * @rb: regbuf to be deregistered and freed
- */
-void
-rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
+static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
 {
-	rpcrdma_dma_unmap_regbuf(rb);
+	rpcrdma_regbuf_dma_unmap(rb);
+	if (rb)
+		kfree(rb->rg_data);
 	kfree(rb);
 }
 
-/*
- * Prepost any receive buffer, then post send.
+/**
+ * rpcrdma_ep_post - Post WRs to a transport's Send Queue
+ * @ia: transport's device information
+ * @ep: transport's RDMA endpoint information
+ * @req: rpcrdma_req containing the Send WR to post
  *
- * Receive buffer is donated to hardware, reclaimed upon recv completion.
+ * Returns 0 if the post was successful, otherwise -ENOTCONN
+ * is returned.
  */
 int
 rpcrdma_ep_post(struct rpcrdma_ia *ia,
@@ -1489,8 +1440,7 @@
 	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
 	int rc;
 
-	if (!ep->rep_send_count ||
-	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
+	if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
 		send_wr->send_flags |= IB_SEND_SIGNALED;
 		ep->rep_send_count = ep->rep_send_batch;
 	} else {
@@ -1498,77 +1448,79 @@
 		--ep->rep_send_count;
 	}
 
-	rc = ia->ri_ops->ro_send(ia, req);
+	rc = frwr_send(ia, req);
 	trace_xprtrdma_post_send(req, rc);
 	if (rc)
 		return -ENOTCONN;
 	return 0;
 }
 
-/**
- * rpcrdma_post_recvs - Maybe post some Receive buffers
- * @r_xprt: controlling transport
- * @temp: when true, allocate temp rpcrdma_rep objects
- *
- */
-void
+static void
 rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-	struct ib_recv_wr *wr, *bad_wr;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+	struct ib_recv_wr *i, *wr, *bad_wr;
+	struct rpcrdma_rep *rep;
 	int needed, count, rc;
 
-	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
-	if (buf->rb_posted_receives > needed)
-		return;
-	needed -= buf->rb_posted_receives;
-
+	rc = 0;
 	count = 0;
+
+	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
+	if (likely(ep->rep_receive_count > needed))
+		goto out;
+	needed -= ep->rep_receive_count;
+	if (!temp)
+		needed += RPCRDMA_MAX_RECV_BATCH;
+
+	/* fast path: all needed reps can be found on the free list */
 	wr = NULL;
 	while (needed) {
-		struct rpcrdma_regbuf *rb;
-		struct rpcrdma_rep *rep;
+		rep = rpcrdma_rep_get_locked(buf);
+		if (!rep)
+			rep = rpcrdma_rep_create(r_xprt, temp);
+		if (!rep)
+			break;
 
-		spin_lock(&buf->rb_lock);
-		rep = list_first_entry_or_null(&buf->rb_recv_bufs,
-					       struct rpcrdma_rep, rr_list);
-		if (likely(rep))
-			list_del(&rep->rr_list);
-		spin_unlock(&buf->rb_lock);
-		if (!rep) {
-			if (rpcrdma_create_rep(r_xprt, temp))
-				break;
-			continue;
-		}
-
-		rb = rep->rr_rdmabuf;
-		if (!rpcrdma_regbuf_is_mapped(rb)) {
-			if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) {
-				rpcrdma_recv_buffer_put(rep);
-				break;
-			}
-		}
-
-		trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
 		rep->rr_recv_wr.next = wr;
 		wr = &rep->rr_recv_wr;
-		++count;
 		--needed;
 	}
-	if (!count)
-		return;
+	if (!wr)
+		goto out;
+
+	for (i = wr; i; i = i->next) {
+		rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
+
+		if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
+			goto release_wrs;
+
+		trace_xprtrdma_post_recv(rep);
+		++count;
+	}
 
 	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
 			  (const struct ib_recv_wr **)&bad_wr);
+out:
+	trace_xprtrdma_post_recvs(r_xprt, count, rc);
 	if (rc) {
-		for (wr = bad_wr; wr; wr = wr->next) {
+		for (wr = bad_wr; wr;) {
 			struct rpcrdma_rep *rep;
 
 			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
+			wr = wr->next;
 			rpcrdma_recv_buffer_put(rep);
 			--count;
 		}
 	}
-	buf->rb_posted_receives += count;
-	trace_xprtrdma_post_recvs(r_xprt, count, rc);
+	ep->rep_receive_count += count;
+	return;
+
+release_wrs:
+	for (i = wr; i;) {
+		rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
+		i = i->next;
+		rpcrdma_recv_buffer_put(rep);
+	}
 }
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 2ca14f7..65e6b0e 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -44,8 +44,10 @@
 
 #include <linux/wait.h> 		/* wait_queue_head_t, etc */
 #include <linux/spinlock.h> 		/* spinlock_t, etc */
-#include <linux/atomic.h>			/* atomic_t, etc */
+#include <linux/atomic.h>		/* atomic_t, etc */
+#include <linux/kref.h>			/* struct kref */
 #include <linux/workqueue.h>		/* struct work_struct */
+#include <linux/llist.h>
 
 #include <rdma/rdma_cm.h>		/* RDMA connection api */
 #include <rdma/ib_verbs.h>		/* RDMA verbs api */
@@ -66,23 +68,17 @@
  * Interface Adapter -- one per transport instance
  */
 struct rpcrdma_ia {
-	const struct rpcrdma_memreg_ops	*ri_ops;
-	struct ib_device	*ri_device;
 	struct rdma_cm_id 	*ri_id;
 	struct ib_pd		*ri_pd;
-	struct completion	ri_done;
-	struct completion	ri_remove_done;
 	int			ri_async_rc;
 	unsigned int		ri_max_segs;
 	unsigned int		ri_max_frwr_depth;
-	unsigned int		ri_max_inline_write;
-	unsigned int		ri_max_inline_read;
 	unsigned int		ri_max_send_sges;
 	bool			ri_implicit_roundup;
 	enum ib_mr_type		ri_mrtype;
 	unsigned long		ri_flags;
-	struct ib_qp_attr	ri_qp_attr;
-	struct ib_qp_init_attr	ri_qp_init_attr;
+	struct completion	ri_done;
+	struct completion	ri_remove_done;
 };
 
 enum {
@@ -96,84 +92,86 @@
 struct rpcrdma_ep {
 	unsigned int		rep_send_count;
 	unsigned int		rep_send_batch;
+	unsigned int		rep_max_inline_send;
+	unsigned int		rep_max_inline_recv;
 	int			rep_connected;
 	struct ib_qp_init_attr	rep_attr;
 	wait_queue_head_t 	rep_connect_wait;
 	struct rpcrdma_connect_private	rep_cm_private;
 	struct rdma_conn_param	rep_remote_cma;
-	struct delayed_work	rep_connect_worker;
+	unsigned int		rep_max_requests;	/* set by /proc */
+	unsigned int		rep_inline_send;	/* negotiated */
+	unsigned int		rep_inline_recv;	/* negotiated */
+	int			rep_receive_count;
 };
 
 /* Pre-allocate extra Work Requests for handling backward receives
  * and sends. This is a fixed value because the Work Queues are
- * allocated when the forward channel is set up.
+ * allocated when the forward channel is set up, long before the
+ * backchannel is provisioned. This value is two times
+ * NFS4_DEF_CB_SLOT_TABLE_SIZE.
  */
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
-#define RPCRDMA_BACKWARD_WRS		(8)
+#define RPCRDMA_BACKWARD_WRS (32)
 #else
-#define RPCRDMA_BACKWARD_WRS		(0)
+#define RPCRDMA_BACKWARD_WRS (0)
 #endif
 
 /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
- *
- * The below structure appears at the front of a large region of kmalloc'd
- * memory, which always starts on a good alignment boundary.
  */
 
 struct rpcrdma_regbuf {
 	struct ib_sge		rg_iov;
 	struct ib_device	*rg_device;
 	enum dma_data_direction	rg_direction;
-	__be32			rg_base[0] __attribute__ ((aligned(256)));
+	void			*rg_data;
 };
 
-static inline u64
-rdmab_addr(struct rpcrdma_regbuf *rb)
+static inline u64 rdmab_addr(struct rpcrdma_regbuf *rb)
 {
 	return rb->rg_iov.addr;
 }
 
-static inline u32
-rdmab_length(struct rpcrdma_regbuf *rb)
+static inline u32 rdmab_length(struct rpcrdma_regbuf *rb)
 {
 	return rb->rg_iov.length;
 }
 
-static inline u32
-rdmab_lkey(struct rpcrdma_regbuf *rb)
+static inline u32 rdmab_lkey(struct rpcrdma_regbuf *rb)
 {
 	return rb->rg_iov.lkey;
 }
 
-static inline struct ib_device *
-rdmab_device(struct rpcrdma_regbuf *rb)
+static inline struct ib_device *rdmab_device(struct rpcrdma_regbuf *rb)
 {
 	return rb->rg_device;
 }
 
+static inline void *rdmab_data(const struct rpcrdma_regbuf *rb)
+{
+	return rb->rg_data;
+}
+
 #define RPCRDMA_DEF_GFP		(GFP_NOIO | __GFP_NOWARN)
 
 /* To ensure a transport can always make forward progress,
  * the number of RDMA segments allowed in header chunk lists
- * is capped at 8. This prevents less-capable devices and
- * memory registrations from overrunning the Send buffer
- * while building chunk lists.
+ * is capped at 16. This prevents less-capable devices from
+ * overrunning the Send buffer while building chunk lists.
  *
  * Elements of the Read list take up more room than the
- * Write list or Reply chunk. 8 read segments means the Read
- * list (or Write list or Reply chunk) cannot consume more
- * than
+ * Write list or Reply chunk. 16 read segments means the
+ * chunk lists cannot consume more than
  *
- * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes.
+ * ((16 + 2) * read segment size) + 1 XDR words,
  *
- * And the fixed part of the header is another 24 bytes.
- *
- * The smallest inline threshold is 1024 bytes, ensuring that
- * at least 750 bytes are available for RPC messages.
+ * or 436 bytes. The fixed part of the header is
+ * another 24 bytes. Thus when the inline threshold is
+ * 1024 bytes, at least 564 bytes are available for RPC
+ * message bodies.
  */
 enum {
-	RPCRDMA_MAX_HDR_SEGS = 8,
-	RPCRDMA_HDRBUF_SIZE = 256,
+	RPCRDMA_MAX_HDR_SEGS = 16,
 };
 
 /*
@@ -200,14 +198,23 @@
 	bool			rr_temp;
 	struct rpcrdma_regbuf	*rr_rdmabuf;
 	struct rpcrdma_xprt	*rr_rxprt;
-	struct work_struct	rr_work;
+	struct rpc_rqst		*rr_rqst;
 	struct xdr_buf		rr_hdrbuf;
 	struct xdr_stream	rr_stream;
-	struct rpc_rqst		*rr_rqst;
-	struct list_head	rr_list;
+	struct llist_node	rr_node;
 	struct ib_recv_wr	rr_recv_wr;
 };
 
+/* To reduce the rate at which a transport invokes ib_post_recv
+ * (and thus the hardware doorbell rate), xprtrdma posts Receive
+ * WRs in batches.
+ *
+ * Setting this to zero disables Receive post batching.
+ */
+enum {
+	RPCRDMA_MAX_RECV_BATCH = 7,
+};
+
 /* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
  */
 struct rpcrdma_req;
@@ -215,46 +222,22 @@
 struct rpcrdma_sendctx {
 	struct ib_send_wr	sc_wr;
 	struct ib_cqe		sc_cqe;
+	struct ib_device	*sc_device;
 	struct rpcrdma_xprt	*sc_xprt;
 	struct rpcrdma_req	*sc_req;
 	unsigned int		sc_unmap_count;
 	struct ib_sge		sc_sges[];
 };
 
-/* Limit the number of SGEs that can be unmapped during one
- * Send completion. This caps the amount of work a single
- * completion can do before returning to the provider.
- *
- * Setting this to zero disables Send completion batching.
- */
-enum {
-	RPCRDMA_MAX_SEND_BATCH = 7,
-};
-
 /*
  * struct rpcrdma_mr - external memory region metadata
  *
  * An external memory region is any buffer or page that is registered
  * on the fly (ie, not pre-registered).
- *
- * Each rpcrdma_buffer has a list of free MWs anchored in rb_mrs. During
- * call_allocate, rpcrdma_buffer_get() assigns one to each segment in
- * an rpcrdma_req. Then rpcrdma_register_external() grabs these to keep
- * track of registration metadata while each RPC is pending.
- * rpcrdma_deregister_external() uses this metadata to unmap and
- * release these resources when an RPC is complete.
  */
-enum rpcrdma_frwr_state {
-	FRWR_IS_INVALID,	/* ready to be used */
-	FRWR_IS_VALID,		/* in use */
-	FRWR_FLUSHED_FR,	/* flushed FASTREG WR */
-	FRWR_FLUSHED_LI,	/* flushed LOCALINV WR */
-};
-
 struct rpcrdma_frwr {
 	struct ib_mr			*fr_mr;
 	struct ib_cqe			fr_cqe;
-	enum rpcrdma_frwr_state		fr_state;
 	struct completion		fr_linv_done;
 	union {
 		struct ib_reg_wr	fr_regwr;
@@ -262,24 +245,19 @@
 	};
 };
 
-struct rpcrdma_fmr {
-	struct ib_fmr		*fm_mr;
-	u64			*fm_physaddrs;
-};
-
+struct rpcrdma_req;
 struct rpcrdma_mr {
 	struct list_head	mr_list;
+	struct rpcrdma_req	*mr_req;
 	struct scatterlist	*mr_sg;
 	int			mr_nents;
 	enum dma_data_direction	mr_dir;
-	union {
-		struct rpcrdma_fmr	fmr;
-		struct rpcrdma_frwr	frwr;
-	};
+	struct rpcrdma_frwr	frwr;
 	struct rpcrdma_xprt	*mr_xprt;
 	u32			mr_handle;
 	u32			mr_length;
 	u64			mr_offset;
+	struct work_struct	mr_recycle;
 	struct list_head	mr_all;
 };
 
@@ -337,7 +315,6 @@
 struct rpcrdma_req {
 	struct list_head	rl_list;
 	struct rpc_rqst		rl_slot;
-	struct rpcrdma_buffer	*rl_buffer;
 	struct rpcrdma_rep	*rl_reply;
 	struct xdr_stream	rl_stream;
 	struct xdr_buf		rl_hdrbuf;
@@ -347,18 +324,13 @@
 	struct rpcrdma_regbuf	*rl_recvbuf;	/* rq_rcv_buf */
 
 	struct list_head	rl_all;
-	unsigned long		rl_flags;
+	struct kref		rl_kref;
 
-	struct list_head	rl_registered;	/* registered segments */
+	struct list_head	rl_free_mrs;
+	struct list_head	rl_registered;
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
 };
 
-/* rl_flags */
-enum {
-	RPCRDMA_REQ_F_PENDING = 0,
-	RPCRDMA_REQ_F_TX_RESOURCES,
-};
-
 static inline struct rpcrdma_req *
 rpcr_to_rdmar(const struct rpc_rqst *rqst)
 {
@@ -368,7 +340,7 @@
 static inline void
 rpcrdma_mr_push(struct rpcrdma_mr *mr, struct list_head *list)
 {
-	list_add_tail(&mr->mr_list, list);
+	list_add(&mr->mr_list, list);
 }
 
 static inline struct rpcrdma_mr *
@@ -376,8 +348,9 @@
 {
 	struct rpcrdma_mr *mr;
 
-	mr = list_first_entry(list, struct rpcrdma_mr, mr_list);
-	list_del_init(&mr->mr_list);
+	mr = list_first_entry_or_null(list, struct rpcrdma_mr, mr_list);
+	if (mr)
+		list_del_init(&mr->mr_list);
 	return mr;
 }
 
@@ -388,53 +361,27 @@
  * One of these is associated with a transport instance
  */
 struct rpcrdma_buffer {
-	spinlock_t		rb_mrlock;	/* protect rb_mrs list */
+	spinlock_t		rb_lock;
+	struct list_head	rb_send_bufs;
 	struct list_head	rb_mrs;
-	struct list_head	rb_all;
 
 	unsigned long		rb_sc_head;
 	unsigned long		rb_sc_tail;
 	unsigned long		rb_sc_last;
 	struct rpcrdma_sendctx	**rb_sc_ctxs;
 
-	spinlock_t		rb_lock;	/* protect buf lists */
-	struct list_head	rb_send_bufs;
-	struct list_head	rb_recv_bufs;
-	unsigned long		rb_flags;
+	struct list_head	rb_allreqs;
+	struct list_head	rb_all_mrs;
+
+	struct llist_head	rb_free_reps;
+
 	u32			rb_max_requests;
 	u32			rb_credits;	/* most recent credit grant */
-	int			rb_posted_receives;
 
 	u32			rb_bc_srv_max_requests;
-	spinlock_t		rb_reqslock;	/* protect rb_allreqs */
-	struct list_head	rb_allreqs;
-
 	u32			rb_bc_max_requests;
 
-	spinlock_t		rb_recovery_lock; /* protect rb_stale_mrs */
-	struct list_head	rb_stale_mrs;
-	struct delayed_work	rb_recovery_worker;
-	struct delayed_work	rb_refresh_worker;
-};
-#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
-
-/* rb_flags */
-enum {
-	RPCRDMA_BUF_F_EMPTY_SCQ = 0,
-};
-
-/*
- * Internal structure for transport instance creation. This
- * exists primarily for modularity.
- *
- * This data should be set with mount options
- */
-struct rpcrdma_create_data_internal {
-	unsigned int	max_requests;	/* max requests (slots) in flight */
-	unsigned int	rsize;		/* mount rsize - max read hdr+data */
-	unsigned int	wsize;		/* mount wsize - max write hdr+data */
-	unsigned int	inline_rsize;	/* max non-rdma read data payload */
-	unsigned int	inline_wsize;	/* max non-rdma write data payload */
+	struct work_struct	rb_refresh_worker;
 };
 
 /*
@@ -452,7 +399,7 @@
 	unsigned long		hardway_register_count;
 	unsigned long		failed_marshal_count;
 	unsigned long		bad_reply_count;
-	unsigned long		mrs_recovered;
+	unsigned long		mrs_recycled;
 	unsigned long		mrs_orphaned;
 	unsigned long		mrs_allocated;
 	unsigned long		empty_sendctx_q;
@@ -467,36 +414,6 @@
 };
 
 /*
- * Per-registration mode operations
- */
-struct rpcrdma_xprt;
-struct rpcrdma_memreg_ops {
-	struct rpcrdma_mr_seg *
-			(*ro_map)(struct rpcrdma_xprt *,
-				  struct rpcrdma_mr_seg *, int, bool,
-				  struct rpcrdma_mr **);
-	int		(*ro_send)(struct rpcrdma_ia *ia,
-				   struct rpcrdma_req *req);
-	void		(*ro_reminv)(struct rpcrdma_rep *rep,
-				     struct list_head *mrs);
-	void		(*ro_unmap_sync)(struct rpcrdma_xprt *,
-					 struct list_head *);
-	void		(*ro_recover_mr)(struct rpcrdma_mr *mr);
-	int		(*ro_open)(struct rpcrdma_ia *,
-				   struct rpcrdma_ep *,
-				   struct rpcrdma_create_data_internal *);
-	size_t		(*ro_maxpages)(struct rpcrdma_xprt *);
-	int		(*ro_init_mr)(struct rpcrdma_ia *,
-				      struct rpcrdma_mr *);
-	void		(*ro_release_mr)(struct rpcrdma_mr *mr);
-	const char	*ro_displayname;
-	const int	ro_send_w_inv_ok;
-};
-
-extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops;
-extern const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops;
-
-/*
  * RPCRDMA transport -- encapsulates the structures above for
  * integration with RPC.
  *
@@ -511,13 +428,12 @@
 	struct rpcrdma_ia	rx_ia;
 	struct rpcrdma_ep	rx_ep;
 	struct rpcrdma_buffer	rx_buf;
-	struct rpcrdma_create_data_internal rx_data;
 	struct delayed_work	rx_connect_worker;
+	struct rpc_timeout	rx_timeout;
 	struct rpcrdma_stats	rx_stats;
 };
 
 #define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, rx_xprt)
-#define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)
 
 static inline const char *
 rpcrdma_addrstr(const struct rpcrdma_xprt *r_xprt)
@@ -547,65 +463,72 @@
 int rpcrdma_ia_open(struct rpcrdma_xprt *xprt);
 void rpcrdma_ia_remove(struct rpcrdma_ia *ia);
 void rpcrdma_ia_close(struct rpcrdma_ia *);
-bool frwr_is_supported(struct rpcrdma_ia *);
-bool fmr_is_supported(struct rpcrdma_ia *);
-
-extern struct workqueue_struct *rpcrdma_receive_wq;
 
 /*
  * Endpoint calls - xprtrdma/verbs.c
  */
-int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
-				struct rpcrdma_create_data_internal *);
-void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
+int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt);
+void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt);
 int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
-void rpcrdma_conn_func(struct rpcrdma_ep *ep);
 void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
 
 int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
 				struct rpcrdma_req *);
-void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
 /*
  * Buffer calls - xprtrdma/verbs.c
  */
-struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
-void rpcrdma_destroy_req(struct rpcrdma_req *);
+struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
+				       gfp_t flags);
+void rpcrdma_req_destroy(struct rpcrdma_req *req);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
-struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf);
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt);
 
 struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt);
 void rpcrdma_mr_put(struct rpcrdma_mr *mr);
-void rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr);
-void rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr);
+
+static inline void
+rpcrdma_mr_recycle(struct rpcrdma_mr *mr)
+{
+	schedule_work(&mr->mr_recycle);
+}
 
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
-void rpcrdma_buffer_put(struct rpcrdma_req *);
+void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
+			struct rpcrdma_req *req);
 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
 
-struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(size_t, enum dma_data_direction,
-					    gfp_t);
-bool __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *);
-void rpcrdma_free_regbuf(struct rpcrdma_regbuf *);
+bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
+			    gfp_t flags);
+bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
+			      struct rpcrdma_regbuf *rb);
 
-static inline bool
-rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb)
+/**
+ * rpcrdma_regbuf_is_mapped - check if buffer is DMA mapped
+ *
+ * Returns true if the buffer is now mapped to rb->rg_device.
+ */
+static inline bool rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb)
 {
 	return rb->rg_device != NULL;
 }
 
-static inline bool
-rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+/**
+ * rpcrdma_regbuf_dma_map - DMA-map a regbuf
+ * @r_xprt: controlling transport instance
+ * @rb: regbuf to be mapped
+ *
+ * Returns true if the buffer is currently DMA mapped.
+ */
+static inline bool rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
+					  struct rpcrdma_regbuf *rb)
 {
 	if (likely(rpcrdma_regbuf_is_mapped(rb)))
 		return true;
-	return __rpcrdma_dma_map_regbuf(ia, rb);
+	return __rpcrdma_regbuf_dma_map(r_xprt, rb);
 }
 
-int rpcrdma_alloc_wq(void);
-void rpcrdma_destroy_wq(void);
-
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */
@@ -616,6 +539,24 @@
 	return writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
 }
 
+/* Memory registration calls - xprtrdma/frwr_ops.c
+ */
+bool frwr_is_supported(struct ib_device *device);
+void frwr_recycle(struct rpcrdma_req *req);
+void frwr_reset(struct rpcrdma_req *req);
+int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep);
+int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
+void frwr_release_mr(struct rpcrdma_mr *mr);
+size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt);
+struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
+				struct rpcrdma_mr_seg *seg,
+				int nsegs, bool writing, __be32 xid,
+				struct rpcrdma_mr *mr);
+int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
+void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
+
 /*
  * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
  */
@@ -632,14 +573,11 @@
 			      struct rpcrdma_req *req, u32 hdrlen,
 			      struct xdr_buf *xdr,
 			      enum rpcrdma_chunktype rtype);
-void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc);
+void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc);
 int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
 void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
-			  struct rpcrdma_req *req);
-void rpcrdma_deferred_completion(struct work_struct *work);
 
 static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
 {
@@ -649,10 +587,12 @@
 
 /* RPC/RDMA module init - xprtrdma/transport.c
  */
+extern unsigned int xprt_rdma_slot_table_entries;
 extern unsigned int xprt_rdma_max_inline_read;
+extern unsigned int xprt_rdma_max_inline_write;
 void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap);
 void xprt_rdma_free_addresses(struct rpc_xprt *xprt);
-void rpcrdma_connect_worker(struct work_struct *work);
+void xprt_rdma_close(struct rpc_xprt *xprt);
 void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq);
 int xprt_rdma_init(void);
 void xprt_rdma_cleanup(void);
@@ -661,8 +601,8 @@
  */
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
-int xprt_rdma_bc_up(struct svc_serv *, struct net *);
 size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *);
+unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *);
 int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
 void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
 int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst);