Update Linux to v5.4.2
Change-Id: Idf6911045d9d382da2cfe01b1edff026404ac8fd
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index bc43c82..a516329 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -20,6 +20,8 @@
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>
+#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
+
/*
* A cluster of MDS (metadata server) daemons is responsible for
* managing the file system namespace (the directory hierarchy and
@@ -46,13 +48,17 @@
*/
struct ceph_reconnect_state {
- int nr_caps;
+ struct ceph_mds_session *session;
+ int nr_caps, nr_realms;
struct ceph_pagelist *pagelist;
unsigned msg_version;
+ bool allow_multi;
};
static void __wake_requests(struct ceph_mds_client *mdsc,
struct list_head *head);
+static void ceph_cap_release_work(struct work_struct *work);
+static void ceph_cap_reclaim_work(struct work_struct *work);
static const struct ceph_connection_operations mds_con_ops;
@@ -61,6 +67,29 @@
* mds reply parsing
*/
+/*
+ * Decode a versioned quota record from an MDS reply.
+ *
+ * The record starts with a (struct_v, struct_compat, struct_len) header
+ * followed by struct_len bytes of payload.  The first two 64-bit payload
+ * values are stored in info->max_bytes and info->max_files.  On success
+ * *p is moved to the end of the payload, so any fields after those two
+ * are skipped.
+ *
+ * Returns 0 on success, -EIO if the header is not acceptable or the
+ * buffer is too short.
+ */
+static int parse_reply_info_quota(void **p, void *end,
+				  struct ceph_mds_reply_info_in *info)
+{
+	u32 payload_len;
+	u8 version, compat;
+	void *payload_end;
+
+	ceph_decode_8_safe(p, end, version, bad);
+	ceph_decode_8_safe(p, end, compat, bad);
+	/* version must be at least 1; only compat == 1 is understood */
+	if (version == 0 || compat != 1)
+		goto bad;
+	ceph_decode_32_safe(p, end, payload_len, bad);
+	ceph_decode_need(p, end, payload_len, bad);
+	/* from here on, decoding is confined to this record's payload */
+	payload_end = *p + payload_len;
+	ceph_decode_64_safe(p, payload_end, info->max_bytes, bad);
+	ceph_decode_64_safe(p, payload_end, info->max_files, bad);
+	*p = payload_end;
+	return 0;
+bad:
+	return -EIO;
+}
+
/*
* parse individual inode info
*/
@@ -68,8 +97,24 @@
struct ceph_mds_reply_info_in *info,
u64 features)
{
- int err = -EIO;
+ int err = 0;
+ u8 struct_v = 0;
+ if (features == (u64)-1) {
+ u32 struct_len;
+ u8 struct_compat;
+ ceph_decode_8_safe(p, end, struct_v, bad);
+ ceph_decode_8_safe(p, end, struct_compat, bad);
+ /* struct_v is expected to be >= 1. we only understand
+ * encoding with struct_compat == 1. */
+ if (!struct_v || struct_compat != 1)
+ goto bad;
+ ceph_decode_32_safe(p, end, struct_len, bad);
+ ceph_decode_need(p, end, struct_len, bad);
+ end = *p + struct_len;
+ }
+
+ ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
info->in = *p;
*p += sizeof(struct ceph_mds_reply_inode) +
sizeof(*info->in->fragtree.splits) *
@@ -80,62 +125,160 @@
info->symlink = *p;
*p += info->symlink_len;
- if (features & CEPH_FEATURE_DIRLAYOUTHASH)
- ceph_decode_copy_safe(p, end, &info->dir_layout,
- sizeof(info->dir_layout), bad);
- else
- memset(&info->dir_layout, 0, sizeof(info->dir_layout));
-
+ ceph_decode_copy_safe(p, end, &info->dir_layout,
+ sizeof(info->dir_layout), bad);
ceph_decode_32_safe(p, end, info->xattr_len, bad);
ceph_decode_need(p, end, info->xattr_len, bad);
info->xattr_data = *p;
*p += info->xattr_len;
- if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+ if (features == (u64)-1) {
+ /* inline data */
ceph_decode_64_safe(p, end, info->inline_version, bad);
ceph_decode_32_safe(p, end, info->inline_len, bad);
ceph_decode_need(p, end, info->inline_len, bad);
info->inline_data = *p;
*p += info->inline_len;
- } else
- info->inline_version = CEPH_INLINE_NONE;
-
- if (features & CEPH_FEATURE_MDS_QUOTA) {
- u8 struct_v, struct_compat;
- u32 struct_len;
-
- /*
- * both struct_v and struct_compat are expected to be >= 1
- */
- ceph_decode_8_safe(p, end, struct_v, bad);
- ceph_decode_8_safe(p, end, struct_compat, bad);
- if (!struct_v || !struct_compat)
- goto bad;
- ceph_decode_32_safe(p, end, struct_len, bad);
- ceph_decode_need(p, end, struct_len, bad);
- ceph_decode_64_safe(p, end, info->max_bytes, bad);
- ceph_decode_64_safe(p, end, info->max_files, bad);
- } else {
- info->max_bytes = 0;
- info->max_files = 0;
- }
-
- info->pool_ns_len = 0;
- info->pool_ns_data = NULL;
- if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
+ /* quota */
+ err = parse_reply_info_quota(p, end, info);
+ if (err < 0)
+ goto out_bad;
+ /* pool namespace */
ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
if (info->pool_ns_len > 0) {
ceph_decode_need(p, end, info->pool_ns_len, bad);
info->pool_ns_data = *p;
*p += info->pool_ns_len;
}
- }
+ /* btime */
+ ceph_decode_need(p, end, sizeof(info->btime), bad);
+ ceph_decode_copy(p, &info->btime, sizeof(info->btime));
+
+ /* change attribute */
+ ceph_decode_64_safe(p, end, info->change_attr, bad);
+
+ /* dir pin */
+ if (struct_v >= 2) {
+ ceph_decode_32_safe(p, end, info->dir_pin, bad);
+ } else {
+ info->dir_pin = -ENODATA;
+ }
+
+ /* snapshot birth time, remains zero for v<=2 */
+ if (struct_v >= 3) {
+ ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
+ ceph_decode_copy(p, &info->snap_btime,
+ sizeof(info->snap_btime));
+ } else {
+ memset(&info->snap_btime, 0, sizeof(info->snap_btime));
+ }
+
+ *p = end;
+ } else {
+ if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+ ceph_decode_64_safe(p, end, info->inline_version, bad);
+ ceph_decode_32_safe(p, end, info->inline_len, bad);
+ ceph_decode_need(p, end, info->inline_len, bad);
+ info->inline_data = *p;
+ *p += info->inline_len;
+ } else
+ info->inline_version = CEPH_INLINE_NONE;
+
+ if (features & CEPH_FEATURE_MDS_QUOTA) {
+ err = parse_reply_info_quota(p, end, info);
+ if (err < 0)
+ goto out_bad;
+ } else {
+ info->max_bytes = 0;
+ info->max_files = 0;
+ }
+
+ info->pool_ns_len = 0;
+ info->pool_ns_data = NULL;
+ if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
+ ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
+ if (info->pool_ns_len > 0) {
+ ceph_decode_need(p, end, info->pool_ns_len, bad);
+ info->pool_ns_data = *p;
+ *p += info->pool_ns_len;
+ }
+ }
+
+ if (features & CEPH_FEATURE_FS_BTIME) {
+ ceph_decode_need(p, end, sizeof(info->btime), bad);
+ ceph_decode_copy(p, &info->btime, sizeof(info->btime));
+ ceph_decode_64_safe(p, end, info->change_attr, bad);
+ }
+
+ info->dir_pin = -ENODATA;
+ /* info->snap_btime remains zero */
+ }
return 0;
bad:
+ err = -EIO;
+out_bad:
return err;
}
+/*
+ * Locate the dirfrag record in an MDS reply.
+ *
+ * When features == (u64)-1 the record is wrapped in a
+ * (struct_v, struct_compat, struct_len) header; decoding is then confined
+ * to struct_len bytes and *p ends up at the end of that payload.  Otherwise
+ * *p ends up right after the dirfrag and its ndist trailing u32 entries.
+ *
+ * *dirfrag is pointed into the message buffer; nothing is copied.
+ *
+ * Returns 0 on success, -EIO on an unacceptable header or short buffer.
+ */
+static int parse_reply_info_dir(void **p, void *end,
+				struct ceph_mds_reply_dirfrag **dirfrag,
+				u64 features)
+{
+	u32 ndist;
+
+	if (features == (u64)-1) {
+		u8 struct_v, struct_compat;
+		u32 struct_len;
+		ceph_decode_8_safe(p, end, struct_v, bad);
+		ceph_decode_8_safe(p, end, struct_compat, bad);
+		/* struct_v is expected to be >= 1. we only understand
+		 * encoding whose struct_compat == 1. */
+		if (!struct_v || struct_compat != 1)
+			goto bad;
+		ceph_decode_32_safe(p, end, struct_len, bad);
+		ceph_decode_need(p, end, struct_len, bad);
+		end = *p + struct_len;
+	}
+
+	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
+	*dirfrag = *p;
+	*p += sizeof(**dirfrag);
+	/*
+	 * ndist comes straight off the wire.  Compare it against the
+	 * number of u32 entries that still fit instead of advancing *p
+	 * first: sizeof(u32) * ndist (or the resulting pointer) can wrap
+	 * on 32-bit and slip past a "*p > end" check.
+	 */
+	ndist = le32_to_cpu((*dirfrag)->ndist);
+	if (unlikely(ndist > (size_t)(end - *p) / sizeof(u32)))
+		goto bad;
+	*p += sizeof(u32) * ndist;
+	if (features == (u64)-1)
+		*p = end;
+	return 0;
+bad:
+	return -EIO;
+}
+
+/*
+ * Locate a dentry lease record in an MDS reply.
+ *
+ * When features == (u64)-1 the lease is wrapped in a
+ * (struct_v, struct_compat, struct_len) header and *p ends up at the end
+ * of the struct_len payload; otherwise *p is stepped over one
+ * struct ceph_mds_reply_lease.
+ *
+ * *lease is pointed into the message buffer; nothing is copied.
+ *
+ * Returns 0 on success, -EIO on an unacceptable header or short buffer.
+ */
+static int parse_reply_info_lease(void **p, void *end,
+				  struct ceph_mds_reply_lease **lease,
+				  u64 features)
+{
+	const bool versioned = (features == (u64)-1);
+
+	if (versioned) {
+		u32 payload_len;
+		u8 version, compat;
+
+		ceph_decode_8_safe(p, end, version, bad);
+		ceph_decode_8_safe(p, end, compat, bad);
+		/* version must be at least 1; only compat == 1 is
+		 * understood */
+		if (version == 0 || compat != 1)
+			goto bad;
+		ceph_decode_32_safe(p, end, payload_len, bad);
+		ceph_decode_need(p, end, payload_len, bad);
+		/* confine the rest of the decode to this record */
+		end = *p + payload_len;
+	}
+
+	ceph_decode_need(p, end, sizeof(**lease), bad);
+	*lease = *p;
+	/* versioned: jump to the end of the payload, skipping any
+	 * unknown trailing fields; legacy: step over the lease itself */
+	*p = versioned ? end : *p + sizeof(**lease);
+	return 0;
+bad:
+	return -EIO;
+}
+
/*
* parse a normal reply, which may contain a (dir+)dentry and/or a
* target inode.
@@ -151,20 +294,18 @@
if (err < 0)
goto out_bad;
- if (unlikely(*p + sizeof(*info->dirfrag) > end))
- goto bad;
- info->dirfrag = *p;
- *p += sizeof(*info->dirfrag) +
- sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
- if (unlikely(*p > end))
- goto bad;
+ err = parse_reply_info_dir(p, end, &info->dirfrag, features);
+ if (err < 0)
+ goto out_bad;
ceph_decode_32_safe(p, end, info->dname_len, bad);
ceph_decode_need(p, end, info->dname_len, bad);
info->dname = *p;
*p += info->dname_len;
- info->dlease = *p;
- *p += sizeof(*info->dlease);
+
+ err = parse_reply_info_lease(p, end, &info->dlease, features);
+ if (err < 0)
+ goto out_bad;
}
if (info->head->is_target) {
@@ -187,20 +328,16 @@
/*
* parse readdir results
*/
-static int parse_reply_info_dir(void **p, void *end,
+static int parse_reply_info_readdir(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
u64 features)
{
u32 num, i = 0;
int err;
- info->dir_dir = *p;
- if (*p + sizeof(*info->dir_dir) > end)
- goto bad;
- *p += sizeof(*info->dir_dir) +
- sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
- if (*p > end)
- goto bad;
+ err = parse_reply_info_dir(p, end, &info->dir_dir, features);
+ if (err < 0)
+ goto out_bad;
ceph_decode_need(p, end, sizeof(num) + 2, bad);
num = ceph_decode_32(p);
@@ -226,15 +363,16 @@
while (num) {
struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
/* dentry */
- ceph_decode_need(p, end, sizeof(u32)*2, bad);
- rde->name_len = ceph_decode_32(p);
+ ceph_decode_32_safe(p, end, rde->name_len, bad);
ceph_decode_need(p, end, rde->name_len, bad);
rde->name = *p;
*p += rde->name_len;
dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
- rde->lease = *p;
- *p += sizeof(struct ceph_mds_reply_lease);
+ /* dentry lease */
+ err = parse_reply_info_lease(p, end, &rde->lease, features);
+ if (err)
+ goto out_bad;
/* inode */
err = parse_reply_info_in(p, end, &rde->inode, features);
if (err < 0)
@@ -246,8 +384,8 @@
}
done:
- if (*p != end)
- goto bad;
+ /* Skip over any unrecognized fields */
+ *p = end;
return 0;
bad:
@@ -268,12 +406,10 @@
goto bad;
info->filelock_reply = *p;
- *p += sizeof(*info->filelock_reply);
- if (unlikely(*p != end))
- goto bad;
+ /* Skip over any unrecognized fields */
+ *p = end;
return 0;
-
bad:
return -EIO;
}
@@ -285,19 +421,23 @@
struct ceph_mds_reply_info_parsed *info,
u64 features)
{
- if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
+ if (features == (u64)-1 ||
+ (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
+ /* Malformed reply? */
if (*p == end) {
info->has_create_ino = false;
} else {
info->has_create_ino = true;
- info->ino = ceph_decode_64(p);
+ ceph_decode_64_safe(p, end, info->ino, bad);
}
+ } else {
+ if (*p != end)
+ goto bad;
}
- if (unlikely(*p != end))
- goto bad;
+ /* Skip over any unrecognized fields */
+ *p = end;
return 0;
-
bad:
return -EIO;
}
@@ -314,7 +454,7 @@
if (op == CEPH_MDS_OP_GETFILELOCK)
return parse_reply_info_filelock(p, end, info, features);
else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
- return parse_reply_info_dir(p, end, info, features);
+ return parse_reply_info_readdir(p, end, info, features);
else if (op == CEPH_MDS_OP_CREATE)
return parse_reply_info_create(p, end, info, features);
else
@@ -426,15 +566,9 @@
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
int mds)
{
- struct ceph_mds_session *session;
-
if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
return NULL;
- session = mdsc->sessions[mds];
- dout("lookup_mds_session %p %d\n", session,
- refcount_read(&session->s_ref));
- get_session(session);
- return session;
+ return get_session(mdsc->sessions[mds]);
}
static bool __have_session(struct ceph_mds_client *mdsc, int mds)
@@ -498,7 +632,7 @@
ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
spin_lock_init(&s->s_gen_ttl_lock);
- s->s_cap_gen = 0;
+ s->s_cap_gen = 1;
s->s_cap_ttl = jiffies - 1;
spin_lock_init(&s->s_cap_lock);
@@ -506,7 +640,6 @@
s->s_renew_seq = 0;
INIT_LIST_HEAD(&s->s_caps);
s->s_nr_caps = 0;
- s->s_trim_caps = 0;
refcount_set(&s->s_ref, 1);
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
@@ -514,6 +647,8 @@
s->s_cap_reconnect = 0;
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
+ INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
+
INIT_LIST_HEAD(&s->s_cap_flushing);
mdsc->sessions[mds] = s;
@@ -539,6 +674,7 @@
dout("__unregister_session mds%d %p\n", s->s_mds, s);
BUG_ON(mdsc->sessions[s->s_mds] != s);
mdsc->sessions[s->s_mds] = NULL;
+ s->s_state = 0;
ceph_con_close(&s->s_con);
ceph_put_mds_session(s);
atomic_dec(&mdsc->num_sessions);
@@ -569,11 +705,12 @@
ceph_msg_put(req->r_reply);
if (req->r_inode) {
ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
- iput(req->r_inode);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(req->r_inode);
}
if (req->r_parent)
ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
- iput(req->r_target_inode);
+ ceph_async_iput(req->r_target_inode);
if (req->r_dentry)
dput(req->r_dentry);
if (req->r_old_dentry)
@@ -587,7 +724,7 @@
*/
ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
- iput(req->r_old_dentry_dir);
+ ceph_async_iput(req->r_old_dentry_dir);
}
kfree(req->r_path1);
kfree(req->r_path2);
@@ -595,6 +732,7 @@
ceph_pagelist_release(req->r_pagelist);
put_request_session(req);
ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
+ WARN_ON_ONCE(!list_empty(&req->r_wait));
kfree(req);
}
@@ -697,7 +835,8 @@
}
if (req->r_unsafe_dir) {
- iput(req->r_unsafe_dir);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(req->r_unsafe_dir);
req->r_unsafe_dir = NULL;
}
@@ -780,7 +919,7 @@
struct inode *dir;
rcu_read_lock();
- parent = req->r_dentry->d_parent;
+ parent = READ_ONCE(req->r_dentry->d_parent);
dir = req->r_parent ? : d_inode_rcu(parent);
if (!dir || dir->i_sb != mdsc->fsc->sb) {
@@ -862,7 +1001,7 @@
cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
if (!cap) {
spin_unlock(&ci->i_ceph_lock);
- iput(inode);
+ ceph_async_iput(inode);
goto random;
}
mds = cap->session->s_mds;
@@ -871,7 +1010,9 @@
cap == ci->i_auth_cap ? "auth " : "", cap);
spin_unlock(&ci->i_ceph_lock);
out:
- iput(inode);
+ /* avoid calling iput_final() while holding mdsc->mutex or
+ * in mds dispatch threads */
+ ceph_async_iput(inode);
return mds;
random:
@@ -1129,6 +1270,7 @@
{
struct ceph_mds_request *req;
struct rb_node *p;
+ struct ceph_inode_info *ci;
dout("cleanup_session_requests mds%d\n", session->s_mds);
mutex_lock(&mdsc->mutex);
@@ -1137,6 +1279,16 @@
struct ceph_mds_request, r_unsafe_item);
pr_warn_ratelimited(" dropping unsafe request %llu\n",
req->r_tid);
+ if (req->r_target_inode) {
+ /* dropping unsafe change of inode's attributes */
+ ci = ceph_inode(req->r_target_inode);
+ errseq_set(&ci->i_meta_err, -EIO);
+ }
+ if (req->r_unsafe_dir) {
+ /* dropping unsafe directory operation */
+ ci = ceph_inode(req->r_unsafe_dir);
+ errseq_set(&ci->i_meta_err, -EIO);
+ }
__unregister_request(mdsc, req);
}
/* zero r_attempts, so kick_requests() will re-send requests */
@@ -1157,9 +1309,9 @@
*
* Caller must hold session s_mutex.
*/
-static int iterate_session_caps(struct ceph_mds_session *session,
- int (*cb)(struct inode *, struct ceph_cap *,
- void *), void *arg)
+int ceph_iterate_session_caps(struct ceph_mds_session *session,
+ int (*cb)(struct inode *, struct ceph_cap *,
+ void *), void *arg)
{
struct list_head *p;
struct ceph_cap *cap;
@@ -1181,7 +1333,9 @@
spin_unlock(&session->s_cap_lock);
if (last_inode) {
- iput(last_inode);
+ /* avoid calling iput_final() while holding
+ * s_mutex or in mds dispatch threads */
+ ceph_async_iput(last_inode);
last_inode = NULL;
}
if (old_cap) {
@@ -1201,13 +1355,10 @@
cap->session = NULL;
list_del_init(&cap->session_caps);
session->s_nr_caps--;
- if (cap->queue_release) {
- list_add_tail(&cap->session_caps,
- &session->s_cap_releases);
- session->s_num_cap_releases++;
- } else {
+ if (cap->queue_release)
+ __ceph_queue_cap_release(session, cap);
+ else
old_cap = cap; /* put_cap it w/o locks held */
- }
}
if (ret < 0)
goto out;
@@ -1217,7 +1368,7 @@
session->s_cap_iterator = NULL;
spin_unlock(&session->s_cap_lock);
- iput(last_inode);
+ ceph_async_iput(last_inode);
if (old_cap)
ceph_put_cap(session->s_mdsc, old_cap);
@@ -1230,22 +1381,25 @@
struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
struct ceph_inode_info *ci = ceph_inode(inode);
LIST_HEAD(to_remove);
- bool drop = false;
+ bool dirty_dropped = false;
bool invalidate = false;
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
spin_lock(&ci->i_ceph_lock);
+ if (cap->mds_wanted | cap->issued)
+ ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
__ceph_remove_cap(cap, false);
if (!ci->i_auth_cap) {
struct ceph_cap_flush *cf;
struct ceph_mds_client *mdsc = fsc->mdsc;
- ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
-
- if (ci->i_wrbuffer_ref > 0 &&
- READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
- invalidate = true;
+ if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+ if (inode->i_data.nrpages > 0)
+ invalidate = true;
+ if (ci->i_wrbuffer_ref > 0)
+ mapping_set_error(&inode->i_data, -EIO);
+ }
while (!list_empty(&ci->i_cap_flush_list)) {
cf = list_first_entry(&ci->i_cap_flush_list,
@@ -1265,7 +1419,7 @@
inode, ceph_ino(inode));
ci->i_dirty_caps = 0;
list_del_init(&ci->i_dirty_item);
- drop = true;
+ dirty_dropped = true;
}
if (!list_empty(&ci->i_flushing_item)) {
pr_warn_ratelimited(
@@ -1275,10 +1429,22 @@
ci->i_flushing_caps = 0;
list_del_init(&ci->i_flushing_item);
mdsc->num_cap_flushing--;
- drop = true;
+ dirty_dropped = true;
}
spin_unlock(&mdsc->cap_dirty_lock);
+ if (dirty_dropped) {
+ errseq_set(&ci->i_meta_err, -EIO);
+
+ if (ci->i_wrbuffer_ref_head == 0 &&
+ ci->i_wr_ref == 0 &&
+ ci->i_dirty_caps == 0 &&
+ ci->i_flushing_caps == 0) {
+ ceph_put_snap_context(ci->i_head_snapc);
+ ci->i_head_snapc = NULL;
+ }
+ }
+
if (atomic_read(&ci->i_filelock_ref) > 0) {
/* make further file lock syscall return -EIO */
ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
@@ -1303,7 +1469,7 @@
wake_up_all(&ci->i_cap_wq);
if (invalidate)
ceph_queue_invalidate(inode);
- if (drop)
+ if (dirty_dropped)
iput(inode);
return 0;
}
@@ -1318,7 +1484,7 @@
LIST_HEAD(dispose);
dout("remove_session_caps on %p\n", session);
- iterate_session_caps(session, remove_session_caps_cb, fsc);
+ ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
wake_up_all(&fsc->mdsc->cap_flushing_wq);
@@ -1344,7 +1510,8 @@
spin_unlock(&session->s_cap_lock);
inode = ceph_find_inode(sb, vino);
- iput(inode);
+ /* avoid calling iput_final() while holding s_mutex */
+ ceph_async_iput(inode);
spin_lock(&session->s_cap_lock);
}
@@ -1359,6 +1526,12 @@
dispose_cap_releases(session->s_mdsc, &dispose);
}
+/*
+ * Event codes handed to wake_up_session_caps(), which forwards them
+ * (cast to a pointer) as the argument of its per-cap callback.
+ */
+enum {
+	RECONNECT = 0,	/* callback zeroes the inode's max_size requests */
+	RENEWCAPS = 1,	/* callback resets stale caps to CEPH_CAP_PIN */
+	FORCE_RO = 2,	/* no extra per-cap action; waiters are only woken */
+};
+
/*
* wake up any threads waiting on this session's caps. if the cap is
* old (didn't get renewed on the client reconnect), remove it now.
@@ -1369,23 +1542,34 @@
void *arg)
{
struct ceph_inode_info *ci = ceph_inode(inode);
+ unsigned long ev = (unsigned long)arg;
- if (arg) {
+ if (ev == RECONNECT) {
spin_lock(&ci->i_ceph_lock);
ci->i_wanted_max_size = 0;
ci->i_requested_max_size = 0;
spin_unlock(&ci->i_ceph_lock);
+ } else if (ev == RENEWCAPS) {
+ if (cap->cap_gen < cap->session->s_cap_gen) {
+ /* mds did not re-issue stale cap */
+ spin_lock(&ci->i_ceph_lock);
+ cap->issued = cap->implemented = CEPH_CAP_PIN;
+ /* make sure mds knows what we want */
+ if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
+ ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
+ spin_unlock(&ci->i_ceph_lock);
+ }
+ } else if (ev == FORCE_RO) {
}
wake_up_all(&ci->i_cap_wq);
return 0;
}
-static void wake_up_session_caps(struct ceph_mds_session *session,
- int reconnect)
+static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
- iterate_session_caps(session, wake_up_session_cb,
- (void *)(unsigned long)reconnect);
+ ceph_iterate_session_caps(session, wake_up_session_cb,
+ (void *)(unsigned long)ev);
}
/*
@@ -1470,7 +1654,7 @@
spin_unlock(&session->s_cap_lock);
if (wake)
- wake_up_session_caps(session, 0);
+ wake_up_session_caps(session, RENEWCAPS);
}
/*
@@ -1538,11 +1722,11 @@
*/
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
- struct ceph_mds_session *session = arg;
+ int *remaining = arg;
struct ceph_inode_info *ci = ceph_inode(inode);
int used, wanted, oissued, mine;
- if (session->s_trim_caps <= 0)
+ if (*remaining <= 0)
return -1;
spin_lock(&ci->i_ceph_lock);
@@ -1579,7 +1763,7 @@
if (oissued) {
/* we aren't the only cap.. just remove us */
__ceph_remove_cap(cap, true);
- session->s_trim_caps--;
+ (*remaining)--;
} else {
struct dentry *dentry;
/* try dropping referring dentries */
@@ -1591,7 +1775,7 @@
d_prune_aliases(inode);
count = atomic_read(&inode->i_count);
if (count == 1)
- session->s_trim_caps--;
+ (*remaining)--;
dout("trim_caps_cb %p cap %p pruned, count now %d\n",
inode, cap, count);
} else {
@@ -1617,15 +1801,15 @@
dout("trim_caps mds%d start: %d / %d, trim %d\n",
session->s_mds, session->s_nr_caps, max_caps, trim_caps);
if (trim_caps > 0) {
- session->s_trim_caps = trim_caps;
- iterate_session_caps(session, trim_caps_cb, session);
+ int remaining = trim_caps;
+
+ ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
session->s_mds, session->s_nr_caps, max_caps,
- trim_caps - session->s_trim_caps);
- session->s_trim_caps = 0;
+ trim_caps - remaining);
}
- ceph_send_cap_releases(mdsc, session);
+ ceph_flush_cap_releases(mdsc, session);
return 0;
}
@@ -1668,8 +1852,8 @@
/*
* called under s_mutex
*/
-void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
+static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
{
struct ceph_msg *msg = NULL;
struct ceph_mds_cap_release *head;
@@ -1711,7 +1895,8 @@
num_cap_releases--;
head = msg->front.iov_base;
- le32_add_cpu(&head->num, 1);
+ put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
+ &head->num);
item = msg->front.iov_base + msg->front.iov_len;
item->ino = cpu_to_le64(cap->cap_ino);
item->cap_id = cpu_to_le64(cap->cap_id);
@@ -1761,6 +1946,81 @@
spin_unlock(&session->s_cap_lock);
}
+/*
+ * Work item body for session->s_cap_release_work.
+ *
+ * Sends the session's queued cap releases, under s_mutex, but only while
+ * the session is OPEN or HUNG.  Always drops one session reference on the
+ * way out; ceph_flush_cap_releases() takes that reference before queueing
+ * the work.
+ */
+static void ceph_cap_release_work(struct work_struct *work)
+{
+	struct ceph_mds_session *s;
+
+	s = container_of(work, struct ceph_mds_session, s_cap_release_work);
+
+	mutex_lock(&s->s_mutex);
+	if (s->s_state == CEPH_MDS_SESSION_OPEN ||
+	    s->s_state == CEPH_MDS_SESSION_HUNG)
+		ceph_send_cap_releases(s->s_mdsc, s);
+	mutex_unlock(&s->s_mutex);
+
+	ceph_put_mds_session(s);
+}
+
+/*
+ * Schedule the session's cap release work on the client's cap workqueue.
+ *
+ * Does nothing once mdsc->stopping is set.  A session reference is taken
+ * for the queued work (ceph_cap_release_work() drops it); if queue_work()
+ * reports that nothing was queued, the reference is returned right away.
+ */
+void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+			     struct ceph_mds_session *session)
+{
+	if (mdsc->stopping)
+		return;
+
+	get_session(session);
+	if (!queue_work(mdsc->fsc->cap_wq, &session->s_cap_release_work)) {
+		/* nothing was queued, so nobody will drop our reference */
+		ceph_put_mds_session(session);
+		dout("failed to queue cap release work\n");
+		return;
+	}
+	dout("cap release work queued\n");
+}
+
+/*
+ * Append a cap to the session's pending-release list and bump the
+ * pending count.  Each time the count reaches a multiple of
+ * CEPH_CAPS_PER_RELEASE the release work is kicked.
+ *
+ * caller holds session->s_cap_lock
+ */
+void __ceph_queue_cap_release(struct ceph_mds_session *session,
+			      struct ceph_cap *cap)
+{
+	list_add_tail(&cap->session_caps, &session->s_cap_releases);
+	session->s_num_cap_releases++;
+
+	if (session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE == 0)
+		ceph_flush_cap_releases(session->s_mdsc, session);
+}
+
+/*
+ * Work item body for mdsc->cap_reclaim_work: trim dentries, and requeue
+ * itself when ceph_trim_dentries() returns -EAGAIN.
+ */
+static void ceph_cap_reclaim_work(struct work_struct *work)
+{
+	struct ceph_mds_client *mdsc;
+
+	mdsc = container_of(work, struct ceph_mds_client, cap_reclaim_work);
+	if (ceph_trim_dentries(mdsc) == -EAGAIN)
+		ceph_queue_cap_reclaim_work(mdsc);
+}
+
+/*
+ * Schedule mdsc->cap_reclaim_work on the client's cap workqueue.
+ * Does nothing once mdsc->stopping is set.  The outcome is only logged;
+ * no reference is taken for the queued work.
+ */
+void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
+{
+	if (mdsc->stopping)
+		return;
+
+	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
+		dout("caps reclaim work queued\n");
+	} else {
+		/* this is the reclaim work, not the per-session release work */
+		dout("failed to queue caps reclaim work\n");
+	}
+}
+
+/*
+ * Account for nr more reclaimable caps and kick the reclaim work each
+ * time the pending count reaches or passes a multiple of
+ * CEPH_CAPS_PER_RELEASE.  A zero nr is a no-op.
+ *
+ * NOTE(review): assumes nr is never negative -- confirm against callers.
+ */
+void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
+{
+	int val;
+	if (!nr)
+		return;
+	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
+	/*
+	 * The counter moves in steps of nr, so it can jump straight over
+	 * an exact multiple; testing for "== 0" would then never fire.
+	 * The remainder is smaller than nr exactly when this addition
+	 * reached or crossed a multiple.
+	 */
+	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
+		atomic_set(&mdsc->cap_reclaim_pending, 0);
+		ceph_queue_cap_reclaim_work(mdsc);
+	}
+}
+
/*
* requests
*/
@@ -1864,43 +2124,29 @@
* Encode hidden .snap dirs as a double /, i.e.
* foo/.snap/bar -> foo//bar
*/
-char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
+char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
int stop_on_nosnap)
{
struct dentry *temp;
char *path;
- int len, pos;
+ int pos;
unsigned seq;
+ u64 base;
if (!dentry)
return ERR_PTR(-EINVAL);
-retry:
- len = 0;
- seq = read_seqbegin(&rename_lock);
- rcu_read_lock();
- for (temp = dentry; !IS_ROOT(temp);) {
- struct inode *inode = d_inode(temp);
- if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
- len++; /* slash only */
- else if (stop_on_nosnap && inode &&
- ceph_snap(inode) == CEPH_NOSNAP)
- break;
- else
- len += 1 + temp->d_name.len;
- temp = temp->d_parent;
- }
- rcu_read_unlock();
- if (len)
- len--; /* no leading '/' */
-
- path = kmalloc(len+1, GFP_NOFS);
+ path = __getname();
if (!path)
return ERR_PTR(-ENOMEM);
- pos = len;
- path[pos] = 0; /* trailing null */
+retry:
+ pos = PATH_MAX - 1;
+ path[pos] = '\0';
+
+ seq = read_seqbegin(&rename_lock);
rcu_read_lock();
- for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
+ temp = dentry;
+ for (;;) {
struct inode *inode;
spin_lock(&temp->d_lock);
@@ -1908,9 +2154,10 @@
if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
dout("build_path path+%d: %p SNAPDIR\n",
pos, temp);
- } else if (stop_on_nosnap && inode &&
+ } else if (stop_on_nosnap && inode && dentry != temp &&
ceph_snap(inode) == CEPH_NOSNAP) {
spin_unlock(&temp->d_lock);
+ pos++; /* get rid of any prepended '/' */
break;
} else {
pos -= temp->d_name.len;
@@ -1918,43 +2165,50 @@
spin_unlock(&temp->d_lock);
break;
}
- strncpy(path + pos, temp->d_name.name,
- temp->d_name.len);
+ memcpy(path + pos, temp->d_name.name, temp->d_name.len);
}
spin_unlock(&temp->d_lock);
- if (pos)
- path[--pos] = '/';
- temp = temp->d_parent;
+ temp = READ_ONCE(temp->d_parent);
+
+ /* Are we at the root? */
+ if (IS_ROOT(temp))
+ break;
+
+ /* Are we out of buffer? */
+ if (--pos < 0)
+ break;
+
+ path[pos] = '/';
}
+ base = ceph_ino(d_inode(temp));
rcu_read_unlock();
- if (pos != 0 || read_seqretry(&rename_lock, seq)) {
+ if (pos < 0 || read_seqretry(&rename_lock, seq)) {
pr_err("build_path did not end path lookup where "
- "expected, namelen is %d, pos is %d\n", len, pos);
+ "expected, pos is %d\n", pos);
/* presumably this is only possible if racing with a
rename of one of the parent directories (we can not
lock the dentries above us to prevent this, but
retrying should be harmless) */
- kfree(path);
goto retry;
}
- *base = ceph_ino(d_inode(temp));
- *plen = len;
+ *pbase = base;
+ *plen = PATH_MAX - 1 - pos;
dout("build_path on %p %d built %llx '%.*s'\n",
- dentry, d_count(dentry), *base, len, path);
- return path;
+ dentry, d_count(dentry), base, *plen, path + pos);
+ return path + pos;
}
static int build_dentry_path(struct dentry *dentry, struct inode *dir,
const char **ppath, int *ppathlen, u64 *pino,
- int *pfreepath)
+ bool *pfreepath, bool parent_locked)
{
char *path;
rcu_read_lock();
if (!dir)
dir = d_inode_rcu(dentry->d_parent);
- if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
+ if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
*pino = ceph_ino(dir);
rcu_read_unlock();
*ppath = dentry->d_name.name;
@@ -1966,13 +2220,13 @@
if (IS_ERR(path))
return PTR_ERR(path);
*ppath = path;
- *pfreepath = 1;
+ *pfreepath = true;
return 0;
}
static int build_inode_path(struct inode *inode,
const char **ppath, int *ppathlen, u64 *pino,
- int *pfreepath)
+ bool *pfreepath)
{
struct dentry *dentry;
char *path;
@@ -1988,7 +2242,7 @@
if (IS_ERR(path))
return PTR_ERR(path);
*ppath = path;
- *pfreepath = 1;
+ *pfreepath = true;
return 0;
}
@@ -1999,7 +2253,7 @@
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
struct inode *rdiri, const char *rpath,
u64 rino, const char **ppath, int *pathlen,
- u64 *ino, int *freepath)
+ u64 *ino, bool *freepath, bool parent_locked)
{
int r = 0;
@@ -2009,7 +2263,7 @@
ceph_snap(rinode));
} else if (rdentry) {
r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
- freepath);
+ freepath, parent_locked);
dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
*ppath);
} else if (rpath || rino) {
@@ -2035,7 +2289,7 @@
const char *path2 = NULL;
u64 ino1 = 0, ino2 = 0;
int pathlen1 = 0, pathlen2 = 0;
- int freepath1 = 0, freepath2 = 0;
+ bool freepath1 = false, freepath2 = false;
int len;
u16 releases;
void *p, *end;
@@ -2043,16 +2297,19 @@
ret = set_request_path_attr(req->r_inode, req->r_dentry,
req->r_parent, req->r_path1, req->r_ino1.ino,
- &path1, &pathlen1, &ino1, &freepath1);
+ &path1, &pathlen1, &ino1, &freepath1,
+ test_bit(CEPH_MDS_R_PARENT_LOCKED,
+ &req->r_req_flags));
if (ret < 0) {
msg = ERR_PTR(ret);
goto out;
}
+ /* If r_old_dentry is set, then assume that its parent is locked */
ret = set_request_path_attr(NULL, req->r_old_dentry,
req->r_old_dentry_dir,
req->r_path2, req->r_ino2.ino,
- &path2, &pathlen2, &ino2, &freepath2);
+ &path2, &pathlen2, &ino2, &freepath2, true);
if (ret < 0) {
msg = ERR_PTR(ret);
goto out_free1;
@@ -2067,11 +2324,11 @@
(!!req->r_inode_drop + !!req->r_dentry_drop +
!!req->r_old_inode_drop + !!req->r_old_dentry_drop);
if (req->r_dentry_drop)
- len += req->r_dentry->d_name.len;
+ len += pathlen1;
if (req->r_old_dentry_drop)
- len += req->r_old_dentry->d_name.len;
+ len += pathlen2;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
+ msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
if (!msg) {
msg = ERR_PTR(-ENOMEM);
goto out_free2;
@@ -2136,7 +2393,6 @@
if (req->r_pagelist) {
struct ceph_pagelist *pagelist = req->r_pagelist;
- refcount_inc(&pagelist->refcnt);
ceph_msg_data_add_pagelist(msg, pagelist);
msg->hdr.data_len = cpu_to_le32(pagelist->length);
} else {
@@ -2147,10 +2403,10 @@
out_free2:
if (freepath2)
- kfree((char *)path2);
+ ceph_mdsc_free_path((char *)path2, pathlen2);
out_free1:
if (freepath1)
- kfree((char *)path1);
+ ceph_mdsc_free_path((char *)path1, pathlen1);
out:
return msg;
}
@@ -2164,8 +2420,7 @@
{
if (req->r_callback)
req->r_callback(mdsc, req);
- else
- complete_all(&req->r_completion);
+ complete_all(&req->r_completion);
}
/*
@@ -2407,28 +2662,11 @@
}
}
-void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
+int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
struct ceph_mds_request *req)
{
- dout("submit_request on %p\n", req);
- mutex_lock(&mdsc->mutex);
- __register_request(mdsc, req, NULL);
- __do_request(mdsc, req);
- mutex_unlock(&mdsc->mutex);
-}
-
-/*
- * Synchrously perform an mds request. Take care of all of the
- * session setup, forwarding, retry details.
- */
-int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
- struct inode *dir,
- struct ceph_mds_request *req)
-{
int err;
- dout("do_request on %p\n", req);
-
/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
if (req->r_inode)
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
@@ -2438,18 +2676,21 @@
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
- /* issue */
+ dout("submit_request on %p for inode %p\n", req, dir);
mutex_lock(&mdsc->mutex);
__register_request(mdsc, req, dir);
__do_request(mdsc, req);
+ err = req->r_err;
+ mutex_unlock(&mdsc->mutex);
+ return err;
+}
- if (req->r_err) {
- err = req->r_err;
- goto out;
- }
+static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ int err;
/* wait */
- mutex_unlock(&mdsc->mutex);
dout("do_request waiting\n");
if (!req->r_timeout && req->r_wait_for_completion) {
err = req->r_wait_for_completion(mdsc, req);
@@ -2490,8 +2731,26 @@
err = req->r_err;
}
-out:
mutex_unlock(&mdsc->mutex);
+ return err;
+}
+
+/*
+ * Synchronously perform an mds request. Take care of all of the
+ * session setup, forwarding, retry details.
+ */
+int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+ struct inode *dir,
+ struct ceph_mds_request *req)
+{
+ int err;
+
+ dout("do_request on %p\n", req);
+
+ /* issue */
+ err = ceph_mdsc_submit_request(mdsc, dir, req);
+ if (!err)
+ err = ceph_mdsc_wait_request(mdsc, req);
dout("do_request %p done, result %d\n", req, err);
return err;
}
@@ -2641,7 +2900,10 @@
dout("handle_reply tid %lld result %d\n", tid, result);
rinfo = &req->r_reply_info;
- err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
+ if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
+ err = parse_reply_info(msg, rinfo, (u64)-1);
+ else
+ err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex);
@@ -2672,7 +2934,6 @@
if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
req->r_op == CEPH_MDS_OP_LSSNAP))
ceph_readdir_prepopulate(req, req->r_session);
- ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
current->journal_info = NULL;
mutex_unlock(&req->r_fill_mutex);
@@ -2681,12 +2942,18 @@
if (realm)
ceph_put_snap_realm(mdsc, realm);
- if (err == 0 && req->r_target_inode &&
- test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
- struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
- spin_lock(&ci->i_unsafe_lock);
- list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
- spin_unlock(&ci->i_unsafe_lock);
+ if (err == 0) {
+ if (req->r_target_inode &&
+ test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
+ struct ceph_inode_info *ci =
+ ceph_inode(req->r_target_inode);
+ spin_lock(&ci->i_unsafe_lock);
+ list_add_tail(&req->r_unsafe_target_item,
+ &ci->i_unsafe_iops);
+ spin_unlock(&ci->i_unsafe_lock);
+ }
+
+ ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
out_err:
mutex_lock(&mdsc->mutex);
@@ -2765,6 +3032,30 @@
pr_err("mdsc_handle_forward decode error err=%d\n", err);
}
+static int __decode_session_metadata(void **p, void *end,
+ bool *blacklisted)
+{
+ /* map<string,string> */
+ u32 n;
+ bool err_str;
+ ceph_decode_32_safe(p, end, n, bad);
+ while (n-- > 0) {
+ u32 len;
+ ceph_decode_32_safe(p, end, len, bad);
+ ceph_decode_need(p, end, len, bad);
+ err_str = !strncmp(*p, "error_string", len);
+ *p += len;
+ ceph_decode_32_safe(p, end, len, bad);
+ ceph_decode_need(p, end, len, bad);
+ if (err_str && strnstr(*p, "blacklisted", len))
+ *blacklisted = true;
+ *p += len;
+ }
+ return 0;
+bad:
+ return -1;
+}
+
/*
* handle a mds session control message
*/
@@ -2772,18 +3063,37 @@
struct ceph_msg *msg)
{
struct ceph_mds_client *mdsc = session->s_mdsc;
+ int mds = session->s_mds;
+ int msg_version = le16_to_cpu(msg->hdr.version);
+ void *p = msg->front.iov_base;
+ void *end = p + msg->front.iov_len;
+ struct ceph_mds_session_head *h;
u32 op;
u64 seq;
- int mds = session->s_mds;
- struct ceph_mds_session_head *h = msg->front.iov_base;
+ unsigned long features = 0;
int wake = 0;
+ bool blacklisted = false;
/* decode */
- if (msg->front.iov_len < sizeof(*h))
- goto bad;
+ ceph_decode_need(&p, end, sizeof(*h), bad);
+ h = p;
+ p += sizeof(*h);
+
op = le32_to_cpu(h->op);
seq = le64_to_cpu(h->seq);
+ if (msg_version >= 3) {
+ u32 len;
+ /* version >= 2, metadata */
+ if (__decode_session_metadata(&p, end, &blacklisted) < 0)
+ goto bad;
+ /* version >= 3, feature bits */
+ ceph_decode_32_safe(&p, end, len, bad);
+ ceph_decode_need(&p, end, len, bad);
+ memcpy(&features, p, min_t(size_t, len, sizeof(features)));
+ p += len;
+ }
+
mutex_lock(&mdsc->mutex);
if (op == CEPH_SESSION_CLOSE) {
get_session(session);
@@ -2809,6 +3119,7 @@
if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
pr_info("mds%d reconnect success\n", session->s_mds);
session->s_state = CEPH_MDS_SESSION_OPEN;
+ session->s_features = features;
renewed_caps(mdsc, session, 0);
wake = 1;
if (mdsc->stopping)
@@ -2852,7 +3163,7 @@
spin_lock(&session->s_cap_lock);
session->s_readonly = true;
spin_unlock(&session->s_cap_lock);
- wake_up_session_caps(session, 0);
+ wake_up_session_caps(session, FORCE_RO);
break;
case CEPH_SESSION_REJECT:
@@ -2861,6 +3172,8 @@
session->s_state = CEPH_MDS_SESSION_REJECTED;
cleanup_session_requests(mdsc, session);
remove_session_caps(session);
+ if (blacklisted)
+ mdsc->fsc->blacklisted = true;
wake = 2; /* for good measure */
break;
@@ -2935,6 +3248,82 @@
mutex_unlock(&mdsc->mutex);
}
+static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
+{
+ struct ceph_msg *reply;
+ struct ceph_pagelist *_pagelist;
+ struct page *page;
+ __le32 *addr;
+ int err = -ENOMEM;
+
+ if (!recon_state->allow_multi)
+ return -ENOSPC;
+
+ /* can't handle message that contains both caps and realm */
+ BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
+
+ /* pre-allocate new pagelist */
+ _pagelist = ceph_pagelist_alloc(GFP_NOFS);
+ if (!_pagelist)
+ return -ENOMEM;
+
+ reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
+ if (!reply)
+ goto fail_msg;
+
+ /* placeholder for nr_caps */
+ err = ceph_pagelist_encode_32(_pagelist, 0);
+ if (err < 0)
+ goto fail;
+
+ if (recon_state->nr_caps) {
+ /* currently encoding caps */
+ err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
+ if (err)
+ goto fail;
+ } else {
+ /* placeholder for nr_realms (currently encoding realms) */
+ err = ceph_pagelist_encode_32(_pagelist, 0);
+ if (err < 0)
+ goto fail;
+ }
+
+ err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
+ if (err)
+ goto fail;
+
+ page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
+ addr = kmap_atomic(page);
+ if (recon_state->nr_caps) {
+ /* currently encoding caps */
+ *addr = cpu_to_le32(recon_state->nr_caps);
+ } else {
+ /* currently encoding realms */
+ *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
+ }
+ kunmap_atomic(addr);
+
+ reply->hdr.version = cpu_to_le16(5);
+ reply->hdr.compat_version = cpu_to_le16(4);
+
+ reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
+ ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
+
+ ceph_con_send(&recon_state->session->s_con, reply);
+ ceph_pagelist_release(recon_state->pagelist);
+
+ recon_state->pagelist = _pagelist;
+ recon_state->nr_caps = 0;
+ recon_state->nr_realms = 0;
+ recon_state->msg_version = 5;
+ return 0;
+fail:
+ ceph_msg_put(reply);
+fail_msg:
+ ceph_pagelist_release(_pagelist);
+ return err;
+}
+
/*
* Encode information about a cap for a reconnect with the MDS.
*/
@@ -2948,31 +3337,12 @@
struct ceph_inode_info *ci = cap->ci;
struct ceph_reconnect_state *recon_state = arg;
struct ceph_pagelist *pagelist = recon_state->pagelist;
- char *path;
- int pathlen, err;
- u64 pathbase;
+ int err;
u64 snap_follows;
- struct dentry *dentry;
dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
inode, ceph_vinop(inode), cap, cap->cap_id,
ceph_cap_string(cap->issued));
- err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
- if (err)
- return err;
-
- dentry = d_find_alias(inode);
- if (dentry) {
- path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
- if (IS_ERR(path)) {
- err = PTR_ERR(path);
- goto out_dput;
- }
- } else {
- path = NULL;
- pathlen = 0;
- pathbase = 0;
- }
spin_lock(&ci->i_ceph_lock);
cap->seq = 0; /* reset cap seq */
@@ -2985,7 +3355,7 @@
rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
rec.v2.issued = cpu_to_le32(cap->issued);
rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
- rec.v2.pathbase = cpu_to_le64(pathbase);
+ rec.v2.pathbase = 0;
rec.v2.flock_len = (__force __le32)
((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
} else {
@@ -2996,7 +3366,7 @@
ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
- rec.v1.pathbase = cpu_to_le64(pathbase);
+ rec.v1.pathbase = 0;
}
if (list_empty(&ci->i_cap_snaps)) {
@@ -3012,7 +3382,7 @@
if (recon_state->msg_version >= 2) {
int num_fcntl_locks, num_flock_locks;
struct ceph_filelock *flocks = NULL;
- size_t struct_len, total_len = 0;
+ size_t struct_len, total_len = sizeof(u64);
u8 struct_v = 0;
encode_again:
@@ -3028,7 +3398,7 @@
GFP_NOFS);
if (!flocks) {
err = -ENOMEM;
- goto out_free;
+ goto out_err;
}
err = ceph_encode_locks_to_buffer(inode, flocks,
num_fcntl_locks,
@@ -3038,7 +3408,7 @@
flocks = NULL;
if (err == -ENOSPC)
goto encode_again;
- goto out_free;
+ goto out_err;
}
} else {
kfree(flocks);
@@ -3047,7 +3417,7 @@
if (recon_state->msg_version >= 3) {
/* version, compat_version and struct_len */
- total_len = 2 * sizeof(u8) + sizeof(u32);
+ total_len += 2 * sizeof(u8) + sizeof(u32);
struct_v = 2;
}
/*
@@ -3058,44 +3428,132 @@
sizeof(struct ceph_filelock);
rec.v2.flock_len = cpu_to_le32(struct_len);
- struct_len += sizeof(rec.v2);
- struct_len += sizeof(u32) + pathlen;
+ struct_len += sizeof(u32) + sizeof(rec.v2);
if (struct_v >= 2)
struct_len += sizeof(u64); /* snap_follows */
total_len += struct_len;
- err = ceph_pagelist_reserve(pagelist, total_len);
- if (!err) {
- if (recon_state->msg_version >= 3) {
- ceph_pagelist_encode_8(pagelist, struct_v);
- ceph_pagelist_encode_8(pagelist, 1);
- ceph_pagelist_encode_32(pagelist, struct_len);
- }
- ceph_pagelist_encode_string(pagelist, path, pathlen);
- ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
- ceph_locks_to_pagelist(flocks, pagelist,
- num_fcntl_locks,
- num_flock_locks);
- if (struct_v >= 2)
- ceph_pagelist_encode_64(pagelist, snap_follows);
+ if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
+ err = send_reconnect_partial(recon_state);
+ if (err)
+ goto out_freeflocks;
+ pagelist = recon_state->pagelist;
}
+
+ err = ceph_pagelist_reserve(pagelist, total_len);
+ if (err)
+ goto out_freeflocks;
+
+ ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
+ if (recon_state->msg_version >= 3) {
+ ceph_pagelist_encode_8(pagelist, struct_v);
+ ceph_pagelist_encode_8(pagelist, 1);
+ ceph_pagelist_encode_32(pagelist, struct_len);
+ }
+ ceph_pagelist_encode_string(pagelist, NULL, 0);
+ ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
+ ceph_locks_to_pagelist(flocks, pagelist,
+ num_fcntl_locks, num_flock_locks);
+ if (struct_v >= 2)
+ ceph_pagelist_encode_64(pagelist, snap_follows);
+out_freeflocks:
kfree(flocks);
} else {
- size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
- err = ceph_pagelist_reserve(pagelist, size);
- if (!err) {
- ceph_pagelist_encode_string(pagelist, path, pathlen);
- ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
+ u64 pathbase = 0;
+ int pathlen = 0;
+ char *path = NULL;
+ struct dentry *dentry;
+
+ dentry = d_find_alias(inode);
+ if (dentry) {
+ path = ceph_mdsc_build_path(dentry,
+ &pathlen, &pathbase, 0);
+ dput(dentry);
+ if (IS_ERR(path)) {
+ err = PTR_ERR(path);
+ goto out_err;
+ }
+ rec.v1.pathbase = cpu_to_le64(pathbase);
}
+
+ err = ceph_pagelist_reserve(pagelist,
+ sizeof(u64) + sizeof(u32) +
+ pathlen + sizeof(rec.v1));
+ if (err) {
+ goto out_freepath;
+ }
+
+ ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
+ ceph_pagelist_encode_string(pagelist, path, pathlen);
+ ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
+out_freepath:
+ ceph_mdsc_free_path(path, pathlen);
}
- recon_state->nr_caps++;
-out_free:
- kfree(path);
-out_dput:
- dput(dentry);
+out_err:
+ if (err >= 0)
+ recon_state->nr_caps++;
+ return err;
+}
+
+static int encode_snap_realms(struct ceph_mds_client *mdsc,
+ struct ceph_reconnect_state *recon_state)
+{
+ struct rb_node *p;
+ struct ceph_pagelist *pagelist = recon_state->pagelist;
+ int err = 0;
+
+ if (recon_state->msg_version >= 4) {
+ err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
+ if (err < 0)
+ goto fail;
+ }
+
+ /*
+ * snaprealms. we provide mds with the ino, seq (version), and
+ * parent for all of our realms. If the mds has any newer info,
+ * it will tell us.
+ */
+ for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
+ struct ceph_snap_realm *realm =
+ rb_entry(p, struct ceph_snap_realm, node);
+ struct ceph_mds_snaprealm_reconnect sr_rec;
+
+ if (recon_state->msg_version >= 4) {
+ size_t need = sizeof(u8) * 2 + sizeof(u32) +
+ sizeof(sr_rec);
+
+ if (pagelist->length + need > RECONNECT_MAX_SIZE) {
+ err = send_reconnect_partial(recon_state);
+ if (err)
+ goto fail;
+ pagelist = recon_state->pagelist;
+ }
+
+ err = ceph_pagelist_reserve(pagelist, need);
+ if (err)
+ goto fail;
+
+ ceph_pagelist_encode_8(pagelist, 1);
+ ceph_pagelist_encode_8(pagelist, 1);
+ ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
+ }
+
+ dout(" adding snap realm %llx seq %lld parent %llx\n",
+ realm->ino, realm->seq, realm->parent_ino);
+ sr_rec.ino = cpu_to_le64(realm->ino);
+ sr_rec.seq = cpu_to_le64(realm->seq);
+ sr_rec.parent = cpu_to_le64(realm->parent_ino);
+
+ err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
+ if (err)
+ goto fail;
+
+ recon_state->nr_realms++;
+ }
+fail:
return err;
}
@@ -3116,22 +3574,20 @@
struct ceph_mds_session *session)
{
struct ceph_msg *reply;
- struct rb_node *p;
int mds = session->s_mds;
int err = -ENOMEM;
- int s_nr_caps;
- struct ceph_pagelist *pagelist;
- struct ceph_reconnect_state recon_state;
+ struct ceph_reconnect_state recon_state = {
+ .session = session,
+ };
LIST_HEAD(dispose);
pr_info("mds%d reconnect start\n", mds);
- pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
- if (!pagelist)
+ recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
+ if (!recon_state.pagelist)
goto fail_nopagelist;
- ceph_pagelist_init(pagelist);
- reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
+ reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
if (!reply)
goto fail_nomsg;
@@ -3172,65 +3628,90 @@
/* replay unsafe requests */
replay_unsafe_requests(mdsc, session);
+ ceph_early_kick_flushing_caps(mdsc, session);
+
down_read(&mdsc->snap_rwsem);
- /* traverse this session's caps */
- s_nr_caps = session->s_nr_caps;
- err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
+ /* placeholder for nr_caps */
+ err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
if (err)
goto fail;
- recon_state.nr_caps = 0;
- recon_state.pagelist = pagelist;
- if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
+ if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
recon_state.msg_version = 3;
- else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
+ recon_state.allow_multi = true;
+ } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
+ recon_state.msg_version = 3;
+ } else {
recon_state.msg_version = 2;
- else
- recon_state.msg_version = 1;
- err = iterate_session_caps(session, encode_caps_cb, &recon_state);
- if (err < 0)
- goto fail;
+ }
+ /* traverse this session's caps */
+ err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
spin_lock(&session->s_cap_lock);
session->s_cap_reconnect = 0;
spin_unlock(&session->s_cap_lock);
- /*
- * snaprealms. we provide mds with the ino, seq (version), and
- * parent for all of our realms. If the mds has any newer info,
- * it will tell us.
- */
- for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
- struct ceph_snap_realm *realm =
- rb_entry(p, struct ceph_snap_realm, node);
- struct ceph_mds_snaprealm_reconnect sr_rec;
+ if (err < 0)
+ goto fail;
- dout(" adding snap realm %llx seq %lld parent %llx\n",
- realm->ino, realm->seq, realm->parent_ino);
- sr_rec.ino = cpu_to_le64(realm->ino);
- sr_rec.seq = cpu_to_le64(realm->seq);
- sr_rec.parent = cpu_to_le64(realm->parent_ino);
- err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
- if (err)
+ /* check if all realms can be encoded into current message */
+ if (mdsc->num_snap_realms) {
+ size_t total_len =
+ recon_state.pagelist->length +
+ mdsc->num_snap_realms *
+ sizeof(struct ceph_mds_snaprealm_reconnect);
+ if (recon_state.msg_version >= 4) {
+ /* number of realms */
+ total_len += sizeof(u32);
+ /* version, compat_version and struct_len */
+ total_len += mdsc->num_snap_realms *
+ (2 * sizeof(u8) + sizeof(u32));
+ }
+ if (total_len > RECONNECT_MAX_SIZE) {
+ if (!recon_state.allow_multi) {
+ err = -ENOSPC;
+ goto fail;
+ }
+ if (recon_state.nr_caps) {
+ err = send_reconnect_partial(&recon_state);
+ if (err)
+ goto fail;
+ }
+ recon_state.msg_version = 5;
+ }
+ }
+
+ err = encode_snap_realms(mdsc, &recon_state);
+ if (err < 0)
+ goto fail;
+
+ if (recon_state.msg_version >= 5) {
+ err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
+ if (err < 0)
goto fail;
}
- reply->hdr.version = cpu_to_le16(recon_state.msg_version);
-
- /* raced with cap release? */
- if (s_nr_caps != recon_state.nr_caps) {
- struct page *page = list_first_entry(&pagelist->head,
- struct page, lru);
+ if (recon_state.nr_caps || recon_state.nr_realms) {
+ struct page *page =
+ list_first_entry(&recon_state.pagelist->head,
+ struct page, lru);
__le32 *addr = kmap_atomic(page);
- *addr = cpu_to_le32(recon_state.nr_caps);
+ if (recon_state.nr_caps) {
+ WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
+ *addr = cpu_to_le32(recon_state.nr_caps);
+ } else if (recon_state.msg_version >= 4) {
+ *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
+ }
kunmap_atomic(addr);
}
- reply->hdr.data_len = cpu_to_le32(pagelist->length);
- ceph_msg_data_add_pagelist(reply, pagelist);
+ reply->hdr.version = cpu_to_le16(recon_state.msg_version);
+ if (recon_state.msg_version >= 4)
+ reply->hdr.compat_version = cpu_to_le16(4);
- ceph_early_kick_flushing_caps(mdsc, session);
+ reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
+ ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
ceph_con_send(&session->s_con, reply);
@@ -3241,6 +3722,7 @@
mutex_unlock(&mdsc->mutex);
up_read(&mdsc->snap_rwsem);
+ ceph_pagelist_release(recon_state.pagelist);
return;
fail:
@@ -3248,7 +3730,7 @@
up_read(&mdsc->snap_rwsem);
mutex_unlock(&session->s_mutex);
fail_nomsg:
- ceph_pagelist_release(pagelist);
+ ceph_pagelist_release(recon_state.pagelist);
fail_nopagelist:
pr_err("error %d preparing reconnect for mds%d\n", err, mds);
return;
@@ -3286,42 +3768,35 @@
ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
ceph_session_state_name(s->s_state));
- if (i >= newmap->m_num_mds ||
- memcmp(ceph_mdsmap_get_addr(oldmap, i),
+ if (i >= newmap->m_num_mds) {
+ /* force close session for stopped mds */
+ get_session(s);
+ __unregister_session(mdsc, s);
+ __wake_requests(mdsc, &s->s_waiting);
+ mutex_unlock(&mdsc->mutex);
+
+ mutex_lock(&s->s_mutex);
+ cleanup_session_requests(mdsc, s);
+ remove_session_caps(s);
+ mutex_unlock(&s->s_mutex);
+
+ ceph_put_mds_session(s);
+
+ mutex_lock(&mdsc->mutex);
+ kick_requests(mdsc, i);
+ continue;
+ }
+
+ if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
ceph_mdsmap_get_addr(newmap, i),
sizeof(struct ceph_entity_addr))) {
- if (s->s_state == CEPH_MDS_SESSION_OPENING) {
- /* the session never opened, just close it
- * out now */
- get_session(s);
- __unregister_session(mdsc, s);
- __wake_requests(mdsc, &s->s_waiting);
- ceph_put_mds_session(s);
- } else if (i >= newmap->m_num_mds) {
- /* force close session for stopped mds */
- get_session(s);
- __unregister_session(mdsc, s);
- __wake_requests(mdsc, &s->s_waiting);
- kick_requests(mdsc, i);
- mutex_unlock(&mdsc->mutex);
-
- mutex_lock(&s->s_mutex);
- cleanup_session_requests(mdsc, s);
- remove_session_caps(s);
- mutex_unlock(&s->s_mutex);
-
- ceph_put_mds_session(s);
-
- mutex_lock(&mdsc->mutex);
- } else {
- /* just close it */
- mutex_unlock(&mdsc->mutex);
- mutex_lock(&s->s_mutex);
- mutex_lock(&mdsc->mutex);
- ceph_con_close(&s->s_con);
- mutex_unlock(&s->s_mutex);
- s->s_state = CEPH_MDS_SESSION_RESTARTING;
- }
+ /* just close it */
+ mutex_unlock(&mdsc->mutex);
+ mutex_lock(&s->s_mutex);
+ mutex_lock(&mdsc->mutex);
+ ceph_con_close(&s->s_con);
+ mutex_unlock(&s->s_mutex);
+ s->s_state = CEPH_MDS_SESSION_RESTARTING;
} else if (oldstate == newstate) {
continue; /* nothing new with this mds */
}
@@ -3346,7 +3821,7 @@
pr_info("mds%d recovery completed\n", s->s_mds);
kick_requests(mdsc, i);
ceph_kick_flushing_caps(mdsc, s);
- wake_up_session_caps(s, 1);
+ wake_up_session_caps(s, RECONNECT);
}
}
@@ -3479,8 +3954,9 @@
ceph_con_send(&session->s_con, msg);
out:
- iput(inode);
mutex_unlock(&session->s_mutex);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(inode);
return;
bad:
@@ -3489,31 +3965,33 @@
}
void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
- struct inode *inode,
struct dentry *dentry, char action,
u32 seq)
{
struct ceph_msg *msg;
struct ceph_mds_lease *lease;
- int len = sizeof(*lease) + sizeof(u32);
- int dnamelen = 0;
+ struct inode *dir;
+ int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
- dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
- inode, dentry, ceph_lease_op_name(action), session->s_mds);
- dnamelen = dentry->d_name.len;
- len += dnamelen;
+ dout("lease_send_msg identry %p %s to mds%d\n",
+ dentry, ceph_lease_op_name(action), session->s_mds);
msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
if (!msg)
return;
lease = msg->front.iov_base;
lease->action = action;
- lease->ino = cpu_to_le64(ceph_vino(inode).ino);
- lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
lease->seq = cpu_to_le32(seq);
- put_unaligned_le32(dnamelen, lease + 1);
- memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
+ spin_lock(&dentry->d_lock);
+ dir = d_inode(dentry->d_parent);
+ lease->ino = cpu_to_le64(ceph_ino(dir));
+ lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
+
+ put_unaligned_le32(dentry->d_name.len, lease + 1);
+ memcpy((void *)(lease + 1) + 4,
+ dentry->d_name.name, dentry->d_name.len);
+ spin_unlock(&dentry->d_lock);
/*
* if this is a preemptive lease RELEASE, no need to
* flush request stream, since the actual request will
@@ -3545,7 +4023,27 @@
mutex_unlock(&mdsc->mutex);
}
+static void maybe_recover_session(struct ceph_mds_client *mdsc)
+{
+ struct ceph_fs_client *fsc = mdsc->fsc;
+ if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
+ return;
+
+ if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
+ return;
+
+ if (!READ_ONCE(fsc->blacklisted))
+ return;
+
+ if (fsc->last_auto_reconnect &&
+ time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
+ return;
+
+ pr_info("auto reconnect after blacklisted\n");
+ fsc->last_auto_reconnect = jiffies;
+ ceph_force_reconnect(fsc->sb);
+}
/*
* delayed work -- periodically trim expired leases, renew caps with mds
@@ -3566,7 +4064,6 @@
int renew_caps;
dout("mdsc delayed_work\n");
- ceph_check_delayed_caps(mdsc);
mutex_lock(&mdsc->mutex);
renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
@@ -3592,7 +4089,9 @@
pr_info("mds%d hung\n", s->s_mds);
}
}
- if (s->s_state < CEPH_MDS_SESSION_OPEN) {
+ if (s->s_state == CEPH_MDS_SESSION_NEW ||
+ s->s_state == CEPH_MDS_SESSION_RESTARTING ||
+ s->s_state == CEPH_MDS_SESSION_REJECTED) {
/* this mds is failed or recovering, just wait */
ceph_put_mds_session(s);
continue;
@@ -3614,6 +4113,14 @@
}
mutex_unlock(&mdsc->mutex);
+ ceph_check_delayed_caps(mdsc);
+
+ ceph_queue_cap_reclaim_work(mdsc);
+
+ ceph_trim_snapid_map(mdsc);
+
+ maybe_recover_session(mdsc);
+
schedule_delayed(mdsc);
}
@@ -3642,10 +4149,13 @@
mdsc->max_sessions = 0;
mdsc->stopping = 0;
atomic64_set(&mdsc->quotarealms_count, 0);
+ mdsc->quotarealms_inodes = RB_ROOT;
+ mutex_init(&mdsc->quotarealms_inodes_mutex);
mdsc->last_snap_seq = 0;
init_rwsem(&mdsc->snap_rwsem);
mdsc->snap_realms = RB_ROOT;
INIT_LIST_HEAD(&mdsc->snap_empty);
+ mdsc->num_snap_realms = 0;
spin_lock_init(&mdsc->snap_empty_lock);
mdsc->last_tid = 0;
mdsc->oldest_tid = 0;
@@ -3663,11 +4173,19 @@
mdsc->num_cap_flushing = 0;
spin_lock_init(&mdsc->cap_dirty_lock);
init_waitqueue_head(&mdsc->cap_flushing_wq);
- spin_lock_init(&mdsc->dentry_lru_lock);
- INIT_LIST_HEAD(&mdsc->dentry_lru);
+ INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
+ atomic_set(&mdsc->cap_reclaim_pending, 0);
+
+ spin_lock_init(&mdsc->dentry_list_lock);
+ INIT_LIST_HEAD(&mdsc->dentry_leases);
+ INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
ceph_caps_init(mdsc);
- ceph_adjust_min_caps(mdsc, fsc->min_caps);
+ ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
+
+ spin_lock_init(&mdsc->snapid_map_lock);
+ mdsc->snapid_map_tree = RB_ROOT;
+ INIT_LIST_HEAD(&mdsc->snapid_map_lru);
init_rwsem(&mdsc->pool_perm_rwsem);
mdsc->pool_perm_tree = RB_ROOT;
@@ -3699,6 +4217,7 @@
while ((req = __get_oldest_req(mdsc))) {
dout("wait_requests timed out on tid %llu\n",
req->r_tid);
+ list_del_init(&req->r_wait);
__unregister_request(mdsc, req);
}
}
@@ -3724,6 +4243,8 @@
* their inode/dcache refs
*/
ceph_msgr_flush();
+
+ ceph_cleanup_quotarealms_inodes(mdsc);
}
/*
@@ -3862,8 +4383,10 @@
WARN_ON(!list_empty(&mdsc->cap_delay_list));
mutex_unlock(&mdsc->mutex);
+ ceph_cleanup_snapid_map(mdsc);
ceph_cleanup_empty_realms(mdsc);
+ cancel_work_sync(&mdsc->cap_reclaim_work);
cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
dout("stopped\n");
@@ -3881,7 +4404,12 @@
session = __ceph_lookup_mds_session(mdsc, mds);
if (!session)
continue;
+
+ if (session->s_state == CEPH_MDS_SESSION_REJECTED)
+ __unregister_session(mdsc, session);
+ __wake_requests(mdsc, &session->s_waiting);
mutex_unlock(&mdsc->mutex);
+
mutex_lock(&session->s_mutex);
__close_session(mdsc, session);
if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
@@ -3890,6 +4418,7 @@
}
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
+
mutex_lock(&mdsc->mutex);
kick_requests(mdsc, mds);
}