Update Linux to v5.10.109
Sourced from [1]
[1] https://cdn.kernel.org/pub/linux/kernel/v5.x/linux-5.10.109.tar.xz
Change-Id: I19bca9fc6762d4e63bcf3e4cba88bbe560d9c76c
Signed-off-by: Olivier Deprez <olivier.deprez@arm.com>
diff --git a/net/ceph/Kconfig b/net/ceph/Kconfig
index 2e8e6f9..f36f9a3 100644
--- a/net/ceph/Kconfig
+++ b/net/ceph/Kconfig
@@ -13,7 +13,7 @@
common functionality to both the Ceph filesystem and
to the rados block device (rbd).
- More information at http://ceph.newdream.net/.
+ More information at https://ceph.io/.
If unsure, say N.
@@ -39,6 +39,6 @@
be resolved using the CONFIG_DNS_RESOLVER facility.
For information on how to use CONFIG_DNS_RESOLVER consult
- Documentation/networking/dns_resolver.txt
+ Documentation/networking/dns_resolver.rst
If unsure, say N.
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 59d0ba2..ce09bb4 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -13,5 +13,5 @@
auth.o auth_none.o \
crypto.o armor.o \
auth_x.o \
- ceph_fs.o ceph_strings.o ceph_hash.o \
+ ceph_strings.o ceph_hash.o \
pagevec.o snapshot.o string_table.o
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 2d56824..4e7edd7 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -11,7 +11,7 @@
#include <linux/module.h>
#include <linux/mount.h>
#include <linux/nsproxy.h>
-#include <linux/parser.h>
+#include <linux/fs_parser.h>
#include <linux/sched.h>
#include <linux/sched/mm.h>
#include <linux/seq_file.h>
@@ -176,6 +176,10 @@ int ceph_compare_options(struct ceph_options *new_opt,
}
}
+ ret = ceph_compare_crush_locs(&opt1->crush_locs, &opt2->crush_locs);
+ if (ret)
+ return ret;
+
/* any matching mon ip implies a match */
for (i = 0; i < opt1->num_mon; i++) {
if (ceph_monmap_contains(client->monc.monmap,
@@ -190,8 +194,7 @@ EXPORT_SYMBOL(ceph_compare_options);
* kvmalloc() doesn't fall back to the vmalloc allocator unless flags are
* compatible with (a superset of) GFP_KERNEL. This is because while the
* actual pages are allocated with the specified flags, the page table pages
- * are always allocated with GFP_KERNEL. map_vm_area() doesn't even take
- * flags because GFP_KERNEL is hard-coded in {p4d,pud,pmd,pte}_alloc().
+ * are always allocated with GFP_KERNEL.
*
* ceph_kvmalloc() may be called with GFP_KERNEL, GFP_NOFS or GFP_NOIO.
*/
@@ -254,58 +257,93 @@ enum {
Opt_mount_timeout,
Opt_osd_idle_ttl,
Opt_osd_request_timeout,
- Opt_last_int,
/* int args above */
Opt_fsid,
Opt_name,
Opt_secret,
Opt_key,
Opt_ip,
- Opt_last_string,
+ Opt_crush_location,
+ Opt_read_from_replica,
/* string args above */
Opt_share,
- Opt_noshare,
Opt_crc,
- Opt_nocrc,
Opt_cephx_require_signatures,
- Opt_nocephx_require_signatures,
Opt_cephx_sign_messages,
- Opt_nocephx_sign_messages,
Opt_tcp_nodelay,
- Opt_notcp_nodelay,
Opt_abort_on_full,
};
-static match_table_t opt_tokens = {
- {Opt_osdtimeout, "osdtimeout=%d"},
- {Opt_osdkeepalivetimeout, "osdkeepalive=%d"},
- {Opt_mount_timeout, "mount_timeout=%d"},
- {Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
- {Opt_osd_request_timeout, "osd_request_timeout=%d"},
- /* int args above */
- {Opt_fsid, "fsid=%s"},
- {Opt_name, "name=%s"},
- {Opt_secret, "secret=%s"},
- {Opt_key, "key=%s"},
- {Opt_ip, "ip=%s"},
- /* string args above */
- {Opt_share, "share"},
- {Opt_noshare, "noshare"},
- {Opt_crc, "crc"},
- {Opt_nocrc, "nocrc"},
- {Opt_cephx_require_signatures, "cephx_require_signatures"},
- {Opt_nocephx_require_signatures, "nocephx_require_signatures"},
- {Opt_cephx_sign_messages, "cephx_sign_messages"},
- {Opt_nocephx_sign_messages, "nocephx_sign_messages"},
- {Opt_tcp_nodelay, "tcp_nodelay"},
- {Opt_notcp_nodelay, "notcp_nodelay"},
- {Opt_abort_on_full, "abort_on_full"},
- {-1, NULL}
+enum {
+ Opt_read_from_replica_no,
+ Opt_read_from_replica_balance,
+ Opt_read_from_replica_localize,
};
+static const struct constant_table ceph_param_read_from_replica[] = {
+ {"no", Opt_read_from_replica_no},
+ {"balance", Opt_read_from_replica_balance},
+ {"localize", Opt_read_from_replica_localize},
+ {}
+};
+
+static const struct fs_parameter_spec ceph_parameters[] = {
+ fsparam_flag ("abort_on_full", Opt_abort_on_full),
+ fsparam_flag_no ("cephx_require_signatures", Opt_cephx_require_signatures),
+ fsparam_flag_no ("cephx_sign_messages", Opt_cephx_sign_messages),
+ fsparam_flag_no ("crc", Opt_crc),
+ fsparam_string ("crush_location", Opt_crush_location),
+ fsparam_string ("fsid", Opt_fsid),
+ fsparam_string ("ip", Opt_ip),
+ fsparam_string ("key", Opt_key),
+ fsparam_u32 ("mount_timeout", Opt_mount_timeout),
+ fsparam_string ("name", Opt_name),
+ fsparam_u32 ("osd_idle_ttl", Opt_osd_idle_ttl),
+ fsparam_u32 ("osd_request_timeout", Opt_osd_request_timeout),
+ fsparam_u32 ("osdkeepalive", Opt_osdkeepalivetimeout),
+ __fsparam (fs_param_is_s32, "osdtimeout", Opt_osdtimeout,
+ fs_param_deprecated, NULL),
+ fsparam_enum ("read_from_replica", Opt_read_from_replica,
+ ceph_param_read_from_replica),
+ fsparam_string ("secret", Opt_secret),
+ fsparam_flag_no ("share", Opt_share),
+ fsparam_flag_no ("tcp_nodelay", Opt_tcp_nodelay),
+ {}
+};
+
+struct ceph_options *ceph_alloc_options(void)
+{
+ struct ceph_options *opt;
+
+ opt = kzalloc(sizeof(*opt), GFP_KERNEL);
+ if (!opt)
+ return NULL;
+
+ opt->crush_locs = RB_ROOT;
+ opt->mon_addr = kcalloc(CEPH_MAX_MON, sizeof(*opt->mon_addr),
+ GFP_KERNEL);
+ if (!opt->mon_addr) {
+ kfree(opt);
+ return NULL;
+ }
+
+ opt->flags = CEPH_OPT_DEFAULT;
+ opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
+ opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
+ opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
+ opt->osd_request_timeout = CEPH_OSD_REQUEST_TIMEOUT_DEFAULT;
+ opt->read_from_replica = CEPH_READ_FROM_REPLICA_DEFAULT;
+ return opt;
+}
+EXPORT_SYMBOL(ceph_alloc_options);
+
void ceph_destroy_options(struct ceph_options *opt)
{
dout("destroy_options %p\n", opt);
+ if (!opt)
+ return;
+
+ ceph_clear_crush_locs(&opt->crush_locs);
kfree(opt->name);
if (opt->key) {
ceph_crypto_key_destroy(opt->key);
@@ -317,7 +355,9 @@ void ceph_destroy_options(struct ceph_options *opt)
EXPORT_SYMBOL(ceph_destroy_options);
/* get secret from key store */
-static int get_secret(struct ceph_crypto_key *dst, const char *name) {
+static int get_secret(struct ceph_crypto_key *dst, const char *name,
+ struct p_log *log)
+{
struct key *ukey;
int key_err;
int err = 0;
@@ -330,20 +370,20 @@ static int get_secret(struct ceph_crypto_key *dst, const char *name) {
key_err = PTR_ERR(ukey);
switch (key_err) {
case -ENOKEY:
- pr_warn("ceph: Mount failed due to key not found: %s\n",
- name);
+ error_plog(log, "Failed due to key not found: %s",
+ name);
break;
case -EKEYEXPIRED:
- pr_warn("ceph: Mount failed due to expired key: %s\n",
- name);
+ error_plog(log, "Failed due to expired key: %s",
+ name);
break;
case -EKEYREVOKED:
- pr_warn("ceph: Mount failed due to revoked key: %s\n",
- name);
+ error_plog(log, "Failed due to revoked key: %s",
+ name);
break;
default:
- pr_warn("ceph: Mount failed due to unknown key error %d: %s\n",
- key_err, name);
+ error_plog(log, "Failed due to key error %d: %s",
+ key_err, name);
}
err = -EPERM;
goto out;
@@ -361,223 +401,191 @@ static int get_secret(struct ceph_crypto_key *dst, const char *name) {
return err;
}
-struct ceph_options *
-ceph_parse_options(char *options, const char *dev_name,
- const char *dev_name_end,
- int (*parse_extra_token)(char *c, void *private),
- void *private)
+int ceph_parse_mon_ips(const char *buf, size_t len, struct ceph_options *opt,
+ struct fc_log *l)
{
- struct ceph_options *opt;
- const char *c;
- int err = -ENOMEM;
- substring_t argstr[MAX_OPT_ARGS];
+ struct p_log log = {.prefix = "libceph", .log = l};
+ int ret;
- opt = kzalloc(sizeof(*opt), GFP_KERNEL);
- if (!opt)
- return ERR_PTR(-ENOMEM);
- opt->mon_addr = kcalloc(CEPH_MAX_MON, sizeof(*opt->mon_addr),
- GFP_KERNEL);
- if (!opt->mon_addr)
- goto out;
-
- dout("parse_options %p options '%s' dev_name '%s'\n", opt, options,
- dev_name);
-
- /* start with defaults */
- opt->flags = CEPH_OPT_DEFAULT;
- opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
- opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
- opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
- opt->osd_request_timeout = CEPH_OSD_REQUEST_TIMEOUT_DEFAULT;
-
- /* get mon ip(s) */
/* ip1[:port1][,ip2[:port2]...] */
- err = ceph_parse_ips(dev_name, dev_name_end, opt->mon_addr,
- CEPH_MAX_MON, &opt->num_mon);
- if (err < 0)
- goto out;
-
- /* parse mount options */
- while ((c = strsep(&options, ",")) != NULL) {
- int token, intval;
- if (!*c)
- continue;
- err = -EINVAL;
- token = match_token((char *)c, opt_tokens, argstr);
- if (token < 0 && parse_extra_token) {
- /* extra? */
- err = parse_extra_token((char *)c, private);
- if (err < 0) {
- pr_err("bad option at '%s'\n", c);
- goto out;
- }
- continue;
- }
- if (token < Opt_last_int) {
- err = match_int(&argstr[0], &intval);
- if (err < 0) {
- pr_err("bad option arg (not int) at '%s'\n", c);
- goto out;
- }
- dout("got int token %d val %d\n", token, intval);
- } else if (token > Opt_last_int && token < Opt_last_string) {
- dout("got string token %d val %s\n", token,
- argstr[0].from);
- } else {
- dout("got token %d\n", token);
- }
- switch (token) {
- case Opt_ip:
- err = ceph_parse_ips(argstr[0].from,
- argstr[0].to,
- &opt->my_addr,
- 1, NULL);
- if (err < 0)
- goto out;
- opt->flags |= CEPH_OPT_MYIP;
- break;
-
- case Opt_fsid:
- err = parse_fsid(argstr[0].from, &opt->fsid);
- if (err == 0)
- opt->flags |= CEPH_OPT_FSID;
- break;
- case Opt_name:
- kfree(opt->name);
- opt->name = kstrndup(argstr[0].from,
- argstr[0].to-argstr[0].from,
- GFP_KERNEL);
- if (!opt->name) {
- err = -ENOMEM;
- goto out;
- }
- break;
- case Opt_secret:
- ceph_crypto_key_destroy(opt->key);
- kfree(opt->key);
-
- opt->key = kzalloc(sizeof(*opt->key), GFP_KERNEL);
- if (!opt->key) {
- err = -ENOMEM;
- goto out;
- }
- err = ceph_crypto_key_unarmor(opt->key, argstr[0].from);
- if (err < 0)
- goto out;
- break;
- case Opt_key:
- ceph_crypto_key_destroy(opt->key);
- kfree(opt->key);
-
- opt->key = kzalloc(sizeof(*opt->key), GFP_KERNEL);
- if (!opt->key) {
- err = -ENOMEM;
- goto out;
- }
- err = get_secret(opt->key, argstr[0].from);
- if (err < 0)
- goto out;
- break;
-
- /* misc */
- case Opt_osdtimeout:
- pr_warn("ignoring deprecated osdtimeout option\n");
- break;
- case Opt_osdkeepalivetimeout:
- /* 0 isn't well defined right now, reject it */
- if (intval < 1 || intval > INT_MAX / 1000) {
- pr_err("osdkeepalive out of range\n");
- err = -EINVAL;
- goto out;
- }
- opt->osd_keepalive_timeout =
- msecs_to_jiffies(intval * 1000);
- break;
- case Opt_osd_idle_ttl:
- /* 0 isn't well defined right now, reject it */
- if (intval < 1 || intval > INT_MAX / 1000) {
- pr_err("osd_idle_ttl out of range\n");
- err = -EINVAL;
- goto out;
- }
- opt->osd_idle_ttl = msecs_to_jiffies(intval * 1000);
- break;
- case Opt_mount_timeout:
- /* 0 is "wait forever" (i.e. infinite timeout) */
- if (intval < 0 || intval > INT_MAX / 1000) {
- pr_err("mount_timeout out of range\n");
- err = -EINVAL;
- goto out;
- }
- opt->mount_timeout = msecs_to_jiffies(intval * 1000);
- break;
- case Opt_osd_request_timeout:
- /* 0 is "wait forever" (i.e. infinite timeout) */
- if (intval < 0 || intval > INT_MAX / 1000) {
- pr_err("osd_request_timeout out of range\n");
- err = -EINVAL;
- goto out;
- }
- opt->osd_request_timeout = msecs_to_jiffies(intval * 1000);
- break;
-
- case Opt_share:
- opt->flags &= ~CEPH_OPT_NOSHARE;
- break;
- case Opt_noshare:
- opt->flags |= CEPH_OPT_NOSHARE;
- break;
-
- case Opt_crc:
- opt->flags &= ~CEPH_OPT_NOCRC;
- break;
- case Opt_nocrc:
- opt->flags |= CEPH_OPT_NOCRC;
- break;
-
- case Opt_cephx_require_signatures:
- opt->flags &= ~CEPH_OPT_NOMSGAUTH;
- break;
- case Opt_nocephx_require_signatures:
- opt->flags |= CEPH_OPT_NOMSGAUTH;
- break;
- case Opt_cephx_sign_messages:
- opt->flags &= ~CEPH_OPT_NOMSGSIGN;
- break;
- case Opt_nocephx_sign_messages:
- opt->flags |= CEPH_OPT_NOMSGSIGN;
- break;
-
- case Opt_tcp_nodelay:
- opt->flags |= CEPH_OPT_TCP_NODELAY;
- break;
- case Opt_notcp_nodelay:
- opt->flags &= ~CEPH_OPT_TCP_NODELAY;
- break;
-
- case Opt_abort_on_full:
- opt->flags |= CEPH_OPT_ABORT_ON_FULL;
- break;
-
- default:
- BUG_ON(token);
- }
+ ret = ceph_parse_ips(buf, buf + len, opt->mon_addr, CEPH_MAX_MON,
+ &opt->num_mon);
+ if (ret) {
+ error_plog(&log, "Failed to parse monitor IPs: %d", ret);
+ return ret;
}
- /* success */
- return opt;
-
-out:
- ceph_destroy_options(opt);
- return ERR_PTR(err);
+ return 0;
}
-EXPORT_SYMBOL(ceph_parse_options);
+EXPORT_SYMBOL(ceph_parse_mon_ips);
+
+int ceph_parse_param(struct fs_parameter *param, struct ceph_options *opt,
+ struct fc_log *l)
+{
+ struct fs_parse_result result;
+ int token, err;
+ struct p_log log = {.prefix = "libceph", .log = l};
+
+ token = __fs_parse(&log, ceph_parameters, param, &result);
+ dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
+ if (token < 0)
+ return token;
+
+ switch (token) {
+ case Opt_ip:
+ err = ceph_parse_ips(param->string,
+ param->string + param->size,
+ &opt->my_addr,
+ 1, NULL);
+ if (err) {
+ error_plog(&log, "Failed to parse ip: %d", err);
+ return err;
+ }
+ opt->flags |= CEPH_OPT_MYIP;
+ break;
+
+ case Opt_fsid:
+ err = parse_fsid(param->string, &opt->fsid);
+ if (err) {
+ error_plog(&log, "Failed to parse fsid: %d", err);
+ return err;
+ }
+ opt->flags |= CEPH_OPT_FSID;
+ break;
+ case Opt_name:
+ kfree(opt->name);
+ opt->name = param->string;
+ param->string = NULL;
+ break;
+ case Opt_secret:
+ ceph_crypto_key_destroy(opt->key);
+ kfree(opt->key);
+
+ opt->key = kzalloc(sizeof(*opt->key), GFP_KERNEL);
+ if (!opt->key)
+ return -ENOMEM;
+ err = ceph_crypto_key_unarmor(opt->key, param->string);
+ if (err) {
+ error_plog(&log, "Failed to parse secret: %d", err);
+ return err;
+ }
+ break;
+ case Opt_key:
+ ceph_crypto_key_destroy(opt->key);
+ kfree(opt->key);
+
+ opt->key = kzalloc(sizeof(*opt->key), GFP_KERNEL);
+ if (!opt->key)
+ return -ENOMEM;
+ return get_secret(opt->key, param->string, &log);
+ case Opt_crush_location:
+ ceph_clear_crush_locs(&opt->crush_locs);
+ err = ceph_parse_crush_location(param->string,
+ &opt->crush_locs);
+ if (err) {
+ error_plog(&log, "Failed to parse CRUSH location: %d",
+ err);
+ return err;
+ }
+ break;
+ case Opt_read_from_replica:
+ switch (result.uint_32) {
+ case Opt_read_from_replica_no:
+ opt->read_from_replica = 0;
+ break;
+ case Opt_read_from_replica_balance:
+ opt->read_from_replica = CEPH_OSD_FLAG_BALANCE_READS;
+ break;
+ case Opt_read_from_replica_localize:
+ opt->read_from_replica = CEPH_OSD_FLAG_LOCALIZE_READS;
+ break;
+ default:
+ BUG();
+ }
+ break;
+
+ case Opt_osdtimeout:
+ warn_plog(&log, "Ignoring osdtimeout");
+ break;
+ case Opt_osdkeepalivetimeout:
+ /* 0 isn't well defined right now, reject it */
+ if (result.uint_32 < 1 || result.uint_32 > INT_MAX / 1000)
+ goto out_of_range;
+ opt->osd_keepalive_timeout =
+ msecs_to_jiffies(result.uint_32 * 1000);
+ break;
+ case Opt_osd_idle_ttl:
+ /* 0 isn't well defined right now, reject it */
+ if (result.uint_32 < 1 || result.uint_32 > INT_MAX / 1000)
+ goto out_of_range;
+ opt->osd_idle_ttl = msecs_to_jiffies(result.uint_32 * 1000);
+ break;
+ case Opt_mount_timeout:
+ /* 0 is "wait forever" (i.e. infinite timeout) */
+ if (result.uint_32 > INT_MAX / 1000)
+ goto out_of_range;
+ opt->mount_timeout = msecs_to_jiffies(result.uint_32 * 1000);
+ break;
+ case Opt_osd_request_timeout:
+ /* 0 is "wait forever" (i.e. infinite timeout) */
+ if (result.uint_32 > INT_MAX / 1000)
+ goto out_of_range;
+ opt->osd_request_timeout =
+ msecs_to_jiffies(result.uint_32 * 1000);
+ break;
+
+ case Opt_share:
+ if (!result.negated)
+ opt->flags &= ~CEPH_OPT_NOSHARE;
+ else
+ opt->flags |= CEPH_OPT_NOSHARE;
+ break;
+ case Opt_crc:
+ if (!result.negated)
+ opt->flags &= ~CEPH_OPT_NOCRC;
+ else
+ opt->flags |= CEPH_OPT_NOCRC;
+ break;
+ case Opt_cephx_require_signatures:
+ if (!result.negated)
+ opt->flags &= ~CEPH_OPT_NOMSGAUTH;
+ else
+ opt->flags |= CEPH_OPT_NOMSGAUTH;
+ break;
+ case Opt_cephx_sign_messages:
+ if (!result.negated)
+ opt->flags &= ~CEPH_OPT_NOMSGSIGN;
+ else
+ opt->flags |= CEPH_OPT_NOMSGSIGN;
+ break;
+ case Opt_tcp_nodelay:
+ if (!result.negated)
+ opt->flags |= CEPH_OPT_TCP_NODELAY;
+ else
+ opt->flags &= ~CEPH_OPT_TCP_NODELAY;
+ break;
+
+ case Opt_abort_on_full:
+ opt->flags |= CEPH_OPT_ABORT_ON_FULL;
+ break;
+
+ default:
+ BUG();
+ }
+
+ return 0;
+
+out_of_range:
+ return inval_plog(&log, "%s out of range", param->key);
+}
+EXPORT_SYMBOL(ceph_parse_param);
int ceph_print_client_options(struct seq_file *m, struct ceph_client *client,
bool show_all)
{
struct ceph_options *opt = client->options;
size_t pos = m->count;
+ struct rb_node *n;
if (opt->name) {
seq_puts(m, "name=");
@@ -587,6 +595,28 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client,
if (opt->key)
seq_puts(m, "secret=<hidden>,");
+ if (!RB_EMPTY_ROOT(&opt->crush_locs)) {
+ seq_puts(m, "crush_location=");
+ for (n = rb_first(&opt->crush_locs); ; ) {
+ struct crush_loc_node *loc =
+ rb_entry(n, struct crush_loc_node, cl_node);
+
+ seq_printf(m, "%s:%s", loc->cl_loc.cl_type_name,
+ loc->cl_loc.cl_name);
+ n = rb_next(n);
+ if (!n)
+ break;
+
+ seq_putc(m, '|');
+ }
+ seq_putc(m, ',');
+ }
+ if (opt->read_from_replica == CEPH_OSD_FLAG_BALANCE_READS) {
+ seq_puts(m, "read_from_replica=balance,");
+ } else if (opt->read_from_replica == CEPH_OSD_FLAG_LOCALIZE_READS) {
+ seq_puts(m, "read_from_replica=localize,");
+ }
+
if (opt->flags & CEPH_OPT_FSID)
seq_printf(m, "fsid=%pU,", &opt->fsid);
if (opt->flags & CEPH_OPT_NOSHARE)
diff --git a/net/ceph/ceph_fs.c b/net/ceph/ceph_fs.c
deleted file mode 100644
index 756a2dc..0000000
--- a/net/ceph/ceph_fs.c
+++ /dev/null
@@ -1,104 +0,0 @@
-// SPDX-License-Identifier: GPL-2.0
-/*
- * Some non-inline ceph helpers
- */
-#include <linux/module.h>
-#include <linux/ceph/types.h>
-
-/*
- * return true if @layout appears to be valid
- */
-int ceph_file_layout_is_valid(const struct ceph_file_layout *layout)
-{
- __u32 su = layout->stripe_unit;
- __u32 sc = layout->stripe_count;
- __u32 os = layout->object_size;
-
- /* stripe unit, object size must be non-zero, 64k increment */
- if (!su || (su & (CEPH_MIN_STRIPE_UNIT-1)))
- return 0;
- if (!os || (os & (CEPH_MIN_STRIPE_UNIT-1)))
- return 0;
- /* object size must be a multiple of stripe unit */
- if (os < su || os % su)
- return 0;
- /* stripe count must be non-zero */
- if (!sc)
- return 0;
- return 1;
-}
-
-void ceph_file_layout_from_legacy(struct ceph_file_layout *fl,
- struct ceph_file_layout_legacy *legacy)
-{
- fl->stripe_unit = le32_to_cpu(legacy->fl_stripe_unit);
- fl->stripe_count = le32_to_cpu(legacy->fl_stripe_count);
- fl->object_size = le32_to_cpu(legacy->fl_object_size);
- fl->pool_id = le32_to_cpu(legacy->fl_pg_pool);
- if (fl->pool_id == 0 && fl->stripe_unit == 0 &&
- fl->stripe_count == 0 && fl->object_size == 0)
- fl->pool_id = -1;
-}
-EXPORT_SYMBOL(ceph_file_layout_from_legacy);
-
-void ceph_file_layout_to_legacy(struct ceph_file_layout *fl,
- struct ceph_file_layout_legacy *legacy)
-{
- legacy->fl_stripe_unit = cpu_to_le32(fl->stripe_unit);
- legacy->fl_stripe_count = cpu_to_le32(fl->stripe_count);
- legacy->fl_object_size = cpu_to_le32(fl->object_size);
- if (fl->pool_id >= 0)
- legacy->fl_pg_pool = cpu_to_le32(fl->pool_id);
- else
- legacy->fl_pg_pool = 0;
-}
-EXPORT_SYMBOL(ceph_file_layout_to_legacy);
-
-int ceph_flags_to_mode(int flags)
-{
- int mode;
-
-#ifdef O_DIRECTORY /* fixme */
- if ((flags & O_DIRECTORY) == O_DIRECTORY)
- return CEPH_FILE_MODE_PIN;
-#endif
-
- switch (flags & O_ACCMODE) {
- case O_WRONLY:
- mode = CEPH_FILE_MODE_WR;
- break;
- case O_RDONLY:
- mode = CEPH_FILE_MODE_RD;
- break;
- case O_RDWR:
- case O_ACCMODE: /* this is what the VFS does */
- mode = CEPH_FILE_MODE_RDWR;
- break;
- }
-#ifdef O_LAZY
- if (flags & O_LAZY)
- mode |= CEPH_FILE_MODE_LAZY;
-#endif
-
- return mode;
-}
-EXPORT_SYMBOL(ceph_flags_to_mode);
-
-int ceph_caps_for_mode(int mode)
-{
- int caps = CEPH_CAP_PIN;
-
- if (mode & CEPH_FILE_MODE_RD)
- caps |= CEPH_CAP_FILE_SHARED |
- CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE;
- if (mode & CEPH_FILE_MODE_WR)
- caps |= CEPH_CAP_FILE_EXCL |
- CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |
- CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL |
- CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL;
- if (mode & CEPH_FILE_MODE_LAZY)
- caps |= CEPH_CAP_FILE_LAZYIO;
-
- return caps;
-}
-EXPORT_SYMBOL(ceph_caps_for_mode);
diff --git a/net/ceph/ceph_hash.c b/net/ceph/ceph_hash.c
index 9a5850f..16a47c0 100644
--- a/net/ceph/ceph_hash.c
+++ b/net/ceph/ceph_hash.c
@@ -4,7 +4,7 @@
/*
* Robert Jenkin's hash function.
- * http://burtleburtle.net/bob/hash/evahash.html
+ * https://burtleburtle.net/bob/hash/evahash.html
* This is in the public domain.
*/
#define mix(a, b, c) \
@@ -50,35 +50,35 @@ unsigned int ceph_str_hash_rjenkins(const char *str, unsigned int length)
switch (len) {
case 11:
c = c + ((__u32)k[10] << 24);
- /* fall through */
+ fallthrough;
case 10:
c = c + ((__u32)k[9] << 16);
- /* fall through */
+ fallthrough;
case 9:
c = c + ((__u32)k[8] << 8);
/* the first byte of c is reserved for the length */
- /* fall through */
+ fallthrough;
case 8:
b = b + ((__u32)k[7] << 24);
- /* fall through */
+ fallthrough;
case 7:
b = b + ((__u32)k[6] << 16);
- /* fall through */
+ fallthrough;
case 6:
b = b + ((__u32)k[5] << 8);
- /* fall through */
+ fallthrough;
case 5:
b = b + k[4];
- /* fall through */
+ fallthrough;
case 4:
a = a + ((__u32)k[3] << 24);
- /* fall through */
+ fallthrough;
case 3:
a = a + ((__u32)k[2] << 16);
- /* fall through */
+ fallthrough;
case 2:
a = a + ((__u32)k[1] << 8);
- /* fall through */
+ fallthrough;
case 1:
a = a + k[0];
/* case 0: nothing left to add */
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 3d70244..254ded0 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -2,7 +2,6 @@
#ifdef __KERNEL__
# include <linux/slab.h>
# include <linux/crush/crush.h>
-void clear_choose_args(struct crush_map *c);
#else
# include "crush_compat.h"
# include "crush.h"
@@ -130,6 +129,8 @@ void crush_destroy(struct crush_map *map)
#ifndef __KERNEL__
kfree(map->choose_tries);
#else
+ clear_crush_names(&map->type_names);
+ clear_crush_names(&map->names);
clear_choose_args(map);
#endif
kfree(map);
diff --git a/net/ceph/crush/hash.c b/net/ceph/crush/hash.c
index e5cc603..fe79f6d 100644
--- a/net/ceph/crush/hash.c
+++ b/net/ceph/crush/hash.c
@@ -7,7 +7,7 @@
/*
* Robert Jenkins' function for mixing 32-bit values
- * http://burtleburtle.net/bob/hash/evahash.html
+ * https://burtleburtle.net/bob/hash/evahash.html
* a, b = random bits, c = input and output
*/
#define crush_hashmix(a, b, c) do { \
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 3f323ed..7057f8d 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -298,7 +298,7 @@ static __u64 crush_ln(unsigned int xin)
*
* for reference, see:
*
- * http://en.wikipedia.org/wiki/Exponential_distribution#Distribution_of_the_minimum_of_exponential_random_variables
+ * https://en.wikipedia.org/wiki/Exponential_distribution#Distribution_of_the_minimum_of_exponential_random_variables
*
*/
@@ -987,7 +987,7 @@ int crush_do_rule(const struct crush_map *map,
case CRUSH_RULE_CHOOSELEAF_FIRSTN:
case CRUSH_RULE_CHOOSE_FIRSTN:
firstn = 1;
- /* fall through */
+ fallthrough;
case CRUSH_RULE_CHOOSELEAF_INDEP:
case CRUSH_RULE_CHOOSE_INDEP:
if (wsize == 0)
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 7cb992e..2110439 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -81,11 +81,13 @@ static int osdmap_show(struct seq_file *s, void *p)
u32 state = map->osd_state[i];
char sb[64];
- seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",
+ seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\t%2d\n",
i, ceph_pr_addr(addr),
((map->osd_weight[i]*100) >> 16),
ceph_osdmap_state_str(sb, sizeof(sb), state),
- ((ceph_get_primary_affinity(map, i)*100) >> 16));
+ ((ceph_get_primary_affinity(map, i)*100) >> 16),
+ ceph_get_crush_locality(map, i,
+ &client->options->crush_locs));
}
for (n = rb_first(&map->pg_temp); n; n = rb_next(n)) {
struct ceph_pg_mapping *pg =
@@ -221,6 +223,9 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
if (op->op == CEPH_OSD_OP_WATCH)
seq_printf(s, "-%s",
ceph_osd_watch_op_name(op->watch.op));
+ else if (op->op == CEPH_OSD_OP_CALL)
+ seq_printf(s, "-%s/%s", op->cls.class_name,
+ op->cls.method_name);
}
seq_putc(s, '\n');
@@ -383,11 +388,11 @@ static int client_options_show(struct seq_file *s, void *p)
return 0;
}
-CEPH_DEFINE_SHOW_FUNC(monmap_show)
-CEPH_DEFINE_SHOW_FUNC(osdmap_show)
-CEPH_DEFINE_SHOW_FUNC(monc_show)
-CEPH_DEFINE_SHOW_FUNC(osdc_show)
-CEPH_DEFINE_SHOW_FUNC(client_options_show)
+DEFINE_SHOW_ATTRIBUTE(monmap);
+DEFINE_SHOW_ATTRIBUTE(osdmap);
+DEFINE_SHOW_ATTRIBUTE(monc);
+DEFINE_SHOW_ATTRIBUTE(osdc);
+DEFINE_SHOW_ATTRIBUTE(client_options);
void __init ceph_debugfs_init(void)
{
@@ -414,31 +419,31 @@ void ceph_debugfs_client_init(struct ceph_client *client)
0400,
client->debugfs_dir,
client,
- &monc_show_fops);
+ &monc_fops);
client->osdc.debugfs_file = debugfs_create_file("osdc",
0400,
client->debugfs_dir,
client,
- &osdc_show_fops);
+ &osdc_fops);
client->debugfs_monmap = debugfs_create_file("monmap",
0400,
client->debugfs_dir,
client,
- &monmap_show_fops);
+ &monmap_fops);
client->debugfs_osdmap = debugfs_create_file("osdmap",
0400,
client->debugfs_dir,
client,
- &osdmap_show_fops);
+ &osdmap_fops);
client->debugfs_options = debugfs_create_file("client_options",
0400,
client->debugfs_dir,
client,
- &client_options_show_fops);
+ &client_options_fops);
}
void ceph_debugfs_client_cleanup(struct ceph_client *client)
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 49726c3..af0f1fa 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -412,7 +412,7 @@ static void ceph_sock_state_change(struct sock *sk)
switch (sk->sk_state) {
case TCP_CLOSE:
dout("%s TCP_CLOSE\n", __func__);
- /* fall through */
+ fallthrough;
case TCP_CLOSE_WAIT:
dout("%s TCP_CLOSE_WAIT\n", __func__);
con_sock_state_closing(con);
@@ -490,15 +490,8 @@ static int ceph_tcp_connect(struct ceph_connection *con)
return ret;
}
- if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) {
- int optval = 1;
-
- ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
- (char *)&optval, sizeof(optval));
- if (ret)
- pr_err("kernel_setsockopt(TCP_NODELAY) failed: %d",
- ret);
- }
+ if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
+ tcp_sock_set_nodelay(sock->sk);
con->sock = sock;
return 0;
@@ -582,7 +575,7 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
* coalescing neighboring slab objects into a single frag which
* triggers one of hardened usercopy checks.
*/
- if (page_count(page) >= 1 && !PageSlab(page))
+ if (sendpage_ok(page))
sendpage = sock->ops->sendpage;
else
sendpage = sock_no_sendpage;
@@ -2004,10 +1997,8 @@ int ceph_parse_ips(const char *c, const char *end,
return 0;
bad:
- pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
return ret;
}
-EXPORT_SYMBOL(ceph_parse_ips);
static int process_banner(struct ceph_connection *con)
{
@@ -2025,11 +2016,11 @@ static int process_banner(struct ceph_connection *con)
sizeof(con->peer_addr)) != 0 &&
!(addr_is_blank(&con->actual_peer_addr) &&
con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
- pr_warn("wrong peer, want %s/%d, got %s/%d\n",
+ pr_warn("wrong peer, want %s/%u, got %s/%u\n",
ceph_pr_addr(&con->peer_addr),
- (int)le32_to_cpu(con->peer_addr.nonce),
+ le32_to_cpu(con->peer_addr.nonce),
ceph_pr_addr(&con->actual_peer_addr),
- (int)le32_to_cpu(con->actual_peer_addr.nonce));
+ le32_to_cpu(con->actual_peer_addr.nonce));
con->error_msg = "wrong peer at address";
return -1;
}
@@ -2760,7 +2751,7 @@ static int try_read(struct ceph_connection *con)
switch (ret) {
case -EBADMSG:
con->error_msg = "bad crc/signature";
- /* fall through */
+ fallthrough;
case -EBADE:
ret = -EIO;
break;
@@ -2820,13 +2811,13 @@ static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
return -ENOENT;
}
+ dout("%s %p %lu\n", __func__, con, delay);
if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
dout("%s %p - already queued\n", __func__, con);
con->ops->put(con);
return -EBUSY;
}
- dout("%s %p %lu\n", __func__, con, delay);
return 0;
}
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 7256c40..c4cf252 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -467,7 +467,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
struct ceph_client *client = monc->client;
- struct ceph_monmap *monmap = NULL, *old = monc->monmap;
+ struct ceph_monmap *monmap;
void *p, *end;
mutex_lock(&monc->mutex);
@@ -484,13 +484,13 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
goto out;
}
- if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
+ if (ceph_check_fsid(client, &monmap->fsid) < 0) {
kfree(monmap);
goto out;
}
- client->monc.monmap = monmap;
- kfree(old);
+ kfree(monc->monmap);
+ monc->monmap = monmap;
__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
client->have_fsid = true;
@@ -896,8 +896,9 @@ static void handle_command_ack(struct ceph_mon_client *monc,
ceph_msg_dump(msg);
}
-int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
- struct ceph_entity_addr *client_addr)
+static __printf(2, 0)
+int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
+ va_list ap)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_command *h;
@@ -925,29 +926,65 @@ int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
h->num_strs = cpu_to_le32(1);
- len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
- \"blacklistop\": \"add\", \
- \"addr\": \"%pISpc/%u\" }",
- &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
+ len = vsprintf(h->str, fmt, ap);
h->str_len = cpu_to_le32(len);
send_generic_request(monc, req);
mutex_unlock(&monc->mutex);
ret = wait_generic_request(req);
- if (!ret)
- /*
- * Make sure we have the osdmap that includes the blacklist
- * entry. This is needed to ensure that the OSDs pick up the
- * new blacklist before processing any future requests from
- * this client.
- */
- ret = ceph_wait_for_latest_osdmap(monc->client, 0);
-
out:
put_generic_request(req);
return ret;
}
-EXPORT_SYMBOL(ceph_monc_blacklist_add);
+
+static __printf(2, 3)
+int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
+{
+ va_list ap;
+ int ret;
+
+ va_start(ap, fmt);
+ ret = do_mon_command_vargs(monc, fmt, ap);
+ va_end(ap);
+ return ret;
+}
+
+int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
+ struct ceph_entity_addr *client_addr)
+{
+ int ret;
+
+ ret = do_mon_command(monc,
+ "{ \"prefix\": \"osd blocklist\", \
+ \"blocklistop\": \"add\", \
+ \"addr\": \"%pISpc/%u\" }",
+ &client_addr->in_addr,
+ le32_to_cpu(client_addr->nonce));
+ if (ret == -EINVAL) {
+ /*
+ * The monitor returns EINVAL on an unrecognized command.
+ * Try the legacy command -- it is exactly the same except
+ * for the name.
+ */
+ ret = do_mon_command(monc,
+ "{ \"prefix\": \"osd blacklist\", \
+ \"blacklistop\": \"add\", \
+ \"addr\": \"%pISpc/%u\" }",
+ &client_addr->in_addr,
+ le32_to_cpu(client_addr->nonce));
+ }
+ if (ret)
+ return ret;
+
+ /*
+ * Make sure we have the osdmap that includes the blocklist
+ * entry. This is needed to ensure that the OSDs pick up the
+ * new blocklist before processing any future requests from
+ * this client.
+ */
+ return ceph_wait_for_latest_osdmap(monc->client, 0);
+}
+EXPORT_SYMBOL(ceph_monc_blocklist_add);
/*
* Resend pending generic requests.
@@ -1233,9 +1270,6 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
struct ceph_mon_client *monc = con->private;
int type = le16_to_cpu(msg->hdr.type);
- if (!monc)
- return;
-
switch (type) {
case CEPH_MSG_AUTH_REPLY:
handle_auth_reply(monc, msg);
@@ -1310,7 +1344,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
* request had a non-zero tid. Work around this weirdness
* by allocating a new message.
*/
- /* fall through */
+ fallthrough;
case CEPH_MSG_MON_MAP:
case CEPH_MSG_MDS_MAP:
case CEPH_MSG_OSD_MAP:
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index a8481da..7901ab6 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -402,7 +402,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
case CEPH_OSD_OP_LIST_WATCHERS:
ceph_osd_data_release(&op->list_watchers.response_data);
break;
- case CEPH_OSD_OP_COPY_FROM:
+ case CEPH_OSD_OP_COPY_FROM2:
ceph_osd_data_release(&op->copy_from.osd_data);
break;
default:
@@ -448,6 +448,7 @@ static void target_copy(struct ceph_osd_request_target *dest,
dest->recovery_deletes = src->recovery_deletes;
dest->flags = src->flags;
+ dest->used_replica = src->used_replica;
dest->paused = src->paused;
dest->epoch = src->epoch;
@@ -524,7 +525,7 @@ EXPORT_SYMBOL(ceph_osdc_put_request);
static void request_init(struct ceph_osd_request *req)
{
- /* req only, each op is zeroed in _osd_req_op_init() */
+ /* req only, each op is zeroed in osd_req_op_init() */
memset(req, 0, sizeof(*req));
kref_init(&req->r_kref);
@@ -698,7 +699,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
case CEPH_OSD_OP_SETXATTR:
case CEPH_OSD_OP_CMPXATTR:
case CEPH_OSD_OP_NOTIFY_ACK:
- case CEPH_OSD_OP_COPY_FROM:
+ case CEPH_OSD_OP_COPY_FROM2:
*num_request_data_items += 1;
break;
@@ -745,8 +746,8 @@ EXPORT_SYMBOL(ceph_osdc_alloc_messages);
* other information associated with them. It also serves as a
* common init routine for all the other init functions, below.
*/
-static struct ceph_osd_req_op *
-_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
+struct ceph_osd_req_op *
+osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, u32 flags)
{
struct ceph_osd_req_op *op;
@@ -761,12 +762,6 @@ _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
return op;
}
-
-void osd_req_op_init(struct ceph_osd_request *osd_req,
- unsigned int which, u16 opcode, u32 flags)
-{
- (void)_osd_req_op_init(osd_req, which, opcode, flags);
-}
EXPORT_SYMBOL(osd_req_op_init);
void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
@@ -774,8 +769,8 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
u64 offset, u64 length,
u64 truncate_size, u32 truncate_seq)
{
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
- opcode, 0);
+ struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
+ opcode, 0);
size_t payload_len = 0;
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
@@ -821,7 +816,7 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
BUG_ON(which + 1 >= osd_req->r_num_ops);
prev_op = &osd_req->r_ops[which];
- op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
+ op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
/* dup previous one */
op->indata_len = prev_op->indata_len;
op->outdata_len = prev_op->outdata_len;
@@ -844,7 +839,7 @@ int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
size_t size;
int ret;
- op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
+ op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
pagelist = ceph_pagelist_alloc(GFP_NOFS);
if (!pagelist)
@@ -882,8 +877,8 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, const char *name, const void *value,
size_t size, u8 cmp_op, u8 cmp_mode)
{
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
- opcode, 0);
+ struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
+ opcode, 0);
struct ceph_pagelist *pagelist;
size_t payload_len;
int ret;
@@ -927,23 +922,27 @@ static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
{
struct ceph_osd_req_op *op;
- op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
+ op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
op->watch.cookie = cookie;
op->watch.op = watch_opcode;
op->watch.gen = 0;
}
+/*
+ * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
+ */
void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
unsigned int which,
u64 expected_object_size,
- u64 expected_write_size)
+ u64 expected_write_size,
+ u32 flags)
{
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
- CEPH_OSD_OP_SETALLOCHINT,
- 0);
+ struct ceph_osd_req_op *op;
+ op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
op->alloc_hint.expected_object_size = expected_object_size;
op->alloc_hint.expected_write_size = expected_write_size;
+ op->alloc_hint.flags = flags;
/*
* CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
@@ -1019,6 +1018,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
cpu_to_le64(src->alloc_hint.expected_object_size);
dst->alloc_hint.expected_write_size =
cpu_to_le64(src->alloc_hint.expected_write_size);
+ dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
break;
case CEPH_OSD_OP_SETXATTR:
case CEPH_OSD_OP_CMPXATTR:
@@ -1030,7 +1030,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
case CEPH_OSD_OP_CREATE:
case CEPH_OSD_OP_DELETE:
break;
- case CEPH_OSD_OP_COPY_FROM:
+ case CEPH_OSD_OP_COPY_FROM2:
dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
dst->copy_from.src_version =
cpu_to_le64(src->copy_from.src_version);
@@ -1112,10 +1112,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
truncate_size, truncate_seq);
}
- req->r_flags = flags;
req->r_base_oloc.pool = layout->pool_id;
req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
+ req->r_flags = flags | osdc->client->options->read_from_replica;
req->r_snapid = vino.snap;
if (flags & CEPH_OSD_FLAG_WRITE)
@@ -1498,6 +1498,45 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
(osdc->osdmap->epoch < osdc->epoch_barrier);
}
+static int pick_random_replica(const struct ceph_osds *acting)
+{
+ int i = prandom_u32() % acting->size;
+
+ dout("%s picked osd%d, primary osd%d\n", __func__,
+ acting->osds[i], acting->primary);
+ return i;
+}
+
+/*
+ * Picks the closest replica based on client's location given by
+ * crush_location option. Prefers the primary if the locality is
+ * the same.
+ */
+static int pick_closest_replica(struct ceph_osd_client *osdc,
+ const struct ceph_osds *acting)
+{
+ struct ceph_options *opt = osdc->client->options;
+ int best_i, best_locality;
+ int i = 0, locality;
+
+ do {
+ locality = ceph_get_crush_locality(osdc->osdmap,
+ acting->osds[i],
+ &opt->crush_locs);
+ if (i == 0 ||
+ (locality >= 0 && best_locality < 0) ||
+ (locality >= 0 && best_locality >= 0 &&
+ locality < best_locality)) {
+ best_i = i;
+ best_locality = locality;
+ }
+ } while (++i < acting->size);
+
+ dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
+ acting->osds[best_i], best_locality, acting->primary);
+ return best_i;
+}
+
enum calc_target_result {
CALC_TARGET_NO_ACTION = 0,
CALC_TARGET_NEED_RESEND,
@@ -1511,6 +1550,8 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
struct ceph_pg_pool_info *pi;
struct ceph_pg pgid, last_pgid;
struct ceph_osds up, acting;
+ bool is_read = t->flags & CEPH_OSD_FLAG_READ;
+ bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
bool force_resend = false;
bool unpaused = false;
bool legacy_change = false;
@@ -1541,9 +1582,9 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
ceph_oid_copy(&t->target_oid, &t->base_oid);
ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
- if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
+ if (is_read && pi->read_tier >= 0)
t->target_oloc.pool = pi->read_tier;
- if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
+ if (is_write && pi->write_tier >= 0)
t->target_oloc.pool = pi->write_tier;
pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
@@ -1582,7 +1623,8 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
unpaused = true;
}
legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
- ceph_osds_changed(&t->acting, &acting, any_change);
+ ceph_osds_changed(&t->acting, &acting,
+ t->used_replica || any_change);
if (t->pg_num)
split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
@@ -1598,7 +1640,24 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
t->sort_bitwise = sort_bitwise;
t->recovery_deletes = recovery_deletes;
- t->osd = acting.primary;
+ if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
+ CEPH_OSD_FLAG_LOCALIZE_READS)) &&
+ !is_write && pi->type == CEPH_POOL_TYPE_REP &&
+ acting.size > 1) {
+ int pos;
+
+ WARN_ON(!is_read || acting.osds[0] != acting.primary);
+ if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
+ pos = pick_random_replica(&acting);
+ } else {
+ pos = pick_closest_replica(osdc, &acting);
+ }
+ t->osd = acting.osds[pos];
+ t->used_replica = pos > 0;
+ } else {
+ t->osd = acting.primary;
+ t->used_replica = false;
+ }
}
if (unpaused || legacy_change || force_resend || split)
@@ -1967,7 +2026,7 @@ static void setup_request_data(struct ceph_osd_request *req)
ceph_osdc_msg_data_add(request_msg,
&op->notify_ack.request_data);
break;
- case CEPH_OSD_OP_COPY_FROM:
+ case CEPH_OSD_OP_COPY_FROM2:
ceph_osdc_msg_data_add(request_msg,
&op->copy_from.osd_data);
break;
@@ -2374,6 +2433,7 @@ static void account_request(struct ceph_osd_request *req)
atomic_inc(&req->r_osdc->num_requests);
req->r_start_stamp = jiffies;
+ req->r_start_latency = ktime_get();
}
static void submit_request(struct ceph_osd_request *req, bool wrlocked)
@@ -2390,6 +2450,8 @@ static void finish_request(struct ceph_osd_request *req)
WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+ req->r_end_latency = ktime_get();
+
if (req->r_osd)
unlink_request(req->r_osd, req);
atomic_dec(&osdc->num_requests);
@@ -3007,9 +3069,7 @@ static void send_linger(struct ceph_osd_linger_request *lreq)
cancel_linger_request(req);
request_reinit(req);
- ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
- ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
- req->r_flags = lreq->t.flags;
+ target_copy(&req->r_t, &lreq->t);
req->r_mtime = lreq->mtime;
mutex_lock(&lreq->lock);
@@ -3484,9 +3544,6 @@ static int ceph_redirect_decode(void **p, void *end,
goto e_inval;
}
- len = ceph_decode_32(p);
- *p += len; /* skip osd_instructions */
-
/* skip the rest */
*p = struct_end;
out:
@@ -3661,6 +3718,26 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
goto out_unlock_osdc;
}
+ if (m.result == -EAGAIN) {
+ dout("req %p tid %llu EAGAIN\n", req, req->r_tid);
+ unlink_request(osd, req);
+ mutex_unlock(&osd->lock);
+
+ /*
+ * The object is missing on the replica or not (yet)
+ * readable. Clear pgid to force a resend to the primary
+ * via legacy_change.
+ */
+ req->r_t.pgid.pool = 0;
+ req->r_t.pgid.seed = 0;
+ WARN_ON(!req->r_t.used_replica);
+ req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
+ CEPH_OSD_FLAG_LOCALIZE_READS);
+ req->r_tid = 0;
+ __submit_request(req, false);
+ goto out_unlock_osdc;
+ }
+
if (m.num_ops != req->r_num_ops) {
pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
req->r_num_ops, req->r_tid);
@@ -3777,7 +3854,7 @@ static void scan_requests(struct ceph_osd *osd,
if (!force_resend && !force_resend_writes)
break;
- /* fall through */
+ fallthrough;
case CALC_TARGET_NEED_RESEND:
cancel_linger_map_check(lreq);
/*
@@ -3814,7 +3891,7 @@ static void scan_requests(struct ceph_osd *osd,
!force_resend_writes))
break;
- /* fall through */
+ fallthrough;
case CALC_TARGET_NEED_RESEND:
cancel_map_check(req);
unlink_request(osd, req);
@@ -4715,7 +4792,7 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
struct ceph_pagelist *pl;
int ret;
- op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
+ op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
pl = ceph_pagelist_alloc(GFP_NOIO);
if (!pl)
@@ -4784,7 +4861,7 @@ static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
struct ceph_pagelist *pl;
int ret;
- op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
+ op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
op->notify.cookie = cookie;
pl = ceph_pagelist_alloc(GFP_NOIO);
@@ -5231,91 +5308,13 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
-/*
- * Read some contiguous pages. If we cross a stripe boundary, shorten
- * *plen. Return number of bytes read, or error.
- */
-int ceph_osdc_readpages(struct ceph_osd_client *osdc,
- struct ceph_vino vino, struct ceph_file_layout *layout,
- u64 off, u64 *plen,
- u32 truncate_seq, u64 truncate_size,
- struct page **pages, int num_pages, int page_align)
-{
- struct ceph_osd_request *req;
- int rc = 0;
-
- dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
- vino.snap, off, *plen);
- req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
- CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
- NULL, truncate_seq, truncate_size,
- false);
- if (IS_ERR(req))
- return PTR_ERR(req);
-
- /* it may be a short read due to an object boundary */
- osd_req_op_extent_osd_data_pages(req, 0,
- pages, *plen, page_align, false, false);
-
- dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
- off, *plen, *plen, page_align);
-
- rc = ceph_osdc_start_request(osdc, req, false);
- if (!rc)
- rc = ceph_osdc_wait_request(osdc, req);
-
- ceph_osdc_put_request(req);
- dout("readpages result %d\n", rc);
- return rc;
-}
-EXPORT_SYMBOL(ceph_osdc_readpages);
-
-/*
- * do a synchronous write on N pages
- */
-int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
- struct ceph_file_layout *layout,
- struct ceph_snap_context *snapc,
- u64 off, u64 len,
- u32 truncate_seq, u64 truncate_size,
- struct timespec64 *mtime,
- struct page **pages, int num_pages)
-{
- struct ceph_osd_request *req;
- int rc = 0;
- int page_align = off & ~PAGE_MASK;
-
- req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
- CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
- snapc, truncate_seq, truncate_size,
- true);
- if (IS_ERR(req))
- return PTR_ERR(req);
-
- /* it may be a short write due to an object boundary */
- osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
- false, false);
- dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
-
- req->r_mtime = *mtime;
- rc = ceph_osdc_start_request(osdc, req, true);
- if (!rc)
- rc = ceph_osdc_wait_request(osdc, req);
-
- ceph_osdc_put_request(req);
- if (rc == 0)
- rc = len;
- dout("writepages result %d\n", rc);
- return rc;
-}
-EXPORT_SYMBOL(ceph_osdc_writepages);
-
static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
u64 src_snapid, u64 src_version,
struct ceph_object_id *src_oid,
struct ceph_object_locator *src_oloc,
u32 src_fadvise_flags,
u32 dst_fadvise_flags,
+ u32 truncate_seq, u64 truncate_size,
u8 copy_from_flags)
{
struct ceph_osd_req_op *op;
@@ -5326,7 +5325,8 @@ static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
if (IS_ERR(pages))
return PTR_ERR(pages);
- op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags);
+ op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
+ dst_fadvise_flags);
op->copy_from.snapid = src_snapid;
op->copy_from.src_version = src_version;
op->copy_from.flags = copy_from_flags;
@@ -5336,6 +5336,8 @@ static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
end = p + PAGE_SIZE;
ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
encode_oloc(&p, end, src_oloc);
+ ceph_encode_32(&p, truncate_seq);
+ ceph_encode_64(&p, truncate_size);
op->indata_len = PAGE_SIZE - (end - p);
ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
@@ -5351,6 +5353,7 @@ int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
struct ceph_object_id *dst_oid,
struct ceph_object_locator *dst_oloc,
u32 dst_fadvise_flags,
+ u32 truncate_seq, u64 truncate_size,
u8 copy_from_flags)
{
struct ceph_osd_request *req;
@@ -5367,7 +5370,8 @@ int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
src_oloc, src_fadvise_flags,
- dst_fadvise_flags, copy_from_flags);
+ dst_fadvise_flags, truncate_seq,
+ truncate_size, copy_from_flags);
if (ret)
goto out;
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 2a6e63a..fa08c15 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -138,6 +138,79 @@ static int crush_decode_straw2_bucket(void **p, void *end,
return -EINVAL;
}
+struct crush_name_node {
+ struct rb_node cn_node;
+ int cn_id;
+ char cn_name[];
+};
+
+static struct crush_name_node *alloc_crush_name(size_t name_len)
+{
+ struct crush_name_node *cn;
+
+ cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
+ if (!cn)
+ return NULL;
+
+ RB_CLEAR_NODE(&cn->cn_node);
+ return cn;
+}
+
+static void free_crush_name(struct crush_name_node *cn)
+{
+ WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));
+
+ kfree(cn);
+}
+
+DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)
+
+static int decode_crush_names(void **p, void *end, struct rb_root *root)
+{
+ u32 n;
+
+ ceph_decode_32_safe(p, end, n, e_inval);
+ while (n--) {
+ struct crush_name_node *cn;
+ int id;
+ u32 name_len;
+
+ ceph_decode_32_safe(p, end, id, e_inval);
+ ceph_decode_32_safe(p, end, name_len, e_inval);
+ ceph_decode_need(p, end, name_len, e_inval);
+
+ cn = alloc_crush_name(name_len);
+ if (!cn)
+ return -ENOMEM;
+
+ cn->cn_id = id;
+ memcpy(cn->cn_name, *p, name_len);
+ cn->cn_name[name_len] = '\0';
+ *p += name_len;
+
+ if (!__insert_crush_name(root, cn)) {
+ free_crush_name(cn);
+ return -EEXIST;
+ }
+ }
+
+ return 0;
+
+e_inval:
+ return -EINVAL;
+}
+
+void clear_crush_names(struct rb_root *root)
+{
+ while (!RB_EMPTY_ROOT(root)) {
+ struct crush_name_node *cn =
+ rb_entry(rb_first(root), struct crush_name_node, cn_node);
+
+ erase_crush_name(root, cn);
+ free_crush_name(cn);
+ }
+}
+
static struct crush_choose_arg_map *alloc_choose_arg_map(void)
{
struct crush_choose_arg_map *arg_map;
@@ -354,6 +427,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
if (c == NULL)
return ERR_PTR(-ENOMEM);
+ c->type_names = RB_ROOT;
+ c->names = RB_ROOT;
c->choose_args = RB_ROOT;
/* set tunables to default values */
@@ -510,8 +585,14 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
}
}
- ceph_decode_skip_map(p, end, 32, string, bad); /* type_map */
- ceph_decode_skip_map(p, end, 32, string, bad); /* name_map */
+ err = decode_crush_names(p, end, &c->type_names);
+ if (err)
+ goto fail;
+
+ err = decode_crush_names(p, end, &c->names);
+ if (err)
+ goto fail;
+
ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */
/* tunables */
@@ -636,48 +717,11 @@ DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
/*
* rbtree of pg pool info
*/
-static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
-{
- struct rb_node **p = &root->rb_node;
- struct rb_node *parent = NULL;
- struct ceph_pg_pool_info *pi = NULL;
-
- while (*p) {
- parent = *p;
- pi = rb_entry(parent, struct ceph_pg_pool_info, node);
- if (new->id < pi->id)
- p = &(*p)->rb_left;
- else if (new->id > pi->id)
- p = &(*p)->rb_right;
- else
- return -EEXIST;
- }
-
- rb_link_node(&new->node, parent, p);
- rb_insert_color(&new->node, root);
- return 0;
-}
-
-static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
-{
- struct ceph_pg_pool_info *pi;
- struct rb_node *n = root->rb_node;
-
- while (n) {
- pi = rb_entry(n, struct ceph_pg_pool_info, node);
- if (id < pi->id)
- n = n->rb_left;
- else if (id > pi->id)
- n = n->rb_right;
- else
- return pi;
- }
- return NULL;
-}
+DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)
struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
{
- return __lookup_pg_pool(&map->pg_pools, id);
+ return lookup_pg_pool(&map->pg_pools, id);
}
const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
@@ -690,8 +734,7 @@ const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
if (WARN_ON_ONCE(id > (u64) INT_MAX))
return NULL;
- pi = __lookup_pg_pool(&map->pg_pools, (int) id);
-
+ pi = lookup_pg_pool(&map->pg_pools, id);
return pi ? pi->name : NULL;
}
EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
@@ -714,14 +757,14 @@ u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id)
{
struct ceph_pg_pool_info *pi;
- pi = __lookup_pg_pool(&map->pg_pools, id);
+ pi = lookup_pg_pool(&map->pg_pools, id);
return pi ? pi->flags : 0;
}
EXPORT_SYMBOL(ceph_pg_pool_flags);
static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
{
- rb_erase(&pi->node, root);
+ erase_pg_pool(root, pi);
kfree(pi->name);
kfree(pi);
}
@@ -903,7 +946,7 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
ceph_decode_32_safe(p, end, len, bad);
dout(" pool %llu len %d\n", pool, len);
ceph_decode_need(p, end, len, bad);
- pi = __lookup_pg_pool(&map->pg_pools, pool);
+ pi = lookup_pg_pool(&map->pg_pools, pool);
if (pi) {
char *name = kstrndup(*p, len, GFP_NOFS);
@@ -922,6 +965,143 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
}
/*
+ * CRUSH workspaces
+ *
+ * workspace_manager framework borrowed from fs/btrfs/compression.c.
+ * Two simplifications: there is only one type of workspace and there
+ * is always at least one workspace.
+ */
+static struct crush_work *alloc_workspace(const struct crush_map *c)
+{
+ struct crush_work *work;
+ size_t work_size;
+
+ WARN_ON(!c->working_size);
+ work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
+ dout("%s work_size %zu bytes\n", __func__, work_size);
+
+ work = ceph_kvmalloc(work_size, GFP_NOIO);
+ if (!work)
+ return NULL;
+
+ INIT_LIST_HEAD(&work->item);
+ crush_init_workspace(c, work);
+ return work;
+}
+
+static void free_workspace(struct crush_work *work)
+{
+ WARN_ON(!list_empty(&work->item));
+ kvfree(work);
+}
+
+static void init_workspace_manager(struct workspace_manager *wsm)
+{
+ INIT_LIST_HEAD(&wsm->idle_ws);
+ spin_lock_init(&wsm->ws_lock);
+ atomic_set(&wsm->total_ws, 0);
+ wsm->free_ws = 0;
+ init_waitqueue_head(&wsm->ws_wait);
+}
+
+static void add_initial_workspace(struct workspace_manager *wsm,
+ struct crush_work *work)
+{
+ WARN_ON(!list_empty(&wsm->idle_ws));
+
+ list_add(&work->item, &wsm->idle_ws);
+ atomic_set(&wsm->total_ws, 1);
+ wsm->free_ws = 1;
+}
+
+static void cleanup_workspace_manager(struct workspace_manager *wsm)
+{
+ struct crush_work *work;
+
+ while (!list_empty(&wsm->idle_ws)) {
+ work = list_first_entry(&wsm->idle_ws, struct crush_work,
+ item);
+ list_del_init(&work->item);
+ free_workspace(work);
+ }
+ atomic_set(&wsm->total_ws, 0);
+ wsm->free_ws = 0;
+}
+
+/*
+ * Finds an available workspace or allocates a new one. If it's not
+ * possible to allocate a new one, waits until there is one.
+ */
+static struct crush_work *get_workspace(struct workspace_manager *wsm,
+ const struct crush_map *c)
+{
+ struct crush_work *work;
+ int cpus = num_online_cpus();
+
+again:
+ spin_lock(&wsm->ws_lock);
+ if (!list_empty(&wsm->idle_ws)) {
+ work = list_first_entry(&wsm->idle_ws, struct crush_work,
+ item);
+ list_del_init(&work->item);
+ wsm->free_ws--;
+ spin_unlock(&wsm->ws_lock);
+ return work;
+
+ }
+ if (atomic_read(&wsm->total_ws) > cpus) {
+ DEFINE_WAIT(wait);
+
+ spin_unlock(&wsm->ws_lock);
+ prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
+ if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
+ schedule();
+ finish_wait(&wsm->ws_wait, &wait);
+ goto again;
+ }
+ atomic_inc(&wsm->total_ws);
+ spin_unlock(&wsm->ws_lock);
+
+ work = alloc_workspace(c);
+ if (!work) {
+ atomic_dec(&wsm->total_ws);
+ wake_up(&wsm->ws_wait);
+
+ /*
+ * Do not return the error but go back to waiting. We
+ * have the inital workspace and the CRUSH computation
+ * time is bounded so we will get it eventually.
+ */
+ WARN_ON(atomic_read(&wsm->total_ws) < 1);
+ goto again;
+ }
+ return work;
+}
+
+/*
+ * Puts a workspace back on the list or frees it if we have enough
+ * idle ones sitting around.
+ */
+static void put_workspace(struct workspace_manager *wsm,
+ struct crush_work *work)
+{
+ spin_lock(&wsm->ws_lock);
+ if (wsm->free_ws <= num_online_cpus()) {
+ list_add(&work->item, &wsm->idle_ws);
+ wsm->free_ws++;
+ spin_unlock(&wsm->ws_lock);
+ goto wake;
+ }
+ spin_unlock(&wsm->ws_lock);
+
+ free_workspace(work);
+ atomic_dec(&wsm->total_ws);
+wake:
+ if (wq_has_sleeper(&wsm->ws_wait))
+ wake_up(&wsm->ws_wait);
+}
+
+/*
* osd map
*/
struct ceph_osdmap *ceph_osdmap_alloc(void)
@@ -938,7 +1118,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
map->primary_temp = RB_ROOT;
map->pg_upmap = RB_ROOT;
map->pg_upmap_items = RB_ROOT;
- mutex_init(&map->crush_workspace_mutex);
+
+ init_workspace_manager(&map->crush_wsm);
return map;
}
@@ -946,8 +1127,11 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
void ceph_osdmap_destroy(struct ceph_osdmap *map)
{
dout("osdmap_destroy %p\n", map);
+
if (map->crush)
crush_destroy(map->crush);
+ cleanup_workspace_manager(&map->crush_wsm);
+
while (!RB_EMPTY_ROOT(&map->pg_temp)) {
struct ceph_pg_mapping *pg =
rb_entry(rb_first(&map->pg_temp),
@@ -986,7 +1170,6 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
kvfree(map->osd_weight);
kvfree(map->osd_addr);
kvfree(map->osd_primary_affinity);
- kvfree(map->crush_workspace);
kfree(map);
}
@@ -1061,26 +1244,22 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
{
- void *workspace;
- size_t work_size;
+ struct crush_work *work;
if (IS_ERR(crush))
return PTR_ERR(crush);
- work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
- dout("%s work_size %zu bytes\n", __func__, work_size);
- workspace = ceph_kvmalloc(work_size, GFP_NOIO);
- if (!workspace) {
+ work = alloc_workspace(crush);
+ if (!work) {
crush_destroy(crush);
return -ENOMEM;
}
- crush_init_workspace(crush, workspace);
if (map->crush)
crush_destroy(map->crush);
- kvfree(map->crush_workspace);
+ cleanup_workspace_manager(&map->crush_wsm);
map->crush = crush;
- map->crush_workspace = workspace;
+ add_initial_workspace(&map->crush_wsm, work);
return 0;
}
@@ -1154,18 +1333,18 @@ static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
ceph_decode_64_safe(p, end, pool, e_inval);
- pi = __lookup_pg_pool(&map->pg_pools, pool);
+ pi = lookup_pg_pool(&map->pg_pools, pool);
if (!incremental || !pi) {
pi = kzalloc(sizeof(*pi), GFP_NOFS);
if (!pi)
return -ENOMEM;
+ RB_CLEAR_NODE(&pi->node);
pi->id = pool;
- ret = __insert_pg_pool(&map->pg_pools, pi);
- if (ret) {
+ if (!__insert_pg_pool(&map->pg_pools, pi)) {
kfree(pi);
- return ret;
+ return -EEXIST;
}
}
@@ -1829,7 +2008,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
struct ceph_pg_pool_info *pi;
ceph_decode_64_safe(p, end, pool, e_inval);
- pi = __lookup_pg_pool(&map->pg_pools, pool);
+ pi = lookup_pg_pool(&map->pg_pools, pool);
if (pi)
__remove_pg_pool(&map->pg_pools, pi);
}
@@ -2279,6 +2458,7 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
s64 choose_args_index)
{
struct crush_choose_arg_map *arg_map;
+ struct crush_work *work;
int r;
BUG_ON(result_max > CEPH_PG_MAX_SIZE);
@@ -2289,12 +2469,11 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
arg_map = lookup_choose_arg_map(&map->crush->choose_args,
CEPH_DEFAULT_CHOOSE_ARGS);
- mutex_lock(&map->crush_workspace_mutex);
+ work = get_workspace(&map->crush_wsm, map->crush);
r = crush_do_rule(map->crush, ruleno, x, result, result_max,
- weight, weight_max, map->crush_workspace,
+ weight, weight_max, work,
arg_map ? arg_map->args : NULL);
- mutex_unlock(&map->crush_workspace_mutex);
-
+ put_workspace(&map->crush_wsm, work);
return r;
}
@@ -2672,3 +2851,221 @@ int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
return acting.primary;
}
EXPORT_SYMBOL(ceph_pg_to_acting_primary);
+
+static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
+ size_t name_len)
+{
+ struct crush_loc_node *loc;
+
+ loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
+ if (!loc)
+ return NULL;
+
+ RB_CLEAR_NODE(&loc->cl_node);
+ return loc;
+}
+
+static void free_crush_loc(struct crush_loc_node *loc)
+{
+ WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));
+
+ kfree(loc);
+}
+
+static int crush_loc_compare(const struct crush_loc *loc1,
+ const struct crush_loc *loc2)
+{
+ return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
+ strcmp(loc1->cl_name, loc2->cl_name);
+}
+
+DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
+ RB_BYPTR, const struct crush_loc *, cl_node)
+
+/*
+ * Parses a set of <bucket type name>':'<bucket name> pairs separated
+ * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
+ *
+ * Note that @crush_location is modified by strsep().
+ */
+int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
+{
+ struct crush_loc_node *loc;
+ const char *type_name, *name, *colon;
+ size_t type_name_len, name_len;
+
+ dout("%s '%s'\n", __func__, crush_location);
+ while ((type_name = strsep(&crush_location, "|"))) {
+ colon = strchr(type_name, ':');
+ if (!colon)
+ return -EINVAL;
+
+ type_name_len = colon - type_name;
+ if (type_name_len == 0)
+ return -EINVAL;
+
+ name = colon + 1;
+ name_len = strlen(name);
+ if (name_len == 0)
+ return -EINVAL;
+
+ loc = alloc_crush_loc(type_name_len, name_len);
+ if (!loc)
+ return -ENOMEM;
+
+ loc->cl_loc.cl_type_name = loc->cl_data;
+ memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
+ loc->cl_loc.cl_type_name[type_name_len] = '\0';
+
+ loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
+ memcpy(loc->cl_loc.cl_name, name, name_len);
+ loc->cl_loc.cl_name[name_len] = '\0';
+
+ if (!__insert_crush_loc(locs, loc)) {
+ free_crush_loc(loc);
+ return -EEXIST;
+ }
+
+ dout("%s type_name '%s' name '%s'\n", __func__,
+ loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
+ }
+
+ return 0;
+}
+
+int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
+{
+ struct rb_node *n1 = rb_first(locs1);
+ struct rb_node *n2 = rb_first(locs2);
+ int ret;
+
+ for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
+ struct crush_loc_node *loc1 =
+ rb_entry(n1, struct crush_loc_node, cl_node);
+ struct crush_loc_node *loc2 =
+ rb_entry(n2, struct crush_loc_node, cl_node);
+
+ ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
+ if (ret)
+ return ret;
+ }
+
+ if (!n1 && n2)
+ return -1;
+ if (n1 && !n2)
+ return 1;
+ return 0;
+}
+
+void ceph_clear_crush_locs(struct rb_root *locs)
+{
+ while (!RB_EMPTY_ROOT(locs)) {
+ struct crush_loc_node *loc =
+ rb_entry(rb_first(locs), struct crush_loc_node, cl_node);
+
+ erase_crush_loc(locs, loc);
+ free_crush_loc(loc);
+ }
+}
+
+/*
+ * [a-zA-Z0-9-_.]+
+ */
+static bool is_valid_crush_name(const char *name)
+{
+ do {
+ if (!('a' <= *name && *name <= 'z') &&
+ !('A' <= *name && *name <= 'Z') &&
+ !('0' <= *name && *name <= '9') &&
+ *name != '-' && *name != '_' && *name != '.')
+ return false;
+ } while (*++name != '\0');
+
+ return true;
+}
+
+/*
+ * Gets the parent of an item. Returns its id (<0 because the
+ * parent is always a bucket), type id (>0 for the same reason,
+ * via @parent_type_id) and location (via @parent_loc). If no
+ * parent, returns 0.
+ *
+ * Does a linear search, as there are no parent pointers of any
+ * kind. Note that the result is ambigous for items that occur
+ * multiple times in the map.
+ */
+static int get_immediate_parent(struct crush_map *c, int id,
+ u16 *parent_type_id,
+ struct crush_loc *parent_loc)
+{
+ struct crush_bucket *b;
+ struct crush_name_node *type_cn, *cn;
+ int i, j;
+
+ for (i = 0; i < c->max_buckets; i++) {
+ b = c->buckets[i];
+ if (!b)
+ continue;
+
+ /* ignore per-class shadow hierarchy */
+ cn = lookup_crush_name(&c->names, b->id);
+ if (!cn || !is_valid_crush_name(cn->cn_name))
+ continue;
+
+ for (j = 0; j < b->size; j++) {
+ if (b->items[j] != id)
+ continue;
+
+ *parent_type_id = b->type;
+ type_cn = lookup_crush_name(&c->type_names, b->type);
+ parent_loc->cl_type_name = type_cn->cn_name;
+ parent_loc->cl_name = cn->cn_name;
+ return b->id;
+ }
+ }
+
+ return 0; /* no parent */
+}
+
+/*
+ * Calculates the locality/distance from an item to a client
+ * location expressed in terms of CRUSH hierarchy as a set of
+ * (bucket type name, bucket name) pairs. Specifically, looks
+ * for the lowest-valued bucket type for which the location of
+ * @id matches one of the locations in @locs, so for standard
+ * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
+ * a matching host is closer than a matching rack and a matching
+ * data center is closer than a matching zone.
+ *
+ * Specifying multiple locations (a "multipath" location) such
+ * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
+ * is a multimap. The locality will be:
+ *
+ * - 3 for OSDs in racks foo1 and foo2
+ * - 8 for OSDs in data center bar
+ * - -1 for all other OSDs
+ *
+ * The lowest possible bucket type is 1, so the best locality
+ * for an OSD is 1 (i.e. a matching host). Locality 0 would be
+ * the OSD itself.
+ */
+int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
+ struct rb_root *locs)
+{
+ struct crush_loc loc;
+ u16 type_id;
+
+ /*
+ * Instead of repeated get_immediate_parent() calls,
+ * the location of @id could be obtained with a single
+ * depth-first traversal.
+ */
+ for (;;) {
+ id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
+ if (id >= 0)
+ return -1; /* not local */
+
+ if (lookup_crush_loc(locs, &loc))
+ return type_id;
+ }
+}