Update Linux to v5.4.2
Change-Id: Idf6911045d9d382da2cfe01b1edff026404ac8fd
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 92ab204..8de6339 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
#include <linux/module.h>
#include <linux/sched.h>
@@ -9,10 +10,12 @@
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/falloc.h>
+#include <linux/iversion.h>
#include "super.h"
#include "mds_client.h"
#include "cache.h"
+#include "io.h"
static __le32 ceph_flags_sys2wire(u32 flags)
{
@@ -199,6 +202,7 @@
static int ceph_init_file_info(struct inode *inode, struct file *file,
int fmode, bool isdir)
{
+ struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi;
dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
@@ -209,7 +213,7 @@
struct ceph_dir_file_info *dfi =
kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL);
if (!dfi) {
- ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
+ ceph_put_fmode(ci, fmode); /* clean up */
return -ENOMEM;
}
@@ -220,7 +224,7 @@
} else {
fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
if (!fi) {
- ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
+ ceph_put_fmode(ci, fmode); /* clean up */
return -ENOMEM;
}
@@ -230,6 +234,8 @@
fi->fmode = fmode;
spin_lock_init(&fi->rw_contexts_lock);
INIT_LIST_HEAD(&fi->rw_contexts);
+ fi->meta_err = errseq_sample(&ci->i_meta_err);
+ fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen);
return 0;
}
@@ -246,6 +252,7 @@
case S_IFREG:
ceph_fscache_register_inode_cookie(inode);
ceph_fscache_file_set_cookie(inode, file);
+ /* fall through */
case S_IFDIR:
ret = ceph_init_file_info(inode, file, fmode,
S_ISDIR(inode->i_mode));
@@ -435,7 +442,7 @@
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req;
struct dentry *dn;
- struct ceph_acls_info acls = {};
+ struct ceph_acl_sec_ctx as_ctx = {};
int mask;
int err;
@@ -449,25 +456,31 @@
if (flags & O_CREAT) {
if (ceph_quota_is_max_files_exceeded(dir))
return -EDQUOT;
- err = ceph_pre_init_acls(dir, &mode, &acls);
+ err = ceph_pre_init_acls(dir, &mode, &as_ctx);
if (err < 0)
return err;
+ err = ceph_security_init_secctx(dentry, mode, &as_ctx);
+ if (err < 0)
+ goto out_ctx;
+ } else if (!d_in_lookup(dentry)) {
+ /* If it's not being looked up, it's negative */
+ return -ENOENT;
}
/* do the open */
req = prepare_open_request(dir->i_sb, flags, mode);
if (IS_ERR(req)) {
err = PTR_ERR(req);
- goto out_acl;
+ goto out_ctx;
}
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
if (flags & O_CREAT) {
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
- if (acls.pagelist) {
- req->r_pagelist = acls.pagelist;
- acls.pagelist = NULL;
+ if (as_ctx.pagelist) {
+ req->r_pagelist = as_ctx.pagelist;
+ as_ctx.pagelist = NULL;
}
}
@@ -505,7 +518,7 @@
} else {
dout("atomic_open finish_open on dn %p\n", dn);
if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
- ceph_init_inode_acls(d_inode(dentry), &acls);
+ ceph_init_inode_acls(d_inode(dentry), &as_ctx);
file->f_mode |= FMODE_CREATED;
}
err = finish_open(file, dentry, ceph_open);
@@ -514,8 +527,8 @@
if (!req->r_err && req->r_target_inode)
ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
ceph_mdsc_put_request(req);
-out_acl:
- ceph_release_acls_info(&acls);
+out_ctx:
+ ceph_release_acl_sec_ctx(&as_ctx);
dout("atomic_open result=%d\n", err);
return err;
}
@@ -557,90 +570,26 @@
};
/*
- * Read a range of bytes striped over one or more objects. Iterate over
- * objects we stripe over. (That's not atomic, but good enough for now.)
+ * Completely synchronous read and write methods. Direct from __user
+ * buffer to osd, or directly to user pages (if O_DIRECT).
+ *
+ * If the read spans object boundary, just do multiple reads. (That's not
+ * atomic, but good enough for now.)
*
* If we get a short result from the OSD, check against i_size; we need to
* only return a short read to the caller if we hit EOF.
*/
-static int striped_read(struct inode *inode,
- u64 pos, u64 len,
- struct page **pages, int num_pages,
- int page_align, int *checkeof)
-{
- struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
- struct ceph_inode_info *ci = ceph_inode(inode);
- u64 this_len;
- loff_t i_size;
- int page_idx;
- int ret, read = 0;
- bool hit_stripe, was_short;
-
- /*
- * we may need to do multiple reads. not atomic, unfortunately.
- */
-more:
- this_len = len;
- page_idx = (page_align + read) >> PAGE_SHIFT;
- ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
- &ci->i_layout, pos, &this_len,
- ci->i_truncate_seq, ci->i_truncate_size,
- pages + page_idx, num_pages - page_idx,
- ((page_align + read) & ~PAGE_MASK));
- if (ret == -ENOENT)
- ret = 0;
- hit_stripe = this_len < len;
- was_short = ret >= 0 && ret < this_len;
- dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
- ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
-
- i_size = i_size_read(inode);
- if (ret >= 0) {
- if (was_short && (pos + ret < i_size)) {
- int zlen = min(this_len - ret, i_size - pos - ret);
- int zoff = page_align + read + ret;
- dout(" zero gap %llu to %llu\n",
- pos + ret, pos + ret + zlen);
- ceph_zero_page_vector_range(zoff, zlen, pages);
- ret += zlen;
- }
-
- read += ret;
- pos += ret;
- len -= ret;
-
- /* hit stripe and need continue*/
- if (len && hit_stripe && pos < i_size)
- goto more;
- }
-
- if (read > 0) {
- ret = read;
- /* did we bounce off eof? */
- if (pos + len > i_size)
- *checkeof = CHECK_EOF;
- }
-
- dout("striped_read returns %d\n", ret);
- return ret;
-}
-
-/*
- * Completely synchronous read and write methods. Direct from __user
- * buffer to osd, or directly to user pages (if O_DIRECT).
- *
- * If the read spans object boundary, just do multiple reads.
- */
static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
- int *checkeof)
+ int *retry_op)
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
- struct page **pages;
- u64 off = iocb->ki_pos;
- int num_pages;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_osd_client *osdc = &fsc->client->osdc;
ssize_t ret;
- size_t len = iov_iter_count(to);
+ u64 off = iocb->ki_pos;
+ u64 len = iov_iter_count(to);
dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
(file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
@@ -653,61 +602,125 @@
* but it will at least behave sensibly when they are
* in sequence.
*/
- ret = filemap_write_and_wait_range(inode->i_mapping, off,
- off + len);
+ ret = filemap_write_and_wait_range(inode->i_mapping,
+ off, off + len - 1);
if (ret < 0)
return ret;
- if (unlikely(to->type & ITER_PIPE)) {
+ ret = 0;
+ while ((len = iov_iter_count(to)) > 0) {
+ struct ceph_osd_request *req;
+ struct page **pages;
+ int num_pages;
size_t page_off;
- ret = iov_iter_get_pages_alloc(to, &pages, len,
- &page_off);
- if (ret <= 0)
- return -ENOMEM;
- num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+ u64 i_size;
+ bool more;
- ret = striped_read(inode, off, ret, pages, num_pages,
- page_off, checkeof);
- if (ret > 0) {
- iov_iter_advance(to, ret);
- off += ret;
- } else {
- iov_iter_advance(to, 0);
+ req = ceph_osdc_new_request(osdc, &ci->i_layout,
+ ci->i_vino, off, &len, 0, 1,
+ CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+ NULL, ci->i_truncate_seq,
+ ci->i_truncate_size, false);
+ if (IS_ERR(req)) {
+ ret = PTR_ERR(req);
+ break;
}
- ceph_put_page_vector(pages, num_pages, false);
- } else {
- num_pages = calc_pages_for(off, len);
- pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
- ret = striped_read(inode, off, len, pages, num_pages,
- (off & ~PAGE_MASK), checkeof);
- if (ret > 0) {
- int l, k = 0;
- size_t left = ret;
+ more = len < iov_iter_count(to);
- while (left) {
- size_t page_off = off & ~PAGE_MASK;
- size_t copy = min_t(size_t, left,
- PAGE_SIZE - page_off);
- l = copy_page_to_iter(pages[k++], page_off,
- copy, to);
- off += l;
- left -= l;
- if (l < copy)
- break;
+ if (unlikely(iov_iter_is_pipe(to))) {
+ ret = iov_iter_get_pages_alloc(to, &pages, len,
+ &page_off);
+ if (ret <= 0) {
+ ceph_osdc_put_request(req);
+ ret = -ENOMEM;
+ break;
+ }
+ num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+ if (ret < len) {
+ len = ret;
+ osd_req_op_extent_update(req, 0, len);
+ more = false;
+ }
+ } else {
+ num_pages = calc_pages_for(off, len);
+ page_off = off & ~PAGE_MASK;
+ pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+ if (IS_ERR(pages)) {
+ ceph_osdc_put_request(req);
+ ret = PTR_ERR(pages);
+ break;
}
}
- ceph_release_page_vector(pages, num_pages);
+
+ osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
+ false, false);
+ ret = ceph_osdc_start_request(osdc, req, false);
+ if (!ret)
+ ret = ceph_osdc_wait_request(osdc, req);
+ ceph_osdc_put_request(req);
+
+ i_size = i_size_read(inode);
+ dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
+ off, len, ret, i_size, (more ? " MORE" : ""));
+
+ if (ret == -ENOENT)
+ ret = 0;
+ if (ret >= 0 && ret < len && (off + ret < i_size)) {
+ int zlen = min(len - ret, i_size - off - ret);
+ int zoff = page_off + ret;
+ dout("sync_read zero gap %llu~%llu\n",
+ off + ret, off + ret + zlen);
+ ceph_zero_page_vector_range(zoff, zlen, pages);
+ ret += zlen;
+ }
+
+ if (unlikely(iov_iter_is_pipe(to))) {
+ if (ret > 0) {
+ iov_iter_advance(to, ret);
+ off += ret;
+ } else {
+ iov_iter_advance(to, 0);
+ }
+ ceph_put_page_vector(pages, num_pages, false);
+ } else {
+ int idx = 0;
+ size_t left = ret > 0 ? ret : 0;
+ while (left > 0) {
+ size_t len, copied;
+ page_off = off & ~PAGE_MASK;
+ len = min_t(size_t, left, PAGE_SIZE - page_off);
+ copied = copy_page_to_iter(pages[idx++],
+ page_off, len, to);
+ off += copied;
+ left -= copied;
+ if (copied < len) {
+ ret = -EFAULT;
+ break;
+ }
+ }
+ ceph_release_page_vector(pages, num_pages);
+ }
+
+ if (ret < 0) {
+ if (ret == -EBLACKLISTED)
+ fsc->blacklisted = true;
+ break;
+ }
+
+ if (off >= i_size || !more)
+ break;
}
if (off > iocb->ki_pos) {
+ if (ret >= 0 &&
+ iov_iter_count(to) > 0 && off >= i_size_read(inode))
+ *retry_op = CHECK_EOF;
ret = off - iocb->ki_pos;
iocb->ki_pos = off;
}
- dout("sync_read result %zd\n", ret);
+ dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
return ret;
}
@@ -740,6 +753,9 @@
if (!atomic_dec_and_test(&aio_req->pending_reqs))
return;
+ if (aio_req->iocb->ki_flags & IOCB_DIRECT)
+ inode_dio_end(inode);
+
ret = aio_req->error;
if (!ret)
ret = aio_req->total_len;
@@ -795,7 +811,7 @@
if (aio_work) {
INIT_WORK(&aio_work->work, ceph_aio_retry_work);
aio_work->req = req;
- queue_work(ceph_inode_to_client(inode)->wb_wq,
+ queue_work(ceph_inode_to_client(inode)->inode_wq,
&aio_work->work);
return;
}
@@ -821,7 +837,7 @@
aio_req->total_len = rc + zlen;
}
- iov_iter_bvec(&i, ITER_BVEC, osd_data->bvec_pos.bvecs,
+ iov_iter_bvec(&i, READ, osd_data->bvec_pos.bvecs,
osd_data->num_bvecs,
osd_data->bvec_pos.iter.bi_size);
iov_iter_advance(&i, rc);
@@ -865,7 +881,7 @@
}
spin_unlock(&ci->i_ceph_lock);
- req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
+ req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1,
false, GFP_NOFS);
if (!req) {
ret = -ENOMEM;
@@ -877,6 +893,11 @@
ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
+ req->r_ops[0] = orig_req->r_ops[0];
+
+ req->r_mtime = aio_req->mtime;
+ req->r_data_offset = req->r_ops[0].extent.offset;
+
ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
if (ret) {
ceph_osdc_put_request(req);
@@ -884,11 +905,6 @@
goto out;
}
- req->r_ops[0] = orig_req->r_ops[0];
-
- req->r_mtime = aio_req->mtime;
- req->r_data_offset = req->r_ops[0].extent.offset;
-
ceph_osdc_put_request(orig_req);
req->r_callback = ceph_aio_complete_req;
@@ -921,7 +937,7 @@
struct ceph_aio_request *aio_req = NULL;
int num_pages = 0;
int flags;
- int ret;
+ int ret = 0;
struct timespec64 mtime = current_time(inode);
size_t count = iov_iter_count(iter);
loff_t pos = iocb->ki_pos;
@@ -933,16 +949,12 @@
dout("sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n",
(write ? "write" : "read"), file, pos, (unsigned)count,
- snapc, snapc->seq);
-
- ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
- if (ret < 0)
- return ret;
+ snapc, snapc ? snapc->seq : 0);
if (write) {
int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
pos >> PAGE_SHIFT,
- (pos + count) >> PAGE_SHIFT);
+ (pos + count - 1) >> PAGE_SHIFT);
if (ret2 < 0)
dout("invalidate_inode_pages2_range returned %d\n", ret2);
@@ -1010,7 +1022,7 @@
* may block.
*/
truncate_inode_pages_range(inode->i_mapping, pos,
- (pos+len) | (PAGE_SIZE - 1));
+ PAGE_ALIGN(pos + len) - 1);
req->r_mtime = mtime;
}
@@ -1025,7 +1037,7 @@
req->r_callback = ceph_aio_complete_req;
req->r_inode = inode;
req->r_priv = aio_req;
- list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
+ list_add_tail(&req->r_private_item, &aio_req->osd_reqs);
pos += len;
continue;
@@ -1044,8 +1056,7 @@
int zlen = min_t(size_t, len - ret,
size - pos - ret);
- iov_iter_bvec(&i, ITER_BVEC, bvecs, num_pages,
- len);
+ iov_iter_bvec(&i, READ, bvecs, num_pages, len);
iov_iter_advance(&i, ret);
iov_iter_zero(zlen, &i);
ret += zlen;
@@ -1083,11 +1094,12 @@
CEPH_CAP_FILE_RD);
list_splice(&aio_req->osd_reqs, &osd_reqs);
+ inode_dio_begin(inode);
while (!list_empty(&osd_reqs)) {
req = list_first_entry(&osd_reqs,
struct ceph_osd_request,
- r_unsafe_item);
- list_del_init(&req->r_unsafe_item);
+ r_private_item);
+ list_del_init(&req->r_private_item);
if (ret >= 0)
ret = ceph_osdc_start_request(req->r_osdc,
req, false);
@@ -1139,13 +1151,14 @@
dout("sync_write on file %p %lld~%u snapc %p seq %lld\n",
file, pos, (unsigned)count, snapc, snapc->seq);
- ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
+ ret = filemap_write_and_wait_range(inode->i_mapping,
+ pos, pos + count - 1);
if (ret < 0)
return ret;
ret = invalidate_inode_pages2_range(inode->i_mapping,
pos >> PAGE_SHIFT,
- (pos + count) >> PAGE_SHIFT);
+ (pos + count - 1) >> PAGE_SHIFT);
if (ret < 0)
dout("invalidate_inode_pages2_range returned %d\n", ret);
@@ -1255,13 +1268,24 @@
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
+ if (iocb->ki_flags & IOCB_DIRECT)
+ ceph_start_io_direct(inode);
+ else
+ ceph_start_io_read(inode);
+
if (fi->fmode & CEPH_FILE_MODE_LAZY)
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_CACHE;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
- if (ret < 0)
+ ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1,
+ &got, &pinned_page);
+ if (ret < 0) {
+ if (iocb->ki_flags & IOCB_DIRECT)
+ ceph_end_io_direct(inode);
+ else
+ ceph_end_io_read(inode);
return ret;
+ }
if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
(iocb->ki_flags & IOCB_DIRECT) ||
@@ -1292,6 +1316,7 @@
ret = generic_file_read_iter(iocb, to);
ceph_del_rw_context(fi, &rw_ctx);
}
+
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
if (pinned_page) {
@@ -1299,6 +1324,12 @@
pinned_page = NULL;
}
ceph_put_cap_refs(ci, got);
+
+ if (iocb->ki_flags & IOCB_DIRECT)
+ ceph_end_io_direct(inode);
+ else
+ ceph_end_io_read(inode);
+
if (retry_op > HAVE_RETRIED && ret >= 0) {
int statret;
struct page *page = NULL;
@@ -1398,7 +1429,10 @@
return -ENOMEM;
retry_snap:
- inode_lock(inode);
+ if (iocb->ki_flags & IOCB_DIRECT)
+ ceph_start_io_direct(inode);
+ else
+ ceph_start_io_write(inode);
/* We can write back this queue in page reclaim */
current->backing_dev_info = inode_to_bdi(inode);
@@ -1435,6 +1469,8 @@
if (err)
goto out;
+ inode_inc_iversion_raw(inode);
+
if (ci->i_inline_version != CEPH_INLINE_NONE) {
err = ceph_uninline_data(file, NULL);
if (err < 0)
@@ -1454,7 +1490,7 @@
else
want = CEPH_CAP_FILE_BUFFER;
got = 0;
- err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
+ err = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, pos + count,
&got, NULL);
if (err < 0)
goto out;
@@ -1467,7 +1503,6 @@
(ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
struct ceph_snap_context *snapc;
struct iov_iter data;
- inode_unlock(inode);
spin_lock(&ci->i_ceph_lock);
if (__ceph_have_pending_cap_snap(ci)) {
@@ -1484,11 +1519,14 @@
/* we might need to revert back to that point */
data = *from;
- if (iocb->ki_flags & IOCB_DIRECT)
+ if (iocb->ki_flags & IOCB_DIRECT) {
written = ceph_direct_read_write(iocb, &data, snapc,
&prealloc_cf);
- else
+ ceph_end_io_direct(inode);
+ } else {
written = ceph_sync_write(iocb, &data, pos, snapc);
+ ceph_end_io_write(inode);
+ }
if (written > 0)
iov_iter_advance(from, written);
ceph_put_snap_context(snapc);
@@ -1503,7 +1541,7 @@
written = generic_perform_write(file, from, pos);
if (likely(written >= 0))
iocb->ki_pos = pos + written;
- inode_unlock(inode);
+ ceph_end_io_write(inode);
}
if (written >= 0) {
@@ -1538,9 +1576,11 @@
}
goto out_unlocked;
-
out:
- inode_unlock(inode);
+ if (iocb->ki_flags & IOCB_DIRECT)
+ ceph_end_io_direct(inode);
+ else
+ ceph_end_io_write(inode);
out_unlocked:
ceph_free_cap_flush(prealloc_cf);
current->backing_dev_info = NULL;
@@ -1735,7 +1775,6 @@
struct ceph_file_info *fi = file->private_data;
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_cap_flush *prealloc_cf;
int want, got = 0;
int dirty;
@@ -1743,10 +1782,7 @@
loff_t endoff = 0;
loff_t size;
- if ((offset + length) > max(i_size_read(inode), fsc->max_file_size))
- return -EFBIG;
-
- if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
+ if (mode != (FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
return -EOPNOTSUPP;
if (!S_ISREG(inode->i_mode))
@@ -1763,18 +1799,6 @@
goto unlock;
}
- if (!(mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)) &&
- ceph_quota_is_max_bytes_exceeded(inode, offset + length)) {
- ret = -EDQUOT;
- goto unlock;
- }
-
- if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL) &&
- !(mode & FALLOC_FL_PUNCH_HOLE)) {
- ret = -ENOSPC;
- goto unlock;
- }
-
if (ci->i_inline_version != CEPH_INLINE_NONE) {
ret = ceph_uninline_data(file, NULL);
if (ret < 0)
@@ -1782,32 +1806,24 @@
}
size = i_size_read(inode);
- if (!(mode & FALLOC_FL_KEEP_SIZE)) {
- endoff = offset + length;
- ret = inode_newsize_ok(inode, endoff);
- if (ret)
- goto unlock;
- }
+
+ /* Are we punching a hole beyond EOF? */
+ if (offset >= size)
+ goto unlock;
+ if ((offset + length) > size)
+ length = size - offset;
if (fi->fmode & CEPH_FILE_MODE_LAZY)
want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_BUFFER;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
+ ret = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
if (ret < 0)
goto unlock;
- if (mode & FALLOC_FL_PUNCH_HOLE) {
- if (offset < size)
- ceph_zero_pagecache_range(inode, offset, length);
- ret = ceph_zero_objects(inode, offset, length);
- } else if (endoff > size) {
- truncate_pagecache_range(inode, size, -1);
- if (ceph_inode_set_size(inode, endoff))
- ceph_check_caps(ceph_inode(inode),
- CHECK_CAPS_AUTHONLY, NULL);
- }
+ ceph_zero_pagecache_range(inode, offset, length);
+ ret = ceph_zero_objects(inode, offset, length);
if (!ret) {
spin_lock(&ci->i_ceph_lock);
@@ -1817,9 +1833,6 @@
spin_unlock(&ci->i_ceph_lock);
if (dirty)
__mark_inode_dirty(inode, dirty);
- if ((endoff > size) &&
- ceph_quota_is_max_bytes_approaching(inode, endoff))
- ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL);
}
ceph_put_cap_refs(ci, got);
@@ -1829,6 +1842,339 @@
return ret;
}
+/*
+ * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
+ * src_ci. Two attempts are made to obtain both caps, and an error is return if
+ * this fails; zero is returned on success.
+ */
+static int get_rd_wr_caps(struct file *src_filp, int *src_got,
+ struct file *dst_filp,
+ loff_t dst_endoff, int *dst_got)
+{
+ int ret = 0;
+ bool retrying = false;
+
+retry_caps:
+ ret = ceph_get_caps(dst_filp, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+ dst_endoff, dst_got, NULL);
+ if (ret < 0)
+ return ret;
+
+ /*
+ * Since we're already holding the FILE_WR capability for the dst file,
+ * we would risk a deadlock by using ceph_get_caps. Thus, we'll do some
+ * retry dance instead to try to get both capabilities.
+ */
+ ret = ceph_try_get_caps(file_inode(src_filp),
+ CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+ false, src_got);
+ if (ret <= 0) {
+ /* Start by dropping dst_ci caps and getting src_ci caps */
+ ceph_put_cap_refs(ceph_inode(file_inode(dst_filp)), *dst_got);
+ if (retrying) {
+ if (!ret)
+ /* ceph_try_get_caps masks EAGAIN */
+ ret = -EAGAIN;
+ return ret;
+ }
+ ret = ceph_get_caps(src_filp, CEPH_CAP_FILE_RD,
+ CEPH_CAP_FILE_SHARED, -1, src_got, NULL);
+ if (ret < 0)
+ return ret;
+ /*... drop src_ci caps too, and retry */
+ ceph_put_cap_refs(ceph_inode(file_inode(src_filp)), *src_got);
+ retrying = true;
+ goto retry_caps;
+ }
+ return ret;
+}
+
+static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
+ struct ceph_inode_info *dst_ci, int dst_got)
+{
+ ceph_put_cap_refs(src_ci, src_got);
+ ceph_put_cap_refs(dst_ci, dst_got);
+}
+
+/*
+ * This function does several size-related checks, returning an error if:
+ * - source file is smaller than off+len
+ * - destination file size is not OK (inode_newsize_ok())
+ * - max bytes quotas is exceeded
+ */
+static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
+ loff_t src_off, loff_t dst_off, size_t len)
+{
+ loff_t size, endoff;
+
+ size = i_size_read(src_inode);
+ /*
+ * Don't copy beyond source file EOF. Instead of simply setting length
+ * to (size - src_off), just drop to VFS default implementation, as the
+ * local i_size may be stale due to other clients writing to the source
+ * inode.
+ */
+ if (src_off + len > size) {
+ dout("Copy beyond EOF (%llu + %zu > %llu)\n",
+ src_off, len, size);
+ return -EOPNOTSUPP;
+ }
+ size = i_size_read(dst_inode);
+
+ endoff = dst_off + len;
+ if (inode_newsize_ok(dst_inode, endoff))
+ return -EOPNOTSUPP;
+
+ if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
+ return -EDQUOT;
+
+ return 0;
+}
+
+static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
+ struct file *dst_file, loff_t dst_off,
+ size_t len, unsigned int flags)
+{
+ struct inode *src_inode = file_inode(src_file);
+ struct inode *dst_inode = file_inode(dst_file);
+ struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+ struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+ struct ceph_cap_flush *prealloc_cf;
+ struct ceph_fs_client *src_fsc = ceph_inode_to_client(src_inode);
+ struct ceph_object_locator src_oloc, dst_oloc;
+ struct ceph_object_id src_oid, dst_oid;
+ loff_t endoff = 0, size;
+ ssize_t ret = -EIO;
+ u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
+ u32 src_objlen, dst_objlen, object_size;
+ int src_got = 0, dst_got = 0, err, dirty;
+ bool do_final_copy = false;
+
+ if (src_inode->i_sb != dst_inode->i_sb) {
+ struct ceph_fs_client *dst_fsc = ceph_inode_to_client(dst_inode);
+
+ if (ceph_fsid_compare(&src_fsc->client->fsid,
+ &dst_fsc->client->fsid)) {
+ dout("Copying files across clusters: src: %pU dst: %pU\n",
+ &src_fsc->client->fsid, &dst_fsc->client->fsid);
+ return -EXDEV;
+ }
+ }
+ if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+ return -EROFS;
+
+ /*
+ * Some of the checks below will return -EOPNOTSUPP, which will force a
+ * fallback to the default VFS copy_file_range implementation. This is
+ * desirable in several cases (for ex, the 'len' is smaller than the
+ * size of the objects, or in cases where that would be more
+ * efficient).
+ */
+
+ if (ceph_test_mount_opt(src_fsc, NOCOPYFROM))
+ return -EOPNOTSUPP;
+
+ /*
+ * Striped file layouts require that we copy partial objects, but the
+ * OSD copy-from operation only supports full-object copies. Limit
+ * this to non-striped file layouts for now.
+ */
+ if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
+ (src_ci->i_layout.stripe_count != 1) ||
+ (dst_ci->i_layout.stripe_count != 1) ||
+ (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) {
+ dout("Invalid src/dst files layout\n");
+ return -EOPNOTSUPP;
+ }
+
+ if (len < src_ci->i_layout.object_size)
+ return -EOPNOTSUPP; /* no remote copy will be done */
+
+ prealloc_cf = ceph_alloc_cap_flush();
+ if (!prealloc_cf)
+ return -ENOMEM;
+
+ /* Start by sync'ing the source and destination files */
+ ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+ if (ret < 0) {
+ dout("failed to write src file (%zd)\n", ret);
+ goto out;
+ }
+ ret = file_write_and_wait_range(dst_file, dst_off, (dst_off + len));
+ if (ret < 0) {
+ dout("failed to write dst file (%zd)\n", ret);
+ goto out;
+ }
+
+ /*
+ * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
+ * clients may have dirty data in their caches. And OSDs know nothing
+ * about caps, so they can't safely do the remote object copies.
+ */
+ err = get_rd_wr_caps(src_file, &src_got,
+ dst_file, (dst_off + len), &dst_got);
+ if (err < 0) {
+ dout("get_rd_wr_caps returned %d\n", err);
+ ret = -EOPNOTSUPP;
+ goto out;
+ }
+
+ ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
+ if (ret < 0)
+ goto out_caps;
+
+ size = i_size_read(dst_inode);
+ endoff = dst_off + len;
+
+ /* Drop dst file cached pages */
+ ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+ dst_off >> PAGE_SHIFT,
+ endoff >> PAGE_SHIFT);
+ if (ret < 0) {
+ dout("Failed to invalidate inode pages (%zd)\n", ret);
+ ret = 0; /* XXX */
+ }
+ src_oloc.pool = src_ci->i_layout.pool_id;
+ src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+ dst_oloc.pool = dst_ci->i_layout.pool_id;
+ dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+
+ ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+ src_ci->i_layout.object_size,
+ &src_objnum, &src_objoff, &src_objlen);
+ ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+ dst_ci->i_layout.object_size,
+ &dst_objnum, &dst_objoff, &dst_objlen);
+ /* object-level offsets need to the same */
+ if (src_objoff != dst_objoff) {
+ ret = -EOPNOTSUPP;
+ goto out_caps;
+ }
+
+ /*
+ * Do a manual copy if the object offset isn't object aligned.
+ * 'src_objlen' contains the bytes left until the end of the object,
+ * starting at the src_off
+ */
+ if (src_objoff) {
+ /*
+ * we need to temporarily drop all caps as we'll be calling
+ * {read,write}_iter, which will get caps again.
+ */
+ put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+ ret = do_splice_direct(src_file, &src_off, dst_file,
+ &dst_off, src_objlen, flags);
+ if (ret < 0) {
+ dout("do_splice_direct returned %d\n", err);
+ goto out;
+ }
+ len -= ret;
+ err = get_rd_wr_caps(src_file, &src_got,
+ dst_file, (dst_off + len), &dst_got);
+ if (err < 0)
+ goto out;
+ err = is_file_size_ok(src_inode, dst_inode,
+ src_off, dst_off, len);
+ if (err < 0)
+ goto out_caps;
+ }
+ object_size = src_ci->i_layout.object_size;
+ while (len >= object_size) {
+ ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+ object_size, &src_objnum,
+ &src_objoff, &src_objlen);
+ ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+ object_size, &dst_objnum,
+ &dst_objoff, &dst_objlen);
+ ceph_oid_init(&src_oid);
+ ceph_oid_printf(&src_oid, "%llx.%08llx",
+ src_ci->i_vino.ino, src_objnum);
+ ceph_oid_init(&dst_oid);
+ ceph_oid_printf(&dst_oid, "%llx.%08llx",
+ dst_ci->i_vino.ino, dst_objnum);
+ /* Do an object remote copy */
+ err = ceph_osdc_copy_from(
+ &src_fsc->client->osdc,
+ src_ci->i_vino.snap, 0,
+ &src_oid, &src_oloc,
+ CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+ CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
+ &dst_oid, &dst_oloc,
+ CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+ CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
+ if (err) {
+ dout("ceph_osdc_copy_from returned %d\n", err);
+ if (!ret)
+ ret = err;
+ goto out_caps;
+ }
+ len -= object_size;
+ src_off += object_size;
+ dst_off += object_size;
+ ret += object_size;
+ }
+
+ if (len)
+ /* We still need one final local copy */
+ do_final_copy = true;
+
+ file_update_time(dst_file);
+ inode_inc_iversion_raw(dst_inode);
+
+ if (endoff > size) {
+ int caps_flags = 0;
+
+ /* Let the MDS know about dst file size change */
+ if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
+ caps_flags |= CHECK_CAPS_NODELAY;
+ if (ceph_inode_set_size(dst_inode, endoff))
+ caps_flags |= CHECK_CAPS_AUTHONLY;
+ if (caps_flags)
+ ceph_check_caps(dst_ci, caps_flags, NULL);
+ }
+ /* Mark Fw dirty */
+ spin_lock(&dst_ci->i_ceph_lock);
+ dst_ci->i_inline_version = CEPH_INLINE_NONE;
+ dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+ spin_unlock(&dst_ci->i_ceph_lock);
+ if (dirty)
+ __mark_inode_dirty(dst_inode, dirty);
+
+out_caps:
+ put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+
+ if (do_final_copy) {
+ err = do_splice_direct(src_file, &src_off, dst_file,
+ &dst_off, len, flags);
+ if (err < 0) {
+ dout("do_splice_direct returned %d\n", err);
+ goto out;
+ }
+ len -= err;
+ ret += err;
+ }
+
+out:
+ ceph_free_cap_flush(prealloc_cf);
+
+ return ret;
+}
+
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+ struct file *dst_file, loff_t dst_off,
+ size_t len, unsigned int flags)
+{
+ ssize_t ret;
+
+ ret = __ceph_copy_file_range(src_file, src_off, dst_file, dst_off,
+ len, flags);
+
+ if (ret == -EOPNOTSUPP || ret == -EXDEV)
+ ret = generic_copy_file_range(src_file, src_off, dst_file,
+ dst_off, len, flags);
+ return ret;
+}
+
const struct file_operations ceph_file_fops = {
.open = ceph_open,
.release = ceph_release,
@@ -1844,5 +2190,5 @@
.unlocked_ioctl = ceph_ioctl,
.compat_ioctl = ceph_ioctl,
.fallocate = ceph_fallocate,
+ .copy_file_range = ceph_copy_file_range,
};
-