Update Linux to v5.4.2

Change-Id: Idf6911045d9d382da2cfe01b1edff026404ac8fd
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index bc43c82..a516329 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -20,6 +20,8 @@
 #include <linux/ceph/auth.h>
 #include <linux/ceph/debugfs.h>
 
+#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
+
 /*
  * A cluster of MDS (metadata server) daemons is responsible for
  * managing the file system namespace (the directory hierarchy and
@@ -46,13 +48,17 @@
  */
 
 struct ceph_reconnect_state {
-	int nr_caps;
+	struct ceph_mds_session *session;
+	int nr_caps, nr_realms;
 	struct ceph_pagelist *pagelist;
 	unsigned msg_version;
+	bool allow_multi;
 };
 
 static void __wake_requests(struct ceph_mds_client *mdsc,
 			    struct list_head *head);
+static void ceph_cap_release_work(struct work_struct *work);
+static void ceph_cap_reclaim_work(struct work_struct *work);
 
 static const struct ceph_connection_operations mds_con_ops;
 
@@ -61,6 +67,29 @@
  * mds reply parsing
  */
 
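+/*
+ * decode the quota portion of an inode record (max_bytes, max_files)
+ */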
+static int parse_reply_info_quota(void **p, void *end,
+				  struct ceph_mds_reply_info_in *info)
+{
+	u8 struct_v, struct_compat;
+	u32 struct_len;
+
+	ceph_decode_8_safe(p, end, struct_v, bad);
+	ceph_decode_8_safe(p, end, struct_compat, bad);
+	/* struct_v is expected to be >= 1. we only
+	 * understand encoding with struct_compat == 1. */
+	if (!struct_v || struct_compat != 1)
+		goto bad;
+	ceph_decode_32_safe(p, end, struct_len, bad);
+	ceph_decode_need(p, end, struct_len, bad);
+	end = *p + struct_len;
+	ceph_decode_64_safe(p, end, info->max_bytes, bad);
+	ceph_decode_64_safe(p, end, info->max_files, bad);
+	*p = end;
+	return 0;
+bad:
+	return -EIO;
+}
+
 /*
  * parse individual inode info
  */
@@ -68,8 +97,24 @@
 			       struct ceph_mds_reply_info_in *info,
 			       u64 features)
 {
-	int err = -EIO;
+	int err = 0;
+	u8 struct_v = 0;
 
+	if (features == (u64)-1) {
+		u32 struct_len;
+		u8 struct_compat;
+		ceph_decode_8_safe(p, end, struct_v, bad);
+		ceph_decode_8_safe(p, end, struct_compat, bad);
+		/* struct_v is expected to be >= 1. we only understand
+		 * encoding with struct_compat == 1. */
+		if (!struct_v || struct_compat != 1)
+			goto bad;
+		ceph_decode_32_safe(p, end, struct_len, bad);
+		ceph_decode_need(p, end, struct_len, bad);
+		end = *p + struct_len;
+	}
+
+	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
 	info->in = *p;
 	*p += sizeof(struct ceph_mds_reply_inode) +
 		sizeof(*info->in->fragtree.splits) *
@@ -80,62 +125,160 @@
 	info->symlink = *p;
 	*p += info->symlink_len;
 
-	if (features & CEPH_FEATURE_DIRLAYOUTHASH)
-		ceph_decode_copy_safe(p, end, &info->dir_layout,
-				      sizeof(info->dir_layout), bad);
-	else
-		memset(&info->dir_layout, 0, sizeof(info->dir_layout));
-
+	ceph_decode_copy_safe(p, end, &info->dir_layout,
+			      sizeof(info->dir_layout), bad);
 	ceph_decode_32_safe(p, end, info->xattr_len, bad);
 	ceph_decode_need(p, end, info->xattr_len, bad);
 	info->xattr_data = *p;
 	*p += info->xattr_len;
 
-	if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+	if (features == (u64)-1) {
+		/* inline data */
 		ceph_decode_64_safe(p, end, info->inline_version, bad);
 		ceph_decode_32_safe(p, end, info->inline_len, bad);
 		ceph_decode_need(p, end, info->inline_len, bad);
 		info->inline_data = *p;
 		*p += info->inline_len;
-	} else
-		info->inline_version = CEPH_INLINE_NONE;
-
-	if (features & CEPH_FEATURE_MDS_QUOTA) {
-		u8 struct_v, struct_compat;
-		u32 struct_len;
-
-		/*
-		 * both struct_v and struct_compat are expected to be >= 1
-		 */
-		ceph_decode_8_safe(p, end, struct_v, bad);
-		ceph_decode_8_safe(p, end, struct_compat, bad);
-		if (!struct_v || !struct_compat)
-			goto bad;
-		ceph_decode_32_safe(p, end, struct_len, bad);
-		ceph_decode_need(p, end, struct_len, bad);
-		ceph_decode_64_safe(p, end, info->max_bytes, bad);
-		ceph_decode_64_safe(p, end, info->max_files, bad);
-	} else {
-		info->max_bytes = 0;
-		info->max_files = 0;
-	}
-
-	info->pool_ns_len = 0;
-	info->pool_ns_data = NULL;
-	if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
+		/* quota */
+		err = parse_reply_info_quota(p, end, info);
+		if (err < 0)
+			goto out_bad;
+		/* pool namespace */
 		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
 		if (info->pool_ns_len > 0) {
 			ceph_decode_need(p, end, info->pool_ns_len, bad);
 			info->pool_ns_data = *p;
 			*p += info->pool_ns_len;
 		}
-	}
 
+		/* btime */
+		ceph_decode_need(p, end, sizeof(info->btime), bad);
+		ceph_decode_copy(p, &info->btime, sizeof(info->btime));
+
+		/* change attribute */
+		ceph_decode_64_safe(p, end, info->change_attr, bad);
+
+		/* dir pin */
+		if (struct_v >= 2) {
+			ceph_decode_32_safe(p, end, info->dir_pin, bad);
+		} else {
+			info->dir_pin = -ENODATA;
+		}
+
+		/* snapshot birth time, remains zero for v<=2 */
+		if (struct_v >= 3) {
+			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
+			ceph_decode_copy(p, &info->snap_btime,
+					 sizeof(info->snap_btime));
+		} else {
+			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
+		}
+
+		*p = end;
+	} else {
+		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+			ceph_decode_64_safe(p, end, info->inline_version, bad);
+			ceph_decode_32_safe(p, end, info->inline_len, bad);
+			ceph_decode_need(p, end, info->inline_len, bad);
+			info->inline_data = *p;
+			*p += info->inline_len;
+		} else
+			info->inline_version = CEPH_INLINE_NONE;
+
+		if (features & CEPH_FEATURE_MDS_QUOTA) {
+			err = parse_reply_info_quota(p, end, info);
+			if (err < 0)
+				goto out_bad;
+		} else {
+			info->max_bytes = 0;
+			info->max_files = 0;
+		}
+
+		info->pool_ns_len = 0;
+		info->pool_ns_data = NULL;
+		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
+			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
+			if (info->pool_ns_len > 0) {
+				ceph_decode_need(p, end, info->pool_ns_len, bad);
+				info->pool_ns_data = *p;
+				*p += info->pool_ns_len;
+			}
+		}
+
+		if (features & CEPH_FEATURE_FS_BTIME) {
+			ceph_decode_need(p, end, sizeof(info->btime), bad);
+			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
+			ceph_decode_64_safe(p, end, info->change_attr, bad);
+		}
+
+		info->dir_pin = -ENODATA;
+		/* info->snap_btime remains zero */
+	}
 	return 0;
 bad:
+	err = -EIO;
+out_bad:
 	return err;
 }
 
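+/*
+ * parse a dirfrag record.  with the new reply encoding (features == -1)
+ * the record is wrapped in a versioned, length-prefixed container.
+ */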
+static int parse_reply_info_dir(void **p, void *end,
+				struct ceph_mds_reply_dirfrag **dirfrag,
+				u64 features)
+{
+	if (features == (u64)-1) {
+		u8 struct_v, struct_compat;
+		u32 struct_len;
+		ceph_decode_8_safe(p, end, struct_v, bad);
+		ceph_decode_8_safe(p, end, struct_compat, bad);
+		/* struct_v is expected to be >= 1. we only understand
+		 * encoding with struct_compat == 1. */
+		if (!struct_v || struct_compat != 1)
+			goto bad;
+		ceph_decode_32_safe(p, end, struct_len, bad);
+		ceph_decode_need(p, end, struct_len, bad);
+		end = *p + struct_len;
+	}
+
+	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
+	*dirfrag = *p;
+	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
+	if (unlikely(*p > end))
+		goto bad;
+	if (features == (u64)-1)
+		*p = end;
+	return 0;
+bad:
+	return -EIO;
+}
+
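+/*
+ * parse a dentry lease record, optionally wrapped in the versioned
+ * container used by the new reply encoding.
+ */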
+static int parse_reply_info_lease(void **p, void *end,
+				  struct ceph_mds_reply_lease **lease,
+				  u64 features)
+{
+	if (features == (u64)-1) {
+		u8 struct_v, struct_compat;
+		u32 struct_len;
+		ceph_decode_8_safe(p, end, struct_v, bad);
+		ceph_decode_8_safe(p, end, struct_compat, bad);
+		/* struct_v is expected to be >= 1. we only understand
+		 * encoding with struct_compat == 1. */
+		if (!struct_v || struct_compat != 1)
+			goto bad;
+		ceph_decode_32_safe(p, end, struct_len, bad);
+		ceph_decode_need(p, end, struct_len, bad);
+		end = *p + struct_len;
+	}
+
+	ceph_decode_need(p, end, sizeof(**lease), bad);
+	*lease = *p;
+	*p += sizeof(**lease);
+	if (features == (u64)-1)
+		*p = end;
+	return 0;
+bad:
+	return -EIO;
+}
+
 /*
  * parse a normal reply, which may contain a (dir+)dentry and/or a
  * target inode.
@@ -151,20 +294,18 @@
 		if (err < 0)
 			goto out_bad;
 
-		if (unlikely(*p + sizeof(*info->dirfrag) > end))
-			goto bad;
-		info->dirfrag = *p;
-		*p += sizeof(*info->dirfrag) +
-			sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
-		if (unlikely(*p > end))
-			goto bad;
+		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
+		if (err < 0)
+			goto out_bad;
 
 		ceph_decode_32_safe(p, end, info->dname_len, bad);
 		ceph_decode_need(p, end, info->dname_len, bad);
 		info->dname = *p;
 		*p += info->dname_len;
-		info->dlease = *p;
-		*p += sizeof(*info->dlease);
+
+		err = parse_reply_info_lease(p, end, &info->dlease, features);
+		if (err < 0)
+			goto out_bad;
 	}
 
 	if (info->head->is_target) {
@@ -187,20 +328,16 @@
 /*
  * parse readdir results
  */
-static int parse_reply_info_dir(void **p, void *end,
+static int parse_reply_info_readdir(void **p, void *end,
 				struct ceph_mds_reply_info_parsed *info,
 				u64 features)
 {
 	u32 num, i = 0;
 	int err;
 
-	info->dir_dir = *p;
-	if (*p + sizeof(*info->dir_dir) > end)
-		goto bad;
-	*p += sizeof(*info->dir_dir) +
-		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
-	if (*p > end)
-		goto bad;
+	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
+	if (err < 0)
+		goto out_bad;
 
 	ceph_decode_need(p, end, sizeof(num) + 2, bad);
 	num = ceph_decode_32(p);
@@ -226,15 +363,16 @@
 	while (num) {
 		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
 		/* dentry */
-		ceph_decode_need(p, end, sizeof(u32)*2, bad);
-		rde->name_len = ceph_decode_32(p);
+		ceph_decode_32_safe(p, end, rde->name_len, bad);
 		ceph_decode_need(p, end, rde->name_len, bad);
 		rde->name = *p;
 		*p += rde->name_len;
 		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
-		rde->lease = *p;
-		*p += sizeof(struct ceph_mds_reply_lease);
 
+		/* dentry lease */
+		err = parse_reply_info_lease(p, end, &rde->lease, features);
+		if (err)
+			goto out_bad;
 		/* inode */
 		err = parse_reply_info_in(p, end, &rde->inode, features);
 		if (err < 0)
@@ -246,8 +384,8 @@
 	}
 
 done:
-	if (*p != end)
-		goto bad;
+	/* Skip over any unrecognized fields */
+	*p = end;
 	return 0;
 
 bad:
@@ -268,12 +406,10 @@
 		goto bad;
 
 	info->filelock_reply = *p;
-	*p += sizeof(*info->filelock_reply);
 
-	if (unlikely(*p != end))
-		goto bad;
+	/* Skip over any unrecognized fields */
+	*p = end;
 	return 0;
-
 bad:
 	return -EIO;
 }
@@ -285,19 +421,23 @@
 				  struct ceph_mds_reply_info_parsed *info,
 				  u64 features)
 {
-	if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
+	if (features == (u64)-1 ||
+	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
+		/* Malformed reply? */
 		if (*p == end) {
 			info->has_create_ino = false;
 		} else {
 			info->has_create_ino = true;
-			info->ino = ceph_decode_64(p);
+			ceph_decode_64_safe(p, end, info->ino, bad);
 		}
+	} else {
+		if (*p != end)
+			goto bad;
 	}
 
-	if (unlikely(*p != end))
-		goto bad;
+	/* Skip over any unrecognized fields */
+	*p = end;
 	return 0;
-
 bad:
 	return -EIO;
 }
@@ -314,7 +454,7 @@
 	if (op == CEPH_MDS_OP_GETFILELOCK)
 		return parse_reply_info_filelock(p, end, info, features);
 	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
-		return parse_reply_info_dir(p, end, info, features);
+		return parse_reply_info_readdir(p, end, info, features);
 	else if (op == CEPH_MDS_OP_CREATE)
 		return parse_reply_info_create(p, end, info, features);
 	else
@@ -426,15 +566,9 @@
 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
 						   int mds)
 {
-	struct ceph_mds_session *session;
-
 	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
 		return NULL;
-	session = mdsc->sessions[mds];
-	dout("lookup_mds_session %p %d\n", session,
-	     refcount_read(&session->s_ref));
-	get_session(session);
-	return session;
+	return get_session(mdsc->sessions[mds]);
 }
 
 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
@@ -498,7 +632,7 @@
 	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 
 	spin_lock_init(&s->s_gen_ttl_lock);
-	s->s_cap_gen = 0;
+	s->s_cap_gen = 1;
 	s->s_cap_ttl = jiffies - 1;
 
 	spin_lock_init(&s->s_cap_lock);
@@ -506,7 +640,6 @@
 	s->s_renew_seq = 0;
 	INIT_LIST_HEAD(&s->s_caps);
 	s->s_nr_caps = 0;
-	s->s_trim_caps = 0;
 	refcount_set(&s->s_ref, 1);
 	INIT_LIST_HEAD(&s->s_waiting);
 	INIT_LIST_HEAD(&s->s_unsafe);
@@ -514,6 +647,8 @@
 	s->s_cap_reconnect = 0;
 	s->s_cap_iterator = NULL;
 	INIT_LIST_HEAD(&s->s_cap_releases);
+	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
+
 	INIT_LIST_HEAD(&s->s_cap_flushing);
 
 	mdsc->sessions[mds] = s;
@@ -539,6 +674,7 @@
 	dout("__unregister_session mds%d %p\n", s->s_mds, s);
 	BUG_ON(mdsc->sessions[s->s_mds] != s);
 	mdsc->sessions[s->s_mds] = NULL;
+	s->s_state = 0;
 	ceph_con_close(&s->s_con);
 	ceph_put_mds_session(s);
 	atomic_dec(&mdsc->num_sessions);
@@ -569,11 +705,12 @@
 		ceph_msg_put(req->r_reply);
 	if (req->r_inode) {
 		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
-		iput(req->r_inode);
+		/* avoid calling iput_final() in mds dispatch threads */
+		ceph_async_iput(req->r_inode);
 	}
 	if (req->r_parent)
 		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
-	iput(req->r_target_inode);
+	ceph_async_iput(req->r_target_inode);
 	if (req->r_dentry)
 		dput(req->r_dentry);
 	if (req->r_old_dentry)
@@ -587,7 +724,7 @@
 		 */
 		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
 				  CEPH_CAP_PIN);
-		iput(req->r_old_dentry_dir);
+		ceph_async_iput(req->r_old_dentry_dir);
 	}
 	kfree(req->r_path1);
 	kfree(req->r_path2);
@@ -595,6 +732,7 @@
 		ceph_pagelist_release(req->r_pagelist);
 	put_request_session(req);
 	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
+	WARN_ON_ONCE(!list_empty(&req->r_wait));
 	kfree(req);
 }
 
@@ -697,7 +835,8 @@
 	}
 
 	if (req->r_unsafe_dir) {
-		iput(req->r_unsafe_dir);
+		/* avoid calling iput_final() in mds dispatch threads */
+		ceph_async_iput(req->r_unsafe_dir);
 		req->r_unsafe_dir = NULL;
 	}
 
@@ -780,7 +919,7 @@
 		struct inode *dir;
 
 		rcu_read_lock();
-		parent = req->r_dentry->d_parent;
+		parent = READ_ONCE(req->r_dentry->d_parent);
 		dir = req->r_parent ? : d_inode_rcu(parent);
 
 		if (!dir || dir->i_sb != mdsc->fsc->sb) {
@@ -862,7 +1001,7 @@
 		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
 	if (!cap) {
 		spin_unlock(&ci->i_ceph_lock);
-		iput(inode);
+		ceph_async_iput(inode);
 		goto random;
 	}
 	mds = cap->session->s_mds;
@@ -871,7 +1010,9 @@
 	     cap == ci->i_auth_cap ? "auth " : "", cap);
 	spin_unlock(&ci->i_ceph_lock);
 out:
-	iput(inode);
+	/* avoid calling iput_final() while holding mdsc->mutex or
+	 * in mds dispatch threads */
+	ceph_async_iput(inode);
 	return mds;
 
 random:
@@ -1129,6 +1270,7 @@
 {
 	struct ceph_mds_request *req;
 	struct rb_node *p;
+	struct ceph_inode_info *ci;
 
 	dout("cleanup_session_requests mds%d\n", session->s_mds);
 	mutex_lock(&mdsc->mutex);
@@ -1137,6 +1279,16 @@
 				       struct ceph_mds_request, r_unsafe_item);
 		pr_warn_ratelimited(" dropping unsafe request %llu\n",
 				    req->r_tid);
+		if (req->r_target_inode) {
+			/* dropping unsafe change of inode's attributes */
+			ci = ceph_inode(req->r_target_inode);
+			errseq_set(&ci->i_meta_err, -EIO);
+		}
+		if (req->r_unsafe_dir) {
+			/* dropping unsafe directory operation */
+			ci = ceph_inode(req->r_unsafe_dir);
+			errseq_set(&ci->i_meta_err, -EIO);
+		}
 		__unregister_request(mdsc, req);
 	}
 	/* zero r_attempts, so kick_requests() will re-send requests */
@@ -1157,9 +1309,9 @@
  *
  * Caller must hold session s_mutex.
  */
-static int iterate_session_caps(struct ceph_mds_session *session,
-				 int (*cb)(struct inode *, struct ceph_cap *,
-					    void *), void *arg)
+int ceph_iterate_session_caps(struct ceph_mds_session *session,
+			      int (*cb)(struct inode *, struct ceph_cap *,
+					void *), void *arg)
 {
 	struct list_head *p;
 	struct ceph_cap *cap;
@@ -1181,7 +1333,9 @@
 		spin_unlock(&session->s_cap_lock);
 
 		if (last_inode) {
-			iput(last_inode);
+			/* avoid calling iput_final() while holding
+			 * s_mutex or in mds dispatch threads */
+			ceph_async_iput(last_inode);
 			last_inode = NULL;
 		}
 		if (old_cap) {
@@ -1201,13 +1355,10 @@
 			cap->session = NULL;
 			list_del_init(&cap->session_caps);
 			session->s_nr_caps--;
-			if (cap->queue_release) {
-				list_add_tail(&cap->session_caps,
-					      &session->s_cap_releases);
-				session->s_num_cap_releases++;
-			} else {
+			if (cap->queue_release)
+				__ceph_queue_cap_release(session, cap);
+			else
 				old_cap = cap;  /* put_cap it w/o locks held */
-			}
 		}
 		if (ret < 0)
 			goto out;
@@ -1217,7 +1368,7 @@
 	session->s_cap_iterator = NULL;
 	spin_unlock(&session->s_cap_lock);
 
-	iput(last_inode);
+	ceph_async_iput(last_inode);
 	if (old_cap)
 		ceph_put_cap(session->s_mdsc, old_cap);
 
@@ -1230,22 +1381,25 @@
 	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	LIST_HEAD(to_remove);
-	bool drop = false;
+	bool dirty_dropped = false;
 	bool invalidate = false;
 
 	dout("removing cap %p, ci is %p, inode is %p\n",
 	     cap, ci, &ci->vfs_inode);
 	spin_lock(&ci->i_ceph_lock);
+	if (cap->mds_wanted | cap->issued)
+		ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
 	__ceph_remove_cap(cap, false);
 	if (!ci->i_auth_cap) {
 		struct ceph_cap_flush *cf;
 		struct ceph_mds_client *mdsc = fsc->mdsc;
 
-		ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
-
-		if (ci->i_wrbuffer_ref > 0 &&
-		    READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
-			invalidate = true;
+		if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+			if (inode->i_data.nrpages > 0)
+				invalidate = true;
+			if (ci->i_wrbuffer_ref > 0)
+				mapping_set_error(&inode->i_data, -EIO);
+		}
 
 		while (!list_empty(&ci->i_cap_flush_list)) {
 			cf = list_first_entry(&ci->i_cap_flush_list,
@@ -1265,7 +1419,7 @@
 				inode, ceph_ino(inode));
 			ci->i_dirty_caps = 0;
 			list_del_init(&ci->i_dirty_item);
-			drop = true;
+			dirty_dropped = true;
 		}
 		if (!list_empty(&ci->i_flushing_item)) {
 			pr_warn_ratelimited(
@@ -1275,10 +1429,22 @@
 			ci->i_flushing_caps = 0;
 			list_del_init(&ci->i_flushing_item);
 			mdsc->num_cap_flushing--;
-			drop = true;
+			dirty_dropped = true;
 		}
 		spin_unlock(&mdsc->cap_dirty_lock);
 
+		if (dirty_dropped) {
+			errseq_set(&ci->i_meta_err, -EIO);
+
+			if (ci->i_wrbuffer_ref_head == 0 &&
+			    ci->i_wr_ref == 0 &&
+			    ci->i_dirty_caps == 0 &&
+			    ci->i_flushing_caps == 0) {
+				ceph_put_snap_context(ci->i_head_snapc);
+				ci->i_head_snapc = NULL;
+			}
+		}
+
 		if (atomic_read(&ci->i_filelock_ref) > 0) {
 			/* make further file lock syscall return -EIO */
 			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
@@ -1303,7 +1469,7 @@
 	wake_up_all(&ci->i_cap_wq);
 	if (invalidate)
 		ceph_queue_invalidate(inode);
-	if (drop)
+	if (dirty_dropped)
 		iput(inode);
 	return 0;
 }
@@ -1318,7 +1484,7 @@
 	LIST_HEAD(dispose);
 
 	dout("remove_session_caps on %p\n", session);
-	iterate_session_caps(session, remove_session_caps_cb, fsc);
+	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
 
 	wake_up_all(&fsc->mdsc->cap_flushing_wq);
 
@@ -1344,7 +1510,8 @@
 			spin_unlock(&session->s_cap_lock);
 
 			inode = ceph_find_inode(sb, vino);
-			iput(inode);
+			 /* avoid calling iput_final() while holding s_mutex */
+			ceph_async_iput(inode);
 
 			spin_lock(&session->s_cap_lock);
 		}
@@ -1359,6 +1526,12 @@
 	dispose_cap_releases(session->s_mdsc, &dispose);
 }
 
+enum {
+	RECONNECT,
+	RENEWCAPS,
+	FORCE_RO,
+};
+
 /*
  * wake up any threads waiting on this session's caps.  if the cap is
  * old (didn't get renewed on the client reconnect), remove it now.
@@ -1369,23 +1542,34 @@
 			      void *arg)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	unsigned long ev = (unsigned long)arg;
 
-	if (arg) {
+	if (ev == RECONNECT) {
 		spin_lock(&ci->i_ceph_lock);
 		ci->i_wanted_max_size = 0;
 		ci->i_requested_max_size = 0;
 		spin_unlock(&ci->i_ceph_lock);
+	} else if (ev == RENEWCAPS) {
+		if (cap->cap_gen < cap->session->s_cap_gen) {
+			/* mds did not re-issue stale cap */
+			spin_lock(&ci->i_ceph_lock);
+			cap->issued = cap->implemented = CEPH_CAP_PIN;
+			/* make sure mds knows what we want */
+			if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
+				ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
+			spin_unlock(&ci->i_ceph_lock);
+		}
+	} else if (ev == FORCE_RO) {
 	}
 	wake_up_all(&ci->i_cap_wq);
 	return 0;
 }
 
-static void wake_up_session_caps(struct ceph_mds_session *session,
-				 int reconnect)
+static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
 {
 	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
-	iterate_session_caps(session, wake_up_session_cb,
-			     (void *)(unsigned long)reconnect);
+	ceph_iterate_session_caps(session, wake_up_session_cb,
+				  (void *)(unsigned long)ev);
 }
 
 /*
@@ -1470,7 +1654,7 @@
 	spin_unlock(&session->s_cap_lock);
 
 	if (wake)
-		wake_up_session_caps(session, 0);
+		wake_up_session_caps(session, RENEWCAPS);
 }
 
 /*
@@ -1538,11 +1722,11 @@
  */
 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
 {
-	struct ceph_mds_session *session = arg;
+	int *remaining = arg;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int used, wanted, oissued, mine;
 
-	if (session->s_trim_caps <= 0)
+	if (*remaining <= 0)
 		return -1;
 
 	spin_lock(&ci->i_ceph_lock);
@@ -1579,7 +1763,7 @@
 	if (oissued) {
 		/* we aren't the only cap.. just remove us */
 		__ceph_remove_cap(cap, true);
-		session->s_trim_caps--;
+		(*remaining)--;
 	} else {
 		struct dentry *dentry;
 		/* try dropping referring dentries */
@@ -1591,7 +1775,7 @@
 			d_prune_aliases(inode);
 			count = atomic_read(&inode->i_count);
 			if (count == 1)
-				session->s_trim_caps--;
+				(*remaining)--;
 			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
 			     inode, cap, count);
 		} else {
@@ -1617,15 +1801,15 @@
 	dout("trim_caps mds%d start: %d / %d, trim %d\n",
 	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
 	if (trim_caps > 0) {
-		session->s_trim_caps = trim_caps;
-		iterate_session_caps(session, trim_caps_cb, session);
+		int remaining = trim_caps;
+
+		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
 		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
 		     session->s_mds, session->s_nr_caps, max_caps,
-			trim_caps - session->s_trim_caps);
-		session->s_trim_caps = 0;
+			trim_caps - remaining);
 	}
 
-	ceph_send_cap_releases(mdsc, session);
+	ceph_flush_cap_releases(mdsc, session);
 	return 0;
 }
 
@@ -1668,8 +1852,8 @@
 /*
  * called under s_mutex
  */
-void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
-			    struct ceph_mds_session *session)
+static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+				   struct ceph_mds_session *session)
 {
 	struct ceph_msg *msg = NULL;
 	struct ceph_mds_cap_release *head;
@@ -1711,7 +1895,8 @@
 		num_cap_releases--;
 
 		head = msg->front.iov_base;
-		le32_add_cpu(&head->num, 1);
+		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
+				   &head->num);
 		item = msg->front.iov_base + msg->front.iov_len;
 		item->ino = cpu_to_le64(cap->cap_ino);
 		item->cap_id = cpu_to_le64(cap->cap_id);
@@ -1761,6 +1946,81 @@
 	spin_unlock(&session->s_cap_lock);
 }
 
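+/*
+ * deferred work to send out queued cap releases while holding the
+ * session mutex
+ */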
+static void ceph_cap_release_work(struct work_struct *work)
+{
+	struct ceph_mds_session *session =
+		container_of(work, struct ceph_mds_session, s_cap_release_work);
+
+	mutex_lock(&session->s_mutex);
+	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
+	    session->s_state == CEPH_MDS_SESSION_HUNG)
+		ceph_send_cap_releases(session->s_mdsc, session);
+	mutex_unlock(&session->s_mutex);
+	ceph_put_mds_session(session);
+}
+
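+/*
+ * queue the cap release work on the cap workqueue; the queued work
+ * holds a session reference that is dropped when the work runs
+ */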
+void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+		             struct ceph_mds_session *session)
+{
+	if (mdsc->stopping)
+		return;
+
+	get_session(session);
+	if (queue_work(mdsc->fsc->cap_wq,
+		       &session->s_cap_release_work)) {
+		dout("cap release work queued\n");
+	} else {
+		ceph_put_mds_session(session);
+		dout("failed to queue cap release work\n");
+	}
+}
+
+/*
+ * caller holds session->s_cap_lock
+ */
+void __ceph_queue_cap_release(struct ceph_mds_session *session,
+			      struct ceph_cap *cap)
+{
+	list_add_tail(&cap->session_caps, &session->s_cap_releases);
+	session->s_num_cap_releases++;
+
+	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
+		ceph_flush_cap_releases(session->s_mdsc, session);
+}
+
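+/*
+ * deferred work to trim dentries and reclaim caps; requeues itself
+ * if ceph_trim_dentries() returns -EAGAIN
+ */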
+static void ceph_cap_reclaim_work(struct work_struct *work)
+{
+	struct ceph_mds_client *mdsc =
+		container_of(work, struct ceph_mds_client, cap_reclaim_work);
+	int ret = ceph_trim_dentries(mdsc);
+	if (ret == -EAGAIN)
+		ceph_queue_cap_reclaim_work(mdsc);
+}
+
+void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
+{
+	if (mdsc->stopping)
+		return;
+
+	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
+		dout("caps reclaim work queued\n");
+	} else {
+		dout("failed to queue caps reclaim work\n");
+	}
+}
+
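+/*
+ * note that 'nr' caps became reclaimable and kick the reclaim work
+ * once a full batch (CEPH_CAPS_PER_RELEASE) has accumulated
+ */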
+void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
+{
+	int val;
+	if (!nr)
+		return;
+	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
+	if (!(val % CEPH_CAPS_PER_RELEASE)) {
+		atomic_set(&mdsc->cap_reclaim_pending, 0);
+		ceph_queue_cap_reclaim_work(mdsc);
+	}
+}
+
 /*
  * requests
  */
@@ -1864,43 +2124,29 @@
  * Encode hidden .snap dirs as a double /, i.e.
  *   foo/.snap/bar -> foo//bar
  */
-char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
+char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
 			   int stop_on_nosnap)
 {
 	struct dentry *temp;
 	char *path;
-	int len, pos;
+	int pos;
 	unsigned seq;
+	u64 base;
 
 	if (!dentry)
 		return ERR_PTR(-EINVAL);
 
-retry:
-	len = 0;
-	seq = read_seqbegin(&rename_lock);
-	rcu_read_lock();
-	for (temp = dentry; !IS_ROOT(temp);) {
-		struct inode *inode = d_inode(temp);
-		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
-			len++;  /* slash only */
-		else if (stop_on_nosnap && inode &&
-			 ceph_snap(inode) == CEPH_NOSNAP)
-			break;
-		else
-			len += 1 + temp->d_name.len;
-		temp = temp->d_parent;
-	}
-	rcu_read_unlock();
-	if (len)
-		len--;  /* no leading '/' */
-
-	path = kmalloc(len+1, GFP_NOFS);
+	path = __getname();
 	if (!path)
 		return ERR_PTR(-ENOMEM);
-	pos = len;
-	path[pos] = 0;	/* trailing null */
+retry:
+	pos = PATH_MAX - 1;
+	path[pos] = '\0';
+
+	seq = read_seqbegin(&rename_lock);
 	rcu_read_lock();
-	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
+	temp = dentry;
+	for (;;) {
 		struct inode *inode;
 
 		spin_lock(&temp->d_lock);
@@ -1908,9 +2154,10 @@
 		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
 			dout("build_path path+%d: %p SNAPDIR\n",
 			     pos, temp);
-		} else if (stop_on_nosnap && inode &&
+		} else if (stop_on_nosnap && inode && dentry != temp &&
 			   ceph_snap(inode) == CEPH_NOSNAP) {
 			spin_unlock(&temp->d_lock);
+			pos++; /* get rid of any prepended '/' */
 			break;
 		} else {
 			pos -= temp->d_name.len;
@@ -1918,43 +2165,50 @@
 				spin_unlock(&temp->d_lock);
 				break;
 			}
-			strncpy(path + pos, temp->d_name.name,
-				temp->d_name.len);
+			memcpy(path + pos, temp->d_name.name, temp->d_name.len);
 		}
 		spin_unlock(&temp->d_lock);
-		if (pos)
-			path[--pos] = '/';
-		temp = temp->d_parent;
+		temp = READ_ONCE(temp->d_parent);
+
+		/* Are we at the root? */
+		if (IS_ROOT(temp))
+			break;
+
+		/* Are we out of buffer? */
+		if (--pos < 0)
+			break;
+
+		path[pos] = '/';
 	}
+	base = ceph_ino(d_inode(temp));
 	rcu_read_unlock();
-	if (pos != 0 || read_seqretry(&rename_lock, seq)) {
+	if (pos < 0 || read_seqretry(&rename_lock, seq)) {
 		pr_err("build_path did not end path lookup where "
-		       "expected, namelen is %d, pos is %d\n", len, pos);
+		       "expected, pos is %d\n", pos);
 		/* presumably this is only possible if racing with a
 		   rename of one of the parent directories (we can not
 		   lock the dentries above us to prevent this, but
 		   retrying should be harmless) */
-		kfree(path);
 		goto retry;
 	}
 
-	*base = ceph_ino(d_inode(temp));
-	*plen = len;
+	*pbase = base;
+	*plen = PATH_MAX - 1 - pos;
 	dout("build_path on %p %d built %llx '%.*s'\n",
-	     dentry, d_count(dentry), *base, len, path);
-	return path;
+	     dentry, d_count(dentry), base, *plen, path + pos);
+	return path + pos;
 }
 
 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
 			     const char **ppath, int *ppathlen, u64 *pino,
-			     int *pfreepath)
+			     bool *pfreepath, bool parent_locked)
 {
 	char *path;
 
 	rcu_read_lock();
 	if (!dir)
 		dir = d_inode_rcu(dentry->d_parent);
-	if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
+	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
 		*pino = ceph_ino(dir);
 		rcu_read_unlock();
 		*ppath = dentry->d_name.name;
@@ -1966,13 +2220,13 @@
 	if (IS_ERR(path))
 		return PTR_ERR(path);
 	*ppath = path;
-	*pfreepath = 1;
+	*pfreepath = true;
 	return 0;
 }
 
 static int build_inode_path(struct inode *inode,
 			    const char **ppath, int *ppathlen, u64 *pino,
-			    int *pfreepath)
+			    bool *pfreepath)
 {
 	struct dentry *dentry;
 	char *path;
@@ -1988,7 +2242,7 @@
 	if (IS_ERR(path))
 		return PTR_ERR(path);
 	*ppath = path;
-	*pfreepath = 1;
+	*pfreepath = true;
 	return 0;
 }
 
@@ -1999,7 +2253,7 @@
 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
 				  struct inode *rdiri, const char *rpath,
 				  u64 rino, const char **ppath, int *pathlen,
-				  u64 *ino, int *freepath)
+				  u64 *ino, bool *freepath, bool parent_locked)
 {
 	int r = 0;
 
@@ -2009,7 +2263,7 @@
 		     ceph_snap(rinode));
 	} else if (rdentry) {
 		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
-					freepath);
+					freepath, parent_locked);
 		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
 		     *ppath);
 	} else if (rpath || rino) {
@@ -2035,7 +2289,7 @@
 	const char *path2 = NULL;
 	u64 ino1 = 0, ino2 = 0;
 	int pathlen1 = 0, pathlen2 = 0;
-	int freepath1 = 0, freepath2 = 0;
+	bool freepath1 = false, freepath2 = false;
 	int len;
 	u16 releases;
 	void *p, *end;
@@ -2043,16 +2297,19 @@
 
 	ret = set_request_path_attr(req->r_inode, req->r_dentry,
 			      req->r_parent, req->r_path1, req->r_ino1.ino,
-			      &path1, &pathlen1, &ino1, &freepath1);
+			      &path1, &pathlen1, &ino1, &freepath1,
+			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
+					&req->r_req_flags));
 	if (ret < 0) {
 		msg = ERR_PTR(ret);
 		goto out;
 	}
 
+	/* If r_old_dentry is set, then assume that its parent is locked */
 	ret = set_request_path_attr(NULL, req->r_old_dentry,
 			      req->r_old_dentry_dir,
 			      req->r_path2, req->r_ino2.ino,
-			      &path2, &pathlen2, &ino2, &freepath2);
+			      &path2, &pathlen2, &ino2, &freepath2, true);
 	if (ret < 0) {
 		msg = ERR_PTR(ret);
 		goto out_free1;
@@ -2067,11 +2324,11 @@
 		(!!req->r_inode_drop + !!req->r_dentry_drop +
 		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
 	if (req->r_dentry_drop)
-		len += req->r_dentry->d_name.len;
+		len += pathlen1;
 	if (req->r_old_dentry_drop)
-		len += req->r_old_dentry->d_name.len;
+		len += pathlen2;
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
+	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
 	if (!msg) {
 		msg = ERR_PTR(-ENOMEM);
 		goto out_free2;
@@ -2136,7 +2393,6 @@
 
 	if (req->r_pagelist) {
 		struct ceph_pagelist *pagelist = req->r_pagelist;
-		refcount_inc(&pagelist->refcnt);
 		ceph_msg_data_add_pagelist(msg, pagelist);
 		msg->hdr.data_len = cpu_to_le32(pagelist->length);
 	} else {
@@ -2147,10 +2403,10 @@
 
 out_free2:
 	if (freepath2)
-		kfree((char *)path2);
+		ceph_mdsc_free_path((char *)path2, pathlen2);
 out_free1:
 	if (freepath1)
-		kfree((char *)path1);
+		ceph_mdsc_free_path((char *)path1, pathlen1);
 out:
 	return msg;
 }
@@ -2164,8 +2420,7 @@
 {
 	if (req->r_callback)
 		req->r_callback(mdsc, req);
-	else
-		complete_all(&req->r_completion);
+	complete_all(&req->r_completion);
 }
 
 /*
@@ -2407,28 +2662,11 @@
 	}
 }
 
-void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
+int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
 			      struct ceph_mds_request *req)
 {
-	dout("submit_request on %p\n", req);
-	mutex_lock(&mdsc->mutex);
-	__register_request(mdsc, req, NULL);
-	__do_request(mdsc, req);
-	mutex_unlock(&mdsc->mutex);
-}
-
-/*
- * Synchrously perform an mds request.  Take care of all of the
- * session setup, forwarding, retry details.
- */
-int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
-			 struct inode *dir,
-			 struct ceph_mds_request *req)
-{
 	int err;
 
-	dout("do_request on %p\n", req);
-
 	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
 	if (req->r_inode)
 		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
@@ -2438,18 +2676,21 @@
 		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
 				  CEPH_CAP_PIN);
 
-	/* issue */
+	dout("submit_request on %p for inode %p\n", req, dir);
 	mutex_lock(&mdsc->mutex);
 	__register_request(mdsc, req, dir);
 	__do_request(mdsc, req);
+	err = req->r_err;
+	mutex_unlock(&mdsc->mutex);
+	return err;
+}
 
-	if (req->r_err) {
-		err = req->r_err;
-		goto out;
-	}
+static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
+				  struct ceph_mds_request *req)
+{
+	int err;
 
 	/* wait */
-	mutex_unlock(&mdsc->mutex);
 	dout("do_request waiting\n");
 	if (!req->r_timeout && req->r_wait_for_completion) {
 		err = req->r_wait_for_completion(mdsc, req);
@@ -2490,8 +2731,26 @@
 		err = req->r_err;
 	}
 
-out:
 	mutex_unlock(&mdsc->mutex);
+	return err;
+}
+
+/*
+ * Synchronously perform an mds request.  Take care of all of the
+ * session setup, forwarding, retry details.
+ */
+int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+			 struct inode *dir,
+			 struct ceph_mds_request *req)
+{
+	int err;
+
+	dout("do_request on %p\n", req);
+
+	/* issue */
+	err = ceph_mdsc_submit_request(mdsc, dir, req);
+	if (!err)
+		err = ceph_mdsc_wait_request(mdsc, req);
 	dout("do_request %p done, result %d\n", req, err);
 	return err;
 }
@@ -2641,7 +2900,10 @@
 
 	dout("handle_reply tid %lld result %d\n", tid, result);
 	rinfo = &req->r_reply_info;
-	err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
+	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
+		err = parse_reply_info(msg, rinfo, (u64)-1);
+	else
+		err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
 	mutex_unlock(&mdsc->mutex);
 
 	mutex_lock(&session->s_mutex);
@@ -2672,7 +2934,6 @@
 		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
 				    req->r_op == CEPH_MDS_OP_LSSNAP))
 			ceph_readdir_prepopulate(req, req->r_session);
-		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
 	current->journal_info = NULL;
 	mutex_unlock(&req->r_fill_mutex);
@@ -2681,12 +2942,18 @@
 	if (realm)
 		ceph_put_snap_realm(mdsc, realm);
 
-	if (err == 0 && req->r_target_inode &&
-	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
-		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
-		spin_lock(&ci->i_unsafe_lock);
-		list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
-		spin_unlock(&ci->i_unsafe_lock);
+	if (err == 0) {
+		if (req->r_target_inode &&
+		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
+			struct ceph_inode_info *ci =
+				ceph_inode(req->r_target_inode);
+			spin_lock(&ci->i_unsafe_lock);
+			list_add_tail(&req->r_unsafe_target_item,
+				      &ci->i_unsafe_iops);
+			spin_unlock(&ci->i_unsafe_lock);
+		}
+
+		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
 out_err:
 	mutex_lock(&mdsc->mutex);
@@ -2765,6 +3032,30 @@
 	pr_err("mdsc_handle_forward decode error err=%d\n", err);
 }
 
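+/*
+ * decode the session metadata map<string,string> and note whether its
+ * "error_string" entry indicates that this client is blacklisted
+ */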
+static int __decode_session_metadata(void **p, void *end,
+				     bool *blacklisted)
+{
+	/* map<string,string> */
+	u32 n;
+	bool err_str;
+	ceph_decode_32_safe(p, end, n, bad);
+	while (n-- > 0) {
+		u32 len;
+		ceph_decode_32_safe(p, end, len, bad);
+		ceph_decode_need(p, end, len, bad);
+		err_str = !strncmp(*p, "error_string", len);
+		*p += len;
+		ceph_decode_32_safe(p, end, len, bad);
+		ceph_decode_need(p, end, len, bad);
+		if (err_str && strnstr(*p, "blacklisted", len))
+			*blacklisted = true;
+		*p += len;
+	}
+	return 0;
+bad:
+	return -1;
+}
+
 /*
  * handle a mds session control message
  */
@@ -2772,18 +3063,37 @@
 			   struct ceph_msg *msg)
 {
 	struct ceph_mds_client *mdsc = session->s_mdsc;
+	int mds = session->s_mds;
+	int msg_version = le16_to_cpu(msg->hdr.version);
+	void *p = msg->front.iov_base;
+	void *end = p + msg->front.iov_len;
+	struct ceph_mds_session_head *h;
 	u32 op;
 	u64 seq;
-	int mds = session->s_mds;
-	struct ceph_mds_session_head *h = msg->front.iov_base;
+	unsigned long features = 0;
 	int wake = 0;
+	bool blacklisted = false;
 
 	/* decode */
-	if (msg->front.iov_len < sizeof(*h))
-		goto bad;
+	ceph_decode_need(&p, end, sizeof(*h), bad);
+	h = p;
+	p += sizeof(*h);
+
 	op = le32_to_cpu(h->op);
 	seq = le64_to_cpu(h->seq);
 
+	if (msg_version >= 3) {
+		u32 len;
+		/* version >= 2, metadata */
+		if (__decode_session_metadata(&p, end, &blacklisted) < 0)
+			goto bad;
+		/* version >= 3, feature bits */
+		ceph_decode_32_safe(&p, end, len, bad);
+		ceph_decode_need(&p, end, len, bad);
+		memcpy(&features, p, min_t(size_t, len, sizeof(features)));
+		p += len;
+	}
+
 	mutex_lock(&mdsc->mutex);
 	if (op == CEPH_SESSION_CLOSE) {
 		get_session(session);
@@ -2809,6 +3119,7 @@
 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
 			pr_info("mds%d reconnect success\n", session->s_mds);
 		session->s_state = CEPH_MDS_SESSION_OPEN;
+		session->s_features = features;
 		renewed_caps(mdsc, session, 0);
 		wake = 1;
 		if (mdsc->stopping)
@@ -2852,7 +3163,7 @@
 		spin_lock(&session->s_cap_lock);
 		session->s_readonly = true;
 		spin_unlock(&session->s_cap_lock);
-		wake_up_session_caps(session, 0);
+		wake_up_session_caps(session, FORCE_RO);
 		break;
 
 	case CEPH_SESSION_REJECT:
@@ -2861,6 +3172,8 @@
 		session->s_state = CEPH_MDS_SESSION_REJECTED;
 		cleanup_session_requests(mdsc, session);
 		remove_session_caps(session);
+		if (blacklisted)
+			mdsc->fsc->blacklisted = true;
 		wake = 2; /* for good measure */
 		break;
 
@@ -2935,6 +3248,82 @@
 	mutex_unlock(&mdsc->mutex);
 }
 
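+/*
+ * the reconnect payload no longer fits in a single message: send what
+ * has been encoded so far and continue with a fresh pagelist.  only
+ * possible when the MDS supports multiple reconnect messages.
+ */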
+static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
+{
+	struct ceph_msg *reply;
+	struct ceph_pagelist *_pagelist;
+	struct page *page;
+	__le32 *addr;
+	int err = -ENOMEM;
+
+	if (!recon_state->allow_multi)
+		return -ENOSPC;
+
+	/* can't handle message that contains both caps and realm */
+	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
+
+	/* pre-allocate new pagelist */
+	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
+	if (!_pagelist)
+		return -ENOMEM;
+
+	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
+	if (!reply)
+		goto fail_msg;
+
+	/* placeholder for nr_caps */
+	err = ceph_pagelist_encode_32(_pagelist, 0);
+	if (err < 0)
+		goto fail;
+
+	if (recon_state->nr_caps) {
+		/* currently encoding caps */
+		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
+		if (err)
+			goto fail;
+	} else {
+		/* placeholder for nr_realms (currently encoding realms) */
+		err = ceph_pagelist_encode_32(_pagelist, 0);
+		if (err < 0)
+			goto fail;
+	}
+
+	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
+	if (err)
+		goto fail;
+
+	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
+	addr = kmap_atomic(page);
+	if (recon_state->nr_caps) {
+		/* currently encoding caps */
+		*addr = cpu_to_le32(recon_state->nr_caps);
+	} else {
+		/* currently encoding realms */
+		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
+	}
+	kunmap_atomic(addr);
+
+	reply->hdr.version = cpu_to_le16(5);
+	reply->hdr.compat_version = cpu_to_le16(4);
+
+	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
+	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
+
+	ceph_con_send(&recon_state->session->s_con, reply);
+	ceph_pagelist_release(recon_state->pagelist);
+
+	recon_state->pagelist = _pagelist;
+	recon_state->nr_caps = 0;
+	recon_state->nr_realms = 0;
+	recon_state->msg_version = 5;
+	return 0;
+fail:
+	ceph_msg_put(reply);
+fail_msg:
+	ceph_pagelist_release(_pagelist);
+	return err;
+}
+
 /*
  * Encode information about a cap for a reconnect with the MDS.
  */
@@ -2948,31 +3337,12 @@
 	struct ceph_inode_info *ci = cap->ci;
 	struct ceph_reconnect_state *recon_state = arg;
 	struct ceph_pagelist *pagelist = recon_state->pagelist;
-	char *path;
-	int pathlen, err;
-	u64 pathbase;
+	int err;
 	u64 snap_follows;
-	struct dentry *dentry;
 
 	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
 	     inode, ceph_vinop(inode), cap, cap->cap_id,
 	     ceph_cap_string(cap->issued));
-	err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
-	if (err)
-		return err;
-
-	dentry = d_find_alias(inode);
-	if (dentry) {
-		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
-		if (IS_ERR(path)) {
-			err = PTR_ERR(path);
-			goto out_dput;
-		}
-	} else {
-		path = NULL;
-		pathlen = 0;
-		pathbase = 0;
-	}
 
 	spin_lock(&ci->i_ceph_lock);
 	cap->seq = 0;        /* reset cap seq */
@@ -2985,7 +3355,7 @@
 		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
 		rec.v2.issued = cpu_to_le32(cap->issued);
 		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
-		rec.v2.pathbase = cpu_to_le64(pathbase);
+		rec.v2.pathbase = 0;
 		rec.v2.flock_len = (__force __le32)
 			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
 	} else {
@@ -2996,7 +3366,7 @@
 		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
 		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
 		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
-		rec.v1.pathbase = cpu_to_le64(pathbase);
+		rec.v1.pathbase = 0;
 	}
 
 	if (list_empty(&ci->i_cap_snaps)) {
@@ -3012,7 +3382,7 @@
 	if (recon_state->msg_version >= 2) {
 		int num_fcntl_locks, num_flock_locks;
 		struct ceph_filelock *flocks = NULL;
-		size_t struct_len, total_len = 0;
+		size_t struct_len, total_len = sizeof(u64);
 		u8 struct_v = 0;
 
 encode_again:
@@ -3028,7 +3398,7 @@
 					       GFP_NOFS);
 			if (!flocks) {
 				err = -ENOMEM;
-				goto out_free;
+				goto out_err;
 			}
 			err = ceph_encode_locks_to_buffer(inode, flocks,
 							  num_fcntl_locks,
@@ -3038,7 +3408,7 @@
 				flocks = NULL;
 				if (err == -ENOSPC)
 					goto encode_again;
-				goto out_free;
+				goto out_err;
 			}
 		} else {
 			kfree(flocks);
@@ -3047,7 +3417,7 @@
 
 		if (recon_state->msg_version >= 3) {
 			/* version, compat_version and struct_len */
-			total_len = 2 * sizeof(u8) + sizeof(u32);
+			total_len += 2 * sizeof(u8) + sizeof(u32);
 			struct_v = 2;
 		}
 		/*
@@ -3058,44 +3428,132 @@
 			    sizeof(struct ceph_filelock);
 		rec.v2.flock_len = cpu_to_le32(struct_len);
 
-		struct_len += sizeof(rec.v2);
-		struct_len += sizeof(u32) + pathlen;
+		struct_len += sizeof(u32) + sizeof(rec.v2);
 
 		if (struct_v >= 2)
 			struct_len += sizeof(u64); /* snap_follows */
 
 		total_len += struct_len;
-		err = ceph_pagelist_reserve(pagelist, total_len);
 
-		if (!err) {
-			if (recon_state->msg_version >= 3) {
-				ceph_pagelist_encode_8(pagelist, struct_v);
-				ceph_pagelist_encode_8(pagelist, 1);
-				ceph_pagelist_encode_32(pagelist, struct_len);
-			}
-			ceph_pagelist_encode_string(pagelist, path, pathlen);
-			ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
-			ceph_locks_to_pagelist(flocks, pagelist,
-					       num_fcntl_locks,
-					       num_flock_locks);
-			if (struct_v >= 2)
-				ceph_pagelist_encode_64(pagelist, snap_follows);
+		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
+			err = send_reconnect_partial(recon_state);
+			if (err)
+				goto out_freeflocks;
+			pagelist = recon_state->pagelist;
 		}
+
+		err = ceph_pagelist_reserve(pagelist, total_len);
+		if (err)
+			goto out_freeflocks;
+
+		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
+		if (recon_state->msg_version >= 3) {
+			ceph_pagelist_encode_8(pagelist, struct_v);
+			ceph_pagelist_encode_8(pagelist, 1);
+			ceph_pagelist_encode_32(pagelist, struct_len);
+		}
+		ceph_pagelist_encode_string(pagelist, NULL, 0);
+		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
+		ceph_locks_to_pagelist(flocks, pagelist,
+				       num_fcntl_locks, num_flock_locks);
+		if (struct_v >= 2)
+			ceph_pagelist_encode_64(pagelist, snap_follows);
+out_freeflocks:
 		kfree(flocks);
 	} else {
-		size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
-		err = ceph_pagelist_reserve(pagelist, size);
-		if (!err) {
-			ceph_pagelist_encode_string(pagelist, path, pathlen);
-			ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
+		u64 pathbase = 0;
+		int pathlen = 0;
+		char *path = NULL;
+		struct dentry *dentry;
+
+		dentry = d_find_alias(inode);
+		if (dentry) {
+			path = ceph_mdsc_build_path(dentry,
+						&pathlen, &pathbase, 0);
+			dput(dentry);
+			if (IS_ERR(path)) {
+				err = PTR_ERR(path);
+				goto out_err;
+			}
+			rec.v1.pathbase = cpu_to_le64(pathbase);
 		}
+
+		err = ceph_pagelist_reserve(pagelist,
+					    sizeof(u64) + sizeof(u32) +
+					    pathlen + sizeof(rec.v1));
+		if (err) {
+			goto out_freepath;
+		}
+
+		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
+		ceph_pagelist_encode_string(pagelist, path, pathlen);
+		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
+out_freepath:
+		ceph_mdsc_free_path(path, pathlen);
 	}
 
-	recon_state->nr_caps++;
-out_free:
-	kfree(path);
-out_dput:
-	dput(dentry);
+out_err:
+	if (err >= 0)
+		recon_state->nr_caps++;
+	return err;
+}
+
+static int encode_snap_realms(struct ceph_mds_client *mdsc,
+			      struct ceph_reconnect_state *recon_state)
+{
+	struct rb_node *p;
+	struct ceph_pagelist *pagelist = recon_state->pagelist;
+	int err = 0;
+
+	if (recon_state->msg_version >= 4) {
+		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
+		if (err < 0)
+			goto fail;
+	}
+
+	/*
+	 * snaprealms.  we provide mds with the ino, seq (version), and
+	 * parent for all of our realms.  If the mds has any newer info,
+	 * it will tell us.
+	 */
+	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
+		struct ceph_snap_realm *realm =
+		       rb_entry(p, struct ceph_snap_realm, node);
+		struct ceph_mds_snaprealm_reconnect sr_rec;
+
+		if (recon_state->msg_version >= 4) {
+			size_t need = sizeof(u8) * 2 + sizeof(u32) +
+				      sizeof(sr_rec);
+
+			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
+				err = send_reconnect_partial(recon_state);
+				if (err)
+					goto fail;
+				pagelist = recon_state->pagelist;
+			}
+
+			err = ceph_pagelist_reserve(pagelist, need);
+			if (err)
+				goto fail;
+
+			ceph_pagelist_encode_8(pagelist, 1);
+			ceph_pagelist_encode_8(pagelist, 1);
+			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
+		}
+
+		dout(" adding snap realm %llx seq %lld parent %llx\n",
+		     realm->ino, realm->seq, realm->parent_ino);
+		sr_rec.ino = cpu_to_le64(realm->ino);
+		sr_rec.seq = cpu_to_le64(realm->seq);
+		sr_rec.parent = cpu_to_le64(realm->parent_ino);
+
+		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
+		if (err)
+			goto fail;
+
+		recon_state->nr_realms++;
+	}
+fail:
 	return err;
 }
 
@@ -3116,22 +3574,20 @@
 			       struct ceph_mds_session *session)
 {
 	struct ceph_msg *reply;
-	struct rb_node *p;
 	int mds = session->s_mds;
 	int err = -ENOMEM;
-	int s_nr_caps;
-	struct ceph_pagelist *pagelist;
-	struct ceph_reconnect_state recon_state;
+	struct ceph_reconnect_state recon_state = {
+		.session = session,
+	};
 	LIST_HEAD(dispose);
 
 	pr_info("mds%d reconnect start\n", mds);
 
-	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
-	if (!pagelist)
+	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
+	if (!recon_state.pagelist)
 		goto fail_nopagelist;
-	ceph_pagelist_init(pagelist);
 
-	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
+	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
 	if (!reply)
 		goto fail_nomsg;
 
@@ -3172,65 +3628,90 @@
 	/* replay unsafe requests */
 	replay_unsafe_requests(mdsc, session);
 
+	ceph_early_kick_flushing_caps(mdsc, session);
+
 	down_read(&mdsc->snap_rwsem);
 
-	/* traverse this session's caps */
-	s_nr_caps = session->s_nr_caps;
-	err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
+	/* placeholder for nr_caps */
+	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
 	if (err)
 		goto fail;
 
-	recon_state.nr_caps = 0;
-	recon_state.pagelist = pagelist;
-	if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
+	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
 		recon_state.msg_version = 3;
-	else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
+		recon_state.allow_multi = true;
+	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
+		recon_state.msg_version = 3;
+	} else {
 		recon_state.msg_version = 2;
-	else
-		recon_state.msg_version = 1;
-	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
-	if (err < 0)
-		goto fail;
+	}
+	/* traverse this session's caps */
+	err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
 
 	spin_lock(&session->s_cap_lock);
 	session->s_cap_reconnect = 0;
 	spin_unlock(&session->s_cap_lock);
 
-	/*
-	 * snaprealms.  we provide mds with the ino, seq (version), and
-	 * parent for all of our realms.  If the mds has any newer info,
-	 * it will tell us.
-	 */
-	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
-		struct ceph_snap_realm *realm =
-			rb_entry(p, struct ceph_snap_realm, node);
-		struct ceph_mds_snaprealm_reconnect sr_rec;
+	if (err < 0)
+		goto fail;
 
-		dout(" adding snap realm %llx seq %lld parent %llx\n",
-		     realm->ino, realm->seq, realm->parent_ino);
-		sr_rec.ino = cpu_to_le64(realm->ino);
-		sr_rec.seq = cpu_to_le64(realm->seq);
-		sr_rec.parent = cpu_to_le64(realm->parent_ino);
-		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
-		if (err)
+	/* check if all realms can be encoded into current message */
+	if (mdsc->num_snap_realms) {
+		size_t total_len =
+			recon_state.pagelist->length +
+			mdsc->num_snap_realms *
+			sizeof(struct ceph_mds_snaprealm_reconnect);
+		if (recon_state.msg_version >= 4) {
+			/* number of realms */
+			total_len += sizeof(u32);
+			/* version, compat_version and struct_len */
+			total_len += mdsc->num_snap_realms *
+				     (2 * sizeof(u8) + sizeof(u32));
+		}
+		if (total_len > RECONNECT_MAX_SIZE) {
+			if (!recon_state.allow_multi) {
+				err = -ENOSPC;
+				goto fail;
+			}
+			if (recon_state.nr_caps) {
+				err = send_reconnect_partial(&recon_state);
+				if (err)
+					goto fail;
+			}
+			recon_state.msg_version = 5;
+		}
+	}
+
+	err = encode_snap_realms(mdsc, &recon_state);
+	if (err < 0)
+		goto fail;
+
+	if (recon_state.msg_version >= 5) {
+		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
+		if (err < 0)
 			goto fail;
 	}
 
-	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
-
-	/* raced with cap release? */
-	if (s_nr_caps != recon_state.nr_caps) {
-		struct page *page = list_first_entry(&pagelist->head,
-						     struct page, lru);
+	if (recon_state.nr_caps || recon_state.nr_realms) {
+		struct page *page =
+			list_first_entry(&recon_state.pagelist->head,
+					struct page, lru);
 		__le32 *addr = kmap_atomic(page);
-		*addr = cpu_to_le32(recon_state.nr_caps);
+		if (recon_state.nr_caps) {
+			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
+			*addr = cpu_to_le32(recon_state.nr_caps);
+		} else if (recon_state.msg_version >= 4) {
+			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
+		}
 		kunmap_atomic(addr);
 	}
 
-	reply->hdr.data_len = cpu_to_le32(pagelist->length);
-	ceph_msg_data_add_pagelist(reply, pagelist);
+	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
+	if (recon_state.msg_version >= 4)
+		reply->hdr.compat_version = cpu_to_le16(4);
 
-	ceph_early_kick_flushing_caps(mdsc, session);
+	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
+	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
 
 	ceph_con_send(&session->s_con, reply);
 
@@ -3241,6 +3722,7 @@
 	mutex_unlock(&mdsc->mutex);
 
 	up_read(&mdsc->snap_rwsem);
+	ceph_pagelist_release(recon_state.pagelist);
 	return;
 
 fail:
@@ -3248,7 +3730,7 @@
 	up_read(&mdsc->snap_rwsem);
 	mutex_unlock(&session->s_mutex);
 fail_nomsg:
-	ceph_pagelist_release(pagelist);
+	ceph_pagelist_release(recon_state.pagelist);
 fail_nopagelist:
 	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
 	return;
@@ -3286,42 +3768,35 @@
 		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
 		     ceph_session_state_name(s->s_state));
 
-		if (i >= newmap->m_num_mds ||
-		    memcmp(ceph_mdsmap_get_addr(oldmap, i),
+		if (i >= newmap->m_num_mds) {
+			/* force close session for stopped mds */
+			get_session(s);
+			__unregister_session(mdsc, s);
+			__wake_requests(mdsc, &s->s_waiting);
+			mutex_unlock(&mdsc->mutex);
+
+			mutex_lock(&s->s_mutex);
+			cleanup_session_requests(mdsc, s);
+			remove_session_caps(s);
+			mutex_unlock(&s->s_mutex);
+
+			ceph_put_mds_session(s);
+
+			mutex_lock(&mdsc->mutex);
+			kick_requests(mdsc, i);
+			continue;
+		}
+
+		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
 			   ceph_mdsmap_get_addr(newmap, i),
 			   sizeof(struct ceph_entity_addr))) {
-			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
-				/* the session never opened, just close it
-				 * out now */
-				get_session(s);
-				__unregister_session(mdsc, s);
-				__wake_requests(mdsc, &s->s_waiting);
-				ceph_put_mds_session(s);
-			} else if (i >= newmap->m_num_mds) {
-				/* force close session for stopped mds */
-				get_session(s);
-				__unregister_session(mdsc, s);
-				__wake_requests(mdsc, &s->s_waiting);
-				kick_requests(mdsc, i);
-				mutex_unlock(&mdsc->mutex);
-
-				mutex_lock(&s->s_mutex);
-				cleanup_session_requests(mdsc, s);
-				remove_session_caps(s);
-				mutex_unlock(&s->s_mutex);
-
-				ceph_put_mds_session(s);
-
-				mutex_lock(&mdsc->mutex);
-			} else {
-				/* just close it */
-				mutex_unlock(&mdsc->mutex);
-				mutex_lock(&s->s_mutex);
-				mutex_lock(&mdsc->mutex);
-				ceph_con_close(&s->s_con);
-				mutex_unlock(&s->s_mutex);
-				s->s_state = CEPH_MDS_SESSION_RESTARTING;
-			}
+			/* just close it */
+			mutex_unlock(&mdsc->mutex);
+			mutex_lock(&s->s_mutex);
+			mutex_lock(&mdsc->mutex);
+			ceph_con_close(&s->s_con);
+			mutex_unlock(&s->s_mutex);
+			s->s_state = CEPH_MDS_SESSION_RESTARTING;
 		} else if (oldstate == newstate) {
 			continue;  /* nothing new with this mds */
 		}
@@ -3346,7 +3821,7 @@
 				pr_info("mds%d recovery completed\n", s->s_mds);
 			kick_requests(mdsc, i);
 			ceph_kick_flushing_caps(mdsc, s);
-			wake_up_session_caps(s, 1);
+			wake_up_session_caps(s, RECONNECT);
 		}
 	}
 
@@ -3479,8 +3954,9 @@
 	ceph_con_send(&session->s_con, msg);
 
 out:
-	iput(inode);
 	mutex_unlock(&session->s_mutex);
+	/* avoid calling iput_final() in mds dispatch threads */
+	ceph_async_iput(inode);
 	return;
 
 bad:
@@ -3489,31 +3965,33 @@
 }
 
 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
-			      struct inode *inode,
 			      struct dentry *dentry, char action,
 			      u32 seq)
 {
 	struct ceph_msg *msg;
 	struct ceph_mds_lease *lease;
-	int len = sizeof(*lease) + sizeof(u32);
-	int dnamelen = 0;
+	struct inode *dir;
+	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
 
-	dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
-	     inode, dentry, ceph_lease_op_name(action), session->s_mds);
-	dnamelen = dentry->d_name.len;
-	len += dnamelen;
+	dout("lease_send_msg dentry %p %s to mds%d\n",
+	     dentry, ceph_lease_op_name(action), session->s_mds);
 
 	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
 	if (!msg)
 		return;
 	lease = msg->front.iov_base;
 	lease->action = action;
-	lease->ino = cpu_to_le64(ceph_vino(inode).ino);
-	lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
 	lease->seq = cpu_to_le32(seq);
-	put_unaligned_le32(dnamelen, lease + 1);
-	memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
 
+	spin_lock(&dentry->d_lock);
+	dir = d_inode(dentry->d_parent);
+	lease->ino = cpu_to_le64(ceph_ino(dir));
+	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
+
+	put_unaligned_le32(dentry->d_name.len, lease + 1);
+	memcpy((void *)(lease + 1) + 4,
+	       dentry->d_name.name, dentry->d_name.len);
+	spin_unlock(&dentry->d_lock);
 	/*
 	 * if this is a preemptive lease RELEASE, no need to
 	 * flush request stream, since the actual request will
@@ -3545,7 +4023,27 @@
 	mutex_unlock(&mdsc->mutex);
 }
 
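+/*
+ * if the client has been blacklisted, try an automatic reconnect
+ * (at most once every 30 minutes) when the CLEANRECOVER mount
+ * option is set.
+ */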
+static void maybe_recover_session(struct ceph_mds_client *mdsc)
+{
+	struct ceph_fs_client *fsc = mdsc->fsc;
 
+	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
+		return;
+
+	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
+		return;
+
+	if (!READ_ONCE(fsc->blacklisted))
+		return;
+
+	if (fsc->last_auto_reconnect &&
+	    time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
+		return;
+
+	pr_info("auto reconnect after blacklisted\n");
+	fsc->last_auto_reconnect = jiffies;
+	ceph_force_reconnect(fsc->sb);
+}
 
 /*
  * delayed work -- periodically trim expired leases, renew caps with mds
@@ -3566,7 +4064,6 @@
 	int renew_caps;
 
 	dout("mdsc delayed_work\n");
-	ceph_check_delayed_caps(mdsc);
 
 	mutex_lock(&mdsc->mutex);
 	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
@@ -3592,7 +4089,9 @@
 				pr_info("mds%d hung\n", s->s_mds);
 			}
 		}
-		if (s->s_state < CEPH_MDS_SESSION_OPEN) {
+		if (s->s_state == CEPH_MDS_SESSION_NEW ||
+		    s->s_state == CEPH_MDS_SESSION_RESTARTING ||
+		    s->s_state == CEPH_MDS_SESSION_REJECTED) {
 			/* this mds is failed or recovering, just wait */
 			ceph_put_mds_session(s);
 			continue;
@@ -3614,6 +4113,14 @@
 	}
 	mutex_unlock(&mdsc->mutex);
 
+	ceph_check_delayed_caps(mdsc);
+
+	ceph_queue_cap_reclaim_work(mdsc);
+
+	ceph_trim_snapid_map(mdsc);
+
+	maybe_recover_session(mdsc);
+
 	schedule_delayed(mdsc);
 }
 
@@ -3642,10 +4149,13 @@
 	mdsc->max_sessions = 0;
 	mdsc->stopping = 0;
 	atomic64_set(&mdsc->quotarealms_count, 0);
+	mdsc->quotarealms_inodes = RB_ROOT;
+	mutex_init(&mdsc->quotarealms_inodes_mutex);
 	mdsc->last_snap_seq = 0;
 	init_rwsem(&mdsc->snap_rwsem);
 	mdsc->snap_realms = RB_ROOT;
 	INIT_LIST_HEAD(&mdsc->snap_empty);
+	mdsc->num_snap_realms = 0;
 	spin_lock_init(&mdsc->snap_empty_lock);
 	mdsc->last_tid = 0;
 	mdsc->oldest_tid = 0;
@@ -3663,11 +4173,19 @@
 	mdsc->num_cap_flushing = 0;
 	spin_lock_init(&mdsc->cap_dirty_lock);
 	init_waitqueue_head(&mdsc->cap_flushing_wq);
-	spin_lock_init(&mdsc->dentry_lru_lock);
-	INIT_LIST_HEAD(&mdsc->dentry_lru);
+	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
+	atomic_set(&mdsc->cap_reclaim_pending, 0);
+
+	spin_lock_init(&mdsc->dentry_list_lock);
+	INIT_LIST_HEAD(&mdsc->dentry_leases);
+	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
 
 	ceph_caps_init(mdsc);
-	ceph_adjust_min_caps(mdsc, fsc->min_caps);
+	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
+
+	spin_lock_init(&mdsc->snapid_map_lock);
+	mdsc->snapid_map_tree = RB_ROOT;
+	INIT_LIST_HEAD(&mdsc->snapid_map_lru);
 
 	init_rwsem(&mdsc->pool_perm_rwsem);
 	mdsc->pool_perm_tree = RB_ROOT;
@@ -3699,6 +4217,7 @@
 		while ((req = __get_oldest_req(mdsc))) {
 			dout("wait_requests timed out on tid %llu\n",
 			     req->r_tid);
+			list_del_init(&req->r_wait);
 			__unregister_request(mdsc, req);
 		}
 	}
@@ -3724,6 +4243,8 @@
 	 * their inode/dcache refs
 	 */
 	ceph_msgr_flush();
+
+	ceph_cleanup_quotarealms_inodes(mdsc);
 }
 
 /*
@@ -3862,8 +4383,10 @@
 	WARN_ON(!list_empty(&mdsc->cap_delay_list));
 	mutex_unlock(&mdsc->mutex);
 
+	ceph_cleanup_snapid_map(mdsc);
 	ceph_cleanup_empty_realms(mdsc);
 
+	cancel_work_sync(&mdsc->cap_reclaim_work);
 	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
 
 	dout("stopped\n");
@@ -3881,7 +4404,12 @@
 		session = __ceph_lookup_mds_session(mdsc, mds);
 		if (!session)
 			continue;
+
+		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
+			__unregister_session(mdsc, session);
+		__wake_requests(mdsc, &session->s_waiting);
 		mutex_unlock(&mdsc->mutex);
+
 		mutex_lock(&session->s_mutex);
 		__close_session(mdsc, session);
 		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
@@ -3890,6 +4418,7 @@
 		}
 		mutex_unlock(&session->s_mutex);
 		ceph_put_mds_session(session);
+
 		mutex_lock(&mdsc->mutex);
 		kick_requests(mdsc, mds);
 	}