Update Linux to v5.4.2
Change-Id: Idf6911045d9d382da2cfe01b1edff026404ac8fd
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 8bf19e1..8ed0377 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -1,8 +1,7 @@
# SPDX-License-Identifier: GPL-2.0
obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
-rpcrdma-y := transport.o rpc_rdma.o verbs.o \
- fmr_ops.o frwr_ops.o \
+rpcrdma-y := transport.o rpc_rdma.o verbs.o frwr_ops.o \
svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
svc_rdma_sendto.o svc_rdma_recvfrom.o svc_rdma_rw.o \
module.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 90adeff..b458bf5 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -5,7 +5,6 @@
* Support for backward direction RPCs on RPC/RDMA.
*/
-#include <linux/module.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>
@@ -20,59 +19,6 @@
#undef RPCRDMA_BACKCHANNEL_DEBUG
-static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
- struct rpc_rqst *rqst)
-{
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-
- spin_lock(&buf->rb_reqslock);
- list_del(&req->rl_all);
- spin_unlock(&buf->rb_reqslock);
-
- rpcrdma_destroy_req(req);
-}
-
-static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
- unsigned int count)
-{
- struct rpc_xprt *xprt = &r_xprt->rx_xprt;
- struct rpc_rqst *rqst;
- unsigned int i;
-
- for (i = 0; i < (count << 1); i++) {
- struct rpcrdma_regbuf *rb;
- struct rpcrdma_req *req;
- size_t size;
-
- req = rpcrdma_create_req(r_xprt);
- if (IS_ERR(req))
- return PTR_ERR(req);
- rqst = &req->rl_slot;
-
- rqst->rq_xprt = xprt;
- INIT_LIST_HEAD(&rqst->rq_list);
- INIT_LIST_HEAD(&rqst->rq_bc_list);
- __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
- spin_lock_bh(&xprt->bc_pa_lock);
- list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
- spin_unlock_bh(&xprt->bc_pa_lock);
-
- size = r_xprt->rx_data.inline_rsize;
- rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
- if (IS_ERR(rb))
- goto out_fail;
- req->rl_sendbuf = rb;
- xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base,
- min_t(size_t, size, PAGE_SIZE));
- }
- return 0;
-
-out_fail:
- rpcrdma_bc_free_rqst(r_xprt, rqst);
- return -ENOMEM;
-}
-
/**
* xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
* @xprt: transport associated with these backchannel resources
@@ -83,55 +29,10 @@
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
- int rc;
- /* The backchannel reply path returns each rpc_rqst to the
- * bc_pa_list _after_ the reply is sent. If the server is
- * faster than the client, it can send another backward
- * direction request before the rpc_rqst is returned to the
- * list. The client rejects the request in this case.
- *
- * Twice as many rpc_rqsts are prepared to ensure there is
- * always an rpc_rqst available as soon as a reply is sent.
- */
- if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
- goto out_err;
-
- rc = rpcrdma_bc_setup_reqs(r_xprt, reqs);
- if (rc)
- goto out_free;
-
- r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
- request_module("svcrdma");
+ r_xprt->rx_buf.rb_bc_srv_max_requests = RPCRDMA_BACKWARD_WRS >> 1;
trace_xprtrdma_cb_setup(r_xprt, reqs);
return 0;
-
-out_free:
- xprt_rdma_bc_destroy(xprt, reqs);
-
-out_err:
- pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
- return -ENOMEM;
-}
-
-/**
- * xprt_rdma_bc_up - Create transport endpoint for backchannel service
- * @serv: server endpoint
- * @net: network namespace
- *
- * The "xprt" is an implied argument: it supplies the name of the
- * backchannel transport class.
- *
- * Returns zero on success, negative errno on failure
- */
-int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
-{
- int ret;
-
- ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
- if (ret < 0)
- return ret;
- return 0;
}
/**
@@ -143,14 +44,19 @@
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
size_t maxmsg;
- maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
+ maxmsg = min_t(unsigned int, ep->rep_inline_send, ep->rep_inline_recv);
maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
return maxmsg - RPCRDMA_HDRLEN_MIN;
}
+unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
+{
+ return RPCRDMA_BACKWARD_WRS >> 1;
+}
+
static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
@@ -159,7 +65,7 @@
rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
- req->rl_rdmabuf->rg_base);
+ rdmab_data(req->rl_rdmabuf), rqst);
p = xdr_reserve_space(&req->rl_stream, 28);
if (unlikely(!p))
@@ -194,18 +100,21 @@
*/
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
{
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
int rc;
- if (!xprt_connected(rqst->rq_xprt))
- goto drop_connection;
+ if (!xprt_connected(xprt))
+ return -ENOTCONN;
+
+ if (!xprt_request_get_cong(xprt, rqst))
+ return -EBADSLT;
rc = rpcrdma_bc_marshal_reply(rqst);
if (rc < 0)
goto failed_marshal;
- rpcrdma_post_recvs(r_xprt, true);
if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
goto drop_connection;
return 0;
@@ -214,7 +123,7 @@
if (rc != -ENOTCONN)
return rc;
drop_connection:
- xprt_disconnect_done(rqst->rq_xprt);
+ xprt_rdma_close(xprt);
return -ENOTCONN;
}
@@ -225,19 +134,18 @@
*/
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpc_rqst *rqst, *tmp;
- spin_lock_bh(&xprt->bc_pa_lock);
+ spin_lock(&xprt->bc_pa_lock);
list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
list_del(&rqst->rq_bc_pa_list);
- spin_unlock_bh(&xprt->bc_pa_lock);
+ spin_unlock(&xprt->bc_pa_lock);
- rpcrdma_bc_free_rqst(r_xprt, rqst);
+ rpcrdma_req_destroy(rpcr_to_rdmar(rqst));
- spin_lock_bh(&xprt->bc_pa_lock);
+ spin_lock(&xprt->bc_pa_lock);
}
- spin_unlock_bh(&xprt->bc_pa_lock);
+ spin_unlock(&xprt->bc_pa_lock);
}
/**
@@ -249,15 +157,50 @@
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct rpc_xprt *xprt = rqst->rq_xprt;
- dprintk("RPC: %s: freeing rqst %p (req %p)\n",
- __func__, rqst, req);
-
rpcrdma_recv_buffer_put(req->rl_reply);
req->rl_reply = NULL;
- spin_lock_bh(&xprt->bc_pa_lock);
+ spin_lock(&xprt->bc_pa_lock);
list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
- spin_unlock_bh(&xprt->bc_pa_lock);
+ spin_unlock(&xprt->bc_pa_lock);
+ xprt_put(xprt);
+}
+
+static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ struct rpcrdma_req *req;
+ struct rpc_rqst *rqst;
+ size_t size;
+
+ spin_lock(&xprt->bc_pa_lock);
+ rqst = list_first_entry_or_null(&xprt->bc_pa_list, struct rpc_rqst,
+ rq_bc_pa_list);
+ if (!rqst)
+ goto create_req;
+ list_del(&rqst->rq_bc_pa_list);
+ spin_unlock(&xprt->bc_pa_lock);
+ return rqst;
+
+create_req:
+ spin_unlock(&xprt->bc_pa_lock);
+
+ /* Set a limit to prevent a remote from overrunning our resources.
+ */
+ if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS)
+ return NULL;
+
+ size = min_t(size_t, r_xprt->rx_ep.rep_inline_recv, PAGE_SIZE);
+ req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
+ if (!req)
+ return NULL;
+
+ xprt->bc_alloc_count++;
+ rqst = &req->rl_slot;
+ rqst->rq_xprt = xprt;
+ __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+ xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
+ return rqst;
}
/**
@@ -291,20 +234,11 @@
pr_info("RPC: %s: %*ph\n", __func__, size, p);
#endif
- /* Grab a free bc rqst */
- spin_lock(&xprt->bc_pa_lock);
- if (list_empty(&xprt->bc_pa_list)) {
- spin_unlock(&xprt->bc_pa_lock);
+ rqst = rpcrdma_bc_rqst_get(r_xprt);
+ if (!rqst)
goto out_overflow;
- }
- rqst = list_first_entry(&xprt->bc_pa_list,
- struct rpc_rqst, rq_bc_pa_list);
- list_del(&rqst->rq_bc_pa_list);
- spin_unlock(&xprt->bc_pa_lock);
- /* Prepare rqst */
rqst->rq_reply_bytes_recvd = 0;
- rqst->rq_bytes_sent = 0;
rqst->rq_xid = *p;
rqst->rq_private_buf.len = size;
@@ -326,6 +260,7 @@
/* Queue rqst for ULP's callback service */
bc_serv = xprt->bc_serv;
+ xprt_get(xprt);
spin_lock(&bc_serv->sv_cb_lock);
list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
spin_unlock(&bc_serv->sv_cb_lock);
@@ -337,7 +272,7 @@
out_overflow:
pr_warn("RPC/RDMA backchannel overflow\n");
- xprt_disconnect_done(xprt);
+ xprt_force_disconnect(xprt);
/* This receive buffer gets reposted automatically
* when the connection is re-established.
*/
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
deleted file mode 100644
index 0f7c465..0000000
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ /dev/null
@@ -1,348 +0,0 @@
-// SPDX-License-Identifier: GPL-2.0
-/*
- * Copyright (c) 2015, 2017 Oracle. All rights reserved.
- * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
- */
-
-/* Lightweight memory registration using Fast Memory Regions (FMR).
- * Referred to sometimes as MTHCAFMR mode.
- *
- * FMR uses synchronous memory registration and deregistration.
- * FMR registration is known to be fast, but FMR deregistration
- * can take tens of usecs to complete.
- */
-
-/* Normal operation
- *
- * A Memory Region is prepared for RDMA READ or WRITE using the
- * ib_map_phys_fmr verb (fmr_op_map). When the RDMA operation is
- * finished, the Memory Region is unmapped using the ib_unmap_fmr
- * verb (fmr_op_unmap).
- */
-
-#include <linux/sunrpc/svc_rdma.h>
-
-#include "xprt_rdma.h"
-#include <trace/events/rpcrdma.h>
-
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-# define RPCDBG_FACILITY RPCDBG_TRANS
-#endif
-
-/* Maximum scatter/gather per FMR */
-#define RPCRDMA_MAX_FMR_SGES (64)
-
-/* Access mode of externally registered pages */
-enum {
- RPCRDMA_FMR_ACCESS_FLAGS = IB_ACCESS_REMOTE_WRITE |
- IB_ACCESS_REMOTE_READ,
-};
-
-bool
-fmr_is_supported(struct rpcrdma_ia *ia)
-{
- if (!ia->ri_device->alloc_fmr) {
- pr_info("rpcrdma: 'fmr' mode is not supported by device %s\n",
- ia->ri_device->name);
- return false;
- }
- return true;
-}
-
-static int
-fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
-{
- static struct ib_fmr_attr fmr_attr = {
- .max_pages = RPCRDMA_MAX_FMR_SGES,
- .max_maps = 1,
- .page_shift = PAGE_SHIFT
- };
-
- mr->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES,
- sizeof(u64), GFP_KERNEL);
- if (!mr->fmr.fm_physaddrs)
- goto out_free;
-
- mr->mr_sg = kcalloc(RPCRDMA_MAX_FMR_SGES,
- sizeof(*mr->mr_sg), GFP_KERNEL);
- if (!mr->mr_sg)
- goto out_free;
-
- sg_init_table(mr->mr_sg, RPCRDMA_MAX_FMR_SGES);
-
- mr->fmr.fm_mr = ib_alloc_fmr(ia->ri_pd, RPCRDMA_FMR_ACCESS_FLAGS,
- &fmr_attr);
- if (IS_ERR(mr->fmr.fm_mr))
- goto out_fmr_err;
-
- INIT_LIST_HEAD(&mr->mr_list);
- return 0;
-
-out_fmr_err:
- dprintk("RPC: %s: ib_alloc_fmr returned %ld\n", __func__,
- PTR_ERR(mr->fmr.fm_mr));
-
-out_free:
- kfree(mr->mr_sg);
- kfree(mr->fmr.fm_physaddrs);
- return -ENOMEM;
-}
-
-static int
-__fmr_unmap(struct rpcrdma_mr *mr)
-{
- LIST_HEAD(l);
- int rc;
-
- list_add(&mr->fmr.fm_mr->list, &l);
- rc = ib_unmap_fmr(&l);
- list_del(&mr->fmr.fm_mr->list);
- return rc;
-}
-
-static void
-fmr_op_release_mr(struct rpcrdma_mr *mr)
-{
- LIST_HEAD(unmap_list);
- int rc;
-
- kfree(mr->fmr.fm_physaddrs);
- kfree(mr->mr_sg);
-
- /* In case this one was left mapped, try to unmap it
- * to prevent dealloc_fmr from failing with EBUSY
- */
- rc = __fmr_unmap(mr);
- if (rc)
- pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n",
- mr, rc);
-
- rc = ib_dealloc_fmr(mr->fmr.fm_mr);
- if (rc)
- pr_err("rpcrdma: final ib_dealloc_fmr for %p returned %i\n",
- mr, rc);
-
- kfree(mr);
-}
-
-/* Reset of a single FMR.
- */
-static void
-fmr_op_recover_mr(struct rpcrdma_mr *mr)
-{
- struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
- int rc;
-
- /* ORDER: invalidate first */
- rc = __fmr_unmap(mr);
- if (rc)
- goto out_release;
-
- /* ORDER: then DMA unmap */
- rpcrdma_mr_unmap_and_put(mr);
-
- r_xprt->rx_stats.mrs_recovered++;
- return;
-
-out_release:
- pr_err("rpcrdma: FMR reset failed (%d), %p released\n", rc, mr);
- r_xprt->rx_stats.mrs_orphaned++;
-
- trace_xprtrdma_dma_unmap(mr);
- ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
- mr->mr_sg, mr->mr_nents, mr->mr_dir);
-
- spin_lock(&r_xprt->rx_buf.rb_mrlock);
- list_del(&mr->mr_all);
- spin_unlock(&r_xprt->rx_buf.rb_mrlock);
-
- fmr_op_release_mr(mr);
-}
-
-/* On success, sets:
- * ep->rep_attr.cap.max_send_wr
- * ep->rep_attr.cap.max_recv_wr
- * cdata->max_requests
- * ia->ri_max_segs
- */
-static int
-fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
- struct rpcrdma_create_data_internal *cdata)
-{
- int max_qp_wr;
-
- max_qp_wr = ia->ri_device->attrs.max_qp_wr;
- max_qp_wr -= RPCRDMA_BACKWARD_WRS;
- max_qp_wr -= 1;
- if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
- return -ENOMEM;
- if (cdata->max_requests > max_qp_wr)
- cdata->max_requests = max_qp_wr;
- ep->rep_attr.cap.max_send_wr = cdata->max_requests;
- ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
- ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
- ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
- ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
- ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
-
- ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
- RPCRDMA_MAX_FMR_SGES);
- return 0;
-}
-
-/* FMR mode conveys up to 64 pages of payload per chunk segment.
- */
-static size_t
-fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
-{
- return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES);
-}
-
-/* Use the ib_map_phys_fmr() verb to register a memory region
- * for remote access via RDMA READ or RDMA WRITE.
- */
-static struct rpcrdma_mr_seg *
-fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
- int nsegs, bool writing, struct rpcrdma_mr **out)
-{
- struct rpcrdma_mr_seg *seg1 = seg;
- int len, pageoff, i, rc;
- struct rpcrdma_mr *mr;
- u64 *dma_pages;
-
- mr = rpcrdma_mr_get(r_xprt);
- if (!mr)
- return ERR_PTR(-EAGAIN);
-
- pageoff = offset_in_page(seg1->mr_offset);
- seg1->mr_offset -= pageoff; /* start of page */
- seg1->mr_len += pageoff;
- len = -pageoff;
- if (nsegs > RPCRDMA_MAX_FMR_SGES)
- nsegs = RPCRDMA_MAX_FMR_SGES;
- for (i = 0; i < nsegs;) {
- if (seg->mr_page)
- sg_set_page(&mr->mr_sg[i],
- seg->mr_page,
- seg->mr_len,
- offset_in_page(seg->mr_offset));
- else
- sg_set_buf(&mr->mr_sg[i], seg->mr_offset,
- seg->mr_len);
- len += seg->mr_len;
- ++seg;
- ++i;
- /* Check for holes */
- if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
- offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
- break;
- }
- mr->mr_dir = rpcrdma_data_dir(writing);
-
- mr->mr_nents = ib_dma_map_sg(r_xprt->rx_ia.ri_device,
- mr->mr_sg, i, mr->mr_dir);
- if (!mr->mr_nents)
- goto out_dmamap_err;
- trace_xprtrdma_dma_map(mr);
-
- for (i = 0, dma_pages = mr->fmr.fm_physaddrs; i < mr->mr_nents; i++)
- dma_pages[i] = sg_dma_address(&mr->mr_sg[i]);
- rc = ib_map_phys_fmr(mr->fmr.fm_mr, dma_pages, mr->mr_nents,
- dma_pages[0]);
- if (rc)
- goto out_maperr;
-
- mr->mr_handle = mr->fmr.fm_mr->rkey;
- mr->mr_length = len;
- mr->mr_offset = dma_pages[0] + pageoff;
-
- *out = mr;
- return seg;
-
-out_dmamap_err:
- pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
- mr->mr_sg, i);
- rpcrdma_mr_put(mr);
- return ERR_PTR(-EIO);
-
-out_maperr:
- pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
- len, (unsigned long long)dma_pages[0],
- pageoff, mr->mr_nents, rc);
- rpcrdma_mr_unmap_and_put(mr);
- return ERR_PTR(-EIO);
-}
-
-/* Post Send WR containing the RPC Call message.
- */
-static int
-fmr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
-{
- return ib_post_send(ia->ri_id->qp, &req->rl_sendctx->sc_wr, NULL);
-}
-
-/* Invalidate all memory regions that were registered for "req".
- *
- * Sleeps until it is safe for the host CPU to access the
- * previously mapped memory regions.
- *
- * Caller ensures that @mrs is not empty before the call. This
- * function empties the list.
- */
-static void
-fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
-{
- struct rpcrdma_mr *mr;
- LIST_HEAD(unmap_list);
- int rc;
-
- /* ORDER: Invalidate all of the req's MRs first
- *
- * ib_unmap_fmr() is slow, so use a single call instead
- * of one call per mapped FMR.
- */
- list_for_each_entry(mr, mrs, mr_list) {
- dprintk("RPC: %s: unmapping fmr %p\n",
- __func__, &mr->fmr);
- trace_xprtrdma_localinv(mr);
- list_add_tail(&mr->fmr.fm_mr->list, &unmap_list);
- }
- r_xprt->rx_stats.local_inv_needed++;
- rc = ib_unmap_fmr(&unmap_list);
- if (rc)
- goto out_reset;
-
- /* ORDER: Now DMA unmap all of the req's MRs, and return
- * them to the free MW list.
- */
- while (!list_empty(mrs)) {
- mr = rpcrdma_mr_pop(mrs);
- list_del(&mr->fmr.fm_mr->list);
- rpcrdma_mr_unmap_and_put(mr);
- }
-
- return;
-
-out_reset:
- pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);
-
- while (!list_empty(mrs)) {
- mr = rpcrdma_mr_pop(mrs);
- list_del(&mr->fmr.fm_mr->list);
- fmr_op_recover_mr(mr);
- }
-}
-
-const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
- .ro_map = fmr_op_map,
- .ro_send = fmr_op_send,
- .ro_unmap_sync = fmr_op_unmap_sync,
- .ro_recover_mr = fmr_op_recover_mr,
- .ro_open = fmr_op_open,
- .ro_maxpages = fmr_op_maxpages,
- .ro_init_mr = fmr_op_init_mr,
- .ro_release_mr = fmr_op_release_mr,
- .ro_displayname = "fmr",
- .ro_send_w_inv_ok = 0,
-};
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 1bb00dd..30065a2 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -7,67 +7,37 @@
/* Lightweight memory registration using Fast Registration Work
* Requests (FRWR).
*
- * FRWR features ordered asynchronous registration and deregistration
- * of arbitrarily sized memory regions. This is the fastest and safest
+ * FRWR features ordered asynchronous registration and invalidation
+ * of arbitrarily-sized memory regions. This is the fastest and safest
* but most complex memory registration mode.
*/
/* Normal operation
*
- * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
- * Work Request (frwr_op_map). When the RDMA operation is finished, this
+ * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
+ * Work Request (frwr_map). When the RDMA operation is finished, this
* Memory Region is invalidated using a LOCAL_INV Work Request
- * (frwr_op_unmap_sync).
+ * (frwr_unmap_async and frwr_unmap_sync).
*
- * Typically these Work Requests are not signaled, and neither are RDMA
- * SEND Work Requests (with the exception of signaling occasionally to
- * prevent provider work queue overflows). This greatly reduces HCA
+ * Typically FAST_REG Work Requests are not signaled, and neither are
+ * RDMA Send Work Requests (with the exception of signaling occasionally
+ * to prevent provider work queue overflows). This greatly reduces HCA
* interrupt workload.
- *
- * As an optimization, frwr_op_unmap marks MRs INVALID before the
- * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
- * rb_mrs immediately so that no work (like managing a linked list
- * under a spinlock) is needed in the completion upcall.
- *
- * But this means that frwr_op_map() can occasionally encounter an MR
- * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
- * ordering prevents a subsequent FAST_REG WR from executing against
- * that MR while it is still being invalidated.
*/
/* Transport recovery
*
- * ->op_map and the transport connect worker cannot run at the same
- * time, but ->op_unmap can fire while the transport connect worker
- * is running. Thus MR recovery is handled in ->op_map, to guarantee
- * that recovered MRs are owned by a sending RPC, and not one where
- * ->op_unmap could fire at the same time transport reconnect is
- * being done.
+ * frwr_map and frwr_unmap_* cannot run at the same time the transport
+ * connect worker is running. The connect worker holds the transport
+ * send lock, just as ->send_request does. This prevents frwr_map and
+ * the connect worker from running concurrently. When a connection is
+ * closed, the Receive completion queue is drained before the allowing
+ * the connect worker to get control. This prevents frwr_unmap and the
+ * connect worker from running concurrently.
*
- * When the underlying transport disconnects, MRs are left in one of
- * four states:
- *
- * INVALID: The MR was not in use before the QP entered ERROR state.
- *
- * VALID: The MR was registered before the QP entered ERROR state.
- *
- * FLUSHED_FR: The MR was being registered when the QP entered ERROR
- * state, and the pending WR was flushed.
- *
- * FLUSHED_LI: The MR was being invalidated when the QP entered ERROR
- * state, and the pending WR was flushed.
- *
- * When frwr_op_map encounters FLUSHED and VALID MRs, they are recovered
- * with ib_dereg_mr and then are re-initialized. Because MR recovery
- * allocates fresh resources, it is deferred to a workqueue, and the
- * recovered MRs are placed back on the rb_mrs list when recovery is
- * complete. frwr_op_map allocates another MR for the current RPC while
- * the broken MR is reset.
- *
- * To ensure that frwr_op_map doesn't encounter an MR that is marked
- * INVALID but that is about to be flushed due to a previous transport
- * disconnect, the transport connect worker attempts to drain all
- * pending send queue WRs before the transport is reconnected.
+ * When the underlying transport disconnects, MRs that are in flight
+ * are flushed and are likely unusable. Thus all flushed MRs are
+ * destroyed. New MRs are created on demand.
*/
#include <linux/sunrpc/rpc_rdma.h>
@@ -80,10 +50,15 @@
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
-bool
-frwr_is_supported(struct rpcrdma_ia *ia)
+/**
+ * frwr_is_supported - Check if device supports FRWR
+ * @device: interface adapter to check
+ *
+ * Returns true if device supports FRWR, otherwise false
+ */
+bool frwr_is_supported(struct ib_device *device)
{
- struct ib_device_attr *attrs = &ia->ri_device->attrs;
+ struct ib_device_attr *attrs = &device->attrs;
if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
goto out_not_supported;
@@ -93,142 +68,172 @@
out_not_supported:
pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
- ia->ri_device->name);
+ device->name);
return false;
}
-static int
-frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
-{
- unsigned int depth = ia->ri_max_frwr_depth;
- struct rpcrdma_frwr *frwr = &mr->frwr;
- int rc;
-
- frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
- if (IS_ERR(frwr->fr_mr))
- goto out_mr_err;
-
- mr->mr_sg = kcalloc(depth, sizeof(*mr->mr_sg), GFP_KERNEL);
- if (!mr->mr_sg)
- goto out_list_err;
-
- INIT_LIST_HEAD(&mr->mr_list);
- sg_init_table(mr->mr_sg, depth);
- init_completion(&frwr->fr_linv_done);
- return 0;
-
-out_mr_err:
- rc = PTR_ERR(frwr->fr_mr);
- dprintk("RPC: %s: ib_alloc_mr status %i\n",
- __func__, rc);
- return rc;
-
-out_list_err:
- rc = -ENOMEM;
- dprintk("RPC: %s: sg allocation failure\n",
- __func__);
- ib_dereg_mr(frwr->fr_mr);
- return rc;
-}
-
-static void
-frwr_op_release_mr(struct rpcrdma_mr *mr)
+/**
+ * frwr_release_mr - Destroy one MR
+ * @mr: MR allocated by frwr_init_mr
+ *
+ */
+void frwr_release_mr(struct rpcrdma_mr *mr)
{
int rc;
rc = ib_dereg_mr(mr->frwr.fr_mr);
if (rc)
- pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n",
- mr, rc);
+ trace_xprtrdma_frwr_dereg(mr, rc);
kfree(mr->mr_sg);
kfree(mr);
}
-static int
-__frwr_mr_reset(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
+static void frwr_mr_recycle(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
{
- struct rpcrdma_frwr *frwr = &mr->frwr;
- int rc;
+ trace_xprtrdma_mr_recycle(mr);
- rc = ib_dereg_mr(frwr->fr_mr);
- if (rc) {
- pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
- rc, mr);
- return rc;
+ if (mr->mr_dir != DMA_NONE) {
+ trace_xprtrdma_mr_unmap(mr);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
+ mr->mr_sg, mr->mr_nents, mr->mr_dir);
+ mr->mr_dir = DMA_NONE;
}
- frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype,
- ia->ri_max_frwr_depth);
- if (IS_ERR(frwr->fr_mr)) {
- pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
- PTR_ERR(frwr->fr_mr), mr);
- return PTR_ERR(frwr->fr_mr);
- }
+ spin_lock(&r_xprt->rx_buf.rb_lock);
+ list_del(&mr->mr_all);
+ r_xprt->rx_stats.mrs_recycled++;
+ spin_unlock(&r_xprt->rx_buf.rb_lock);
- dprintk("RPC: %s: recovered FRWR %p\n", __func__, frwr);
- frwr->fr_state = FRWR_IS_INVALID;
- return 0;
+ frwr_release_mr(mr);
}
-/* Reset of a single FRWR. Generate a fresh rkey by replacing the MR.
+/* MRs are dynamically allocated, so simply clean up and release the MR.
+ * A replacement MR will subsequently be allocated on demand.
*/
static void
-frwr_op_recover_mr(struct rpcrdma_mr *mr)
+frwr_mr_recycle_worker(struct work_struct *work)
{
- enum rpcrdma_frwr_state state = mr->frwr.fr_state;
- struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- int rc;
+ struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr,
+ mr_recycle);
- rc = __frwr_mr_reset(ia, mr);
- if (state != FRWR_FLUSHED_LI) {
- trace_xprtrdma_dma_unmap(mr);
- ib_dma_unmap_sg(ia->ri_device,
- mr->mr_sg, mr->mr_nents, mr->mr_dir);
- }
- if (rc)
- goto out_release;
-
- rpcrdma_mr_put(mr);
- r_xprt->rx_stats.mrs_recovered++;
- return;
-
-out_release:
- pr_err("rpcrdma: FRWR reset failed %d, %p released\n", rc, mr);
- r_xprt->rx_stats.mrs_orphaned++;
-
- spin_lock(&r_xprt->rx_buf.rb_mrlock);
- list_del(&mr->mr_all);
- spin_unlock(&r_xprt->rx_buf.rb_mrlock);
-
- frwr_op_release_mr(mr);
+ frwr_mr_recycle(mr->mr_xprt, mr);
}
-/* On success, sets:
+/* frwr_recycle - Discard MRs
+ * @req: request to reset
+ *
+ * Used after a reconnect. These MRs could be in flight, we can't
+ * tell. Safe thing to do is release them.
+ */
+void frwr_recycle(struct rpcrdma_req *req)
+{
+ struct rpcrdma_mr *mr;
+
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
+ frwr_mr_recycle(mr->mr_xprt, mr);
+}
+
+/* frwr_reset - Place MRs back on the free list
+ * @req: request to reset
+ *
+ * Used after a failed marshal. For FRWR, this means the MRs
+ * don't have to be fully released and recreated.
+ *
+ * NB: This is safe only as long as none of @req's MRs are
+ * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
+ * Work Request.
+ */
+void frwr_reset(struct rpcrdma_req *req)
+{
+ struct rpcrdma_mr *mr;
+
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
+ rpcrdma_mr_put(mr);
+}
+
+/**
+ * frwr_init_mr - Initialize one MR
+ * @ia: interface adapter
+ * @mr: generic MR to prepare for FRWR
+ *
+ * Returns zero if successful. Otherwise a negative errno
+ * is returned.
+ */
+int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
+{
+ unsigned int depth = ia->ri_max_frwr_depth;
+ struct scatterlist *sg;
+ struct ib_mr *frmr;
+ int rc;
+
+ /* NB: ib_alloc_mr and device drivers typically allocate
+ * memory with GFP_KERNEL.
+ */
+ frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
+ if (IS_ERR(frmr))
+ goto out_mr_err;
+
+ sg = kcalloc(depth, sizeof(*sg), GFP_NOFS);
+ if (!sg)
+ goto out_list_err;
+
+ mr->frwr.fr_mr = frmr;
+ mr->mr_dir = DMA_NONE;
+ INIT_LIST_HEAD(&mr->mr_list);
+ INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
+ init_completion(&mr->frwr.fr_linv_done);
+
+ sg_init_table(sg, depth);
+ mr->mr_sg = sg;
+ return 0;
+
+out_mr_err:
+ rc = PTR_ERR(frmr);
+ trace_xprtrdma_frwr_alloc(mr, rc);
+ return rc;
+
+out_list_err:
+ ib_dereg_mr(frmr);
+ return -ENOMEM;
+}
+
+/**
+ * frwr_open - Prepare an endpoint for use with FRWR
+ * @ia: interface adapter this endpoint will use
+ * @ep: endpoint to prepare
+ *
+ * On success, sets:
* ep->rep_attr.cap.max_send_wr
* ep->rep_attr.cap.max_recv_wr
- * cdata->max_requests
+ * ep->rep_max_requests
* ia->ri_max_segs
*
* And these FRWR-related fields:
* ia->ri_max_frwr_depth
* ia->ri_mrtype
+ *
+ * On failure, a negative errno is returned.
*/
-static int
-frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
- struct rpcrdma_create_data_internal *cdata)
+int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
{
- struct ib_device_attr *attrs = &ia->ri_device->attrs;
+ struct ib_device_attr *attrs = &ia->ri_id->device->attrs;
int max_qp_wr, depth, delta;
ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;
- ia->ri_max_frwr_depth =
- min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- attrs->max_fast_reg_page_list_len);
- dprintk("RPC: %s: device's max FR page list len = %u\n",
+ /* Quirk: Some devices advertise a large max_fast_reg_page_list_len
+ * capability, but perform optimally when the MRs are not larger
+ * than a page.
+ */
+ if (attrs->max_sge_rd > 1)
+ ia->ri_max_frwr_depth = attrs->max_sge_rd;
+ else
+ ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
+ if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
+ ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;
+ dprintk("RPC: %s: max FR page list depth = %u\n",
__func__, ia->ri_max_frwr_depth);
/* Add room for frwr register and invalidate WRs.
@@ -253,145 +258,78 @@
} while (delta > 0);
}
- max_qp_wr = ia->ri_device->attrs.max_qp_wr;
+ max_qp_wr = ia->ri_id->device->attrs.max_qp_wr;
max_qp_wr -= RPCRDMA_BACKWARD_WRS;
max_qp_wr -= 1;
if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
return -ENOMEM;
- if (cdata->max_requests > max_qp_wr)
- cdata->max_requests = max_qp_wr;
- ep->rep_attr.cap.max_send_wr = cdata->max_requests * depth;
+ if (ep->rep_max_requests > max_qp_wr)
+ ep->rep_max_requests = max_qp_wr;
+ ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
- cdata->max_requests = max_qp_wr / depth;
- if (!cdata->max_requests)
+ ep->rep_max_requests = max_qp_wr / depth;
+ if (!ep->rep_max_requests)
return -EINVAL;
- ep->rep_attr.cap.max_send_wr = cdata->max_requests *
- depth;
+ ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
}
ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
- ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_recv_wr = ep->rep_max_requests;
ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
- ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
- ia->ri_max_frwr_depth);
+ ia->ri_max_segs =
+ DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
+ /* Reply chunks require segments for head and tail buffers */
+ ia->ri_max_segs += 2;
+ if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)
+ ia->ri_max_segs = RPCRDMA_MAX_HDR_SEGS;
return 0;
}
-/* FRWR mode conveys a list of pages per chunk segment. The
+/**
+ * frwr_maxpages - Compute size of largest payload
+ * @r_xprt: transport
+ *
+ * Returns maximum size of an RPC message, in pages.
+ *
+ * FRWR mode conveys a list of pages per chunk segment. The
* maximum length of that list is the FRWR page list depth.
*/
-static size_t
-frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
+size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frwr_depth);
-}
-
-static void
-__frwr_sendcompletion_flush(struct ib_wc *wc, const char *wr)
-{
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
- wr, ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
+ (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth);
}
/**
- * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
+ * frwr_map - Register a memory region
+ * @r_xprt: controlling transport
+ * @seg: memory region co-ordinates
+ * @nsegs: number of segments remaining
+ * @writing: true when RDMA Write will be used
+ * @xid: XID of RPC using the registered memory
+ * @mr: MR to fill in
*
- */
-static void
-frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr =
- container_of(cqe, struct rpcrdma_frwr, fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS) {
- frwr->fr_state = FRWR_FLUSHED_FR;
- __frwr_sendcompletion_flush(wc, "fastreg");
- }
- trace_xprtrdma_wc_fastreg(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
- *
- */
-static void
-frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
- fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS) {
- frwr->fr_state = FRWR_FLUSHED_LI;
- __frwr_sendcompletion_flush(wc, "localinv");
- }
- trace_xprtrdma_wc_li(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
- *
- * Awaken anyone waiting for an MR to finish being fenced.
- */
-static void
-frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
- fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS) {
- frwr->fr_state = FRWR_FLUSHED_LI;
- __frwr_sendcompletion_flush(wc, "localinv");
- }
- complete(&frwr->fr_linv_done);
- trace_xprtrdma_wc_li_wake(wc, frwr);
-}
-
-/* Post a REG_MR Work Request to register a memory region
+ * Prepare a REG_MR Work Request to register a memory region
* for remote access via RDMA READ or RDMA WRITE.
+ *
+ * Returns the next segment or a negative errno pointer.
+ * On success, @mr is filled in.
*/
-static struct rpcrdma_mr_seg *
-frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
- int nsegs, bool writing, struct rpcrdma_mr **out)
+struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_mr_seg *seg,
+ int nsegs, bool writing, __be32 xid,
+ struct rpcrdma_mr *mr)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
- struct rpcrdma_frwr *frwr;
- struct rpcrdma_mr *mr;
- struct ib_mr *ibmr;
struct ib_reg_wr *reg_wr;
+ struct ib_mr *ibmr;
int i, n;
u8 key;
- mr = NULL;
- do {
- if (mr)
- rpcrdma_mr_defer_recovery(mr);
- mr = rpcrdma_mr_get(r_xprt);
- if (!mr)
- return ERR_PTR(-EAGAIN);
- } while (mr->frwr.fr_state != FRWR_IS_INVALID);
- frwr = &mr->frwr;
- frwr->fr_state = FRWR_IS_VALID;
-
if (nsegs > ia->ri_max_frwr_depth)
nsegs = ia->ri_max_frwr_depth;
for (i = 0; i < nsegs;) {
@@ -406,7 +344,7 @@
++seg;
++i;
- if (holes_ok)
+ if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS)
continue;
if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
@@ -414,20 +352,22 @@
}
mr->mr_dir = rpcrdma_data_dir(writing);
- mr->mr_nents = ib_dma_map_sg(ia->ri_device, mr->mr_sg, i, mr->mr_dir);
+ mr->mr_nents =
+ ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, i, mr->mr_dir);
if (!mr->mr_nents)
goto out_dmamap_err;
- trace_xprtrdma_dma_map(mr);
- ibmr = frwr->fr_mr;
+ ibmr = mr->frwr.fr_mr;
n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
if (unlikely(n != mr->mr_nents))
goto out_mapmr_err;
+ ibmr->iova &= 0x00000000ffffffff;
+ ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
key = (u8)(ibmr->rkey & 0x000000FF);
ib_update_fast_reg_key(ibmr, ++key);
- reg_wr = &frwr->fr_regwr;
+ reg_wr = &mr->frwr.fr_regwr;
reg_wr->mr = ibmr;
reg_wr->key = ibmr->rkey;
reg_wr->access = writing ?
@@ -437,32 +377,49 @@
mr->mr_handle = ibmr->rkey;
mr->mr_length = ibmr->length;
mr->mr_offset = ibmr->iova;
+ trace_xprtrdma_mr_map(mr);
- *out = mr;
return seg;
out_dmamap_err:
- pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
- mr->mr_sg, i);
- frwr->fr_state = FRWR_IS_INVALID;
- rpcrdma_mr_put(mr);
+ mr->mr_dir = DMA_NONE;
+ trace_xprtrdma_frwr_sgerr(mr, i);
return ERR_PTR(-EIO);
out_mapmr_err:
- pr_err("rpcrdma: failed to map mr %p (%d/%d)\n",
- frwr->fr_mr, n, mr->mr_nents);
- rpcrdma_mr_defer_recovery(mr);
+ trace_xprtrdma_frwr_maperr(mr, n);
return ERR_PTR(-EIO);
}
-/* Post Send WR containing the RPC Call message.
+/**
+ * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
*
- * For FRMR, chain any FastReg WRs to the Send WR. Only a
+ */
+static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_fastreg(wc, frwr);
+ /* The MR will get recycled when the associated req is retransmitted */
+}
+
+/**
+ * frwr_send - post Send WR containing the RPC Call message
+ * @ia: interface adapter
+ * @req: Prepared RPC Call
+ *
+ * For FRWR, chain any FastReg WRs to the Send WR. Only a
* single ib_post_send call is needed to register memory
* and then post the Send WR.
+ *
+ * Returns the result of ib_post_send.
*/
-static int
-frwr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
struct ib_send_wr *post_wr;
struct rpcrdma_mr *mr;
@@ -484,45 +441,94 @@
}
/* If ib_post_send fails, the next ->send_request for
- * @req will queue these MWs for recovery.
+ * @req will queue these MRs for recovery.
*/
return ib_post_send(ia->ri_id->qp, post_wr, NULL);
}
-/* Handle a remotely invalidated mr on the @mrs list
+/**
+ * frwr_reminv - handle a remotely invalidated mr on the @mrs list
+ * @rep: Received reply
+ * @mrs: list of MRs to check
+ *
*/
-static void
-frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
+void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
{
struct rpcrdma_mr *mr;
list_for_each_entry(mr, mrs, mr_list)
if (mr->mr_handle == rep->rr_inv_rkey) {
list_del_init(&mr->mr_list);
- trace_xprtrdma_remoteinv(mr);
- mr->frwr.fr_state = FRWR_IS_INVALID;
- rpcrdma_mr_unmap_and_put(mr);
+ trace_xprtrdma_mr_remoteinv(mr);
+ rpcrdma_mr_put(mr);
break; /* only one invalidated MR per RPC */
}
}
-/* Invalidate all memory regions that were registered for "req".
+static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
+{
+ if (wc->status != IB_WC_SUCCESS)
+ rpcrdma_mr_recycle(mr);
+ else
+ rpcrdma_mr_put(mr);
+}
+
+/**
+ * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
*
- * Sleeps until it is safe for the host CPU to access the
- * previously mapped memory regions.
- *
- * Caller ensures that @mrs is not empty before the call. This
- * function empties the list.
*/
-static void
-frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
+static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li(wc, frwr);
+ __frwr_release_mr(wc, mr);
+}
+
+/**
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ * Awaken anyone waiting for an MR to finish being fenced.
+ */
+static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li_wake(wc, frwr);
+ __frwr_release_mr(wc, mr);
+ complete(&frwr->fr_linv_done);
+}
+
+/**
+ * frwr_unmap_sync - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
+ *
+ * Sleeps until it is safe for the host CPU to access the previously mapped
+ * memory regions. This guarantees that registered MRs are properly fenced
+ * from the server before the RPC consumer accesses the data in them. It
+ * also ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
+ */
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
struct ib_send_wr *first, **prev, *last;
const struct ib_send_wr *bad_wr;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_frwr *frwr;
struct rpcrdma_mr *mr;
- int count, rc;
+ int rc;
/* ORDER: Invalidate all of the MRs first
*
@@ -530,33 +536,31 @@
* a single ib_post_send() call.
*/
frwr = NULL;
- count = 0;
prev = &first;
- list_for_each_entry(mr, mrs, mr_list) {
- mr->frwr.fr_state = FRWR_IS_INVALID;
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
+
+ trace_xprtrdma_mr_localinv(mr);
+ r_xprt->rx_stats.local_inv_needed++;
frwr = &mr->frwr;
- trace_xprtrdma_localinv(mr);
-
frwr->fr_cqe.done = frwr_wc_localinv;
last = &frwr->fr_invwr;
- memset(last, 0, sizeof(*last));
+ last->next = NULL;
last->wr_cqe = &frwr->fr_cqe;
+ last->sg_list = NULL;
+ last->num_sge = 0;
last->opcode = IB_WR_LOCAL_INV;
+ last->send_flags = IB_SEND_SIGNALED;
last->ex.invalidate_rkey = mr->mr_handle;
- count++;
*prev = last;
prev = &last->next;
}
- if (!frwr)
- goto unmap;
/* Strong send queue ordering guarantees that when the
* last WR in the chain completes, all WRs in the chain
* are complete.
*/
- last->send_flags = IB_SEND_SIGNALED;
frwr->fr_cqe.done = frwr_wc_localinv_wake;
reinit_completion(&frwr->fr_linv_done);
@@ -564,52 +568,128 @@
* replaces the QP. The RPC reply handler won't call us
* unless ri_id->qp is a valid pointer.
*/
- r_xprt->rx_stats.local_inv_needed++;
bad_wr = NULL;
- rc = ib_post_send(ia->ri_id->qp, first, &bad_wr);
+ rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+ trace_xprtrdma_post_send(req, rc);
+
+ /* The final LOCAL_INV WR in the chain is supposed to
+ * do the wake. If it was never posted, the wake will
+ * not happen, so don't wait in that case.
+ */
if (bad_wr != first)
wait_for_completion(&frwr->fr_linv_done);
- if (rc)
- goto reset_mrs;
+ if (!rc)
+ return;
- /* ORDER: Now DMA unmap all of the MRs, and return
- * them to the free MR list.
- */
-unmap:
- while (!list_empty(mrs)) {
- mr = rpcrdma_mr_pop(mrs);
- rpcrdma_mr_unmap_and_put(mr);
- }
- return;
-
-reset_mrs:
- pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc);
-
- /* Find and reset the MRs in the LOCAL_INV WRs that did not
- * get posted.
+ /* Recycle MRs in the LOCAL_INV chain that did not get posted.
*/
while (bad_wr) {
frwr = container_of(bad_wr, struct rpcrdma_frwr,
fr_invwr);
mr = container_of(frwr, struct rpcrdma_mr, frwr);
-
- __frwr_mr_reset(ia, mr);
-
bad_wr = bad_wr->next;
+
+ list_del_init(&mr->mr_list);
+ rpcrdma_mr_recycle(mr);
}
- goto unmap;
}
-const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
- .ro_map = frwr_op_map,
- .ro_send = frwr_op_send,
- .ro_reminv = frwr_op_reminv,
- .ro_unmap_sync = frwr_op_unmap_sync,
- .ro_recover_mr = frwr_op_recover_mr,
- .ro_open = frwr_op_open,
- .ro_maxpages = frwr_op_maxpages,
- .ro_init_mr = frwr_op_init_mr,
- .ro_release_mr = frwr_op_release_mr,
- .ro_displayname = "frwr",
- .ro_send_w_inv_ok = RPCRDMA_CMP_F_SND_W_INV_OK,
-};
+/**
+ * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
+static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+ struct rpcrdma_rep *rep = mr->mr_req->rl_reply;
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li_done(wc, frwr);
+ __frwr_release_mr(wc, mr);
+
+ /* Ensure @rep is generated before __frwr_release_mr */
+ smp_rmb();
+ rpcrdma_complete_rqst(rep);
+}
+
+/**
+ * frwr_unmap_async - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
+ *
+ * This guarantees that registered MRs are properly fenced from the
+ * server before the RPC consumer accesses the data in them. It also
+ * ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
+ */
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+ struct ib_send_wr *first, *last, **prev;
+ const struct ib_send_wr *bad_wr;
+ struct rpcrdma_frwr *frwr;
+ struct rpcrdma_mr *mr;
+ int rc;
+
+ /* Chain the LOCAL_INV Work Requests and post them with
+ * a single ib_post_send() call.
+ */
+ frwr = NULL;
+ prev = &first;
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
+
+ trace_xprtrdma_mr_localinv(mr);
+ r_xprt->rx_stats.local_inv_needed++;
+
+ frwr = &mr->frwr;
+ frwr->fr_cqe.done = frwr_wc_localinv;
+ last = &frwr->fr_invwr;
+ last->next = NULL;
+ last->wr_cqe = &frwr->fr_cqe;
+ last->sg_list = NULL;
+ last->num_sge = 0;
+ last->opcode = IB_WR_LOCAL_INV;
+ last->send_flags = IB_SEND_SIGNALED;
+ last->ex.invalidate_rkey = mr->mr_handle;
+
+ *prev = last;
+ prev = &last->next;
+ }
+
+ /* Strong send queue ordering guarantees that when the
+ * last WR in the chain completes, all WRs in the chain
+ * are complete. The last completion will wake up the
+ * RPC waiter.
+ */
+ frwr->fr_cqe.done = frwr_wc_localinv_done;
+
+ /* Transport disconnect drains the receive CQ before it
+ * replaces the QP. The RPC reply handler won't call us
+ * unless ri_id->qp is a valid pointer.
+ */
+ bad_wr = NULL;
+ rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+ trace_xprtrdma_post_send(req, rc);
+ if (!rc)
+ return;
+
+ /* Recycle MRs in the LOCAL_INV chain that did not get posted.
+ */
+ while (bad_wr) {
+ frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
+ mr = container_of(frwr, struct rpcrdma_mr, frwr);
+ bad_wr = bad_wr->next;
+
+ rpcrdma_mr_recycle(mr);
+ }
+
+ /* The final LOCAL_INV WR in the chain is supposed to
+ * do the wake. If it was never posted, the wake will
+ * not happen, so wake here in that case.
+ */
+ rpcrdma_complete_rqst(req->rl_reply);
+}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c8ae983..b86b5fd 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -71,7 +71,6 @@
size = RPCRDMA_HDRLEN_MIN;
/* Maximum Read list size */
- maxsegs += 2; /* segment for head and tail buffers */
size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
/* Minimal Read chunk size */
@@ -97,7 +96,6 @@
size = RPCRDMA_HDRLEN_MIN;
/* Maximum Write list size */
- maxsegs += 2; /* segment for head and tail buffers */
size = sizeof(__be32); /* segment count */
size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
@@ -107,16 +105,23 @@
return size;
}
+/**
+ * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
+ * @r_xprt: transport instance to initialize
+ *
+ * The max_inline fields contain the maximum size of an RPC message
+ * so the marshaling code doesn't have to repeat this calculation
+ * for every RPC.
+ */
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- unsigned int maxsegs = ia->ri_max_segs;
+ unsigned int maxsegs = r_xprt->rx_ia.ri_max_segs;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
- ia->ri_max_inline_write = cdata->inline_wsize -
- rpcrdma_max_call_header_size(maxsegs);
- ia->ri_max_inline_read = cdata->inline_rsize -
- rpcrdma_max_reply_header_size(maxsegs);
+ ep->rep_max_inline_send =
+ ep->rep_inline_send - rpcrdma_max_call_header_size(maxsegs);
+ ep->rep_max_inline_recv =
+ ep->rep_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
}
/* The client can send a request inline as long as the RPCRDMA header
@@ -133,7 +138,7 @@
struct xdr_buf *xdr = &rqst->rq_snd_buf;
unsigned int count, remaining, offset;
- if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
+ if (xdr->len > r_xprt->rx_ep.rep_max_inline_send)
return false;
if (xdr->page_len) {
@@ -161,9 +166,21 @@
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep.rep_max_inline_recv;
+}
- return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
+/* The client is required to provide a Reply chunk if the maximum
+ * size of the non-payload part of the RPC Reply is larger than
+ * the inline threshold.
+ */
+static bool
+rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
+ const struct rpc_rqst *rqst)
+{
+ const struct xdr_buf *buf = &rqst->rq_rcv_buf;
+
+ return (buf->head[0].iov_len + buf->tail[0].iov_len) <
+ r_xprt->rx_ep.rep_max_inline_recv;
}
/* Split @vec on page boundaries into SGEs. FMR registers pages, not
@@ -220,11 +237,12 @@
ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
page_base = offset_in_page(xdrbuf->page_base);
while (len) {
- if (unlikely(!*ppages)) {
- /* XXX: Certain upper layer operations do
- * not provide receive buffer pages.
- */
- *ppages = alloc_page(GFP_ATOMIC);
+ /* ACL likes to be lazy in allocating pages - ACLs
+ * are small by default but can get huge.
+ */
+ if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) {
+ if (!*ppages)
+ *ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
if (!*ppages)
return -ENOBUFS;
}
@@ -324,6 +342,32 @@
return 0;
}
+static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpcrdma_mr_seg *seg,
+ int nsegs, bool writing,
+ struct rpcrdma_mr **mr)
+{
+ *mr = rpcrdma_mr_pop(&req->rl_free_mrs);
+ if (!*mr) {
+ *mr = rpcrdma_mr_get(r_xprt);
+ if (!*mr)
+ goto out_getmr_err;
+ trace_xprtrdma_mr_get(req);
+ (*mr)->mr_req = req;
+ }
+
+ rpcrdma_mr_push(*mr, &req->rl_registered);
+ return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
+
+out_getmr_err:
+ trace_xprtrdma_nomrs(req);
+ xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
+ if (r_xprt->rx_ep.rep_connected != -ENODEV)
+ schedule_work(&r_xprt->rx_buf.rb_refresh_worker);
+ return ERR_PTR(-EAGAIN);
+}
+
/* Register and XDR encode the Read list. Supports encoding a list of read
* segments that belong to a single read chunk.
*
@@ -338,9 +382,10 @@
*
* Only a single @pos value is currently supported.
*/
-static noinline int
-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
+static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpc_rqst *rqst,
+ enum rpcrdma_chunktype rtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -348,6 +393,9 @@
unsigned int pos;
int nsegs;
+ if (rtype == rpcrdma_noch)
+ goto done;
+
pos = rqst->rq_snd_buf.head[0].iov_len;
if (rtype == rpcrdma_areadch)
pos = 0;
@@ -358,21 +406,20 @@
return nsegs;
do {
- seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
- false, &mr);
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
- rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_read_segment(xdr, mr, pos) < 0)
return -EMSGSIZE;
- trace_xprtrdma_read_chunk(rqst->rq_task, pos, mr, nsegs);
+ trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
r_xprt->rx_stats.read_chunk_count++;
nsegs -= mr->mr_nents;
} while (nsegs);
- return 0;
+done:
+ return encode_item_not_present(xdr);
}
/* Register and XDR encode the Write list. Supports encoding a list
@@ -390,9 +437,10 @@
*
* Only a single Write chunk is currently supported.
*/
-static noinline int
-rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpc_rqst *rqst,
+ enum rpcrdma_chunktype wtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -400,6 +448,9 @@
int nsegs, nchunks;
__be32 *segcount;
+ if (wtype != rpcrdma_writech)
+ goto done;
+
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
rqst->rq_rcv_buf.head[0].iov_len,
@@ -416,16 +467,14 @@
nchunks = 0;
do {
- seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
- true, &mr);
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
- rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
- trace_xprtrdma_write_chunk(rqst->rq_task, mr, nsegs);
+ trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
r_xprt->rx_stats.write_chunk_count++;
r_xprt->rx_stats.total_rdma_request += mr->mr_length;
nchunks++;
@@ -435,7 +484,8 @@
/* Update count of segments in this Write chunk */
*segcount = cpu_to_be32(nchunks);
- return 0;
+done:
+ return encode_item_not_present(xdr);
}
/* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -450,9 +500,10 @@
* Returns zero on success, or a negative errno if a failure occurred.
* @xdr is advanced to the next position in the stream.
*/
-static noinline int
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpc_rqst *rqst,
+ enum rpcrdma_chunktype wtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -460,6 +511,9 @@
int nsegs, nchunks;
__be32 *segcount;
+ if (wtype != rpcrdma_replych)
+ return encode_item_not_present(xdr);
+
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
if (nsegs < 0)
@@ -474,16 +528,14 @@
nchunks = 0;
do {
- seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
- true, &mr);
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
- rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
- trace_xprtrdma_reply_chunk(rqst->rq_task, mr, nsegs);
+ trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
r_xprt->rx_stats.reply_chunk_count++;
r_xprt->rx_stats.total_rdma_request += mr->mr_length;
nchunks++;
@@ -496,51 +548,57 @@
return 0;
}
+static void rpcrdma_sendctx_done(struct kref *kref)
+{
+ struct rpcrdma_req *req =
+ container_of(kref, struct rpcrdma_req, rl_kref);
+ struct rpcrdma_rep *rep = req->rl_reply;
+
+ rpcrdma_complete_rqst(rep);
+ rep->rr_rxprt->rx_stats.reply_waits_for_send++;
+}
+
/**
- * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
+ * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
* @sc: sendctx containing SGEs to unmap
*
*/
-void
-rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
+void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
- struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
struct ib_sge *sge;
- unsigned int count;
+
+ if (!sc->sc_unmap_count)
+ return;
/* The first two SGEs contain the transport header and
* the inline buffer. These are always left mapped so
* they can be cheaply re-used.
*/
- sge = &sc->sc_sges[2];
- for (count = sc->sc_unmap_count; count; ++sge, --count)
- ib_dma_unmap_page(ia->ri_device,
- sge->addr, sge->length, DMA_TO_DEVICE);
+ for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
+ ++sge, --sc->sc_unmap_count)
+ ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
+ DMA_TO_DEVICE);
- if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
- smp_mb__after_atomic();
- wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
- }
+ kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
}
/* Prepare an SGE for the RPC-over-RDMA transport header.
*/
-static bool
-rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
- u32 len)
+static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req, u32 len)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
struct ib_sge *sge = sc->sc_sges;
- if (!rpcrdma_dma_map_regbuf(ia, rb))
+ if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
goto out_regbuf;
sge->addr = rdmab_addr(rb);
sge->length = len;
sge->lkey = rdmab_lkey(rb);
- ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
- sge->length, DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
+ DMA_TO_DEVICE);
sc->sc_wr.num_sge++;
return true;
@@ -552,23 +610,23 @@
/* Prepare the Send SGEs. The head and tail iovec, and each entry
* in the page list, gets its own SGE.
*/
-static bool
-rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
- struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct xdr_buf *xdr,
+ enum rpcrdma_chunktype rtype)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
unsigned int sge_no, page_base, len, remaining;
struct rpcrdma_regbuf *rb = req->rl_sendbuf;
- struct ib_device *device = ia->ri_device;
struct ib_sge *sge = sc->sc_sges;
- u32 lkey = ia->ri_pd->local_dma_lkey;
struct page *page, **ppages;
/* The head iovec is straightforward, as it is already
* DMA-mapped. Sync the content that has changed.
*/
- if (!rpcrdma_dma_map_regbuf(ia, rb))
+ if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
goto out_regbuf;
+ sc->sc_device = rdmab_device(rb);
sge_no = 1;
sge[sge_no].addr = rdmab_addr(rb);
sge[sge_no].length = xdr->head[0].iov_len;
@@ -615,13 +673,14 @@
goto out_mapping_overflow;
len = min_t(u32, PAGE_SIZE - page_base, remaining);
- sge[sge_no].addr = ib_dma_map_page(device, *ppages,
- page_base, len,
- DMA_TO_DEVICE);
- if (ib_dma_mapping_error(device, sge[sge_no].addr))
+ sge[sge_no].addr =
+ ib_dma_map_page(rdmab_device(rb), *ppages,
+ page_base, len, DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(rdmab_device(rb),
+ sge[sge_no].addr))
goto out_mapping_err;
sge[sge_no].length = len;
- sge[sge_no].lkey = lkey;
+ sge[sge_no].lkey = rdmab_lkey(rb);
sc->sc_unmap_count++;
ppages++;
@@ -642,20 +701,20 @@
map_tail:
sge_no++;
- sge[sge_no].addr = ib_dma_map_page(device, page,
- page_base, len,
- DMA_TO_DEVICE);
- if (ib_dma_mapping_error(device, sge[sge_no].addr))
+ sge[sge_no].addr =
+ ib_dma_map_page(rdmab_device(rb), page, page_base, len,
+ DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(rdmab_device(rb), sge[sge_no].addr))
goto out_mapping_err;
sge[sge_no].length = len;
- sge[sge_no].lkey = lkey;
+ sge[sge_no].lkey = rdmab_lkey(rb);
sc->sc_unmap_count++;
}
out:
sc->sc_wr.num_sge += sge_no;
if (sc->sc_unmap_count)
- __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+ kref_get(&req->rl_kref);
return true;
out_regbuf:
@@ -663,13 +722,13 @@
return false;
out_mapping_overflow:
- rpcrdma_unmap_sendctx(sc);
+ rpcrdma_sendctx_unmap(sc);
pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
return false;
out_mapping_err:
- rpcrdma_unmap_sendctx(sc);
- pr_err("rpcrdma: Send mapping error\n");
+ rpcrdma_sendctx_unmap(sc);
+ trace_xprtrdma_dma_maperr(sge[sge_no].addr);
return false;
}
@@ -688,22 +747,28 @@
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
- req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
+ int ret;
+
+ ret = -EAGAIN;
+ req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
if (!req->rl_sendctx)
- return -EAGAIN;
+ goto err;
req->rl_sendctx->sc_wr.num_sge = 0;
req->rl_sendctx->sc_unmap_count = 0;
req->rl_sendctx->sc_req = req;
- __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+ kref_init(&req->rl_kref);
- if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
- return -EIO;
-
+ ret = -EIO;
+ if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
+ goto err;
if (rtype != rpcrdma_areadch)
- if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
- return -EIO;
-
+ if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
+ goto err;
return 0;
+
+err:
+ trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
+ return ret;
}
/**
@@ -736,8 +801,8 @@
int ret;
rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
- xdr_init_encode(xdr, &req->rl_hdrbuf,
- req->rl_rdmabuf->rg_base);
+ xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
+ rqst);
/* Fixed header fields */
ret = -EMSGSIZE;
@@ -766,7 +831,8 @@
*/
if (rpcrdma_results_inline(r_xprt, rqst))
wtype = rpcrdma_noch;
- else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
+ else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
+ rpcrdma_nonpayload_inline(r_xprt, rqst))
wtype = rpcrdma_writech;
else
wtype = rpcrdma_replych;
@@ -801,12 +867,7 @@
* chunks. Very likely the connection has been replaced,
* so these registrations are invalid and unusable.
*/
- while (unlikely(!list_empty(&req->rl_registered))) {
- struct rpcrdma_mr *mr;
-
- mr = rpcrdma_mr_pop(&req->rl_registered);
- rpcrdma_mr_defer_recovery(mr);
- }
+ frwr_recycle(req);
/* This implementation supports the following combinations
* of chunk lists in one RPC-over-RDMA Call message:
@@ -830,49 +891,28 @@
* send a Call message with a Position Zero Read chunk and a
* regular Read chunk at the same time.
*/
- if (rtype != rpcrdma_noch) {
- ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
- if (ret)
- goto out_err;
- }
- ret = encode_item_not_present(xdr);
+ ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
+ if (ret)
+ goto out_err;
+ ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
+ if (ret)
+ goto out_err;
+ ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
if (ret)
goto out_err;
- if (wtype == rpcrdma_writech) {
- ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
- if (ret)
- goto out_err;
- }
- ret = encode_item_not_present(xdr);
- if (ret)
- goto out_err;
-
- if (wtype != rpcrdma_replych)
- ret = encode_item_not_present(xdr);
- else
- ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
- if (ret)
- goto out_err;
-
- trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
-
- ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
+ ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
&rqst->rq_snd_buf, rtype);
if (ret)
goto out_err;
+
+ trace_xprtrdma_marshal(req, rtype, wtype);
return 0;
out_err:
- switch (ret) {
- case -EAGAIN:
- xprt_wait_for_buffer_space(rqst->rq_task, NULL);
- break;
- case -ENOBUFS:
- break;
- default:
- r_xprt->rx_stats.failed_marshal_count++;
- }
+ trace_xprtrdma_marshal_failed(rqst, ret);
+ r_xprt->rx_stats.failed_marshal_count++;
+ frwr_reset(req);
return ret;
}
@@ -1190,17 +1230,20 @@
p = xdr_inline_decode(xdr, 2 * sizeof(*p));
if (!p)
break;
- dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
- rqst->rq_task->tk_pid, __func__,
- be32_to_cpup(p), be32_to_cpu(*(p + 1)));
+ dprintk("RPC: %s: server reports "
+ "version error (%u-%u), xid %08x\n", __func__,
+ be32_to_cpup(p), be32_to_cpu(*(p + 1)),
+ be32_to_cpu(rep->rr_xid));
break;
case err_chunk:
- dprintk("RPC: %5u: %s: server reports header decoding error\n",
- rqst->rq_task->tk_pid, __func__);
+ dprintk("RPC: %s: server reports "
+ "header decoding error, xid %08x\n", __func__,
+ be32_to_cpu(rep->rr_xid));
break;
default:
- dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
- rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
+ dprintk("RPC: %s: server reports "
+ "unrecognized error %d, xid %08x\n", __func__,
+ be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
}
r_xprt->rx_stats.bad_reply_count++;
@@ -1216,11 +1259,8 @@
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
struct rpc_rqst *rqst = rep->rr_rqst;
- unsigned long cwnd;
int status;
- xprt->reestablish_timeout = 0;
-
switch (rep->rr_proc) {
case rdma_msg:
status = rpcrdma_decode_msg(r_xprt, rep, rqst);
@@ -1238,15 +1278,10 @@
goto out_badheader;
out:
- spin_lock(&xprt->recv_lock);
- cwnd = xprt->cwnd;
- xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
- if (xprt->cwnd > cwnd)
- xprt_release_rqst_cong(rqst->rq_task);
-
+ spin_lock(&xprt->queue_lock);
xprt_complete_rqst(rqst->rq_task, status);
xprt_unpin_rqst(rqst);
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
return;
/* If the incoming reply terminated a pending RPC, the next
@@ -1256,56 +1291,20 @@
out_badheader:
trace_xprtrdma_reply_hdr(rep);
r_xprt->rx_stats.bad_reply_count++;
- status = -EIO;
goto out;
}
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+static void rpcrdma_reply_done(struct kref *kref)
{
- /* Invalidate and unmap the data payloads before waking
- * the waiting application. This guarantees the memory
- * regions are properly fenced from the server before the
- * application accesses the data. It also ensures proper
- * send flow control: waking the next RPC waits until this
- * RPC has relinquished all its Send Queue entries.
- */
- if (!list_empty(&req->rl_registered))
- r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
- &req->rl_registered);
+ struct rpcrdma_req *req =
+ container_of(kref, struct rpcrdma_req, rl_kref);
- /* Ensure that any DMA mapped pages associated with
- * the Send of the RPC Call have been unmapped before
- * allowing the RPC to complete. This protects argument
- * memory not controlled by the RPC client from being
- * re-used before we're done with it.
- */
- if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
- r_xprt->rx_stats.reply_waits_for_send++;
- out_of_line_wait_on_bit(&req->rl_flags,
- RPCRDMA_REQ_F_TX_RESOURCES,
- bit_wait,
- TASK_UNINTERRUPTIBLE);
- }
+ rpcrdma_complete_rqst(req->rl_reply);
}
-/* Reply handling runs in the poll worker thread. Anything that
- * might wait is deferred to a separate workqueue.
- */
-void rpcrdma_deferred_completion(struct work_struct *work)
-{
- struct rpcrdma_rep *rep =
- container_of(work, struct rpcrdma_rep, rr_work);
- struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
- struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-
- trace_xprtrdma_defer_cmp(rep);
- if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
- r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered);
- rpcrdma_release_rqst(r_xprt, req);
- rpcrdma_complete_rqst(rep);
-}
-
-/* Process received RPC/RDMA messages.
+/**
+ * rpcrdma_reply_handler - Process received RPC/RDMA messages
+ * @rep: Incoming rpcrdma_rep object to process
*
* Errors must result in the RPC task either being awakened, or
* allowed to timeout, to discover the errors at that time.
@@ -1320,14 +1319,15 @@
u32 credits;
__be32 *p;
- --buf->rb_posted_receives;
-
- if (rep->rr_hdrbuf.head[0].iov_len == 0)
- goto out_badstatus;
+ /* Any data means we had a useful conversation, so
+ * then we don't need to delay the next reconnect.
+ */
+ if (xprt->reestablish_timeout)
+ xprt->reestablish_timeout = 0;
/* Fixed transport header fields */
xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
- rep->rr_hdrbuf.head[0].iov_base);
+ rep->rr_hdrbuf.head[0].iov_base, NULL);
p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
if (unlikely(!p))
goto out_shortreply;
@@ -1345,51 +1345,55 @@
/* Match incoming rpcrdma_rep to an rpcrdma_req to
* get context for handling any incoming chunks.
*/
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
if (!rqst)
goto out_norqst;
xprt_pin_rqst(rqst);
+ spin_unlock(&xprt->queue_lock);
if (credits == 0)
credits = 1; /* don't deadlock */
else if (credits > buf->rb_max_requests)
credits = buf->rb_max_requests;
- buf->rb_credits = credits;
-
- spin_unlock(&xprt->recv_lock);
+ if (buf->rb_credits != credits) {
+ spin_lock(&xprt->transport_lock);
+ buf->rb_credits = credits;
+ xprt->cwnd = credits << RPC_CWNDSHIFT;
+ spin_unlock(&xprt->transport_lock);
+ }
req = rpcr_to_rdmar(rqst);
+ if (req->rl_reply) {
+ trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
+ rpcrdma_recv_buffer_put(req->rl_reply);
+ }
req->rl_reply = rep;
rep->rr_rqst = rqst;
- clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
- rpcrdma_post_recvs(r_xprt, false);
- queue_work(rpcrdma_receive_wq, &rep->rr_work);
+ if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
+ frwr_reminv(rep, &req->rl_registered);
+ if (!list_empty(&req->rl_registered))
+ frwr_unmap_async(r_xprt, req);
+ /* LocalInv completion will complete the RPC */
+ else
+ kref_put(&req->rl_kref, rpcrdma_reply_done);
return;
out_badversion:
trace_xprtrdma_reply_vers(rep);
- goto repost;
+ goto out;
-/* The RPC transaction has already been terminated, or the header
- * is corrupt.
- */
out_norqst:
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
trace_xprtrdma_reply_rqst(rep);
- goto repost;
+ goto out;
out_shortreply:
trace_xprtrdma_reply_short(rep);
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
- */
-repost:
- rpcrdma_post_recvs(r_xprt, false);
-out_badstatus:
+out:
rpcrdma_recv_buffer_put(rep);
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 134bef6..97bca50 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -73,8 +73,6 @@
atomic_t rdma_stat_sq_poll;
atomic_t rdma_stat_sq_prod;
-struct workqueue_struct *svc_rdma_wq;
-
/*
* This function implements reading and resetting an atomic_t stat
* variable through read/write to a proc file. Any write to the file
@@ -230,14 +228,10 @@
void svc_rdma_cleanup(void)
{
dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n");
- destroy_workqueue(svc_rdma_wq);
if (svcrdma_table_header) {
unregister_sysctl_table(svcrdma_table_header);
svcrdma_table_header = NULL;
}
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
- svc_unreg_xprt_class(&svc_rdma_bc_class);
-#endif
svc_unreg_xprt_class(&svc_rdma_class);
}
@@ -249,18 +243,11 @@
dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests);
dprintk("\tmax_inline : %d\n", svcrdma_max_req_size);
- svc_rdma_wq = alloc_workqueue("svc_rdma", 0, 0);
- if (!svc_rdma_wq)
- return -ENOMEM;
-
if (!svcrdma_table_header)
svcrdma_table_header =
register_sysctl_table(svcrdma_root_table);
/* Register RDMA with the SVC transport switch */
svc_reg_xprt_class(&svc_rdma_class);
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
- svc_reg_xprt_class(&svc_rdma_bc_class);
-#endif
return 0;
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index b982766..d1fcc41 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -5,8 +5,6 @@
* Support for backward direction RPCs on RPC/RDMA (server-side).
*/
-#include <linux/module.h>
-
#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"
@@ -32,7 +30,6 @@
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct kvec *dst, *src = &rcvbuf->head[0];
struct rpc_rqst *req;
- unsigned long cwnd;
u32 credits;
size_t len;
__be32 xid;
@@ -56,7 +53,7 @@
if (src->iov_len < 24)
goto out_shortreply;
- spin_lock(&xprt->recv_lock);
+ spin_lock(&xprt->queue_lock);
req = xprt_lookup_rqst(xprt, xid);
if (!req)
goto out_notfound;
@@ -66,6 +63,8 @@
if (dst->iov_len < len)
goto out_unlock;
memcpy(dst->iov_base, p, len);
+ xprt_pin_rqst(req);
+ spin_unlock(&xprt->queue_lock);
credits = be32_to_cpup(rdma_resp + 2);
if (credits == 0)
@@ -73,20 +72,18 @@
else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
credits = r_xprt->rx_buf.rb_bc_max_requests;
- spin_lock_bh(&xprt->transport_lock);
- cwnd = xprt->cwnd;
+ spin_lock(&xprt->transport_lock);
xprt->cwnd = credits << RPC_CWNDSHIFT;
- if (xprt->cwnd > cwnd)
- xprt_release_rqst_cong(req->rq_task);
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->transport_lock);
-
+ spin_lock(&xprt->queue_lock);
ret = 0;
xprt_complete_rqst(req->rq_task, rcvbuf->len);
+ xprt_unpin_rqst(req);
rcvbuf->len = 0;
out_unlock:
- spin_unlock(&xprt->recv_lock);
+ spin_unlock(&xprt->queue_lock);
out:
return ret;
@@ -203,11 +200,10 @@
svc_rdma_send_ctxt_put(rdma, ctxt);
goto drop_connection;
}
- return rc;
+ return 0;
drop_connection:
dprintk("svcrdma: failed to send bc call\n");
- xprt_disconnect_done(xprt);
return -ENOTCONN;
}
@@ -215,9 +211,8 @@
* connection.
*/
static int
-xprt_rdma_bc_send_request(struct rpc_task *task)
+xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
{
- struct rpc_rqst *rqst = task->tk_rqstp;
struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
struct svcxprt_rdma *rdma;
int ret;
@@ -225,17 +220,15 @@
dprintk("svcrdma: sending bc call with xid: %08x\n",
be32_to_cpu(rqst->rq_xid));
- if (!mutex_trylock(&sxprt->xpt_mutex)) {
- rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
- if (!mutex_trylock(&sxprt->xpt_mutex))
- return -EAGAIN;
- rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
- }
+ mutex_lock(&sxprt->xpt_mutex);
ret = -ENOTCONN;
rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
- if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
+ if (!test_bit(XPT_DEAD, &sxprt->xpt_flags)) {
ret = rpcrdma_bc_send_request(rdma, rqst);
+ if (ret == -ENOTCONN)
+ svc_close_xprt(sxprt);
+ }
mutex_unlock(&sxprt->xpt_mutex);
@@ -257,7 +250,6 @@
dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
xprt_free(xprt);
- module_put(THIS_MODULE);
}
static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
@@ -269,7 +261,7 @@
.buf_alloc = xprt_rdma_bc_allocate,
.buf_free = xprt_rdma_bc_free,
.send_request = xprt_rdma_bc_send_request,
- .set_retrans_timeout = xprt_set_retrans_timeout_def,
+ .wait_for_reply_request = xprt_wait_for_reply_request_def,
.close = xprt_rdma_bc_close,
.destroy = xprt_rdma_bc_put,
.print_stats = xprt_rdma_print_stats
@@ -312,7 +304,6 @@
xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
xprt->prot = XPRT_TRANSPORT_BC_RDMA;
- xprt->tsh_size = 0;
xprt->ops = &xprt_rdma_bc_procs;
memcpy(&xprt->addr, args->dstaddr, args->addrlen);
@@ -329,20 +320,9 @@
args->bc_xprt->xpt_bc_xprt = xprt;
xprt->bc_xprt = args->bc_xprt;
- if (!try_module_get(THIS_MODULE))
- goto out_fail;
-
/* Final put for backchannel xprt is in __svc_rdma_free */
xprt_get(xprt);
return xprt;
-
-out_fail:
- xprt_rdma_free_addresses(xprt);
- args->bc_xprt->xpt_bc_xprt = NULL;
- args->bc_xprt->xpt_bc_xps = NULL;
- xprt_put(xprt);
- xprt_free(xprt);
- return ERR_PTR(-EINVAL);
}
struct xprt_class xprt_rdma_bc = {
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index b24d5b8..96bccd3 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -172,9 +172,10 @@
void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
{
struct svc_rdma_recv_ctxt *ctxt;
+ struct llist_node *node;
- while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts))) {
- list_del(&ctxt->rc_list);
+ while ((node = llist_del_first(&rdma->sc_recv_ctxts))) {
+ ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
svc_rdma_recv_ctxt_destroy(rdma, ctxt);
}
}
@@ -183,21 +184,18 @@
svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
{
struct svc_rdma_recv_ctxt *ctxt;
+ struct llist_node *node;
- spin_lock(&rdma->sc_recv_lock);
- ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts);
- if (!ctxt)
+ node = llist_del_first(&rdma->sc_recv_ctxts);
+ if (!node)
goto out_empty;
- list_del(&ctxt->rc_list);
- spin_unlock(&rdma->sc_recv_lock);
+ ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
out:
ctxt->rc_page_count = 0;
return ctxt;
out_empty:
- spin_unlock(&rdma->sc_recv_lock);
-
ctxt = svc_rdma_recv_ctxt_alloc(rdma);
if (!ctxt)
return NULL;
@@ -218,11 +216,9 @@
for (i = 0; i < ctxt->rc_page_count; i++)
put_page(ctxt->rc_pages[i]);
- if (!ctxt->rc_temp) {
- spin_lock(&rdma->sc_recv_lock);
- list_add(&ctxt->rc_list, &rdma->sc_recv_ctxts);
- spin_unlock(&rdma->sc_recv_lock);
- } else
+ if (!ctxt->rc_temp)
+ llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
+ else
svc_rdma_recv_ctxt_destroy(rdma, ctxt);
}
@@ -272,11 +268,8 @@
return false;
ctxt->rc_temp = true;
ret = __svc_rdma_post_recv(rdma, ctxt);
- if (ret) {
- pr_err("svcrdma: failure posting recv buffers: %d\n",
- ret);
+ if (ret)
return false;
- }
}
return true;
}
@@ -314,17 +307,14 @@
spin_lock(&rdma->sc_rq_dto_lock);
list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q);
- spin_unlock(&rdma->sc_rq_dto_lock);
+ /* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+ spin_unlock(&rdma->sc_rq_dto_lock);
if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
svc_xprt_enqueue(&rdma->sc_xprt);
goto out;
flushed:
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("svcrdma: Recv: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
post_err:
svc_rdma_recv_ctxt_put(rdma, ctxt);
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
@@ -485,6 +475,68 @@
return p;
}
+/* RPC-over-RDMA Version One private extension: Remote Invalidation.
+ * Responder's choice: requester signals it can handle Send With
+ * Invalidate, and responder chooses one R_key to invalidate.
+ *
+ * If there is exactly one distinct R_key in the received transport
+ * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero.
+ *
+ * Perform this operation while the received transport header is
+ * still in the CPU cache.
+ */
+static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
+ struct svc_rdma_recv_ctxt *ctxt)
+{
+ __be32 inv_rkey, *p;
+ u32 i, segcount;
+
+ ctxt->rc_inv_rkey = 0;
+
+ if (!rdma->sc_snd_w_inv)
+ return;
+
+ inv_rkey = xdr_zero;
+ p = ctxt->rc_recv_buf;
+ p += rpcrdma_fixed_maxsz;
+
+ /* Read list */
+ while (*p++ != xdr_zero) {
+ p++; /* position */
+ if (inv_rkey == xdr_zero)
+ inv_rkey = *p;
+ else if (inv_rkey != *p)
+ return;
+ p += 4;
+ }
+
+ /* Write list */
+ while (*p++ != xdr_zero) {
+ segcount = be32_to_cpup(p++);
+ for (i = 0; i < segcount; i++) {
+ if (inv_rkey == xdr_zero)
+ inv_rkey = *p;
+ else if (inv_rkey != *p)
+ return;
+ p += 4;
+ }
+ }
+
+ /* Reply chunk */
+ if (*p++ != xdr_zero) {
+ segcount = be32_to_cpup(p++);
+ for (i = 0; i < segcount; i++) {
+ if (inv_rkey == xdr_zero)
+ inv_rkey = *p;
+ else if (inv_rkey != *p)
+ return;
+ p += 4;
+ }
+ }
+
+ ctxt->rc_inv_rkey = be32_to_cpu(inv_rkey);
+}
+
/* On entry, xdr->head[0].iov_base points to first byte in the
* RPC-over-RDMA header.
*
@@ -746,6 +798,7 @@
svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
return ret;
}
+ svc_rdma_get_inv_rkey(rdma_xprt, ctxt);
p += rpcrdma_fixed_maxsz;
if (*p != xdr_zero)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index dc19517..48fe3b1 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -64,8 +64,7 @@
spin_unlock(&rdma->sc_rw_ctxt_lock);
} else {
spin_unlock(&rdma->sc_rw_ctxt_lock);
- ctxt = kmalloc(sizeof(*ctxt) +
- SG_CHUNK_SIZE * sizeof(struct scatterlist),
+ ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
GFP_KERNEL);
if (!ctxt)
goto out;
@@ -74,7 +73,8 @@
ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
- ctxt->rw_sg_table.sgl)) {
+ ctxt->rw_sg_table.sgl,
+ SG_CHUNK_SIZE)) {
kfree(ctxt);
ctxt = NULL;
}
@@ -85,7 +85,7 @@
static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
struct svc_rdma_rw_ctxt *ctxt)
{
- sg_free_table_chained(&ctxt->rw_sg_table, true);
+ sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);
spin_lock(&rdma->sc_rw_ctxt_lock);
list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
@@ -213,13 +213,8 @@
atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
wake_up(&rdma->sc_send_wait);
- if (unlikely(wc->status != IB_WC_SUCCESS)) {
+ if (unlikely(wc->status != IB_WC_SUCCESS))
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
- }
svc_rdma_write_info_free(info);
}
@@ -278,18 +273,15 @@
if (unlikely(wc->status != IB_WC_SUCCESS)) {
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt);
} else {
spin_lock(&rdma->sc_rq_dto_lock);
list_add_tail(&info->ri_readctxt->rc_list,
&rdma->sc_read_complete_q);
+ /* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
+ set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
spin_unlock(&rdma->sc_rq_dto_lock);
- set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
svc_xprt_enqueue(&rdma->sc_xprt);
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 8602a5f..6fdba72 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -272,10 +272,6 @@
if (unlikely(wc->status != IB_WC_SUCCESS)) {
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
svc_xprt_enqueue(&rdma->sc_xprt);
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("svcrdma: Send: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
}
svc_xprt_put(&rdma->sc_xprt);
@@ -484,32 +480,6 @@
*reply = NULL;
}
-/* RPC-over-RDMA Version One private extension: Remote Invalidation.
- * Responder's choice: requester signals it can handle Send With
- * Invalidate, and responder chooses one rkey to invalidate.
- *
- * Find a candidate rkey to invalidate when sending a reply. Picks the
- * first R_key it finds in the chunk lists.
- *
- * Returns zero if RPC's chunk lists are empty.
- */
-static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
- __be32 *wr_lst, __be32 *rp_ch)
-{
- __be32 *p;
-
- p = rdma_argp + rpcrdma_fixed_maxsz;
- if (*p != xdr_zero)
- p += 2;
- else if (wr_lst && be32_to_cpup(wr_lst + 1))
- p = wr_lst + 2;
- else if (rp_ch && be32_to_cpup(rp_ch + 1))
- p = rp_ch + 2;
- else
- return 0;
- return be32_to_cpup(p);
-}
-
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
struct page *page,
@@ -563,6 +533,99 @@
DMA_TO_DEVICE);
}
+/* If the xdr_buf has more elements than the device can
+ * transmit in a single RDMA Send, then the reply will
+ * have to be copied into a bounce buffer.
+ */
+static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
+ struct xdr_buf *xdr,
+ __be32 *wr_lst)
+{
+ int elements;
+
+ /* xdr->head */
+ elements = 1;
+
+ /* xdr->pages */
+ if (!wr_lst) {
+ unsigned int remaining;
+ unsigned long pageoff;
+
+ pageoff = xdr->page_base & ~PAGE_MASK;
+ remaining = xdr->page_len;
+ while (remaining) {
+ ++elements;
+ remaining -= min_t(u32, PAGE_SIZE - pageoff,
+ remaining);
+ pageoff = 0;
+ }
+ }
+
+ /* xdr->tail */
+ if (xdr->tail[0].iov_len)
+ ++elements;
+
+ /* assume 1 SGE is needed for the transport header */
+ return elements >= rdma->sc_max_send_sges;
+}
+
+/* The device is not capable of sending the reply directly.
+ * Assemble the elements of @xdr into the transport header
+ * buffer.
+ */
+static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt,
+ struct xdr_buf *xdr, __be32 *wr_lst)
+{
+ unsigned char *dst, *tailbase;
+ unsigned int taillen;
+
+ dst = ctxt->sc_xprt_buf;
+ dst += ctxt->sc_sges[0].length;
+
+ memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
+ dst += xdr->head[0].iov_len;
+
+ tailbase = xdr->tail[0].iov_base;
+ taillen = xdr->tail[0].iov_len;
+ if (wr_lst) {
+ u32 xdrpad;
+
+ xdrpad = xdr_padsize(xdr->page_len);
+ if (taillen && xdrpad) {
+ tailbase += xdrpad;
+ taillen -= xdrpad;
+ }
+ } else {
+ unsigned int len, remaining;
+ unsigned long pageoff;
+ struct page **ppages;
+
+ ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+ pageoff = xdr->page_base & ~PAGE_MASK;
+ remaining = xdr->page_len;
+ while (remaining) {
+ len = min_t(u32, PAGE_SIZE - pageoff, remaining);
+
+ memcpy(dst, page_address(*ppages), len);
+ remaining -= len;
+ dst += len;
+ pageoff = 0;
+ }
+ }
+
+ if (taillen)
+ memcpy(dst, tailbase, taillen);
+
+ ctxt->sc_sges[0].length += xdr->len;
+ ib_dma_sync_single_for_device(rdma->sc_pd->device,
+ ctxt->sc_sges[0].addr,
+ ctxt->sc_sges[0].length,
+ DMA_TO_DEVICE);
+
+ return 0;
+}
+
/* svc_rdma_map_reply_msg - Map the buffer holding RPC message
* @rdma: controlling transport
* @ctxt: send_ctxt for the Send WR
@@ -585,8 +648,10 @@
u32 xdr_pad;
int ret;
- if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges)
- return -EIO;
+ if (svc_rdma_pull_up_needed(rdma, xdr, wr_lst))
+ return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
+
+ ++ctxt->sc_cur_sge_no;
ret = svc_rdma_dma_map_buf(rdma, ctxt,
xdr->head[0].iov_base,
xdr->head[0].iov_len);
@@ -617,8 +682,7 @@
while (remaining) {
len = min_t(u32, PAGE_SIZE - page_off, remaining);
- if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges)
- return -EIO;
+ ++ctxt->sc_cur_sge_no;
ret = svc_rdma_dma_map_page(rdma, ctxt, *ppages++,
page_off, len);
if (ret < 0)
@@ -632,8 +696,7 @@
len = xdr->tail[0].iov_len;
tail:
if (len) {
- if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges)
- return -EIO;
+ ++ctxt->sc_cur_sge_no;
ret = svc_rdma_dma_map_buf(rdma, ctxt, base, len);
if (ret < 0)
return ret;
@@ -672,7 +735,7 @@
*
* RDMA Send is the last step of transmitting an RPC reply. Pages
* involved in the earlier RDMA Writes are here transferred out
- * of the rqstp and into the ctxt's page array. These pages are
+ * of the rqstp and into the sctxt's page array. These pages are
* DMA unmapped by each Write completion, but the subsequent Send
* completion finally releases these pages.
*
@@ -680,32 +743,31 @@
* - The Reply's transport header will never be larger than a page.
*/
static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- __be32 *rdma_argp,
+ struct svc_rdma_send_ctxt *sctxt,
+ struct svc_rdma_recv_ctxt *rctxt,
struct svc_rqst *rqstp,
__be32 *wr_lst, __be32 *rp_ch)
{
int ret;
if (!rp_ch) {
- ret = svc_rdma_map_reply_msg(rdma, ctxt,
+ ret = svc_rdma_map_reply_msg(rdma, sctxt,
&rqstp->rq_res, wr_lst);
if (ret < 0)
return ret;
}
- svc_rdma_save_io_pages(rqstp, ctxt);
+ svc_rdma_save_io_pages(rqstp, sctxt);
- ctxt->sc_send_wr.opcode = IB_WR_SEND;
- if (rdma->sc_snd_w_inv) {
- ctxt->sc_send_wr.ex.invalidate_rkey =
- svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
- if (ctxt->sc_send_wr.ex.invalidate_rkey)
- ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+ if (rctxt->rc_inv_rkey) {
+ sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+ sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey;
+ } else {
+ sctxt->sc_send_wr.opcode = IB_WR_SEND;
}
dprintk("svcrdma: posting Send WR with %u sge(s)\n",
- ctxt->sc_send_wr.num_sge);
- return svc_rdma_send(rdma, &ctxt->sc_send_wr);
+ sctxt->sc_send_wr.num_sge);
+ return svc_rdma_send(rdma, &sctxt->sc_send_wr);
}
/* Given the client-provided Write and Reply chunks, the server was not
@@ -741,10 +803,6 @@
return 0;
}
-void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
-{
-}
-
/**
* svc_rdma_sendto - Transmit an RPC reply
* @rqstp: processed RPC request, reply XDR already in ::rq_res
@@ -809,7 +867,7 @@
}
svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
- ret = svc_rdma_send_reply_msg(rdma, sctxt, rdma_argp, rqstp,
+ ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp,
wr_lst, rp_ch);
if (ret < 0)
goto err1;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 2848caf..145a361 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -85,7 +85,6 @@
.xpo_release_rqst = svc_rdma_release_rqst,
.xpo_detach = svc_rdma_detach,
.xpo_free = svc_rdma_free,
- .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
.xpo_has_wspace = svc_rdma_has_wspace,
.xpo_accept = svc_rdma_accept,
.xpo_secure_port = svc_rdma_secure_port,
@@ -100,64 +99,6 @@
.xcl_ident = XPRT_TRANSPORT_RDMA,
};
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
- struct sockaddr *, int, int);
-static void svc_rdma_bc_detach(struct svc_xprt *);
-static void svc_rdma_bc_free(struct svc_xprt *);
-
-static const struct svc_xprt_ops svc_rdma_bc_ops = {
- .xpo_create = svc_rdma_bc_create,
- .xpo_detach = svc_rdma_bc_detach,
- .xpo_free = svc_rdma_bc_free,
- .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
- .xpo_secure_port = svc_rdma_secure_port,
-};
-
-struct svc_xprt_class svc_rdma_bc_class = {
- .xcl_name = "rdma-bc",
- .xcl_owner = THIS_MODULE,
- .xcl_ops = &svc_rdma_bc_ops,
- .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
-};
-
-static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
- struct net *net,
- struct sockaddr *sa, int salen,
- int flags)
-{
- struct svcxprt_rdma *cma_xprt;
- struct svc_xprt *xprt;
-
- cma_xprt = svc_rdma_create_xprt(serv, net);
- if (!cma_xprt)
- return ERR_PTR(-ENOMEM);
- xprt = &cma_xprt->sc_xprt;
-
- svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
- set_bit(XPT_CONG_CTRL, &xprt->xpt_flags);
- serv->sv_bc_xprt = xprt;
-
- dprintk("svcrdma: %s(%p)\n", __func__, xprt);
- return xprt;
-}
-
-static void svc_rdma_bc_detach(struct svc_xprt *xprt)
-{
- dprintk("svcrdma: %s(%p)\n", __func__, xprt);
-}
-
-static void svc_rdma_bc_free(struct svc_xprt *xprt)
-{
- struct svcxprt_rdma *rdma =
- container_of(xprt, struct svcxprt_rdma, sc_xprt);
-
- dprintk("svcrdma: %s(%p)\n", __func__, xprt);
- if (xprt)
- kfree(rdma);
-}
-#endif /* CONFIG_SUNRPC_BACKCHANNEL */
-
/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
@@ -199,14 +140,13 @@
INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts);
- INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts);
+ init_llist_head(&cma_xprt->sc_recv_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
init_waitqueue_head(&cma_xprt->sc_send_wait);
spin_lock_init(&cma_xprt->sc_lock);
spin_lock_init(&cma_xprt->sc_rq_dto_lock);
spin_lock_init(&cma_xprt->sc_send_lock);
- spin_lock_init(&cma_xprt->sc_recv_lock);
spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
/*
@@ -270,9 +210,14 @@
/* Save client advertised inbound read limit for use later in accept. */
newxprt->sc_ord = param->initiator_depth;
- /* Set the local and remote addresses in the transport */
sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+ /* The remote port is arbitrary and not under the control of the
+ * client ULP. Set it to a fixed value so that the DRC continues
+ * to be effective after a reconnect.
+ */
+ rpc_set_port((struct sockaddr *)&newxprt->sc_xprt.xpt_remote, 0);
+
sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
@@ -280,9 +225,9 @@
* Enqueue the new transport on the accept queue of the listening
* transport
*/
- spin_lock_bh(&listen_xprt->sc_lock);
+ spin_lock(&listen_xprt->sc_lock);
list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
- spin_unlock_bh(&listen_xprt->sc_lock);
+ spin_unlock(&listen_xprt->sc_lock);
set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
svc_xprt_enqueue(&listen_xprt->sc_xprt);
@@ -449,13 +394,13 @@
struct ib_qp_init_attr qp_attr;
unsigned int ctxts, rq_depth;
struct ib_device *dev;
- struct sockaddr *sap;
int ret = 0;
+ RPC_IFDEBUG(struct sockaddr *sap);
listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
clear_bit(XPT_CONN, &xprt->xpt_flags);
/* Get the next entry off the accept list */
- spin_lock_bh(&listen_rdma->sc_lock);
+ spin_lock(&listen_rdma->sc_lock);
if (!list_empty(&listen_rdma->sc_accept_q)) {
newxprt = list_entry(listen_rdma->sc_accept_q.next,
struct svcxprt_rdma, sc_accept_q);
@@ -463,7 +408,7 @@
}
if (!list_empty(&listen_rdma->sc_accept_q))
set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
- spin_unlock_bh(&listen_rdma->sc_lock);
+ spin_unlock(&listen_rdma->sc_lock);
if (!newxprt)
return NULL;
@@ -475,13 +420,12 @@
/* Qualify the transport resource defaults with the
* capabilities of this particular device */
- newxprt->sc_max_send_sges = dev->attrs.max_send_sge;
- /* transport hdr, head iovec, one page list entry, tail iovec */
- if (newxprt->sc_max_send_sges < 4) {
- pr_err("svcrdma: too few Send SGEs available (%d)\n",
- newxprt->sc_max_send_sges);
- goto errout;
- }
+ /* Transport header, head iovec, tail iovec */
+ newxprt->sc_max_send_sges = 3;
+ /* Add one SGE per page list entry */
+ newxprt->sc_max_send_sges += (svcrdma_max_req_size / PAGE_SIZE) + 1;
+ if (newxprt->sc_max_send_sges > dev->attrs.max_send_sge)
+ newxprt->sc_max_send_sges = dev->attrs.max_send_sge;
newxprt->sc_max_req_size = svcrdma_max_req_size;
newxprt->sc_max_requests = svcrdma_max_requests;
newxprt->sc_max_bc_requests = svcrdma_max_bc_requests;
@@ -509,14 +453,14 @@
dprintk("svcrdma: error creating PD for connect request\n");
goto errout;
}
- newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
- 0, IB_POLL_WORKQUEUE);
+ newxprt->sc_sq_cq = ib_alloc_cq_any(dev, newxprt, newxprt->sc_sq_depth,
+ IB_POLL_WORKQUEUE);
if (IS_ERR(newxprt->sc_sq_cq)) {
dprintk("svcrdma: error creating SQ CQ for connect request\n");
goto errout;
}
- newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, rq_depth,
- 0, IB_POLL_WORKQUEUE);
+ newxprt->sc_rq_cq =
+ ib_alloc_cq_any(dev, newxprt, rq_depth, IB_POLL_WORKQUEUE);
if (IS_ERR(newxprt->sc_rq_cq)) {
dprintk("svcrdma: error creating RQ CQ for connect request\n");
goto errout;
@@ -585,6 +529,7 @@
if (ret)
goto errout;
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
dprintk("svcrdma: new connection %p accepted:\n", newxprt);
sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
dprintk(" local address : %pIS:%u\n", sap, rpc_get_port(sap));
@@ -595,6 +540,7 @@
dprintk(" rdma_rw_ctxs : %d\n", ctxts);
dprintk(" max_requests : %d\n", newxprt->sc_max_requests);
dprintk(" ord : %d\n", conn_param.initiator_depth);
+#endif
trace_svcrdma_xprt_accept(&newxprt->sc_xprt);
return &newxprt->sc_xprt;
@@ -648,11 +594,6 @@
if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
ib_drain_qp(rdma->sc_qp);
- /* We should only be called from kref_put */
- if (kref_read(&xprt->xpt_ref) != 0)
- pr_err("svcrdma: sc_xprt still in use? (%d)\n",
- kref_read(&xprt->xpt_ref));
-
svc_rdma_flush_recv_queues(rdma);
/* Final put of backchannel client transport */
@@ -688,8 +629,9 @@
{
struct svcxprt_rdma *rdma =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
INIT_WORK(&rdma->sc_work, __svc_rdma_free);
- queue_work(svc_rdma_wq, &rdma->sc_work);
+ schedule_work(&rdma->sc_work);
}
static int svc_rdma_has_wspace(struct svc_xprt *xprt)
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 98cbc7b..160558b 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -68,9 +68,9 @@
* tunables
*/
-static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
+unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
-static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
+unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRWR;
int xprt_rdma_pad_optimize;
@@ -80,7 +80,6 @@
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
-static unsigned int zero;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;
@@ -122,7 +121,7 @@
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec_minmax,
- .extra1 = &zero,
+ .extra1 = SYSCTL_ZERO,
.extra2 = &max_padding,
},
{
@@ -225,82 +224,70 @@
}
}
-void
-rpcrdma_conn_func(struct rpcrdma_ep *ep)
-{
- schedule_delayed_work(&ep->rep_connect_worker, 0);
-}
-
-void
-rpcrdma_connect_worker(struct work_struct *work)
-{
- struct rpcrdma_ep *ep =
- container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
- struct rpcrdma_xprt *r_xprt =
- container_of(ep, struct rpcrdma_xprt, rx_ep);
- struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-
- spin_lock_bh(&xprt->transport_lock);
- if (ep->rep_connected > 0) {
- if (!xprt_test_and_set_connected(xprt))
- xprt_wake_pending_tasks(xprt, 0);
- } else {
- if (xprt_test_and_clear_connected(xprt))
- xprt_wake_pending_tasks(xprt, -ENOTCONN);
- }
- spin_unlock_bh(&xprt->transport_lock);
-}
-
+/**
+ * xprt_rdma_connect_worker - establish connection in the background
+ * @work: worker thread context
+ *
+ * Requester holds the xprt's send lock to prevent activity on this
+ * transport while a fresh connection is being established. RPC tasks
+ * sleep on the xprt's pending queue waiting for connect to complete.
+ */
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
rx_connect_worker.work);
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
- int rc = 0;
-
- xprt_clear_connected(xprt);
+ int rc;
rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
- if (rc)
- xprt_wake_pending_tasks(xprt, rc);
-
xprt_clear_connecting(xprt);
+ if (r_xprt->rx_ep.rep_connected > 0) {
+ if (!xprt_test_and_set_connected(xprt)) {
+ xprt->stat.connect_count++;
+ xprt->stat.connect_time += (long)jiffies -
+ xprt->stat.connect_start;
+ xprt_wake_pending_tasks(xprt, -EAGAIN);
+ }
+ } else {
+ if (xprt_test_and_clear_connected(xprt))
+ xprt_wake_pending_tasks(xprt, rc);
+ }
}
+/**
+ * xprt_rdma_inject_disconnect - inject a connection fault
+ * @xprt: transport context
+ *
+ * If @xprt is connected, disconnect it to simulate spurious connection
+ * loss.
+ */
static void
xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
{
- struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt,
- rx_xprt);
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
- trace_xprtrdma_inject_dsc(r_xprt);
+ trace_xprtrdma_op_inject_dsc(r_xprt);
rdma_disconnect(r_xprt->rx_ia.ri_id);
}
-/*
- * xprt_rdma_destroy
+/**
+ * xprt_rdma_destroy - Full tear down of transport
+ * @xprt: doomed transport context
*
- * Destroy the xprt.
- * Free all memory associated with the object, including its own.
- * NOTE: none of the *destroy methods free memory for their top-level
- * objects, even though they may have allocated it (they do free
- * private memory). It's up to the caller to handle it. In this
- * case (RDMA transport), all structure memory is inlined with the
- * struct rpcrdma_xprt.
+ * Caller guarantees there will be no more calls to us with
+ * this @xprt.
*/
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
- trace_xprtrdma_destroy(r_xprt);
+ trace_xprtrdma_op_destroy(r_xprt);
cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
- xprt_clear_connected(xprt);
-
- rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
+ rpcrdma_ep_destroy(r_xprt);
rpcrdma_buffer_destroy(&r_xprt->rx_buf);
rpcrdma_ia_close(&r_xprt->rx_ia);
@@ -310,6 +297,7 @@
module_put(THIS_MODULE);
}
+/* 60 second timeout, no retries */
static const struct rpc_timeout xprt_rdma_default_timeout = {
.to_initval = 60 * HZ,
.to_maxval = 60 * HZ,
@@ -323,33 +311,26 @@
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
- struct rpcrdma_create_data_internal cdata;
struct rpc_xprt *xprt;
struct rpcrdma_xprt *new_xprt;
- struct rpcrdma_ep *new_ep;
struct sockaddr *sap;
int rc;
- if (args->addrlen > sizeof(xprt->addr)) {
- dprintk("RPC: %s: address too large\n", __func__);
+ if (args->addrlen > sizeof(xprt->addr))
return ERR_PTR(-EBADF);
- }
xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0, 0);
- if (xprt == NULL) {
- dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n",
- __func__);
+ if (!xprt)
return ERR_PTR(-ENOMEM);
- }
- /* 60 second timeout, no retries */
xprt->timeout = &xprt_rdma_default_timeout;
+ xprt->connect_timeout = xprt->timeout->to_initval;
+ xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
xprt->bind_timeout = RPCRDMA_BIND_TO;
xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
xprt->resvport = 0; /* privileged port not needed */
- xprt->tsh_size = 0; /* RPC-RDMA handles framing */
xprt->ops = &xprt_rdma_procs;
/*
@@ -367,40 +348,12 @@
xprt_set_bound(xprt);
xprt_rdma_format_addresses(xprt, sap);
- cdata.max_requests = xprt_rdma_slot_table_entries;
-
- cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
- cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
-
- cdata.inline_wsize = xprt_rdma_max_inline_write;
- if (cdata.inline_wsize > cdata.wsize)
- cdata.inline_wsize = cdata.wsize;
-
- cdata.inline_rsize = xprt_rdma_max_inline_read;
- if (cdata.inline_rsize > cdata.rsize)
- cdata.inline_rsize = cdata.rsize;
-
- /*
- * Create new transport instance, which includes initialized
- * o ia
- * o endpoint
- * o buffers
- */
-
new_xprt = rpcx_to_rdmax(xprt);
-
rc = rpcrdma_ia_open(new_xprt);
if (rc)
goto out1;
- /*
- * initialize and create ep
- */
- new_xprt->rx_data = cdata;
- new_ep = &new_xprt->rx_ep;
-
- rc = rpcrdma_ep_create(&new_xprt->rx_ep,
- &new_xprt->rx_ia, &new_xprt->rx_data);
+ rc = rpcrdma_ep_create(new_xprt);
if (rc)
goto out2;
@@ -411,7 +364,7 @@
INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
xprt_rdma_connect_worker);
- xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt);
+ xprt->max_payload = frwr_maxpages(new_xprt);
if (xprt->max_payload == 0)
goto out4;
xprt->max_payload <<= PAGE_SHIFT;
@@ -431,42 +384,45 @@
rpcrdma_buffer_destroy(&new_xprt->rx_buf);
rc = -ENODEV;
out3:
- rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
+ rpcrdma_ep_destroy(new_xprt);
out2:
rpcrdma_ia_close(&new_xprt->rx_ia);
out1:
- trace_xprtrdma_destroy(new_xprt);
+ trace_xprtrdma_op_destroy(new_xprt);
xprt_rdma_free_addresses(xprt);
xprt_free(xprt);
return ERR_PTR(rc);
}
/**
- * xprt_rdma_close - Close down RDMA connection
- * @xprt: generic transport to be closed
+ * xprt_rdma_close - close a transport connection
+ * @xprt: transport context
*
- * Called during transport shutdown reconnect, or device
- * removal. Caller holds the transport's write lock.
+ * Called during autoclose or device removal.
+ *
+ * Caller holds @xprt's send lock to prevent activity on this
+ * transport while the connection is torn down.
*/
-static void
-xprt_rdma_close(struct rpc_xprt *xprt)
+void xprt_rdma_close(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- dprintk("RPC: %s: closing xprt %p\n", __func__, xprt);
+ might_sleep();
+
+ trace_xprtrdma_op_close(r_xprt);
+
+ /* Prevent marshaling and sending of new requests */
+ xprt_clear_connected(xprt);
if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
- xprt_clear_connected(xprt);
rpcrdma_ia_remove(ia);
- return;
+ goto out;
}
+
if (ep->rep_connected == -ENODEV)
return;
- if (ep->rep_connected > 0)
- xprt->reestablish_timeout = 0;
- xprt_disconnect_done(xprt);
rpcrdma_ep_disconnect(ep, ia);
/* Prepare @xprt for the next connection by reinitializing
@@ -474,6 +430,11 @@
*/
r_xprt->rx_buf.rb_credits = 1;
xprt->cwnd = RPC_CWNDSHIFT;
+
+out:
+ xprt->reestablish_timeout = 0;
+ ++xprt->connect_cookie;
+ xprt_disconnect_done(xprt);
}
/**
@@ -525,25 +486,65 @@
xprt_force_disconnect(xprt);
}
+/**
+ * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection
+ * @xprt: controlling transport instance
+ * @connect_timeout: reconnect timeout after client disconnects
+ * @reconnect_timeout: reconnect timeout after server disconnects
+ *
+ */
+static void xprt_rdma_set_connect_timeout(struct rpc_xprt *xprt,
+ unsigned long connect_timeout,
+ unsigned long reconnect_timeout)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+ trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout);
+
+ spin_lock(&xprt->transport_lock);
+
+ if (connect_timeout < xprt->connect_timeout) {
+ struct rpc_timeout to;
+ unsigned long initval;
+
+ to = *xprt->timeout;
+ initval = connect_timeout;
+ if (initval < RPCRDMA_INIT_REEST_TO << 1)
+ initval = RPCRDMA_INIT_REEST_TO << 1;
+ to.to_initval = initval;
+ to.to_maxval = initval;
+ r_xprt->rx_timeout = to;
+ xprt->timeout = &r_xprt->rx_timeout;
+ xprt->connect_timeout = connect_timeout;
+ }
+
+ if (reconnect_timeout < xprt->max_reconnect_timeout)
+ xprt->max_reconnect_timeout = reconnect_timeout;
+
+ spin_unlock(&xprt->transport_lock);
+}
+
+/**
+ * xprt_rdma_connect - schedule an attempt to reconnect
+ * @xprt: transport state
+ * @task: RPC scheduler context (unused)
+ *
+ */
static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ unsigned long delay;
+ trace_xprtrdma_op_connect(r_xprt);
+
+ delay = 0;
if (r_xprt->rx_ep.rep_connected != 0) {
- /* Reconnect */
- schedule_delayed_work(&r_xprt->rx_connect_worker,
- xprt->reestablish_timeout);
- xprt->reestablish_timeout <<= 1;
- if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
- xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
- else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
- xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
- } else {
- schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
- if (!RPC_IS_ASYNC(task))
- flush_delayed_work(&r_xprt->rx_connect_worker);
+ delay = xprt_reconnect_delay(xprt);
+ xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
}
+ queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
+ delay);
}
/**
@@ -569,6 +570,7 @@
return;
out_sleep:
+ set_bit(XPRT_CONGESTED, &xprt->state);
rpc_sleep_on(&xprt->backlog, task, NULL);
task->tk_status = -EAGAIN;
}
@@ -582,57 +584,24 @@
static void
xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
{
+ struct rpcrdma_xprt *r_xprt =
+ container_of(xprt, struct rpcrdma_xprt, rx_xprt);
+
memset(rqst, 0, sizeof(*rqst));
- rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
- rpc_wake_up_next(&xprt->backlog);
+ rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
+ if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
+ clear_bit(XPRT_CONGESTED, &xprt->state);
}
-static bool
-rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- size_t size, gfp_t flags)
+static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_regbuf *rb, size_t size,
+ gfp_t flags)
{
- struct rpcrdma_regbuf *rb;
-
- if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
- return true;
-
- rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
- if (IS_ERR(rb))
- return false;
-
- rpcrdma_free_regbuf(req->rl_sendbuf);
- r_xprt->rx_stats.hardway_register_count += size;
- req->rl_sendbuf = rb;
- return true;
-}
-
-/* The rq_rcv_buf is used only if a Reply chunk is necessary.
- * The decision to use a Reply chunk is made later in
- * rpcrdma_marshal_req. This buffer is registered at that time.
- *
- * Otherwise, the associated RPC Reply arrives in a separate
- * Receive buffer, arbitrarily chosen by the HCA. The buffer
- * allocated here for the RPC Reply is not utilized in that
- * case. See rpcrdma_inline_fixup.
- *
- * A regbuf is used here to remember the buffer size.
- */
-static bool
-rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- size_t size, gfp_t flags)
-{
- struct rpcrdma_regbuf *rb;
-
- if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
- return true;
-
- rb = rpcrdma_alloc_regbuf(size, DMA_NONE, flags);
- if (IS_ERR(rb))
- return false;
-
- rpcrdma_free_regbuf(req->rl_recvbuf);
- r_xprt->rx_stats.hardway_register_count += size;
- req->rl_recvbuf = rb;
+ if (unlikely(rdmab_length(rb) < size)) {
+ if (!rpcrdma_regbuf_realloc(rb, size, flags))
+ return false;
+ r_xprt->rx_stats.hardway_register_count += size;
+ }
return true;
}
@@ -644,13 +613,6 @@
* 0: Success; rq_buffer points to RPC buffer to use
* ENOMEM: Out of memory, call again later
* EIO: A permanent error occurred, do not retry
- *
- * The RDMA allocate/free functions need the task structure as a place
- * to hide the struct rpcrdma_req, which is necessary for the actual
- * send/recv sequence.
- *
- * xprt_rdma_allocate provides buffers that are already mapped for
- * DMA, and a local DMA lkey is provided for each.
*/
static int
xprt_rdma_allocate(struct rpc_task *task)
@@ -664,18 +626,20 @@
if (RPC_IS_SWAPPER(task))
flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
- if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
+ if (!rpcrdma_check_regbuf(r_xprt, req->rl_sendbuf, rqst->rq_callsize,
+ flags))
goto out_fail;
- if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
+ if (!rpcrdma_check_regbuf(r_xprt, req->rl_recvbuf, rqst->rq_rcvsize,
+ flags))
goto out_fail;
- rqst->rq_buffer = req->rl_sendbuf->rg_base;
- rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
- trace_xprtrdma_allocate(task, req);
+ rqst->rq_buffer = rdmab_data(req->rl_sendbuf);
+ rqst->rq_rbuffer = rdmab_data(req->rl_recvbuf);
+ trace_xprtrdma_op_allocate(task, req);
return 0;
out_fail:
- trace_xprtrdma_allocate(task, NULL);
+ trace_xprtrdma_op_allocate(task, NULL);
return -ENOMEM;
}
@@ -692,14 +656,21 @@
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
- if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
- rpcrdma_release_rqst(r_xprt, req);
- trace_xprtrdma_rpc_done(task, req);
+ trace_xprtrdma_op_free(task, req);
+
+ if (!list_empty(&req->rl_registered))
+ frwr_unmap_sync(r_xprt, req);
+
+ /* XXX: If the RPC is completing because of a signal and
+ * not because a reply was received, we ought to ensure
+ * that the Send completion has fired, so that memory
+ * involved with the Send is not still visible to the NIC.
+ */
}
/**
* xprt_rdma_send_request - marshal and send an RPC request
- * @task: RPC task with an RPC message in rq_snd_buf
+ * @rqst: RPC message in rq_snd_buf
*
* Caller holds the transport's write lock.
*
@@ -708,13 +679,14 @@
* %-ENOTCONN if the caller should reconnect and call again
* %-EAGAIN if the caller should call again
* %-ENOBUFS if the caller should call again after a delay
- * %-EIO if a permanent error occurred and the request was not
- * sent. Do not try to send this message again.
+ * %-EMSGSIZE if encoding ran out of buffer space. The request
+ * was not sent. Do not try to send this message again.
+ * %-EIO if an I/O error occurred. The request was not sent.
+ * Do not try to send this message again.
*/
static int
-xprt_rdma_send_request(struct rpc_task *task)
+xprt_rdma_send_request(struct rpc_rqst *rqst)
{
- struct rpc_rqst *rqst = task->tk_rqstp;
struct rpc_xprt *xprt = rqst->rq_xprt;
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
@@ -726,7 +698,10 @@
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
if (!xprt_connected(xprt))
- goto drop_connection;
+ return -ENOTCONN;
+
+ if (!xprt_request_get_cong(xprt, rqst))
+ return -EBADSLT;
rc = rpcrdma_marshal_req(r_xprt, rqst);
if (rc < 0)
@@ -737,17 +712,15 @@
goto drop_connection;
rqst->rq_xtime = ktime_get();
- __set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
goto drop_connection;
rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
- rqst->rq_bytes_sent = 0;
/* An RPC with no reply will throw off credit accounting,
* so drop the connection to reset the credit grant.
*/
- if (!rpc_reply_expected(task))
+ if (!rpc_reply_expected(rqst->rq_task))
goto drop_connection;
return 0;
@@ -755,8 +728,8 @@
if (rc != -ENOTCONN)
return rc;
drop_connection:
- xprt_disconnect_done(xprt);
- return -ENOTCONN; /* implies disconnect */
+ xprt_rdma_close(xprt);
+ return -ENOTCONN;
}
void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
@@ -772,7 +745,7 @@
0, /* need a local port? */
xprt->stat.bind_count,
xprt->stat.connect_count,
- xprt->stat.connect_time,
+ xprt->stat.connect_time / HZ,
idle_time,
xprt->stat.sends,
xprt->stat.recvs,
@@ -792,7 +765,7 @@
r_xprt->rx_stats.bad_reply_count,
r_xprt->rx_stats.nomsg_call_count);
seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
- r_xprt->rx_stats.mrs_recovered,
+ r_xprt->rx_stats.mrs_recycled,
r_xprt->rx_stats.mrs_orphaned,
r_xprt->rx_stats.mrs_allocated,
r_xprt->rx_stats.local_inv_needed,
@@ -821,7 +794,7 @@
.alloc_slot = xprt_rdma_alloc_slot,
.free_slot = xprt_rdma_free_slot,
.release_request = xprt_release_rqst_cong, /* ditto */
- .set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */
+ .wait_for_reply_request = xprt_wait_for_reply_request_def, /* ditto */
.timer = xprt_rdma_timer,
.rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */
.set_port = xprt_rdma_set_port,
@@ -831,14 +804,15 @@
.send_request = xprt_rdma_send_request,
.close = xprt_rdma_close,
.destroy = xprt_rdma_destroy,
+ .set_connect_timeout = xprt_rdma_set_connect_timeout,
.print_stats = xprt_rdma_print_stats,
.enable_swap = xprt_rdma_enable_swap,
.disable_swap = xprt_rdma_disable_swap,
.inject_disconnect = xprt_rdma_inject_disconnect,
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
.bc_setup = xprt_rdma_bc_setup,
- .bc_up = xprt_rdma_bc_up,
.bc_maxpayload = xprt_rdma_bc_maxpayload,
+ .bc_num_slots = xprt_rdma_bc_max_slots,
.bc_free_rqst = xprt_rdma_bc_free_rqst,
.bc_destroy = xprt_rdma_bc_destroy,
#endif
@@ -854,58 +828,31 @@
void xprt_rdma_cleanup(void)
{
- int rc;
-
- dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
if (sunrpc_table_header) {
unregister_sysctl_table(sunrpc_table_header);
sunrpc_table_header = NULL;
}
#endif
- rc = xprt_unregister_transport(&xprt_rdma);
- if (rc)
- dprintk("RPC: %s: xprt_unregister returned %i\n",
- __func__, rc);
- rpcrdma_destroy_wq();
-
- rc = xprt_unregister_transport(&xprt_rdma_bc);
- if (rc)
- dprintk("RPC: %s: xprt_unregister(bc) returned %i\n",
- __func__, rc);
+ xprt_unregister_transport(&xprt_rdma);
+ xprt_unregister_transport(&xprt_rdma_bc);
}
int xprt_rdma_init(void)
{
int rc;
- rc = rpcrdma_alloc_wq();
+ rc = xprt_register_transport(&xprt_rdma);
if (rc)
return rc;
- rc = xprt_register_transport(&xprt_rdma);
- if (rc) {
- rpcrdma_destroy_wq();
- return rc;
- }
-
rc = xprt_register_transport(&xprt_rdma_bc);
if (rc) {
xprt_unregister_transport(&xprt_rdma);
- rpcrdma_destroy_wq();
return rc;
}
- dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
-
- dprintk("Defaults:\n");
- dprintk("\tSlots %d\n"
- "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
- xprt_rdma_slot_table_entries,
- xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
- dprintk("\tPadding 0\n\tMemreg %d\n", xprt_rdma_memreg_strategy);
-
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
if (!sunrpc_table_header)
sunrpc_table_header = register_sysctl_table(sunrpc_table);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 956a5ea..3a90753 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -53,6 +53,7 @@
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
+#include <linux/log2.h>
#include <asm-generic/barrier.h>
#include <asm/bitops.h>
@@ -74,56 +75,52 @@
* internal functions
*/
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
-static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
-static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
+static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
+static struct rpcrdma_regbuf *
+rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
+ gfp_t flags);
+static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
+static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
+static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
+/* Wait for outstanding transport work to finish. ib_drain_qp
+ * handles the drains in the wrong order for us, so open code
+ * them here.
+ */
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
- struct workqueue_struct *recv_wq;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- recv_wq = alloc_workqueue("xprtrdma_receive",
- WQ_MEM_RECLAIM | WQ_HIGHPRI,
- 0);
- if (!recv_wq)
- return -ENOMEM;
+ /* Flush Receives, then wait for deferred Reply work
+ * to complete.
+ */
+ ib_drain_rq(ia->ri_id->qp);
- rpcrdma_receive_wq = recv_wq;
- return 0;
+ /* Deferred Reply processing might have scheduled
+ * local invalidations.
+ */
+ ib_drain_sq(ia->ri_id->qp);
}
-void
-rpcrdma_destroy_wq(void)
-{
- struct workqueue_struct *wq;
-
- if (rpcrdma_receive_wq) {
- wq = rpcrdma_receive_wq;
- rpcrdma_receive_wq = NULL;
- destroy_workqueue(wq);
- }
-}
-
+/**
+ * rpcrdma_qp_event_handler - Handle one QP event (error notification)
+ * @event: details of the event
+ * @context: ep that owns QP where event occurred
+ *
+ * Called from the RDMA provider (device driver) possibly in an interrupt
+ * context.
+ */
static void
-rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
+rpcrdma_qp_event_handler(struct ib_event *event, void *context)
{
struct rpcrdma_ep *ep = context;
struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
rx_ep);
- trace_xprtrdma_qp_error(r_xprt, event);
- pr_err("rpcrdma: %s on device %s ep %p\n",
- ib_event_msg(event->event), event->device->name, context);
-
- if (ep->rep_connected == 1) {
- ep->rep_connected = -EIO;
- rpcrdma_conn_func(ep);
- wake_up_all(&ep->rep_connect_wait);
- }
+ trace_xprtrdma_qp_event(r_xprt, event);
}
/**
@@ -141,11 +138,6 @@
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_send(sc, wc);
- if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
-
rpcrdma_sendctx_put_locked(sc);
}
@@ -161,11 +153,13 @@
struct ib_cqe *cqe = wc->wr_cqe;
struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
rr_cqe);
+ struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
- /* WARNING: Only wr_id and status are reliable at this point */
+ /* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_receive(wc);
+ --r_xprt->rx_ep.rep_receive_count;
if (wc->status != IB_WC_SUCCESS)
- goto out_fail;
+ goto out_flushed;
/* status == SUCCESS means all fields in wc are trustworthy */
rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
@@ -176,24 +170,18 @@
rdmab_addr(rep->rr_rdmabuf),
wc->byte_len, DMA_FROM_DEVICE);
-out_schedule:
+ rpcrdma_post_recvs(r_xprt, false);
rpcrdma_reply_handler(rep);
return;
-out_fail:
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
- rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
- goto out_schedule;
+out_flushed:
+ rpcrdma_recv_buffer_put(rep);
}
static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
struct rdma_conn_param *param)
{
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
const struct rpcrdma_connect_private *pmsg = param->private_data;
unsigned int rsize, wsize;
@@ -210,89 +198,96 @@
wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
}
- if (rsize < cdata->inline_rsize)
- cdata->inline_rsize = rsize;
- if (wsize < cdata->inline_wsize)
- cdata->inline_wsize = wsize;
- dprintk("RPC: %s: max send %u, max recv %u\n",
- __func__, cdata->inline_wsize, cdata->inline_rsize);
+ if (rsize < r_xprt->rx_ep.rep_inline_recv)
+ r_xprt->rx_ep.rep_inline_recv = rsize;
+ if (wsize < r_xprt->rx_ep.rep_inline_send)
+ r_xprt->rx_ep.rep_inline_send = wsize;
+ dprintk("RPC: %s: max send %u, max recv %u\n", __func__,
+ r_xprt->rx_ep.rep_inline_send,
+ r_xprt->rx_ep.rep_inline_recv);
rpcrdma_set_max_header_sizes(r_xprt);
}
+/**
+ * rpcrdma_cm_event_handler - Handle RDMA CM events
+ * @id: rdma_cm_id on which an event has occurred
+ * @event: details of the event
+ *
+ * Called with @id's mutex held. Returns 1 if caller should
+ * destroy @id, otherwise 0.
+ */
static int
-rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
+rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
- struct rpcrdma_xprt *xprt = id->context;
- struct rpcrdma_ia *ia = &xprt->rx_ia;
- struct rpcrdma_ep *ep = &xprt->rx_ep;
- int connstate = 0;
+ struct rpcrdma_xprt *r_xprt = id->context;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
- trace_xprtrdma_conn_upcall(xprt, event);
+ might_sleep();
+
+ trace_xprtrdma_cm_event(r_xprt, event);
switch (event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
case RDMA_CM_EVENT_ROUTE_RESOLVED:
ia->ri_async_rc = 0;
complete(&ia->ri_done);
- break;
+ return 0;
case RDMA_CM_EVENT_ADDR_ERROR:
ia->ri_async_rc = -EPROTO;
complete(&ia->ri_done);
- break;
+ return 0;
case RDMA_CM_EVENT_ROUTE_ERROR:
ia->ri_async_rc = -ENETUNREACH;
complete(&ia->ri_done);
- break;
+ return 0;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
pr_info("rpcrdma: removing device %s for %s:%s\n",
- ia->ri_device->name,
- rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt));
+ ia->ri_id->device->name,
+ rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
#endif
set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
ep->rep_connected = -ENODEV;
- xprt_force_disconnect(&xprt->rx_xprt);
+ xprt_force_disconnect(xprt);
wait_for_completion(&ia->ri_remove_done);
ia->ri_id = NULL;
- ia->ri_device = NULL;
/* Return 1 to ensure the core destroys the id. */
return 1;
case RDMA_CM_EVENT_ESTABLISHED:
- ++xprt->rx_xprt.connect_cookie;
- connstate = 1;
- rpcrdma_update_connect_private(xprt, &event->param.conn);
- goto connected;
+ ++xprt->connect_cookie;
+ ep->rep_connected = 1;
+ rpcrdma_update_connect_private(r_xprt, &event->param.conn);
+ wake_up_all(&ep->rep_connect_wait);
+ break;
case RDMA_CM_EVENT_CONNECT_ERROR:
- connstate = -ENOTCONN;
- goto connected;
+ ep->rep_connected = -ENOTCONN;
+ goto disconnected;
case RDMA_CM_EVENT_UNREACHABLE:
- connstate = -ENETUNREACH;
- goto connected;
+ ep->rep_connected = -ENETUNREACH;
+ goto disconnected;
case RDMA_CM_EVENT_REJECTED:
dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
- rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
+ rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
rdma_reject_msg(id, event->status));
- connstate = -ECONNREFUSED;
+ ep->rep_connected = -ECONNREFUSED;
if (event->status == IB_CM_REJ_STALE_CONN)
- connstate = -EAGAIN;
- goto connected;
+ ep->rep_connected = -EAGAIN;
+ goto disconnected;
case RDMA_CM_EVENT_DISCONNECTED:
- ++xprt->rx_xprt.connect_cookie;
- connstate = -ECONNABORTED;
-connected:
- ep->rep_connected = connstate;
- rpcrdma_conn_func(ep);
+ ep->rep_connected = -ECONNABORTED;
+disconnected:
+ xprt_force_disconnect(xprt);
wake_up_all(&ep->rep_connect_wait);
- /*FALLTHROUGH*/
+ break;
default:
- dprintk("RPC: %s: %s:%s on %s/%s (ep 0x%p): %s\n",
- __func__,
- rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
- ia->ri_device->name, ia->ri_ops->ro_displayname,
- ep, rdma_event_msg(event->event));
break;
}
+ dprintk("RPC: %s: %s:%s on %s/frwr: %s\n", __func__,
+ rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
+ ia->ri_id->device->name, rdma_event_msg(event->event));
return 0;
}
@@ -308,24 +303,17 @@
init_completion(&ia->ri_done);
init_completion(&ia->ri_remove_done);
- id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_conn_upcall,
+ id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
xprt, RDMA_PS_TCP, IB_QPT_RC);
- if (IS_ERR(id)) {
- rc = PTR_ERR(id);
- dprintk("RPC: %s: rdma_create_id() failed %i\n",
- __func__, rc);
+ if (IS_ERR(id))
return id;
- }
ia->ri_async_rc = -ETIMEDOUT;
rc = rdma_resolve_addr(id, NULL,
(struct sockaddr *)&xprt->rx_xprt.addr,
RDMA_RESOLVE_TIMEOUT);
- if (rc) {
- dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
- __func__, rc);
+ if (rc)
goto out;
- }
rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
if (rc < 0) {
trace_xprtrdma_conn_tout(xprt);
@@ -338,11 +326,8 @@
ia->ri_async_rc = -ETIMEDOUT;
rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
- if (rc) {
- dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
- __func__, rc);
+ if (rc)
goto out;
- }
rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
if (rc < 0) {
trace_xprtrdma_conn_tout(xprt);
@@ -381,9 +366,8 @@
rc = PTR_ERR(ia->ri_id);
goto out_err;
}
- ia->ri_device = ia->ri_id->device;
- ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
+ ia->ri_pd = ib_alloc_pd(ia->ri_id->device, 0);
if (IS_ERR(ia->ri_pd)) {
rc = PTR_ERR(ia->ri_pd);
pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
@@ -392,20 +376,12 @@
switch (xprt_rdma_memreg_strategy) {
case RPCRDMA_FRWR:
- if (frwr_is_supported(ia)) {
- ia->ri_ops = &rpcrdma_frwr_memreg_ops;
+ if (frwr_is_supported(ia->ri_id->device))
break;
- }
- /*FALLTHROUGH*/
- case RPCRDMA_MTHCAFMR:
- if (fmr_is_supported(ia)) {
- ia->ri_ops = &rpcrdma_fmr_memreg_ops;
- break;
- }
/*FALLTHROUGH*/
default:
pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
- ia->ri_device->name, xprt_rdma_memreg_strategy);
+ ia->ri_id->device->name, xprt_rdma_memreg_strategy);
rc = -EINVAL;
goto out_err;
}
@@ -432,9 +408,8 @@
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
- struct rpcrdma_rep *rep;
- cancel_delayed_work_sync(&buf->rb_refresh_worker);
+ cancel_work_sync(&buf->rb_refresh_worker);
/* This is similar to rpcrdma_ep_destroy, but:
* - Don't cancel the connect worker.
@@ -444,7 +419,7 @@
* connection is already gone.
*/
if (ia->ri_id->qp) {
- ib_drain_qp(ia->ri_id->qp);
+ rpcrdma_xprt_drain(r_xprt);
rdma_destroy_qp(ia->ri_id);
ia->ri_id->qp = NULL;
}
@@ -456,12 +431,11 @@
/* The ULP is responsible for ensuring all DMA
* mappings and MRs are gone.
*/
- list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
- rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
+ rpcrdma_reps_destroy(buf);
list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
- rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
- rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
- rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
+ rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
+ rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
+ rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
}
rpcrdma_mrs_destroy(buf);
ib_dealloc_pd(ia->ri_pd);
@@ -487,7 +461,6 @@
rdma_destroy_id(ia->ri_id);
}
ia->ri_id = NULL;
- ia->ri_device = NULL;
/* If the pd is still busy, xprtrdma missed freeing a resource */
if (ia->ri_pd && !IS_ERR(ia->ri_pd))
@@ -495,19 +468,26 @@
ia->ri_pd = NULL;
}
-/*
- * Create unconnected endpoint.
+/**
+ * rpcrdma_ep_create - Create unconnected endpoint
+ * @r_xprt: transport to instantiate
+ *
+ * Returns zero on success, or a negative errno.
*/
-int
-rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
- struct rpcrdma_create_data_internal *cdata)
+int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
{
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
struct ib_cq *sendcq, *recvcq;
unsigned int max_sge;
int rc;
- max_sge = min_t(unsigned int, ia->ri_device->attrs.max_send_sge,
+ ep->rep_max_requests = xprt_rdma_slot_table_entries;
+ ep->rep_inline_send = xprt_rdma_max_inline_write;
+ ep->rep_inline_recv = xprt_rdma_max_inline_read;
+
+ max_sge = min_t(unsigned int, ia->ri_id->device->attrs.max_send_sge,
RPCRDMA_MAX_SEND_SGES);
if (max_sge < RPCRDMA_MIN_SEND_SGES) {
pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
@@ -515,11 +495,11 @@
}
ia->ri_max_send_sges = max_sge;
- rc = ia->ri_ops->ro_open(ia, ep, cdata);
+ rc = frwr_open(ia, ep);
if (rc)
return rc;
- ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
+ ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
ep->rep_attr.qp_context = ep;
ep->rep_attr.srq = NULL;
ep->rep_attr.cap.max_send_sge = max_sge;
@@ -537,30 +517,24 @@
ep->rep_attr.cap.max_send_sge,
ep->rep_attr.cap.max_recv_sge);
- /* set trigger for requesting send completion */
- ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
- cdata->max_requests >> 2);
+ ep->rep_send_batch = ep->rep_max_requests >> 3;
ep->rep_send_count = ep->rep_send_batch;
init_waitqueue_head(&ep->rep_connect_wait);
- INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
+ ep->rep_receive_count = 0;
- sendcq = ib_alloc_cq(ia->ri_device, NULL,
- ep->rep_attr.cap.max_send_wr + 1,
- 1, IB_POLL_WORKQUEUE);
+ sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
+ ep->rep_attr.cap.max_send_wr + 1,
+ IB_POLL_WORKQUEUE);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
- dprintk("RPC: %s: failed to create send CQ: %i\n",
- __func__, rc);
goto out1;
}
- recvcq = ib_alloc_cq(ia->ri_device, NULL,
- ep->rep_attr.cap.max_recv_wr + 1,
- 0, IB_POLL_WORKQUEUE);
+ recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
+ ep->rep_attr.cap.max_recv_wr + 1,
+ IB_POLL_WORKQUEUE);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
- dprintk("RPC: %s: failed to create recv CQ: %i\n",
- __func__, rc);
goto out2;
}
@@ -573,16 +547,16 @@
/* Prepare RDMA-CM private message */
pmsg->cp_magic = rpcrdma_cmp_magic;
pmsg->cp_version = RPCRDMA_CMP_VERSION;
- pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
- pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
- pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
+ pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
+ pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->rep_inline_send);
+ pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->rep_inline_recv);
ep->rep_remote_cma.private_data = pmsg;
ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
/* Client offers RDMA Read but does not initiate */
ep->rep_remote_cma.initiator_depth = 0;
ep->rep_remote_cma.responder_resources =
- min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);
+ min_t(int, U8_MAX, ia->ri_id->device->attrs.max_qp_rd_atom);
/* Limit transport retries so client can detect server
* GID changes quickly. RPC layer handles re-establishing
@@ -605,17 +579,15 @@
return rc;
}
-/*
- * rpcrdma_ep_destroy
+/**
+ * rpcrdma_ep_destroy - Disconnect and destroy endpoint.
+ * @r_xprt: transport instance to shut down
*
- * Disconnect and destroy endpoint. After this, the only
- * valid operations on the ep are to free it (if dynamically
- * allocated) or re-create it.
*/
-void
-rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
{
- cancel_delayed_work_sync(&ep->rep_connect_worker);
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
if (ia->ri_id && ia->ri_id->qp) {
rpcrdma_ep_disconnect(ep, ia);
@@ -633,10 +605,10 @@
* Unlike a normal reconnection, a fresh PD and a new set
* of MRs and buffers is needed.
*/
-static int
-rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
- struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
+ struct ib_qp_init_attr *qp_init_attr)
{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
int rc, err;
trace_xprtrdma_reinsert(r_xprt);
@@ -646,14 +618,14 @@
goto out1;
rc = -ENOMEM;
- err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
+ err = rpcrdma_ep_create(r_xprt);
if (err) {
pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
goto out2;
}
rc = -ENETUNREACH;
- err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+ err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
if (err) {
pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
goto out3;
@@ -663,23 +635,23 @@
return 0;
out3:
- rpcrdma_ep_destroy(ep, ia);
+ rpcrdma_ep_destroy(r_xprt);
out2:
rpcrdma_ia_close(ia);
out1:
return rc;
}
-static int
-rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
- struct rpcrdma_ia *ia)
+static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
+ struct ib_qp_init_attr *qp_init_attr)
{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rdma_cm_id *id, *old;
int err, rc;
trace_xprtrdma_reconnect(r_xprt);
- rpcrdma_ep_disconnect(ep, ia);
+ rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
rc = -EHOSTUNREACH;
id = rpcrdma_create_id(r_xprt, ia);
@@ -696,17 +668,14 @@
*/
old = id;
rc = -ENETUNREACH;
- if (ia->ri_device != id->device) {
+ if (ia->ri_id->device != id->device) {
pr_err("rpcrdma: can't reconnect on different device!\n");
goto out_destroy;
}
- err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
- if (err) {
- dprintk("RPC: %s: rdma_create_qp returned %d\n",
- __func__, err);
+ err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
+ if (err)
goto out_destroy;
- }
/* Atomically replace the transport's ID and QP. */
rc = 0;
@@ -728,41 +697,43 @@
{
struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
rx_ia);
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ struct ib_qp_init_attr qp_init_attr;
int rc;
retry:
+ memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
switch (ep->rep_connected) {
case 0:
dprintk("RPC: %s: connecting...\n", __func__);
- rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+ rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
if (rc) {
- dprintk("RPC: %s: rdma_create_qp failed %i\n",
- __func__, rc);
rc = -ENETUNREACH;
goto out_noupdate;
}
break;
case -ENODEV:
- rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
+ rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
if (rc)
goto out_noupdate;
break;
default:
- rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
+ rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
if (rc)
goto out;
}
ep->rep_connected = 0;
+ xprt_clear_connected(xprt);
+
rpcrdma_post_recvs(r_xprt, true);
rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
- if (rc) {
- dprintk("RPC: %s: rdma_connect() failed with %i\n",
- __func__, rc);
+ if (rc)
goto out;
- }
+ if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
+ xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
if (ep->rep_connected <= 0) {
if (ep->rep_connected == -EAGAIN)
@@ -781,8 +752,10 @@
return rc;
}
-/*
- * rpcrdma_ep_disconnect
+/**
+ * rpcrdma_ep_disconnect - Disconnect underlying transport
+ * @ep: endpoint to disconnect
+ * @ia: associated interface adapter
*
* This is separate from destroy to facilitate the ability
* to reconnect without recreating the endpoint.
@@ -793,19 +766,20 @@
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
+ struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
+ rx_ep);
int rc;
+ /* returns without wait if ID is not connected */
rc = rdma_disconnect(ia->ri_id);
if (!rc)
- /* returns without wait if not connected */
wait_event_interruptible(ep->rep_connect_wait,
ep->rep_connected != 1);
else
ep->rep_connected = rc;
- trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
- rx_ep), rc);
+ trace_xprtrdma_disconnect(r_xprt, rc);
- ib_drain_qp(ia->ri_id->qp);
+ rpcrdma_xprt_drain(r_xprt);
}
/* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -822,8 +796,8 @@
*/
/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
- * queue activity, and ib_drain_qp has flushed all remaining Send
- * requests.
+ * queue activity, and rpcrdma_xprt_drain has flushed all remaining
+ * Send requests.
*/
static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
{
@@ -838,8 +812,7 @@
{
struct rpcrdma_sendctx *sc;
- sc = kzalloc(sizeof(*sc) +
- ia->ri_max_send_sges * sizeof(struct ib_sge),
+ sc = kzalloc(struct_size(sc, sc_sges, ia->ri_max_send_sges),
GFP_KERNEL);
if (!sc)
return NULL;
@@ -872,18 +845,13 @@
for (i = 0; i <= buf->rb_sc_last; i++) {
sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
if (!sc)
- goto out_destroy;
+ return -ENOMEM;
sc->sc_xprt = r_xprt;
buf->rb_sc_ctxs[i] = sc;
}
- buf->rb_flags = 0;
return 0;
-
-out_destroy:
- rpcrdma_sendctxs_destroy(buf);
- return -ENOMEM;
}
/* The sendctx queue is not guaranteed to have a size that is a
@@ -898,20 +866,20 @@
/**
* rpcrdma_sendctx_get_locked - Acquire a send context
- * @buf: transport buffers from which to acquire an unused context
+ * @r_xprt: controlling transport instance
*
* Returns pointer to a free send completion context; or NULL if
* the queue is empty.
*
* Usage: Called to acquire an SGE array before preparing a Send WR.
*
- * The caller serializes calls to this function (per rpcrdma_buffer),
- * and provides an effective memory barrier that flushes the new value
+ * The caller serializes calls to this function (per transport), and
+ * provides an effective memory barrier that flushes the new value
* of rb_sc_head.
*/
-struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_xprt *r_xprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_sendctx *sc;
unsigned long next_head;
@@ -935,8 +903,7 @@
* completions recently. This is a sign the Send Queue is
* backing up. Cause the caller to pause and try again.
*/
- set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
- r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
+ xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
r_xprt->rx_stats.empty_sendctx_q++;
return NULL;
}
@@ -948,7 +915,7 @@
* Usage: Called from Send completion to return a sendctxt
* to the queue.
*
- * The caller serializes calls to this function (per rpcrdma_buffer).
+ * The caller serializes calls to this function (per transport).
*/
static void
rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
@@ -956,7 +923,7 @@
struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
unsigned long next_tail;
- /* Unmap SGEs of previously completed by unsignaled
+ /* Unmap SGEs of previously completed but unsignaled
* Sends by walking up the queue until @sc is found.
*/
next_tail = buf->rb_sc_tail;
@@ -964,50 +931,14 @@
next_tail = rpcrdma_sendctx_next(buf, next_tail);
/* ORDER: item must be accessed _before_ tail is updated */
- rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
+ rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
} while (buf->rb_sc_ctxs[next_tail] != sc);
/* Paired with READ_ONCE */
smp_store_release(&buf->rb_sc_tail, next_tail);
- if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
- smp_mb__after_atomic();
- xprt_write_space(&sc->sc_xprt->rx_xprt);
- }
-}
-
-static void
-rpcrdma_mr_recovery_worker(struct work_struct *work)
-{
- struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
- rb_recovery_worker.work);
- struct rpcrdma_mr *mr;
-
- spin_lock(&buf->rb_recovery_lock);
- while (!list_empty(&buf->rb_stale_mrs)) {
- mr = rpcrdma_mr_pop(&buf->rb_stale_mrs);
- spin_unlock(&buf->rb_recovery_lock);
-
- trace_xprtrdma_recover_mr(mr);
- mr->mr_xprt->rx_ia.ri_ops->ro_recover_mr(mr);
-
- spin_lock(&buf->rb_recovery_lock);
- }
- spin_unlock(&buf->rb_recovery_lock);
-}
-
-void
-rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr)
-{
- struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-
- spin_lock(&buf->rb_recovery_lock);
- rpcrdma_mr_push(mr, &buf->rb_stale_mrs);
- spin_unlock(&buf->rb_recovery_lock);
-
- schedule_delayed_work(&buf->rb_recovery_worker, 0);
+ xprt_write_space(&sc->sc_xprt->rx_xprt);
}
static void
@@ -1016,18 +947,16 @@
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
unsigned int count;
- LIST_HEAD(free);
- LIST_HEAD(all);
- for (count = 0; count < 3; count++) {
+ for (count = 0; count < ia->ri_max_segs; count++) {
struct rpcrdma_mr *mr;
int rc;
- mr = kzalloc(sizeof(*mr), GFP_KERNEL);
+ mr = kzalloc(sizeof(*mr), GFP_NOFS);
if (!mr)
break;
- rc = ia->ri_ops->ro_init_mr(ia, mr);
+ rc = frwr_init_mr(ia, mr);
if (rc) {
kfree(mr);
break;
@@ -1035,143 +964,185 @@
mr->mr_xprt = r_xprt;
- list_add(&mr->mr_list, &free);
- list_add(&mr->mr_all, &all);
+ spin_lock(&buf->rb_lock);
+ list_add(&mr->mr_list, &buf->rb_mrs);
+ list_add(&mr->mr_all, &buf->rb_all_mrs);
+ spin_unlock(&buf->rb_lock);
}
- spin_lock(&buf->rb_mrlock);
- list_splice(&free, &buf->rb_mrs);
- list_splice(&all, &buf->rb_all);
r_xprt->rx_stats.mrs_allocated += count;
- spin_unlock(&buf->rb_mrlock);
trace_xprtrdma_createmrs(r_xprt, count);
-
- xprt_write_space(&r_xprt->rx_xprt);
}
static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
- rb_refresh_worker.work);
+ rb_refresh_worker);
struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
rx_buf);
rpcrdma_mrs_create(r_xprt);
+ xprt_write_space(&r_xprt->rx_xprt);
}
-struct rpcrdma_req *
-rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
+/**
+ * rpcrdma_req_create - Allocate an rpcrdma_req object
+ * @r_xprt: controlling r_xprt
+ * @size: initial size, in bytes, of send and receive buffers
+ * @flags: GFP flags passed to memory allocators
+ *
+ * Returns an allocated and fully initialized rpcrdma_req or NULL.
+ */
+struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
+ gfp_t flags)
{
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
+ size_t maxhdrsize;
- req = kzalloc(sizeof(*req), GFP_KERNEL);
+ req = kzalloc(sizeof(*req), flags);
if (req == NULL)
- return ERR_PTR(-ENOMEM);
+ goto out1;
- rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
- DMA_TO_DEVICE, GFP_KERNEL);
- if (IS_ERR(rb)) {
- kfree(req);
- return ERR_PTR(-ENOMEM);
- }
+ /* Compute maximum header buffer size in bytes */
+ maxhdrsize = rpcrdma_fixed_maxsz + 3 +
+ r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz;
+ maxhdrsize *= sizeof(__be32);
+ rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
+ DMA_TO_DEVICE, flags);
+ if (!rb)
+ goto out2;
req->rl_rdmabuf = rb;
- xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
- req->rl_buffer = buffer;
- INIT_LIST_HEAD(&req->rl_registered);
+ xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
- spin_lock(&buffer->rb_reqslock);
+ req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
+ if (!req->rl_sendbuf)
+ goto out3;
+
+ req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
+ if (!req->rl_recvbuf)
+ goto out4;
+
+ INIT_LIST_HEAD(&req->rl_free_mrs);
+ INIT_LIST_HEAD(&req->rl_registered);
+ spin_lock(&buffer->rb_lock);
list_add(&req->rl_all, &buffer->rb_allreqs);
- spin_unlock(&buffer->rb_reqslock);
+ spin_unlock(&buffer->rb_lock);
return req;
+
+out4:
+ kfree(req->rl_sendbuf);
+out3:
+ kfree(req->rl_rdmabuf);
+out2:
+ kfree(req);
+out1:
+ return NULL;
}
-static int
-rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
+static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
+ bool temp)
{
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_rep *rep;
- int rc;
- rc = -ENOMEM;
rep = kzalloc(sizeof(*rep), GFP_KERNEL);
if (rep == NULL)
goto out;
- rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
+ rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep.rep_inline_recv,
DMA_FROM_DEVICE, GFP_KERNEL);
- if (IS_ERR(rep->rr_rdmabuf)) {
- rc = PTR_ERR(rep->rr_rdmabuf);
+ if (!rep->rr_rdmabuf)
goto out_free;
- }
- xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
- rdmab_length(rep->rr_rdmabuf));
+ xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
+ rdmab_length(rep->rr_rdmabuf));
rep->rr_cqe.done = rpcrdma_wc_receive;
rep->rr_rxprt = r_xprt;
- INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
rep->rr_recv_wr.next = NULL;
rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
rep->rr_recv_wr.num_sge = 1;
rep->rr_temp = temp;
-
- spin_lock(&buf->rb_lock);
- list_add(&rep->rr_list, &buf->rb_recv_bufs);
- spin_unlock(&buf->rb_lock);
- return 0;
+ return rep;
out_free:
kfree(rep);
out:
- dprintk("RPC: %s: reply buffer %d alloc failed\n",
- __func__, rc);
- return rc;
+ return NULL;
}
-int
-rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+{
+ rpcrdma_regbuf_free(rep->rr_rdmabuf);
+ kfree(rep);
+}
+
+static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
+{
+ struct llist_node *node;
+
+ /* Calls to llist_del_first are required to be serialized */
+ node = llist_del_first(&buf->rb_free_reps);
+ if (!node)
+ return NULL;
+ return llist_entry(node, struct rpcrdma_rep, rr_node);
+}
+
+static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
+ struct rpcrdma_rep *rep)
+{
+ if (!rep->rr_temp)
+ llist_add(&rep->rr_node, &buf->rb_free_reps);
+ else
+ rpcrdma_rep_destroy(rep);
+}
+
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_rep *rep;
+
+ while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
+ rpcrdma_rep_destroy(rep);
+}
+
+/**
+ * rpcrdma_buffer_create - Create initial set of req/rep objects
+ * @r_xprt: transport instance to (re)initialize
+ *
+ * Returns zero on success, otherwise a negative errno.
+ */
+int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
int i, rc;
- buf->rb_max_requests = r_xprt->rx_data.max_requests;
+ buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
buf->rb_bc_srv_max_requests = 0;
- spin_lock_init(&buf->rb_mrlock);
spin_lock_init(&buf->rb_lock);
- spin_lock_init(&buf->rb_recovery_lock);
INIT_LIST_HEAD(&buf->rb_mrs);
- INIT_LIST_HEAD(&buf->rb_all);
- INIT_LIST_HEAD(&buf->rb_stale_mrs);
- INIT_DELAYED_WORK(&buf->rb_refresh_worker,
- rpcrdma_mr_refresh_worker);
- INIT_DELAYED_WORK(&buf->rb_recovery_worker,
- rpcrdma_mr_recovery_worker);
+ INIT_LIST_HEAD(&buf->rb_all_mrs);
+ INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
rpcrdma_mrs_create(r_xprt);
INIT_LIST_HEAD(&buf->rb_send_bufs);
INIT_LIST_HEAD(&buf->rb_allreqs);
- spin_lock_init(&buf->rb_reqslock);
+
+ rc = -ENOMEM;
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
- req = rpcrdma_create_req(r_xprt);
- if (IS_ERR(req)) {
- dprintk("RPC: %s: request buffer %d alloc"
- " failed\n", __func__, i);
- rc = PTR_ERR(req);
+ req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
+ GFP_KERNEL);
+ if (!req)
goto out;
- }
list_add(&req->rl_list, &buf->rb_send_bufs);
}
buf->rb_credits = 1;
- buf->rb_posted_receives = 0;
- INIT_LIST_HEAD(&buf->rb_recv_bufs);
+ init_llist_head(&buf->rb_free_reps);
rc = rpcrdma_sendctxs_create(r_xprt);
if (rc)
@@ -1183,19 +1154,23 @@
return rc;
}
-static void
-rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
+/**
+ * rpcrdma_req_destroy - Destroy an rpcrdma_req object
+ * @req: unused object to be destroyed
+ *
+ * This function assumes that the caller prevents concurrent device
+ * unload and transport tear-down.
+ */
+void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
- rpcrdma_free_regbuf(rep->rr_rdmabuf);
- kfree(rep);
-}
+ list_del(&req->rl_all);
-void
-rpcrdma_destroy_req(struct rpcrdma_req *req)
-{
- rpcrdma_free_regbuf(req->rl_recvbuf);
- rpcrdma_free_regbuf(req->rl_sendbuf);
- rpcrdma_free_regbuf(req->rl_rdmabuf);
+ while (!list_empty(&req->rl_free_mrs))
+ rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
+
+ rpcrdma_regbuf_free(req->rl_recvbuf);
+ rpcrdma_regbuf_free(req->rl_sendbuf);
+ rpcrdma_regbuf_free(req->rl_rdmabuf);
kfree(req);
}
@@ -1204,62 +1179,49 @@
{
struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
rx_buf);
- struct rpcrdma_ia *ia = rdmab_to_ia(buf);
struct rpcrdma_mr *mr;
unsigned int count;
count = 0;
- spin_lock(&buf->rb_mrlock);
- while (!list_empty(&buf->rb_all)) {
- mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
+ spin_lock(&buf->rb_lock);
+ while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
+ struct rpcrdma_mr,
+ mr_all)) != NULL) {
list_del(&mr->mr_all);
+ spin_unlock(&buf->rb_lock);
- spin_unlock(&buf->rb_mrlock);
-
- /* Ensure MW is not on any rl_registered list */
- if (!list_empty(&mr->mr_list))
- list_del(&mr->mr_list);
-
- ia->ri_ops->ro_release_mr(mr);
+ frwr_release_mr(mr);
count++;
- spin_lock(&buf->rb_mrlock);
+ spin_lock(&buf->rb_lock);
}
- spin_unlock(&buf->rb_mrlock);
+ spin_unlock(&buf->rb_lock);
r_xprt->rx_stats.mrs_allocated = 0;
-
- dprintk("RPC: %s: released %u MRs\n", __func__, count);
}
+/**
+ * rpcrdma_buffer_destroy - Release all hw resources
+ * @buf: root control block for resources
+ *
+ * ORDERING: relies on a prior rpcrdma_xprt_drain :
+ * - No more Send or Receive completions can occur
+ * - All MRs, reps, and reqs are returned to their free lists
+ */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
- cancel_delayed_work_sync(&buf->rb_recovery_worker);
- cancel_delayed_work_sync(&buf->rb_refresh_worker);
+ cancel_work_sync(&buf->rb_refresh_worker);
rpcrdma_sendctxs_destroy(buf);
+ rpcrdma_reps_destroy(buf);
- while (!list_empty(&buf->rb_recv_bufs)) {
- struct rpcrdma_rep *rep;
-
- rep = list_first_entry(&buf->rb_recv_bufs,
- struct rpcrdma_rep, rr_list);
- list_del(&rep->rr_list);
- rpcrdma_destroy_rep(rep);
- }
-
- spin_lock(&buf->rb_reqslock);
- while (!list_empty(&buf->rb_allreqs)) {
+ while (!list_empty(&buf->rb_send_bufs)) {
struct rpcrdma_req *req;
- req = list_first_entry(&buf->rb_allreqs,
- struct rpcrdma_req, rl_all);
- list_del(&req->rl_all);
-
- spin_unlock(&buf->rb_reqslock);
- rpcrdma_destroy_req(req);
- spin_lock(&buf->rb_reqslock);
+ req = list_first_entry(&buf->rb_send_bufs,
+ struct rpcrdma_req, rl_list);
+ list_del(&req->rl_list);
+ rpcrdma_req_destroy(req);
}
- spin_unlock(&buf->rb_reqslock);
rpcrdma_mrs_destroy(buf);
}
@@ -1275,61 +1237,42 @@
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_mr *mr = NULL;
+ struct rpcrdma_mr *mr;
- spin_lock(&buf->rb_mrlock);
- if (!list_empty(&buf->rb_mrs))
- mr = rpcrdma_mr_pop(&buf->rb_mrs);
- spin_unlock(&buf->rb_mrlock);
-
- if (!mr)
- goto out_nomrs;
+ spin_lock(&buf->rb_lock);
+ mr = rpcrdma_mr_pop(&buf->rb_mrs);
+ spin_unlock(&buf->rb_lock);
return mr;
-
-out_nomrs:
- trace_xprtrdma_nomrs(r_xprt);
- if (r_xprt->rx_ep.rep_connected != -ENODEV)
- schedule_delayed_work(&buf->rb_refresh_worker, 0);
-
- /* Allow the reply handler and refresh worker to run */
- cond_resched();
-
- return NULL;
-}
-
-static void
-__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
-{
- spin_lock(&buf->rb_mrlock);
- rpcrdma_mr_push(mr, &buf->rb_mrs);
- spin_unlock(&buf->rb_mrlock);
}
/**
- * rpcrdma_mr_put - Release an rpcrdma_mr object
- * @mr: object to release
+ * rpcrdma_mr_put - DMA unmap an MR and release it
+ * @mr: MR to release
*
*/
-void
-rpcrdma_mr_put(struct rpcrdma_mr *mr)
-{
- __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
-}
-
-/**
- * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
- * @mr: object to release
- *
- */
-void
-rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
+void rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
- trace_xprtrdma_dma_unmap(mr);
- ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
- mr->mr_sg, mr->mr_nents, mr->mr_dir);
- __rpcrdma_mr_put(&r_xprt->rx_buf, mr);
+ if (mr->mr_dir != DMA_NONE) {
+ trace_xprtrdma_mr_unmap(mr);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
+ mr->mr_sg, mr->mr_nents, mr->mr_dir);
+ mr->mr_dir = DMA_NONE;
+ }
+
+ rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
+}
+
+static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
+{
+ struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+ mr->mr_req = NULL;
+ spin_lock(&buf->rb_lock);
+ rpcrdma_mr_push(mr, &buf->rb_mrs);
+ spin_unlock(&buf->rb_lock);
}
/**
@@ -1354,105 +1297,112 @@
/**
* rpcrdma_buffer_put - Put request/reply buffers back into pool
+ * @buffers: buffer pool
* @req: object to return
*
*/
-void
-rpcrdma_buffer_put(struct rpcrdma_req *req)
+void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
- struct rpcrdma_buffer *buffers = req->rl_buffer;
- struct rpcrdma_rep *rep = req->rl_reply;
-
+ if (req->rl_reply)
+ rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL;
spin_lock(&buffers->rb_lock);
list_add(&req->rl_list, &buffers->rb_send_bufs);
- if (rep) {
- if (!rep->rr_temp) {
- list_add(&rep->rr_list, &buffers->rb_recv_bufs);
- rep = NULL;
- }
- }
spin_unlock(&buffers->rb_lock);
- if (rep)
- rpcrdma_destroy_rep(rep);
-}
-
-/*
- * Put reply buffers back into pool when not attached to
- * request. This happens in error conditions.
- */
-void
-rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
-{
- struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-
- if (!rep->rr_temp) {
- spin_lock(&buffers->rb_lock);
- list_add(&rep->rr_list, &buffers->rb_recv_bufs);
- spin_unlock(&buffers->rb_lock);
- } else {
- rpcrdma_destroy_rep(rep);
- }
}
/**
- * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
- * @size: size of buffer to be allocated, in bytes
- * @direction: direction of data movement
- * @flags: GFP flags
+ * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
+ * @rep: rep to release
*
- * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
- * can be persistently DMA-mapped for I/O.
+ * Used after error conditions.
+ */
+void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+{
+ rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
+}
+
+/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
*
* xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
* receiving the payload of RDMA RECV operations. During Long Calls
- * or Replies they may be registered externally via ro_map.
+ * or Replies they may be registered externally via frwr_map.
*/
-struct rpcrdma_regbuf *
-rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
+static struct rpcrdma_regbuf *
+rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
gfp_t flags)
{
struct rpcrdma_regbuf *rb;
- rb = kmalloc(sizeof(*rb) + size, flags);
- if (rb == NULL)
- return ERR_PTR(-ENOMEM);
+ rb = kmalloc(sizeof(*rb), flags);
+ if (!rb)
+ return NULL;
+ rb->rg_data = kmalloc(size, flags);
+ if (!rb->rg_data) {
+ kfree(rb);
+ return NULL;
+ }
rb->rg_device = NULL;
rb->rg_direction = direction;
rb->rg_iov.length = size;
-
return rb;
}
/**
- * __rpcrdma_map_regbuf - DMA-map a regbuf
- * @ia: controlling rpcrdma_ia
- * @rb: regbuf to be mapped
+ * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
+ * @rb: regbuf to reallocate
+ * @size: size of buffer to be allocated, in bytes
+ * @flags: GFP flags
+ *
+ * Returns true if reallocation was successful. If false is
+ * returned, @rb is left untouched.
*/
-bool
-__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
{
- struct ib_device *device = ia->ri_device;
+ void *buf;
+
+ buf = kmalloc(size, flags);
+ if (!buf)
+ return false;
+
+ rpcrdma_regbuf_dma_unmap(rb);
+ kfree(rb->rg_data);
+
+ rb->rg_data = buf;
+ rb->rg_iov.length = size;
+ return true;
+}
+
+/**
+ * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
+ * @r_xprt: controlling transport instance
+ * @rb: regbuf to be mapped
+ *
+ * Returns true if the buffer is now DMA mapped to @r_xprt's device
+ */
+bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_regbuf *rb)
+{
+ struct ib_device *device = r_xprt->rx_ia.ri_id->device;
if (rb->rg_direction == DMA_NONE)
return false;
- rb->rg_iov.addr = ib_dma_map_single(device,
- (void *)rb->rg_base,
- rdmab_length(rb),
- rb->rg_direction);
- if (ib_dma_mapping_error(device, rdmab_addr(rb)))
+ rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
+ rdmab_length(rb), rb->rg_direction);
+ if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
+ trace_xprtrdma_dma_maperr(rdmab_addr(rb));
return false;
+ }
rb->rg_device = device;
- rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
+ rb->rg_iov.lkey = r_xprt->rx_ia.ri_pd->local_dma_lkey;
return true;
}
-static void
-rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
+static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
{
if (!rb)
return;
@@ -1460,26 +1410,27 @@
if (!rpcrdma_regbuf_is_mapped(rb))
return;
- ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
- rdmab_length(rb), rb->rg_direction);
+ ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
+ rb->rg_direction);
rb->rg_device = NULL;
}
-/**
- * rpcrdma_free_regbuf - deregister and free registered buffer
- * @rb: regbuf to be deregistered and freed
- */
-void
-rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
+static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
{
- rpcrdma_dma_unmap_regbuf(rb);
+ rpcrdma_regbuf_dma_unmap(rb);
+ if (rb)
+ kfree(rb->rg_data);
kfree(rb);
}
-/*
- * Prepost any receive buffer, then post send.
+/**
+ * rpcrdma_ep_post - Post WRs to a transport's Send Queue
+ * @ia: transport's device information
+ * @ep: transport's RDMA endpoint information
+ * @req: rpcrdma_req containing the Send WR to post
*
- * Receive buffer is donated to hardware, reclaimed upon recv completion.
+ * Returns 0 if the post was successful, otherwise -ENOTCONN
+ * is returned.
*/
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
@@ -1489,8 +1440,7 @@
struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
int rc;
- if (!ep->rep_send_count ||
- test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
+ if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
send_wr->send_flags |= IB_SEND_SIGNALED;
ep->rep_send_count = ep->rep_send_batch;
} else {
@@ -1498,77 +1448,79 @@
--ep->rep_send_count;
}
- rc = ia->ri_ops->ro_send(ia, req);
+ rc = frwr_send(ia, req);
trace_xprtrdma_post_send(req, rc);
if (rc)
return -ENOTCONN;
return 0;
}
-/**
- * rpcrdma_post_recvs - Maybe post some Receive buffers
- * @r_xprt: controlling transport
- * @temp: when true, allocate temp rpcrdma_rep objects
- *
- */
-void
+static void
rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct ib_recv_wr *wr, *bad_wr;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct ib_recv_wr *i, *wr, *bad_wr;
+ struct rpcrdma_rep *rep;
int needed, count, rc;
- needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
- if (buf->rb_posted_receives > needed)
- return;
- needed -= buf->rb_posted_receives;
-
+ rc = 0;
count = 0;
+
+ needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
+ if (likely(ep->rep_receive_count > needed))
+ goto out;
+ needed -= ep->rep_receive_count;
+ if (!temp)
+ needed += RPCRDMA_MAX_RECV_BATCH;
+
+ /* fast path: all needed reps can be found on the free list */
wr = NULL;
while (needed) {
- struct rpcrdma_regbuf *rb;
- struct rpcrdma_rep *rep;
+ rep = rpcrdma_rep_get_locked(buf);
+ if (!rep)
+ rep = rpcrdma_rep_create(r_xprt, temp);
+ if (!rep)
+ break;
- spin_lock(&buf->rb_lock);
- rep = list_first_entry_or_null(&buf->rb_recv_bufs,
- struct rpcrdma_rep, rr_list);
- if (likely(rep))
- list_del(&rep->rr_list);
- spin_unlock(&buf->rb_lock);
- if (!rep) {
- if (rpcrdma_create_rep(r_xprt, temp))
- break;
- continue;
- }
-
- rb = rep->rr_rdmabuf;
- if (!rpcrdma_regbuf_is_mapped(rb)) {
- if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) {
- rpcrdma_recv_buffer_put(rep);
- break;
- }
- }
-
- trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
rep->rr_recv_wr.next = wr;
wr = &rep->rr_recv_wr;
- ++count;
--needed;
}
- if (!count)
- return;
+ if (!wr)
+ goto out;
+
+ for (i = wr; i; i = i->next) {
+ rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
+
+ if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
+ goto release_wrs;
+
+ trace_xprtrdma_post_recv(rep);
+ ++count;
+ }
rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
(const struct ib_recv_wr **)&bad_wr);
+out:
+ trace_xprtrdma_post_recvs(r_xprt, count, rc);
if (rc) {
- for (wr = bad_wr; wr; wr = wr->next) {
+ for (wr = bad_wr; wr;) {
struct rpcrdma_rep *rep;
rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
+ wr = wr->next;
rpcrdma_recv_buffer_put(rep);
--count;
}
}
- buf->rb_posted_receives += count;
- trace_xprtrdma_post_recvs(r_xprt, count, rc);
+ ep->rep_receive_count += count;
+ return;
+
+release_wrs:
+ for (i = wr; i;) {
+ rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
+ i = i->next;
+ rpcrdma_recv_buffer_put(rep);
+ }
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 2ca14f7..65e6b0e 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -44,8 +44,10 @@
#include <linux/wait.h> /* wait_queue_head_t, etc */
#include <linux/spinlock.h> /* spinlock_t, etc */
-#include <linux/atomic.h> /* atomic_t, etc */
+#include <linux/atomic.h> /* atomic_t, etc */
+#include <linux/kref.h> /* struct kref */
#include <linux/workqueue.h> /* struct work_struct */
+#include <linux/llist.h>
#include <rdma/rdma_cm.h> /* RDMA connection api */
#include <rdma/ib_verbs.h> /* RDMA verbs api */
@@ -66,23 +68,17 @@
* Interface Adapter -- one per transport instance
*/
struct rpcrdma_ia {
- const struct rpcrdma_memreg_ops *ri_ops;
- struct ib_device *ri_device;
struct rdma_cm_id *ri_id;
struct ib_pd *ri_pd;
- struct completion ri_done;
- struct completion ri_remove_done;
int ri_async_rc;
unsigned int ri_max_segs;
unsigned int ri_max_frwr_depth;
- unsigned int ri_max_inline_write;
- unsigned int ri_max_inline_read;
unsigned int ri_max_send_sges;
bool ri_implicit_roundup;
enum ib_mr_type ri_mrtype;
unsigned long ri_flags;
- struct ib_qp_attr ri_qp_attr;
- struct ib_qp_init_attr ri_qp_init_attr;
+ struct completion ri_done;
+ struct completion ri_remove_done;
};
enum {
@@ -96,84 +92,86 @@
struct rpcrdma_ep {
unsigned int rep_send_count;
unsigned int rep_send_batch;
+ unsigned int rep_max_inline_send;
+ unsigned int rep_max_inline_recv;
int rep_connected;
struct ib_qp_init_attr rep_attr;
wait_queue_head_t rep_connect_wait;
struct rpcrdma_connect_private rep_cm_private;
struct rdma_conn_param rep_remote_cma;
- struct delayed_work rep_connect_worker;
+ unsigned int rep_max_requests; /* set by /proc */
+ unsigned int rep_inline_send; /* negotiated */
+ unsigned int rep_inline_recv; /* negotiated */
+ int rep_receive_count;
};
/* Pre-allocate extra Work Requests for handling backward receives
* and sends. This is a fixed value because the Work Queues are
- * allocated when the forward channel is set up.
+ * allocated when the forward channel is set up, long before the
+ * backchannel is provisioned. This value is two times
+ * NFS4_DEF_CB_SLOT_TABLE_SIZE.
*/
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-#define RPCRDMA_BACKWARD_WRS (8)
+#define RPCRDMA_BACKWARD_WRS (32)
#else
-#define RPCRDMA_BACKWARD_WRS (0)
+#define RPCRDMA_BACKWARD_WRS (0)
#endif
/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
- *
- * The below structure appears at the front of a large region of kmalloc'd
- * memory, which always starts on a good alignment boundary.
*/
struct rpcrdma_regbuf {
struct ib_sge rg_iov;
struct ib_device *rg_device;
enum dma_data_direction rg_direction;
- __be32 rg_base[0] __attribute__ ((aligned(256)));
+ void *rg_data;
};
-static inline u64
-rdmab_addr(struct rpcrdma_regbuf *rb)
+static inline u64 rdmab_addr(struct rpcrdma_regbuf *rb)
{
return rb->rg_iov.addr;
}
-static inline u32
-rdmab_length(struct rpcrdma_regbuf *rb)
+static inline u32 rdmab_length(struct rpcrdma_regbuf *rb)
{
return rb->rg_iov.length;
}
-static inline u32
-rdmab_lkey(struct rpcrdma_regbuf *rb)
+static inline u32 rdmab_lkey(struct rpcrdma_regbuf *rb)
{
return rb->rg_iov.lkey;
}
-static inline struct ib_device *
-rdmab_device(struct rpcrdma_regbuf *rb)
+static inline struct ib_device *rdmab_device(struct rpcrdma_regbuf *rb)
{
return rb->rg_device;
}
+static inline void *rdmab_data(const struct rpcrdma_regbuf *rb)
+{
+ return rb->rg_data;
+}
+
#define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN)
/* To ensure a transport can always make forward progress,
* the number of RDMA segments allowed in header chunk lists
- * is capped at 8. This prevents less-capable devices and
- * memory registrations from overrunning the Send buffer
- * while building chunk lists.
+ * is capped at 16. This prevents less-capable devices from
+ * overrunning the Send buffer while building chunk lists.
*
* Elements of the Read list take up more room than the
- * Write list or Reply chunk. 8 read segments means the Read
- * list (or Write list or Reply chunk) cannot consume more
- * than
+ * Write list or Reply chunk. 16 read segments means the
+ * chunk lists cannot consume more than
*
- * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes.
+ * ((16 + 2) * read segment size) + 1 XDR words,
*
- * And the fixed part of the header is another 24 bytes.
- *
- * The smallest inline threshold is 1024 bytes, ensuring that
- * at least 750 bytes are available for RPC messages.
+ * or about 400 bytes. The fixed part of the header is
+ * another 24 bytes. Thus when the inline threshold is
+ * 1024 bytes, at least 600 bytes are available for RPC
+ * message bodies.
*/
enum {
- RPCRDMA_MAX_HDR_SEGS = 8,
- RPCRDMA_HDRBUF_SIZE = 256,
+ RPCRDMA_MAX_HDR_SEGS = 16,
};
/*
@@ -200,14 +198,23 @@
bool rr_temp;
struct rpcrdma_regbuf *rr_rdmabuf;
struct rpcrdma_xprt *rr_rxprt;
- struct work_struct rr_work;
+ struct rpc_rqst *rr_rqst;
struct xdr_buf rr_hdrbuf;
struct xdr_stream rr_stream;
- struct rpc_rqst *rr_rqst;
- struct list_head rr_list;
+ struct llist_node rr_node;
struct ib_recv_wr rr_recv_wr;
};
+/* To reduce the rate at which a transport invokes ib_post_recv
+ * (and thus the hardware doorbell rate), xprtrdma posts Receive
+ * WRs in batches.
+ *
+ * Setting this to zero disables Receive post batching.
+ */
+enum {
+ RPCRDMA_MAX_RECV_BATCH = 7,
+};
+
/* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
*/
struct rpcrdma_req;
@@ -215,46 +222,22 @@
struct rpcrdma_sendctx {
struct ib_send_wr sc_wr;
struct ib_cqe sc_cqe;
+ struct ib_device *sc_device;
struct rpcrdma_xprt *sc_xprt;
struct rpcrdma_req *sc_req;
unsigned int sc_unmap_count;
struct ib_sge sc_sges[];
};
-/* Limit the number of SGEs that can be unmapped during one
- * Send completion. This caps the amount of work a single
- * completion can do before returning to the provider.
- *
- * Setting this to zero disables Send completion batching.
- */
-enum {
- RPCRDMA_MAX_SEND_BATCH = 7,
-};
-
/*
* struct rpcrdma_mr - external memory region metadata
*
* An external memory region is any buffer or page that is registered
* on the fly (ie, not pre-registered).
- *
- * Each rpcrdma_buffer has a list of free MWs anchored in rb_mrs. During
- * call_allocate, rpcrdma_buffer_get() assigns one to each segment in
- * an rpcrdma_req. Then rpcrdma_register_external() grabs these to keep
- * track of registration metadata while each RPC is pending.
- * rpcrdma_deregister_external() uses this metadata to unmap and
- * release these resources when an RPC is complete.
*/
-enum rpcrdma_frwr_state {
- FRWR_IS_INVALID, /* ready to be used */
- FRWR_IS_VALID, /* in use */
- FRWR_FLUSHED_FR, /* flushed FASTREG WR */
- FRWR_FLUSHED_LI, /* flushed LOCALINV WR */
-};
-
struct rpcrdma_frwr {
struct ib_mr *fr_mr;
struct ib_cqe fr_cqe;
- enum rpcrdma_frwr_state fr_state;
struct completion fr_linv_done;
union {
struct ib_reg_wr fr_regwr;
@@ -262,24 +245,19 @@
};
};
-struct rpcrdma_fmr {
- struct ib_fmr *fm_mr;
- u64 *fm_physaddrs;
-};
-
+struct rpcrdma_req;
struct rpcrdma_mr {
struct list_head mr_list;
+ struct rpcrdma_req *mr_req;
struct scatterlist *mr_sg;
int mr_nents;
enum dma_data_direction mr_dir;
- union {
- struct rpcrdma_fmr fmr;
- struct rpcrdma_frwr frwr;
- };
+ struct rpcrdma_frwr frwr;
struct rpcrdma_xprt *mr_xprt;
u32 mr_handle;
u32 mr_length;
u64 mr_offset;
+ struct work_struct mr_recycle;
struct list_head mr_all;
};
@@ -337,7 +315,6 @@
struct rpcrdma_req {
struct list_head rl_list;
struct rpc_rqst rl_slot;
- struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;
struct xdr_stream rl_stream;
struct xdr_buf rl_hdrbuf;
@@ -347,18 +324,13 @@
struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */
struct list_head rl_all;
- unsigned long rl_flags;
+ struct kref rl_kref;
- struct list_head rl_registered; /* registered segments */
+ struct list_head rl_free_mrs;
+ struct list_head rl_registered;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};
-/* rl_flags */
-enum {
- RPCRDMA_REQ_F_PENDING = 0,
- RPCRDMA_REQ_F_TX_RESOURCES,
-};
-
static inline struct rpcrdma_req *
rpcr_to_rdmar(const struct rpc_rqst *rqst)
{
@@ -368,7 +340,7 @@
static inline void
rpcrdma_mr_push(struct rpcrdma_mr *mr, struct list_head *list)
{
- list_add_tail(&mr->mr_list, list);
+ list_add(&mr->mr_list, list);
}
static inline struct rpcrdma_mr *
@@ -376,8 +348,9 @@
{
struct rpcrdma_mr *mr;
- mr = list_first_entry(list, struct rpcrdma_mr, mr_list);
- list_del_init(&mr->mr_list);
+ mr = list_first_entry_or_null(list, struct rpcrdma_mr, mr_list);
+ if (mr)
+ list_del_init(&mr->mr_list);
return mr;
}
@@ -388,53 +361,27 @@
* One of these is associated with a transport instance
*/
struct rpcrdma_buffer {
- spinlock_t rb_mrlock; /* protect rb_mrs list */
+ spinlock_t rb_lock;
+ struct list_head rb_send_bufs;
struct list_head rb_mrs;
- struct list_head rb_all;
unsigned long rb_sc_head;
unsigned long rb_sc_tail;
unsigned long rb_sc_last;
struct rpcrdma_sendctx **rb_sc_ctxs;
- spinlock_t rb_lock; /* protect buf lists */
- struct list_head rb_send_bufs;
- struct list_head rb_recv_bufs;
- unsigned long rb_flags;
+ struct list_head rb_allreqs;
+ struct list_head rb_all_mrs;
+
+ struct llist_head rb_free_reps;
+
u32 rb_max_requests;
u32 rb_credits; /* most recent credit grant */
- int rb_posted_receives;
u32 rb_bc_srv_max_requests;
- spinlock_t rb_reqslock; /* protect rb_allreqs */
- struct list_head rb_allreqs;
-
u32 rb_bc_max_requests;
- spinlock_t rb_recovery_lock; /* protect rb_stale_mrs */
- struct list_head rb_stale_mrs;
- struct delayed_work rb_recovery_worker;
- struct delayed_work rb_refresh_worker;
-};
-#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
-
-/* rb_flags */
-enum {
- RPCRDMA_BUF_F_EMPTY_SCQ = 0,
-};
-
-/*
- * Internal structure for transport instance creation. This
- * exists primarily for modularity.
- *
- * This data should be set with mount options
- */
-struct rpcrdma_create_data_internal {
- unsigned int max_requests; /* max requests (slots) in flight */
- unsigned int rsize; /* mount rsize - max read hdr+data */
- unsigned int wsize; /* mount wsize - max write hdr+data */
- unsigned int inline_rsize; /* max non-rdma read data payload */
- unsigned int inline_wsize; /* max non-rdma write data payload */
+ struct work_struct rb_refresh_worker;
};
/*
@@ -452,7 +399,7 @@
unsigned long hardway_register_count;
unsigned long failed_marshal_count;
unsigned long bad_reply_count;
- unsigned long mrs_recovered;
+ unsigned long mrs_recycled;
unsigned long mrs_orphaned;
unsigned long mrs_allocated;
unsigned long empty_sendctx_q;
@@ -467,36 +414,6 @@
};
/*
- * Per-registration mode operations
- */
-struct rpcrdma_xprt;
-struct rpcrdma_memreg_ops {
- struct rpcrdma_mr_seg *
- (*ro_map)(struct rpcrdma_xprt *,
- struct rpcrdma_mr_seg *, int, bool,
- struct rpcrdma_mr **);
- int (*ro_send)(struct rpcrdma_ia *ia,
- struct rpcrdma_req *req);
- void (*ro_reminv)(struct rpcrdma_rep *rep,
- struct list_head *mrs);
- void (*ro_unmap_sync)(struct rpcrdma_xprt *,
- struct list_head *);
- void (*ro_recover_mr)(struct rpcrdma_mr *mr);
- int (*ro_open)(struct rpcrdma_ia *,
- struct rpcrdma_ep *,
- struct rpcrdma_create_data_internal *);
- size_t (*ro_maxpages)(struct rpcrdma_xprt *);
- int (*ro_init_mr)(struct rpcrdma_ia *,
- struct rpcrdma_mr *);
- void (*ro_release_mr)(struct rpcrdma_mr *mr);
- const char *ro_displayname;
- const int ro_send_w_inv_ok;
-};
-
-extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops;
-extern const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops;
-
-/*
* RPCRDMA transport -- encapsulates the structures above for
* integration with RPC.
*
@@ -511,13 +428,12 @@
struct rpcrdma_ia rx_ia;
struct rpcrdma_ep rx_ep;
struct rpcrdma_buffer rx_buf;
- struct rpcrdma_create_data_internal rx_data;
struct delayed_work rx_connect_worker;
+ struct rpc_timeout rx_timeout;
struct rpcrdma_stats rx_stats;
};
#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, rx_xprt)
-#define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)
static inline const char *
rpcrdma_addrstr(const struct rpcrdma_xprt *r_xprt)
@@ -547,65 +463,72 @@
int rpcrdma_ia_open(struct rpcrdma_xprt *xprt);
void rpcrdma_ia_remove(struct rpcrdma_ia *ia);
void rpcrdma_ia_close(struct rpcrdma_ia *);
-bool frwr_is_supported(struct rpcrdma_ia *);
-bool fmr_is_supported(struct rpcrdma_ia *);
-
-extern struct workqueue_struct *rpcrdma_receive_wq;
/*
* Endpoint calls - xprtrdma/verbs.c
*/
-int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
- struct rpcrdma_create_data_internal *);
-void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
+int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt);
+void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt);
int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
-void rpcrdma_conn_func(struct rpcrdma_ep *ep);
void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
struct rpcrdma_req *);
-void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
/*
* Buffer calls - xprtrdma/verbs.c
*/
-struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
-void rpcrdma_destroy_req(struct rpcrdma_req *);
+struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
+ gfp_t flags);
+void rpcrdma_req_destroy(struct rpcrdma_req *req);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
-struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf);
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt);
struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt);
void rpcrdma_mr_put(struct rpcrdma_mr *mr);
-void rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr);
-void rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr);
+
+static inline void
+rpcrdma_mr_recycle(struct rpcrdma_mr *mr)
+{
+ schedule_work(&mr->mr_recycle);
+}
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
-void rpcrdma_buffer_put(struct rpcrdma_req *);
+void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
+ struct rpcrdma_req *req);
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
-struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(size_t, enum dma_data_direction,
- gfp_t);
-bool __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *);
-void rpcrdma_free_regbuf(struct rpcrdma_regbuf *);
+bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
+ gfp_t flags);
+bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_regbuf *rb);
-static inline bool
-rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb)
+/**
+ * rpcrdma_regbuf_is_mapped - check if buffer is DMA mapped
+ *
+ * Returns true if the buffer is now mapped to rb->rg_device.
+ */
+static inline bool rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb)
{
return rb->rg_device != NULL;
}
-static inline bool
-rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+/**
+ * rpcrdma_regbuf_dma_map - DMA-map a regbuf
+ * @r_xprt: controlling transport instance
+ * @rb: regbuf to be mapped
+ *
+ * Returns true if the buffer is currently DMA mapped.
+ */
+static inline bool rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_regbuf *rb)
{
if (likely(rpcrdma_regbuf_is_mapped(rb)))
return true;
- return __rpcrdma_dma_map_regbuf(ia, rb);
+ return __rpcrdma_regbuf_dma_map(r_xprt, rb);
}
-int rpcrdma_alloc_wq(void);
-void rpcrdma_destroy_wq(void);
-
/*
* Wrappers for chunk registration, shared by read/write chunk code.
*/
@@ -616,6 +539,24 @@
return writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
}
+/* Memory registration calls xprtrdma/frwr_ops.c
+ */
+bool frwr_is_supported(struct ib_device *device);
+void frwr_recycle(struct rpcrdma_req *req);
+void frwr_reset(struct rpcrdma_req *req);
+int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep);
+int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
+void frwr_release_mr(struct rpcrdma_mr *mr);
+size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt);
+struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_mr_seg *seg,
+ int nsegs, bool writing, __be32 xid,
+ struct rpcrdma_mr *mr);
+int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
+void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
+
/*
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
*/
@@ -632,14 +573,11 @@
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr,
enum rpcrdma_chunktype rtype);
-void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc);
+void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc);
int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
- struct rpcrdma_req *req);
-void rpcrdma_deferred_completion(struct work_struct *work);
static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
{
@@ -649,10 +587,12 @@
/* RPC/RDMA module init - xprtrdma/transport.c
*/
+extern unsigned int xprt_rdma_slot_table_entries;
extern unsigned int xprt_rdma_max_inline_read;
+extern unsigned int xprt_rdma_max_inline_write;
void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap);
void xprt_rdma_free_addresses(struct rpc_xprt *xprt);
-void rpcrdma_connect_worker(struct work_struct *work);
+void xprt_rdma_close(struct rpc_xprt *xprt);
void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq);
int xprt_rdma_init(void);
void xprt_rdma_cleanup(void);
@@ -661,8 +601,8 @@
*/
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
-int xprt_rdma_bc_up(struct svc_serv *, struct net *);
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *);
+unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *);
int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst);