Update Linux to v5.4.2

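Among the fs/ceph/caps.c changes picked up by this update, as reflected
in the diff below:

- replace ceph_adjust_min_caps() with ceph_adjust_caps_max_min(), which
  bases caps_min_count on the max_readdir mount option (floor 1024) and
  tracks a caps_use_max limit
- count how many reserved caps a ceph_cap_reservation actually used and
  reclaim via ceph_reclaim_caps_nr() once caps_use_count exceeds
  caps_use_max
- encode btime and change_attr (cap message v9 fields) instead of zeros,
  and carry them through cap snapshots and cap grant handling
- pass CEPH_CLIENT_CAPS_* flags to __send_cap() rather than a sync bool
- have __ceph_build_xattrs_blob() return the old xattr blob so it can be
  freed after i_ceph_lock is dropped
- rework try_get_cap_refs()/ceph_try_get_caps()/ceph_get_caps() to take
  an inode/file, return negative errors directly, and bail out with
  -EBADF or -EIO on a stale filp generation or file-lock errors
- drop the CEPH_I_NOFLUSH handling and ceph_get_cap_mds()
- queue cap releases through __ceph_queue_cap_release() and
  ceph_flush_cap_releases(), and use ceph_async_iput() to avoid calling
  iput_final() from dispatch/tick threads
- replace ceph_queue_caps_release() with __ceph_remove_caps(), which
  takes i_ceph_lock, and check the per-file metadata errseq in
  ceph_fsync()
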
Change-Id: Idf6911045d9d382da2cfe01b1edff026404ac8fd
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index dd7dfdd..f5a3891 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -8,6 +8,7 @@
 #include <linux/vmalloc.h>
 #include <linux/wait.h>
 #include <linux/writeback.h>
+#include <linux/iversion.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -148,11 +149,17 @@
 	spin_unlock(&mdsc->caps_list_lock);
 }
 
-void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
+void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
+			      struct ceph_mount_options *fsopt)
 {
 	spin_lock(&mdsc->caps_list_lock);
-	mdsc->caps_min_count += delta;
-	BUG_ON(mdsc->caps_min_count < 0);
+	mdsc->caps_min_count = fsopt->max_readdir;
+	if (mdsc->caps_min_count < 1024)
+		mdsc->caps_min_count = 1024;
+	mdsc->caps_use_max = fsopt->caps_max;
+	if (mdsc->caps_use_max > 0 &&
+	    mdsc->caps_use_max < mdsc->caps_min_count)
+		mdsc->caps_use_max = mdsc->caps_min_count;
 	spin_unlock(&mdsc->caps_list_lock);
 }
 
@@ -272,6 +279,7 @@
 	if (!err) {
 		BUG_ON(have + alloc != need);
 		ctx->count = need;
+		ctx->used = 0;
 	}
 
 	spin_lock(&mdsc->caps_list_lock);
@@ -295,13 +303,24 @@
 }
 
 void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
-			struct ceph_cap_reservation *ctx)
+			 struct ceph_cap_reservation *ctx)
 {
+	bool reclaim = false;
+	if (!ctx->count)
+		return;
+
 	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
 	spin_lock(&mdsc->caps_list_lock);
 	__ceph_unreserve_caps(mdsc, ctx->count);
 	ctx->count = 0;
+
+	if (mdsc->caps_use_max > 0 &&
+	    mdsc->caps_use_count > mdsc->caps_use_max)
+		reclaim = true;
 	spin_unlock(&mdsc->caps_list_lock);
+
+	if (reclaim)
+		ceph_reclaim_caps_nr(mdsc, ctx->used);
 }
 
 struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
@@ -346,6 +365,7 @@
 	BUG_ON(list_empty(&mdsc->caps_list));
 
 	ctx->count--;
+	ctx->used++;
 	mdsc->caps_reserve_count--;
 	mdsc->caps_use_count++;
 
@@ -438,37 +458,6 @@
 }
 
 /*
- * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
- */
-static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
-{
-	struct ceph_cap *cap;
-	int mds = -1;
-	struct rb_node *p;
-
-	/* prefer mds with WR|BUFFER|EXCL caps */
-	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
-		cap = rb_entry(p, struct ceph_cap, ci_node);
-		mds = cap->mds;
-		if (cap->issued & (CEPH_CAP_FILE_WR |
-				   CEPH_CAP_FILE_BUFFER |
-				   CEPH_CAP_FILE_EXCL))
-			break;
-	}
-	return mds;
-}
-
-int ceph_get_cap_mds(struct inode *inode)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	int mds;
-	spin_lock(&ci->i_ceph_lock);
-	mds = __ceph_get_cap_mds(ceph_inode(inode));
-	spin_unlock(&ci->i_ceph_lock);
-	return mds;
-}
-
-/*
  * Called under i_ceph_lock.
  */
 static void __insert_cap_node(struct ceph_inode_info *ci,
@@ -500,12 +489,12 @@
 static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
 			       struct ceph_inode_info *ci)
 {
-	struct ceph_mount_options *ma = mdsc->fsc->mount_options;
+	struct ceph_mount_options *opt = mdsc->fsc->mount_options;
 
 	ci->i_hold_caps_min = round_jiffies(jiffies +
-					    ma->caps_wanted_delay_min * HZ);
+					    opt->caps_wanted_delay_min * HZ);
 	ci->i_hold_caps_max = round_jiffies(jiffies +
-					    ma->caps_wanted_delay_max * HZ);
+					    opt->caps_wanted_delay_max * HZ);
 	dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
 	     ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
 }
@@ -519,9 +508,9 @@
  *    -> we take mdsc->cap_delay_lock
  */
 static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
-				struct ceph_inode_info *ci)
+				struct ceph_inode_info *ci,
+				bool set_timeout)
 {
-	__cap_set_timeouts(mdsc, ci);
 	dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
 	     ci->i_ceph_flags, ci->i_hold_caps_max);
 	if (!mdsc->stopping) {
@@ -531,6 +520,8 @@
 				goto no_change;
 			list_del_init(&ci->i_cap_delay_list);
 		}
+		if (set_timeout)
+			__cap_set_timeouts(mdsc, ci);
 		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
 no_change:
 		spin_unlock(&mdsc->cap_delay_lock);
@@ -606,7 +597,7 @@
 /*
  * Add a capability under the given MDS session.
  *
- * Caller should hold session snap_rwsem (read) and s_mutex.
+ * Caller should hold session snap_rwsem (read) and ci->i_ceph_lock
  *
  * @fmode is the open file mode, if we are opening a file, otherwise
  * it is < 0.  (This is so we can atomically add the cap and add an
@@ -623,6 +614,9 @@
 	struct ceph_cap *cap;
 	int mds = session->s_mds;
 	int actual_wanted;
+	u32 gen;
+
+	lockdep_assert_held(&ci->i_ceph_lock);
 
 	dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
 	     session->s_mds, cap_id, ceph_cap_string(issued), seq);
@@ -634,6 +628,10 @@
 	if (fmode >= 0)
 		wanted |= ceph_caps_for_mode(fmode);
 
+	spin_lock(&session->s_gen_ttl_lock);
+	gen = session->s_cap_gen;
+	spin_unlock(&session->s_gen_ttl_lock);
+
 	cap = __get_cap_for_mds(ci, mds);
 	if (!cap) {
 		cap = *new_cap;
@@ -655,6 +653,13 @@
 		session->s_nr_caps++;
 		spin_unlock(&session->s_cap_lock);
 	} else {
+		spin_lock(&session->s_cap_lock);
+		list_move_tail(&cap->session_caps, &session->s_caps);
+		spin_unlock(&session->s_cap_lock);
+
+		if (cap->cap_gen < gen)
+			cap->issued = cap->implemented = CEPH_CAP_PIN;
+
 		/*
 		 * auth mds of the inode changed. we received the cap export
 		 * message, but still haven't received the cap import message.
@@ -720,7 +725,7 @@
 		dout(" issued %s, mds wanted %s, actual %s, queueing\n",
 		     ceph_cap_string(issued), ceph_cap_string(wanted),
 		     ceph_cap_string(actual_wanted));
-		__cap_delay_requeue(mdsc, ci);
+		__cap_delay_requeue(mdsc, ci, true);
 	}
 
 	if (flags & CEPH_CAP_FLAG_AUTH) {
@@ -746,7 +751,7 @@
 	cap->seq = seq;
 	cap->issue_seq = seq;
 	cap->mseq = mseq;
-	cap->cap_gen = session->s_cap_gen;
+	cap->cap_gen = gen;
 
 	if (fmode >= 0)
 		__ceph_get_fmode(ci, fmode);
@@ -864,8 +869,8 @@
 	int have = ci->i_snap_caps;
 
 	if ((have & mask) == mask) {
-		dout("__ceph_caps_issued_mask %p snap issued %s"
-		     " (mask %s)\n", &ci->vfs_inode,
+		dout("__ceph_caps_issued_mask ino 0x%lx snap issued %s"
+		     " (mask %s)\n", ci->vfs_inode.i_ino,
 		     ceph_cap_string(have),
 		     ceph_cap_string(mask));
 		return 1;
@@ -876,8 +881,8 @@
 		if (!__cap_is_valid(cap))
 			continue;
 		if ((cap->issued & mask) == mask) {
-			dout("__ceph_caps_issued_mask %p cap %p issued %s"
-			     " (mask %s)\n", &ci->vfs_inode, cap,
+			dout("__ceph_caps_issued_mask ino 0x%lx cap %p issued %s"
+			     " (mask %s)\n", ci->vfs_inode.i_ino, cap,
 			     ceph_cap_string(cap->issued),
 			     ceph_cap_string(mask));
 			if (touch)
@@ -888,8 +893,8 @@
 		/* does a combination of caps satisfy mask? */
 		have |= cap->issued;
 		if ((have & mask) == mask) {
-			dout("__ceph_caps_issued_mask %p combo issued %s"
-			     " (mask %s)\n", &ci->vfs_inode,
+			dout("__ceph_caps_issued_mask ino 0x%lx combo issued %s"
+			     " (mask %s)\n", ci->vfs_inode.i_ino,
 			     ceph_cap_string(cap->issued),
 			     ceph_cap_string(mask));
 			if (touch) {
@@ -1030,6 +1035,8 @@
 	list_del_init(&ci->i_snap_realm_item);
 	ci->i_snap_realm_counter++;
 	ci->i_snap_realm = NULL;
+	if (realm->ino == ci->i_vino.ino)
+		realm->inode = NULL;
 	spin_unlock(&realm->inodes_with_caps_lock);
 	ceph_put_snap_realm(ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc,
 			    realm);
@@ -1051,6 +1058,11 @@
 
 	dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
 
+	/* remove from inode's cap rbtree, and clear auth cap */
+	rb_erase(&cap->ci_node, &ci->i_caps);
+	if (ci->i_auth_cap == cap)
+		ci->i_auth_cap = NULL;
+
 	/* remove from session list */
 	spin_lock(&session->s_cap_lock);
 	if (session->s_cap_iterator == cap) {
@@ -1074,9 +1086,7 @@
 	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
 		cap->queue_release = 1;
 		if (removed) {
-			list_add_tail(&cap->session_caps,
-				      &session->s_cap_releases);
-			session->s_num_cap_releases++;
+			__ceph_queue_cap_release(session, cap);
 			removed = 0;
 		}
 	} else {
@@ -1086,11 +1096,6 @@
 
 	spin_unlock(&session->s_cap_lock);
 
-	/* remove from inode list */
-	rb_erase(&cap->ci_node, &ci->i_caps);
-	if (ci->i_auth_cap == cap)
-		ci->i_auth_cap = NULL;
-
 	if (removed)
 		ceph_put_cap(mdsc, cap);
 
@@ -1110,8 +1115,9 @@
 	u64			ino, cid, follows;
 	u64			flush_tid, oldest_flush_tid, size, max_size;
 	u64			xattr_version;
+	u64			change_attr;
 	struct ceph_buffer	*xattr_buf;
-	struct timespec64	atime, mtime, ctime;
+	struct timespec64	atime, mtime, ctime, btime;
 	int			op, caps, wanted, dirty;
 	u32			seq, issue_seq, mseq, time_warp_seq;
 	u32			flags;
@@ -1132,7 +1138,6 @@
 	struct ceph_msg *msg;
 	void *p;
 	size_t extra_len;
-	struct timespec64 zerotime = {0};
 	struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;
 
 	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
@@ -1217,15 +1222,10 @@
 	/* pool namespace (version 8) (mds always ignores this) */
 	ceph_encode_32(&p, 0);
 
-	/*
-	 * btime and change_attr (version 9)
-	 *
-	 * We just zero these out for now, as the MDS ignores them unless
-	 * the requisite feature flags are set (which we don't do yet).
-	 */
-	ceph_encode_timespec64(p, &zerotime);
+	/* btime and change_attr (version 9) */
+	ceph_encode_timespec64(p, &arg->btime);
 	p += sizeof(struct ceph_timespec);
-	ceph_encode_64(&p, 0);
+	ceph_encode_64(&p, arg->change_attr);
 
 	/* Advisory flags (version 10) */
 	ceph_encode_32(&p, arg->flags);
@@ -1235,20 +1235,22 @@
 }
 
 /*
- * Queue cap releases when an inode is dropped from our cache.  Since
- * inode is about to be destroyed, there is no need for i_ceph_lock.
+ * Queue cap releases when an inode is dropped from our cache.
  */
-void ceph_queue_caps_release(struct inode *inode)
+void __ceph_remove_caps(struct ceph_inode_info *ci)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct rb_node *p;
 
+	/* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
+	 * may call __ceph_caps_issued_mask() on a freeing inode. */
+	spin_lock(&ci->i_ceph_lock);
 	p = rb_first(&ci->i_caps);
 	while (p) {
 		struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
 		p = rb_next(p);
 		__ceph_remove_cap(cap, true);
 	}
+	spin_unlock(&ci->i_ceph_lock);
 }
 
 /*
@@ -1258,10 +1260,6 @@
  * Make note of max_size reported/requested from mds, revoked caps
  * that have now been implemented.
  *
- * Make half-hearted attempt ot to invalidate page cache if we are
- * dropping RDCACHE.  Note that this will leave behind locked pages
- * that we'll then need to deal with elsewhere.
- *
  * Return non-zero if delayed release, or we experienced an error
  * such that the caller should requeue + retry later.
  *
@@ -1269,12 +1267,13 @@
  * caller should hold snap_rwsem (read), s_mutex.
  */
 static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
-		      int op, bool sync, int used, int want, int retain,
+		      int op, int flags, int used, int want, int retain,
 		      int flushing, u64 flush_tid, u64 oldest_flush_tid)
 	__releases(cap->ci->i_ceph_lock)
 {
 	struct ceph_inode_info *ci = cap->ci;
 	struct inode *inode = &ci->vfs_inode;
+	struct ceph_buffer *old_blob = NULL;
 	struct cap_msg_args arg;
 	int held, revoking;
 	int wake = 0;
@@ -1339,7 +1338,7 @@
 	ci->i_requested_max_size = arg.max_size;
 
 	if (flushing & CEPH_CAP_XATTR_EXCL) {
-		__ceph_build_xattrs_blob(ci);
+		old_blob = __ceph_build_xattrs_blob(ci);
 		arg.xattr_version = ci->i_xattrs.version;
 		arg.xattr_buf = ci->i_xattrs.blob;
 	} else {
@@ -1349,6 +1348,8 @@
 	arg.mtime = inode->i_mtime;
 	arg.atime = inode->i_atime;
 	arg.ctime = inode->i_ctime;
+	arg.btime = ci->i_btime;
+	arg.change_attr = inode_peek_iversion_raw(inode);
 
 	arg.op = op;
 	arg.caps = cap->implemented;
@@ -1365,15 +1366,24 @@
 	arg.mode = inode->i_mode;
 
 	arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
-	if (list_empty(&ci->i_cap_snaps))
-		arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP;
-	else
-		arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
-	if (sync)
-		arg.flags |= CEPH_CLIENT_CAPS_SYNC;
+	if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) &&
+	    !list_empty(&ci->i_cap_snaps)) {
+		struct ceph_cap_snap *capsnap;
+		list_for_each_entry_reverse(capsnap, &ci->i_cap_snaps, ci_item) {
+			if (capsnap->cap_flush.tid)
+				break;
+			if (capsnap->need_flush) {
+				flags |= CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
+				break;
+			}
+		}
+	}
+	arg.flags = flags;
 
 	spin_unlock(&ci->i_ceph_lock);
 
+	ceph_buffer_put(old_blob);
+
 	ret = send_cap_msg(&arg);
 	if (ret < 0) {
 		dout("error sending cap msg, must requeue %p\n", inode);
@@ -1408,6 +1418,8 @@
 	arg.atime = capsnap->atime;
 	arg.mtime = capsnap->mtime;
 	arg.ctime = capsnap->ctime;
+	arg.btime = capsnap->btime;
+	arg.change_attr = capsnap->change_attr;
 
 	arg.op = CEPH_CAP_OP_FLUSHSNAP;
 	arg.caps = capsnap->issued;
@@ -1575,10 +1587,8 @@
 	}
 
 	// make sure flushsnap messages are sent in proper order.
-	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
+	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
 		__kick_flushing_caps(mdsc, session, ci, 0);
-		ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
-	}
 
 	__ceph_flush_snaps(ci, session);
 out:
@@ -1647,7 +1657,7 @@
 	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
 	    (mask & CEPH_CAP_FILE_BUFFER))
 		dirty |= I_DIRTY_DATASYNC;
-	__cap_delay_requeue(mdsc, ci);
+	__cap_delay_requeue(mdsc, ci, true);
 	return dirty;
 }
 
@@ -1708,11 +1718,11 @@
  * Add dirty inode to the flushing list.  Assigned a seq number so we
  * can wait for caps to flush without starving.
  *
- * Called under i_ceph_lock.
+ * Called under i_ceph_lock. Returns the flush tid.
  */
-static int __mark_caps_flushing(struct inode *inode,
+static u64 __mark_caps_flushing(struct inode *inode,
 				struct ceph_mds_session *session, bool wake,
-				u64 *flush_tid, u64 *oldest_flush_tid)
+				u64 *oldest_flush_tid)
 {
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
@@ -1751,8 +1761,7 @@
 
 	list_add_tail(&cf->i_list, &ci->i_cap_flush_list);
 
-	*flush_tid = cf->tid;
-	return flushing;
+	return cf->tid;
 }
 
 /*
@@ -1853,14 +1862,17 @@
 			retain |= CEPH_CAP_ANY;       /* be greedy */
 		} else if (S_ISDIR(inode->i_mode) &&
 			   (issued & CEPH_CAP_FILE_SHARED) &&
-			    __ceph_dir_is_complete(ci)) {
+			   __ceph_dir_is_complete(ci)) {
 			/*
 			 * If a directory is complete, we want to keep
 			 * the exclusive cap. So that MDS does not end up
 			 * revoking the shared cap on every create/unlink
 			 * operation.
 			 */
-			want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+			if (IS_RDONLY(inode))
+				want = CEPH_CAP_ANY_SHARED;
+			else
+				want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
 			retain |= want;
 		} else {
 
@@ -1968,8 +1980,7 @@
 			goto ack;
 
 		/* things we might delay */
-		if ((cap->issued & ~retain) == 0 &&
-		    cap->mds_wanted == want)
+		if ((cap->issued & ~retain) == 0)
 			continue;     /* nope, all good */
 
 		if (no_delay)
@@ -1988,11 +1999,6 @@
 		}
 
 ack:
-		if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
-			dout(" skipping %p I_NOFLUSH set\n", inode);
-			continue;
-		}
-
 		if (session && session != cap->session) {
 			dout("oops, wrong session %p mutex\n", session);
 			mutex_unlock(&session->s_mutex);
@@ -2018,10 +2024,8 @@
 		if (cap == ci->i_auth_cap &&
 		    (ci->i_ceph_flags &
 		     (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
-			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
+			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
 				__kick_flushing_caps(mdsc, session, ci, 0);
-				ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
-			}
 			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
 				__ceph_flush_snaps(ci, session);
 
@@ -2042,9 +2046,9 @@
 		}
 
 		if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
-			flushing = __mark_caps_flushing(inode, session, false,
-							&flush_tid,
-							&oldest_flush_tid);
+			flushing = ci->i_dirty_caps;
+			flush_tid = __mark_caps_flushing(inode, session, false,
+							 &oldest_flush_tid);
 		} else {
 			flushing = 0;
 			flush_tid = 0;
@@ -2057,7 +2061,7 @@
 		sent++;
 
 		/* __send_cap drops i_ceph_lock */
-		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false,
+		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, 0,
 				cap_used, want, retain, flushing,
 				flush_tid, oldest_flush_tid);
 		goto retry; /* retake i_ceph_lock and restart our cap scan. */
@@ -2065,7 +2069,7 @@
 
 	/* Reschedule delayed caps release if we delayed anything */
 	if (delayed)
-		__cap_delay_requeue(mdsc, ci);
+		__cap_delay_requeue(mdsc, ci, false);
 
 	spin_unlock(&ci->i_ceph_lock);
 
@@ -2091,18 +2095,12 @@
 
 retry:
 	spin_lock(&ci->i_ceph_lock);
-	if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
-		spin_unlock(&ci->i_ceph_lock);
-		dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
-		goto out;
-	}
+retry_locked:
 	if (ci->i_dirty_caps && ci->i_auth_cap) {
 		struct ceph_cap *cap = ci->i_auth_cap;
-		int used = __ceph_caps_used(ci);
-		int want = __ceph_caps_wanted(ci);
 		int delayed;
 
-		if (!session || session != cap->session) {
+		if (session != cap->session) {
 			spin_unlock(&ci->i_ceph_lock);
 			if (session)
 				mutex_unlock(&session->s_mutex);
@@ -2115,17 +2113,30 @@
 			goto out;
 		}
 
-		flushing = __mark_caps_flushing(inode, session, true,
-						&flush_tid, &oldest_flush_tid);
+		if (ci->i_ceph_flags &
+		    (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS)) {
+			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
+				__kick_flushing_caps(mdsc, session, ci, 0);
+			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
+				__ceph_flush_snaps(ci, session);
+			goto retry_locked;
+		}
+
+		flushing = ci->i_dirty_caps;
+		flush_tid = __mark_caps_flushing(inode, session, true,
+						 &oldest_flush_tid);
 
 		/* __send_cap drops i_ceph_lock */
-		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true,
-				used, want, (cap->issued | cap->implemented),
-				flushing, flush_tid, oldest_flush_tid);
+		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+				     CEPH_CLIENT_CAPS_SYNC,
+				     __ceph_caps_used(ci),
+				     __ceph_caps_wanted(ci),
+				     (cap->issued | cap->implemented),
+				     flushing, flush_tid, oldest_flush_tid);
 
 		if (delayed) {
 			spin_lock(&ci->i_ceph_lock);
-			__cap_delay_requeue(mdsc, ci);
+			__cap_delay_requeue(mdsc, ci, true);
 			spin_unlock(&ci->i_ceph_lock);
 		}
 	} else {
@@ -2212,38 +2223,45 @@
 
 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 {
+	struct ceph_file_info *fi = file->private_data;
 	struct inode *inode = file->f_mapping->host;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	u64 flush_tid;
-	int ret;
+	int ret, err;
 	int dirty;
 
 	dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
 
 	ret = file_write_and_wait_range(file, start, end);
-	if (ret < 0)
-		goto out;
-
 	if (datasync)
 		goto out;
 
-	inode_lock(inode);
-
 	dirty = try_flush_caps(inode, &flush_tid);
 	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
-	ret = unsafe_request_wait(inode);
+	err = unsafe_request_wait(inode);
 
 	/*
 	 * only wait on non-file metadata writeback (the mds
 	 * can recover size and mtime, so we don't need to
 	 * wait for that)
 	 */
-	if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
-		ret = wait_event_interruptible(ci->i_cap_wq,
+	if (!err && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
+		err = wait_event_interruptible(ci->i_cap_wq,
 					caps_are_flushed(inode, flush_tid));
 	}
-	inode_unlock(inode);
+
+	if (err < 0)
+		ret = err;
+
+	if (errseq_check(&ci->i_meta_err, READ_ONCE(fi->meta_err))) {
+		spin_lock(&file->f_lock);
+		err = errseq_check_and_advance(&ci->i_meta_err,
+					       &fi->meta_err);
+		spin_unlock(&file->f_lock);
+		if (err < 0)
+			ret = err;
+	}
 out:
 	dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret);
 	return ret;
@@ -2293,6 +2311,16 @@
 	struct ceph_cap_flush *cf;
 	int ret;
 	u64 first_tid = 0;
+	u64 last_snap_flush = 0;
+
+	ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+
+	list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) {
+		if (!cf->caps) {
+			last_snap_flush = cf->tid;
+			break;
+		}
+	}
 
 	list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
 		if (cf->tid < first_tid)
@@ -2311,10 +2339,13 @@
 			dout("kick_flushing_caps %p cap %p tid %llu %s\n",
 			     inode, cap, cf->tid, ceph_cap_string(cf->caps));
 			ci->i_ceph_flags |= CEPH_I_NODELAY;
+
 			ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-					  false, __ceph_caps_used(ci),
+					 (cf->tid < last_snap_flush ?
+					  CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0),
+					  __ceph_caps_used(ci),
 					  __ceph_caps_wanted(ci),
-					  cap->issued | cap->implemented,
+					  (cap->issued | cap->implemented),
 					  cf->caps, cf->tid, oldest_flush_tid);
 			if (ret) {
 				pr_err("kick_flushing_caps: error sending "
@@ -2383,7 +2414,12 @@
 		 */
 		if ((cap->issued & ci->i_flushing_caps) !=
 		    ci->i_flushing_caps) {
-			ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+			/* encode_caps_cb() also will reset these sequence
+			 * numbers. make sure sequence numbers in cap flush
+			 * message match later reconnect message */
+			cap->seq = 0;
+			cap->issue_seq = 0;
+			cap->mseq = 0;
 			__kick_flushing_caps(mdsc, session, ci,
 					     oldest_flush_tid);
 		} else {
@@ -2417,7 +2453,6 @@
 			continue;
 		}
 		if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
-			ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
 			__kick_flushing_caps(mdsc, session, ci,
 					     oldest_flush_tid);
 		}
@@ -2445,7 +2480,6 @@
 		oldest_flush_tid = __get_oldest_flush_tid(mdsc);
 		spin_unlock(&mdsc->cap_dirty_lock);
 
-		ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
 		__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
 		spin_unlock(&ci->i_ceph_lock);
 	} else {
@@ -2492,11 +2526,21 @@
  * to (when applicable), and check against max_size here as well.
  * Note that caller is responsible for ensuring max_size increases are
  * requested from the MDS.
+ *
+ * Returns 0 if caps were not able to be acquired (yet), a 1 if they were,
+ * or a negative error code.
+ *
+ * FIXME: how does a 0 return differ from -EAGAIN?
  */
-static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
-			    loff_t endoff, bool nonblock, int *got, int *err)
+enum {
+	NON_BLOCKING	= 1,
+	CHECK_FILELOCK	= 2,
+};
+
+static int try_get_cap_refs(struct inode *inode, int need, int want,
+			    loff_t endoff, int flags, int *got)
 {
-	struct inode *inode = &ci->vfs_inode;
+	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	int ret = 0;
 	int have, implemented;
@@ -2509,13 +2553,19 @@
 again:
 	spin_lock(&ci->i_ceph_lock);
 
+	if ((flags & CHECK_FILELOCK) &&
+	    (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK)) {
+		dout("try_get_cap_refs %p error filelock\n", inode);
+		ret = -EIO;
+		goto out_unlock;
+	}
+
 	/* make sure file is actually open */
 	file_wanted = __ceph_caps_file_wanted(ci);
 	if ((file_wanted & need) != need) {
 		dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
 		     ceph_cap_string(need), ceph_cap_string(file_wanted));
-		*err = -EBADF;
-		ret = 1;
+		ret = -EBADF;
 		goto out_unlock;
 	}
 
@@ -2536,10 +2586,8 @@
 		if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
 			dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
 			     inode, endoff, ci->i_max_size);
-			if (endoff > ci->i_requested_max_size) {
-				*err = -EAGAIN;
-				ret = 1;
-			}
+			if (endoff > ci->i_requested_max_size)
+				ret = -EAGAIN;
 			goto out_unlock;
 		}
 		/*
@@ -2573,9 +2621,8 @@
 					 * we can not call down_read() when
 					 * task isn't in TASK_RUNNING state
 					 */
-					if (nonblock) {
-						*err = -EAGAIN;
-						ret = 1;
+					if (flags & NON_BLOCKING) {
+						ret = -EAGAIN;
 						goto out_unlock;
 					}
 
@@ -2604,8 +2651,7 @@
 		if (session_readonly) {
 			dout("get_cap_refs %p needed %s but mds%d readonly\n",
 			     inode, ceph_cap_string(need), ci->i_auth_cap->mds);
-			*err = -EROFS;
-			ret = 1;
+			ret = -EROFS;
 			goto out_unlock;
 		}
 
@@ -2614,16 +2660,14 @@
 			if (READ_ONCE(mdsc->fsc->mount_state) ==
 			    CEPH_MOUNT_SHUTDOWN) {
 				dout("get_cap_refs %p forced umount\n", inode);
-				*err = -EIO;
-				ret = 1;
+				ret = -EIO;
 				goto out_unlock;
 			}
 			mds_wanted = __ceph_caps_mds_wanted(ci, false);
 			if (need & ~(mds_wanted & need)) {
 				dout("get_cap_refs %p caps were dropped"
 				     " (session killed?)\n", inode);
-				*err = -ESTALE;
-				ret = 1;
+				ret = -ESTALE;
 				goto out_unlock;
 			}
 			if (!(file_wanted & ~mds_wanted))
@@ -2671,25 +2715,20 @@
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 }
 
-int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
+int ceph_try_get_caps(struct inode *inode, int need, int want,
+		      bool nonblock, int *got)
 {
-	int ret, err = 0;
+	int ret;
 
 	BUG_ON(need & ~CEPH_CAP_FILE_RD);
-	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
-	ret = ceph_pool_perm_check(ci, need);
+	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED));
+	ret = ceph_pool_perm_check(inode, need);
 	if (ret < 0)
 		return ret;
 
-	ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
-	if (ret) {
-		if (err == -EAGAIN) {
-			ret = 0;
-		} else if (err < 0) {
-			ret = err;
-		}
-	}
-	return ret;
+	ret = try_get_cap_refs(inode, need, want, 0,
+			       (nonblock ? NON_BLOCKING : 0), got);
+	return ret == -EAGAIN ? 0 : ret;
 }
 
 /*
@@ -2697,34 +2736,40 @@
  * due to a small max_size, make sure we check_max_size (and possibly
  * ask the mds) so we don't get hung up indefinitely.
  */
-int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
+int ceph_get_caps(struct file *filp, int need, int want,
 		  loff_t endoff, int *got, struct page **pinned_page)
 {
-	int _got, ret, err = 0;
+	struct ceph_file_info *fi = filp->private_data;
+	struct inode *inode = file_inode(filp);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	int ret, _got, flags;
 
-	ret = ceph_pool_perm_check(ci, need);
+	ret = ceph_pool_perm_check(inode, need);
 	if (ret < 0)
 		return ret;
 
+	if ((fi->fmode & CEPH_FILE_MODE_WR) &&
+	    fi->filp_gen != READ_ONCE(fsc->filp_gen))
+		return -EBADF;
+
 	while (true) {
 		if (endoff > 0)
-			check_max_size(&ci->vfs_inode, endoff);
+			check_max_size(inode, endoff);
 
-		err = 0;
+		flags = atomic_read(&fi->num_locks) ? CHECK_FILELOCK : 0;
 		_got = 0;
-		ret = try_get_cap_refs(ci, need, want, endoff,
-				       false, &_got, &err);
-		if (ret) {
-			if (err == -EAGAIN)
-				continue;
-			if (err < 0)
-				ret = err;
-		} else {
+		ret = try_get_cap_refs(inode, need, want, endoff,
+				       flags, &_got);
+		if (ret == -EAGAIN)
+			continue;
+		if (!ret) {
 			DEFINE_WAIT_FUNC(wait, woken_wake_function);
 			add_wait_queue(&ci->i_cap_wq, &wait);
 
-			while (!try_get_cap_refs(ci, need, want, endoff,
-						 true, &_got, &err)) {
+			flags |= NON_BLOCKING;
+			while (!(ret = try_get_cap_refs(inode, need, want,
+							endoff, flags, &_got))) {
 				if (signal_pending(current)) {
 					ret = -ERESTARTSYS;
 					break;
@@ -2733,16 +2778,21 @@
 			}
 
 			remove_wait_queue(&ci->i_cap_wq, &wait);
-
-			if (err == -EAGAIN)
+			if (ret == -EAGAIN)
 				continue;
-			if (err < 0)
-				ret = err;
 		}
+
+		if ((fi->fmode & CEPH_FILE_MODE_WR) &&
+		    fi->filp_gen != READ_ONCE(fsc->filp_gen)) {
+			if (ret >= 0 && _got)
+				ceph_put_cap_refs(ci, _got);
+			return -EBADF;
+		}
+
 		if (ret < 0) {
-			if (err == -ESTALE) {
+			if (ret == -ESTALE) {
 				/* session was killed, try renew caps */
-				ret = ceph_renew_caps(&ci->vfs_inode);
+				ret = ceph_renew_caps(inode);
 				if (ret == 0)
 					continue;
 			}
@@ -2751,9 +2801,9 @@
 
 		if (ci->i_inline_version != CEPH_INLINE_NONE &&
 		    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
-		    i_size_read(&ci->vfs_inode) > 0) {
+		    i_size_read(inode) > 0) {
 			struct page *page =
-				find_get_page(ci->vfs_inode.i_mapping, 0);
+				find_get_page(inode->i_mapping, 0);
 			if (page) {
 				if (PageUptodate(page)) {
 					*pinned_page = page;
@@ -2772,7 +2822,7 @@
 			 * getattr request will bring inline data into
 			 * page cache
 			 */
-			ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+			ret = __ceph_do_getattr(inode, NULL,
 						CEPH_STAT_CAP_INLINE_DATA,
 						true);
 			if (ret < 0)
@@ -2974,8 +3024,10 @@
 	}
 	if (complete_capsnap)
 		wake_up_all(&ci->i_cap_wq);
-	while (put-- > 0)
-		iput(inode);
+	while (put-- > 0) {
+		/* avoid calling iput_final() in osd dispatch threads */
+		ceph_async_iput(inode);
+	}
 }
 
 /*
@@ -3020,8 +3072,10 @@
 	bool dirstat_valid;
 	u64 nfiles;
 	u64 nsubdirs;
+	u64 change_attr;
 	/* currently issued */
 	int issued;
+	struct timespec64 btime;
 };
 
 /*
@@ -3045,7 +3099,8 @@
 	int used, wanted, dirty;
 	u64 size = le64_to_cpu(grant->size);
 	u64 max_size = le64_to_cpu(grant->max_size);
-	int check_caps = 0;
+	unsigned char check_caps = 0;
+	bool was_stale = cap->cap_gen < session->s_cap_gen;
 	bool wake = false;
 	bool writeback = false;
 	bool queue_trunc = false;
@@ -3060,21 +3115,6 @@
 
 
 	/*
-	 * auth mds of the inode changed. we received the cap export message,
-	 * but still haven't received the cap import message. handle_cap_export
-	 * updated the new auth MDS' cap.
-	 *
-	 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
-	 * that was sent before the cap import message. So don't remove caps.
-	 */
-	if (ceph_seq_cmp(seq, cap->seq) <= 0) {
-		WARN_ON(cap != ci->i_auth_cap);
-		WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
-		seq = cap->seq;
-		newcaps |= cap->issued;
-	}
-
-	/*
 	 * If CACHE is being revoked, and we have no dirty buffers,
 	 * try to invalidate (once).  (If there are dirty buffers, we
 	 * will invalidate _after_ writeback.)
@@ -3093,17 +3133,38 @@
 		}
 	}
 
+	if (was_stale)
+		cap->issued = cap->implemented = CEPH_CAP_PIN;
+
+	/*
+	 * auth mds of the inode changed. we received the cap export message,
+	 * but still haven't received the cap import message. handle_cap_export
+	 * updated the new auth MDS' cap.
+	 *
+	 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
+	 * that was sent before the cap import message. So don't remove caps.
+	 */
+	if (ceph_seq_cmp(seq, cap->seq) <= 0) {
+		WARN_ON(cap != ci->i_auth_cap);
+		WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
+		seq = cap->seq;
+		newcaps |= cap->issued;
+	}
+
 	/* side effects now are allowed */
 	cap->cap_gen = session->s_cap_gen;
 	cap->seq = seq;
 
 	__check_cap_issue(ci, cap, newcaps);
 
+	inode_set_max_iversion_raw(inode, extra_info->change_attr);
+
 	if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
 	    (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
 		inode->i_mode = le32_to_cpu(grant->mode);
 		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
 		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
+		ci->i_btime = extra_info->btime;
 		dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
 		     from_kuid(&init_user_ns, inode->i_uid),
 		     from_kgid(&init_user_ns, inode->i_gid));
@@ -3130,6 +3191,7 @@
 			ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
 			ci->i_xattrs.version = version;
 			ceph_forget_all_cached_acls(inode);
+			ceph_security_invalidate_secctx(inode);
 		}
 	}
 
@@ -3197,13 +3259,20 @@
 	     ceph_cap_string(wanted),
 	     ceph_cap_string(used),
 	     ceph_cap_string(dirty));
-	if (wanted != le32_to_cpu(grant->wanted)) {
-		dout("mds wanted %s -> %s\n",
-		     ceph_cap_string(le32_to_cpu(grant->wanted)),
-		     ceph_cap_string(wanted));
-		/* imported cap may not have correct mds_wanted */
-		if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT)
-			check_caps = 1;
+
+	if ((was_stale || le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) &&
+	    (wanted & ~(cap->mds_wanted | newcaps))) {
+		/*
+		 * If mds is importing cap, prior cap messages that update
+		 * 'wanted' may get dropped by mds (migrate seq mismatch).
+		 *
+		 * We don't send cap message to update 'wanted' if what we
+		 * want are already issued. If mds revokes caps, cap message
+		 * that releases caps also tells mds what we want. But if
+		 * caps got revoked by mds forcedly (session stale). We may
+		 * haven't told mds what we want.
+		 */
+		check_caps = 1;
 	}
 
 	/* revocation, grant, or no-op? */
@@ -3536,9 +3605,9 @@
 		goto out_unlock;
 
 	if (target < 0) {
-		__ceph_remove_cap(cap, false);
-		if (!ci->i_auth_cap)
+		if (cap->mds_wanted | cap->issued)
 			ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
+		__ceph_remove_cap(cap, false);
 		goto out_unlock;
 	}
 
@@ -3566,7 +3635,6 @@
 			tcap->cap_id = t_cap_id;
 			tcap->seq = t_seq - 1;
 			tcap->issue_seq = t_seq - 1;
-			tcap->mseq = t_mseq;
 			tcap->issued |= issued;
 			tcap->implemented |= issued;
 			if (cap == ci->i_auth_cap)
@@ -3818,17 +3886,19 @@
 		}
 	}
 
-	if (msg_version >= 11) {
+	if (msg_version >= 9) {
 		struct ceph_timespec *btime;
-		u64 change_attr;
-		u32 flags;
 
-		/* version >= 9 */
 		if (p + sizeof(*btime) > end)
 			goto bad;
 		btime = p;
+		ceph_decode_timespec64(&extra_info.btime, btime);
 		p += sizeof(*btime);
-		ceph_decode_64_safe(&p, end, change_attr, bad);
+		ceph_decode_64_safe(&p, end, extra_info.change_attr, bad);
+	}
+
+	if (msg_version >= 11) {
+		u32 flags;
 		/* version >= 10 */
 		ceph_decode_32_safe(&p, end, flags, bad);
 		/* version >= 11 */
@@ -3860,12 +3930,10 @@
 			cap->seq = seq;
 			cap->issue_seq = seq;
 			spin_lock(&session->s_cap_lock);
-			list_add_tail(&cap->session_caps,
-					&session->s_cap_releases);
-			session->s_num_cap_releases++;
+			__ceph_queue_cap_release(session, cap);
 			spin_unlock(&session->s_cap_lock);
 		}
-		goto flush_cap_releases;
+		goto done;
 	}
 
 	/* these will work even if we don't have a cap yet */
@@ -3935,7 +4003,13 @@
 		       ceph_cap_op_name(op));
 	}
 
-	goto done;
+done:
+	mutex_unlock(&session->s_mutex);
+done_unlocked:
+	ceph_put_string(extra_info.pool_ns);
+	/* avoid calling iput_final() in mds dispatch threads */
+	ceph_async_iput(inode);
+	return;
 
 flush_cap_releases:
 	/*
@@ -3943,14 +4017,8 @@
 	 * along for the mds (who clearly thinks we still have this
 	 * cap).
 	 */
-	ceph_send_cap_releases(mdsc, session);
-
-done:
-	mutex_unlock(&session->s_mutex);
-done_unlocked:
-	iput(inode);
-	ceph_put_string(extra_info.pool_ns);
-	return;
+	ceph_flush_cap_releases(mdsc, session);
+	goto done;
 
 bad:
 	pr_err("ceph_handle_caps: corrupt message\n");
@@ -3986,7 +4054,8 @@
 		if (inode) {
 			dout("check_delayed_caps on %p\n", inode);
 			ceph_check_caps(ci, flags, NULL);
-			iput(inode);
+			/* avoid calling iput_final() in tick thread */
+			ceph_async_iput(inode);
 		}
 	}
 	spin_unlock(&mdsc->cap_delay_lock);
@@ -4055,7 +4124,7 @@
 }
 
 /*
- * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps. If it
+ * For a soon-to-be unlinked file, drop the LINK caps. If it
  * looks like the link count will hit 0, drop any other caps (other
  * than PIN) we don't specifically want (due to the file still being
  * open).