Update Linux to v5.10.109

Sourced from [1]

[1] https://cdn.kernel.org/pub/linux/kernel/v5.x/linux-5.10.109.tar.xz

Change-Id: I19bca9fc6762d4e63bcf3e4cba88bbe560d9c76c
Signed-off-by: Olivier Deprez <olivier.deprez@arm.com>
diff --git a/net/ceph/Kconfig b/net/ceph/Kconfig
index 2e8e6f9..f36f9a3 100644
--- a/net/ceph/Kconfig
+++ b/net/ceph/Kconfig
@@ -13,7 +13,7 @@
 	  common functionality to both the Ceph filesystem and
 	  to the rados block device (rbd).
 
-	  More information at http://ceph.newdream.net/.
+	  More information at https://ceph.io/.
 
 	  If unsure, say N.
 
@@ -39,6 +39,6 @@
 	  be resolved using the CONFIG_DNS_RESOLVER facility.
 
 	  For information on how to use CONFIG_DNS_RESOLVER consult
-	  Documentation/networking/dns_resolver.txt
+	  Documentation/networking/dns_resolver.rst
 
 	  If unsure, say N.
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 59d0ba2..ce09bb4 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -13,5 +13,5 @@
 	auth.o auth_none.o \
 	crypto.o armor.o \
 	auth_x.o \
-	ceph_fs.o ceph_strings.o ceph_hash.o \
+	ceph_strings.o ceph_hash.o \
 	pagevec.o snapshot.o string_table.o
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 2d56824..4e7edd7 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -11,7 +11,7 @@
 #include <linux/module.h>
 #include <linux/mount.h>
 #include <linux/nsproxy.h>
-#include <linux/parser.h>
+#include <linux/fs_parser.h>
 #include <linux/sched.h>
 #include <linux/sched/mm.h>
 #include <linux/seq_file.h>
@@ -176,6 +176,10 @@ int ceph_compare_options(struct ceph_options *new_opt,
 		}
 	}
 
+	ret = ceph_compare_crush_locs(&opt1->crush_locs, &opt2->crush_locs);
+	if (ret)
+		return ret;
+
 	/* any matching mon ip implies a match */
 	for (i = 0; i < opt1->num_mon; i++) {
 		if (ceph_monmap_contains(client->monc.monmap,
@@ -190,8 +194,7 @@ EXPORT_SYMBOL(ceph_compare_options);
  * kvmalloc() doesn't fall back to the vmalloc allocator unless flags are
  * compatible with (a superset of) GFP_KERNEL.  This is because while the
  * actual pages are allocated with the specified flags, the page table pages
- * are always allocated with GFP_KERNEL.  map_vm_area() doesn't even take
- * flags because GFP_KERNEL is hard-coded in {p4d,pud,pmd,pte}_alloc().
+ * are always allocated with GFP_KERNEL.
  *
  * ceph_kvmalloc() may be called with GFP_KERNEL, GFP_NOFS or GFP_NOIO.
  */
@@ -254,58 +257,93 @@ enum {
 	Opt_mount_timeout,
 	Opt_osd_idle_ttl,
 	Opt_osd_request_timeout,
-	Opt_last_int,
 	/* int args above */
 	Opt_fsid,
 	Opt_name,
 	Opt_secret,
 	Opt_key,
 	Opt_ip,
-	Opt_last_string,
+	Opt_crush_location,
+	Opt_read_from_replica,
 	/* string args above */
 	Opt_share,
-	Opt_noshare,
 	Opt_crc,
-	Opt_nocrc,
 	Opt_cephx_require_signatures,
-	Opt_nocephx_require_signatures,
 	Opt_cephx_sign_messages,
-	Opt_nocephx_sign_messages,
 	Opt_tcp_nodelay,
-	Opt_notcp_nodelay,
 	Opt_abort_on_full,
 };
 
-static match_table_t opt_tokens = {
-	{Opt_osdtimeout, "osdtimeout=%d"},
-	{Opt_osdkeepalivetimeout, "osdkeepalive=%d"},
-	{Opt_mount_timeout, "mount_timeout=%d"},
-	{Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
-	{Opt_osd_request_timeout, "osd_request_timeout=%d"},
-	/* int args above */
-	{Opt_fsid, "fsid=%s"},
-	{Opt_name, "name=%s"},
-	{Opt_secret, "secret=%s"},
-	{Opt_key, "key=%s"},
-	{Opt_ip, "ip=%s"},
-	/* string args above */
-	{Opt_share, "share"},
-	{Opt_noshare, "noshare"},
-	{Opt_crc, "crc"},
-	{Opt_nocrc, "nocrc"},
-	{Opt_cephx_require_signatures, "cephx_require_signatures"},
-	{Opt_nocephx_require_signatures, "nocephx_require_signatures"},
-	{Opt_cephx_sign_messages, "cephx_sign_messages"},
-	{Opt_nocephx_sign_messages, "nocephx_sign_messages"},
-	{Opt_tcp_nodelay, "tcp_nodelay"},
-	{Opt_notcp_nodelay, "notcp_nodelay"},
-	{Opt_abort_on_full, "abort_on_full"},
-	{-1, NULL}
+enum {
+	Opt_read_from_replica_no,
+	Opt_read_from_replica_balance,
+	Opt_read_from_replica_localize,
 };
 
+static const struct constant_table ceph_param_read_from_replica[] = {
+	{"no",		Opt_read_from_replica_no},
+	{"balance",	Opt_read_from_replica_balance},
+	{"localize",	Opt_read_from_replica_localize},
+	{}
+};
+
+static const struct fs_parameter_spec ceph_parameters[] = {
+	fsparam_flag	("abort_on_full",		Opt_abort_on_full),
+	fsparam_flag_no ("cephx_require_signatures",	Opt_cephx_require_signatures),
+	fsparam_flag_no ("cephx_sign_messages",		Opt_cephx_sign_messages),
+	fsparam_flag_no ("crc",				Opt_crc),
+	fsparam_string	("crush_location",		Opt_crush_location),
+	fsparam_string	("fsid",			Opt_fsid),
+	fsparam_string	("ip",				Opt_ip),
+	fsparam_string	("key",				Opt_key),
+	fsparam_u32	("mount_timeout",		Opt_mount_timeout),
+	fsparam_string	("name",			Opt_name),
+	fsparam_u32	("osd_idle_ttl",		Opt_osd_idle_ttl),
+	fsparam_u32	("osd_request_timeout",		Opt_osd_request_timeout),
+	fsparam_u32	("osdkeepalive",		Opt_osdkeepalivetimeout),
+	__fsparam	(fs_param_is_s32, "osdtimeout", Opt_osdtimeout,
+			 fs_param_deprecated, NULL),
+	fsparam_enum	("read_from_replica",		Opt_read_from_replica,
+			 ceph_param_read_from_replica),
+	fsparam_string	("secret",			Opt_secret),
+	fsparam_flag_no ("share",			Opt_share),
+	fsparam_flag_no ("tcp_nodelay",			Opt_tcp_nodelay),
+	{}
+};
+
+struct ceph_options *ceph_alloc_options(void)
+{
+	struct ceph_options *opt;
+
+	opt = kzalloc(sizeof(*opt), GFP_KERNEL);
+	if (!opt)
+		return NULL;
+
+	opt->crush_locs = RB_ROOT;
+	opt->mon_addr = kcalloc(CEPH_MAX_MON, sizeof(*opt->mon_addr),
+				GFP_KERNEL);
+	if (!opt->mon_addr) {
+		kfree(opt);
+		return NULL;
+	}
+
+	opt->flags = CEPH_OPT_DEFAULT;
+	opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
+	opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
+	opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
+	opt->osd_request_timeout = CEPH_OSD_REQUEST_TIMEOUT_DEFAULT;
+	opt->read_from_replica = CEPH_READ_FROM_REPLICA_DEFAULT;
+	return opt;
+}
+EXPORT_SYMBOL(ceph_alloc_options);
+
 void ceph_destroy_options(struct ceph_options *opt)
 {
 	dout("destroy_options %p\n", opt);
+	if (!opt)
+		return;
+
+	ceph_clear_crush_locs(&opt->crush_locs);
 	kfree(opt->name);
 	if (opt->key) {
 		ceph_crypto_key_destroy(opt->key);
@@ -317,7 +355,9 @@ void ceph_destroy_options(struct ceph_options *opt)
 EXPORT_SYMBOL(ceph_destroy_options);
 
 /* get secret from key store */
-static int get_secret(struct ceph_crypto_key *dst, const char *name) {
+static int get_secret(struct ceph_crypto_key *dst, const char *name,
+		      struct p_log *log)
+{
 	struct key *ukey;
 	int key_err;
 	int err = 0;
@@ -330,20 +370,20 @@ static int get_secret(struct ceph_crypto_key *dst, const char *name) {
 		key_err = PTR_ERR(ukey);
 		switch (key_err) {
 		case -ENOKEY:
-			pr_warn("ceph: Mount failed due to key not found: %s\n",
-				name);
+			error_plog(log, "Failed due to key not found: %s",
+			       name);
 			break;
 		case -EKEYEXPIRED:
-			pr_warn("ceph: Mount failed due to expired key: %s\n",
-				name);
+			error_plog(log, "Failed due to expired key: %s",
+			       name);
 			break;
 		case -EKEYREVOKED:
-			pr_warn("ceph: Mount failed due to revoked key: %s\n",
-				name);
+			error_plog(log, "Failed due to revoked key: %s",
+			       name);
 			break;
 		default:
-			pr_warn("ceph: Mount failed due to unknown key error %d: %s\n",
-				key_err, name);
+			error_plog(log, "Failed due to key error %d: %s",
+			       key_err, name);
 		}
 		err = -EPERM;
 		goto out;
@@ -361,223 +401,191 @@ static int get_secret(struct ceph_crypto_key *dst, const char *name) {
 	return err;
 }
 
-struct ceph_options *
-ceph_parse_options(char *options, const char *dev_name,
-			const char *dev_name_end,
-			int (*parse_extra_token)(char *c, void *private),
-			void *private)
+int ceph_parse_mon_ips(const char *buf, size_t len, struct ceph_options *opt,
+		       struct fc_log *l)
 {
-	struct ceph_options *opt;
-	const char *c;
-	int err = -ENOMEM;
-	substring_t argstr[MAX_OPT_ARGS];
+	struct p_log log = {.prefix = "libceph", .log = l};
+	int ret;
 
-	opt = kzalloc(sizeof(*opt), GFP_KERNEL);
-	if (!opt)
-		return ERR_PTR(-ENOMEM);
-	opt->mon_addr = kcalloc(CEPH_MAX_MON, sizeof(*opt->mon_addr),
-				GFP_KERNEL);
-	if (!opt->mon_addr)
-		goto out;
-
-	dout("parse_options %p options '%s' dev_name '%s'\n", opt, options,
-	     dev_name);
-
-	/* start with defaults */
-	opt->flags = CEPH_OPT_DEFAULT;
-	opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
-	opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
-	opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
-	opt->osd_request_timeout = CEPH_OSD_REQUEST_TIMEOUT_DEFAULT;
-
-	/* get mon ip(s) */
 	/* ip1[:port1][,ip2[:port2]...] */
-	err = ceph_parse_ips(dev_name, dev_name_end, opt->mon_addr,
-			     CEPH_MAX_MON, &opt->num_mon);
-	if (err < 0)
-		goto out;
-
-	/* parse mount options */
-	while ((c = strsep(&options, ",")) != NULL) {
-		int token, intval;
-		if (!*c)
-			continue;
-		err = -EINVAL;
-		token = match_token((char *)c, opt_tokens, argstr);
-		if (token < 0 && parse_extra_token) {
-			/* extra? */
-			err = parse_extra_token((char *)c, private);
-			if (err < 0) {
-				pr_err("bad option at '%s'\n", c);
-				goto out;
-			}
-			continue;
-		}
-		if (token < Opt_last_int) {
-			err = match_int(&argstr[0], &intval);
-			if (err < 0) {
-				pr_err("bad option arg (not int) at '%s'\n", c);
-				goto out;
-			}
-			dout("got int token %d val %d\n", token, intval);
-		} else if (token > Opt_last_int && token < Opt_last_string) {
-			dout("got string token %d val %s\n", token,
-			     argstr[0].from);
-		} else {
-			dout("got token %d\n", token);
-		}
-		switch (token) {
-		case Opt_ip:
-			err = ceph_parse_ips(argstr[0].from,
-					     argstr[0].to,
-					     &opt->my_addr,
-					     1, NULL);
-			if (err < 0)
-				goto out;
-			opt->flags |= CEPH_OPT_MYIP;
-			break;
-
-		case Opt_fsid:
-			err = parse_fsid(argstr[0].from, &opt->fsid);
-			if (err == 0)
-				opt->flags |= CEPH_OPT_FSID;
-			break;
-		case Opt_name:
-			kfree(opt->name);
-			opt->name = kstrndup(argstr[0].from,
-					      argstr[0].to-argstr[0].from,
-					      GFP_KERNEL);
-			if (!opt->name) {
-				err = -ENOMEM;
-				goto out;
-			}
-			break;
-		case Opt_secret:
-			ceph_crypto_key_destroy(opt->key);
-			kfree(opt->key);
-
-		        opt->key = kzalloc(sizeof(*opt->key), GFP_KERNEL);
-			if (!opt->key) {
-				err = -ENOMEM;
-				goto out;
-			}
-			err = ceph_crypto_key_unarmor(opt->key, argstr[0].from);
-			if (err < 0)
-				goto out;
-			break;
-		case Opt_key:
-			ceph_crypto_key_destroy(opt->key);
-			kfree(opt->key);
-
-		        opt->key = kzalloc(sizeof(*opt->key), GFP_KERNEL);
-			if (!opt->key) {
-				err = -ENOMEM;
-				goto out;
-			}
-			err = get_secret(opt->key, argstr[0].from);
-			if (err < 0)
-				goto out;
-			break;
-
-			/* misc */
-		case Opt_osdtimeout:
-			pr_warn("ignoring deprecated osdtimeout option\n");
-			break;
-		case Opt_osdkeepalivetimeout:
-			/* 0 isn't well defined right now, reject it */
-			if (intval < 1 || intval > INT_MAX / 1000) {
-				pr_err("osdkeepalive out of range\n");
-				err = -EINVAL;
-				goto out;
-			}
-			opt->osd_keepalive_timeout =
-					msecs_to_jiffies(intval * 1000);
-			break;
-		case Opt_osd_idle_ttl:
-			/* 0 isn't well defined right now, reject it */
-			if (intval < 1 || intval > INT_MAX / 1000) {
-				pr_err("osd_idle_ttl out of range\n");
-				err = -EINVAL;
-				goto out;
-			}
-			opt->osd_idle_ttl = msecs_to_jiffies(intval * 1000);
-			break;
-		case Opt_mount_timeout:
-			/* 0 is "wait forever" (i.e. infinite timeout) */
-			if (intval < 0 || intval > INT_MAX / 1000) {
-				pr_err("mount_timeout out of range\n");
-				err = -EINVAL;
-				goto out;
-			}
-			opt->mount_timeout = msecs_to_jiffies(intval * 1000);
-			break;
-		case Opt_osd_request_timeout:
-			/* 0 is "wait forever" (i.e. infinite timeout) */
-			if (intval < 0 || intval > INT_MAX / 1000) {
-				pr_err("osd_request_timeout out of range\n");
-				err = -EINVAL;
-				goto out;
-			}
-			opt->osd_request_timeout = msecs_to_jiffies(intval * 1000);
-			break;
-
-		case Opt_share:
-			opt->flags &= ~CEPH_OPT_NOSHARE;
-			break;
-		case Opt_noshare:
-			opt->flags |= CEPH_OPT_NOSHARE;
-			break;
-
-		case Opt_crc:
-			opt->flags &= ~CEPH_OPT_NOCRC;
-			break;
-		case Opt_nocrc:
-			opt->flags |= CEPH_OPT_NOCRC;
-			break;
-
-		case Opt_cephx_require_signatures:
-			opt->flags &= ~CEPH_OPT_NOMSGAUTH;
-			break;
-		case Opt_nocephx_require_signatures:
-			opt->flags |= CEPH_OPT_NOMSGAUTH;
-			break;
-		case Opt_cephx_sign_messages:
-			opt->flags &= ~CEPH_OPT_NOMSGSIGN;
-			break;
-		case Opt_nocephx_sign_messages:
-			opt->flags |= CEPH_OPT_NOMSGSIGN;
-			break;
-
-		case Opt_tcp_nodelay:
-			opt->flags |= CEPH_OPT_TCP_NODELAY;
-			break;
-		case Opt_notcp_nodelay:
-			opt->flags &= ~CEPH_OPT_TCP_NODELAY;
-			break;
-
-		case Opt_abort_on_full:
-			opt->flags |= CEPH_OPT_ABORT_ON_FULL;
-			break;
-
-		default:
-			BUG_ON(token);
-		}
+	ret = ceph_parse_ips(buf, buf + len, opt->mon_addr, CEPH_MAX_MON,
+			     &opt->num_mon);
+	if (ret) {
+		error_plog(&log, "Failed to parse monitor IPs: %d", ret);
+		return ret;
 	}
 
-	/* success */
-	return opt;
-
-out:
-	ceph_destroy_options(opt);
-	return ERR_PTR(err);
+	return 0;
 }
-EXPORT_SYMBOL(ceph_parse_options);
+EXPORT_SYMBOL(ceph_parse_mon_ips);
+
+int ceph_parse_param(struct fs_parameter *param, struct ceph_options *opt,
+		     struct fc_log *l)
+{
+	struct fs_parse_result result;
+	int token, err;
+	struct p_log log = {.prefix = "libceph", .log = l};
+
+	token = __fs_parse(&log, ceph_parameters, param, &result);
+	dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
+	if (token < 0)
+		return token;
+
+	switch (token) {
+	case Opt_ip:
+		err = ceph_parse_ips(param->string,
+				     param->string + param->size,
+				     &opt->my_addr,
+				     1, NULL);
+		if (err) {
+			error_plog(&log, "Failed to parse ip: %d", err);
+			return err;
+		}
+		opt->flags |= CEPH_OPT_MYIP;
+		break;
+
+	case Opt_fsid:
+		err = parse_fsid(param->string, &opt->fsid);
+		if (err) {
+			error_plog(&log, "Failed to parse fsid: %d", err);
+			return err;
+		}
+		opt->flags |= CEPH_OPT_FSID;
+		break;
+	case Opt_name:
+		kfree(opt->name);
+		opt->name = param->string;
+		param->string = NULL;
+		break;
+	case Opt_secret:
+		ceph_crypto_key_destroy(opt->key);
+		kfree(opt->key);
+
+		opt->key = kzalloc(sizeof(*opt->key), GFP_KERNEL);
+		if (!opt->key)
+			return -ENOMEM;
+		err = ceph_crypto_key_unarmor(opt->key, param->string);
+		if (err) {
+			error_plog(&log, "Failed to parse secret: %d", err);
+			return err;
+		}
+		break;
+	case Opt_key:
+		ceph_crypto_key_destroy(opt->key);
+		kfree(opt->key);
+
+		opt->key = kzalloc(sizeof(*opt->key), GFP_KERNEL);
+		if (!opt->key)
+			return -ENOMEM;
+		return get_secret(opt->key, param->string, &log);
+	case Opt_crush_location:
+		ceph_clear_crush_locs(&opt->crush_locs);
+		err = ceph_parse_crush_location(param->string,
+						&opt->crush_locs);
+		if (err) {
+			error_plog(&log, "Failed to parse CRUSH location: %d",
+				   err);
+			return err;
+		}
+		break;
+	case Opt_read_from_replica:
+		switch (result.uint_32) {
+		case Opt_read_from_replica_no:
+			opt->read_from_replica = 0;
+			break;
+		case Opt_read_from_replica_balance:
+			opt->read_from_replica = CEPH_OSD_FLAG_BALANCE_READS;
+			break;
+		case Opt_read_from_replica_localize:
+			opt->read_from_replica = CEPH_OSD_FLAG_LOCALIZE_READS;
+			break;
+		default:
+			BUG();
+		}
+		break;
+
+	case Opt_osdtimeout:
+		warn_plog(&log, "Ignoring osdtimeout");
+		break;
+	case Opt_osdkeepalivetimeout:
+		/* 0 isn't well defined right now, reject it */
+		if (result.uint_32 < 1 || result.uint_32 > INT_MAX / 1000)
+			goto out_of_range;
+		opt->osd_keepalive_timeout =
+		    msecs_to_jiffies(result.uint_32 * 1000);
+		break;
+	case Opt_osd_idle_ttl:
+		/* 0 isn't well defined right now, reject it */
+		if (result.uint_32 < 1 || result.uint_32 > INT_MAX / 1000)
+			goto out_of_range;
+		opt->osd_idle_ttl = msecs_to_jiffies(result.uint_32 * 1000);
+		break;
+	case Opt_mount_timeout:
+		/* 0 is "wait forever" (i.e. infinite timeout) */
+		if (result.uint_32 > INT_MAX / 1000)
+			goto out_of_range;
+		opt->mount_timeout = msecs_to_jiffies(result.uint_32 * 1000);
+		break;
+	case Opt_osd_request_timeout:
+		/* 0 is "wait forever" (i.e. infinite timeout) */
+		if (result.uint_32 > INT_MAX / 1000)
+			goto out_of_range;
+		opt->osd_request_timeout =
+		    msecs_to_jiffies(result.uint_32 * 1000);
+		break;
+
+	case Opt_share:
+		if (!result.negated)
+			opt->flags &= ~CEPH_OPT_NOSHARE;
+		else
+			opt->flags |= CEPH_OPT_NOSHARE;
+		break;
+	case Opt_crc:
+		if (!result.negated)
+			opt->flags &= ~CEPH_OPT_NOCRC;
+		else
+			opt->flags |= CEPH_OPT_NOCRC;
+		break;
+	case Opt_cephx_require_signatures:
+		if (!result.negated)
+			opt->flags &= ~CEPH_OPT_NOMSGAUTH;
+		else
+			opt->flags |= CEPH_OPT_NOMSGAUTH;
+		break;
+	case Opt_cephx_sign_messages:
+		if (!result.negated)
+			opt->flags &= ~CEPH_OPT_NOMSGSIGN;
+		else
+			opt->flags |= CEPH_OPT_NOMSGSIGN;
+		break;
+	case Opt_tcp_nodelay:
+		if (!result.negated)
+			opt->flags |= CEPH_OPT_TCP_NODELAY;
+		else
+			opt->flags &= ~CEPH_OPT_TCP_NODELAY;
+		break;
+
+	case Opt_abort_on_full:
+		opt->flags |= CEPH_OPT_ABORT_ON_FULL;
+		break;
+
+	default:
+		BUG();
+	}
+
+	return 0;
+
+out_of_range:
+	return inval_plog(&log, "%s out of range", param->key);
+}
+EXPORT_SYMBOL(ceph_parse_param);
 
 int ceph_print_client_options(struct seq_file *m, struct ceph_client *client,
 			      bool show_all)
 {
 	struct ceph_options *opt = client->options;
 	size_t pos = m->count;
+	struct rb_node *n;
 
 	if (opt->name) {
 		seq_puts(m, "name=");
@@ -587,6 +595,28 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client,
 	if (opt->key)
 		seq_puts(m, "secret=<hidden>,");
 
+	if (!RB_EMPTY_ROOT(&opt->crush_locs)) {
+		seq_puts(m, "crush_location=");
+		for (n = rb_first(&opt->crush_locs); ; ) {
+			struct crush_loc_node *loc =
+			    rb_entry(n, struct crush_loc_node, cl_node);
+
+			seq_printf(m, "%s:%s", loc->cl_loc.cl_type_name,
+				   loc->cl_loc.cl_name);
+			n = rb_next(n);
+			if (!n)
+				break;
+
+			seq_putc(m, '|');
+		}
+		seq_putc(m, ',');
+	}
+	if (opt->read_from_replica == CEPH_OSD_FLAG_BALANCE_READS) {
+		seq_puts(m, "read_from_replica=balance,");
+	} else if (opt->read_from_replica == CEPH_OSD_FLAG_LOCALIZE_READS) {
+		seq_puts(m, "read_from_replica=localize,");
+	}
+
 	if (opt->flags & CEPH_OPT_FSID)
 		seq_printf(m, "fsid=%pU,", &opt->fsid);
 	if (opt->flags & CEPH_OPT_NOSHARE)
diff --git a/net/ceph/ceph_fs.c b/net/ceph/ceph_fs.c
deleted file mode 100644
index 756a2dc..0000000
--- a/net/ceph/ceph_fs.c
+++ /dev/null
@@ -1,104 +0,0 @@
-// SPDX-License-Identifier: GPL-2.0
-/*
- * Some non-inline ceph helpers
- */
-#include <linux/module.h>
-#include <linux/ceph/types.h>
-
-/*
- * return true if @layout appears to be valid
- */
-int ceph_file_layout_is_valid(const struct ceph_file_layout *layout)
-{
-	__u32 su = layout->stripe_unit;
-	__u32 sc = layout->stripe_count;
-	__u32 os = layout->object_size;
-
-	/* stripe unit, object size must be non-zero, 64k increment */
-	if (!su || (su & (CEPH_MIN_STRIPE_UNIT-1)))
-		return 0;
-	if (!os || (os & (CEPH_MIN_STRIPE_UNIT-1)))
-		return 0;
-	/* object size must be a multiple of stripe unit */
-	if (os < su || os % su)
-		return 0;
-	/* stripe count must be non-zero */
-	if (!sc)
-		return 0;
-	return 1;
-}
-
-void ceph_file_layout_from_legacy(struct ceph_file_layout *fl,
-				  struct ceph_file_layout_legacy *legacy)
-{
-	fl->stripe_unit = le32_to_cpu(legacy->fl_stripe_unit);
-	fl->stripe_count = le32_to_cpu(legacy->fl_stripe_count);
-	fl->object_size = le32_to_cpu(legacy->fl_object_size);
-	fl->pool_id = le32_to_cpu(legacy->fl_pg_pool);
-	if (fl->pool_id == 0 && fl->stripe_unit == 0 &&
-	    fl->stripe_count == 0 && fl->object_size == 0)
-		fl->pool_id = -1;
-}
-EXPORT_SYMBOL(ceph_file_layout_from_legacy);
-
-void ceph_file_layout_to_legacy(struct ceph_file_layout *fl,
-				struct ceph_file_layout_legacy *legacy)
-{
-	legacy->fl_stripe_unit = cpu_to_le32(fl->stripe_unit);
-	legacy->fl_stripe_count = cpu_to_le32(fl->stripe_count);
-	legacy->fl_object_size = cpu_to_le32(fl->object_size);
-	if (fl->pool_id >= 0)
-		legacy->fl_pg_pool = cpu_to_le32(fl->pool_id);
-	else
-		legacy->fl_pg_pool = 0;
-}
-EXPORT_SYMBOL(ceph_file_layout_to_legacy);
-
-int ceph_flags_to_mode(int flags)
-{
-	int mode;
-
-#ifdef O_DIRECTORY  /* fixme */
-	if ((flags & O_DIRECTORY) == O_DIRECTORY)
-		return CEPH_FILE_MODE_PIN;
-#endif
-
-	switch (flags & O_ACCMODE) {
-	case O_WRONLY:
-		mode = CEPH_FILE_MODE_WR;
-		break;
-	case O_RDONLY:
-		mode = CEPH_FILE_MODE_RD;
-		break;
-	case O_RDWR:
-	case O_ACCMODE: /* this is what the VFS does */
-		mode = CEPH_FILE_MODE_RDWR;
-		break;
-	}
-#ifdef O_LAZY
-	if (flags & O_LAZY)
-		mode |= CEPH_FILE_MODE_LAZY;
-#endif
-
-	return mode;
-}
-EXPORT_SYMBOL(ceph_flags_to_mode);
-
-int ceph_caps_for_mode(int mode)
-{
-	int caps = CEPH_CAP_PIN;
-
-	if (mode & CEPH_FILE_MODE_RD)
-		caps |= CEPH_CAP_FILE_SHARED |
-			CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE;
-	if (mode & CEPH_FILE_MODE_WR)
-		caps |= CEPH_CAP_FILE_EXCL |
-			CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |
-			CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL |
-			CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL;
-	if (mode & CEPH_FILE_MODE_LAZY)
-		caps |= CEPH_CAP_FILE_LAZYIO;
-
-	return caps;
-}
-EXPORT_SYMBOL(ceph_caps_for_mode);
diff --git a/net/ceph/ceph_hash.c b/net/ceph/ceph_hash.c
index 9a5850f..16a47c0 100644
--- a/net/ceph/ceph_hash.c
+++ b/net/ceph/ceph_hash.c
@@ -4,7 +4,7 @@
 
 /*
  * Robert Jenkin's hash function.
- * http://burtleburtle.net/bob/hash/evahash.html
+ * https://burtleburtle.net/bob/hash/evahash.html
  * This is in the public domain.
  */
 #define mix(a, b, c)						\
@@ -50,35 +50,35 @@ unsigned int ceph_str_hash_rjenkins(const char *str, unsigned int length)
 	switch (len) {
 	case 11:
 		c = c + ((__u32)k[10] << 24);
-		/* fall through */
+		fallthrough;
 	case 10:
 		c = c + ((__u32)k[9] << 16);
-		/* fall through */
+		fallthrough;
 	case 9:
 		c = c + ((__u32)k[8] << 8);
 		/* the first byte of c is reserved for the length */
-		/* fall through */
+		fallthrough;
 	case 8:
 		b = b + ((__u32)k[7] << 24);
-		/* fall through */
+		fallthrough;
 	case 7:
 		b = b + ((__u32)k[6] << 16);
-		/* fall through */
+		fallthrough;
 	case 6:
 		b = b + ((__u32)k[5] << 8);
-		/* fall through */
+		fallthrough;
 	case 5:
 		b = b + k[4];
-		/* fall through */
+		fallthrough;
 	case 4:
 		a = a + ((__u32)k[3] << 24);
-		/* fall through */
+		fallthrough;
 	case 3:
 		a = a + ((__u32)k[2] << 16);
-		/* fall through */
+		fallthrough;
 	case 2:
 		a = a + ((__u32)k[1] << 8);
-		/* fall through */
+		fallthrough;
 	case 1:
 		a = a + k[0];
 		/* case 0: nothing left to add */
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 3d70244..254ded0 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -2,7 +2,6 @@
 #ifdef __KERNEL__
 # include <linux/slab.h>
 # include <linux/crush/crush.h>
-void clear_choose_args(struct crush_map *c);
 #else
 # include "crush_compat.h"
 # include "crush.h"
@@ -130,6 +129,8 @@ void crush_destroy(struct crush_map *map)
 #ifndef __KERNEL__
 	kfree(map->choose_tries);
 #else
+	clear_crush_names(&map->type_names);
+	clear_crush_names(&map->names);
 	clear_choose_args(map);
 #endif
 	kfree(map);
diff --git a/net/ceph/crush/hash.c b/net/ceph/crush/hash.c
index e5cc603..fe79f6d 100644
--- a/net/ceph/crush/hash.c
+++ b/net/ceph/crush/hash.c
@@ -7,7 +7,7 @@
 
 /*
  * Robert Jenkins' function for mixing 32-bit values
- * http://burtleburtle.net/bob/hash/evahash.html
+ * https://burtleburtle.net/bob/hash/evahash.html
  * a, b = random bits, c = input and output
  */
 #define crush_hashmix(a, b, c) do {			\
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 3f323ed..7057f8d 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -298,7 +298,7 @@ static __u64 crush_ln(unsigned int xin)
  *
  * for reference, see:
  *
- * http://en.wikipedia.org/wiki/Exponential_distribution#Distribution_of_the_minimum_of_exponential_random_variables
+ * https://en.wikipedia.org/wiki/Exponential_distribution#Distribution_of_the_minimum_of_exponential_random_variables
  *
  */
 
@@ -987,7 +987,7 @@ int crush_do_rule(const struct crush_map *map,
 		case CRUSH_RULE_CHOOSELEAF_FIRSTN:
 		case CRUSH_RULE_CHOOSE_FIRSTN:
 			firstn = 1;
-			/* fall through */
+			fallthrough;
 		case CRUSH_RULE_CHOOSELEAF_INDEP:
 		case CRUSH_RULE_CHOOSE_INDEP:
 			if (wsize == 0)
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 7cb992e..2110439 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -81,11 +81,13 @@ static int osdmap_show(struct seq_file *s, void *p)
 		u32 state = map->osd_state[i];
 		char sb[64];
 
-		seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",
+		seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\t%2d\n",
 			   i, ceph_pr_addr(addr),
 			   ((map->osd_weight[i]*100) >> 16),
 			   ceph_osdmap_state_str(sb, sizeof(sb), state),
-			   ((ceph_get_primary_affinity(map, i)*100) >> 16));
+			   ((ceph_get_primary_affinity(map, i)*100) >> 16),
+			   ceph_get_crush_locality(map, i,
+					   &client->options->crush_locs));
 	}
 	for (n = rb_first(&map->pg_temp); n; n = rb_next(n)) {
 		struct ceph_pg_mapping *pg =
@@ -221,6 +223,9 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
 		if (op->op == CEPH_OSD_OP_WATCH)
 			seq_printf(s, "-%s",
 				   ceph_osd_watch_op_name(op->watch.op));
+		else if (op->op == CEPH_OSD_OP_CALL)
+			seq_printf(s, "-%s/%s", op->cls.class_name,
+				   op->cls.method_name);
 	}
 
 	seq_putc(s, '\n');
@@ -383,11 +388,11 @@ static int client_options_show(struct seq_file *s, void *p)
 	return 0;
 }
 
-CEPH_DEFINE_SHOW_FUNC(monmap_show)
-CEPH_DEFINE_SHOW_FUNC(osdmap_show)
-CEPH_DEFINE_SHOW_FUNC(monc_show)
-CEPH_DEFINE_SHOW_FUNC(osdc_show)
-CEPH_DEFINE_SHOW_FUNC(client_options_show)
+DEFINE_SHOW_ATTRIBUTE(monmap);
+DEFINE_SHOW_ATTRIBUTE(osdmap);
+DEFINE_SHOW_ATTRIBUTE(monc);
+DEFINE_SHOW_ATTRIBUTE(osdc);
+DEFINE_SHOW_ATTRIBUTE(client_options);
 
 void __init ceph_debugfs_init(void)
 {
@@ -414,31 +419,31 @@ void ceph_debugfs_client_init(struct ceph_client *client)
 						      0400,
 						      client->debugfs_dir,
 						      client,
-						      &monc_show_fops);
+						      &monc_fops);
 
 	client->osdc.debugfs_file = debugfs_create_file("osdc",
 						      0400,
 						      client->debugfs_dir,
 						      client,
-						      &osdc_show_fops);
+						      &osdc_fops);
 
 	client->debugfs_monmap = debugfs_create_file("monmap",
 					0400,
 					client->debugfs_dir,
 					client,
-					&monmap_show_fops);
+					&monmap_fops);
 
 	client->debugfs_osdmap = debugfs_create_file("osdmap",
 					0400,
 					client->debugfs_dir,
 					client,
-					&osdmap_show_fops);
+					&osdmap_fops);
 
 	client->debugfs_options = debugfs_create_file("client_options",
 					0400,
 					client->debugfs_dir,
 					client,
-					&client_options_show_fops);
+					&client_options_fops);
 }
 
 void ceph_debugfs_client_cleanup(struct ceph_client *client)
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 49726c3..af0f1fa 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -412,7 +412,7 @@ static void ceph_sock_state_change(struct sock *sk)
 	switch (sk->sk_state) {
 	case TCP_CLOSE:
 		dout("%s TCP_CLOSE\n", __func__);
-		/* fall through */
+		fallthrough;
 	case TCP_CLOSE_WAIT:
 		dout("%s TCP_CLOSE_WAIT\n", __func__);
 		con_sock_state_closing(con);
@@ -490,15 +490,8 @@ static int ceph_tcp_connect(struct ceph_connection *con)
 		return ret;
 	}
 
-	if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) {
-		int optval = 1;
-
-		ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
-					(char *)&optval, sizeof(optval));
-		if (ret)
-			pr_err("kernel_setsockopt(TCP_NODELAY) failed: %d",
-			       ret);
-	}
+	if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
+		tcp_sock_set_nodelay(sock->sk);
 
 	con->sock = sock;
 	return 0;
@@ -582,7 +575,7 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
 	 * coalescing neighboring slab objects into a single frag which
 	 * triggers one of hardened usercopy checks.
 	 */
-	if (page_count(page) >= 1 && !PageSlab(page))
+	if (sendpage_ok(page))
 		sendpage = sock->ops->sendpage;
 	else
 		sendpage = sock_no_sendpage;
@@ -2004,10 +1997,8 @@ int ceph_parse_ips(const char *c, const char *end,
 	return 0;
 
 bad:
-	pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
 	return ret;
 }
-EXPORT_SYMBOL(ceph_parse_ips);
 
 static int process_banner(struct ceph_connection *con)
 {
@@ -2025,11 +2016,11 @@ static int process_banner(struct ceph_connection *con)
 		   sizeof(con->peer_addr)) != 0 &&
 	    !(addr_is_blank(&con->actual_peer_addr) &&
 	      con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
-		pr_warn("wrong peer, want %s/%d, got %s/%d\n",
+		pr_warn("wrong peer, want %s/%u, got %s/%u\n",
 			ceph_pr_addr(&con->peer_addr),
-			(int)le32_to_cpu(con->peer_addr.nonce),
+			le32_to_cpu(con->peer_addr.nonce),
 			ceph_pr_addr(&con->actual_peer_addr),
-			(int)le32_to_cpu(con->actual_peer_addr.nonce));
+			le32_to_cpu(con->actual_peer_addr.nonce));
 		con->error_msg = "wrong peer at address";
 		return -1;
 	}
@@ -2760,7 +2751,7 @@ static int try_read(struct ceph_connection *con)
 			switch (ret) {
 			case -EBADMSG:
 				con->error_msg = "bad crc/signature";
-				/* fall through */
+				fallthrough;
 			case -EBADE:
 				ret = -EIO;
 				break;
@@ -2820,13 +2811,13 @@ static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
 		return -ENOENT;
 	}
 
+	dout("%s %p %lu\n", __func__, con, delay);
 	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
 		dout("%s %p - already queued\n", __func__, con);
 		con->ops->put(con);
 		return -EBUSY;
 	}
 
-	dout("%s %p %lu\n", __func__, con, delay);
 	return 0;
 }
 
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 7256c40..c4cf252 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -467,7 +467,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
 				 struct ceph_msg *msg)
 {
 	struct ceph_client *client = monc->client;
-	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
+	struct ceph_monmap *monmap;
 	void *p, *end;
 
 	mutex_lock(&monc->mutex);
@@ -484,13 +484,13 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
 		goto out;
 	}
 
-	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
+	if (ceph_check_fsid(client, &monmap->fsid) < 0) {
 		kfree(monmap);
 		goto out;
 	}
 
-	client->monc.monmap = monmap;
-	kfree(old);
+	kfree(monc->monmap);
+	monc->monmap = monmap;
 
 	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
 	client->have_fsid = true;
@@ -896,8 +896,9 @@ static void handle_command_ack(struct ceph_mon_client *monc,
 	ceph_msg_dump(msg);
 }
 
-int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
-			    struct ceph_entity_addr *client_addr)
+static __printf(2, 0)
+int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
+			 va_list ap)
 {
 	struct ceph_mon_generic_request *req;
 	struct ceph_mon_command *h;
@@ -925,29 +926,65 @@ int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
 	h->monhdr.session_mon_tid = 0;
 	h->fsid = monc->monmap->fsid;
 	h->num_strs = cpu_to_le32(1);
-	len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
-		                 \"blacklistop\": \"add\", \
-				 \"addr\": \"%pISpc/%u\" }",
-		      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
+	len = vsprintf(h->str, fmt, ap);
 	h->str_len = cpu_to_le32(len);
 	send_generic_request(monc, req);
 	mutex_unlock(&monc->mutex);
 
 	ret = wait_generic_request(req);
-	if (!ret)
-		/*
-		 * Make sure we have the osdmap that includes the blacklist
-		 * entry.  This is needed to ensure that the OSDs pick up the
-		 * new blacklist before processing any future requests from
-		 * this client.
-		 */
-		ret = ceph_wait_for_latest_osdmap(monc->client, 0);
-
 out:
 	put_generic_request(req);
 	return ret;
 }
-EXPORT_SYMBOL(ceph_monc_blacklist_add);
+
+static __printf(2, 3)
+int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
+{
+	va_list ap;
+	int ret;
+
+	va_start(ap, fmt);
+	ret = do_mon_command_vargs(monc, fmt, ap);
+	va_end(ap);
+	return ret;
+}
+
+int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
+			    struct ceph_entity_addr *client_addr)
+{
+	int ret;
+
+	ret = do_mon_command(monc,
+			     "{ \"prefix\": \"osd blocklist\", \
+				\"blocklistop\": \"add\", \
+				\"addr\": \"%pISpc/%u\" }",
+			     &client_addr->in_addr,
+			     le32_to_cpu(client_addr->nonce));
+	if (ret == -EINVAL) {
+		/*
+		 * The monitor returns EINVAL on an unrecognized command.
+		 * Try the legacy command -- it is exactly the same except
+		 * for the name.
+		 */
+		ret = do_mon_command(monc,
+				     "{ \"prefix\": \"osd blacklist\", \
+					\"blacklistop\": \"add\", \
+					\"addr\": \"%pISpc/%u\" }",
+				     &client_addr->in_addr,
+				     le32_to_cpu(client_addr->nonce));
+	}
+	if (ret)
+		return ret;
+
+	/*
+	 * Make sure we have the osdmap that includes the blocklist
+	 * entry.  This is needed to ensure that the OSDs pick up the
+	 * new blocklist before processing any future requests from
+	 * this client.
+	 */
+	return ceph_wait_for_latest_osdmap(monc->client, 0);
+}
+EXPORT_SYMBOL(ceph_monc_blocklist_add);
 
 /*
  * Resend pending generic requests.
@@ -1233,9 +1270,6 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 	struct ceph_mon_client *monc = con->private;
 	int type = le16_to_cpu(msg->hdr.type);
 
-	if (!monc)
-		return;
-
 	switch (type) {
 	case CEPH_MSG_AUTH_REPLY:
 		handle_auth_reply(monc, msg);
@@ -1310,7 +1344,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
 		 * request had a non-zero tid.  Work around this weirdness
 		 * by allocating a new message.
 		 */
-		/* fall through */
+		fallthrough;
 	case CEPH_MSG_MON_MAP:
 	case CEPH_MSG_MDS_MAP:
 	case CEPH_MSG_OSD_MAP:
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index a8481da..7901ab6 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -402,7 +402,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_LIST_WATCHERS:
 		ceph_osd_data_release(&op->list_watchers.response_data);
 		break;
-	case CEPH_OSD_OP_COPY_FROM:
+	case CEPH_OSD_OP_COPY_FROM2:
 		ceph_osd_data_release(&op->copy_from.osd_data);
 		break;
 	default:
@@ -448,6 +448,7 @@ static void target_copy(struct ceph_osd_request_target *dest,
 	dest->recovery_deletes = src->recovery_deletes;
 
 	dest->flags = src->flags;
+	dest->used_replica = src->used_replica;
 	dest->paused = src->paused;
 
 	dest->epoch = src->epoch;
@@ -524,7 +525,7 @@ EXPORT_SYMBOL(ceph_osdc_put_request);
 
 static void request_init(struct ceph_osd_request *req)
 {
-	/* req only, each op is zeroed in _osd_req_op_init() */
+	/* req only, each op is zeroed in osd_req_op_init() */
 	memset(req, 0, sizeof(*req));
 
 	kref_init(&req->r_kref);
@@ -698,7 +699,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
 		case CEPH_OSD_OP_SETXATTR:
 		case CEPH_OSD_OP_CMPXATTR:
 		case CEPH_OSD_OP_NOTIFY_ACK:
-		case CEPH_OSD_OP_COPY_FROM:
+		case CEPH_OSD_OP_COPY_FROM2:
 			*num_request_data_items += 1;
 			break;
 
@@ -745,8 +746,8 @@ EXPORT_SYMBOL(ceph_osdc_alloc_messages);
  * other information associated with them.  It also serves as a
  * common init routine for all the other init functions, below.
  */
-static struct ceph_osd_req_op *
-_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
+struct ceph_osd_req_op *
+osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
 		 u16 opcode, u32 flags)
 {
 	struct ceph_osd_req_op *op;
@@ -761,12 +762,6 @@ _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
 
 	return op;
 }
-
-void osd_req_op_init(struct ceph_osd_request *osd_req,
-		     unsigned int which, u16 opcode, u32 flags)
-{
-	(void)_osd_req_op_init(osd_req, which, opcode, flags);
-}
 EXPORT_SYMBOL(osd_req_op_init);
 
 void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
@@ -774,8 +769,8 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 				u64 offset, u64 length,
 				u64 truncate_size, u32 truncate_seq)
 {
-	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
-						      opcode, 0);
+	struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
+						     opcode, 0);
 	size_t payload_len = 0;
 
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
@@ -821,7 +816,7 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
 	BUG_ON(which + 1 >= osd_req->r_num_ops);
 
 	prev_op = &osd_req->r_ops[which];
-	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
+	op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
 	/* dup previous one */
 	op->indata_len = prev_op->indata_len;
 	op->outdata_len = prev_op->outdata_len;
@@ -844,7 +839,7 @@ int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
 	size_t size;
 	int ret;
 
-	op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
+	op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
 
 	pagelist = ceph_pagelist_alloc(GFP_NOFS);
 	if (!pagelist)
@@ -882,8 +877,8 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 			  u16 opcode, const char *name, const void *value,
 			  size_t size, u8 cmp_op, u8 cmp_mode)
 {
-	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
-						      opcode, 0);
+	struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
+						     opcode, 0);
 	struct ceph_pagelist *pagelist;
 	size_t payload_len;
 	int ret;
@@ -927,23 +922,27 @@ static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
 {
 	struct ceph_osd_req_op *op;
 
-	op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
+	op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
 	op->watch.cookie = cookie;
 	op->watch.op = watch_opcode;
 	op->watch.gen = 0;
 }
 
+/*
+ * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
+ */
 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 				unsigned int which,
 				u64 expected_object_size,
-				u64 expected_write_size)
+				u64 expected_write_size,
+				u32 flags)
 {
-	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
-						      CEPH_OSD_OP_SETALLOCHINT,
-						      0);
+	struct ceph_osd_req_op *op;
 
+	op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
 	op->alloc_hint.expected_object_size = expected_object_size;
 	op->alloc_hint.expected_write_size = expected_write_size;
+	op->alloc_hint.flags = flags;
 
 	/*
 	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
@@ -1019,6 +1018,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 		    cpu_to_le64(src->alloc_hint.expected_object_size);
 		dst->alloc_hint.expected_write_size =
 		    cpu_to_le64(src->alloc_hint.expected_write_size);
+		dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
 		break;
 	case CEPH_OSD_OP_SETXATTR:
 	case CEPH_OSD_OP_CMPXATTR:
@@ -1030,7 +1030,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 	case CEPH_OSD_OP_CREATE:
 	case CEPH_OSD_OP_DELETE:
 		break;
-	case CEPH_OSD_OP_COPY_FROM:
+	case CEPH_OSD_OP_COPY_FROM2:
 		dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
 		dst->copy_from.src_version =
 			cpu_to_le64(src->copy_from.src_version);
@@ -1112,10 +1112,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 				       truncate_size, truncate_seq);
 	}
 
-	req->r_flags = flags;
 	req->r_base_oloc.pool = layout->pool_id;
 	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
 	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
+	req->r_flags = flags | osdc->client->options->read_from_replica;
 
 	req->r_snapid = vino.snap;
 	if (flags & CEPH_OSD_FLAG_WRITE)
@@ -1498,6 +1498,45 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
 	       (osdc->osdmap->epoch < osdc->epoch_barrier);
 }
 
+static int pick_random_replica(const struct ceph_osds *acting)
+{
+	int i = prandom_u32() % acting->size;
+
+	dout("%s picked osd%d, primary osd%d\n", __func__,
+	     acting->osds[i], acting->primary);
+	return i;
+}
+
+/*
+ * Picks the closest replica based on client's location given by
+ * crush_location option.  Prefers the primary if the locality is
+ * the same.
+ */
+static int pick_closest_replica(struct ceph_osd_client *osdc,
+				const struct ceph_osds *acting)
+{
+	struct ceph_options *opt = osdc->client->options;
+	int best_i, best_locality;
+	int i = 0, locality;
+
+	do {
+		locality = ceph_get_crush_locality(osdc->osdmap,
+						   acting->osds[i],
+						   &opt->crush_locs);
+		if (i == 0 ||
+		    (locality >= 0 && best_locality < 0) ||
+		    (locality >= 0 && best_locality >= 0 &&
+		     locality < best_locality)) {
+			best_i = i;
+			best_locality = locality;
+		}
+	} while (++i < acting->size);
+
+	dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
+	     acting->osds[best_i], best_locality, acting->primary);
+	return best_i;
+}
+
 enum calc_target_result {
 	CALC_TARGET_NO_ACTION = 0,
 	CALC_TARGET_NEED_RESEND,
@@ -1511,6 +1550,8 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 	struct ceph_pg_pool_info *pi;
 	struct ceph_pg pgid, last_pgid;
 	struct ceph_osds up, acting;
+	bool is_read = t->flags & CEPH_OSD_FLAG_READ;
+	bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
 	bool force_resend = false;
 	bool unpaused = false;
 	bool legacy_change = false;
@@ -1541,9 +1582,9 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 	ceph_oid_copy(&t->target_oid, &t->base_oid);
 	ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
 	if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
-		if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
+		if (is_read && pi->read_tier >= 0)
 			t->target_oloc.pool = pi->read_tier;
-		if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
+		if (is_write && pi->write_tier >= 0)
 			t->target_oloc.pool = pi->write_tier;
 
 		pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
@@ -1582,7 +1623,8 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 		unpaused = true;
 	}
 	legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
-			ceph_osds_changed(&t->acting, &acting, any_change);
+			ceph_osds_changed(&t->acting, &acting,
+					  t->used_replica || any_change);
 	if (t->pg_num)
 		split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
 
@@ -1598,7 +1640,24 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 		t->sort_bitwise = sort_bitwise;
 		t->recovery_deletes = recovery_deletes;
 
-		t->osd = acting.primary;
+		if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
+				 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
+		    !is_write && pi->type == CEPH_POOL_TYPE_REP &&
+		    acting.size > 1) {
+			int pos;
+
+			WARN_ON(!is_read || acting.osds[0] != acting.primary);
+			if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
+				pos = pick_random_replica(&acting);
+			} else {
+				pos = pick_closest_replica(osdc, &acting);
+			}
+			t->osd = acting.osds[pos];
+			t->used_replica = pos > 0;
+		} else {
+			t->osd = acting.primary;
+			t->used_replica = false;
+		}
 	}
 
 	if (unpaused || legacy_change || force_resend || split)
@@ -1967,7 +2026,7 @@ static void setup_request_data(struct ceph_osd_request *req)
 			ceph_osdc_msg_data_add(request_msg,
 					       &op->notify_ack.request_data);
 			break;
-		case CEPH_OSD_OP_COPY_FROM:
+		case CEPH_OSD_OP_COPY_FROM2:
 			ceph_osdc_msg_data_add(request_msg,
 					       &op->copy_from.osd_data);
 			break;
@@ -2374,6 +2433,7 @@ static void account_request(struct ceph_osd_request *req)
 	atomic_inc(&req->r_osdc->num_requests);
 
 	req->r_start_stamp = jiffies;
+	req->r_start_latency = ktime_get();
 }
 
 static void submit_request(struct ceph_osd_request *req, bool wrlocked)
@@ -2390,6 +2450,8 @@ static void finish_request(struct ceph_osd_request *req)
 	WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
 
+	req->r_end_latency = ktime_get();
+
 	if (req->r_osd)
 		unlink_request(req->r_osd, req);
 	atomic_dec(&osdc->num_requests);
@@ -3007,9 +3069,7 @@ static void send_linger(struct ceph_osd_linger_request *lreq)
 		cancel_linger_request(req);
 
 	request_reinit(req);
-	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
-	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
-	req->r_flags = lreq->t.flags;
+	target_copy(&req->r_t, &lreq->t);
 	req->r_mtime = lreq->mtime;
 
 	mutex_lock(&lreq->lock);
@@ -3484,9 +3544,6 @@ static int ceph_redirect_decode(void **p, void *end,
 		goto e_inval;
 	}
 
-	len = ceph_decode_32(p);
-	*p += len; /* skip osd_instructions */
-
 	/* skip the rest */
 	*p = struct_end;
 out:
@@ -3661,6 +3718,26 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
 		goto out_unlock_osdc;
 	}
 
+	if (m.result == -EAGAIN) {
+		dout("req %p tid %llu EAGAIN\n", req, req->r_tid);
+		unlink_request(osd, req);
+		mutex_unlock(&osd->lock);
+
+		/*
+		 * The object is missing on the replica or not (yet)
+		 * readable.  Clear pgid to force a resend to the primary
+		 * via legacy_change.
+		 */
+		req->r_t.pgid.pool = 0;
+		req->r_t.pgid.seed = 0;
+		WARN_ON(!req->r_t.used_replica);
+		req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
+				  CEPH_OSD_FLAG_LOCALIZE_READS);
+		req->r_tid = 0;
+		__submit_request(req, false);
+		goto out_unlock_osdc;
+	}
+
 	if (m.num_ops != req->r_num_ops) {
 		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
 		       req->r_num_ops, req->r_tid);
@@ -3777,7 +3854,7 @@ static void scan_requests(struct ceph_osd *osd,
 			if (!force_resend && !force_resend_writes)
 				break;
 
-			/* fall through */
+			fallthrough;
 		case CALC_TARGET_NEED_RESEND:
 			cancel_linger_map_check(lreq);
 			/*
@@ -3814,7 +3891,7 @@ static void scan_requests(struct ceph_osd *osd,
 			     !force_resend_writes))
 				break;
 
-			/* fall through */
+			fallthrough;
 		case CALC_TARGET_NEED_RESEND:
 			cancel_map_check(req);
 			unlink_request(osd, req);
@@ -4715,7 +4792,7 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
 	struct ceph_pagelist *pl;
 	int ret;
 
-	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
+	op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
 
 	pl = ceph_pagelist_alloc(GFP_NOIO);
 	if (!pl)
@@ -4784,7 +4861,7 @@ static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
 	struct ceph_pagelist *pl;
 	int ret;
 
-	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
+	op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
 	op->notify.cookie = cookie;
 
 	pl = ceph_pagelist_alloc(GFP_NOIO);
@@ -5231,91 +5308,13 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 }
 
-/*
- * Read some contiguous pages.  If we cross a stripe boundary, shorten
- * *plen.  Return number of bytes read, or error.
- */
-int ceph_osdc_readpages(struct ceph_osd_client *osdc,
-			struct ceph_vino vino, struct ceph_file_layout *layout,
-			u64 off, u64 *plen,
-			u32 truncate_seq, u64 truncate_size,
-			struct page **pages, int num_pages, int page_align)
-{
-	struct ceph_osd_request *req;
-	int rc = 0;
-
-	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
-	     vino.snap, off, *plen);
-	req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
-				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
-				    NULL, truncate_seq, truncate_size,
-				    false);
-	if (IS_ERR(req))
-		return PTR_ERR(req);
-
-	/* it may be a short read due to an object boundary */
-	osd_req_op_extent_osd_data_pages(req, 0,
-				pages, *plen, page_align, false, false);
-
-	dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
-	     off, *plen, *plen, page_align);
-
-	rc = ceph_osdc_start_request(osdc, req, false);
-	if (!rc)
-		rc = ceph_osdc_wait_request(osdc, req);
-
-	ceph_osdc_put_request(req);
-	dout("readpages result %d\n", rc);
-	return rc;
-}
-EXPORT_SYMBOL(ceph_osdc_readpages);
-
-/*
- * do a synchronous write on N pages
- */
-int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
-			 struct ceph_file_layout *layout,
-			 struct ceph_snap_context *snapc,
-			 u64 off, u64 len,
-			 u32 truncate_seq, u64 truncate_size,
-			 struct timespec64 *mtime,
-			 struct page **pages, int num_pages)
-{
-	struct ceph_osd_request *req;
-	int rc = 0;
-	int page_align = off & ~PAGE_MASK;
-
-	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
-				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
-				    snapc, truncate_seq, truncate_size,
-				    true);
-	if (IS_ERR(req))
-		return PTR_ERR(req);
-
-	/* it may be a short write due to an object boundary */
-	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
-				false, false);
-	dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
-
-	req->r_mtime = *mtime;
-	rc = ceph_osdc_start_request(osdc, req, true);
-	if (!rc)
-		rc = ceph_osdc_wait_request(osdc, req);
-
-	ceph_osdc_put_request(req);
-	if (rc == 0)
-		rc = len;
-	dout("writepages result %d\n", rc);
-	return rc;
-}
-EXPORT_SYMBOL(ceph_osdc_writepages);
-
 static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
 				     u64 src_snapid, u64 src_version,
 				     struct ceph_object_id *src_oid,
 				     struct ceph_object_locator *src_oloc,
 				     u32 src_fadvise_flags,
 				     u32 dst_fadvise_flags,
+				     u32 truncate_seq, u64 truncate_size,
 				     u8 copy_from_flags)
 {
 	struct ceph_osd_req_op *op;
@@ -5326,7 +5325,8 @@ static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
 	if (IS_ERR(pages))
 		return PTR_ERR(pages);
 
-	op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags);
+	op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
+			     dst_fadvise_flags);
 	op->copy_from.snapid = src_snapid;
 	op->copy_from.src_version = src_version;
 	op->copy_from.flags = copy_from_flags;
@@ -5336,6 +5336,8 @@ static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
 	end = p + PAGE_SIZE;
 	ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
 	encode_oloc(&p, end, src_oloc);
+	ceph_encode_32(&p, truncate_seq);
+	ceph_encode_64(&p, truncate_size);
 	op->indata_len = PAGE_SIZE - (end - p);
 
 	ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
@@ -5351,6 +5353,7 @@ int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
 			struct ceph_object_id *dst_oid,
 			struct ceph_object_locator *dst_oloc,
 			u32 dst_fadvise_flags,
+			u32 truncate_seq, u64 truncate_size,
 			u8 copy_from_flags)
 {
 	struct ceph_osd_request *req;
@@ -5367,7 +5370,8 @@ int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
 
 	ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
 					src_oloc, src_fadvise_flags,
-					dst_fadvise_flags, copy_from_flags);
+					dst_fadvise_flags, truncate_seq,
+					truncate_size, copy_from_flags);
 	if (ret)
 		goto out;
 
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 2a6e63a..fa08c15 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -138,6 +138,79 @@ static int crush_decode_straw2_bucket(void **p, void *end,
 	return -EINVAL;
 }
 
+struct crush_name_node {
+	struct rb_node cn_node;
+	int cn_id;
+	char cn_name[];
+};
+
+static struct crush_name_node *alloc_crush_name(size_t name_len)
+{
+	struct crush_name_node *cn;
+
+	cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
+	if (!cn)
+		return NULL;
+
+	RB_CLEAR_NODE(&cn->cn_node);
+	return cn;
+}
+
+static void free_crush_name(struct crush_name_node *cn)
+{
+	WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));
+
+	kfree(cn);
+}
+
+DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)
+
+static int decode_crush_names(void **p, void *end, struct rb_root *root)
+{
+	u32 n;
+
+	ceph_decode_32_safe(p, end, n, e_inval);
+	while (n--) {
+		struct crush_name_node *cn;
+		int id;
+		u32 name_len;
+
+		ceph_decode_32_safe(p, end, id, e_inval);
+		ceph_decode_32_safe(p, end, name_len, e_inval);
+		ceph_decode_need(p, end, name_len, e_inval);
+
+		cn = alloc_crush_name(name_len);
+		if (!cn)
+			return -ENOMEM;
+
+		cn->cn_id = id;
+		memcpy(cn->cn_name, *p, name_len);
+		cn->cn_name[name_len] = '\0';
+		*p += name_len;
+
+		if (!__insert_crush_name(root, cn)) {
+			free_crush_name(cn);
+			return -EEXIST;
+		}
+	}
+
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+void clear_crush_names(struct rb_root *root)
+{
+	while (!RB_EMPTY_ROOT(root)) {
+		struct crush_name_node *cn =
+		    rb_entry(rb_first(root), struct crush_name_node, cn_node);
+
+		erase_crush_name(root, cn);
+		free_crush_name(cn);
+	}
+}
+
 static struct crush_choose_arg_map *alloc_choose_arg_map(void)
 {
 	struct crush_choose_arg_map *arg_map;
@@ -354,6 +427,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 	if (c == NULL)
 		return ERR_PTR(-ENOMEM);
 
+	c->type_names = RB_ROOT;
+	c->names = RB_ROOT;
 	c->choose_args = RB_ROOT;
 
         /* set tunables to default values */
@@ -510,8 +585,14 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 		}
 	}
 
-	ceph_decode_skip_map(p, end, 32, string, bad); /* type_map */
-	ceph_decode_skip_map(p, end, 32, string, bad); /* name_map */
+	err = decode_crush_names(p, end, &c->type_names);
+	if (err)
+		goto fail;
+
+	err = decode_crush_names(p, end, &c->names);
+	if (err)
+		goto fail;
+
 	ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */
 
         /* tunables */
@@ -636,48 +717,11 @@ DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
 /*
  * rbtree of pg pool info
  */
-static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
-{
-	struct rb_node **p = &root->rb_node;
-	struct rb_node *parent = NULL;
-	struct ceph_pg_pool_info *pi = NULL;
-
-	while (*p) {
-		parent = *p;
-		pi = rb_entry(parent, struct ceph_pg_pool_info, node);
-		if (new->id < pi->id)
-			p = &(*p)->rb_left;
-		else if (new->id > pi->id)
-			p = &(*p)->rb_right;
-		else
-			return -EEXIST;
-	}
-
-	rb_link_node(&new->node, parent, p);
-	rb_insert_color(&new->node, root);
-	return 0;
-}
-
-static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
-{
-	struct ceph_pg_pool_info *pi;
-	struct rb_node *n = root->rb_node;
-
-	while (n) {
-		pi = rb_entry(n, struct ceph_pg_pool_info, node);
-		if (id < pi->id)
-			n = n->rb_left;
-		else if (id > pi->id)
-			n = n->rb_right;
-		else
-			return pi;
-	}
-	return NULL;
-}
+DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)
 
 struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
 {
-	return __lookup_pg_pool(&map->pg_pools, id);
+	return lookup_pg_pool(&map->pg_pools, id);
 }
 
 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
@@ -690,8 +734,7 @@ const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
 	if (WARN_ON_ONCE(id > (u64) INT_MAX))
 		return NULL;
 
-	pi = __lookup_pg_pool(&map->pg_pools, (int) id);
-
+	pi = lookup_pg_pool(&map->pg_pools, id);
 	return pi ? pi->name : NULL;
 }
 EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
@@ -714,14 +757,14 @@ u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id)
 {
 	struct ceph_pg_pool_info *pi;
 
-	pi = __lookup_pg_pool(&map->pg_pools, id);
+	pi = lookup_pg_pool(&map->pg_pools, id);
 	return pi ? pi->flags : 0;
 }
 EXPORT_SYMBOL(ceph_pg_pool_flags);
 
 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
 {
-	rb_erase(&pi->node, root);
+	erase_pg_pool(root, pi);
 	kfree(pi->name);
 	kfree(pi);
 }
@@ -903,7 +946,7 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
 		ceph_decode_32_safe(p, end, len, bad);
 		dout("  pool %llu len %d\n", pool, len);
 		ceph_decode_need(p, end, len, bad);
-		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		pi = lookup_pg_pool(&map->pg_pools, pool);
 		if (pi) {
 			char *name = kstrndup(*p, len, GFP_NOFS);
 
@@ -922,6 +965,143 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
 }
 
 /*
+ * CRUSH workspaces
+ *
+ * workspace_manager framework borrowed from fs/btrfs/compression.c.
+ * Two simplifications: there is only one type of workspace and there
+ * is always at least one workspace.
+ */
+static struct crush_work *alloc_workspace(const struct crush_map *c)
+{
+	struct crush_work *work;
+	size_t work_size;
+
+	WARN_ON(!c->working_size);
+	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
+	dout("%s work_size %zu bytes\n", __func__, work_size);
+
+	work = ceph_kvmalloc(work_size, GFP_NOIO);
+	if (!work)
+		return NULL;
+
+	INIT_LIST_HEAD(&work->item);
+	crush_init_workspace(c, work);
+	return work;
+}
+
+static void free_workspace(struct crush_work *work)
+{
+	WARN_ON(!list_empty(&work->item));
+	kvfree(work);
+}
+
+static void init_workspace_manager(struct workspace_manager *wsm)
+{
+	INIT_LIST_HEAD(&wsm->idle_ws);
+	spin_lock_init(&wsm->ws_lock);
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+	init_waitqueue_head(&wsm->ws_wait);
+}
+
+static void add_initial_workspace(struct workspace_manager *wsm,
+				  struct crush_work *work)
+{
+	WARN_ON(!list_empty(&wsm->idle_ws));
+
+	list_add(&work->item, &wsm->idle_ws);
+	atomic_set(&wsm->total_ws, 1);
+	wsm->free_ws = 1;
+}
+
+static void cleanup_workspace_manager(struct workspace_manager *wsm)
+{
+	struct crush_work *work;
+
+	while (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		free_workspace(work);
+	}
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+}
+
+/*
+ * Finds an available workspace or allocates a new one.  If it's not
+ * possible to allocate a new one, waits until there is one.
+ */
+static struct crush_work *get_workspace(struct workspace_manager *wsm,
+					const struct crush_map *c)
+{
+	struct crush_work *work;
+	int cpus = num_online_cpus();
+
+again:
+	spin_lock(&wsm->ws_lock);
+	if (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		wsm->free_ws--;
+		spin_unlock(&wsm->ws_lock);
+		return work;
+
+	}
+	if (atomic_read(&wsm->total_ws) > cpus) {
+		DEFINE_WAIT(wait);
+
+		spin_unlock(&wsm->ws_lock);
+		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
+		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
+			schedule();
+		finish_wait(&wsm->ws_wait, &wait);
+		goto again;
+	}
+	atomic_inc(&wsm->total_ws);
+	spin_unlock(&wsm->ws_lock);
+
+	work = alloc_workspace(c);
+	if (!work) {
+		atomic_dec(&wsm->total_ws);
+		wake_up(&wsm->ws_wait);
+
+		/*
+		 * Do not return the error but go back to waiting.  We
+		 * have the inital workspace and the CRUSH computation
+		 * time is bounded so we will get it eventually.
+		 */
+		WARN_ON(atomic_read(&wsm->total_ws) < 1);
+		goto again;
+	}
+	return work;
+}
+
+/*
+ * Puts a workspace back on the list or frees it if we have enough
+ * idle ones sitting around.
+ */
+static void put_workspace(struct workspace_manager *wsm,
+			  struct crush_work *work)
+{
+	spin_lock(&wsm->ws_lock);
+	if (wsm->free_ws <= num_online_cpus()) {
+		list_add(&work->item, &wsm->idle_ws);
+		wsm->free_ws++;
+		spin_unlock(&wsm->ws_lock);
+		goto wake;
+	}
+	spin_unlock(&wsm->ws_lock);
+
+	free_workspace(work);
+	atomic_dec(&wsm->total_ws);
+wake:
+	if (wq_has_sleeper(&wsm->ws_wait))
+		wake_up(&wsm->ws_wait);
+}
+
+/*
  * osd map
  */
 struct ceph_osdmap *ceph_osdmap_alloc(void)
@@ -938,7 +1118,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 	map->primary_temp = RB_ROOT;
 	map->pg_upmap = RB_ROOT;
 	map->pg_upmap_items = RB_ROOT;
-	mutex_init(&map->crush_workspace_mutex);
+
+	init_workspace_manager(&map->crush_wsm);
 
 	return map;
 }
@@ -946,8 +1127,11 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 void ceph_osdmap_destroy(struct ceph_osdmap *map)
 {
 	dout("osdmap_destroy %p\n", map);
+
 	if (map->crush)
 		crush_destroy(map->crush);
+	cleanup_workspace_manager(&map->crush_wsm);
+
 	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
 		struct ceph_pg_mapping *pg =
 			rb_entry(rb_first(&map->pg_temp),
@@ -986,7 +1170,6 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
 	kvfree(map->osd_weight);
 	kvfree(map->osd_addr);
 	kvfree(map->osd_primary_affinity);
-	kvfree(map->crush_workspace);
 	kfree(map);
 }
 
@@ -1061,26 +1244,22 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
 
 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
 {
-	void *workspace;
-	size_t work_size;
+	struct crush_work *work;
 
 	if (IS_ERR(crush))
 		return PTR_ERR(crush);
 
-	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
-	dout("%s work_size %zu bytes\n", __func__, work_size);
-	workspace = ceph_kvmalloc(work_size, GFP_NOIO);
-	if (!workspace) {
+	work = alloc_workspace(crush);
+	if (!work) {
 		crush_destroy(crush);
 		return -ENOMEM;
 	}
-	crush_init_workspace(crush, workspace);
 
 	if (map->crush)
 		crush_destroy(map->crush);
-	kvfree(map->crush_workspace);
+	cleanup_workspace_manager(&map->crush_wsm);
 	map->crush = crush;
-	map->crush_workspace = workspace;
+	add_initial_workspace(&map->crush_wsm, work);
 	return 0;
 }
 
@@ -1154,18 +1333,18 @@ static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
 
 		ceph_decode_64_safe(p, end, pool, e_inval);
 
-		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		pi = lookup_pg_pool(&map->pg_pools, pool);
 		if (!incremental || !pi) {
 			pi = kzalloc(sizeof(*pi), GFP_NOFS);
 			if (!pi)
 				return -ENOMEM;
 
+			RB_CLEAR_NODE(&pi->node);
 			pi->id = pool;
 
-			ret = __insert_pg_pool(&map->pg_pools, pi);
-			if (ret) {
+			if (!__insert_pg_pool(&map->pg_pools, pi)) {
 				kfree(pi);
-				return ret;
+				return -EEXIST;
 			}
 		}
 
@@ -1829,7 +2008,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 		struct ceph_pg_pool_info *pi;
 
 		ceph_decode_64_safe(p, end, pool, e_inval);
-		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		pi = lookup_pg_pool(&map->pg_pools, pool);
 		if (pi)
 			__remove_pg_pool(&map->pg_pools, pi);
 	}
@@ -2279,6 +2458,7 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 		    s64 choose_args_index)
 {
 	struct crush_choose_arg_map *arg_map;
+	struct crush_work *work;
 	int r;
 
 	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
@@ -2289,12 +2469,11 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
 						CEPH_DEFAULT_CHOOSE_ARGS);
 
-	mutex_lock(&map->crush_workspace_mutex);
+	work = get_workspace(&map->crush_wsm, map->crush);
 	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
-			  weight, weight_max, map->crush_workspace,
+			  weight, weight_max, work,
 			  arg_map ? arg_map->args : NULL);
-	mutex_unlock(&map->crush_workspace_mutex);
-
+	put_workspace(&map->crush_wsm, work);
 	return r;
 }
 
@@ -2672,3 +2851,221 @@ int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
 	return acting.primary;
 }
 EXPORT_SYMBOL(ceph_pg_to_acting_primary);
+
+static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
+					      size_t name_len)
+{
+	struct crush_loc_node *loc;
+
+	loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
+	if (!loc)
+		return NULL;
+
+	RB_CLEAR_NODE(&loc->cl_node);
+	return loc;
+}
+
+static void free_crush_loc(struct crush_loc_node *loc)
+{
+	WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));
+
+	kfree(loc);
+}
+
+static int crush_loc_compare(const struct crush_loc *loc1,
+			     const struct crush_loc *loc2)
+{
+	return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
+	       strcmp(loc1->cl_name, loc2->cl_name);
+}
+
+DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
+		 RB_BYPTR, const struct crush_loc *, cl_node)
+
+/*
+ * Parses a set of <bucket type name>':'<bucket name> pairs separated
+ * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
+ *
+ * Note that @crush_location is modified by strsep().
+ */
+int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
+{
+	struct crush_loc_node *loc;
+	const char *type_name, *name, *colon;
+	size_t type_name_len, name_len;
+
+	dout("%s '%s'\n", __func__, crush_location);
+	while ((type_name = strsep(&crush_location, "|"))) {
+		colon = strchr(type_name, ':');
+		if (!colon)
+			return -EINVAL;
+
+		type_name_len = colon - type_name;
+		if (type_name_len == 0)
+			return -EINVAL;
+
+		name = colon + 1;
+		name_len = strlen(name);
+		if (name_len == 0)
+			return -EINVAL;
+
+		loc = alloc_crush_loc(type_name_len, name_len);
+		if (!loc)
+			return -ENOMEM;
+
+		loc->cl_loc.cl_type_name = loc->cl_data;
+		memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
+		loc->cl_loc.cl_type_name[type_name_len] = '\0';
+
+		loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
+		memcpy(loc->cl_loc.cl_name, name, name_len);
+		loc->cl_loc.cl_name[name_len] = '\0';
+
+		if (!__insert_crush_loc(locs, loc)) {
+			free_crush_loc(loc);
+			return -EEXIST;
+		}
+
+		dout("%s type_name '%s' name '%s'\n", __func__,
+		     loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
+	}
+
+	return 0;
+}
+
+int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
+{
+	struct rb_node *n1 = rb_first(locs1);
+	struct rb_node *n2 = rb_first(locs2);
+	int ret;
+
+	for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
+		struct crush_loc_node *loc1 =
+		    rb_entry(n1, struct crush_loc_node, cl_node);
+		struct crush_loc_node *loc2 =
+		    rb_entry(n2, struct crush_loc_node, cl_node);
+
+		ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
+		if (ret)
+			return ret;
+	}
+
+	if (!n1 && n2)
+		return -1;
+	if (n1 && !n2)
+		return 1;
+	return 0;
+}
+
+void ceph_clear_crush_locs(struct rb_root *locs)
+{
+	while (!RB_EMPTY_ROOT(locs)) {
+		struct crush_loc_node *loc =
+		    rb_entry(rb_first(locs), struct crush_loc_node, cl_node);
+
+		erase_crush_loc(locs, loc);
+		free_crush_loc(loc);
+	}
+}
+
+/*
+ * [a-zA-Z0-9-_.]+
+ */
+static bool is_valid_crush_name(const char *name)
+{
+	do {
+		if (!('a' <= *name && *name <= 'z') &&
+		    !('A' <= *name && *name <= 'Z') &&
+		    !('0' <= *name && *name <= '9') &&
+		    *name != '-' && *name != '_' && *name != '.')
+			return false;
+	} while (*++name != '\0');
+
+	return true;
+}
+
+/*
+ * Gets the parent of an item.  Returns its id (<0 because the
+ * parent is always a bucket), type id (>0 for the same reason,
+ * via @parent_type_id) and location (via @parent_loc).  If no
+ * parent, returns 0.
+ *
+ * Does a linear search, as there are no parent pointers of any
+ * kind.  Note that the result is ambigous for items that occur
+ * multiple times in the map.
+ */
+static int get_immediate_parent(struct crush_map *c, int id,
+				u16 *parent_type_id,
+				struct crush_loc *parent_loc)
+{
+	struct crush_bucket *b;
+	struct crush_name_node *type_cn, *cn;
+	int i, j;
+
+	for (i = 0; i < c->max_buckets; i++) {
+		b = c->buckets[i];
+		if (!b)
+			continue;
+
+		/* ignore per-class shadow hierarchy */
+		cn = lookup_crush_name(&c->names, b->id);
+		if (!cn || !is_valid_crush_name(cn->cn_name))
+			continue;
+
+		for (j = 0; j < b->size; j++) {
+			if (b->items[j] != id)
+				continue;
+
+			*parent_type_id = b->type;
+			type_cn = lookup_crush_name(&c->type_names, b->type);
+			parent_loc->cl_type_name = type_cn->cn_name;
+			parent_loc->cl_name = cn->cn_name;
+			return b->id;
+		}
+	}
+
+	return 0;  /* no parent */
+}
+
+/*
+ * Calculates the locality/distance from an item to a client
+ * location expressed in terms of CRUSH hierarchy as a set of
+ * (bucket type name, bucket name) pairs.  Specifically, looks
+ * for the lowest-valued bucket type for which the location of
+ * @id matches one of the locations in @locs, so for standard
+ * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
+ * a matching host is closer than a matching rack and a matching
+ * data center is closer than a matching zone.
+ *
+ * Specifying multiple locations (a "multipath" location) such
+ * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
+ * is a multimap.  The locality will be:
+ *
+ * - 3 for OSDs in racks foo1 and foo2
+ * - 8 for OSDs in data center bar
+ * - -1 for all other OSDs
+ *
+ * The lowest possible bucket type is 1, so the best locality
+ * for an OSD is 1 (i.e. a matching host).  Locality 0 would be
+ * the OSD itself.
+ */
+int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
+			    struct rb_root *locs)
+{
+	struct crush_loc loc;
+	u16 type_id;
+
+	/*
+	 * Instead of repeated get_immediate_parent() calls,
+	 * the location of @id could be obtained with a single
+	 * depth-first traversal.
+	 */
+	for (;;) {
+		id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
+		if (id >= 0)
+			return -1;  /* not local */
+
+		if (lookup_crush_loc(locs, &loc))
+			return type_id;
+	}
+}