Update Linux to v5.4.2

Change-Id: Idf6911045d9d382da2cfe01b1edff026404ac8fd
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 60934bd..ba45b07 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -126,6 +126,9 @@
 	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
 }
 
+/*
+ * Consumes @pages if @own_pages is true.
+ */
 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
 			struct page **pages, u64 length, u32 alignment,
 			bool pages_from_pool, bool own_pages)
@@ -138,6 +141,9 @@
 	osd_data->own_pages = own_pages;
 }
 
+/*
+ * Consumes a ref on @pagelist.
+ */
 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
 			struct ceph_pagelist *pagelist)
 {
@@ -165,14 +171,6 @@
 	osd_data->num_bvecs = num_bvecs;
 }
 
-#define osd_req_op_data(oreq, whch, typ, fld)				\
-({									\
-	struct ceph_osd_request *__oreq = (oreq);			\
-	unsigned int __whch = (whch);					\
-	BUG_ON(__whch >= __oreq->r_num_ops);				\
-	&__oreq->r_ops[__whch].typ.fld;					\
-})
-
 static struct ceph_osd_data *
 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
 {
@@ -362,6 +360,8 @@
 		num_pages = calc_pages_for((u64)osd_data->alignment,
 						(u64)osd_data->length);
 		ceph_release_page_vector(osd_data->pages, num_pages);
+	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
+		ceph_pagelist_release(osd_data->pagelist);
 	}
 	ceph_osd_data_init(osd_data);
 }
@@ -402,6 +402,9 @@
 	case CEPH_OSD_OP_LIST_WATCHERS:
 		ceph_osd_data_release(&op->list_watchers.response_data);
 		break;
+	case CEPH_OSD_OP_COPY_FROM:
+		ceph_osd_data_release(&op->copy_from.osd_data);
+		break;
 	default:
 		break;
 	}
@@ -467,7 +470,7 @@
 {
 	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
 	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
-	WARN_ON(!list_empty(&req->r_unsafe_item));
+	WARN_ON(!list_empty(&req->r_private_item));
 	WARN_ON(req->r_osd);
 }
 
@@ -527,7 +530,7 @@
 	init_completion(&req->r_completion);
 	RB_CLEAR_NODE(&req->r_node);
 	RB_CLEAR_NODE(&req->r_mc_node);
-	INIT_LIST_HEAD(&req->r_unsafe_item);
+	INIT_LIST_HEAD(&req->r_private_item);
 
 	target_init(&req->r_t);
 }
@@ -606,12 +609,15 @@
 	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
 }
 
-int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
+static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
+				      int num_request_data_items,
+				      int num_reply_data_items)
 {
 	struct ceph_osd_client *osdc = req->r_osdc;
 	struct ceph_msg *msg;
 	int msg_size;
 
+	WARN_ON(req->r_request || req->r_reply);
 	WARN_ON(ceph_oid_empty(&req->r_base_oid));
 	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
 
@@ -633,9 +639,11 @@
 	msg_size += 4 + 8; /* retry_attempt, features */
 
 	if (req->r_mempool)
-		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
+		msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
+				       num_request_data_items);
 	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
+		msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
+				    num_request_data_items, gfp, true);
 	if (!msg)
 		return -ENOMEM;
 
@@ -648,9 +656,11 @@
 	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
 
 	if (req->r_mempool)
-		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
+		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
+				       num_reply_data_items);
 	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
+		msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
+				    num_reply_data_items, gfp, true);
 	if (!msg)
 		return -ENOMEM;
 
@@ -658,7 +668,6 @@
 
 	return 0;
 }
-EXPORT_SYMBOL(ceph_osdc_alloc_messages);
 
 static bool osd_req_opcode_valid(u16 opcode)
 {
@@ -671,6 +680,65 @@
 	}
 }
 
+static void get_num_data_items(struct ceph_osd_request *req,
+			       int *num_request_data_items,
+			       int *num_reply_data_items)
+{
+	struct ceph_osd_req_op *op;
+
+	*num_request_data_items = 0;
+	*num_reply_data_items = 0;
+
+	for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
+		switch (op->op) {
+		/* request */
+		case CEPH_OSD_OP_WRITE:
+		case CEPH_OSD_OP_WRITEFULL:
+		case CEPH_OSD_OP_SETXATTR:
+		case CEPH_OSD_OP_CMPXATTR:
+		case CEPH_OSD_OP_NOTIFY_ACK:
+		case CEPH_OSD_OP_COPY_FROM:
+			*num_request_data_items += 1;
+			break;
+
+		/* reply */
+		case CEPH_OSD_OP_STAT:
+		case CEPH_OSD_OP_READ:
+		case CEPH_OSD_OP_LIST_WATCHERS:
+			*num_reply_data_items += 1;
+			break;
+
+		/* both */
+		case CEPH_OSD_OP_NOTIFY:
+			*num_request_data_items += 1;
+			*num_reply_data_items += 1;
+			break;
+		case CEPH_OSD_OP_CALL:
+			*num_request_data_items += 2;
+			*num_reply_data_items += 1;
+			break;
+
+		default:
+			WARN_ON(!osd_req_opcode_valid(op->op));
+			break;
+		}
+	}
+}
+
+/*
+ * oid, oloc and OSD op opcode(s) must be filled in before this function
+ * is called.
+ */
+int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
+{
+	int num_request_data_items, num_reply_data_items;
+
+	get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
+	return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
+					  num_reply_data_items);
+}
+EXPORT_SYMBOL(ceph_osdc_alloc_messages);
+
 /*
  * This is an osd op init function for opcodes that have no data or
  * other information associated with them.  It also serves as a
@@ -767,40 +835,45 @@
 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
 
 int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
-			u16 opcode, const char *class, const char *method)
+			const char *class, const char *method)
 {
-	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
-						      opcode, 0);
+	struct ceph_osd_req_op *op;
 	struct ceph_pagelist *pagelist;
 	size_t payload_len = 0;
 	size_t size;
+	int ret;
 
-	BUG_ON(opcode != CEPH_OSD_OP_CALL);
+	op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
 
-	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+	pagelist = ceph_pagelist_alloc(GFP_NOFS);
 	if (!pagelist)
 		return -ENOMEM;
 
-	ceph_pagelist_init(pagelist);
-
 	op->cls.class_name = class;
 	size = strlen(class);
 	BUG_ON(size > (size_t) U8_MAX);
 	op->cls.class_len = size;
-	ceph_pagelist_append(pagelist, class, size);
+	ret = ceph_pagelist_append(pagelist, class, size);
+	if (ret)
+		goto err_pagelist_free;
 	payload_len += size;
 
 	op->cls.method_name = method;
 	size = strlen(method);
 	BUG_ON(size > (size_t) U8_MAX);
 	op->cls.method_len = size;
-	ceph_pagelist_append(pagelist, method, size);
+	ret = ceph_pagelist_append(pagelist, method, size);
+	if (ret)
+		goto err_pagelist_free;
 	payload_len += size;
 
 	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
-
 	op->indata_len = payload_len;
 	return 0;
+
+err_pagelist_free:
+	ceph_pagelist_release(pagelist);
+	return ret;
 }
 EXPORT_SYMBOL(osd_req_op_cls_init);
 
@@ -812,21 +885,24 @@
 						      opcode, 0);
 	struct ceph_pagelist *pagelist;
 	size_t payload_len;
+	int ret;
 
 	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
 
-	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+	pagelist = ceph_pagelist_alloc(GFP_NOFS);
 	if (!pagelist)
 		return -ENOMEM;
 
-	ceph_pagelist_init(pagelist);
-
 	payload_len = strlen(name);
 	op->xattr.name_len = payload_len;
-	ceph_pagelist_append(pagelist, name, payload_len);
+	ret = ceph_pagelist_append(pagelist, name, payload_len);
+	if (ret)
+		goto err_pagelist_free;
 
 	op->xattr.value_len = size;
-	ceph_pagelist_append(pagelist, value, size);
+	ret = ceph_pagelist_append(pagelist, value, size);
+	if (ret)
+		goto err_pagelist_free;
 	payload_len += size;
 
 	op->xattr.cmp_op = cmp_op;
@@ -835,6 +911,10 @@
 	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
 	op->indata_len = payload_len;
 	return 0;
+
+err_pagelist_free:
+	ceph_pagelist_release(pagelist);
+	return ret;
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
 
@@ -900,12 +980,6 @@
 static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 			     const struct ceph_osd_req_op *src)
 {
-	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
-		pr_err("unrecognized osd opcode %d\n", src->op);
-
-		return 0;
-	}
-
 	switch (src->op) {
 	case CEPH_OSD_OP_STAT:
 		break;
@@ -955,6 +1029,14 @@
 	case CEPH_OSD_OP_CREATE:
 	case CEPH_OSD_OP_DELETE:
 		break;
+	case CEPH_OSD_OP_COPY_FROM:
+		dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
+		dst->copy_from.src_version =
+			cpu_to_le64(src->copy_from.src_version);
+		dst->copy_from.flags = src->copy_from.flags;
+		dst->copy_from.src_fadvise_flags =
+			cpu_to_le32(src->copy_from.src_fadvise_flags);
+		break;
 	default:
 		pr_err("unsupported osd opcode %s\n",
 			ceph_osd_op_name(src->op));
@@ -1038,7 +1120,15 @@
 	if (flags & CEPH_OSD_FLAG_WRITE)
 		req->r_data_offset = off;
 
-	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
+	if (num_ops > 1)
+		/*
+		 * This is a special case for ceph_writepages_start(), but it
+		 * also covers ceph_uninline_data().  If more multi-op request
+		 * use cases emerge, we will need a separate helper.
+		 */
+		r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
+	else
+		r = ceph_osdc_alloc_messages(req, GFP_NOFS);
 	if (r)
 		goto fail;
 
@@ -1415,7 +1505,6 @@
 
 static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 					   struct ceph_osd_request_target *t,
-					   struct ceph_connection *con,
 					   bool any_change)
 {
 	struct ceph_pg_pool_info *pi;
@@ -1423,7 +1512,7 @@
 	struct ceph_osds up, acting;
 	bool force_resend = false;
 	bool unpaused = false;
-	bool legacy_change;
+	bool legacy_change = false;
 	bool split = false;
 	bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
 	bool recovery_deletes = ceph_osdmap_flag(osdc,
@@ -1511,15 +1600,14 @@
 		t->osd = acting.primary;
 	}
 
-	if (unpaused || legacy_change || force_resend ||
-	    (split && con && CEPH_HAVE_FEATURE(con->peer_features,
-					       RESEND_ON_SPLIT)))
+	if (unpaused || legacy_change || force_resend || split)
 		ct_res = CALC_TARGET_NEED_RESEND;
 	else
 		ct_res = CALC_TARGET_NO_ACTION;
 
 out:
-	dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
+	dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
+	     legacy_change, force_resend, split, ct_res, t->osd);
 	return ct_res;
 }
 
@@ -1845,48 +1933,55 @@
 	return true;
 }
 
-static void setup_request_data(struct ceph_osd_request *req,
-			       struct ceph_msg *msg)
+/*
+ * Keep get_num_data_items() in sync with this function.
+ */
+static void setup_request_data(struct ceph_osd_request *req)
 {
-	u32 data_len = 0;
-	int i;
+	struct ceph_msg *request_msg = req->r_request;
+	struct ceph_msg *reply_msg = req->r_reply;
+	struct ceph_osd_req_op *op;
 
-	if (!list_empty(&msg->data))
+	if (req->r_request->num_data_items || req->r_reply->num_data_items)
 		return;
 
-	WARN_ON(msg->data_length);
-	for (i = 0; i < req->r_num_ops; i++) {
-		struct ceph_osd_req_op *op = &req->r_ops[i];
-
+	WARN_ON(request_msg->data_length || reply_msg->data_length);
+	for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
 		switch (op->op) {
 		/* request */
 		case CEPH_OSD_OP_WRITE:
 		case CEPH_OSD_OP_WRITEFULL:
 			WARN_ON(op->indata_len != op->extent.length);
-			ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->extent.osd_data);
 			break;
 		case CEPH_OSD_OP_SETXATTR:
 		case CEPH_OSD_OP_CMPXATTR:
 			WARN_ON(op->indata_len != op->xattr.name_len +
 						  op->xattr.value_len);
-			ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->xattr.osd_data);
 			break;
 		case CEPH_OSD_OP_NOTIFY_ACK:
-			ceph_osdc_msg_data_add(msg,
+			ceph_osdc_msg_data_add(request_msg,
 					       &op->notify_ack.request_data);
 			break;
+		case CEPH_OSD_OP_COPY_FROM:
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->copy_from.osd_data);
+			break;
 
 		/* reply */
 		case CEPH_OSD_OP_STAT:
-			ceph_osdc_msg_data_add(req->r_reply,
+			ceph_osdc_msg_data_add(reply_msg,
 					       &op->raw_data_in);
 			break;
 		case CEPH_OSD_OP_READ:
-			ceph_osdc_msg_data_add(req->r_reply,
+			ceph_osdc_msg_data_add(reply_msg,
 					       &op->extent.osd_data);
 			break;
 		case CEPH_OSD_OP_LIST_WATCHERS:
-			ceph_osdc_msg_data_add(req->r_reply,
+			ceph_osdc_msg_data_add(reply_msg,
 					       &op->list_watchers.response_data);
 			break;
 
@@ -1895,25 +1990,23 @@
 			WARN_ON(op->indata_len != op->cls.class_len +
 						  op->cls.method_len +
 						  op->cls.indata_len);
-			ceph_osdc_msg_data_add(msg, &op->cls.request_info);
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->cls.request_info);
 			/* optional, can be NONE */
-			ceph_osdc_msg_data_add(msg, &op->cls.request_data);
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->cls.request_data);
 			/* optional, can be NONE */
-			ceph_osdc_msg_data_add(req->r_reply,
+			ceph_osdc_msg_data_add(reply_msg,
 					       &op->cls.response_data);
 			break;
 		case CEPH_OSD_OP_NOTIFY:
-			ceph_osdc_msg_data_add(msg,
+			ceph_osdc_msg_data_add(request_msg,
 					       &op->notify.request_data);
-			ceph_osdc_msg_data_add(req->r_reply,
+			ceph_osdc_msg_data_add(reply_msg,
 					       &op->notify.response_data);
 			break;
 		}
-
-		data_len += op->indata_len;
 	}
-
-	WARN_ON(data_len != msg->data_length);
 }
 
 static void encode_pgid(void **p, const struct ceph_pg *pgid)
@@ -1961,7 +2054,7 @@
 			req->r_data_offset || req->r_snapc);
 	}
 
-	setup_request_data(req, msg);
+	setup_request_data(req);
 
 	encode_spgid(&p, &req->r_t.spgid); /* actual spg */
 	ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
@@ -2195,7 +2288,7 @@
 	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
 
 again:
-	ct_res = calc_target(osdc, &req->r_t, NULL, false);
+	ct_res = calc_target(osdc, &req->r_t, false);
 	if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
 		goto promote;
 
@@ -2229,7 +2322,7 @@
 		   (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
 		    pool_full(osdc, req->r_t.base_oloc.pool))) {
 		dout("req %p full/pool_full\n", req);
-		if (osdc->abort_on_full) {
+		if (ceph_test_opt(osdc->client, ABORT_ON_FULL)) {
 			err = -ENOSPC;
 		} else {
 			pr_warn_ratelimited("FULL or reached pool quota\n");
@@ -2312,7 +2405,7 @@
 
 static void __complete_request(struct ceph_osd_request *req)
 {
-	dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
+	dout("%s req %p tid %llu cb %ps result %d\n", __func__, req,
 	     req->r_tid, req->r_callback, req->r_result);
 
 	if (req->r_callback)
@@ -2399,6 +2492,14 @@
 }
 EXPORT_SYMBOL(ceph_osdc_abort_requests);
 
+void ceph_osdc_clear_abort_err(struct ceph_osd_client *osdc)
+{
+	down_write(&osdc->lock);
+	osdc->abort_err = 0;
+	up_write(&osdc->lock);
+}
+EXPORT_SYMBOL(ceph_osdc_clear_abort_err);
+
 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
 {
 	if (likely(eb > osdc->epoch_barrier)) {
@@ -2459,7 +2560,7 @@
 {
 	bool victims = false;
 
-	if (osdc->abort_on_full &&
+	if (ceph_test_opt(osdc->client, ABORT_ON_FULL) &&
 	    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
 		for_each_request(osdc, abort_on_full_fn, &victims);
 }
@@ -3001,11 +3102,21 @@
 	struct ceph_osd_client *osdc = lreq->osdc;
 	struct ceph_osd *osd;
 
-	calc_target(osdc, &lreq->t, NULL, false);
+	down_write(&osdc->lock);
+	linger_register(lreq);
+	if (lreq->is_watch) {
+		lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id;
+		lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id;
+	} else {
+		lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id;
+	}
+
+	calc_target(osdc, &lreq->t, false);
 	osd = lookup_create_osd(osdc, lreq->t.osd, true);
 	link_linger(osd, lreq);
 
 	send_linger(lreq);
+	up_write(&osdc->lock);
 }
 
 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
@@ -3617,7 +3728,7 @@
 	struct ceph_osd_client *osdc = lreq->osdc;
 	enum calc_target_result ct_res;
 
-	ct_res = calc_target(osdc, &lreq->t, NULL, true);
+	ct_res = calc_target(osdc, &lreq->t, true);
 	if (ct_res == CALC_TARGET_NEED_RESEND) {
 		struct ceph_osd *osd;
 
@@ -3689,8 +3800,7 @@
 		n = rb_next(n); /* unlink_request(), check_pool_dne() */
 
 		dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
-		ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con,
-				     false);
+		ct_res = calc_target(osdc, &req->r_t, false);
 		switch (ct_res) {
 		case CALC_TARGET_NO_ACTION:
 			force_resend_writes = cleared_full ||
@@ -3799,7 +3909,7 @@
 		n = rb_next(n);
 
 		if (req->r_t.epoch < osdc->osdmap->epoch) {
-			ct_res = calc_target(osdc, &req->r_t, NULL, false);
+			ct_res = calc_target(osdc, &req->r_t, false);
 			if (ct_res == CALC_TARGET_POOL_DNE) {
 				erase_request(need_resend, req);
 				check_pool_dne(req);
@@ -4318,9 +4428,7 @@
 			     lreq->notify_id, notify_id);
 		} else if (!completion_done(&lreq->notify_finish_wait)) {
 			struct ceph_msg_data *data =
-			    list_first_entry_or_null(&msg->data,
-						     struct ceph_msg_data,
-						     links);
+			    msg->num_data_items ? &msg->data[0] : NULL;
 
 			if (data) {
 				if (lreq->preply_pages) {
@@ -4476,6 +4584,23 @@
 
 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
+	return req;
+}
+
+static struct ceph_osd_request *
+alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode)
+{
+	struct ceph_osd_request *req;
+
+	req = alloc_linger_request(lreq);
+	if (!req)
+		return NULL;
+
+	/*
+	 * Pass 0 for cookie because we don't know it yet, it will be
+	 * filled in by linger_submit().
+	 */
+	osd_req_op_watch_init(req, 0, 0, watch_opcode);
 
 	if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
 		ceph_osdc_put_request(req);
@@ -4514,27 +4639,19 @@
 	lreq->t.flags = CEPH_OSD_FLAG_WRITE;
 	ktime_get_real_ts64(&lreq->mtime);
 
-	lreq->reg_req = alloc_linger_request(lreq);
+	lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH);
 	if (!lreq->reg_req) {
 		ret = -ENOMEM;
 		goto err_put_lreq;
 	}
 
-	lreq->ping_req = alloc_linger_request(lreq);
+	lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING);
 	if (!lreq->ping_req) {
 		ret = -ENOMEM;
 		goto err_put_lreq;
 	}
 
-	down_write(&osdc->lock);
-	linger_register(lreq); /* before osd_req_op_* */
-	osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
-			      CEPH_OSD_WATCH_OP_WATCH);
-	osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
-			      CEPH_OSD_WATCH_OP_PING);
 	linger_submit(lreq);
-	up_write(&osdc->lock);
-
 	ret = linger_reg_commit_wait(lreq);
 	if (ret) {
 		linger_cancel(lreq);
@@ -4599,11 +4716,10 @@
 
 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
 
-	pl = kmalloc(sizeof(*pl), GFP_NOIO);
+	pl = ceph_pagelist_alloc(GFP_NOIO);
 	if (!pl)
 		return -ENOMEM;
 
-	ceph_pagelist_init(pl);
 	ret = ceph_pagelist_encode_64(pl, notify_id);
 	ret |= ceph_pagelist_encode_64(pl, cookie);
 	if (payload) {
@@ -4641,12 +4757,12 @@
 	ceph_oloc_copy(&req->r_base_oloc, oloc);
 	req->r_flags = CEPH_OSD_FLAG_READ;
 
-	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
+					 payload_len);
 	if (ret)
 		goto out_put_req;
 
-	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
-					 payload_len);
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
 	if (ret)
 		goto out_put_req;
 
@@ -4670,11 +4786,10 @@
 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
 	op->notify.cookie = cookie;
 
-	pl = kmalloc(sizeof(*pl), GFP_NOIO);
+	pl = ceph_pagelist_alloc(GFP_NOIO);
 	if (!pl)
 		return -ENOMEM;
 
-	ceph_pagelist_init(pl);
 	ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
 	ret |= ceph_pagelist_encode_32(pl, timeout);
 	ret |= ceph_pagelist_encode_32(pl, payload_len);
@@ -4733,29 +4848,30 @@
 		goto out_put_lreq;
 	}
 
+	/*
+	 * Pass 0 for cookie because we don't know it yet, it will be
+	 * filled in by linger_submit().
+	 */
+	ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout,
+				     payload, payload_len);
+	if (ret)
+		goto out_put_lreq;
+
 	/* for notify_id */
 	pages = ceph_alloc_page_vector(1, GFP_NOIO);
 	if (IS_ERR(pages)) {
 		ret = PTR_ERR(pages);
 		goto out_put_lreq;
 	}
-
-	down_write(&osdc->lock);
-	linger_register(lreq); /* before osd_req_op_* */
-	ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
-				     timeout, payload, payload_len);
-	if (ret) {
-		linger_unregister(lreq);
-		up_write(&osdc->lock);
-		ceph_release_page_vector(pages, 1);
-		goto out_put_lreq;
-	}
 	ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
 						 response_data),
 				 pages, PAGE_SIZE, 0, false, true);
-	linger_submit(lreq);
-	up_write(&osdc->lock);
 
+	ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO);
+	if (ret)
+		goto out_put_lreq;
+
+	linger_submit(lreq);
 	ret = linger_reg_commit_wait(lreq);
 	if (!ret)
 		ret = linger_notify_finish_wait(lreq);
@@ -4812,20 +4928,26 @@
 	ret = ceph_start_decoding(p, end, 2, "watch_item_t",
 				  &struct_v, &struct_len);
 	if (ret)
-		return ret;
+		goto bad;
 
-	ceph_decode_copy(p, &item->name, sizeof(item->name));
-	item->cookie = ceph_decode_64(p);
-	*p += 4; /* skip timeout_seconds */
+	ret = -EINVAL;
+	ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad);
+	ceph_decode_64_safe(p, end, item->cookie, bad);
+	ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */
+
 	if (struct_v >= 2) {
-		ceph_decode_copy(p, &item->addr, sizeof(item->addr));
-		ceph_decode_addr(&item->addr);
+		ret = ceph_decode_entity_addr(p, end, &item->addr);
+		if (ret)
+			goto bad;
+	} else {
+		ret = 0;
 	}
 
 	dout("%s %s%llu cookie %llu addr %s\n", __func__,
 	     ENTITY_NAME(item->name), item->cookie,
-	     ceph_pr_addr(&item->addr.in_addr));
-	return 0;
+	     ceph_pr_addr(&item->addr));
+bad:
+	return ret;
 }
 
 static int decode_watchers(void **p, void *end,
@@ -4881,10 +5003,6 @@
 	ceph_oloc_copy(&req->r_base_oloc, oloc);
 	req->r_flags = CEPH_OSD_FLAG_READ;
 
-	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
-	if (ret)
-		goto out_put_req;
-
 	pages = ceph_alloc_page_vector(1, GFP_NOIO);
 	if (IS_ERR(pages)) {
 		ret = PTR_ERR(pages);
@@ -4896,6 +5014,10 @@
 						 response_data),
 				 pages, PAGE_SIZE, 0, false, true);
 
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
 	ceph_osdc_start_request(osdc, req, false);
 	ret = ceph_osdc_wait_request(osdc, req);
 	if (ret >= 0) {
@@ -4942,12 +5064,12 @@
 		   const char *class, const char *method,
 		   unsigned int flags,
 		   struct page *req_page, size_t req_len,
-		   struct page *resp_page, size_t *resp_len)
+		   struct page **resp_pages, size_t *resp_len)
 {
 	struct ceph_osd_request *req;
 	int ret;
 
-	if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE))
+	if (req_len > PAGE_SIZE)
 		return -E2BIG;
 
 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
@@ -4958,26 +5080,26 @@
 	ceph_oloc_copy(&req->r_base_oloc, oloc);
 	req->r_flags = flags;
 
-	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
-	if (ret)
-		goto out_put_req;
-
-	ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+	ret = osd_req_op_cls_init(req, 0, class, method);
 	if (ret)
 		goto out_put_req;
 
 	if (req_page)
 		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
 						  0, false, false);
-	if (resp_page)
-		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
+	if (resp_pages)
+		osd_req_op_cls_response_data_pages(req, 0, resp_pages,
 						   *resp_len, 0, false, false);
 
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
 	ceph_osdc_start_request(osdc, req, false);
 	ret = ceph_osdc_wait_request(osdc, req);
 	if (ret >= 0) {
 		ret = req->r_ops[0].rval;
-		if (resp_page)
+		if (resp_pages)
 			*resp_len = req->r_ops[0].outdata_len;
 	}
 
@@ -4988,6 +5110,24 @@
 EXPORT_SYMBOL(ceph_osdc_call);
 
 /*
+ * reset all osd connections
+ */
+void ceph_osdc_reopen_osds(struct ceph_osd_client *osdc)
+{
+	struct rb_node *n;
+
+	down_write(&osdc->lock);
+	for (n = rb_first(&osdc->osds); n; ) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+		n = rb_next(n);
+		if (!reopen_osd(osd))
+			kick_osd_requests(osd);
+	}
+	up_write(&osdc->lock);
+}
+
+/*
  * init, shutdown
  */
 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
@@ -5021,11 +5161,12 @@
 		goto out_map;
 
 	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
-				PAGE_SIZE, 10, true, "osd_op");
+				PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
 	if (err < 0)
 		goto out_mempool;
 	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
-				PAGE_SIZE, 10, true, "osd_op_reply");
+				PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
+				"osd_op_reply");
 	if (err < 0)
 		goto out_msgpool;
 
@@ -5168,6 +5309,80 @@
 }
 EXPORT_SYMBOL(ceph_osdc_writepages);
 
+static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
+				     u64 src_snapid, u64 src_version,
+				     struct ceph_object_id *src_oid,
+				     struct ceph_object_locator *src_oloc,
+				     u32 src_fadvise_flags,
+				     u32 dst_fadvise_flags,
+				     u8 copy_from_flags)
+{
+	struct ceph_osd_req_op *op;
+	struct page **pages;
+	void *p, *end;
+
+	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
+	if (IS_ERR(pages))
+		return PTR_ERR(pages);
+
+	op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags);
+	op->copy_from.snapid = src_snapid;
+	op->copy_from.src_version = src_version;
+	op->copy_from.flags = copy_from_flags;
+	op->copy_from.src_fadvise_flags = src_fadvise_flags;
+
+	p = page_address(pages[0]);
+	end = p + PAGE_SIZE;
+	ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
+	encode_oloc(&p, end, src_oloc);
+	op->indata_len = PAGE_SIZE - (end - p);
+
+	ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
+				 op->indata_len, 0, false, true);
+	return 0;
+}
+
+int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
+			u64 src_snapid, u64 src_version,
+			struct ceph_object_id *src_oid,
+			struct ceph_object_locator *src_oloc,
+			u32 src_fadvise_flags,
+			struct ceph_object_id *dst_oid,
+			struct ceph_object_locator *dst_oloc,
+			u32 dst_fadvise_flags,
+			u8 copy_from_flags)
+{
+	struct ceph_osd_request *req;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
+
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
+
+	ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
+	ceph_oid_copy(&req->r_t.base_oid, dst_oid);
+
+	ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
+					src_oloc, src_fadvise_flags,
+					dst_fadvise_flags, copy_from_flags);
+	if (ret)
+		goto out;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out;
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+
+out:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_copy_from);
+
 int __init ceph_osdc_setup(void)
 {
 	size_t size = sizeof(struct ceph_osd_request) +
@@ -5295,7 +5510,7 @@
 	u32 front_len = le32_to_cpu(hdr->front_len);
 	u32 data_len = le32_to_cpu(hdr->data_len);
 
-	m = ceph_msg_new(type, front_len, GFP_NOIO, false);
+	m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
 	if (!m)
 		return NULL;