Update Linux to v5.4.2

Change-Id: Idf6911045d9d382da2cfe01b1edff026404ac8fd
diff --git a/fs/xfs/xfs_log.c b/fs/xfs/xfs_log.c
index c3b610b..641d07f 100644
--- a/fs/xfs/xfs_log.c
+++ b/fs/xfs/xfs_log.c
@@ -16,13 +16,10 @@
 #include "xfs_trans_priv.h"
 #include "xfs_log.h"
 #include "xfs_log_priv.h"
-#include "xfs_log_recover.h"
-#include "xfs_inode.h"
 #include "xfs_trace.h"
-#include "xfs_fsops.h"
-#include "xfs_cksum.h"
 #include "xfs_sysfs.h"
 #include "xfs_sb.h"
+#include "xfs_health.h"
 
 kmem_zone_t	*xfs_log_ticket_zone;
 
@@ -44,21 +41,14 @@
 xlog_space_left(
 	struct xlog		*log,
 	atomic64_t		*head);
-STATIC int
-xlog_sync(
-	struct xlog		*log,
-	struct xlog_in_core	*iclog);
 STATIC void
 xlog_dealloc_log(
 	struct xlog		*log);
 
 /* local state machine functions */
-STATIC void xlog_state_done_syncing(xlog_in_core_t *iclog, int);
-STATIC void
-xlog_state_do_callback(
-	struct xlog		*log,
-	int			aborted,
-	struct xlog_in_core	*iclog);
+STATIC void xlog_state_done_syncing(
+	struct xlog_in_core	*iclog,
+	bool			aborted);
 STATIC int
 xlog_state_get_iclog_space(
 	struct xlog		*log,
@@ -106,8 +96,7 @@
 xlog_verify_iclog(
 	struct xlog		*log,
 	struct xlog_in_core	*iclog,
-	int			count,
-	bool                    syncing);
+	int			count);
 STATIC void
 xlog_verify_tail_lsn(
 	struct xlog		*log,
@@ -116,7 +105,7 @@
 #else
 #define xlog_verify_dest_ptr(a,b)
 #define xlog_verify_grant_tail(a)
-#define xlog_verify_iclog(a,b,c,d)
+#define xlog_verify_iclog(a,b,c)
 #define xlog_verify_tail_lsn(a,b,c)
 #endif
 
@@ -225,15 +214,42 @@
 {
 	struct xlog_ticket	*tic;
 	int			need_bytes;
+	bool			woken_task = false;
 
 	list_for_each_entry(tic, &head->waiters, t_queue) {
+
+		/*
+		 * There is a chance that the size of the CIL checkpoints in
+		 * progress at the last AIL push target calculation resulted in
+		 * limiting the target to the log head (l_last_sync_lsn) at the
+		 * time. This may not reflect where the log head is now as the
+		 * CIL checkpoints may have completed.
+		 *
+		 * Hence when we are woken here, it may be the head of the log
+		 * that has moved rather than the tail. As the tail didn't
+		 * move, there still won't be space available for the
+		 * reservation we require.  However, if the AIL has already
+		 * pushed to the target defined by the old log head location, we
+		 * will hang here waiting for something else to update the AIL
+		 * push target.
+		 *
+		 * Therefore, if there isn't space to wake the first waiter on
+		 * the grant head, we need to push the AIL again to ensure the
+		 * target reflects both the current log tail and log head
+		 * position before we wait for the tail to move again.
+		 */
+
 		need_bytes = xlog_ticket_reservation(log, head, tic);
-		if (*free_bytes < need_bytes)
+		if (*free_bytes < need_bytes) {
+			if (!woken_task)
+				xlog_grant_push_ail(log, need_bytes);
 			return false;
+		}
 
 		*free_bytes -= need_bytes;
 		trace_xfs_log_grant_wake_up(log, tic);
 		wake_up_process(tic->t_task);
+		woken_task = true;
 	}
 
 	return true;
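
The reworked wake loop is worth modelling on its own: waiters are woken in
order until one no longer fits, and the AIL is pushed exactly once, only when
even the first waiter on the grant head cannot be woken. A minimal userspace
sketch of that control flow (push_ail() and all other names are illustrative,
not kernel API):

    #include <stdbool.h>
    #include <stdio.h>

    /* Illustrative stand-in for xlog_grant_push_ail(). */
    static void push_ail(int need_bytes)
    {
    	printf("pushing AIL for %d bytes\n", need_bytes);
    }

    /* Wake queued waiters in order until one no longer fits. */
    static bool grant_head_wake(const int *needs, int nwaiters, int *free_bytes)
    {
    	bool woken_task = false;
    	int i;

    	for (i = 0; i < nwaiters; i++) {
    		if (*free_bytes < needs[i]) {
    			/* Push only if not even the first waiter fit. */
    			if (!woken_task)
    				push_ail(needs[i]);
    			return false;
    		}
    		*free_bytes -= needs[i];
    		woken_task = true;	/* wake_up_process() would go here */
    	}
    	return true;
    }

    int main(void)
    {
    	int needs[] = { 100, 200 };
    	int free_bytes = 50;

    	/* The first waiter needs 100 but only 50 is free: one AIL push. */
    	grant_head_wake(needs, 2, &free_bytes);
    	return 0;
    }
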
@@ -439,11 +455,7 @@
 	XFS_STATS_INC(mp, xs_try_logspace);
 
 	ASSERT(*ticp == NULL);
-	tic = xlog_ticket_alloc(log, unit_bytes, cnt, client, permanent,
-				KM_SLEEP | KM_MAYFAIL);
-	if (!tic)
-		return -ENOMEM;
-
+	tic = xlog_ticket_alloc(log, unit_bytes, cnt, client, permanent, 0);
 	*ticp = tic;
 
 	xlog_grant_push_ail(log, tic->t_cnt ? tic->t_unit_res * tic->t_cnt
@@ -540,32 +552,6 @@
 	return lsn;
 }
 
-/*
- * Attaches a new iclog I/O completion callback routine during
- * transaction commit.  If the log is in error state, a non-zero
- * return code is handed back and the caller is responsible for
- * executing the callback at an appropriate time.
- */
-int
-xfs_log_notify(
-	struct xlog_in_core	*iclog,
-	xfs_log_callback_t	*cb)
-{
-	int	abortflg;
-
-	spin_lock(&iclog->ic_callback_lock);
-	abortflg = (iclog->ic_state & XLOG_STATE_IOERROR);
-	if (!abortflg) {
-		ASSERT_ALWAYS((iclog->ic_state == XLOG_STATE_ACTIVE) ||
-			      (iclog->ic_state == XLOG_STATE_WANT_SYNC));
-		cb->cb_next = NULL;
-		*(iclog->ic_callback_tail) = cb;
-		iclog->ic_callback_tail = &(cb->cb_next);
-	}
-	spin_unlock(&iclog->ic_callback_lock);
-	return abortflg;
-}
-
 int
 xfs_log_release_iclog(
 	struct xfs_mount	*mp,
@@ -806,16 +792,12 @@
  * The mount has failed. Cancel the recovery if it hasn't completed and destroy
  * the log.
  */
-int
+void
 xfs_log_mount_cancel(
 	struct xfs_mount	*mp)
 {
-	int			error;
-
-	error = xlog_recover_cancel(mp->m_log);
+	xlog_recover_cancel(mp->m_log);
 	xfs_log_unmount(mp);
-
-	return error;
 }
 
 /*
@@ -861,7 +843,7 @@
 	 * recalculated during log recovery at next mount.  Refer to
 	 * xlog_check_unmount_rec for more details.
 	 */
-	if (XFS_TEST_ERROR((mp->m_flags & XFS_MOUNT_BAD_SUMMARY), mp,
+	if (XFS_TEST_ERROR(xfs_fs_has_sickness(mp, XFS_SICK_FS_COUNTERS), mp,
 			XFS_ERRTAG_FORCE_SUMMARY_RECALC)) {
 		xfs_alert(mp, "%s: will fix summary counters at next mount",
 				__func__);
@@ -931,7 +913,7 @@
 	 * Or, if we are doing a forced umount (typically because of IO errors).
 	 */
 	if (mp->m_flags & XFS_MOUNT_NORECOVERY ||
-	    xfs_readonly_buftarg(log->l_mp->m_logdev_targp)) {
+	    xfs_readonly_buftarg(log->l_targ)) {
 		ASSERT(mp->m_flags & XFS_MOUNT_RDONLY);
 		return 0;
 	}
@@ -1243,53 +1225,49 @@
 }
 
 
-/*
- * Log function which is called when an io completes.
- *
- * The log manager needs its own routine, in order to control what
- * happens with the buffer after the write completes.
- */
 static void
-xlog_iodone(xfs_buf_t *bp)
+xlog_ioend_work(
+	struct work_struct	*work)
 {
-	struct xlog_in_core	*iclog = bp->b_log_item;
-	struct xlog		*l = iclog->ic_log;
-	int			aborted = 0;
+	struct xlog_in_core     *iclog =
+		container_of(work, struct xlog_in_core, ic_end_io_work);
+	struct xlog		*log = iclog->ic_log;
+	bool			aborted = false;
+	int			error;
+
+	error = blk_status_to_errno(iclog->ic_bio.bi_status);
+#ifdef DEBUG
+	/* treat writes with injected CRC errors as failed */
+	if (iclog->ic_fail_crc)
+		error = -EIO;
+#endif
 
 	/*
-	 * Race to shutdown the filesystem if we see an error or the iclog is in
-	 * IOABORT state. The IOABORT state is only set in DEBUG mode to inject
-	 * CRC errors into log recovery.
+	 * Race to shutdown the filesystem if we see an error.
 	 */
-	if (XFS_TEST_ERROR(bp->b_error, l->l_mp, XFS_ERRTAG_IODONE_IOERR) ||
-	    iclog->ic_state & XLOG_STATE_IOABORT) {
-		if (iclog->ic_state & XLOG_STATE_IOABORT)
-			iclog->ic_state &= ~XLOG_STATE_IOABORT;
-
-		xfs_buf_ioerror_alert(bp, __func__);
-		xfs_buf_stale(bp);
-		xfs_force_shutdown(l->l_mp, SHUTDOWN_LOG_IO_ERROR);
+	if (XFS_TEST_ERROR(error, log->l_mp, XFS_ERRTAG_IODONE_IOERR)) {
+		xfs_alert(log->l_mp, "log I/O error %d", error);
+		xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
 		/*
 		 * This flag will be propagated to the trans-committed
 		 * callback routines to let them know that the log-commit
 		 * didn't succeed.
 		 */
-		aborted = XFS_LI_ABORTED;
+		aborted = true;
 	} else if (iclog->ic_state & XLOG_STATE_IOERROR) {
-		aborted = XFS_LI_ABORTED;
+		aborted = true;
 	}
 
-	/* log I/O is always issued ASYNC */
-	ASSERT(bp->b_flags & XBF_ASYNC);
 	xlog_state_done_syncing(iclog, aborted);
+	bio_uninit(&iclog->ic_bio);
 
 	/*
-	 * drop the buffer lock now that we are done. Nothing references
-	 * the buffer after this, so an unmount waiting on this lock can now
-	 * tear it down safely. As such, it is unsafe to reference the buffer
-	 * (bp) after the unlock as we could race with it being freed.
+	 * Drop the lock to signal that we are done. Nothing references the
+	 * iclog after this, so an unmount waiting on this lock can now tear it
+	 * down safely. As such, it is unsafe to reference the iclog after the
+	 * unlock as we could race with it being freed.
 	 */
-	xfs_buf_unlock(bp);
+	up(&iclog->ic_sema);
 }
 
 /*
@@ -1300,65 +1278,26 @@
  * If the filesystem blocksize is too large, we may need to choose a
  * larger size since the directory code currently logs entire blocks.
  */
-
 STATIC void
 xlog_get_iclog_buffer_size(
 	struct xfs_mount	*mp,
 	struct xlog		*log)
 {
-	int size;
-	int xhdrs;
-
 	if (mp->m_logbufs <= 0)
-		log->l_iclog_bufs = XLOG_MAX_ICLOGS;
-	else
-		log->l_iclog_bufs = mp->m_logbufs;
+		mp->m_logbufs = XLOG_MAX_ICLOGS;
+	if (mp->m_logbsize <= 0)
+		mp->m_logbsize = XLOG_BIG_RECORD_BSIZE;
+
+	log->l_iclog_bufs = mp->m_logbufs;
+	log->l_iclog_size = mp->m_logbsize;
 
 	/*
-	 * Buffer size passed in from mount system call.
+	 * # headers = size / 32k - one header holds cycles from 32k of data.
 	 */
-	if (mp->m_logbsize > 0) {
-		size = log->l_iclog_size = mp->m_logbsize;
-		log->l_iclog_size_log = 0;
-		while (size != 1) {
-			log->l_iclog_size_log++;
-			size >>= 1;
-		}
-
-		if (xfs_sb_version_haslogv2(&mp->m_sb)) {
-			/* # headers = size / 32k
-			 * one header holds cycles from 32k of data
-			 */
-
-			xhdrs = mp->m_logbsize / XLOG_HEADER_CYCLE_SIZE;
-			if (mp->m_logbsize % XLOG_HEADER_CYCLE_SIZE)
-				xhdrs++;
-			log->l_iclog_hsize = xhdrs << BBSHIFT;
-			log->l_iclog_heads = xhdrs;
-		} else {
-			ASSERT(mp->m_logbsize <= XLOG_BIG_RECORD_BSIZE);
-			log->l_iclog_hsize = BBSIZE;
-			log->l_iclog_heads = 1;
-		}
-		goto done;
-	}
-
-	/* All machines use 32kB buffers by default. */
-	log->l_iclog_size = XLOG_BIG_RECORD_BSIZE;
-	log->l_iclog_size_log = XLOG_BIG_RECORD_BSHIFT;
-
-	/* the default log size is 16k or 32k which is one header sector */
-	log->l_iclog_hsize = BBSIZE;
-	log->l_iclog_heads = 1;
-
-done:
-	/* are we being asked to make the sizes selected above visible? */
-	if (mp->m_logbufs == 0)
-		mp->m_logbufs = log->l_iclog_bufs;
-	if (mp->m_logbsize == 0)
-		mp->m_logbsize = log->l_iclog_size;
-}	/* xlog_get_iclog_buffer_size */
-
+	log->l_iclog_heads =
+		DIV_ROUND_UP(mp->m_logbsize, XLOG_HEADER_CYCLE_SIZE);
+	log->l_iclog_hsize = log->l_iclog_heads << BBSHIFT;
+}
 
 void
 xfs_log_work_queue(
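
The replacement header math is compact enough that a worked example helps.
Assuming the kernel's XLOG_HEADER_CYCLE_SIZE of 32k and 512-byte basic blocks
(BBSHIFT = 9), this standalone sketch reproduces the DIV_ROUND_UP calculation
for the default (32k) and maximum (256k) log buffer sizes:

    #include <stdio.h>

    #define BBSHIFT			9		/* 512-byte basic blocks */
    #define XLOG_HEADER_CYCLE_SIZE	(32 * 1024)	/* one header per 32k */
    #define DIV_ROUND_UP(n, d)	(((n) + (d) - 1) / (d))

    int main(void)
    {
    	int sizes[] = { 32 * 1024, 256 * 1024 };
    	int i;

    	for (i = 0; i < 2; i++) {
    		int heads = DIV_ROUND_UP(sizes[i], XLOG_HEADER_CYCLE_SIZE);
    		int hsize = heads << BBSHIFT;

    		/* 32k -> 1 head, 512 bytes; 256k -> 8 heads, 4096 bytes */
    		printf("logbsize %6d: %d heads, %d header bytes\n",
    		       sizes[i], heads, hsize);
    	}
    	return 0;
    }
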
@@ -1421,7 +1360,6 @@
 	xlog_rec_header_t	*head;
 	xlog_in_core_t		**iclogp;
 	xlog_in_core_t		*iclog, *prev_iclog=NULL;
-	xfs_buf_t		*bp;
 	int			i;
 	int			error = -ENOMEM;
 	uint			log2_size = 0;
@@ -1479,30 +1417,6 @@
 
 	xlog_get_iclog_buffer_size(mp, log);
 
-	/*
-	 * Use a NULL block for the extra log buffer used during splits so that
-	 * it will trigger errors if we ever try to do IO on it without first
-	 * having set it up properly.
-	 */
-	error = -ENOMEM;
-	bp = xfs_buf_alloc(mp->m_logdev_targp, XFS_BUF_DADDR_NULL,
-			   BTOBB(log->l_iclog_size), XBF_NO_IOACCT);
-	if (!bp)
-		goto out_free_log;
-
-	/*
-	 * The iclogbuf buffer locks are held over IO but we are not going to do
-	 * IO yet.  Hence unlock the buffer so that the log IO path can grab it
-	 * when appropriately.
-	 */
-	ASSERT(xfs_buf_islocked(bp));
-	xfs_buf_unlock(bp);
-
-	/* use high priority wq for log I/O completion */
-	bp->b_ioend_wq = mp->m_log_workqueue;
-	bp->b_iodone = xlog_iodone;
-	log->l_xbuf = bp;
-
 	spin_lock_init(&log->l_icloglock);
 	init_waitqueue_head(&log->l_flush_wait);
 
@@ -1515,29 +1429,23 @@
 	 * xlog_in_core_t in xfs_log_priv.h for details.
 	 */
 	ASSERT(log->l_iclog_size >= 4096);
-	for (i=0; i < log->l_iclog_bufs; i++) {
-		*iclogp = kmem_zalloc(sizeof(xlog_in_core_t), KM_MAYFAIL);
-		if (!*iclogp)
+	for (i = 0; i < log->l_iclog_bufs; i++) {
+		int align_mask = xfs_buftarg_dma_alignment(mp->m_logdev_targp);
+		size_t bvec_size = howmany(log->l_iclog_size, PAGE_SIZE) *
+				sizeof(struct bio_vec);
+
+		iclog = kmem_zalloc(sizeof(*iclog) + bvec_size, KM_MAYFAIL);
+		if (!iclog)
 			goto out_free_iclog;
 
-		iclog = *iclogp;
+		*iclogp = iclog;
 		iclog->ic_prev = prev_iclog;
 		prev_iclog = iclog;
 
-		bp = xfs_buf_get_uncached(mp->m_logdev_targp,
-					  BTOBB(log->l_iclog_size),
-					  XBF_NO_IOACCT);
-		if (!bp)
+		iclog->ic_data = kmem_alloc_io(log->l_iclog_size, align_mask,
+						KM_MAYFAIL | KM_ZERO);
+		if (!iclog->ic_data)
 			goto out_free_iclog;
-
-		ASSERT(xfs_buf_islocked(bp));
-		xfs_buf_unlock(bp);
-
-		/* use high priority wq for log I/O completion */
-		bp->b_ioend_wq = mp->m_log_workqueue;
-		bp->b_iodone = xlog_iodone;
-		iclog->ic_bp = bp;
-		iclog->ic_data = bp->b_addr;
 #ifdef DEBUG
 		log->l_iclog_bak[i] = &iclog->ic_header;
 #endif
@@ -1551,36 +1459,43 @@
 		head->h_fmt = cpu_to_be32(XLOG_FMT);
 		memcpy(&head->h_fs_uuid, &mp->m_sb.sb_uuid, sizeof(uuid_t));
 
-		iclog->ic_size = BBTOB(bp->b_length) - log->l_iclog_hsize;
+		iclog->ic_size = log->l_iclog_size - log->l_iclog_hsize;
 		iclog->ic_state = XLOG_STATE_ACTIVE;
 		iclog->ic_log = log;
 		atomic_set(&iclog->ic_refcnt, 0);
 		spin_lock_init(&iclog->ic_callback_lock);
-		iclog->ic_callback_tail = &(iclog->ic_callback);
+		INIT_LIST_HEAD(&iclog->ic_callbacks);
 		iclog->ic_datap = (char *)iclog->ic_data + log->l_iclog_hsize;
 
 		init_waitqueue_head(&iclog->ic_force_wait);
 		init_waitqueue_head(&iclog->ic_write_wait);
+		INIT_WORK(&iclog->ic_end_io_work, xlog_ioend_work);
+		sema_init(&iclog->ic_sema, 1);
 
 		iclogp = &iclog->ic_next;
 	}
 	*iclogp = log->l_iclog;			/* complete ring */
 	log->l_iclog->ic_prev = prev_iclog;	/* re-write 1st prev ptr */
 
+	log->l_ioend_workqueue = alloc_workqueue("xfs-log/%s",
+			WQ_MEM_RECLAIM | WQ_FREEZABLE | WQ_HIGHPRI, 0,
+			mp->m_fsname);
+	if (!log->l_ioend_workqueue)
+		goto out_free_iclog;
+
 	error = xlog_cil_init(log);
 	if (error)
-		goto out_free_iclog;
+		goto out_destroy_workqueue;
 	return log;
 
+out_destroy_workqueue:
+	destroy_workqueue(log->l_ioend_workqueue);
 out_free_iclog:
 	for (iclog = log->l_iclog; iclog; iclog = prev_iclog) {
 		prev_iclog = iclog->ic_next;
-		if (iclog->ic_bp)
-			xfs_buf_free(iclog->ic_bp);
+		kmem_free(iclog->ic_data);
 		kmem_free(iclog);
 	}
-	spinlock_destroy(&log->l_icloglock);
-	xfs_buf_free(log->l_xbuf);
 out_free_log:
 	kmem_free(log);
 out:
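
The error path above gains a second unwind label for the new workqueue. This
is the standard C goto-ladder idiom: each allocation failure jumps to the
label that releases everything acquired so far, in reverse order of
construction. A self-contained sketch of the shape (all names hypothetical):

    #include <stdlib.h>

    struct log_stub { void *iclogs; void *workqueue; void *cil; };

    static struct log_stub *alloc_log(void)
    {
    	struct log_stub *log = calloc(1, sizeof(*log));

    	if (!log)
    		goto out;
    	log->iclogs = malloc(64);
    	if (!log->iclogs)
    		goto out_free_log;
    	log->workqueue = malloc(64);	/* models alloc_workqueue() */
    	if (!log->workqueue)
    		goto out_free_iclog;
    	log->cil = malloc(64);		/* models xlog_cil_init() */
    	if (!log->cil)
    		goto out_destroy_workqueue;
    	return log;

    	/* Unwind in reverse order of construction. */
    out_destroy_workqueue:
    	free(log->workqueue);
    out_free_iclog:
    	free(log->iclogs);
    out_free_log:
    	free(log);
    out:
    	return NULL;
    }

    int main(void)
    {
    	struct log_stub *log = alloc_log();

    	if (!log)
    		return 1;
    	free(log->cil);
    	free(log->workqueue);
    	free(log->iclogs);
    	free(log);
    	return 0;
    }
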
@@ -1765,42 +1680,155 @@
 	return xfs_end_cksum(crc);
 }
 
-/*
- * The bdstrat callback function for log bufs. This gives us a central
- * place to trap bufs in case we get hit by a log I/O error and need to
- * shutdown. Actually, in practice, even when we didn't get a log error,
- * we transition the iclogs to IOERROR state *after* flushing all existing
- * iclogs to disk. This is because we don't want anymore new transactions to be
- * started or completed afterwards.
- *
- * We lock the iclogbufs here so that we can serialise against IO completion
- * during unmount. We might be processing a shutdown triggered during unmount,
- * and that can occur asynchronously to the unmount thread, and hence we need to
- * ensure that completes before tearing down the iclogbufs. Hence we need to
- * hold the buffer lock across the log IO to acheive that.
- */
-STATIC int
-xlog_bdstrat(
-	struct xfs_buf		*bp)
+static void
+xlog_bio_end_io(
+	struct bio		*bio)
 {
-	struct xlog_in_core	*iclog = bp->b_log_item;
+	struct xlog_in_core	*iclog = bio->bi_private;
 
-	xfs_buf_lock(bp);
-	if (iclog->ic_state & XLOG_STATE_IOERROR) {
-		xfs_buf_ioerror(bp, -EIO);
-		xfs_buf_stale(bp);
-		xfs_buf_ioend(bp);
+	queue_work(iclog->ic_log->l_ioend_workqueue,
+		   &iclog->ic_end_io_work);
+}
+
+static void
+xlog_map_iclog_data(
+	struct bio		*bio,
+	void			*data,
+	size_t			count)
+{
+	do {
+		struct page	*page = kmem_to_page(data);
+		unsigned int	off = offset_in_page(data);
+		size_t		len = min_t(size_t, count, PAGE_SIZE - off);
+
+		WARN_ON_ONCE(bio_add_page(bio, page, len, off) != len);
+
+		data += len;
+		count -= len;
+	} while (count);
+}
+
+STATIC void
+xlog_write_iclog(
+	struct xlog		*log,
+	struct xlog_in_core	*iclog,
+	uint64_t		bno,
+	unsigned int		count,
+	bool			need_flush)
+{
+	ASSERT(bno < log->l_logBBsize);
+
+	/*
+	 * We lock the iclogbufs here so that we can serialise against I/O
+	 * completion during unmount.  We might be processing a shutdown
+	 * triggered during unmount, and that can occur asynchronously to the
+	 * unmount thread, and hence we need to ensure that completes before
+	 * tearing down the iclogbufs.  Hence we need to hold the buffer lock
+	 * across the log IO to achieve that.
+	 */
+	down(&iclog->ic_sema);
+	if (unlikely(iclog->ic_state & XLOG_STATE_IOERROR)) {
 		/*
 		 * It would seem logical to return EIO here, but we rely on
 		 * the log state machine to propagate I/O errors instead of
-		 * doing it here. Similarly, IO completion will unlock the
-		 * buffer, so we don't do it here.
+		 * doing it here.  We kick off the state machine and unlock
+		 * the buffer manually; this code needs to be kept in sync
+		 * with the I/O completion path.
 		 */
-		return 0;
+		xlog_state_done_syncing(iclog, true);
+		up(&iclog->ic_sema);
+		return;
 	}
 
-	xfs_buf_submit(bp);
-	return 0;
+	iclog->ic_io_size = count;
+
+	bio_init(&iclog->ic_bio, iclog->ic_bvec, howmany(count, PAGE_SIZE));
+	bio_set_dev(&iclog->ic_bio, log->l_targ->bt_bdev);
+	iclog->ic_bio.bi_iter.bi_sector = log->l_logBBstart + bno;
+	iclog->ic_bio.bi_end_io = xlog_bio_end_io;
+	iclog->ic_bio.bi_private = iclog;
+	iclog->ic_bio.bi_opf = REQ_OP_WRITE | REQ_META | REQ_SYNC | REQ_FUA;
+	if (need_flush)
+		iclog->ic_bio.bi_opf |= REQ_PREFLUSH;
+
+	xlog_map_iclog_data(&iclog->ic_bio, iclog->ic_data, iclog->ic_io_size);
+	if (is_vmalloc_addr(iclog->ic_data))
+		flush_kernel_vmap_range(iclog->ic_data, iclog->ic_io_size);
+
+	/*
+	 * If this log buffer would straddle the end of the log we will have
+	 * to split it up into two bios, so that we can continue at the start.
+	 */
+	if (bno + BTOBB(count) > log->l_logBBsize) {
+		struct bio *split;
+
+		split = bio_split(&iclog->ic_bio, log->l_logBBsize - bno,
+				  GFP_NOIO, &fs_bio_set);
+		bio_chain(split, &iclog->ic_bio);
+		submit_bio(split);
+
+		/* restart at logical offset zero for the remainder */
+		iclog->ic_bio.bi_iter.bi_sector = log->l_logBBstart;
+	}
+
+	submit_bio(&iclog->ic_bio);
+}
+
+/*
+ * We need to bump cycle number for the part of the iclog that is
+ * written to the start of the log. Watch out for the header magic
+ * number case, though.
+ */
+static void
+xlog_split_iclog(
+	struct xlog		*log,
+	void			*data,
+	uint64_t		bno,
+	unsigned int		count)
+{
+	unsigned int		split_offset = BBTOB(log->l_logBBsize - bno);
+	unsigned int		i;
+
+	for (i = split_offset; i < count; i += BBSIZE) {
+		uint32_t cycle = get_unaligned_be32(data + i);
+
+		if (++cycle == XLOG_HEADER_MAGIC_NUM)
+			cycle++;
+		put_unaligned_be32(cycle, data + i);
+	}
+}
+
+static int
+xlog_calc_iclog_size(
+	struct xlog		*log,
+	struct xlog_in_core	*iclog,
+	uint32_t		*roundoff)
+{
+	uint32_t		count_init, count;
+	bool			use_lsunit;
+
+	use_lsunit = xfs_sb_version_haslogv2(&log->l_mp->m_sb) &&
+			log->l_mp->m_sb.sb_logsunit > 1;
+
+	/* Add for LR header */
+	count_init = log->l_iclog_hsize + iclog->ic_offset;
+
+	/* Round out the log write size */
+	if (use_lsunit) {
+		/* we have a v2 stripe unit to use */
+		count = XLOG_LSUNITTOB(log, XLOG_BTOLSUNIT(log, count_init));
+	} else {
+		count = BBTOB(BTOBB(count_init));
+	}
+
+	ASSERT(count >= count_init);
+	*roundoff = count - count_init;
+
+	if (use_lsunit)
+		ASSERT(*roundoff < log->l_mp->m_sb.sb_logsunit);
+	else
+		ASSERT(*roundoff < BBTOB(1));
+	return count;
 }
 
 /*
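
xlog_split_iclog() is easy to misread: the actual bio split happens in
xlog_write_iclog(); this helper only rewrites the cycle word at the start of
each 512-byte basic block in the portion that wraps to the physical start of
the log, skipping the header magic value. A userspace model of the loop,
assuming BBSIZE = 512 and XLOG_HEADER_MAGIC_NUM = 0xFEEDbabe as defined in
xfs_log_format.h:

    #include <stdint.h>
    #include <stdio.h>

    #define BBSIZE			512
    #define XLOG_HEADER_MAGIC_NUM	0xFEEDbabe

    static uint32_t get_be32(const void *p)
    {
    	const uint8_t *b = p;
    	return (uint32_t)b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
    }

    static void put_be32(uint32_t v, void *p)
    {
    	uint8_t *b = p;
    	b[0] = v >> 24; b[1] = v >> 16; b[2] = v >> 8; b[3] = v;
    }

    /* Bump the cycle stored in each wrapped basic block. */
    static void split_iclog(char *data, unsigned int split_offset,
    			unsigned int count)
    {
    	unsigned int i;

    	for (i = split_offset; i < count; i += BBSIZE) {
    		uint32_t cycle = get_be32(data + i);

    		if (++cycle == XLOG_HEADER_MAGIC_NUM)
    			cycle++;
    		put_be32(cycle, data + i);
    	}
    }

    int main(void)
    {
    	static char buf[4 * BBSIZE];

    	put_be32(41, buf + 2 * BBSIZE);	/* blocks 2 and 3 wrap */
    	put_be32(41, buf + 3 * BBSIZE);
    	split_iclog(buf, 2 * BBSIZE, sizeof(buf));
    	printf("cycle now %u\n", (unsigned)get_be32(buf + 2 * BBSIZE));
    	return 0;
    }
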
@@ -1823,46 +1851,23 @@
  * log will require grabbing the lock though.
  *
  * The entire log manager uses a logical block numbering scheme.  Only
- * log_sync (and then only bwrite()) know about the fact that the log may
- * not start with block zero on a given device.  The log block start offset
- * is added immediately before calling bwrite().
+ * xlog_write_iclog knows about the fact that the log may not start with
+ * block zero on a given device.
  */
-
-STATIC int
+STATIC void
 xlog_sync(
 	struct xlog		*log,
 	struct xlog_in_core	*iclog)
 {
-	xfs_buf_t	*bp;
-	int		i;
-	uint		count;		/* byte count of bwrite */
-	uint		count_init;	/* initial count before roundup */
-	int		roundoff;       /* roundoff to BB or stripe */
-	int		split = 0;	/* split write into two regions */
-	int		error;
-	int		v2 = xfs_sb_version_haslogv2(&log->l_mp->m_sb);
-	int		size;
+	unsigned int		count;		/* byte count of bwrite */
+	unsigned int		roundoff;       /* roundoff to BB or stripe */
+	uint64_t		bno;
+	unsigned int		size;
+	bool			need_flush = true, split = false;
 
-	XFS_STATS_INC(log->l_mp, xs_log_writes);
 	ASSERT(atomic_read(&iclog->ic_refcnt) == 0);
 
-	/* Add for LR header */
-	count_init = log->l_iclog_hsize + iclog->ic_offset;
-
-	/* Round out the log write size */
-	if (v2 && log->l_mp->m_sb.sb_logsunit > 1) {
-		/* we have a v2 stripe unit to use */
-		count = XLOG_LSUNITTOB(log, XLOG_BTOLSUNIT(log, count_init));
-	} else {
-		count = BBTOB(BTOBB(count_init));
-	}
-	roundoff = count - count_init;
-	ASSERT(roundoff >= 0);
-	ASSERT((v2 && log->l_mp->m_sb.sb_logsunit > 1 && 
-                roundoff < log->l_mp->m_sb.sb_logsunit)
-		|| 
-		(log->l_mp->m_sb.sb_logsunit <= 1 && 
-		 roundoff < BBTOB(1)));
+	count = xlog_calc_iclog_size(log, iclog, &roundoff);
 
 	/* move grant heads by roundoff in sync */
 	xlog_grant_add_space(log, &log->l_reserve_head.grant, roundoff);
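
A worked example of the rounding that xlog_calc_iclog_size() now
encapsulates, with illustrative numbers: a 512-byte log record header plus
7000 bytes of data gives count_init = 7512, which rounds up to 7680
(roundoff = 168) without a stripe unit, or to 32768 (roundoff = 25256) with a
32k v2 log stripe unit:

    #include <stdio.h>

    #define BBSIZE	512

    /* Round up to the log stripe unit (v2 logs) or to a basic block. */
    static unsigned int calc_iclog_size(unsigned int count_init,
    				    unsigned int lsunit,
    				    unsigned int *roundoff)
    {
    	unsigned int align = lsunit > 1 ? lsunit : BBSIZE;
    	unsigned int count = (count_init + align - 1) / align * align;

    	*roundoff = count - count_init;
    	return count;
    }

    int main(void)
    {
    	unsigned int roundoff;

    	calc_iclog_size(7512, 1, &roundoff);	 /* 7680, roundoff 168 */
    	printf("no lsunit:  roundoff %u\n", roundoff);
    	calc_iclog_size(7512, 32768, &roundoff); /* 32768, roundoff 25256 */
    	printf("32k lsunit: roundoff %u\n", roundoff);
    	return 0;
    }
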
@@ -1873,41 +1878,19 @@
 
 	/* real byte length */
 	size = iclog->ic_offset;
-	if (v2)
+	if (xfs_sb_version_haslogv2(&log->l_mp->m_sb))
 		size += roundoff;
 	iclog->ic_header.h_len = cpu_to_be32(size);
 
-	bp = iclog->ic_bp;
-	XFS_BUF_SET_ADDR(bp, BLOCK_LSN(be64_to_cpu(iclog->ic_header.h_lsn)));
-
+	XFS_STATS_INC(log->l_mp, xs_log_writes);
 	XFS_STATS_ADD(log->l_mp, xs_log_blocks, BTOBB(count));
 
+	bno = BLOCK_LSN(be64_to_cpu(iclog->ic_header.h_lsn));
+
 	/* Do we need to split this write into 2 parts? */
-	if (XFS_BUF_ADDR(bp) + BTOBB(count) > log->l_logBBsize) {
-		char		*dptr;
-
-		split = count - (BBTOB(log->l_logBBsize - XFS_BUF_ADDR(bp)));
-		count = BBTOB(log->l_logBBsize - XFS_BUF_ADDR(bp));
-		iclog->ic_bwritecnt = 2;
-
-		/*
-		 * Bump the cycle numbers at the start of each block in the
-		 * part of the iclog that ends up in the buffer that gets
-		 * written to the start of the log.
-		 *
-		 * Watch out for the header magic number case, though.
-		 */
-		dptr = (char *)&iclog->ic_header + count;
-		for (i = 0; i < split; i += BBSIZE) {
-			uint32_t cycle = be32_to_cpu(*(__be32 *)dptr);
-			if (++cycle == XLOG_HEADER_MAGIC_NUM)
-				cycle++;
-			*(__be32 *)dptr = cpu_to_be32(cycle);
-
-			dptr += BBSIZE;
-		}
-	} else {
-		iclog->ic_bwritecnt = 1;
+	if (bno + BTOBB(count) > log->l_logBBsize) {
+		xlog_split_iclog(log, &iclog->ic_header, bno, count);
+		split = true;
 	}
 
 	/* calculate the checksum */
@@ -1920,18 +1903,15 @@
 	 * write on I/O completion and shutdown the fs. The subsequent mount
 	 * detects the bad CRC and attempts to recover.
 	 */
+#ifdef DEBUG
 	if (XFS_TEST_ERROR(false, log->l_mp, XFS_ERRTAG_LOG_BAD_CRC)) {
 		iclog->ic_header.h_crc &= cpu_to_le32(0xAAAAAAAA);
-		iclog->ic_state |= XLOG_STATE_IOABORT;
+		iclog->ic_fail_crc = true;
 		xfs_warn(log->l_mp,
 	"Intentionally corrupted log record at LSN 0x%llx. Shutdown imminent.",
 			 be64_to_cpu(iclog->ic_header.h_lsn));
 	}
-
-	bp->b_io_length = BTOBB(count);
-	bp->b_log_item = iclog;
-	bp->b_flags &= ~XBF_FLUSH;
-	bp->b_flags |= (XBF_ASYNC | XBF_SYNCIO | XBF_WRITE | XBF_FUA);
+#endif
 
 	/*
 	 * Flush the data device before flushing the log to make sure all meta
@@ -1941,50 +1921,14 @@
 	 * synchronously here; for an internal log we can simply use the block
 	 * layer state machine for preflushes.
 	 */
-	if (log->l_mp->m_logdev_targp != log->l_mp->m_ddev_targp)
+	if (log->l_targ != log->l_mp->m_ddev_targp || split) {
 		xfs_blkdev_issue_flush(log->l_mp->m_ddev_targp);
-	else
-		bp->b_flags |= XBF_FLUSH;
-
-	ASSERT(XFS_BUF_ADDR(bp) <= log->l_logBBsize-1);
-	ASSERT(XFS_BUF_ADDR(bp) + BTOBB(count) <= log->l_logBBsize);
-
-	xlog_verify_iclog(log, iclog, count, true);
-
-	/* account for log which doesn't start at block #0 */
-	XFS_BUF_SET_ADDR(bp, XFS_BUF_ADDR(bp) + log->l_logBBstart);
-
-	/*
-	 * Don't call xfs_bwrite here. We do log-syncs even when the filesystem
-	 * is shutting down.
-	 */
-	error = xlog_bdstrat(bp);
-	if (error) {
-		xfs_buf_ioerror_alert(bp, "xlog_sync");
-		return error;
+		need_flush = false;
 	}
-	if (split) {
-		bp = iclog->ic_log->l_xbuf;
-		XFS_BUF_SET_ADDR(bp, 0);	     /* logical 0 */
-		xfs_buf_associate_memory(bp,
-				(char *)&iclog->ic_header + count, split);
-		bp->b_log_item = iclog;
-		bp->b_flags &= ~XBF_FLUSH;
-		bp->b_flags |= (XBF_ASYNC | XBF_SYNCIO | XBF_WRITE | XBF_FUA);
 
-		ASSERT(XFS_BUF_ADDR(bp) <= log->l_logBBsize-1);
-		ASSERT(XFS_BUF_ADDR(bp) + BTOBB(count) <= log->l_logBBsize);
-
-		/* account for internal log which doesn't start at block #0 */
-		XFS_BUF_SET_ADDR(bp, XFS_BUF_ADDR(bp) + log->l_logBBstart);
-		error = xlog_bdstrat(bp);
-		if (error) {
-			xfs_buf_ioerror_alert(bp, "xlog_sync (split)");
-			return error;
-		}
-	}
-	return 0;
-}	/* xlog_sync */
+	xlog_verify_iclog(log, iclog, count);
+	xlog_write_iclog(log, iclog, bno, count, need_flush);
+}
 
 /*
  * Deallocate a log structure
@@ -2004,31 +1948,21 @@
 	 */
 	iclog = log->l_iclog;
 	for (i = 0; i < log->l_iclog_bufs; i++) {
-		xfs_buf_lock(iclog->ic_bp);
-		xfs_buf_unlock(iclog->ic_bp);
+		down(&iclog->ic_sema);
+		up(&iclog->ic_sema);
 		iclog = iclog->ic_next;
 	}
 
-	/*
-	 * Always need to ensure that the extra buffer does not point to memory
-	 * owned by another log buffer before we free it. Also, cycle the lock
-	 * first to ensure we've completed IO on it.
-	 */
-	xfs_buf_lock(log->l_xbuf);
-	xfs_buf_unlock(log->l_xbuf);
-	xfs_buf_set_empty(log->l_xbuf, BTOBB(log->l_iclog_size));
-	xfs_buf_free(log->l_xbuf);
-
 	iclog = log->l_iclog;
 	for (i = 0; i < log->l_iclog_bufs; i++) {
-		xfs_buf_free(iclog->ic_bp);
 		next_iclog = iclog->ic_next;
+		kmem_free(iclog->ic_data);
 		kmem_free(iclog);
 		iclog = next_iclog;
 	}
-	spinlock_destroy(&log->l_icloglock);
 
 	log->l_mp->m_log = NULL;
+	destroy_workqueue(log->l_ioend_workqueue);
 	kmem_free(log);
 }	/* xlog_dealloc_log */
 
@@ -2068,7 +2002,7 @@
 
 	/* match with XLOG_REG_TYPE_* in xfs_log.h */
 #define REG_TYPE_STR(type, str)	[XLOG_REG_TYPE_##type] = str
-	static char *res_type_str[XLOG_REG_TYPE_MAX + 1] = {
+	static char *res_type_str[] = {
 	    REG_TYPE_STR(BFORMAT, "bformat"),
 	    REG_TYPE_STR(BCHUNK, "bchunk"),
 	    REG_TYPE_STR(EFI_FORMAT, "efi_format"),
@@ -2088,8 +2022,15 @@
 	    REG_TYPE_STR(UNMOUNT, "unmount"),
 	    REG_TYPE_STR(COMMIT, "commit"),
 	    REG_TYPE_STR(TRANSHDR, "trans header"),
-	    REG_TYPE_STR(ICREATE, "inode create")
+	    REG_TYPE_STR(ICREATE, "inode create"),
+	    REG_TYPE_STR(RUI_FORMAT, "rui_format"),
+	    REG_TYPE_STR(RUD_FORMAT, "rud_format"),
+	    REG_TYPE_STR(CUI_FORMAT, "cui_format"),
+	    REG_TYPE_STR(CUD_FORMAT, "cud_format"),
+	    REG_TYPE_STR(BUI_FORMAT, "bui_format"),
+	    REG_TYPE_STR(BUD_FORMAT, "bud_format"),
 	};
+	BUILD_BUG_ON(ARRAY_SIZE(res_type_str) != XLOG_REG_TYPE_MAX + 1);
 #undef REG_TYPE_STR
 
 	xfs_warn(mp, "ticket reservation summary:");
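
Dropping the explicit bound from res_type_str[] and adding the BUILD_BUG_ON
turns a stale array into a build failure whenever a new highest-numbered
region type is added without a matching entry, instead of leaving a silent
NULL slot. The same pattern in standalone C11, using static_assert in place
of BUILD_BUG_ON (enum and strings are illustrative):

    #include <assert.h>
    #include <stdio.h>

    enum reg_type { REG_A = 1, REG_B, REG_C, REG_TYPE_MAX = REG_C };

    #define REG_TYPE_STR(type, str)	[REG_##type] = str

    /* The compiler sizes the array from the designated initializers. */
    static const char *res_type_str[] = {
    	REG_TYPE_STR(A, "a"),
    	REG_TYPE_STR(B, "b"),
    	REG_TYPE_STR(C, "c"),
    };

    /* Fails to compile if the last entry is missing, like BUILD_BUG_ON(). */
    static_assert(sizeof(res_type_str) / sizeof(res_type_str[0]) ==
    	      REG_TYPE_MAX + 1, "res_type_str out of sync with enum");

    int main(void)
    {
    	printf("%s\n", res_type_str[REG_B]);
    	return 0;
    }
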
@@ -2582,27 +2523,41 @@
  *****************************************************************************
  */
 
-/* Clean iclogs starting from the head.  This ordering must be
- * maintained, so an iclog doesn't become ACTIVE beyond one that
- * is SYNCING.  This is also required to maintain the notion that we use
- * a ordered wait queue to hold off would be writers to the log when every
- * iclog is trying to sync to disk.
+/*
+ * An iclog has just finished IO completion processing, so we need to update
+ * the iclog state and propagate that up into the overall log state. Hence we
+ * prepare the iclog for cleaning, and then clean all the pending dirty iclogs
+ * starting from the head, and then wake up any threads that are waiting for the
+ * iclog to be marked clean.
  *
- * State Change: DIRTY -> ACTIVE
+ * The ordering of marking iclogs ACTIVE must be maintained, so an iclog
+ * doesn't become ACTIVE beyond one that is SYNCING.  This is also required to
+ * maintain the notion that we use an ordered wait queue to hold off would-be
+ * writers to the log when every iclog is trying to sync to disk.
+ *
+ * Caller must hold the icloglock before calling us.
+ *
+ * State Change: !IOERROR -> DIRTY -> ACTIVE
  */
 STATIC void
-xlog_state_clean_log(
-	struct xlog *log)
+xlog_state_clean_iclog(
+	struct xlog		*log,
+	struct xlog_in_core	*dirty_iclog)
 {
-	xlog_in_core_t	*iclog;
-	int changed = 0;
+	struct xlog_in_core	*iclog;
+	int			changed = 0;
 
+	/* Prepare the completed iclog. */
+	if (!(dirty_iclog->ic_state & XLOG_STATE_IOERROR))
+		dirty_iclog->ic_state = XLOG_STATE_DIRTY;
+
+	/* Walk all the iclogs to update the ordered active state. */
 	iclog = log->l_iclog;
 	do {
 		if (iclog->ic_state == XLOG_STATE_DIRTY) {
 			iclog->ic_state	= XLOG_STATE_ACTIVE;
 			iclog->ic_offset       = 0;
-			ASSERT(iclog->ic_callback == NULL);
+			ASSERT(list_empty_careful(&iclog->ic_callbacks));
 			/*
 			 * If the number of ops in this iclog indicate it just
 			 * contains the dummy transaction, we can
@@ -2634,7 +2589,13 @@
 		iclog = iclog->ic_next;
 	} while (iclog != log->l_iclog);
 
-	/* log is locked when we are called */
+	/*
+	 * Wake up threads waiting in xfs_log_force() for the dirty iclog
+	 * to be cleaned.
+	 */
+	wake_up_all(&dirty_iclog->ic_force_wait);
+
 	/*
 	 * Change state for the dummy log recording.
 	 * We usually go to NEED. But we go to NEED2 if the changed indicates
@@ -2668,56 +2629,226 @@
 			ASSERT(0);
 		}
 	}
-}	/* xlog_state_clean_log */
+}
 
 STATIC xfs_lsn_t
 xlog_get_lowest_lsn(
-	struct xlog	*log)
+	struct xlog		*log)
 {
-	xlog_in_core_t  *lsn_log;
-	xfs_lsn_t	lowest_lsn, lsn;
+	struct xlog_in_core	*iclog = log->l_iclog;
+	xfs_lsn_t		lowest_lsn = 0, lsn;
 
-	lsn_log = log->l_iclog;
-	lowest_lsn = 0;
 	do {
-	    if (!(lsn_log->ic_state & (XLOG_STATE_ACTIVE|XLOG_STATE_DIRTY))) {
-		lsn = be64_to_cpu(lsn_log->ic_header.h_lsn);
-		if ((lsn && !lowest_lsn) ||
-		    (XFS_LSN_CMP(lsn, lowest_lsn) < 0)) {
+		if (iclog->ic_state & (XLOG_STATE_ACTIVE | XLOG_STATE_DIRTY))
+			continue;
+
+		lsn = be64_to_cpu(iclog->ic_header.h_lsn);
+		if ((lsn && !lowest_lsn) || XFS_LSN_CMP(lsn, lowest_lsn) < 0)
 			lowest_lsn = lsn;
-		}
-	    }
-	    lsn_log = lsn_log->ic_next;
-	} while (lsn_log != log->l_iclog);
+	} while ((iclog = iclog->ic_next) != log->l_iclog);
+
 	return lowest_lsn;
 }
 
+/*
+ * Completion of an iclog IO does not imply that a transaction has completed, as
+ * transactions can be large enough to span many iclogs. We cannot change the
+ * tail of the log half way through a transaction as this may be the only
+ * transaction in the log and moving the tail to point to the middle of it
+ * will prevent recovery from finding the start of the transaction. Hence we
+ * should only update the last_sync_lsn if this iclog contains transaction
+ * completion callbacks on it.
+ *
+ * We have to do this before we drop the icloglock to ensure we are the only one
+ * that can update it.
+ *
+ * If we are moving the last_sync_lsn forwards, we also need to ensure we kick
+ * the reservation grant head pushing. This is due to the fact that the push
+ * target is bound by the current last_sync_lsn value. Hence if we have a large
+ * amount of log space bound up in this committing transaction then the
+ * last_sync_lsn value may be the limiting factor preventing tail pushing from
+ * freeing space in the log. Hence once we've updated the last_sync_lsn we
+ * should push the AIL to ensure the push target (and hence the grant head) is
+ * no longer bound by the old log head location and can move forwards and make
+ * progress again.
+ */
+static void
+xlog_state_set_callback(
+	struct xlog		*log,
+	struct xlog_in_core	*iclog,
+	xfs_lsn_t		header_lsn)
+{
+	iclog->ic_state = XLOG_STATE_CALLBACK;
+
+	ASSERT(XFS_LSN_CMP(atomic64_read(&log->l_last_sync_lsn),
+			   header_lsn) <= 0);
+
+	if (list_empty_careful(&iclog->ic_callbacks))
+		return;
+
+	atomic64_set(&log->l_last_sync_lsn, header_lsn);
+	xlog_grant_push_ail(log, 0);
+}
+
+/*
+ * Return true if we need to stop processing, false to continue to the next
+ * iclog. The caller will need to run callbacks if the iclog is returned in the
+ * XLOG_STATE_CALLBACK state.
+ */
+static bool
+xlog_state_iodone_process_iclog(
+	struct xlog		*log,
+	struct xlog_in_core	*iclog,
+	struct xlog_in_core	*completed_iclog,
+	bool			*ioerror)
+{
+	xfs_lsn_t		lowest_lsn;
+	xfs_lsn_t		header_lsn;
+
+	/* Skip all iclogs in the ACTIVE & DIRTY states */
+	if (iclog->ic_state & (XLOG_STATE_ACTIVE | XLOG_STATE_DIRTY))
+		return false;
+
+	/*
+	 * Between marking a filesystem SHUTDOWN and stopping the log, we do
+	 * flush all iclogs to disk (if there wasn't a log I/O error). So, we do
+	 * want things to go smoothly in case of just a SHUTDOWN w/o a
+	 * LOG_IO_ERROR.
+	 */
+	if (iclog->ic_state & XLOG_STATE_IOERROR) {
+		*ioerror = true;
+		return false;
+	}
+
+	/*
+	 * Can only perform callbacks in order.  Since this iclog is not in the
+	 * DONE_SYNC/DO_CALLBACK state, we skip the rest and just try to clean
+	 * up.  If we set our iclog to DO_CALLBACK, we will not process it when
+	 * we retry since a previous iclog is in the CALLBACK and the state
+	 * cannot change since we are holding the l_icloglock.
+	 */
+	if (!(iclog->ic_state &
+			(XLOG_STATE_DONE_SYNC | XLOG_STATE_DO_CALLBACK))) {
+		if (completed_iclog &&
+		    (completed_iclog->ic_state == XLOG_STATE_DONE_SYNC)) {
+			completed_iclog->ic_state = XLOG_STATE_DO_CALLBACK;
+		}
+		return true;
+	}
+
+	/*
+	 * We now have an iclog that is in either the DO_CALLBACK or DONE_SYNC
+	 * states. The other states (WANT_SYNC, SYNCING, or CALLBACK) were
+	 * caught by the check above and are going to be cleaned (i.e. we
+	 * aren't doing their callbacks).
+	 *
+	 * We will do one more check here to see if we have chased our tail
+	 * around. If this is not the lowest lsn iclog, then we will leave it
+	 * for another completion to process.
+	 */
+	header_lsn = be64_to_cpu(iclog->ic_header.h_lsn);
+	lowest_lsn = xlog_get_lowest_lsn(log);
+	if (lowest_lsn && XFS_LSN_CMP(lowest_lsn, header_lsn) < 0)
+		return false;
+
+	xlog_state_set_callback(log, iclog, header_lsn);
+	return false;
+}
+
+/*
+ * Keep processing entries in the iclog callback list until we come around and
+ * it is empty.  We need to atomically see that the list is empty and change the
+ * state to DIRTY so that we don't miss any more callbacks being added.
+ *
+ * This function is called with the icloglock held and returns with it held. We
+ * drop it while running callbacks, however, as holding it over thousands of
+ * callbacks is unnecessary and causes excessive contention if we do.
+ */
+static void
+xlog_state_do_iclog_callbacks(
+	struct xlog		*log,
+	struct xlog_in_core	*iclog,
+	bool			aborted)
+{
+	spin_unlock(&log->l_icloglock);
+	spin_lock(&iclog->ic_callback_lock);
+	while (!list_empty(&iclog->ic_callbacks)) {
+		LIST_HEAD(tmp);
+
+		list_splice_init(&iclog->ic_callbacks, &tmp);
+
+		spin_unlock(&iclog->ic_callback_lock);
+		xlog_cil_process_committed(&tmp, aborted);
+		spin_lock(&iclog->ic_callback_lock);
+	}
+
+	/*
+	 * Pick up the icloglock while still holding the callback lock so we
+	 * serialise against anyone trying to add more callbacks to this iclog
+	 * now we've finished processing.
+	 */
+	spin_lock(&log->l_icloglock);
+	spin_unlock(&iclog->ic_callback_lock);
+}
+
+#ifdef DEBUG
+/*
+ * Make one last gasp attempt to see if iclogs are being left in limbo.  If
+ * the loop in xlog_state_do_callback() below finds an iclog earlier than the
+ * current iclog and in one of the syncing states, the current iclog is put
+ * into DO_CALLBACK and the callbacks are deferred to the completion of the
+ * earlier iclog. Walk the iclogs in order and make sure that no iclog is in
+ * one of the syncing states.
+ *
+ * Note that SYNCING|IOERROR is a valid state so we cannot just check for
+ * ic_state == SYNCING.
+ */
+static void
+xlog_state_callback_check_state(
+	struct xlog		*log)
+{
+	struct xlog_in_core	*first_iclog = log->l_iclog;
+	struct xlog_in_core	*iclog = first_iclog;
+
+	do {
+		ASSERT(iclog->ic_state != XLOG_STATE_DO_CALLBACK);
+		/*
+		 * Terminate the loop if iclogs are found in states
+		 * which will cause other threads to clean up iclogs.
+		 *
+		 * SYNCING - i/o completion will go through logs
+		 * DONE_SYNC - interrupt thread should be waiting for
+		 *              l_icloglock
+		 * IOERROR - give up hope all ye who enter here
+		 */
+		if (iclog->ic_state == XLOG_STATE_WANT_SYNC ||
+		    iclog->ic_state & XLOG_STATE_SYNCING ||
+		    iclog->ic_state == XLOG_STATE_DONE_SYNC ||
+		    iclog->ic_state == XLOG_STATE_IOERROR)
+			break;
+		iclog = iclog->ic_next;
+	} while (first_iclog != iclog);
+}
+#else
+#define xlog_state_callback_check_state(l)	((void)0)
+#endif
 
 STATIC void
 xlog_state_do_callback(
 	struct xlog		*log,
-	int			aborted,
+	bool			aborted,
 	struct xlog_in_core	*ciclog)
 {
-	xlog_in_core_t	   *iclog;
-	xlog_in_core_t	   *first_iclog;	/* used to know when we've
-						 * processed all iclogs once */
-	xfs_log_callback_t *cb, *cb_next;
-	int		   flushcnt = 0;
-	xfs_lsn_t	   lowest_lsn;
-	int		   ioerrors;	/* counter: iclogs with errors */
-	int		   loopdidcallbacks; /* flag: inner loop did callbacks*/
-	int		   funcdidcallbacks; /* flag: function did callbacks */
-	int		   repeats;	/* for issuing console warnings if
-					 * looping too many times */
-	int		   wake = 0;
+	struct xlog_in_core	*iclog;
+	struct xlog_in_core	*first_iclog;
+	bool			did_callbacks = false;
+	bool			cycled_icloglock;
+	bool			ioerror;
+	int			flushcnt = 0;
+	int			repeats = 0;
 
 	spin_lock(&log->l_icloglock);
-	first_iclog = iclog = log->l_iclog;
-	ioerrors = 0;
-	funcdidcallbacks = 0;
-	repeats = 0;
-
 	do {
 		/*
 		 * Scan all iclogs starting with the one pointed to by the
@@ -2729,143 +2860,34 @@
 		 */
 		first_iclog = log->l_iclog;
 		iclog = log->l_iclog;
-		loopdidcallbacks = 0;
+		cycled_icloglock = false;
+		ioerror = false;
 		repeats++;
 
 		do {
+			if (xlog_state_iodone_process_iclog(log, iclog,
+							ciclog, &ioerror))
+				break;
 
-			/* skip all iclogs in the ACTIVE & DIRTY states */
-			if (iclog->ic_state &
-			    (XLOG_STATE_ACTIVE|XLOG_STATE_DIRTY)) {
+			if (!(iclog->ic_state &
+			      (XLOG_STATE_CALLBACK | XLOG_STATE_IOERROR))) {
 				iclog = iclog->ic_next;
 				continue;
 			}
 
 			/*
-			 * Between marking a filesystem SHUTDOWN and stopping
-			 * the log, we do flush all iclogs to disk (if there
-			 * wasn't a log I/O error). So, we do want things to
-			 * go smoothly in case of just a SHUTDOWN  w/o a
-			 * LOG_IO_ERROR.
+			 * Running callbacks will drop the icloglock which means
+			 * we'll have to run at least one more complete loop.
 			 */
-			if (!(iclog->ic_state & XLOG_STATE_IOERROR)) {
-				/*
-				 * Can only perform callbacks in order.  Since
-				 * this iclog is not in the DONE_SYNC/
-				 * DO_CALLBACK state, we skip the rest and
-				 * just try to clean up.  If we set our iclog
-				 * to DO_CALLBACK, we will not process it when
-				 * we retry since a previous iclog is in the
-				 * CALLBACK and the state cannot change since
-				 * we are holding the l_icloglock.
-				 */
-				if (!(iclog->ic_state &
-					(XLOG_STATE_DONE_SYNC |
-						 XLOG_STATE_DO_CALLBACK))) {
-					if (ciclog && (ciclog->ic_state ==
-							XLOG_STATE_DONE_SYNC)) {
-						ciclog->ic_state = XLOG_STATE_DO_CALLBACK;
-					}
-					break;
-				}
-				/*
-				 * We now have an iclog that is in either the
-				 * DO_CALLBACK or DONE_SYNC states. The other
-				 * states (WANT_SYNC, SYNCING, or CALLBACK were
-				 * caught by the above if and are going to
-				 * clean (i.e. we aren't doing their callbacks)
-				 * see the above if.
-				 */
+			cycled_icloglock = true;
+			xlog_state_do_iclog_callbacks(log, iclog, aborted);
 
-				/*
-				 * We will do one more check here to see if we
-				 * have chased our tail around.
-				 */
-
-				lowest_lsn = xlog_get_lowest_lsn(log);
-				if (lowest_lsn &&
-				    XFS_LSN_CMP(lowest_lsn,
-						be64_to_cpu(iclog->ic_header.h_lsn)) < 0) {
-					iclog = iclog->ic_next;
-					continue; /* Leave this iclog for
-						   * another thread */
-				}
-
-				iclog->ic_state = XLOG_STATE_CALLBACK;
-
-
-				/*
-				 * Completion of a iclog IO does not imply that
-				 * a transaction has completed, as transactions
-				 * can be large enough to span many iclogs. We
-				 * cannot change the tail of the log half way
-				 * through a transaction as this may be the only
-				 * transaction in the log and moving th etail to
-				 * point to the middle of it will prevent
-				 * recovery from finding the start of the
-				 * transaction. Hence we should only update the
-				 * last_sync_lsn if this iclog contains
-				 * transaction completion callbacks on it.
-				 *
-				 * We have to do this before we drop the
-				 * icloglock to ensure we are the only one that
-				 * can update it.
-				 */
-				ASSERT(XFS_LSN_CMP(atomic64_read(&log->l_last_sync_lsn),
-					be64_to_cpu(iclog->ic_header.h_lsn)) <= 0);
-				if (iclog->ic_callback)
-					atomic64_set(&log->l_last_sync_lsn,
-						be64_to_cpu(iclog->ic_header.h_lsn));
-
-			} else
-				ioerrors++;
-
-			spin_unlock(&log->l_icloglock);
-
-			/*
-			 * Keep processing entries in the callback list until
-			 * we come around and it is empty.  We need to
-			 * atomically see that the list is empty and change the
-			 * state to DIRTY so that we don't miss any more
-			 * callbacks being added.
-			 */
-			spin_lock(&iclog->ic_callback_lock);
-			cb = iclog->ic_callback;
-			while (cb) {
-				iclog->ic_callback_tail = &(iclog->ic_callback);
-				iclog->ic_callback = NULL;
-				spin_unlock(&iclog->ic_callback_lock);
-
-				/* perform callbacks in the order given */
-				for (; cb; cb = cb_next) {
-					cb_next = cb->cb_next;
-					cb->cb_func(cb->cb_arg, aborted);
-				}
-				spin_lock(&iclog->ic_callback_lock);
-				cb = iclog->ic_callback;
-			}
-
-			loopdidcallbacks++;
-			funcdidcallbacks++;
-
-			spin_lock(&log->l_icloglock);
-			ASSERT(iclog->ic_callback == NULL);
-			spin_unlock(&iclog->ic_callback_lock);
-			if (!(iclog->ic_state & XLOG_STATE_IOERROR))
-				iclog->ic_state = XLOG_STATE_DIRTY;
-
-			/*
-			 * Transition from DIRTY to ACTIVE if applicable.
-			 * NOP if STATE_IOERROR.
-			 */
-			xlog_state_clean_log(log);
-
-			/* wake up threads waiting in xfs_log_force() */
-			wake_up_all(&iclog->ic_force_wait);
-
+			xlog_state_clean_iclog(log, iclog);
 			iclog = iclog->ic_next;
 		} while (first_iclog != iclog);
 
+		did_callbacks |= cycled_icloglock;
+
 		if (repeats > 5000) {
 			flushcnt += repeats;
 			repeats = 0;
@@ -2873,50 +2895,15 @@
 				"%s: possible infinite loop (%d iterations)",
 				__func__, flushcnt);
 		}
-	} while (!ioerrors && loopdidcallbacks);
+	} while (!ioerror && cycled_icloglock);
 
-#ifdef DEBUG
-	/*
-	 * Make one last gasp attempt to see if iclogs are being left in limbo.
-	 * If the above loop finds an iclog earlier than the current iclog and
-	 * in one of the syncing states, the current iclog is put into
-	 * DO_CALLBACK and the callbacks are deferred to the completion of the
-	 * earlier iclog. Walk the iclogs in order and make sure that no iclog
-	 * is in DO_CALLBACK unless an earlier iclog is in one of the syncing
-	 * states.
-	 *
-	 * Note that SYNCING|IOABORT is a valid state so we cannot just check
-	 * for ic_state == SYNCING.
-	 */
-	if (funcdidcallbacks) {
-		first_iclog = iclog = log->l_iclog;
-		do {
-			ASSERT(iclog->ic_state != XLOG_STATE_DO_CALLBACK);
-			/*
-			 * Terminate the loop if iclogs are found in states
-			 * which will cause other threads to clean up iclogs.
-			 *
-			 * SYNCING - i/o completion will go through logs
-			 * DONE_SYNC - interrupt thread should be waiting for
-			 *              l_icloglock
-			 * IOERROR - give up hope all ye who enter here
-			 */
-			if (iclog->ic_state == XLOG_STATE_WANT_SYNC ||
-			    iclog->ic_state & XLOG_STATE_SYNCING ||
-			    iclog->ic_state == XLOG_STATE_DONE_SYNC ||
-			    iclog->ic_state == XLOG_STATE_IOERROR )
-				break;
-			iclog = iclog->ic_next;
-		} while (first_iclog != iclog);
-	}
-#endif
+	if (did_callbacks)
+		xlog_state_callback_check_state(log);
 
 	if (log->l_iclog->ic_state & (XLOG_STATE_ACTIVE|XLOG_STATE_IOERROR))
-		wake = 1;
-	spin_unlock(&log->l_icloglock);
-
-	if (wake)
 		wake_up_all(&log->l_flush_wait);
+
+	spin_unlock(&log->l_icloglock);
 }
 
 
@@ -2935,18 +2922,16 @@
  */
 STATIC void
 xlog_state_done_syncing(
-	xlog_in_core_t	*iclog,
-	int		aborted)
+	struct xlog_in_core	*iclog,
+	bool			aborted)
 {
-	struct xlog	   *log = iclog->ic_log;
+	struct xlog		*log = iclog->ic_log;
 
 	spin_lock(&log->l_icloglock);
 
 	ASSERT(iclog->ic_state == XLOG_STATE_SYNCING ||
 	       iclog->ic_state == XLOG_STATE_IOERROR);
 	ASSERT(atomic_read(&iclog->ic_refcnt) == 0);
-	ASSERT(iclog->ic_bwritecnt == 1 || iclog->ic_bwritecnt == 2);
-
 
 	/*
 	 * If we got an error, either on the first buffer, or in the case of
@@ -2954,13 +2939,8 @@
 	 * and none should ever be attempted to be written to disk
 	 * again.
 	 */
-	if (iclog->ic_state != XLOG_STATE_IOERROR) {
-		if (--iclog->ic_bwritecnt == 1) {
-			spin_unlock(&log->l_icloglock);
-			return;
-		}
+	if (iclog->ic_state != XLOG_STATE_IOERROR)
 		iclog->ic_state = XLOG_STATE_DONE_SYNC;
-	}
 
 	/*
 	 * Someone could be sleeping prior to writing out the next
@@ -3229,7 +3209,7 @@
 	 * flags after this point.
 	 */
 	if (sync)
-		return xlog_sync(log, iclog);
+		xlog_sync(log, iclog);
 	return 0;
 }	/* xlog_state_release_iclog */
 
@@ -3820,8 +3800,7 @@
 xlog_verify_iclog(
 	struct xlog		*log,
 	struct xlog_in_core	*iclog,
-	int			count,
-	bool                    syncing)
+	int			count)
 {
 	xlog_op_header_t	*ophead;
 	xlog_in_core_t		*icptr;
@@ -3865,7 +3844,7 @@
 		/* clientid is only 1 byte */
 		p = &ophead->oh_clientid;
 		field_offset = p - base_ptr;
-		if (!syncing || (field_offset & 0x1ff)) {
+		if (field_offset & 0x1ff) {
 			clientid = ophead->oh_clientid;
 		} else {
 			idx = BTOBBT((char *)&ophead->oh_clientid - iclog->ic_datap);
@@ -3888,7 +3867,7 @@
 		/* check length */
 		p = &ophead->oh_len;
 		field_offset = p - base_ptr;
-		if (!syncing || (field_offset & 0x1ff)) {
+		if (field_offset & 0x1ff) {
 			op_len = be32_to_cpu(ophead->oh_len);
 		} else {
 			idx = BTOBBT((uintptr_t)&ophead->oh_len -
@@ -4024,8 +4003,10 @@
 	 * item committed callback functions will do this again under lock to
 	 * avoid races.
 	 */
+	spin_lock(&log->l_cilp->xc_push_lock);
 	wake_up_all(&log->l_cilp->xc_commit_wait);
-	xlog_state_do_callback(log, XFS_LI_ABORTED, NULL);
+	spin_unlock(&log->l_cilp->xc_push_lock);
+	xlog_state_do_callback(log, true, NULL);
 
 #ifdef XFSERRORDEBUG
 	{