Update Linux to v5.4.2

Change-Id: Idf6911045d9d382da2cfe01b1edff026404ac8fd
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c8ae983..b86b5fd 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -71,7 +71,6 @@
 	size = RPCRDMA_HDRLEN_MIN;
 
 	/* Maximum Read list size */
-	maxsegs += 2;	/* segment for head and tail buffers */
 	size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
 
 	/* Minimal Read chunk size */
@@ -97,7 +96,6 @@
 	size = RPCRDMA_HDRLEN_MIN;
 
 	/* Maximum Write list size */
-	maxsegs += 2;	/* segment for head and tail buffers */
 	size = sizeof(__be32);		/* segment count */
 	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
 	size += sizeof(__be32);	/* list discriminator */
@@ -107,16 +105,23 @@
 	return size;
 }
 
+/**
+ * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
+ * @r_xprt: transport instance to initialize
+ *
+ * The max_inline fields contain the maximum size of an RPC message
+ * so the marshaling code doesn't have to repeat this calculation
+ * for every RPC.
+ */
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
 {
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	unsigned int maxsegs = ia->ri_max_segs;
+	unsigned int maxsegs = r_xprt->rx_ia.ri_max_segs;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 
-	ia->ri_max_inline_write = cdata->inline_wsize -
-				  rpcrdma_max_call_header_size(maxsegs);
-	ia->ri_max_inline_read = cdata->inline_rsize -
-				 rpcrdma_max_reply_header_size(maxsegs);
+	ep->rep_max_inline_send =
+		ep->rep_inline_send - rpcrdma_max_call_header_size(maxsegs);
+	ep->rep_max_inline_recv =
+		ep->rep_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
 }
 
 /* The client can send a request inline as long as the RPCRDMA header
@@ -133,7 +138,7 @@
 	struct xdr_buf *xdr = &rqst->rq_snd_buf;
 	unsigned int count, remaining, offset;
 
-	if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
+	if (xdr->len > r_xprt->rx_ep.rep_max_inline_send)
 		return false;
 
 	if (xdr->page_len) {
@@ -161,9 +166,21 @@
 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 				   struct rpc_rqst *rqst)
 {
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep.rep_max_inline_recv;
+}
 
-	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
+/* The client is required to provide a Reply chunk if the maximum
+ * size of the non-payload part of the RPC Reply is larger than
+ * the inline threshold.
+ */
+static bool
+rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
+			  const struct rpc_rqst *rqst)
+{
+	const struct xdr_buf *buf = &rqst->rq_rcv_buf;
+
+	return (buf->head[0].iov_len + buf->tail[0].iov_len) <
+		r_xprt->rx_ep.rep_max_inline_recv;
 }
 
 /* Split @vec on page boundaries into SGEs. FMR registers pages, not
@@ -220,11 +237,12 @@
 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
 	page_base = offset_in_page(xdrbuf->page_base);
 	while (len) {
-		if (unlikely(!*ppages)) {
-			/* XXX: Certain upper layer operations do
-			 *	not provide receive buffer pages.
-			 */
-			*ppages = alloc_page(GFP_ATOMIC);
+		/* ACL likes to be lazy in allocating pages - ACLs
+		 * are small by default but can get huge.
+		 */
+		if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) {
+			if (!*ppages)
+				*ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
 			if (!*ppages)
 				return -ENOBUFS;
 		}
@@ -324,6 +342,32 @@
 	return 0;
 }
 
+static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
+						 struct rpcrdma_req *req,
+						 struct rpcrdma_mr_seg *seg,
+						 int nsegs, bool writing,
+						 struct rpcrdma_mr **mr)
+{
+	*mr = rpcrdma_mr_pop(&req->rl_free_mrs);
+	if (!*mr) {
+		*mr = rpcrdma_mr_get(r_xprt);
+		if (!*mr)
+			goto out_getmr_err;
+		trace_xprtrdma_mr_get(req);
+		(*mr)->mr_req = req;
+	}
+
+	rpcrdma_mr_push(*mr, &req->rl_registered);
+	return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
+
+out_getmr_err:
+	trace_xprtrdma_nomrs(req);
+	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
+	if (r_xprt->rx_ep.rep_connected != -ENODEV)
+		schedule_work(&r_xprt->rx_buf.rb_refresh_worker);
+	return ERR_PTR(-EAGAIN);
+}
+
 /* Register and XDR encode the Read list. Supports encoding a list of read
  * segments that belong to a single read chunk.
  *
@@ -338,9 +382,10 @@
  *
  * Only a single @pos value is currently supported.
  */
-static noinline int
-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-			 struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
+static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+				    struct rpcrdma_req *req,
+				    struct rpc_rqst *rqst,
+				    enum rpcrdma_chunktype rtype)
 {
 	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
@@ -348,6 +393,9 @@
 	unsigned int pos;
 	int nsegs;
 
+	if (rtype == rpcrdma_noch)
+		goto done;
+
 	pos = rqst->rq_snd_buf.head[0].iov_len;
 	if (rtype == rpcrdma_areadch)
 		pos = 0;
@@ -358,21 +406,20 @@
 		return nsegs;
 
 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   false, &mr);
+		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
-		rpcrdma_mr_push(mr, &req->rl_registered);
 
 		if (encode_read_segment(xdr, mr, pos) < 0)
 			return -EMSGSIZE;
 
-		trace_xprtrdma_read_chunk(rqst->rq_task, pos, mr, nsegs);
+		trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
 		r_xprt->rx_stats.read_chunk_count++;
 		nsegs -= mr->mr_nents;
 	} while (nsegs);
 
-	return 0;
+done:
+	return encode_item_not_present(xdr);
 }
 
 /* Register and XDR encode the Write list. Supports encoding a list
@@ -390,9 +437,10 @@
  *
  * Only a single Write chunk is currently supported.
  */
-static noinline int
-rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-			  struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
+				     struct rpcrdma_req *req,
+				     struct rpc_rqst *rqst,
+				     enum rpcrdma_chunktype wtype)
 {
 	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
@@ -400,6 +448,9 @@
 	int nsegs, nchunks;
 	__be32 *segcount;
 
+	if (wtype != rpcrdma_writech)
+		goto done;
+
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
 				     rqst->rq_rcv_buf.head[0].iov_len,
@@ -416,16 +467,14 @@
 
 	nchunks = 0;
 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   true, &mr);
+		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
-		rpcrdma_mr_push(mr, &req->rl_registered);
 
 		if (encode_rdma_segment(xdr, mr) < 0)
 			return -EMSGSIZE;
 
-		trace_xprtrdma_write_chunk(rqst->rq_task, mr, nsegs);
+		trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
 		r_xprt->rx_stats.write_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 		nchunks++;
@@ -435,7 +484,8 @@
 	/* Update count of segments in this Write chunk */
 	*segcount = cpu_to_be32(nchunks);
 
-	return 0;
+done:
+	return encode_item_not_present(xdr);
 }
 
 /* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -450,9 +500,10 @@
  * Returns zero on success, or a negative errno if a failure occurred.
  * @xdr is advanced to the next position in the stream.
  */
-static noinline int
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-			   struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+				      struct rpcrdma_req *req,
+				      struct rpc_rqst *rqst,
+				      enum rpcrdma_chunktype wtype)
 {
 	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
@@ -460,6 +511,9 @@
 	int nsegs, nchunks;
 	__be32 *segcount;
 
+	if (wtype != rpcrdma_replych)
+		return encode_item_not_present(xdr);
+
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
 	if (nsegs < 0)
@@ -474,16 +528,14 @@
 
 	nchunks = 0;
 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   true, &mr);
+		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
-		rpcrdma_mr_push(mr, &req->rl_registered);
 
 		if (encode_rdma_segment(xdr, mr) < 0)
 			return -EMSGSIZE;
 
-		trace_xprtrdma_reply_chunk(rqst->rq_task, mr, nsegs);
+		trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
 		r_xprt->rx_stats.reply_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 		nchunks++;
@@ -496,51 +548,57 @@
 	return 0;
 }
 
+static void rpcrdma_sendctx_done(struct kref *kref)
+{
+	struct rpcrdma_req *req =
+		container_of(kref, struct rpcrdma_req, rl_kref);
+	struct rpcrdma_rep *rep = req->rl_reply;
+
+	rpcrdma_complete_rqst(rep);
+	rep->rr_rxprt->rx_stats.reply_waits_for_send++;
+}
+
 /**
- * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
+ * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
  * @sc: sendctx containing SGEs to unmap
  *
  */
-void
-rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
+void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 {
-	struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
 	struct ib_sge *sge;
-	unsigned int count;
+
+	if (!sc->sc_unmap_count)
+		return;
 
 	/* The first two SGEs contain the transport header and
 	 * the inline buffer. These are always left mapped so
 	 * they can be cheaply re-used.
 	 */
-	sge = &sc->sc_sges[2];
-	for (count = sc->sc_unmap_count; count; ++sge, --count)
-		ib_dma_unmap_page(ia->ri_device,
-				  sge->addr, sge->length, DMA_TO_DEVICE);
+	for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
+	     ++sge, --sc->sc_unmap_count)
+		ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
+				  DMA_TO_DEVICE);
 
-	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
-		smp_mb__after_atomic();
-		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
-	}
+	kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
 }
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
  */
-static bool
-rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
-			u32 len)
+static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
+				    struct rpcrdma_req *req, u32 len)
 {
 	struct rpcrdma_sendctx *sc = req->rl_sendctx;
 	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
 	struct ib_sge *sge = sc->sc_sges;
 
-	if (!rpcrdma_dma_map_regbuf(ia, rb))
+	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 		goto out_regbuf;
 	sge->addr = rdmab_addr(rb);
 	sge->length = len;
 	sge->lkey = rdmab_lkey(rb);
 
-	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
-				      sge->length, DMA_TO_DEVICE);
+	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
+				      DMA_TO_DEVICE);
 	sc->sc_wr.num_sge++;
 	return true;
 
@@ -552,23 +610,23 @@
 /* Prepare the Send SGEs. The head and tail iovec, and each entry
  * in the page list, gets its own SGE.
  */
-static bool
-rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
-			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
+				     struct rpcrdma_req *req,
+				     struct xdr_buf *xdr,
+				     enum rpcrdma_chunktype rtype)
 {
 	struct rpcrdma_sendctx *sc = req->rl_sendctx;
 	unsigned int sge_no, page_base, len, remaining;
 	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
-	struct ib_device *device = ia->ri_device;
 	struct ib_sge *sge = sc->sc_sges;
-	u32 lkey = ia->ri_pd->local_dma_lkey;
 	struct page *page, **ppages;
 
 	/* The head iovec is straightforward, as it is already
 	 * DMA-mapped. Sync the content that has changed.
 	 */
-	if (!rpcrdma_dma_map_regbuf(ia, rb))
+	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 		goto out_regbuf;
+	sc->sc_device = rdmab_device(rb);
 	sge_no = 1;
 	sge[sge_no].addr = rdmab_addr(rb);
 	sge[sge_no].length = xdr->head[0].iov_len;
@@ -615,13 +673,14 @@
 				goto out_mapping_overflow;
 
 			len = min_t(u32, PAGE_SIZE - page_base, remaining);
-			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
-							   page_base, len,
-							   DMA_TO_DEVICE);
-			if (ib_dma_mapping_error(device, sge[sge_no].addr))
+			sge[sge_no].addr =
+				ib_dma_map_page(rdmab_device(rb), *ppages,
+						page_base, len, DMA_TO_DEVICE);
+			if (ib_dma_mapping_error(rdmab_device(rb),
+						 sge[sge_no].addr))
 				goto out_mapping_err;
 			sge[sge_no].length = len;
-			sge[sge_no].lkey = lkey;
+			sge[sge_no].lkey = rdmab_lkey(rb);
 
 			sc->sc_unmap_count++;
 			ppages++;
@@ -642,20 +701,20 @@
 
 map_tail:
 		sge_no++;
-		sge[sge_no].addr = ib_dma_map_page(device, page,
-						   page_base, len,
-						   DMA_TO_DEVICE);
-		if (ib_dma_mapping_error(device, sge[sge_no].addr))
+		sge[sge_no].addr =
+			ib_dma_map_page(rdmab_device(rb), page, page_base, len,
+					DMA_TO_DEVICE);
+		if (ib_dma_mapping_error(rdmab_device(rb), sge[sge_no].addr))
 			goto out_mapping_err;
 		sge[sge_no].length = len;
-		sge[sge_no].lkey = lkey;
+		sge[sge_no].lkey = rdmab_lkey(rb);
 		sc->sc_unmap_count++;
 	}
 
 out:
 	sc->sc_wr.num_sge += sge_no;
 	if (sc->sc_unmap_count)
-		__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+		kref_get(&req->rl_kref);
 	return true;
 
 out_regbuf:
@@ -663,13 +722,13 @@
 	return false;
 
 out_mapping_overflow:
-	rpcrdma_unmap_sendctx(sc);
+	rpcrdma_sendctx_unmap(sc);
 	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
 	return false;
 
 out_mapping_err:
-	rpcrdma_unmap_sendctx(sc);
-	pr_err("rpcrdma: Send mapping error\n");
+	rpcrdma_sendctx_unmap(sc);
+	trace_xprtrdma_dma_maperr(sge[sge_no].addr);
 	return false;
 }
 
@@ -688,22 +747,28 @@
 			  struct rpcrdma_req *req, u32 hdrlen,
 			  struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 {
-	req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
+	int ret;
+
+	ret = -EAGAIN;
+	req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
 	if (!req->rl_sendctx)
-		return -EAGAIN;
+		goto err;
 	req->rl_sendctx->sc_wr.num_sge = 0;
 	req->rl_sendctx->sc_unmap_count = 0;
 	req->rl_sendctx->sc_req = req;
-	__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+	kref_init(&req->rl_kref);
 
-	if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
-		return -EIO;
-
+	ret = -EIO;
+	if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
+		goto err;
 	if (rtype != rpcrdma_areadch)
-		if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
-			return -EIO;
-
+		if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
+			goto err;
 	return 0;
+
+err:
+	trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
+	return ret;
 }
 
 /**
@@ -736,8 +801,8 @@
 	int ret;
 
 	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
-	xdr_init_encode(xdr, &req->rl_hdrbuf,
-			req->rl_rdmabuf->rg_base);
+	xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
+			rqst);
 
 	/* Fixed header fields */
 	ret = -EMSGSIZE;
@@ -766,7 +831,8 @@
 	 */
 	if (rpcrdma_results_inline(r_xprt, rqst))
 		wtype = rpcrdma_noch;
-	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
+	else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
+		 rpcrdma_nonpayload_inline(r_xprt, rqst))
 		wtype = rpcrdma_writech;
 	else
 		wtype = rpcrdma_replych;
@@ -801,12 +867,7 @@
 	 * chunks. Very likely the connection has been replaced,
 	 * so these registrations are invalid and unusable.
 	 */
-	while (unlikely(!list_empty(&req->rl_registered))) {
-		struct rpcrdma_mr *mr;
-
-		mr = rpcrdma_mr_pop(&req->rl_registered);
-		rpcrdma_mr_defer_recovery(mr);
-	}
+	frwr_recycle(req);
 
 	/* This implementation supports the following combinations
 	 * of chunk lists in one RPC-over-RDMA Call message:
@@ -830,49 +891,28 @@
 	 * send a Call message with a Position Zero Read chunk and a
 	 * regular Read chunk at the same time.
 	 */
-	if (rtype != rpcrdma_noch) {
-		ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
-		if (ret)
-			goto out_err;
-	}
-	ret = encode_item_not_present(xdr);
+	ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
+	if (ret)
+		goto out_err;
+	ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
+	if (ret)
+		goto out_err;
+	ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
 	if (ret)
 		goto out_err;
 
-	if (wtype == rpcrdma_writech) {
-		ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
-		if (ret)
-			goto out_err;
-	}
-	ret = encode_item_not_present(xdr);
-	if (ret)
-		goto out_err;
-
-	if (wtype != rpcrdma_replych)
-		ret = encode_item_not_present(xdr);
-	else
-		ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
-	if (ret)
-		goto out_err;
-
-	trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
-
-	ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
+	ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
 					&rqst->rq_snd_buf, rtype);
 	if (ret)
 		goto out_err;
+
+	trace_xprtrdma_marshal(req, rtype, wtype);
 	return 0;
 
 out_err:
-	switch (ret) {
-	case -EAGAIN:
-		xprt_wait_for_buffer_space(rqst->rq_task, NULL);
-		break;
-	case -ENOBUFS:
-		break;
-	default:
-		r_xprt->rx_stats.failed_marshal_count++;
-	}
+	trace_xprtrdma_marshal_failed(rqst, ret);
+	r_xprt->rx_stats.failed_marshal_count++;
+	frwr_reset(req);
 	return ret;
 }
 
@@ -1190,17 +1230,20 @@
 		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
 		if (!p)
 			break;
-		dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
-			rqst->rq_task->tk_pid, __func__,
-			be32_to_cpup(p), be32_to_cpu(*(p + 1)));
+		dprintk("RPC:       %s: server reports "
+			"version error (%u-%u), xid %08x\n", __func__,
+			be32_to_cpup(p), be32_to_cpu(*(p + 1)),
+			be32_to_cpu(rep->rr_xid));
 		break;
 	case err_chunk:
-		dprintk("RPC: %5u: %s: server reports header decoding error\n",
-			rqst->rq_task->tk_pid, __func__);
+		dprintk("RPC:       %s: server reports "
+			"header decoding error, xid %08x\n", __func__,
+			be32_to_cpu(rep->rr_xid));
 		break;
 	default:
-		dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
-			rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
+		dprintk("RPC:       %s: server reports "
+			"unrecognized error %d, xid %08x\n", __func__,
+			be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
 	}
 
 	r_xprt->rx_stats.bad_reply_count++;
@@ -1216,11 +1259,8 @@
 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpc_rqst *rqst = rep->rr_rqst;
-	unsigned long cwnd;
 	int status;
 
-	xprt->reestablish_timeout = 0;
-
 	switch (rep->rr_proc) {
 	case rdma_msg:
 		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
@@ -1238,15 +1278,10 @@
 		goto out_badheader;
 
 out:
-	spin_lock(&xprt->recv_lock);
-	cwnd = xprt->cwnd;
-	xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
-	if (xprt->cwnd > cwnd)
-		xprt_release_rqst_cong(rqst->rq_task);
-
+	spin_lock(&xprt->queue_lock);
 	xprt_complete_rqst(rqst->rq_task, status);
 	xprt_unpin_rqst(rqst);
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	return;
 
 /* If the incoming reply terminated a pending RPC, the next
@@ -1256,56 +1291,20 @@
 out_badheader:
 	trace_xprtrdma_reply_hdr(rep);
 	r_xprt->rx_stats.bad_reply_count++;
-	status = -EIO;
 	goto out;
 }
 
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+static void rpcrdma_reply_done(struct kref *kref)
 {
-	/* Invalidate and unmap the data payloads before waking
-	 * the waiting application. This guarantees the memory
-	 * regions are properly fenced from the server before the
-	 * application accesses the data. It also ensures proper
-	 * send flow control: waking the next RPC waits until this
-	 * RPC has relinquished all its Send Queue entries.
-	 */
-	if (!list_empty(&req->rl_registered))
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
-						    &req->rl_registered);
+	struct rpcrdma_req *req =
+		container_of(kref, struct rpcrdma_req, rl_kref);
 
-	/* Ensure that any DMA mapped pages associated with
-	 * the Send of the RPC Call have been unmapped before
-	 * allowing the RPC to complete. This protects argument
-	 * memory not controlled by the RPC client from being
-	 * re-used before we're done with it.
-	 */
-	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
-		r_xprt->rx_stats.reply_waits_for_send++;
-		out_of_line_wait_on_bit(&req->rl_flags,
-					RPCRDMA_REQ_F_TX_RESOURCES,
-					bit_wait,
-					TASK_UNINTERRUPTIBLE);
-	}
+	rpcrdma_complete_rqst(req->rl_reply);
 }
 
-/* Reply handling runs in the poll worker thread. Anything that
- * might wait is deferred to a separate workqueue.
- */
-void rpcrdma_deferred_completion(struct work_struct *work)
-{
-	struct rpcrdma_rep *rep =
-			container_of(work, struct rpcrdma_rep, rr_work);
-	struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
-	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-
-	trace_xprtrdma_defer_cmp(rep);
-	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
-		r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered);
-	rpcrdma_release_rqst(r_xprt, req);
-	rpcrdma_complete_rqst(rep);
-}
-
-/* Process received RPC/RDMA messages.
+/**
+ * rpcrdma_reply_handler - Process received RPC/RDMA messages
+ * @rep: Incoming rpcrdma_rep object to process
  *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
@@ -1320,14 +1319,15 @@
 	u32 credits;
 	__be32 *p;
 
-	--buf->rb_posted_receives;
-
-	if (rep->rr_hdrbuf.head[0].iov_len == 0)
-		goto out_badstatus;
+	/* Any data means we had a useful conversation, so
+	 * then we don't need to delay the next reconnect.
+	 */
+	if (xprt->reestablish_timeout)
+		xprt->reestablish_timeout = 0;
 
 	/* Fixed transport header fields */
 	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
-			rep->rr_hdrbuf.head[0].iov_base);
+			rep->rr_hdrbuf.head[0].iov_base, NULL);
 	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
 	if (unlikely(!p))
 		goto out_shortreply;
@@ -1345,51 +1345,55 @@
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
 	if (!rqst)
 		goto out_norqst;
 	xprt_pin_rqst(rqst);
+	spin_unlock(&xprt->queue_lock);
 
 	if (credits == 0)
 		credits = 1;	/* don't deadlock */
 	else if (credits > buf->rb_max_requests)
 		credits = buf->rb_max_requests;
-	buf->rb_credits = credits;
-
-	spin_unlock(&xprt->recv_lock);
+	if (buf->rb_credits != credits) {
+		spin_lock(&xprt->transport_lock);
+		buf->rb_credits = credits;
+		xprt->cwnd = credits << RPC_CWNDSHIFT;
+		spin_unlock(&xprt->transport_lock);
+	}
 
 	req = rpcr_to_rdmar(rqst);
+	if (req->rl_reply) {
+		trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
+		rpcrdma_recv_buffer_put(req->rl_reply);
+	}
 	req->rl_reply = rep;
 	rep->rr_rqst = rqst;
-	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 
 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
 
-	rpcrdma_post_recvs(r_xprt, false);
-	queue_work(rpcrdma_receive_wq, &rep->rr_work);
+	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
+		frwr_reminv(rep, &req->rl_registered);
+	if (!list_empty(&req->rl_registered))
+		frwr_unmap_async(r_xprt, req);
+		/* LocalInv completion will complete the RPC */
+	else
+		kref_put(&req->rl_kref, rpcrdma_reply_done);
 	return;
 
 out_badversion:
 	trace_xprtrdma_reply_vers(rep);
-	goto repost;
+	goto out;
 
-/* The RPC transaction has already been terminated, or the header
- * is corrupt.
- */
 out_norqst:
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	trace_xprtrdma_reply_rqst(rep);
-	goto repost;
+	goto out;
 
 out_shortreply:
 	trace_xprtrdma_reply_short(rep);
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
- */
-repost:
-	rpcrdma_post_recvs(r_xprt, false);
-out_badstatus:
+out:
 	rpcrdma_recv_buffer_put(rep);
 }