Update Linux to v5.4.2

Change-Id: Idf6911045d9d382da2cfe01b1edff026404ac8fd
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 4055ab4..c074075 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -13,6 +13,7 @@
 #include <linux/posix_acl.h>
 #include <linux/random.h>
 #include <linux/sort.h>
+#include <linux/iversion.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -33,9 +34,7 @@
 
 static const struct inode_operations ceph_symlink_iops;
 
-static void ceph_invalidate_work(struct work_struct *work);
-static void ceph_writeback_work(struct work_struct *work);
-static void ceph_vmtruncate_work(struct work_struct *work);
+static void ceph_inode_work(struct work_struct *work);
 
 /*
  * find or create an inode, given the ceph ino number
@@ -44,6 +43,7 @@
 {
 	ceph_inode(inode)->i_vino = *(struct ceph_vino *)data;
 	inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data);
+	inode_set_iversion_raw(inode, 0);
 	return 0;
 }
 
@@ -497,7 +497,7 @@
 	ci->i_wrbuffer_ref = 0;
 	ci->i_wrbuffer_ref_head = 0;
 	atomic_set(&ci->i_filelock_ref, 0);
-	atomic_set(&ci->i_shared_gen, 0);
+	atomic_set(&ci->i_shared_gen, 1);
 	ci->i_rdcache_gen = 0;
 	ci->i_rdcache_revoking = 0;
 
@@ -509,35 +509,39 @@
 	INIT_LIST_HEAD(&ci->i_snap_realm_item);
 	INIT_LIST_HEAD(&ci->i_snap_flush_item);
 
-	INIT_WORK(&ci->i_wb_work, ceph_writeback_work);
-	INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work);
-
-	INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
+	INIT_WORK(&ci->i_work, ceph_inode_work);
+	ci->i_work_mask = 0;
+	memset(&ci->i_btime, '\0', sizeof(ci->i_btime));
 
 	ceph_fscache_inode_init(ci);
 
+	ci->i_meta_err = 0;
+
 	return &ci->vfs_inode;
 }
 
-static void ceph_i_callback(struct rcu_head *head)
+void ceph_free_inode(struct inode *inode)
 {
-	struct inode *inode = container_of(head, struct inode, i_rcu);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 
+	kfree(ci->i_symlink);
 	kmem_cache_free(ceph_inode_cachep, ci);
 }
 
-void ceph_destroy_inode(struct inode *inode)
+void ceph_evict_inode(struct inode *inode)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_inode_frag *frag;
 	struct rb_node *n;
 
-	dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode));
+	dout("evict_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode));
+
+	truncate_inode_pages_final(&inode->i_data);
+	clear_inode(inode);
 
 	ceph_fscache_unregister_inode_cookie(ci);
 
-	ceph_queue_caps_release(inode);
+	__ceph_remove_caps(ci);
 
 	if (__ceph_has_any_quota(ci))
 		ceph_adjust_quota_realms_count(inode, false);
@@ -548,20 +552,24 @@
 	 */
 	if (ci->i_snap_realm) {
 		struct ceph_mds_client *mdsc =
-			ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
-		struct ceph_snap_realm *realm = ci->i_snap_realm;
-
-		dout(" dropping residual ref to snap realm %p\n", realm);
-		spin_lock(&realm->inodes_with_caps_lock);
-		list_del_init(&ci->i_snap_realm_item);
-		ci->i_snap_realm = NULL;
-		if (realm->ino == ci->i_vino.ino)
-			realm->inode = NULL;
-		spin_unlock(&realm->inodes_with_caps_lock);
-		ceph_put_snap_realm(mdsc, realm);
+					ceph_inode_to_client(inode)->mdsc;
+		if (ceph_snap(inode) == CEPH_NOSNAP) {
+			struct ceph_snap_realm *realm = ci->i_snap_realm;
+			dout(" dropping residual ref to snap realm %p\n",
+			     realm);
+			spin_lock(&realm->inodes_with_caps_lock);
+			list_del_init(&ci->i_snap_realm_item);
+			ci->i_snap_realm = NULL;
+			if (realm->ino == ci->i_vino.ino)
+				realm->inode = NULL;
+			spin_unlock(&realm->inodes_with_caps_lock);
+			ceph_put_snap_realm(mdsc, realm);
+		} else {
+			ceph_put_snapid_map(mdsc, ci->i_snapid_map);
+			ci->i_snap_realm = NULL;
+		}
 	}
 
-	kfree(ci->i_symlink);
 	while ((n = rb_first(&ci->i_fragtree)) != NULL) {
 		frag = rb_entry(n, struct ceph_inode_frag, node);
 		rb_erase(n, &ci->i_fragtree);
@@ -576,18 +584,6 @@
 		ceph_buffer_put(ci->i_xattrs.prealloc_blob);
 
 	ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
-
-	call_rcu(&inode->i_rcu, ceph_i_callback);
-}
-
-int ceph_drop_inode(struct inode *inode)
-{
-	/*
-	 * Positve dentry and corresponding inode are always accompanied
-	 * in MDS reply. So no need to keep inode in the cache after
-	 * dropping all its aliases.
-	 */
-	return 1;
 }
 
 static inline blkcnt_t calc_inode_blocks(u64 size)
@@ -742,6 +738,7 @@
 	int issued, new_issued, info_caps;
 	struct timespec64 mtime, atime, ctime;
 	struct ceph_buffer *xattr_blob = NULL;
+	struct ceph_buffer *old_blob = NULL;
 	struct ceph_string *pool_ns = NULL;
 	struct ceph_cap *new_cap = NULL;
 	int err = 0;
@@ -776,6 +773,9 @@
 		pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
 						     iinfo->pool_ns_len);
 
+	if (ceph_snap(inode) != CEPH_NOSNAP && !ci->i_snapid_map)
+		ci->i_snapid_map = ceph_get_snapid_map(mdsc, ceph_snap(inode));
+
 	spin_lock(&ci->i_ceph_lock);
 
 	/*
@@ -794,13 +794,21 @@
 	     le64_to_cpu(info->version) > (ci->i_version & ~1)))
 		new_version = true;
 
+	/* Update change_attribute */
+	inode_set_max_iversion_raw(inode, iinfo->change_attr);
+
 	__ceph_caps_issued(ci, &issued);
 	issued |= __ceph_caps_dirty(ci);
 	new_issued = ~issued & info_caps;
 
 	/* update inode */
 	inode->i_rdev = le32_to_cpu(info->rdev);
-	inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
+	/* directories have fl_stripe_unit set to zero */
+	if (le32_to_cpu(info->layout.fl_stripe_unit))
+		inode->i_blkbits =
+			fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
+	else
+		inode->i_blkbits = CEPH_BLOCK_SHIFT;
 
 	__ceph_update_quota(ci, iinfo->max_bytes, iinfo->max_files);
 
@@ -812,6 +820,8 @@
 		dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
 		     from_kuid(&init_user_ns, inode->i_uid),
 		     from_kgid(&init_user_ns, inode->i_gid));
+		ceph_decode_timespec64(&ci->i_btime, &iinfo->btime);
+		ceph_decode_timespec64(&ci->i_snap_btime, &iinfo->snap_btime);
 	}
 
 	if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
@@ -869,6 +879,7 @@
 			ci->i_rbytes = le64_to_cpu(info->rbytes);
 			ci->i_rfiles = le64_to_cpu(info->rfiles);
 			ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
+			ci->i_dir_pin = iinfo->dir_pin;
 			ceph_decode_timespec64(&ci->i_rctime, &info->rctime);
 		}
 	}
@@ -878,13 +889,14 @@
 	if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))  &&
 	    le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
 		if (ci->i_xattrs.blob)
-			ceph_buffer_put(ci->i_xattrs.blob);
+			old_blob = ci->i_xattrs.blob;
 		ci->i_xattrs.blob = xattr_blob;
 		if (xattr_blob)
 			memcpy(ci->i_xattrs.blob->vec.iov_base,
 			       iinfo->xattr_data, iinfo->xattr_len);
 		ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
 		ceph_forget_all_cached_acls(inode);
+		ceph_security_invalidate_secctx(inode);
 		xattr_blob = NULL;
 	}
 
@@ -899,6 +911,7 @@
 	case S_IFBLK:
 	case S_IFCHR:
 	case S_IFSOCK:
+		inode->i_blkbits = PAGE_SHIFT;
 		init_special_inode(inode, inode->i_mode, inode->i_rdev);
 		inode->i_op = &ceph_file_iops;
 		break;
@@ -1017,70 +1030,48 @@
 out:
 	if (new_cap)
 		ceph_put_cap(mdsc, new_cap);
-	if (xattr_blob)
-		ceph_buffer_put(xattr_blob);
+	ceph_buffer_put(old_blob);
+	ceph_buffer_put(xattr_blob);
 	ceph_put_string(pool_ns);
 	return err;
 }
 
 /*
- * caller should hold session s_mutex.
+ * caller should hold session s_mutex and dentry->d_lock.
  */
-static void update_dentry_lease(struct dentry *dentry,
-				struct ceph_mds_reply_lease *lease,
-				struct ceph_mds_session *session,
-				unsigned long from_time,
-				struct ceph_vino *tgt_vino,
-				struct ceph_vino *dir_vino)
+static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
+				  struct ceph_mds_reply_lease *lease,
+				  struct ceph_mds_session *session,
+				  unsigned long from_time,
+				  struct ceph_mds_session **old_lease_session)
 {
 	struct ceph_dentry_info *di = ceph_dentry(dentry);
 	long unsigned duration = le32_to_cpu(lease->duration_ms);
 	long unsigned ttl = from_time + (duration * HZ) / 1000;
 	long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
-	struct inode *dir;
-	struct ceph_mds_session *old_lease_session = NULL;
 
-	/*
-	 * Make sure dentry's inode matches tgt_vino. NULL tgt_vino means that
-	 * we expect a negative dentry.
-	 */
-	if (!tgt_vino && d_really_is_positive(dentry))
-		return;
-
-	if (tgt_vino && (d_really_is_negative(dentry) ||
-			!ceph_ino_compare(d_inode(dentry), tgt_vino)))
-		return;
-
-	spin_lock(&dentry->d_lock);
 	dout("update_dentry_lease %p duration %lu ms ttl %lu\n",
 	     dentry, duration, ttl);
 
-	dir = d_inode(dentry->d_parent);
-
-	/* make sure parent matches dir_vino */
-	if (!ceph_ino_compare(dir, dir_vino))
-		goto out_unlock;
-
 	/* only track leases on regular dentries */
 	if (ceph_snap(dir) != CEPH_NOSNAP)
-		goto out_unlock;
+		return;
 
 	di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
-
-	if (duration == 0)
-		goto out_unlock;
+	if (duration == 0) {
+		__ceph_dentry_dir_lease_touch(di);
+		return;
+	}
 
 	if (di->lease_gen == session->s_cap_gen &&
 	    time_before(ttl, di->time))
-		goto out_unlock;  /* we already have a newer lease. */
+		return;  /* we already have a newer lease. */
 
 	if (di->lease_session && di->lease_session != session) {
-		old_lease_session = di->lease_session;
+		*old_lease_session = di->lease_session;
 		di->lease_session = NULL;
 	}
 
-	ceph_dentry_lru_touch(dentry);
-
 	if (!di->lease_session)
 		di->lease_session = ceph_get_mds_session(session);
 	di->lease_gen = session->s_cap_gen;
@@ -1088,6 +1079,64 @@
 	di->lease_renew_after = half_ttl;
 	di->lease_renew_from = 0;
 	di->time = ttl;
+
+	__ceph_dentry_lease_touch(di);
+}
+
+static inline void update_dentry_lease(struct inode *dir, struct dentry *dentry,
+					struct ceph_mds_reply_lease *lease,
+					struct ceph_mds_session *session,
+					unsigned long from_time)
+{
+	struct ceph_mds_session *old_lease_session = NULL;
+	spin_lock(&dentry->d_lock);
+	__update_dentry_lease(dir, dentry, lease, session, from_time,
+			      &old_lease_session);
+	spin_unlock(&dentry->d_lock);
+	if (old_lease_session)
+		ceph_put_mds_session(old_lease_session);
+}
+
+/*
+ * update dentry lease without having parent inode locked
+ */
+static void update_dentry_lease_careful(struct dentry *dentry,
+					struct ceph_mds_reply_lease *lease,
+					struct ceph_mds_session *session,
+					unsigned long from_time,
+					char *dname, u32 dname_len,
+					struct ceph_vino *pdvino,
+					struct ceph_vino *ptvino)
+
+{
+	struct inode *dir;
+	struct ceph_mds_session *old_lease_session = NULL;
+
+	spin_lock(&dentry->d_lock);
+	/* make sure dentry's name matches target */
+	if (dentry->d_name.len != dname_len ||
+	    memcmp(dentry->d_name.name, dname, dname_len))
+		goto out_unlock;
+
+	dir = d_inode(dentry->d_parent);
+	/* make sure parent matches dvino */
+	if (!ceph_ino_compare(dir, pdvino))
+		goto out_unlock;
+
+	/* make sure dentry's inode matches target. NULL ptvino means that
+	 * we expect a negative dentry */
+	if (ptvino) {
+		if (d_really_is_negative(dentry))
+			goto out_unlock;
+		if (!ceph_ino_compare(d_inode(dentry), ptvino))
+			goto out_unlock;
+	} else {
+		if (d_really_is_positive(dentry))
+			goto out_unlock;
+	}
+
+	__update_dentry_lease(dir, dentry, lease, session,
+			      from_time, &old_lease_session);
 out_unlock:
 	spin_unlock(&dentry->d_lock);
 	if (old_lease_session)
@@ -1098,8 +1147,9 @@
  * splice a dentry to an inode.
  * caller must hold directory i_mutex for this to be safe.
  */
-static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
+static int splice_dentry(struct dentry **pdn, struct inode *in)
 {
+	struct dentry *dn = *pdn;
 	struct dentry *realdn;
 
 	BUG_ON(d_inode(dn));
@@ -1132,28 +1182,23 @@
 	if (IS_ERR(realdn)) {
 		pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
 		       PTR_ERR(realdn), dn, in, ceph_vinop(in));
-		dn = realdn;
-		/*
-		 * Caller should release 'dn' in the case of error.
-		 * If 'req->r_dentry' is passed to this function,
-		 * caller should leave 'req->r_dentry' untouched.
-		 */
-		goto out;
-	} else if (realdn) {
+		return PTR_ERR(realdn);
+	}
+
+	if (realdn) {
 		dout("dn %p (%d) spliced with %p (%d) "
 		     "inode %p ino %llx.%llx\n",
 		     dn, d_count(dn),
 		     realdn, d_count(realdn),
 		     d_inode(realdn), ceph_vinop(d_inode(realdn)));
 		dput(dn);
-		dn = realdn;
+		*pdn = realdn;
 	} else {
 		BUG_ON(!ceph_dentry(dn));
 		dout("dn %p attached to %p ino %llx.%llx\n",
 		     dn, d_inode(dn), ceph_vinop(d_inode(dn)));
 	}
-out:
-	return dn;
+	return 0;
 }
 
 /*
@@ -1200,7 +1245,9 @@
 			WARN_ON_ONCE(1);
 		}
 
-		if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) {
+		if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME &&
+		    test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
+		    !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
 			struct qstr dname;
 			struct dentry *dn, *parent;
 
@@ -1338,7 +1385,12 @@
 			dout("dn %p gets new offset %lld\n", req->r_old_dentry,
 			     ceph_dentry(req->r_old_dentry)->offset);
 
-			dn = req->r_old_dentry;  /* use old_dentry */
+			/* swap r_dentry and r_old_dentry in case that
+			 * splice_dentry() gets called later. This is safe
+			 * because no other place will use them */
+			req->r_dentry = req->r_old_dentry;
+			req->r_old_dentry = dn;
+			dn = req->r_dentry;
 		}
 
 		/* null dentry? */
@@ -1351,10 +1403,9 @@
 			} else if (have_lease) {
 				if (d_unhashed(dn))
 					d_add(dn, NULL);
-				update_dentry_lease(dn, rinfo->dlease,
-						    session,
-						    req->r_request_started,
-						    NULL, &dvino);
+				update_dentry_lease(dir, dn,
+						    rinfo->dlease, session,
+						    req->r_request_started);
 			}
 			goto done;
 		}
@@ -1363,12 +1414,10 @@
 		if (d_really_is_negative(dn)) {
 			ceph_dir_clear_ordered(dir);
 			ihold(in);
-			dn = splice_dentry(dn, in);
-			if (IS_ERR(dn)) {
-				err = PTR_ERR(dn);
+			err = splice_dentry(&req->r_dentry, in);
+			if (err < 0)
 				goto done;
-			}
-			req->r_dentry = dn;  /* may have spliced */
+			dn = req->r_dentry;  /* may have spliced */
 		} else if (d_really_is_positive(dn) && d_inode(dn) != in) {
 			dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
 			     dn, d_inode(dn), ceph_vinop(d_inode(dn)),
@@ -1378,52 +1427,41 @@
 		}
 
 		if (have_lease) {
-			tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
-			tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
-			update_dentry_lease(dn, rinfo->dlease, session,
-					    req->r_request_started,
-					    &tvino, &dvino);
+			update_dentry_lease(dir, dn,
+					    rinfo->dlease, session,
+					    req->r_request_started);
 		}
 		dout(" final dn %p\n", dn);
 	} else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
 		    req->r_op == CEPH_MDS_OP_MKSNAP) &&
+	           test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
 		   !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
-		struct dentry *dn = req->r_dentry;
 		struct inode *dir = req->r_parent;
 
 		/* fill out a snapdir LOOKUPSNAP dentry */
-		BUG_ON(!dn);
 		BUG_ON(!dir);
 		BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
-		dout(" linking snapped dir %p to dn %p\n", in, dn);
+		BUG_ON(!req->r_dentry);
+		dout(" linking snapped dir %p to dn %p\n", in, req->r_dentry);
 		ceph_dir_clear_ordered(dir);
 		ihold(in);
-		dn = splice_dentry(dn, in);
-		if (IS_ERR(dn)) {
-			err = PTR_ERR(dn);
+		err = splice_dentry(&req->r_dentry, in);
+		if (err < 0)
 			goto done;
-		}
-		req->r_dentry = dn;  /* may have spliced */
-	} else if (rinfo->head->is_dentry) {
+	} else if (rinfo->head->is_dentry && req->r_dentry) {
+		/* parent inode is not locked, be carefull */
 		struct ceph_vino *ptvino = NULL;
-
-		if ((le32_to_cpu(rinfo->diri.in->cap.caps) & CEPH_CAP_FILE_SHARED) ||
-		    le32_to_cpu(rinfo->dlease->duration_ms)) {
-			dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
-			dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
-
-			if (rinfo->head->is_target) {
-				tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
-				tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
-				ptvino = &tvino;
-			}
-
-			update_dentry_lease(req->r_dentry, rinfo->dlease,
-				session, req->r_request_started, ptvino,
-				&dvino);
-		} else {
-			dout("%s: no dentry lease or dir cap\n", __func__);
+		dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
+		dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
+		if (rinfo->head->is_target) {
+			tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
+			tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
+			ptvino = &tvino;
 		}
+		update_dentry_lease_careful(req->r_dentry, rinfo->dlease,
+					    session, req->r_request_started,
+					    rinfo->dname, rinfo->dname_len,
+					    &dvino, ptvino);
 	}
 done:
 	dout("fill_trace done err=%d\n", err);
@@ -1461,7 +1499,8 @@
 			pr_err("fill_inode badness on %p got %d\n", in, rc);
 			err = rc;
 		}
-		iput(in);
+		/* avoid calling iput_final() in mds dispatch threads */
+		ceph_async_iput(in);
 	}
 
 	return err;
@@ -1584,7 +1623,7 @@
 	/* FIXME: release caps/leases if error occurs */
 	for (i = 0; i < rinfo->dir_nr; i++) {
 		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
-		struct ceph_vino tvino, dvino;
+		struct ceph_vino tvino;
 
 		dname.name = rde->name;
 		dname.len = rde->name_len;
@@ -1659,39 +1698,35 @@
 				 &req->r_caps_reservation);
 		if (ret < 0) {
 			pr_err("fill_inode badness on %p\n", in);
-			if (d_really_is_negative(dn))
-				iput(in);
+			if (d_really_is_negative(dn)) {
+				/* avoid calling iput_final() in mds
+				 * dispatch threads */
+				ceph_async_iput(in);
+			}
 			d_drop(dn);
 			err = ret;
 			goto next_item;
 		}
 
 		if (d_really_is_negative(dn)) {
-			struct dentry *realdn;
-
 			if (ceph_security_xattr_deadlock(in)) {
 				dout(" skip splicing dn %p to inode %p"
 				     " (security xattr deadlock)\n", dn, in);
-				iput(in);
+				ceph_async_iput(in);
 				skipped++;
 				goto next_item;
 			}
 
-			realdn = splice_dentry(dn, in);
-			if (IS_ERR(realdn)) {
-				err = PTR_ERR(realdn);
-				d_drop(dn);
-				dn = NULL;
+			err = splice_dentry(&dn, in);
+			if (err < 0)
 				goto next_item;
-			}
-			dn = realdn;
 		}
 
 		ceph_dentry(dn)->offset = rde->offset;
 
-		dvino = ceph_vino(d_inode(parent));
-		update_dentry_lease(dn, rde->lease, req->r_session,
-				    req->r_request_started, &tvino, &dvino);
+		update_dentry_lease(d_inode(parent), dn,
+				    rde->lease, req->r_session,
+				    req->r_request_started);
 
 		if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
 			ret = fill_readdir_cache(d_inode(parent), dn,
@@ -1700,8 +1735,7 @@
 				err = ret;
 		}
 next_item:
-		if (dn)
-			dput(dn);
+		dput(dn);
 	}
 out:
 	if (err == 0 && skipped == 0) {
@@ -1730,56 +1764,86 @@
 }
 
 /*
+ * Put reference to inode, but avoid calling iput_final() in current thread.
+ * iput_final() may wait for reahahead pages. The wait can cause deadlock in
+ * some contexts.
+ */
+void ceph_async_iput(struct inode *inode)
+{
+	if (!inode)
+		return;
+	for (;;) {
+		if (atomic_add_unless(&inode->i_count, -1, 1))
+			break;
+		if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+			       &ceph_inode(inode)->i_work))
+			break;
+		/* queue work failed, i_count must be at least 2 */
+	}
+}
+
+/*
  * Write back inode data in a worker thread.  (This can't be done
  * in the message handler context.)
  */
 void ceph_queue_writeback(struct inode *inode)
 {
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	set_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask);
+
 	ihold(inode);
-	if (queue_work(ceph_inode_to_client(inode)->wb_wq,
-		       &ceph_inode(inode)->i_wb_work)) {
+	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+		       &ci->i_work)) {
 		dout("ceph_queue_writeback %p\n", inode);
 	} else {
-		dout("ceph_queue_writeback %p failed\n", inode);
+		dout("ceph_queue_writeback %p already queued, mask=%lx\n",
+		     inode, ci->i_work_mask);
 		iput(inode);
 	}
 }
 
-static void ceph_writeback_work(struct work_struct *work)
-{
-	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
-						  i_wb_work);
-	struct inode *inode = &ci->vfs_inode;
-
-	dout("writeback %p\n", inode);
-	filemap_fdatawrite(&inode->i_data);
-	iput(inode);
-}
-
 /*
  * queue an async invalidation
  */
 void ceph_queue_invalidate(struct inode *inode)
 {
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	set_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask);
+
 	ihold(inode);
-	if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
-		       &ceph_inode(inode)->i_pg_inv_work)) {
+	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+		       &ceph_inode(inode)->i_work)) {
 		dout("ceph_queue_invalidate %p\n", inode);
 	} else {
-		dout("ceph_queue_invalidate %p failed\n", inode);
+		dout("ceph_queue_invalidate %p already queued, mask=%lx\n",
+		     inode, ci->i_work_mask);
 		iput(inode);
 	}
 }
 
 /*
- * Invalidate inode pages in a worker thread.  (This can't be done
- * in the message handler context.)
+ * Queue an async vmtruncate.  If we fail to queue work, we will handle
+ * the truncation the next time we call __ceph_do_pending_vmtruncate.
  */
-static void ceph_invalidate_work(struct work_struct *work)
+void ceph_queue_vmtruncate(struct inode *inode)
 {
-	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
-						  i_pg_inv_work);
-	struct inode *inode = &ci->vfs_inode;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	set_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask);
+
+	ihold(inode);
+	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+		       &ci->i_work)) {
+		dout("ceph_queue_vmtruncate %p\n", inode);
+	} else {
+		dout("ceph_queue_vmtruncate %p already queued, mask=%lx\n",
+		     inode, ci->i_work_mask);
+		iput(inode);
+	}
+}
+
+static void ceph_do_invalidate_pages(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	u32 orig_gen;
 	int check = 0;
@@ -1831,44 +1895,6 @@
 out:
 	if (check)
 		ceph_check_caps(ci, 0, NULL);
-	iput(inode);
-}
-
-
-/*
- * called by trunc_wq;
- *
- * We also truncate in a separate thread as well.
- */
-static void ceph_vmtruncate_work(struct work_struct *work)
-{
-	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
-						  i_vmtruncate_work);
-	struct inode *inode = &ci->vfs_inode;
-
-	dout("vmtruncate_work %p\n", inode);
-	__ceph_do_pending_vmtruncate(inode);
-	iput(inode);
-}
-
-/*
- * Queue an async vmtruncate.  If we fail to queue work, we will handle
- * the truncation the next time we call __ceph_do_pending_vmtruncate.
- */
-void ceph_queue_vmtruncate(struct inode *inode)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-
-	ihold(inode);
-
-	if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
-		       &ci->i_vmtruncate_work)) {
-		dout("ceph_queue_vmtruncate %p\n", inode);
-	} else {
-		dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
-		     inode, ci->i_truncate_pending);
-		iput(inode);
-	}
 }
 
 /*
@@ -1932,6 +1958,25 @@
 	wake_up_all(&ci->i_cap_wq);
 }
 
+static void ceph_inode_work(struct work_struct *work)
+{
+	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
+						 i_work);
+	struct inode *inode = &ci->vfs_inode;
+
+	if (test_and_clear_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask)) {
+		dout("writeback %p\n", inode);
+		filemap_fdatawrite(&inode->i_data);
+	}
+	if (test_and_clear_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask))
+		ceph_do_invalidate_pages(inode);
+
+	if (test_and_clear_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask))
+		__ceph_do_pending_vmtruncate(inode);
+
+	iput(inode);
+}
+
 /*
  * symlinks
  */
@@ -1945,7 +1990,7 @@
 int __ceph_setattr(struct inode *inode, struct iattr *attr)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	const unsigned int ia_valid = attr->ia_valid;
+	unsigned int ia_valid = attr->ia_valid;
 	struct ceph_mds_request *req;
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_cap_flush *prealloc_cf;
@@ -2050,6 +2095,26 @@
 				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
 		}
 	}
+	if (ia_valid & ATTR_SIZE) {
+		dout("setattr %p size %lld -> %lld\n", inode,
+		     inode->i_size, attr->ia_size);
+		if ((issued & CEPH_CAP_FILE_EXCL) &&
+		    attr->ia_size > inode->i_size) {
+			i_size_write(inode, attr->ia_size);
+			inode->i_blocks = calc_inode_blocks(attr->ia_size);
+			ci->i_reported_size = attr->ia_size;
+			dirtied |= CEPH_CAP_FILE_EXCL;
+			ia_valid |= ATTR_MTIME;
+		} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
+			   attr->ia_size != inode->i_size) {
+			req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
+			req->r_args.setattr.old_size =
+				cpu_to_le64(inode->i_size);
+			mask |= CEPH_SETATTR_SIZE;
+			release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
+				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
+		}
+	}
 	if (ia_valid & ATTR_MTIME) {
 		dout("setattr %p mtime %lld.%ld -> %lld.%ld\n", inode,
 		     inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
@@ -2072,25 +2137,6 @@
 				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
 		}
 	}
-	if (ia_valid & ATTR_SIZE) {
-		dout("setattr %p size %lld -> %lld\n", inode,
-		     inode->i_size, attr->ia_size);
-		if ((issued & CEPH_CAP_FILE_EXCL) &&
-		    attr->ia_size > inode->i_size) {
-			i_size_write(inode, attr->ia_size);
-			inode->i_blocks = calc_inode_blocks(attr->ia_size);
-			ci->i_reported_size = attr->ia_size;
-			dirtied |= CEPH_CAP_FILE_EXCL;
-		} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
-			   attr->ia_size != inode->i_size) {
-			req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
-			req->r_args.setattr.old_size =
-				cpu_to_le64(inode->i_size);
-			mask |= CEPH_SETATTR_SIZE;
-			release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
-				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
-		}
-	}
 
 	/* these do nothing */
 	if (ia_valid & ATTR_CTIME) {
@@ -2255,42 +2301,82 @@
 	return err;
 }
 
+/* Craft a mask of needed caps given a set of requested statx attrs. */
+static int statx_to_caps(u32 want)
+{
+	int mask = 0;
+
+	if (want & (STATX_MODE|STATX_UID|STATX_GID|STATX_CTIME|STATX_BTIME))
+		mask |= CEPH_CAP_AUTH_SHARED;
+
+	if (want & (STATX_NLINK|STATX_CTIME))
+		mask |= CEPH_CAP_LINK_SHARED;
+
+	if (want & (STATX_ATIME|STATX_MTIME|STATX_CTIME|STATX_SIZE|
+		    STATX_BLOCKS))
+		mask |= CEPH_CAP_FILE_SHARED;
+
+	if (want & (STATX_CTIME))
+		mask |= CEPH_CAP_XATTR_SHARED;
+
+	return mask;
+}
+
 /*
- * Get all attributes.  Hopefully somedata we'll have a statlite()
- * and can limit the fields we require to be accurate.
+ * Get all the attributes. If we have sufficient caps for the requested attrs,
+ * then we can avoid talking to the MDS at all.
  */
 int ceph_getattr(const struct path *path, struct kstat *stat,
 		 u32 request_mask, unsigned int flags)
 {
 	struct inode *inode = d_inode(path->dentry);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int err;
+	u32 valid_mask = STATX_BASIC_STATS;
+	int err = 0;
 
-	err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL, false);
-	if (!err) {
-		generic_fillattr(inode, stat);
-		stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
-		if (ceph_snap(inode) != CEPH_NOSNAP)
-			stat->dev = ceph_snap(inode);
-		else
-			stat->dev = 0;
-		if (S_ISDIR(inode->i_mode)) {
-			if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb),
-						RBYTES))
-				stat->size = ci->i_rbytes;
-			else
-				stat->size = ci->i_files + ci->i_subdirs;
-			stat->blocks = 0;
-			stat->blksize = 65536;
-			/*
-			 * Some applications rely on the number of st_nlink
-			 * value on directories to be either 0 (if unlinked)
-			 * or 2 + number of subdirectories.
-			 */
-			if (stat->nlink == 1)
-				/* '.' + '..' + subdirs */
-				stat->nlink = 1 + 1 + ci->i_subdirs;
-		}
+	/* Skip the getattr altogether if we're asked not to sync */
+	if (!(flags & AT_STATX_DONT_SYNC)) {
+		err = ceph_do_getattr(inode, statx_to_caps(request_mask),
+				      flags & AT_STATX_FORCE_SYNC);
+		if (err)
+			return err;
 	}
+
+	generic_fillattr(inode, stat);
+	stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
+
+	/*
+	 * btime on newly-allocated inodes is 0, so if this is still set to
+	 * that, then assume that it's not valid.
+	 */
+	if (ci->i_btime.tv_sec || ci->i_btime.tv_nsec) {
+		stat->btime = ci->i_btime;
+		valid_mask |= STATX_BTIME;
+	}
+
+	if (ceph_snap(inode) == CEPH_NOSNAP)
+		stat->dev = inode->i_sb->s_dev;
+	else
+		stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0;
+
+	if (S_ISDIR(inode->i_mode)) {
+		if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb),
+					RBYTES))
+			stat->size = ci->i_rbytes;
+		else
+			stat->size = ci->i_files + ci->i_subdirs;
+		stat->blocks = 0;
+		stat->blksize = 65536;
+		/*
+		 * Some applications rely on the number of st_nlink
+		 * value on directories to be either 0 (if unlinked)
+		 * or 2 + number of subdirectories.
+		 */
+		if (stat->nlink == 1)
+			/* '.' + '..' + subdirs */
+			stat->nlink = 1 + 1 + ci->i_subdirs;
+	}
+
+	stat->result_mask = request_mask & valid_mask;
 	return err;
 }