diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 11ae274..844c358 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -1,12 +1,12 @@
 // SPDX-License-Identifier: GPL-2.0+
 /*
- * Read-Copy Update mechanism for mutual exclusion
+ * Read-Copy Update mechanism for mutual exclusion (tree-based version)
  *
  * Copyright IBM Corporation, 2008
  *
  * Authors: Dipankar Sarma <dipankar@in.ibm.com>
  *	    Manfred Spraul <manfred@colorfullife.com>
- *	    Paul E. McKenney <paulmck@linux.ibm.com> Hierarchical version
+ *	    Paul E. McKenney <paulmck@linux.ibm.com>
  *
  * Based on the original work by Paul McKenney <paulmck@linux.ibm.com>
  * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
@@ -43,7 +43,6 @@
 #include <uapi/linux/sched/types.h>
 #include <linux/prefetch.h>
 #include <linux/delay.h>
-#include <linux/stop_machine.h>
 #include <linux/random.h>
 #include <linux/trace_events.h>
 #include <linux/suspend.h>
@@ -55,8 +54,12 @@
 #include <linux/oom.h>
 #include <linux/smpboot.h>
 #include <linux/jiffies.h>
+#include <linux/slab.h>
 #include <linux/sched/isolation.h>
 #include <linux/sched/clock.h>
+#include <linux/vmalloc.h>
+#include <linux/mm.h>
+#include <linux/kasan.h>
 #include "../time/tick-internal.h"
 
 #include "tree.h"
@@ -75,16 +78,13 @@
  */
 #define RCU_DYNTICK_CTRL_MASK 0x1
 #define RCU_DYNTICK_CTRL_CTR  (RCU_DYNTICK_CTRL_MASK + 1)
-#ifndef rcu_eqs_special_exit
-#define rcu_eqs_special_exit() do { } while (0)
-#endif
 
 static DEFINE_PER_CPU_SHARED_ALIGNED(struct rcu_data, rcu_data) = {
 	.dynticks_nesting = 1,
 	.dynticks_nmi_nesting = DYNTICK_IRQ_NONIDLE,
 	.dynticks = ATOMIC_INIT(RCU_DYNTICK_CTRL_CTR),
 };
-struct rcu_state rcu_state = {
+static struct rcu_state rcu_state = {
 	.level = { &rcu_state.node[0] },
 	.gp_state = RCU_GP_IDLE,
 	.gp_seq = (0UL - 300UL) << RCU_SEQ_CTR_SHIFT,
@@ -100,7 +100,7 @@
 static bool dump_tree;
 module_param(dump_tree, bool, 0444);
 /* By default, use RCU_SOFTIRQ instead of rcuc kthreads. */
-static bool use_softirq = 1;
+static bool use_softirq = true;
 module_param(use_softirq, bool, 0444);
 /* Control rcu_node-tree auto-balancing at boot time. */
 static bool rcu_fanout_exact;
@@ -150,6 +150,7 @@
 static void invoke_rcu_core(void);
 static void rcu_report_exp_rdp(struct rcu_data *rdp);
 static void sync_sched_exp_online_cleanup(int cpu);
+static void check_cb_ovld_locked(struct rcu_data *rdp, struct rcu_node *rnp);
 
 /* rcuc/rcub kthread realtime priority */
 static int kthread_prio = IS_ENABLED(CONFIG_RCU_BOOST) ? 1 : 0;
@@ -164,6 +165,21 @@
 static int gp_cleanup_delay;
 module_param(gp_cleanup_delay, int, 0444);
 
+// Add delay to rcu_read_unlock() for strict grace periods.
+static int rcu_unlock_delay;
+#ifdef CONFIG_RCU_STRICT_GRACE_PERIOD
+module_param(rcu_unlock_delay, int, 0444);
+#endif
+
+/*
+ * This rcu parameter is runtime-read-only. It reflects
+ * a minimum allowed number of objects which can be cached
+ * per-CPU. Object size is equal to one page. This value
+ * can be changed at boot time.
+ */
+static int rcu_min_cached_objs = 5;
+module_param(rcu_min_cached_objs, int, 0444);
+
 /* Retrieve RCU kthreads priority for rcutorture */
 int rcu_get_gp_kthreads_prio(void)
 {
@@ -188,7 +204,7 @@
  * held, but the bit corresponding to the current CPU will be stable
  * in most contexts.
  */
-unsigned long rcu_rnp_online_cpus(struct rcu_node *rnp)
+static unsigned long rcu_rnp_online_cpus(struct rcu_node *rnp)
 {
 	return READ_ONCE(rnp->qsmaskinitnext);
 }
@@ -224,9 +240,11 @@
 
 /*
  * Record entry into an extended quiescent state.  This is only to be
- * called when not already in an extended quiescent state.
+ * called when not already in an extended quiescent state, that is,
+ * RCU is watching prior to the call to this function and is no longer
+ * watching upon return.
  */
-static void rcu_dynticks_eqs_enter(void)
+static noinstr void rcu_dynticks_eqs_enter(void)
 {
 	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 	int seq;
@@ -236,8 +254,9 @@
 	 * critical sections, and we also must force ordering with the
 	 * next idle sojourn.
 	 */
-	seq = atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
-	/* Better be in an extended quiescent state! */
+	rcu_dynticks_task_trace_enter();  // Before ->dynticks update!
+	seq = arch_atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+	// RCU is no longer watching.  Better be in an extended quiescent state!
 	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
 		     (seq & RCU_DYNTICK_CTRL_CTR));
 	/* Better not have special action (TLB flush) pending! */
@@ -247,9 +266,10 @@
 
 /*
  * Record exit from an extended quiescent state.  This is only to be
- * called from an extended quiescent state.
+ * called from an extended quiescent state, that is, RCU is not watching
+ * prior to the call to this function and is watching upon return.
  */
-static void rcu_dynticks_eqs_exit(void)
+static noinstr void rcu_dynticks_eqs_exit(void)
 {
 	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 	int seq;
@@ -259,14 +279,14 @@
 	 * and we also must force ordering with the next RCU read-side
 	 * critical section.
 	 */
-	seq = atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+	seq = arch_atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+	// RCU is now watching.  Better not be in an extended quiescent state!
+	rcu_dynticks_task_trace_exit();  // After ->dynticks update!
 	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
 		     !(seq & RCU_DYNTICK_CTRL_CTR));
 	if (seq & RCU_DYNTICK_CTRL_MASK) {
-		atomic_andnot(RCU_DYNTICK_CTRL_MASK, &rdp->dynticks);
+		arch_atomic_andnot(RCU_DYNTICK_CTRL_MASK, &rdp->dynticks);
 		smp_mb__after_atomic(); /* _exit after clearing mask. */
-		/* Prefer duplicate flushes to losing a flush. */
-		rcu_eqs_special_exit();
 	}
 }
 
@@ -294,18 +314,18 @@
  *
  * No ordering, as we are sampling CPU-local information.
  */
-bool rcu_dynticks_curr_cpu_in_eqs(void)
+static __always_inline bool rcu_dynticks_curr_cpu_in_eqs(void)
 {
 	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 
-	return !(atomic_read(&rdp->dynticks) & RCU_DYNTICK_CTRL_CTR);
+	return !(arch_atomic_read(&rdp->dynticks) & RCU_DYNTICK_CTRL_CTR);
 }
 
 /*
  * Snapshot the ->dynticks counter with full ordering so as to allow
  * stable comparison of this counter with past and future snapshots.
  */
-int rcu_dynticks_snap(struct rcu_data *rdp)
+static int rcu_dynticks_snap(struct rcu_data *rdp)
 {
 	int snap = atomic_add_return(0, &rdp->dynticks);
 
@@ -332,6 +352,28 @@
 }
 
 /*
+ * Return true if the integer referenced by @vp reads as zero while CPU @cpu
+ * remains within a single extended quiescent state, otherwise return false.
+ */
+bool rcu_dynticks_zero_in_eqs(int cpu, int *vp)
+{
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+	int snap;
+
+	// If not quiescent, force back to earlier extended quiescent state.
+	snap = atomic_read(&rdp->dynticks) & ~(RCU_DYNTICK_CTRL_MASK |
+					       RCU_DYNTICK_CTRL_CTR);
+
+	smp_rmb(); // Order ->dynticks and *vp reads.
+	if (READ_ONCE(*vp))
+		return false;  // Non-zero, so report failure.
+	smp_rmb(); // Order *vp read and ->dynticks re-read.
+
+	// If still in the same extended quiescent state, we are good!
+	return snap == (atomic_read(&rdp->dynticks) & ~RCU_DYNTICK_CTRL_MASK);
+}
+
+/*
  * Set the special (bottom) bit of the specified CPU so that it
  * will take special action (such as flushing its TLB) on the
  * next exit from an extended quiescent state.  Returns true if
@@ -342,14 +384,17 @@
 {
 	int old;
 	int new;
+	int new_old;
 	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
 
+	new_old = atomic_read(&rdp->dynticks);
 	do {
-		old = atomic_read(&rdp->dynticks);
+		old = new_old;
 		if (old & RCU_DYNTICK_CTRL_CTR)
 			return false;
 		new = old | RCU_DYNTICK_CTRL_MASK;
-	} while (atomic_cmpxchg(&rdp->dynticks, old, new) != old);
+		new_old = atomic_cmpxchg(&rdp->dynticks, old, new);
+	} while (new_old != old);
 	return true;
 }
 
@@ -364,7 +409,7 @@
  *
  * The caller must have disabled interrupts and must not be idle.
  */
-static void __maybe_unused rcu_momentary_dyntick_idle(void)
+notrace void rcu_momentary_dyntick_idle(void)
 {
 	int special;
 
@@ -375,18 +420,26 @@
 	WARN_ON_ONCE(!(special & RCU_DYNTICK_CTRL_CTR));
 	rcu_preempt_deferred_qs(current);
 }
+EXPORT_SYMBOL_GPL(rcu_momentary_dyntick_idle);
 
 /**
- * rcu_is_cpu_rrupt_from_idle - see if interrupted from idle
+ * rcu_is_cpu_rrupt_from_idle - see if 'interrupted' from idle
  *
  * If the current CPU is idle and running at a first-level (not nested)
- * interrupt from idle, return true.  The caller must have at least
- * disabled preemption.
+ * interrupt, or directly, from idle, return true.
+ *
+ * The caller must have at least disabled IRQs.
  */
 static int rcu_is_cpu_rrupt_from_idle(void)
 {
-	/* Called only from within the scheduling-clock interrupt */
-	lockdep_assert_in_irq();
+	long nesting;
+
+	/*
+	 * Usually called from the tick; but also used from smp_function_call()
+	 * for expedited grace periods. This latter can result in running from
+	 * the idle task, instead of an actual IPI.
+	 */
+	lockdep_assert_irqs_disabled();
 
 	/* Check for counter underflows */
 	RCU_LOCKDEP_WARN(__this_cpu_read(rcu_data.dynticks_nesting) < 0,
@@ -395,26 +448,38 @@
 			 "RCU dynticks_nmi_nesting counter underflow/zero!");
 
 	/* Are we at first interrupt nesting level? */
-	if (__this_cpu_read(rcu_data.dynticks_nmi_nesting) != 1)
+	nesting = __this_cpu_read(rcu_data.dynticks_nmi_nesting);
+	if (nesting > 1)
 		return false;
 
+	/*
+	 * If we're not in an interrupt, we must be in the idle task!
+	 */
+	WARN_ON_ONCE(!nesting && !is_idle_task(current));
+
 	/* Does CPU appear to be idle from an RCU standpoint? */
 	return __this_cpu_read(rcu_data.dynticks_nesting) == 0;
 }
 
-#define DEFAULT_RCU_BLIMIT 10     /* Maximum callbacks per rcu_do_batch ... */
-#define DEFAULT_MAX_RCU_BLIMIT 10000 /* ... even during callback flood. */
+#define DEFAULT_RCU_BLIMIT (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD) ? 1000 : 10)
+				// Maximum callbacks per rcu_do_batch ...
+#define DEFAULT_MAX_RCU_BLIMIT 10000 // ... even during callback flood.
 static long blimit = DEFAULT_RCU_BLIMIT;
-#define DEFAULT_RCU_QHIMARK 10000 /* If this many pending, ignore blimit. */
+#define DEFAULT_RCU_QHIMARK 10000 // If this many pending, ignore blimit.
 static long qhimark = DEFAULT_RCU_QHIMARK;
-#define DEFAULT_RCU_QLOMARK 100   /* Once only this many pending, use blimit. */
+#define DEFAULT_RCU_QLOMARK 100   // Once only this many pending, use blimit.
 static long qlowmark = DEFAULT_RCU_QLOMARK;
+#define DEFAULT_RCU_QOVLD_MULT 2
+#define DEFAULT_RCU_QOVLD (DEFAULT_RCU_QOVLD_MULT * DEFAULT_RCU_QHIMARK)
+static long qovld = DEFAULT_RCU_QOVLD; // If this many pending, hammer QS.
+static long qovld_calc = -1;	  // No pre-initialization lock acquisitions!
 
 module_param(blimit, long, 0444);
 module_param(qhimark, long, 0444);
 module_param(qlowmark, long, 0444);
+module_param(qovld, long, 0444);
 
-static ulong jiffies_till_first_fqs = ULONG_MAX;
+static ulong jiffies_till_first_fqs = IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD) ? 0 : ULONG_MAX;
 static ulong jiffies_till_next_fqs = ULONG_MAX;
 static bool rcu_kick_kthreads;
 static int rcu_divisor = 7;
@@ -496,7 +561,7 @@
 module_param(rcu_kick_kthreads, bool, 0644);
 
 static void force_qs_rnp(int (*f)(struct rcu_data *rdp));
-static int rcu_pending(void);
+static int rcu_pending(int user);
 
 /*
  * Return the number of RCU GPs completed thus far for debug & stats.
@@ -528,16 +593,6 @@
 }
 
 /*
- * Convert a ->gp_state value to a character string.
- */
-static const char *gp_state_getname(short gs)
-{
-	if (gs < 0 || gs >= ARRAY_SIZE(gp_state_names))
-		return "???";
-	return gp_state_names[gs];
-}
-
-/*
  * Send along grace-period-related data for rcutorture diagnostics.
  */
 void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags,
@@ -562,7 +617,7 @@
  * the possibility of usermode upcalls having messed up our count
  * of interrupt nesting level during the prior busy period.
  */
-static void rcu_eqs_enter(bool user)
+static noinstr void rcu_eqs_enter(bool user)
 {
 	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 
@@ -571,18 +626,27 @@
 	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
 		     rdp->dynticks_nesting == 0);
 	if (rdp->dynticks_nesting != 1) {
+		// RCU will still be watching, so just do accounting and leave.
 		rdp->dynticks_nesting--;
 		return;
 	}
 
 	lockdep_assert_irqs_disabled();
+	instrumentation_begin();
 	trace_rcu_dyntick(TPS("Start"), rdp->dynticks_nesting, 0, atomic_read(&rdp->dynticks));
 	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) && !user && !is_idle_task(current));
 	rdp = this_cpu_ptr(&rcu_data);
 	rcu_prepare_for_idle();
 	rcu_preempt_deferred_qs(current);
+
+	// instrumentation for the noinstr rcu_dynticks_eqs_enter()
+	instrument_atomic_write(&rdp->dynticks, sizeof(rdp->dynticks));
+
+	instrumentation_end();
 	WRITE_ONCE(rdp->dynticks_nesting, 0); /* Avoid irq-access tearing. */
+	// RCU is watching here ...
 	rcu_dynticks_eqs_enter();
+	// ... but is no longer watching here.
 	rcu_dynticks_task_enter();
 }
 
@@ -602,6 +666,7 @@
 	lockdep_assert_irqs_disabled();
 	rcu_eqs_enter(false);
 }
+EXPORT_SYMBOL_GPL(rcu_idle_enter);
 
 #ifdef CONFIG_NO_HZ_FULL
 /**
@@ -615,7 +680,7 @@
  * If you add or remove a call to rcu_user_enter(), be sure to test with
  * CONFIG_RCU_EQS_DEBUG=y.
  */
-void rcu_user_enter(void)
+noinstr void rcu_user_enter(void)
 {
 	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 
@@ -629,19 +694,22 @@
 }
 #endif /* CONFIG_NO_HZ_FULL */
 
-/*
+/**
+ * rcu_nmi_exit - inform RCU of exit from NMI context
+ *
  * If we are returning from the outermost NMI handler that interrupted an
  * RCU-idle period, update rdp->dynticks and rdp->dynticks_nmi_nesting
  * to let the RCU grace-period handling know that the CPU is back to
  * being RCU-idle.
  *
- * If you add or remove a call to rcu_nmi_exit_common(), be sure to test
+ * If you add or remove a call to rcu_nmi_exit(), be sure to test
  * with CONFIG_RCU_EQS_DEBUG=y.
  */
-static __always_inline void rcu_nmi_exit_common(bool irq)
+noinstr void rcu_nmi_exit(void)
 {
 	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 
+	instrumentation_begin();
 	/*
 	 * Check for ->dynticks_nmi_nesting underflow and bad ->dynticks.
 	 * (We are exiting an NMI handler, so RCU better be paying attention
@@ -659,6 +727,7 @@
 				  atomic_read(&rdp->dynticks));
 		WRITE_ONCE(rdp->dynticks_nmi_nesting, /* No store tearing. */
 			   rdp->dynticks_nmi_nesting - 2);
+		instrumentation_end();
 		return;
 	}
 
@@ -666,27 +735,22 @@
 	trace_rcu_dyntick(TPS("Startirq"), rdp->dynticks_nmi_nesting, 0, atomic_read(&rdp->dynticks));
 	WRITE_ONCE(rdp->dynticks_nmi_nesting, 0); /* Avoid store tearing. */
 
-	if (irq)
+	if (!in_nmi())
 		rcu_prepare_for_idle();
 
+	// instrumentation for the noinstr rcu_dynticks_eqs_enter()
+	instrument_atomic_write(&rdp->dynticks, sizeof(rdp->dynticks));
+	instrumentation_end();
+
+	// RCU is watching here ...
 	rcu_dynticks_eqs_enter();
+	// ... but is no longer watching here.
 
-	if (irq)
+	if (!in_nmi())
 		rcu_dynticks_task_enter();
 }
 
 /**
- * rcu_nmi_exit - inform RCU of exit from NMI context
- *
- * If you add or remove a call to rcu_nmi_exit(), be sure to test
- * with CONFIG_RCU_EQS_DEBUG=y.
- */
-void rcu_nmi_exit(void)
-{
-	rcu_nmi_exit_common(false);
-}
-
-/**
  * rcu_irq_exit - inform RCU that current CPU is exiting irq towards idle
  *
  * Exit from an interrupt handler, which might possibly result in entering
@@ -705,12 +769,52 @@
  * If you add or remove a call to rcu_irq_exit(), be sure to test with
  * CONFIG_RCU_EQS_DEBUG=y.
  */
-void rcu_irq_exit(void)
+void noinstr rcu_irq_exit(void)
 {
 	lockdep_assert_irqs_disabled();
-	rcu_nmi_exit_common(true);
+	rcu_nmi_exit();
 }
 
+/**
+ * rcu_irq_exit_preempt - Inform RCU that current CPU is exiting irq
+ *			  towards in-kernel preemption
+ *
+ * Same as rcu_irq_exit() but has a sanity check that scheduling is safe
+ * from RCU's point of view.  Invoked on return from interrupt before kernel
+ * preemption.
+ */
+void rcu_irq_exit_preempt(void)
+{
+	lockdep_assert_irqs_disabled();
+	rcu_nmi_exit();
+
+	RCU_LOCKDEP_WARN(__this_cpu_read(rcu_data.dynticks_nesting) <= 0,
+			 "RCU dynticks_nesting counter underflow/zero!");
+	RCU_LOCKDEP_WARN(__this_cpu_read(rcu_data.dynticks_nmi_nesting) !=
+			 DYNTICK_IRQ_NONIDLE,
+			 "Bad RCU  dynticks_nmi_nesting counter\n");
+	RCU_LOCKDEP_WARN(rcu_dynticks_curr_cpu_in_eqs(),
+			 "RCU in extended quiescent state!");
+}
+
+#ifdef CONFIG_PROVE_RCU
+/**
+ * rcu_irq_exit_check_preempt - Validate that RCU state permits scheduling
+ */
+void rcu_irq_exit_check_preempt(void)
+{
+	lockdep_assert_irqs_disabled(); // Caller must have irqs disabled.
+
+	RCU_LOCKDEP_WARN(__this_cpu_read(rcu_data.dynticks_nesting) <= 0,
+			 "RCU dynticks_nesting counter underflow/zero!");
+	RCU_LOCKDEP_WARN(__this_cpu_read(rcu_data.dynticks_nmi_nesting) !=
+			 DYNTICK_IRQ_NONIDLE,
+			 "Bad RCU  dynticks_nmi_nesting counter\n");
+	RCU_LOCKDEP_WARN(rcu_dynticks_curr_cpu_in_eqs(),
+			 "RCU in extended quiescent state!");
+}
+#endif /* #ifdef CONFIG_PROVE_RCU */
+
 /*
  * Wrapper for rcu_irq_exit() where interrupts are enabled.
  *
@@ -734,7 +838,7 @@
  * allow for the possibility of usermode upcalls messing up our count of
  * interrupt nesting level during the busy period that is just now starting.
  */
-static void rcu_eqs_exit(bool user)
+static void noinstr rcu_eqs_exit(bool user)
 {
 	struct rcu_data *rdp;
 	long oldval;
@@ -744,17 +848,26 @@
 	oldval = rdp->dynticks_nesting;
 	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) && oldval < 0);
 	if (oldval) {
+		// RCU was already watching, so just do accounting and leave.
 		rdp->dynticks_nesting++;
 		return;
 	}
 	rcu_dynticks_task_exit();
+	// RCU is not watching here ...
 	rcu_dynticks_eqs_exit();
+	// ... but is watching here.
+	instrumentation_begin();
+
+	// instrumentation for the noinstr rcu_dynticks_eqs_exit()
+	instrument_atomic_write(&rdp->dynticks, sizeof(rdp->dynticks));
+
 	rcu_cleanup_after_idle();
 	trace_rcu_dyntick(TPS("End"), rdp->dynticks_nesting, 1, atomic_read(&rdp->dynticks));
 	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) && !user && !is_idle_task(current));
 	WRITE_ONCE(rdp->dynticks_nesting, 1);
 	WARN_ON_ONCE(rdp->dynticks_nmi_nesting);
 	WRITE_ONCE(rdp->dynticks_nmi_nesting, DYNTICK_IRQ_NONIDLE);
+	instrumentation_end();
 }
 
 /**
@@ -774,6 +887,7 @@
 	rcu_eqs_exit(false);
 	local_irq_restore(flags);
 }
+EXPORT_SYMBOL_GPL(rcu_idle_exit);
 
 #ifdef CONFIG_NO_HZ_FULL
 /**
@@ -785,15 +899,75 @@
  * If you add or remove a call to rcu_user_exit(), be sure to test with
  * CONFIG_RCU_EQS_DEBUG=y.
  */
-void rcu_user_exit(void)
+void noinstr rcu_user_exit(void)
 {
 	rcu_eqs_exit(1);
 }
+
+/**
+ * __rcu_irq_enter_check_tick - Enable scheduler tick on CPU if RCU needs it.
+ *
+ * The scheduler tick is not normally enabled when CPUs enter the kernel
+ * from nohz_full userspace execution.  After all, nohz_full userspace
+ * execution is an RCU quiescent state and the time executing in the kernel
+ * is quite short.  Except of course when it isn't.  And it is not hard to
+ * cause a large system to spend tens of seconds or even minutes looping
+ * in the kernel, which can cause a number of problems, including RCU CPU
+ * stall warnings.
+ *
+ * Therefore, if a nohz_full CPU fails to report a quiescent state
+ * in a timely manner, the RCU grace-period kthread sets that CPU's
+ * ->rcu_urgent_qs flag with the expectation that the next interrupt or
+ * exception will invoke this function, which will turn on the scheduler
+ * tick, which will enable RCU to detect that CPU's quiescent states,
+ * for example, due to cond_resched() calls in CONFIG_PREEMPT=n kernels.
+ * The tick will be disabled once a quiescent state is reported for
+ * this CPU.
+ *
+ * Of course, in carefully tuned systems, there might never be an
+ * interrupt or exception.  In that case, the RCU grace-period kthread
+ * will eventually cause one to happen.  However, in less carefully
+ * controlled environments, this function allows RCU to get what it
+ * needs without creating otherwise useless interruptions.
+ */
+void __rcu_irq_enter_check_tick(void)
+{
+	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
+
+	// If we're here from NMI there's nothing to do.
+	if (in_nmi())
+		return;
+
+	RCU_LOCKDEP_WARN(rcu_dynticks_curr_cpu_in_eqs(),
+			 "Illegal rcu_irq_enter_check_tick() from extended quiescent state");
+
+	if (!tick_nohz_full_cpu(rdp->cpu) ||
+	    !READ_ONCE(rdp->rcu_urgent_qs) ||
+	    READ_ONCE(rdp->rcu_forced_tick)) {
+		// RCU doesn't need nohz_full help from this CPU, or it is
+		// already getting that help.
+		return;
+	}
+
+	// We get here only when not in an extended quiescent state and
+	// from interrupts (as opposed to NMIs).  Therefore, (1) RCU is
+	// already watching and (2) The fact that we are in an interrupt
+	// handler and that the rcu_node lock is an irq-disabled lock
+	// prevents self-deadlock.  So we can safely recheck under the lock.
+	// Note that the nohz_full state currently cannot change.
+	raw_spin_lock_rcu_node(rdp->mynode);
+	if (rdp->rcu_urgent_qs && !rdp->rcu_forced_tick) {
+		// A nohz_full CPU is in the kernel and RCU needs a
+		// quiescent state.  Turn on the tick!
+		WRITE_ONCE(rdp->rcu_forced_tick, true);
+		tick_dep_set_cpu(rdp->cpu, TICK_DEP_BIT_RCU);
+	}
+	raw_spin_unlock_rcu_node(rdp->mynode);
+}
 #endif /* CONFIG_NO_HZ_FULL */
 
 /**
- * rcu_nmi_enter_common - inform RCU of entry to NMI context
- * @irq: Is this call from rcu_irq_enter?
+ * rcu_nmi_enter - inform RCU of entry to NMI context
  *
  * If the CPU was idle from RCU's viewpoint, update rdp->dynticks and
  * rdp->dynticks_nmi_nesting to let the RCU grace-period handling know
@@ -801,13 +975,13 @@
  * long as the nesting level does not overflow an int.  (You will probably
  * run out of stack space first.)
  *
- * If you add or remove a call to rcu_nmi_enter_common(), be sure to test
+ * If you add or remove a call to rcu_nmi_enter(), be sure to test
  * with CONFIG_RCU_EQS_DEBUG=y.
  */
-static __always_inline void rcu_nmi_enter_common(bool irq)
+noinstr void rcu_nmi_enter(void)
 {
-	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 	long incby = 2;
+	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 
 	/* Complain about underflow. */
 	WARN_ON_ONCE(rdp->dynticks_nmi_nesting < 0);
@@ -822,34 +996,43 @@
 	 */
 	if (rcu_dynticks_curr_cpu_in_eqs()) {
 
-		if (irq)
+		if (!in_nmi())
 			rcu_dynticks_task_exit();
 
+		// RCU is not watching here ...
 		rcu_dynticks_eqs_exit();
+		// ... but is watching here.
 
-		if (irq)
+		if (!in_nmi()) {
+			instrumentation_begin();
 			rcu_cleanup_after_idle();
+			instrumentation_end();
+		}
+
+		instrumentation_begin();
+		// instrumentation for the noinstr rcu_dynticks_curr_cpu_in_eqs()
+		instrument_atomic_read(&rdp->dynticks, sizeof(rdp->dynticks));
+		// instrumentation for the noinstr rcu_dynticks_eqs_exit()
+		instrument_atomic_write(&rdp->dynticks, sizeof(rdp->dynticks));
 
 		incby = 1;
+	} else if (!in_nmi()) {
+		instrumentation_begin();
+		rcu_irq_enter_check_tick();
+	} else  {
+		instrumentation_begin();
 	}
+
 	trace_rcu_dyntick(incby == 1 ? TPS("Endirq") : TPS("++="),
 			  rdp->dynticks_nmi_nesting,
 			  rdp->dynticks_nmi_nesting + incby, atomic_read(&rdp->dynticks));
+	instrumentation_end();
 	WRITE_ONCE(rdp->dynticks_nmi_nesting, /* Prevent store tearing. */
 		   rdp->dynticks_nmi_nesting + incby);
 	barrier();
 }
 
 /**
- * rcu_nmi_enter - inform RCU of entry to NMI context
- */
-void rcu_nmi_enter(void)
-{
-	rcu_nmi_enter_common(false);
-}
-NOKPROBE_SYMBOL(rcu_nmi_enter);
-
-/**
  * rcu_irq_enter - inform RCU that current CPU is entering irq away from idle
  *
  * Enter an interrupt handler, which might possibly result in exiting
@@ -871,10 +1054,10 @@
  * If you add or remove a call to rcu_irq_enter(), be sure to test with
  * CONFIG_RCU_EQS_DEBUG=y.
  */
-void rcu_irq_enter(void)
+noinstr void rcu_irq_enter(void)
 {
 	lockdep_assert_irqs_disabled();
-	rcu_nmi_enter_common(true);
+	rcu_nmi_enter();
 }
 
 /*
@@ -892,6 +1075,22 @@
 	local_irq_restore(flags);
 }
 
+/*
+ * If any sort of urgency was applied to the current CPU (for example,
+ * the scheduler-clock interrupt was enabled on a nohz_full CPU) in order
+ * to get to a quiescent state, disable it.
+ */
+static void rcu_disable_urgency_upon_qs(struct rcu_data *rdp)
+{
+	raw_lockdep_assert_held_rcu_node(rdp->mynode); // Caller holds this lock.
+	WRITE_ONCE(rdp->rcu_urgent_qs, false);
+	WRITE_ONCE(rdp->rcu_need_heavy_qs, false);
+	if (tick_nohz_full_cpu(rdp->cpu) && rdp->rcu_forced_tick) {
+		tick_dep_clear_cpu(rdp->cpu, TICK_DEP_BIT_RCU); // Undo forced tick.
+		WRITE_ONCE(rdp->rcu_forced_tick, false);
+	}
+}
+
 /**
  * rcu_is_watching - see if RCU thinks that the current CPU is not idle
  *
@@ -899,8 +1098,11 @@
  * CPU can safely enter RCU read-side critical sections.  In other words,
  * if the current CPU is not in its idle loop or is in an interrupt or
  * NMI handler, return true.
+ *
+ * Make notrace because it can be called by the internal functions of
+ * ftrace, and making this notrace removes unnecessary recursion calls.
  */
-bool notrace rcu_is_watching(void)
+notrace bool rcu_is_watching(void)
 {
 	bool ret;
 
@@ -952,12 +1154,12 @@
 
 	if (in_nmi() || !rcu_scheduler_fully_active)
 		return true;
-	preempt_disable();
+	preempt_disable_notrace();
 	rdp = this_cpu_ptr(&rcu_data);
 	rnp = rdp->mynode;
 	if (rdp->grpmask & rcu_rnp_online_cpus(rnp))
 		ret = true;
-	preempt_enable();
+	preempt_enable_notrace();
 	return ret;
 }
 EXPORT_SYMBOL_GPL(rcu_lockdep_current_cpu_online);
@@ -1024,13 +1226,28 @@
 		return 1;
 	}
 
-	/* If waiting too long on an offline CPU, complain. */
-	if (!(rdp->grpmask & rcu_rnp_online_cpus(rnp)) &&
-	    time_after(jiffies, rcu_state.gp_start + HZ)) {
+	/*
+	 * Complain if a CPU that is considered to be offline from RCU's
+	 * perspective has not yet reported a quiescent state.  After all,
+	 * the offline CPU should have reported a quiescent state during
+	 * the CPU-offline process, or, failing that, by rcu_gp_init()
+	 * if it ran concurrently with either the CPU going offline or the
+	 * last task on a leaf rcu_node structure exiting its RCU read-side
+	 * critical section while all CPUs corresponding to that structure
+	 * are offline.  This added warning detects bugs in any of these
+	 * code paths.
+	 *
+	 * The rcu_node structure's ->lock is held here, which excludes
+	 * the relevant portions of the CPU-hotplug code, the grace-period
+	 * initialization code, and the rcu_read_unlock() code paths.
+	 *
+	 * For more detail, please refer to the "Hotplug CPU" section
+	 * of RCU's Requirements documentation.
+	 */
+	if (WARN_ON_ONCE(!(rdp->grpmask & rcu_rnp_online_cpus(rnp)))) {
 		bool onl;
 		struct rcu_node *rnp1;
 
-		WARN_ON(1);  /* Offline CPUs are supposed to report QS! */
 		pr_info("%s: grp: %d-%d level: %d ->gp_seq %ld ->completedqs %ld\n",
 			__func__, rnp->grplo, rnp->grphi, rnp->level,
 			(long)rnp->gp_seq, (long)rnp->completedqs);
@@ -1061,7 +1278,8 @@
 	rnhqp = &per_cpu(rcu_data.rcu_need_heavy_qs, rdp->cpu);
 	if (!READ_ONCE(*rnhqp) &&
 	    (time_after(jiffies, rcu_state.gp_start + jtsq * 2) ||
-	     time_after(jiffies, rcu_state.jiffies_resched))) {
+	     time_after(jiffies, rcu_state.jiffies_resched) ||
+	     rcu_state.cbovld)) {
 		WRITE_ONCE(*rnhqp, true);
 		/* Store rcu_need_heavy_qs before rcu_urgent_qs. */
 		smp_store_release(ruqp, true);
@@ -1078,8 +1296,9 @@
 	 * So hit them over the head with the resched_cpu() hammer!
 	 */
 	if (tick_nohz_full_cpu(rdp->cpu) &&
-		   time_after(jiffies,
-			      READ_ONCE(rdp->last_fqs_resched) + jtsq * 3)) {
+	    (time_after(jiffies, READ_ONCE(rdp->last_fqs_resched) + jtsq * 3) ||
+	     rcu_state.cbovld)) {
+		WRITE_ONCE(*ruqp, true);
 		resched_cpu(rdp->cpu);
 		WRITE_ONCE(rdp->last_fqs_resched, jiffies);
 	}
@@ -1101,6 +1320,7 @@
 		    !rdp->rcu_iw_pending && rdp->rcu_iw_gp_seq != rnp->gp_seq &&
 		    (rnp->ffmask & rdp->grpmask)) {
 			init_irq_work(&rdp->rcu_iw, rcu_iw_handler);
+			atomic_set(&rdp->rcu_iw.flags, IRQ_WORK_HARD_IRQ);
 			rdp->rcu_iw_pending = true;
 			rdp->rcu_iw_gp_seq = rnp->gp_seq;
 			irq_work_queue_on(&rdp->rcu_iw, rdp->cpu);
@@ -1114,8 +1334,9 @@
 static void trace_rcu_this_gp(struct rcu_node *rnp, struct rcu_data *rdp,
 			      unsigned long gp_seq_req, const char *s)
 {
-	trace_rcu_future_grace_period(rcu_state.name, rnp->gp_seq, gp_seq_req,
-				      rnp->level, rnp->grplo, rnp->grphi, s);
+	trace_rcu_future_grace_period(rcu_state.name, READ_ONCE(rnp->gp_seq),
+				      gp_seq_req, rnp->level,
+				      rnp->grplo, rnp->grphi, s);
 }
 
 /*
@@ -1162,7 +1383,7 @@
 					  TPS("Prestarted"));
 			goto unlock_out;
 		}
-		rnp->gp_seq_needed = gp_seq_req;
+		WRITE_ONCE(rnp->gp_seq_needed, gp_seq_req);
 		if (rcu_seq_state(rcu_seq_current(&rnp->gp_seq))) {
 			/*
 			 * We just marked the leaf or internal node, and a
@@ -1187,18 +1408,18 @@
 	}
 	trace_rcu_this_gp(rnp, rdp, gp_seq_req, TPS("Startedroot"));
 	WRITE_ONCE(rcu_state.gp_flags, rcu_state.gp_flags | RCU_GP_FLAG_INIT);
-	rcu_state.gp_req_activity = jiffies;
-	if (!rcu_state.gp_kthread) {
+	WRITE_ONCE(rcu_state.gp_req_activity, jiffies);
+	if (!READ_ONCE(rcu_state.gp_kthread)) {
 		trace_rcu_this_gp(rnp, rdp, gp_seq_req, TPS("NoGPkthread"));
 		goto unlock_out;
 	}
-	trace_rcu_grace_period(rcu_state.name, READ_ONCE(rcu_state.gp_seq), TPS("newreq"));
+	trace_rcu_grace_period(rcu_state.name, data_race(rcu_state.gp_seq), TPS("newreq"));
 	ret = true;  /* Caller must wake GP kthread. */
 unlock_out:
 	/* Push furthest requested GP to leaf node and rcu_data structure. */
 	if (ULONG_CMP_LT(gp_seq_req, rnp->gp_seq_needed)) {
-		rnp_start->gp_seq_needed = rnp->gp_seq_needed;
-		rdp->gp_seq_needed = rnp->gp_seq_needed;
+		WRITE_ONCE(rnp_start->gp_seq_needed, rnp->gp_seq_needed);
+		WRITE_ONCE(rdp->gp_seq_needed, rnp->gp_seq_needed);
 	}
 	if (rnp != rnp_start)
 		raw_spin_unlock_rcu_node(rnp);
@@ -1223,12 +1444,13 @@
 }
 
 /*
- * Awaken the grace-period kthread.  Don't do a self-awaken (unless in
- * an interrupt or softirq handler), and don't bother awakening when there
- * is nothing for the grace-period kthread to do (as in several CPUs raced
- * to awaken, and we lost), and finally don't try to awaken a kthread that
- * has not yet been created.  If all those checks are passed, track some
- * debug information and awaken.
+ * Awaken the grace-period kthread.  Don't do a self-awaken (unless in an
+ * interrupt or softirq handler, in which case we just might immediately
+ * sleep upon return, resulting in a grace-period hang), and don't bother
+ * awakening when there is nothing for the grace-period kthread to do
+ * (as in several CPUs raced to awaken, and we lost), and finally don't try
+ * to awaken a kthread that has not yet been created.  If all those checks
+ * are passed, track some debug information and awaken.
  *
  * So why do the self-wakeup when in an interrupt or softirq handler
  * in the grace-period kthread's context?  Because the kthread might have
@@ -1238,10 +1460,10 @@
  */
 static void rcu_gp_kthread_wake(void)
 {
-	if ((current == rcu_state.gp_kthread &&
-	     !in_irq() && !in_serving_softirq()) ||
-	    !READ_ONCE(rcu_state.gp_flags) ||
-	    !rcu_state.gp_kthread)
+	struct task_struct *t = READ_ONCE(rcu_state.gp_kthread);
+
+	if ((current == t && !in_irq() && !in_serving_softirq()) ||
+	    !READ_ONCE(rcu_state.gp_flags) || !t)
 		return;
 	WRITE_ONCE(rcu_state.gp_wake_time, jiffies);
 	WRITE_ONCE(rcu_state.gp_wake_seq, READ_ONCE(rcu_state.gp_seq));
@@ -1288,9 +1510,10 @@
 
 	/* Trace depending on how much we were able to accelerate. */
 	if (rcu_segcblist_restempty(&rdp->cblist, RCU_WAIT_TAIL))
-		trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("AccWaitCB"));
+		trace_rcu_grace_period(rcu_state.name, gp_seq_req, TPS("AccWaitCB"));
 	else
-		trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("AccReadyCB"));
+		trace_rcu_grace_period(rcu_state.name, gp_seq_req, TPS("AccReadyCB"));
+
 	return ret;
 }
 
@@ -1309,7 +1532,7 @@
 
 	rcu_lockdep_assert_cblist_protected(rdp);
 	c = rcu_seq_snap(&rcu_state.gp_seq);
-	if (!rdp->gpwrap && ULONG_CMP_GE(rdp->gp_seq_needed, c)) {
+	if (!READ_ONCE(rdp->gpwrap) && ULONG_CMP_GE(rdp->gp_seq_needed, c)) {
 		/* Old request still live, so mark recent callbacks. */
 		(void)rcu_segcblist_accelerate(&rdp->cblist, c);
 		return;
@@ -1358,14 +1581,28 @@
 						  struct rcu_data *rdp)
 {
 	rcu_lockdep_assert_cblist_protected(rdp);
-	if (!rcu_seq_state(rcu_seq_current(&rnp->gp_seq)) ||
-	    !raw_spin_trylock_rcu_node(rnp))
+	if (!rcu_seq_state(rcu_seq_current(&rnp->gp_seq)) || !raw_spin_trylock_rcu_node(rnp))
 		return;
-	WARN_ON_ONCE(rcu_advance_cbs(rnp, rdp));
+	// The grace period cannot end while we hold the rcu_node lock.
+	if (rcu_seq_state(rcu_seq_current(&rnp->gp_seq)))
+		WARN_ON_ONCE(rcu_advance_cbs(rnp, rdp));
 	raw_spin_unlock_rcu_node(rnp);
 }
 
 /*
+ * In CONFIG_RCU_STRICT_GRACE_PERIOD=y kernels, attempt to generate a
+ * quiescent state.  This is intended to be invoked when the CPU notices
+ * a new grace period.
+ */
+static void rcu_strict_gp_check_qs(void)
+{
+	if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD)) {
+		rcu_read_lock();
+		rcu_read_unlock();
+	}
+}
+
+/*
  * Update CPU-local rcu_data state to record the beginnings and ends of
  * grace periods.  The caller must hold the ->lock of the leaf rcu_node
  * structure corresponding to the current CPU, and must have irqs disabled.
@@ -1374,7 +1611,7 @@
 static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp)
 {
 	bool ret = false;
-	bool need_gp;
+	bool need_qs;
 	const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
 			       rcu_segcblist_is_offloaded(&rdp->cblist);
 
@@ -1388,10 +1625,13 @@
 	    unlikely(READ_ONCE(rdp->gpwrap))) {
 		if (!offloaded)
 			ret = rcu_advance_cbs(rnp, rdp); /* Advance CBs. */
+		rdp->core_needs_qs = false;
 		trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuend"));
 	} else {
 		if (!offloaded)
 			ret = rcu_accelerate_cbs(rnp, rdp); /* Recent CBs. */
+		if (rdp->core_needs_qs)
+			rdp->core_needs_qs = !!(rnp->qsmask & rdp->grpmask);
 	}
 
 	/* Now handle the beginnings of any new-to-this-CPU grace periods. */
@@ -1403,14 +1643,14 @@
 		 * go looking for one.
 		 */
 		trace_rcu_grace_period(rcu_state.name, rnp->gp_seq, TPS("cpustart"));
-		need_gp = !!(rnp->qsmask & rdp->grpmask);
-		rdp->cpu_no_qs.b.norm = need_gp;
-		rdp->core_needs_qs = need_gp;
+		need_qs = !!(rnp->qsmask & rdp->grpmask);
+		rdp->cpu_no_qs.b.norm = need_qs;
+		rdp->core_needs_qs = need_qs;
 		zero_cpu_stall_ticks(rdp);
 	}
 	rdp->gp_seq = rnp->gp_seq;  /* Remember new grace-period state. */
 	if (ULONG_CMP_LT(rdp->gp_seq_needed, rnp->gp_seq_needed) || rdp->gpwrap)
-		rdp->gp_seq_needed = rnp->gp_seq_needed;
+		WRITE_ONCE(rdp->gp_seq_needed, rnp->gp_seq_needed);
 	WRITE_ONCE(rdp->gpwrap, false);
 	rcu_gpnum_ovf(rnp, rdp);
 	return ret;
@@ -1432,6 +1672,7 @@
 	}
 	needwake = __note_gp_changes(rnp, rdp);
 	raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
+	rcu_strict_gp_check_qs();
 	if (needwake)
 		rcu_gp_kthread_wake();
 }
@@ -1441,7 +1682,41 @@
 	if (delay > 0 &&
 	    !(rcu_seq_ctr(rcu_state.gp_seq) %
 	      (rcu_num_nodes * PER_RCU_NODE_PERIOD * delay)))
-		schedule_timeout_uninterruptible(delay);
+		schedule_timeout_idle(delay);
+}
+
+static unsigned long sleep_duration;
+
+/* Allow rcutorture to stall the grace-period kthread. */
+void rcu_gp_set_torture_wait(int duration)
+{
+	if (IS_ENABLED(CONFIG_RCU_TORTURE_TEST) && duration > 0)
+		WRITE_ONCE(sleep_duration, duration);
+}
+EXPORT_SYMBOL_GPL(rcu_gp_set_torture_wait);
+
+/* Actually implement the aforementioned wait. */
+static void rcu_gp_torture_wait(void)
+{
+	unsigned long duration;
+
+	if (!IS_ENABLED(CONFIG_RCU_TORTURE_TEST))
+		return;
+	duration = xchg(&sleep_duration, 0UL);
+	if (duration > 0) {
+		pr_alert("%s: Waiting %lu jiffies\n", __func__, duration);
+		schedule_timeout_idle(duration);
+		pr_alert("%s: Wait complete\n", __func__);
+	}
+}
+
+/*
+ * Handler for on_each_cpu() to invoke the target CPU's RCU core
+ * processing.
+ */
+static void rcu_strict_gp_boundary(void *unused)
+{
+	invoke_rcu_core();
 }
 
 /*
@@ -1477,14 +1752,18 @@
 	record_gp_stall_check_time();
 	/* Record GP times before starting GP, hence rcu_seq_start(). */
 	rcu_seq_start(&rcu_state.gp_seq);
+	ASSERT_EXCLUSIVE_WRITER(rcu_state.gp_seq);
 	trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq, TPS("start"));
 	raw_spin_unlock_irq_rcu_node(rnp);
 
 	/*
-	 * Apply per-leaf buffered online and offline operations to the
-	 * rcu_node tree.  Note that this new grace period need not wait
-	 * for subsequent online CPUs, and that quiescent-state forcing
-	 * will handle subsequent offline CPUs.
+	 * Apply per-leaf buffered online and offline operations to
+	 * the rcu_node tree. Note that this new grace period need not
+	 * wait for subsequent online CPUs, and that RCU hooks in the CPU
+	 * offlining path, when combined with checks in this function,
+	 * will handle CPUs that are currently going offline or that will
+	 * go offline later.  Please also refer to "Hotplug CPU" section
+	 * of RCU's Requirements documentation.
 	 */
 	rcu_state.gp_state = RCU_GP_ONOFF;
 	rcu_for_each_leaf_node(rnp) {
@@ -1571,6 +1850,10 @@
 		WRITE_ONCE(rcu_state.gp_activity, jiffies);
 	}
 
+	// If strict, make all CPUs aware of new grace period.
+	if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD))
+		on_each_cpu(rcu_strict_gp_boundary, NULL, 0);
+
 	return true;
 }
 
@@ -1582,12 +1865,16 @@
 {
 	struct rcu_node *rnp = rcu_get_root();
 
-	/* Someone like call_rcu() requested a force-quiescent-state scan. */
+	// If under overload conditions, force an immediate FQS scan.
+	if (*gfp & RCU_GP_FLAG_OVLD)
+		return true;
+
+	// Someone like call_rcu() requested a force-quiescent-state scan.
 	*gfp = READ_ONCE(rcu_state.gp_flags);
 	if (*gfp & RCU_GP_FLAG_FQS)
 		return true;
 
-	/* The current grace period has completed. */
+	// The current grace period has completed.
 	if (!READ_ONCE(rnp->qsmask) && !rcu_preempt_blocked_readers_cgp(rnp))
 		return true;
 
@@ -1602,7 +1889,7 @@
 	struct rcu_node *rnp = rcu_get_root();
 
 	WRITE_ONCE(rcu_state.gp_activity, jiffies);
-	rcu_state.n_force_qs++;
+	WRITE_ONCE(rcu_state.n_force_qs, rcu_state.n_force_qs + 1);
 	if (first_time) {
 		/* Collect dyntick-idle snapshots. */
 		force_qs_rnp(dyntick_save_progress_counter);
@@ -1625,13 +1912,15 @@
 static void rcu_gp_fqs_loop(void)
 {
 	bool first_gp_fqs;
-	int gf;
+	int gf = 0;
 	unsigned long j;
 	int ret;
 	struct rcu_node *rnp = rcu_get_root();
 
 	first_gp_fqs = true;
 	j = READ_ONCE(jiffies_till_first_fqs);
+	if (rcu_state.cbovld)
+		gf = RCU_GP_FLAG_OVLD;
 	ret = 0;
 	for (;;) {
 		if (!ret) {
@@ -1639,12 +1928,12 @@
 			WRITE_ONCE(rcu_state.jiffies_kick_kthreads,
 				   jiffies + (j ? 3 * j : 2));
 		}
-		trace_rcu_grace_period(rcu_state.name,
-				       READ_ONCE(rcu_state.gp_seq),
+		trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq,
 				       TPS("fqswait"));
 		rcu_state.gp_state = RCU_GP_WAIT_FQS;
 		ret = swait_event_idle_timeout_exclusive(
 				rcu_state.gp_wq, rcu_gp_fqs_check_wake(&gf), j);
+		rcu_gp_torture_wait();
 		rcu_state.gp_state = RCU_GP_DOING_FQS;
 		/* Locking provides needed memory barriers. */
 		/* If grace period done, leave loop. */
@@ -1652,15 +1941,17 @@
 		    !rcu_preempt_blocked_readers_cgp(rnp))
 			break;
 		/* If time for quiescent-state forcing, do it. */
-		if (ULONG_CMP_GE(jiffies, rcu_state.jiffies_force_qs) ||
-		    (gf & RCU_GP_FLAG_FQS)) {
-			trace_rcu_grace_period(rcu_state.name,
-					       READ_ONCE(rcu_state.gp_seq),
+		if (!time_after(rcu_state.jiffies_force_qs, jiffies) ||
+		    (gf & (RCU_GP_FLAG_FQS | RCU_GP_FLAG_OVLD))) {
+			trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq,
 					       TPS("fqsstart"));
 			rcu_gp_fqs(first_gp_fqs);
-			first_gp_fqs = false;
-			trace_rcu_grace_period(rcu_state.name,
-					       READ_ONCE(rcu_state.gp_seq),
+			gf = 0;
+			if (first_gp_fqs) {
+				first_gp_fqs = false;
+				gf = rcu_state.cbovld ? RCU_GP_FLAG_OVLD : 0;
+			}
+			trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq,
 					       TPS("fqsend"));
 			cond_resched_tasks_rcu_qs();
 			WRITE_ONCE(rcu_state.gp_activity, jiffies);
@@ -1671,8 +1962,7 @@
 			cond_resched_tasks_rcu_qs();
 			WRITE_ONCE(rcu_state.gp_activity, jiffies);
 			WARN_ON(signal_pending(current));
-			trace_rcu_grace_period(rcu_state.name,
-					       READ_ONCE(rcu_state.gp_seq),
+			trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq,
 					       TPS("fqswaitsig"));
 			ret = 1; /* Keep old FQS timing. */
 			j = jiffies;
@@ -1680,6 +1970,7 @@
 				j = 1;
 			else
 				j = rcu_state.jiffies_force_qs - j;
+			gf = 0;
 		}
 	}
 }
@@ -1689,8 +1980,9 @@
  */
 static void rcu_gp_cleanup(void)
 {
-	unsigned long gp_duration;
+	int cpu;
 	bool needgp = false;
+	unsigned long gp_duration;
 	unsigned long new_gp_seq;
 	bool offloaded;
 	struct rcu_data *rdp;
@@ -1736,6 +2028,12 @@
 			needgp = __note_gp_changes(rnp, rdp) || needgp;
 		/* smp_mb() provided by prior unlock-lock pair. */
 		needgp = rcu_future_gp_cleanup(rnp) || needgp;
+		// Reset overload indication for CPUs no longer overloaded
+		if (rcu_is_leaf_node(rnp))
+			for_each_leaf_node_cpu_mask(rnp, cpu, rnp->cbovldmask) {
+				rdp = per_cpu_ptr(&rcu_data, cpu);
+				check_cb_ovld_locked(rdp, rnp);
+			}
 		sq = rcu_nocb_gp_get(rnp);
 		raw_spin_unlock_irq_rcu_node(rnp);
 		rcu_nocb_gp_cleanup(sq);
@@ -1749,6 +2047,7 @@
 	/* Declare grace period done, trace first to use old GP number. */
 	trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq, TPS("end"));
 	rcu_seq_end(&rcu_state.gp_seq);
+	ASSERT_EXCLUSIVE_WRITER(rcu_state.gp_seq);
 	rcu_state.gp_state = RCU_GP_IDLE;
 	/* Check for GP requests since above loop. */
 	rdp = this_cpu_ptr(&rcu_data);
@@ -1762,15 +2061,19 @@
 		    rcu_segcblist_is_offloaded(&rdp->cblist);
 	if ((offloaded || !rcu_accelerate_cbs(rnp, rdp)) && needgp) {
 		WRITE_ONCE(rcu_state.gp_flags, RCU_GP_FLAG_INIT);
-		rcu_state.gp_req_activity = jiffies;
+		WRITE_ONCE(rcu_state.gp_req_activity, jiffies);
 		trace_rcu_grace_period(rcu_state.name,
-				       READ_ONCE(rcu_state.gp_seq),
+				       rcu_state.gp_seq,
 				       TPS("newreq"));
 	} else {
 		WRITE_ONCE(rcu_state.gp_flags,
 			   rcu_state.gp_flags & RCU_GP_FLAG_INIT);
 	}
 	raw_spin_unlock_irq_rcu_node(rnp);
+
+	// If strict, make all CPUs aware of the end of the old grace period.
+	if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD))
+		on_each_cpu(rcu_strict_gp_boundary, NULL, 0);
 }
 
 /*
@@ -1783,13 +2086,13 @@
 
 		/* Handle grace-period start. */
 		for (;;) {
-			trace_rcu_grace_period(rcu_state.name,
-					       READ_ONCE(rcu_state.gp_seq),
+			trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq,
 					       TPS("reqwait"));
 			rcu_state.gp_state = RCU_GP_WAIT_GPS;
 			swait_event_idle_exclusive(rcu_state.gp_wq,
 					 READ_ONCE(rcu_state.gp_flags) &
 					 RCU_GP_FLAG_INIT);
+			rcu_gp_torture_wait();
 			rcu_state.gp_state = RCU_GP_DONE_GPS;
 			/* Locking provides needed memory barrier. */
 			if (rcu_gp_init())
@@ -1797,8 +2100,7 @@
 			cond_resched_tasks_rcu_qs();
 			WRITE_ONCE(rcu_state.gp_activity, jiffies);
 			WARN_ON(signal_pending(current));
-			trace_rcu_grace_period(rcu_state.name,
-					       READ_ONCE(rcu_state.gp_seq),
+			trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq,
 					       TPS("reqwaitsig"));
 		}
 
@@ -1869,7 +2171,7 @@
 		WARN_ON_ONCE(oldmask); /* Any child must be all zeroed! */
 		WARN_ON_ONCE(!rcu_is_leaf_node(rnp) &&
 			     rcu_preempt_blocked_readers_cgp(rnp));
-		rnp->qsmask &= ~mask;
+		WRITE_ONCE(rnp->qsmask, rnp->qsmask & ~mask);
 		trace_rcu_quiescent_state_report(rcu_state.name, rnp->gp_seq,
 						 mask, rnp->qsmask, rnp->level,
 						 rnp->grplo, rnp->grphi,
@@ -1892,7 +2194,7 @@
 		rnp_c = rnp;
 		rnp = rnp->parent;
 		raw_spin_lock_irqsave_rcu_node(rnp, flags);
-		oldmask = rnp_c->qsmask;
+		oldmask = READ_ONCE(rnp_c->qsmask);
 	}
 
 	/*
@@ -1919,7 +2221,7 @@
 	struct rcu_node *rnp_p;
 
 	raw_lockdep_assert_held_rcu_node(rnp);
-	if (WARN_ON_ONCE(!IS_ENABLED(CONFIG_PREEMPTION)) ||
+	if (WARN_ON_ONCE(!IS_ENABLED(CONFIG_PREEMPT_RCU)) ||
 	    WARN_ON_ONCE(rcu_preempt_blocked_readers_cgp(rnp)) ||
 	    rnp->qsmask != 0) {
 		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
@@ -1950,7 +2252,7 @@
  * structure.  This must be called from the specified CPU.
  */
 static void
-rcu_report_qs_rdp(int cpu, struct rcu_data *rdp)
+rcu_report_qs_rdp(struct rcu_data *rdp)
 {
 	unsigned long flags;
 	unsigned long mask;
@@ -1959,6 +2261,7 @@
 			       rcu_segcblist_is_offloaded(&rdp->cblist);
 	struct rcu_node *rnp;
 
+	WARN_ON_ONCE(rdp->cpu != smp_processor_id());
 	rnp = rdp->mynode;
 	raw_spin_lock_irqsave_rcu_node(rnp, flags);
 	if (rdp->cpu_no_qs.b.norm || rdp->gp_seq != rnp->gp_seq ||
@@ -1986,6 +2289,7 @@
 		if (!offloaded)
 			needwake = rcu_accelerate_cbs(rnp, rdp);
 
+		rcu_disable_urgency_upon_qs(rdp);
 		rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
 		/* ^^^ Released rnp->lock */
 		if (needwake)
@@ -2023,7 +2327,7 @@
 	 * Tell RCU we are done (but rcu_report_qs_rdp() will be the
 	 * judge of that).
 	 */
-	rcu_report_qs_rdp(rdp->cpu, rdp);
+	rcu_report_qs_rdp(rdp);
 }
 
 /*
@@ -2040,7 +2344,7 @@
 		return 0;
 
 	blkd = !!(rnp->qsmask & rdp->grpmask);
-	trace_rcu_grace_period(rcu_state.name, rnp->gp_seq,
+	trace_rcu_grace_period(rcu_state.name, READ_ONCE(rnp->gp_seq),
 			       blkd ? TPS("cpuofl") : TPS("cpuofl-bgp"));
 	return 0;
 }
@@ -2108,6 +2412,9 @@
 	rcu_boost_kthread_setaffinity(rnp, -1);
 	/* Do any needed no-CB deferred wakeups from this CPU. */
 	do_nocb_deferred_wakeup(per_cpu_ptr(&rcu_data, cpu));
+
+	// Stop-machine done, so allow nohz_full to disable tick.
+	tick_dep_clear(TICK_DEP_BIT_RCU);
 	return 0;
 }
 
@@ -2117,6 +2424,7 @@
  */
 static void rcu_do_batch(struct rcu_data *rdp)
 {
+	int div;
 	unsigned long flags;
 	const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
 			       rcu_segcblist_is_offloaded(&rdp->cblist);
@@ -2128,7 +2436,6 @@
 	/* If no callbacks are ready, just return. */
 	if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
 		trace_rcu_batch_start(rcu_state.name,
-				      rcu_segcblist_n_lazy_cbs(&rdp->cblist),
 				      rcu_segcblist_n_cbs(&rdp->cblist), 0);
 		trace_rcu_batch_end(rcu_state.name, 0,
 				    !rcu_segcblist_empty(&rdp->cblist),
@@ -2146,11 +2453,16 @@
 	rcu_nocb_lock(rdp);
 	WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
 	pending = rcu_segcblist_n_cbs(&rdp->cblist);
-	bl = max(rdp->blimit, pending >> rcu_divisor);
-	if (unlikely(bl > 100))
-		tlimit = local_clock() + rcu_resched_ns;
+	div = READ_ONCE(rcu_divisor);
+	div = div < 0 ? 7 : div > sizeof(long) * 8 - 2 ? sizeof(long) * 8 - 2 : div;
+	bl = max(rdp->blimit, pending >> div);
+	if (unlikely(bl > 100)) {
+		long rrn = READ_ONCE(rcu_resched_ns);
+
+		rrn = rrn < NSEC_PER_MSEC ? NSEC_PER_MSEC : rrn > NSEC_PER_SEC ? NSEC_PER_SEC : rrn;
+		tlimit = local_clock() + rrn;
+	}
 	trace_rcu_batch_start(rcu_state.name,
-			      rcu_segcblist_n_lazy_cbs(&rdp->cblist),
 			      rcu_segcblist_n_cbs(&rdp->cblist), bl);
 	rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl);
 	if (offloaded)
@@ -2158,11 +2470,22 @@
 	rcu_nocb_unlock_irqrestore(rdp, flags);
 
 	/* Invoke callbacks. */
+	tick_dep_set_task(current, TICK_DEP_BIT_RCU);
 	rhp = rcu_cblist_dequeue(&rcl);
 	for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
+		rcu_callback_t f;
+
 		debug_rcu_head_unqueue(rhp);
-		if (__rcu_reclaim(rcu_state.name, rhp))
-			rcu_cblist_dequeued_lazy(&rcl);
+
+		rcu_lock_acquire(&rcu_callback_map);
+		trace_rcu_invoke_callback(rcu_state.name, rhp);
+
+		f = rhp->func;
+		WRITE_ONCE(rhp->func, (rcu_callback_t)0L);
+		f(rhp);
+
+		rcu_lock_release(&rcu_callback_map);
+
 		/*
 		 * Stop only if limit reached and CPU has something to do.
 		 * Note: The rcl structure counts down from zero.
@@ -2191,6 +2514,7 @@
 	local_irq_save(flags);
 	rcu_nocb_lock(rdp);
 	count = -rcl.len;
+	rdp->n_cbs_invoked += count;
 	trace_rcu_batch_end(rcu_state.name, count, !!rcl.head, need_resched(),
 			    is_idle_task(current), rcu_is_callbacks_kthread());
 
@@ -2207,7 +2531,7 @@
 	/* Reset ->qlen_last_fqs_check trigger if enough CBs have drained. */
 	if (count == 0 && rdp->qlen_last_fqs_check != 0) {
 		rdp->qlen_last_fqs_check = 0;
-		rdp->n_force_qs_snap = rcu_state.n_force_qs;
+		rdp->n_force_qs_snap = READ_ONCE(rcu_state.n_force_qs);
 	} else if (count < rdp->qlen_last_fqs_check - qhimark)
 		rdp->qlen_last_fqs_check = count;
 
@@ -2224,6 +2548,7 @@
 	/* Re-invoke RCU core processing if there are callbacks remaining. */
 	if (!offloaded && rcu_segcblist_ready_cbs(&rdp->cblist))
 		invoke_rcu_core();
+	tick_dep_clear_task(current, TICK_DEP_BIT_RCU);
 }
 
 /*
@@ -2237,6 +2562,7 @@
 void rcu_sched_clock_irq(int user)
 {
 	trace_rcu_utilization(TPS("Start scheduler-tick"));
+	lockdep_assert_irqs_disabled();
 	raw_cpu_inc(rcu_data.ticks_this_gp);
 	/* The load-acquire pairs with the store-release setting to true. */
 	if (smp_load_acquire(this_cpu_ptr(&rcu_data.rcu_urgent_qs))) {
@@ -2248,8 +2574,9 @@
 		__this_cpu_write(rcu_data.rcu_urgent_qs, false);
 	}
 	rcu_flavor_sched_clock_irq(user);
-	if (rcu_pending())
+	if (rcu_pending(user))
 		invoke_rcu_core();
+	lockdep_assert_irqs_disabled();
 
 	trace_rcu_utilization(TPS("End scheduler-tick"));
 }
@@ -2266,15 +2593,18 @@
 	int cpu;
 	unsigned long flags;
 	unsigned long mask;
+	struct rcu_data *rdp;
 	struct rcu_node *rnp;
 
+	rcu_state.cbovld = rcu_state.cbovldnext;
+	rcu_state.cbovldnext = false;
 	rcu_for_each_leaf_node(rnp) {
 		cond_resched_tasks_rcu_qs();
 		mask = 0;
 		raw_spin_lock_irqsave_rcu_node(rnp, flags);
+		rcu_state.cbovldnext |= !!rnp->cbovldmask;
 		if (rnp->qsmask == 0) {
-			if (!IS_ENABLED(CONFIG_PREEMPTION) ||
-			    rcu_preempt_blocked_readers_cgp(rnp)) {
+			if (rcu_preempt_blocked_readers_cgp(rnp)) {
 				/*
 				 * No point in scanning bits because they
 				 * are all zero.  But we might need to
@@ -2287,11 +2617,11 @@
 			raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 			continue;
 		}
-		for_each_leaf_node_possible_cpu(rnp, cpu) {
-			unsigned long bit = leaf_node_cpu_bit(rnp, cpu);
-			if ((rnp->qsmask & bit) != 0) {
-				if (f(per_cpu_ptr(&rcu_data, cpu)))
-					mask |= bit;
+		for_each_leaf_node_cpu_mask(rnp, cpu, rnp->qsmask) {
+			rdp = per_cpu_ptr(&rcu_data, cpu);
+			if (f(rdp)) {
+				mask |= rdp->grpmask;
+				rcu_disable_urgency_upon_qs(rdp);
 			}
 		}
 		if (mask != 0) {
@@ -2319,7 +2649,7 @@
 	rnp = __this_cpu_read(rcu_data.mynode);
 	for (; rnp != NULL; rnp = rnp->parent) {
 		ret = (READ_ONCE(rcu_state.gp_flags) & RCU_GP_FLAG_FQS) ||
-		      !raw_spin_trylock(&rnp->fqslock);
+		       !raw_spin_trylock(&rnp->fqslock);
 		if (rnp_old != NULL)
 			raw_spin_unlock(&rnp_old->fqslock);
 		if (ret)
@@ -2342,6 +2672,14 @@
 }
 EXPORT_SYMBOL_GPL(rcu_force_quiescent_state);
 
+// Workqueue handler for an RCU reader for kernels enforcing strict RCU
+// grace periods.
+static void strict_work_handler(struct work_struct *work)
+{
+	rcu_read_lock();
+	rcu_read_unlock();
+}
+
 /* Perform RCU core processing work for the current CPU.  */
 static __latent_entropy void rcu_core(void)
 {
@@ -2386,6 +2724,10 @@
 	/* Do any needed deferred wakeups of rcuo kthreads. */
 	do_nocb_deferred_wakeup(rdp);
 	trace_rcu_utilization(TPS("End RCU core"));
+
+	// If strict GPs, schedule an RCU reader in a clean environment.
+	if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD))
+		queue_work_on(rdp->cpu, rcu_gp_wq, &rdp->strict_work);
 }
 
 static void rcu_core_si(struct softirq_action *h)
@@ -2450,8 +2792,8 @@
 	char work, *workp = this_cpu_ptr(&rcu_data.rcu_cpu_has_work);
 	int spincnt;
 
+	trace_rcu_utilization(TPS("Start CPU kthread@rcu_run"));
 	for (spincnt = 0; spincnt < 10; spincnt++) {
-		trace_rcu_utilization(TPS("Start CPU kthread@rcu_wait"));
 		local_bh_disable();
 		*statusp = RCU_KTHREAD_RUNNING;
 		local_irq_disable();
@@ -2469,7 +2811,7 @@
 	}
 	*statusp = RCU_KTHREAD_YIELDING;
 	trace_rcu_utilization(TPS("Start CPU kthread@rcu_yield"));
-	schedule_timeout_interruptible(2);
+	schedule_timeout_idle(2);
 	trace_rcu_utilization(TPS("End CPU kthread@rcu_yield"));
 	*statusp = RCU_KTHREAD_WAITING;
 }
@@ -2535,10 +2877,10 @@
 		} else {
 			/* Give the grace period a kick. */
 			rdp->blimit = DEFAULT_MAX_RCU_BLIMIT;
-			if (rcu_state.n_force_qs == rdp->n_force_qs_snap &&
+			if (READ_ONCE(rcu_state.n_force_qs) == rdp->n_force_qs_snap &&
 			    rcu_segcblist_first_pend_cb(&rdp->cblist) != head)
 				rcu_force_quiescent_state();
-			rdp->n_force_qs_snap = rcu_state.n_force_qs;
+			rdp->n_force_qs_snap = READ_ONCE(rcu_state.n_force_qs);
 			rdp->qlen_last_fqs_check = rcu_segcblist_n_cbs(&rdp->cblist);
 		}
 	}
@@ -2552,13 +2894,50 @@
 }
 
 /*
- * Helper function for call_rcu() and friends.  The cpu argument will
- * normally be -1, indicating "currently running CPU".  It may specify
- * a CPU only if that CPU is a no-CBs CPU.  Currently, only rcu_barrier()
- * is expected to specify a CPU.
+ * Check and if necessary update the leaf rcu_node structure's
+ * ->cbovldmask bit corresponding to the CPU of the specified rcu_data
+ * structure, based on that CPU's number of queued RCU callbacks.  The
+ * caller must hold the leaf rcu_node structure's ->lock.
  */
+static void check_cb_ovld_locked(struct rcu_data *rdp, struct rcu_node *rnp)
+{
+	raw_lockdep_assert_held_rcu_node(rnp);
+	if (qovld_calc <= 0)
+		return; // Early boot and wildcard value set.
+	if (rcu_segcblist_n_cbs(&rdp->cblist) >= qovld_calc)
+		WRITE_ONCE(rnp->cbovldmask, rnp->cbovldmask | rdp->grpmask);
+	else
+		WRITE_ONCE(rnp->cbovldmask, rnp->cbovldmask & ~rdp->grpmask);
+}
+
+/*
+ * Check and if necessary update the leaf rcu_node structure's
+ * ->cbovldmask bit corresponding to the current CPU based on that CPU's
+ * number of queued RCU callbacks.  No locks need be held, but the
+ * caller must have disabled interrupts.
+ *
+ * Note that this function ignores the possibility that there are a lot
+ * of callbacks all of which have already seen the end of their respective
+ * grace periods.  This omission is due to the need for no-CBs CPUs to
+ * be holding ->nocb_lock to do this check, which is too heavy for a
+ * common-case operation.
+ */
+static void check_cb_ovld(struct rcu_data *rdp)
+{
+	struct rcu_node *const rnp = rdp->mynode;
+
+	if (qovld_calc <= 0 ||
+	    ((rcu_segcblist_n_cbs(&rdp->cblist) >= qovld_calc) ==
+	     !!(READ_ONCE(rnp->cbovldmask) & rdp->grpmask)))
+		return; // Early boot wildcard value or already set correctly.
+	raw_spin_lock_rcu_node(rnp);
+	check_cb_ovld_locked(rdp, rnp);
+	raw_spin_unlock_rcu_node(rnp);
+}
+
+/* Helper function for call_rcu() and friends.  */
 static void
-__call_rcu(struct rcu_head *head, rcu_callback_t func, bool lazy)
+__call_rcu(struct rcu_head *head, rcu_callback_t func)
 {
 	unsigned long flags;
 	struct rcu_data *rdp;
@@ -2581,6 +2960,7 @@
 	head->func = func;
 	head->next = NULL;
 	local_irq_save(flags);
+	kasan_record_aux_stack(head);
 	rdp = this_cpu_ptr(&rcu_data);
 
 	/* Add the callback to our list. */
@@ -2593,18 +2973,18 @@
 		if (rcu_segcblist_empty(&rdp->cblist))
 			rcu_segcblist_init(&rdp->cblist);
 	}
+
+	check_cb_ovld(rdp);
 	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
 		return; // Enqueued onto ->nocb_bypass, so just leave.
-	/* If we get here, rcu_nocb_try_bypass() acquired ->nocb_lock. */
-	rcu_segcblist_enqueue(&rdp->cblist, head, lazy);
-	if (__is_kfree_rcu_offset((unsigned long)func))
-		trace_rcu_kfree_callback(rcu_state.name, head,
+	// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
+	rcu_segcblist_enqueue(&rdp->cblist, head);
+	if (__is_kvfree_rcu_offset((unsigned long)func))
+		trace_rcu_kvfree_callback(rcu_state.name, head,
 					 (unsigned long)func,
-					 rcu_segcblist_n_lazy_cbs(&rdp->cblist),
 					 rcu_segcblist_n_cbs(&rdp->cblist));
 	else
 		trace_rcu_callback(rcu_state.name, head,
-				   rcu_segcblist_n_lazy_cbs(&rdp->cblist),
 				   rcu_segcblist_n_cbs(&rdp->cblist));
 
 	/* Go handle any RCU core processing required. */
@@ -2654,28 +3034,583 @@
  */
 void call_rcu(struct rcu_head *head, rcu_callback_t func)
 {
-	__call_rcu(head, func, 0);
+	__call_rcu(head, func);
 }
 EXPORT_SYMBOL_GPL(call_rcu);
 
-/*
- * Queue an RCU callback for lazy invocation after a grace period.
- * This will likely be later named something like "call_rcu_lazy()",
- * but this change will require some way of tagging the lazy RCU
- * callbacks in the list of pending callbacks. Until then, this
- * function may only be called from __kfree_rcu().
+
+/* Maximum number of jiffies to wait before draining a batch. */
+#define KFREE_DRAIN_JIFFIES (HZ / 50)
+#define KFREE_N_BATCHES 2
+#define FREE_N_CHANNELS 2
+
+/**
+ * struct kvfree_rcu_bulk_data - single block to store kvfree_rcu() pointers
+ * @nr_records: Number of active pointers in the array
+ * @next: Next bulk object in the block chain
+ * @records: Array of the kvfree_rcu() pointers
  */
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+struct kvfree_rcu_bulk_data {
+	unsigned long nr_records;
+	struct kvfree_rcu_bulk_data *next;
+	void *records[];
+};
+
+/*
+ * This macro defines how many entries the "records" array
+ * will contain. It is based on the fact that the size of
+ * kvfree_rcu_bulk_data structure becomes exactly one page.
+ */
+#define KVFREE_BULK_MAX_ENTR \
+	((PAGE_SIZE - sizeof(struct kvfree_rcu_bulk_data)) / sizeof(void *))
+
+/**
+ * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
+ * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
+ * @head_free: List of kfree_rcu() objects waiting for a grace period
+ * @bkvhead_free: Bulk-List of kvfree_rcu() objects waiting for a grace period
+ * @krcp: Pointer to @kfree_rcu_cpu structure
+ */
+
+struct kfree_rcu_cpu_work {
+	struct rcu_work rcu_work;
+	struct rcu_head *head_free;
+	struct kvfree_rcu_bulk_data *bkvhead_free[FREE_N_CHANNELS];
+	struct kfree_rcu_cpu *krcp;
+};
+
+/**
+ * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
+ * @head: List of kfree_rcu() objects not yet waiting for a grace period
+ * @bkvhead: Bulk-List of kvfree_rcu() objects not yet waiting for a grace period
+ * @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
+ * @lock: Synchronize access to this structure
+ * @monitor_work: Promote @head to @head_free after KFREE_DRAIN_JIFFIES
+ * @monitor_todo: Tracks whether a @monitor_work delayed work is pending
+ * @initialized: The @rcu_work fields have been initialized
+ * @count: Number of objects for which GP not started
+ * @bkvcache:
+ *	A simple cache list that contains objects for reuse purpose.
+ *	In order to save some per-cpu space the list is singular.
+ *	Even though the list is lockless, access to it must be protected
+ *	by the per-cpu lock.
+ * @page_cache_work: A work to refill the cache when it is empty
+ * @work_in_progress: Indicates that page_cache_work is running
+ * @hrtimer: A hrtimer for scheduling a page_cache_work
+ * @nr_bkv_objs: number of allocated objects at @bkvcache.
+ *
+ * This is a per-CPU structure.  The reason that it is not included in
+ * the rcu_data structure is to permit this code to be extracted from
+ * the RCU files.  Such extraction could allow further optimization of
+ * the interactions with the slab allocators.
+ */
+struct kfree_rcu_cpu {
+	struct rcu_head *head;
+	struct kvfree_rcu_bulk_data *bkvhead[FREE_N_CHANNELS];
+	struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
+	raw_spinlock_t lock;
+	struct delayed_work monitor_work;
+	bool monitor_todo;
+	bool initialized;
+	int count;
+
+	struct work_struct page_cache_work;
+	atomic_t work_in_progress;
+	struct hrtimer hrtimer;
+
+	struct llist_head bkvcache;
+	int nr_bkv_objs;
+};
+
+static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc) = {
+	.lock = __RAW_SPIN_LOCK_UNLOCKED(krc.lock),
+};
+
+static __always_inline void
+debug_rcu_bhead_unqueue(struct kvfree_rcu_bulk_data *bhead)
 {
-	__call_rcu(head, func, 1);
+#ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
+	int i;
+
+	for (i = 0; i < bhead->nr_records; i++)
+		debug_rcu_head_unqueue((struct rcu_head *)(bhead->records[i]));
+#endif
 }
-EXPORT_SYMBOL_GPL(kfree_call_rcu);
+
+static inline struct kfree_rcu_cpu *
+krc_this_cpu_lock(unsigned long *flags)
+{
+	struct kfree_rcu_cpu *krcp;
+
+	local_irq_save(*flags);	// For safely calling this_cpu_ptr().
+	krcp = this_cpu_ptr(&krc);
+	raw_spin_lock(&krcp->lock);
+
+	return krcp;
+}
+
+static inline void
+krc_this_cpu_unlock(struct kfree_rcu_cpu *krcp, unsigned long flags)
+{
+	raw_spin_unlock(&krcp->lock);
+	local_irq_restore(flags);
+}
+
+static inline struct kvfree_rcu_bulk_data *
+get_cached_bnode(struct kfree_rcu_cpu *krcp)
+{
+	if (!krcp->nr_bkv_objs)
+		return NULL;
+
+	krcp->nr_bkv_objs--;
+	return (struct kvfree_rcu_bulk_data *)
+		llist_del_first(&krcp->bkvcache);
+}
+
+static inline bool
+put_cached_bnode(struct kfree_rcu_cpu *krcp,
+	struct kvfree_rcu_bulk_data *bnode)
+{
+	// Check the limit.
+	if (krcp->nr_bkv_objs >= rcu_min_cached_objs)
+		return false;
+
+	llist_add((struct llist_node *) bnode, &krcp->bkvcache);
+	krcp->nr_bkv_objs++;
+	return true;
+
+}
+
+/*
+ * This function is invoked in workqueue context after a grace period.
+ * It frees all the objects queued on ->bkvhead_free or ->head_free.
+ */
+static void kfree_rcu_work(struct work_struct *work)
+{
+	unsigned long flags;
+	struct kvfree_rcu_bulk_data *bkvhead[FREE_N_CHANNELS], *bnext;
+	struct rcu_head *head, *next;
+	struct kfree_rcu_cpu *krcp;
+	struct kfree_rcu_cpu_work *krwp;
+	int i, j;
+
+	krwp = container_of(to_rcu_work(work),
+			    struct kfree_rcu_cpu_work, rcu_work);
+	krcp = krwp->krcp;
+
+	raw_spin_lock_irqsave(&krcp->lock, flags);
+	// Channels 1 and 2.
+	for (i = 0; i < FREE_N_CHANNELS; i++) {
+		bkvhead[i] = krwp->bkvhead_free[i];
+		krwp->bkvhead_free[i] = NULL;
+	}
+
+	// Channel 3.
+	head = krwp->head_free;
+	krwp->head_free = NULL;
+	raw_spin_unlock_irqrestore(&krcp->lock, flags);
+
+	// Handle the first two channels.
+	for (i = 0; i < FREE_N_CHANNELS; i++) {
+		for (; bkvhead[i]; bkvhead[i] = bnext) {
+			bnext = bkvhead[i]->next;
+			debug_rcu_bhead_unqueue(bkvhead[i]);
+
+			rcu_lock_acquire(&rcu_callback_map);
+			if (i == 0) { // kmalloc() / kfree().
+				trace_rcu_invoke_kfree_bulk_callback(
+					rcu_state.name, bkvhead[i]->nr_records,
+					bkvhead[i]->records);
+
+				kfree_bulk(bkvhead[i]->nr_records,
+					bkvhead[i]->records);
+			} else { // vmalloc() / vfree().
+				for (j = 0; j < bkvhead[i]->nr_records; j++) {
+					trace_rcu_invoke_kvfree_callback(
+						rcu_state.name,
+						bkvhead[i]->records[j], 0);
+
+					vfree(bkvhead[i]->records[j]);
+				}
+			}
+			rcu_lock_release(&rcu_callback_map);
+
+			raw_spin_lock_irqsave(&krcp->lock, flags);
+			if (put_cached_bnode(krcp, bkvhead[i]))
+				bkvhead[i] = NULL;
+			raw_spin_unlock_irqrestore(&krcp->lock, flags);
+
+			if (bkvhead[i])
+				free_page((unsigned long) bkvhead[i]);
+
+			cond_resched_tasks_rcu_qs();
+		}
+	}
+
+	/*
+	 * Emergency case only. It can happen under low-memory
+	 * conditions when an allocation fails, so the "bulk"
+	 * path temporarily cannot be maintained.
+	 */
+	for (; head; head = next) {
+		unsigned long offset = (unsigned long)head->func;
+		void *ptr = (void *)head - offset;
+
+		next = head->next;
+		debug_rcu_head_unqueue((struct rcu_head *)ptr);
+		rcu_lock_acquire(&rcu_callback_map);
+		trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset);
+
+		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset)))
+			kvfree(ptr);
+
+		rcu_lock_release(&rcu_callback_map);
+		cond_resched_tasks_rcu_qs();
+	}
+}
+
+/*
+ * Schedule the kfree batch RCU work to run in workqueue context after a GP.
+ * Caller must hold krcp->lock.  Reached via kfree_rcu_drain_unlock(), e.g.
+ * from kfree_rcu_monitor() once the KFREE_DRAIN_JIFFIES timeout has expired.
+ * Returns false if krcp still had pointers queued after any batch was tried.
+ */
+static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
+{
+	struct kfree_rcu_cpu_work *krwp;
+	bool repeat = false;
+	int i, j;
+
+	lockdep_assert_held(&krcp->lock);
+
+	for (i = 0; i < KFREE_N_BATCHES; i++) {
+		krwp = &(krcp->krw_arr[i]);
+
+		/*
+		 * Try to detach bkvhead or head and attach it to any
+		 * available corresponding free channel. A previous
+		 * RCU batch may still be in progress, in which case
+		 * another one cannot be queued immediately, so false
+		 * is returned to tell the caller to retry.
+		 */
+		if ((krcp->bkvhead[0] && !krwp->bkvhead_free[0]) ||
+			(krcp->bkvhead[1] && !krwp->bkvhead_free[1]) ||
+				(krcp->head && !krwp->head_free)) {
+			// Channel 1 (index 0) corresponds to SLAB ptrs.
+			// Channel 2 (index 1) corresponds to vmalloc ptrs.
+			for (j = 0; j < FREE_N_CHANNELS; j++) {
+				if (!krwp->bkvhead_free[j]) {
+					krwp->bkvhead_free[j] = krcp->bkvhead[j];
+					krcp->bkvhead[j] = NULL;
+				}
+			}
+
+			// Channel 3 corresponds to the emergency path.
+			if (!krwp->head_free) {
+				krwp->head_free = krcp->head;
+				krcp->head = NULL;
+			}
+
+			WRITE_ONCE(krcp->count, 0);
+
+			/*
+			 * There is one work item per batch, so a batch
+			 * can handle three "free channels". The work
+			 * may already be in the pending state when the
+			 * channels have been detached one after the
+			 * other.
+			 */
+			queue_rcu_work(system_wq, &krwp->rcu_work);
+		}
+
+		// Repeat if any "free" corresponding channel is still busy.
+		if (krcp->bkvhead[0] || krcp->bkvhead[1] || krcp->head)
+			repeat = true;
+	}
+
+	return !repeat;
+}
+
+// Attempt to start a new batch, then release krcp->lock (held by the caller).
+static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
+					  unsigned long flags)
+{
+	bool queued;
+
+	// Clear the flag up front; it is set again only when a retry is needed.
+	krcp->monitor_todo = false;
+	queued = queue_kfree_rcu_work(krcp);
+	if (!queued) {
+		// Previous RCU batch still in progress, try again later.
+		krcp->monitor_todo = true;
+		schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
+	}
+	raw_spin_unlock_irqrestore(&krcp->lock, flags);
+}
+
+/*
+ * Delayed-work handler, run after the KFREE_DRAIN_JIFFIES timeout.  If a
+ * drain is still pending, kfree_rcu_drain_unlock() tries to start a batch.
+ */
+static void kfree_rcu_monitor(struct work_struct *work)
+{
+	struct kfree_rcu_cpu *krcp;
+	unsigned long flags;
+
+	krcp = container_of(work, struct kfree_rcu_cpu, monitor_work.work);
+	raw_spin_lock_irqsave(&krcp->lock, flags);
+	if (!krcp->monitor_todo)
+		raw_spin_unlock_irqrestore(&krcp->lock, flags);
+	else
+		kfree_rcu_drain_unlock(krcp, flags);
+}
+
+// hrtimer callback: queue the page-cache refill work on system_highpri_wq.
+static enum hrtimer_restart schedule_page_work_fn(struct hrtimer *t)
+{
+	struct kfree_rcu_cpu *krcp;
+
+	krcp = container_of(t, struct kfree_rcu_cpu, hrtimer);
+	queue_work(system_highpri_wq, &krcp->page_cache_work);
+	return HRTIMER_NORESTART;
+}
+
+// Work handler: make up to rcu_min_cached_objs attempts to add a freshly
+// allocated page to krcp's bnode cache, then clear ->work_in_progress.
+static void fill_page_cache_func(struct work_struct *work)
+{
+	struct kfree_rcu_cpu *krcp =
+		container_of(work, struct kfree_rcu_cpu, page_cache_work);
+	struct kvfree_rcu_bulk_data *page;
+	unsigned long flags;
+	bool cached;
+	int nr;
+
+	for (nr = 0; nr < rcu_min_cached_objs; nr++) {
+		page = (struct kvfree_rcu_bulk_data *)
+			__get_free_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOMEMALLOC | __GFP_NOWARN);
+		if (!page)
+			continue; // No page this time, move on to the next attempt.
+		raw_spin_lock_irqsave(&krcp->lock, flags);
+		cached = put_cached_bnode(krcp, page);
+		raw_spin_unlock_irqrestore(&krcp->lock, flags);
+		if (cached)
+			continue;
+		// Not taken by the cache: give the page back and stop.
+		free_page((unsigned long) page);
+		break;
+	}
+
+	atomic_set(&krcp->work_in_progress, 0);
+}
+
+// Fire a zero-delay hrtimer that queues the refill work, at most one at a time.
+static void run_page_cache_worker(struct kfree_rcu_cpu *krcp)
+{
+	if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING)
+		return;
+	if (atomic_xchg(&krcp->work_in_progress, 1))
+		return;
+	hrtimer_init(&krcp->hrtimer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+	krcp->hrtimer.function = schedule_page_work_fn;
+	hrtimer_start(&krcp->hrtimer, 0, HRTIMER_MODE_REL);
+}
+
+// Append ptr to the per-CPU block array of its channel.  Returns false if
+// krcp is not yet initialized or no cached block is available for growth.
+static inline bool
+kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
+{
+	struct kvfree_rcu_bulk_data *cur, *fresh;
+	int chan;
+
+	if (unlikely(!krcp->initialized))
+		return false;
+
+	lockdep_assert_held(&krcp->lock);
+
+	// Channel 0 takes non-vmalloc pointers, channel 1 vmalloc ones.
+	chan = is_vmalloc_addr(ptr) ? 1 : 0;
+	cur = krcp->bkvhead[chan];
+
+	// A missing or completely filled head block calls for a new one.
+	if (!cur || cur->nr_records == KVFREE_BULK_MAX_ENTR) {
+		fresh = get_cached_bnode(krcp);
+		if (!fresh)
+			return false; // Caller switches to the emergency path.
+
+		// Start the block empty and push it onto the channel's list.
+		fresh->nr_records = 0;
+		fresh->next = cur;
+		krcp->bkvhead[chan] = fresh;
+		cur = fresh;
+	}
+
+	// Store the pointer in the next free slot of the head block.
+	cur->records[cur->nr_records++] = ptr;
+	return true;
+}
+
+/*
+ * Queue a request for lazy invocation of the appropriate free routine after a
+ * grace period. Please note that three paths are maintained: two main ones
+ * that use the array-of-pointers interface, and a third, emergency one that
+ * is used only when the main paths temporarily cannot be maintained due to
+ * memory pressure.
+ *
+ * Each kvfree_call_rcu() request is added to a batch. The batch will be drained
+ * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch will
+ * be freed in workqueue context. This allows us to batch requests together to
+ * reduce the number of grace periods during heavy kfree_rcu()/kvfree_rcu() load.
+ */
+void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+{
+	unsigned long flags;
+	struct kfree_rcu_cpu *krcp;
+	bool success;
+	void *ptr;
+
+	if (head) {
+		ptr = (void *) head - (unsigned long) func;
+	} else {
+		/*
+		 * Please note there is a limitation for the head-less
+		 * variant, hence a clear rule for such objects: they
+		 * can be passed from might_sleep() context only. In
+		 * other places, please embed an rcu_head in your
+		 * data. Here "func" carries the object pointer itself.
+		 */
+		might_sleep();
+		ptr = (unsigned long *) func;
+	}
+
+	krcp = krc_this_cpu_lock(&flags);
+
+	// Queue the object but don't yet schedule the batch.
+	if (debug_rcu_head_queue(ptr)) {
+		// Probable double kfree_rcu(), just leak.
+		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
+			  __func__, head);
+
+		// Mark as success, so that nothing gets freed below, and leave.
+		success = true;
+		goto unlock_return;
+	}
+
+	success = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr);
+	if (!success) {
+		run_page_cache_worker(krcp);
+
+		if (head == NULL)
+			// Head-less kvfree_rcu(one_arg) call: free inline below.
+			goto unlock_return;
+
+		head->func = func;
+		head->next = krcp->head;
+		krcp->head = head;
+		success = true;
+	}
+
+	WRITE_ONCE(krcp->count, krcp->count + 1);
+
+	// Set timer to drain after KFREE_DRAIN_JIFFIES.
+	if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING &&
+	    !krcp->monitor_todo) {
+		krcp->monitor_todo = true;
+		schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
+	}
+
+unlock_return:
+	krc_this_cpu_unlock(krcp, flags);
+
+	/*
+	 * Inline kvfree() after synchronize_rcu(). We can do
+	 * it from might_sleep() context only, so the current
+	 * CPU can pass the QS state.
+	 */
+	if (!success) {
+		debug_rcu_head_unqueue((struct rcu_head *) ptr);
+		synchronize_rcu();
+		kvfree(ptr);
+	}
+}
+EXPORT_SYMBOL_GPL(kvfree_call_rcu);
+
+static unsigned long
+kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
+{
+	int cpu;
+	unsigned long count = 0;
+
+	/* ->count_objects(): lockless sum of every CPU's ->count. */
+	for_each_possible_cpu(cpu) {
+		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+
+		count += READ_ONCE(krcp->count);
+	}
+	// Zero would mean "cannot tell"; nothing queued is SHRINK_EMPTY.
+	return count == 0 ? SHRINK_EMPTY : count;
+}
+
+// Shrinker scan: try to start each CPU's pending drain, tallying ->count.
+static unsigned long
+kfree_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
+{
+	unsigned long flags, count, freed = 0;
+	struct kfree_rcu_cpu *krcp;
+	int cpu;
+
+	for_each_possible_cpu(cpu) {
+		krcp = per_cpu_ptr(&krc, cpu);
+		count = READ_ONCE(krcp->count); // Lockless, pairs with WRITE_ONCE().
+		raw_spin_lock_irqsave(&krcp->lock, flags);
+		if (krcp->monitor_todo)
+			kfree_rcu_drain_unlock(krcp, flags);
+		else
+			raw_spin_unlock_irqrestore(&krcp->lock, flags);
+
+		// ->nr_to_scan is unsigned: clamp rather than wrap around.
+		sc->nr_to_scan -= min(count, sc->nr_to_scan);
+		freed += count;
+		if (!sc->nr_to_scan)
+			break;
+	}
+
+	return freed == 0 ? SHRINK_STOP : freed;
+}
+
+static struct shrinker kfree_rcu_shrinker = {
+	.seeks = DEFAULT_SEEKS,
+	.batch = 0,
+	.scan_objects = kfree_rcu_shrink_scan,	// Tries to start pending drains.
+	.count_objects = kfree_rcu_shrink_count,	// Sums the per-CPU ->count.
+};
+
+// For every possible CPU with objects on ->head and no drain monitor
+// pending, mark the monitor as pending and schedule it on that CPU.
+void __init kfree_rcu_scheduler_running(void)
+{
+	struct kfree_rcu_cpu *krcp;
+	unsigned long flags;
+	int cpu;
+
+	for_each_possible_cpu(cpu) {
+		krcp = per_cpu_ptr(&krc, cpu);
+		raw_spin_lock_irqsave(&krcp->lock, flags);
+		if (krcp->head && !krcp->monitor_todo) {
+			krcp->monitor_todo = true;
+			schedule_delayed_work_on(cpu, &krcp->monitor_work,
+						 KFREE_DRAIN_JIFFIES);
+		}
+		raw_spin_unlock_irqrestore(&krcp->lock, flags);
+	}
+}
 
 /*
  * During early boot, any blocking grace-period wait automatically
- * implies a grace period.  Later on, this is never the case for PREEMPT.
+ * implies a grace period.  Later on, this is never the case for PREEMPTION.
  *
- * Howevr, because a context switch is a grace period for !PREEMPT, any
+ * However, because a context switch is a grace period for !PREEMPTION, any
  * blocking grace-period wait automatically implies a grace period if
  * there is only one CPU online at any point time during execution of
  * either synchronize_rcu() or synchronize_rcu_expedited().  It is OK to
@@ -2792,11 +3727,14 @@
  * CPU-local state are performed first.  However, we must check for CPU
  * stalls first, else we might not get a chance.
  */
-static int rcu_pending(void)
+static int rcu_pending(int user)
 {
+	bool gp_in_progress;
 	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 	struct rcu_node *rnp = rdp->mynode;
 
+	lockdep_assert_irqs_disabled();
+
 	/* Check for CPU stalls, if enabled. */
 	check_cpu_stall(rdp);
 
@@ -2804,12 +3742,13 @@
 	if (rcu_nocb_need_deferred_wakeup(rdp))
 		return 1;
 
-	/* Is this CPU a NO_HZ_FULL CPU that should ignore RCU? */
-	if (rcu_nohz_full_cpu())
+	/* Is this a nohz_full CPU in userspace or idle?  (Ignore RCU if so.) */
+	if ((user || rcu_is_cpu_rrupt_from_idle()) && rcu_nohz_full_cpu())
 		return 0;
 
 	/* Is the RCU core waiting for a quiescent state from this CPU? */
-	if (rdp->core_needs_qs && !rdp->cpu_no_qs.b.norm)
+	gp_in_progress = rcu_gp_in_progress();
+	if (rdp->core_needs_qs && !rdp->cpu_no_qs.b.norm && gp_in_progress)
 		return 1;
 
 	/* Does this CPU have callbacks ready to invoke? */
@@ -2817,8 +3756,7 @@
 		return 1;
 
 	/* Has RCU gone idle with this CPU needing another grace period? */
-	if (!rcu_gp_in_progress() &&
-	    rcu_segcblist_is_enabled(&rdp->cblist) &&
+	if (!gp_in_progress && rcu_segcblist_is_enabled(&rdp->cblist) &&
 	    (!IS_ENABLED(CONFIG_RCU_NOCB_CPU) ||
 	     !rcu_segcblist_is_offloaded(&rdp->cblist)) &&
 	    !rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
@@ -2846,36 +3784,44 @@
 /*
  * RCU callback function for rcu_barrier().  If we are last, wake
  * up the task executing rcu_barrier().
+ *
+ * Note that the value of rcu_state.barrier_sequence must be captured
+ * before the atomic_dec_and_test().  Otherwise, if this CPU is not last,
+ * other CPUs might count the value down to zero before this CPU gets
+ * around to invoking rcu_barrier_trace(), which might result in bogus
+ * data from the next instance of rcu_barrier().
  */
 static void rcu_barrier_callback(struct rcu_head *rhp)
 {
+	unsigned long __maybe_unused s = rcu_state.barrier_sequence;
+
 	if (atomic_dec_and_test(&rcu_state.barrier_cpu_count)) {
-		rcu_barrier_trace(TPS("LastCB"), -1,
-				   rcu_state.barrier_sequence);
+		rcu_barrier_trace(TPS("LastCB"), -1, s);
 		complete(&rcu_state.barrier_completion);
 	} else {
-		rcu_barrier_trace(TPS("CB"), -1, rcu_state.barrier_sequence);
+		rcu_barrier_trace(TPS("CB"), -1, s);
 	}
 }
 
 /*
  * Called with preemption disabled, and from cross-cpu IRQ context.
  */
-static void rcu_barrier_func(void *unused)
+static void rcu_barrier_func(void *cpu_in)
 {
-	struct rcu_data *rdp = raw_cpu_ptr(&rcu_data);
+	uintptr_t cpu = (uintptr_t)cpu_in;
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
 
 	rcu_barrier_trace(TPS("IRQ"), -1, rcu_state.barrier_sequence);
 	rdp->barrier_head.func = rcu_barrier_callback;
 	debug_rcu_head_queue(&rdp->barrier_head);
 	rcu_nocb_lock(rdp);
 	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
-	if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head, 0)) {
+	if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
 		atomic_inc(&rcu_state.barrier_cpu_count);
 	} else {
 		debug_rcu_head_unqueue(&rdp->barrier_head);
 		rcu_barrier_trace(TPS("IRQNQ"), -1,
-				   rcu_state.barrier_sequence);
+				  rcu_state.barrier_sequence);
 	}
 	rcu_nocb_unlock(rdp);
 }
@@ -2890,7 +3836,7 @@
  */
 void rcu_barrier(void)
 {
-	int cpu;
+	uintptr_t cpu;
 	struct rcu_data *rdp;
 	unsigned long s = rcu_seq_snap(&rcu_state.barrier_sequence);
 
@@ -2902,7 +3848,7 @@
 	/* Did someone else do our work for us? */
 	if (rcu_seq_done(&rcu_state.barrier_sequence, s)) {
 		rcu_barrier_trace(TPS("EarlyExit"), -1,
-				   rcu_state.barrier_sequence);
+				  rcu_state.barrier_sequence);
 		smp_mb(); /* caller's subsequent code after above check. */
 		mutex_unlock(&rcu_state.barrier_mutex);
 		return;
@@ -2913,13 +3859,14 @@
 	rcu_barrier_trace(TPS("Inc1"), -1, rcu_state.barrier_sequence);
 
 	/*
-	 * Initialize the count to one rather than to zero in order to
-	 * avoid a too-soon return to zero in case of a short grace period
-	 * (or preemption of this task).  Exclude CPU-hotplug operations
-	 * to ensure that no offline CPU has callbacks queued.
+	 * Initialize the count to two rather than to zero in order
+	 * to avoid a too-soon return to zero in case of an immediate
+	 * invocation of the just-enqueued callback (or preemption of
+	 * this task).  Exclude CPU-hotplug operations to ensure that no
+	 * offline non-offloaded CPU has callbacks queued.
 	 */
 	init_completion(&rcu_state.barrier_completion);
-	atomic_set(&rcu_state.barrier_cpu_count, 1);
+	atomic_set(&rcu_state.barrier_cpu_count, 2);
 	get_online_cpus();
 
 	/*
@@ -2929,16 +3876,26 @@
 	 */
 	for_each_possible_cpu(cpu) {
 		rdp = per_cpu_ptr(&rcu_data, cpu);
-		if (!cpu_online(cpu) &&
+		if (cpu_is_offline(cpu) &&
 		    !rcu_segcblist_is_offloaded(&rdp->cblist))
 			continue;
-		if (rcu_segcblist_n_cbs(&rdp->cblist)) {
+		if (rcu_segcblist_n_cbs(&rdp->cblist) && cpu_online(cpu)) {
 			rcu_barrier_trace(TPS("OnlineQ"), cpu,
-					   rcu_state.barrier_sequence);
-			smp_call_function_single(cpu, rcu_barrier_func, NULL, 1);
+					  rcu_state.barrier_sequence);
+			smp_call_function_single(cpu, rcu_barrier_func, (void *)cpu, 1);
+		} else if (rcu_segcblist_n_cbs(&rdp->cblist) &&
+			   cpu_is_offline(cpu)) {
+			rcu_barrier_trace(TPS("OfflineNoCBQ"), cpu,
+					  rcu_state.barrier_sequence);
+			local_irq_disable();
+			rcu_barrier_func((void *)cpu);
+			local_irq_enable();
+		} else if (cpu_is_offline(cpu)) {
+			rcu_barrier_trace(TPS("OfflineNoCBNoQ"), cpu,
+					  rcu_state.barrier_sequence);
 		} else {
 			rcu_barrier_trace(TPS("OnlineNQ"), cpu,
-					   rcu_state.barrier_sequence);
+					  rcu_state.barrier_sequence);
 		}
 	}
 	put_online_cpus();
@@ -2947,7 +3904,7 @@
 	 * Now that we have an rcu_barrier_callback() callback on each
 	 * CPU, and thus each counted, remove the initial count.
 	 */
-	if (atomic_dec_and_test(&rcu_state.barrier_cpu_count))
+	if (atomic_sub_and_test(2, &rcu_state.barrier_cpu_count))
 		complete(&rcu_state.barrier_completion);
 
 	/* Wait for all rcu_barrier_callback() callbacks to be invoked. */
@@ -3000,6 +3957,7 @@
 
 	/* Set up local state, ensuring consistent view of global state. */
 	rdp->grpmask = leaf_node_cpu_bit(rdp->mynode, cpu);
+	INIT_WORK(&rdp->strict_work, strict_work_handler);
 	WARN_ON_ONCE(rdp->dynticks_nesting != 1);
 	WARN_ON_ONCE(rcu_dynticks_in_eqs(rcu_dynticks_snap(rdp)));
 	rdp->rcu_ofl_gp_seq = rcu_state.gp_seq;
@@ -3029,7 +3987,7 @@
 	/* Set up local state, ensuring consistent view of global state. */
 	raw_spin_lock_irqsave_rcu_node(rnp, flags);
 	rdp->qlen_last_fqs_check = 0;
-	rdp->n_force_qs_snap = rcu_state.n_force_qs;
+	rdp->n_force_qs_snap = READ_ONCE(rcu_state.n_force_qs);
 	rdp->blimit = blimit;
 	if (rcu_segcblist_empty(&rdp->cblist) && /* No early-boot CBs? */
 	    !rcu_segcblist_is_offloaded(&rdp->cblist))
@@ -3046,12 +4004,12 @@
 	rnp = rdp->mynode;
 	raw_spin_lock_rcu_node(rnp);		/* irqs already disabled. */
 	rdp->beenonline = true;	 /* We have now been online. */
-	rdp->gp_seq = rnp->gp_seq;
-	rdp->gp_seq_needed = rnp->gp_seq;
+	rdp->gp_seq = READ_ONCE(rnp->gp_seq);
+	rdp->gp_seq_needed = rdp->gp_seq;
 	rdp->cpu_no_qs.b.norm = true;
 	rdp->core_needs_qs = false;
 	rdp->rcu_iw_pending = false;
-	rdp->rcu_iw_gp_seq = rnp->gp_seq - 1;
+	rdp->rcu_iw_gp_seq = rdp->gp_seq - 1;
 	trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuonl"));
 	raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 	rcu_prepare_kthreads(cpu);
@@ -3089,6 +4047,9 @@
 		return 0; /* Too early in boot for scheduler work. */
 	sync_sched_exp_online_cleanup(cpu);
 	rcutree_affinity_setting(cpu, -1);
+
+	// Stop-machine done, so allow nohz_full to disable tick.
+	tick_dep_clear(TICK_DEP_BIT_RCU);
 	return 0;
 }
 
@@ -3109,11 +4070,12 @@
 	raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 
 	rcutree_affinity_setting(cpu, cpu);
+
+	// nohz_full CPUs need the tick for stop-machine to work quickly
+	tick_dep_set(TICK_DEP_BIT_RCU);
 	return 0;
 }
 
-static DEFINE_PER_CPU(int, rcu_cpu_started);
-
 /*
  * Mark the specified CPU as being online so that subsequent grace periods
  * (both expedited and normal) will wait on it.  Note that this means that
@@ -3129,31 +4091,29 @@
 {
 	unsigned long flags;
 	unsigned long mask;
-	int nbits;
-	unsigned long oldmask;
 	struct rcu_data *rdp;
 	struct rcu_node *rnp;
-
-	if (per_cpu(rcu_cpu_started, cpu))
-		return;
-
-	per_cpu(rcu_cpu_started, cpu) = 1;
+	bool newcpu;
 
 	rdp = per_cpu_ptr(&rcu_data, cpu);
+	if (rdp->cpu_started)
+		return;
+	rdp->cpu_started = true;
+
 	rnp = rdp->mynode;
 	mask = rdp->grpmask;
 	raw_spin_lock_irqsave_rcu_node(rnp, flags);
-	rnp->qsmaskinitnext |= mask;
-	oldmask = rnp->expmaskinitnext;
+	WRITE_ONCE(rnp->qsmaskinitnext, rnp->qsmaskinitnext | mask);
+	newcpu = !(rnp->expmaskinitnext & mask);
 	rnp->expmaskinitnext |= mask;
-	oldmask ^= rnp->expmaskinitnext;
-	nbits = bitmap_weight(&oldmask, BITS_PER_LONG);
 	/* Allow lockless access for expedited grace periods. */
-	smp_store_release(&rcu_state.ncpus, rcu_state.ncpus + nbits); /* ^^^ */
+	smp_store_release(&rcu_state.ncpus, rcu_state.ncpus + newcpu); /* ^^^ */
+	ASSERT_EXCLUSIVE_WRITER(rcu_state.ncpus);
 	rcu_gpnum_ovf(rnp, rdp); /* Offline-induced counter wrap? */
 	rdp->rcu_onl_gp_seq = READ_ONCE(rcu_state.gp_seq);
 	rdp->rcu_onl_gp_flags = READ_ONCE(rcu_state.gp_flags);
 	if (rnp->qsmask & mask) { /* RCU waiting on incoming CPU? */
+		rcu_disable_urgency_upon_qs(rdp);
 		/* Report QS -after- changing ->qsmaskinitnext! */
 		rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
 	} else {
@@ -3194,11 +4154,11 @@
 		rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
 		raw_spin_lock_irqsave_rcu_node(rnp, flags);
 	}
-	rnp->qsmaskinitnext &= ~mask;
+	WRITE_ONCE(rnp->qsmaskinitnext, rnp->qsmaskinitnext & ~mask);
 	raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 	raw_spin_unlock(&rcu_state.ofl_lock);
 
-	per_cpu(rcu_cpu_started, cpu) = 0;
+	rdp->cpu_started = false;
 }
 
 #ifdef CONFIG_HOTPLUG_CPU
@@ -3309,7 +4269,10 @@
 	}
 	rnp = rcu_get_root();
 	raw_spin_lock_irqsave_rcu_node(rnp, flags);
-	rcu_state.gp_kthread = t;
+	WRITE_ONCE(rcu_state.gp_activity, jiffies);
+	WRITE_ONCE(rcu_state.gp_req_activity, jiffies);
+	// Reset .gp_activity and .gp_req_activity before setting .gp_kthread.
+	smp_store_release(&rcu_state.gp_kthread, t);  /* ^^^ */
 	raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 	wake_up_process(t);
 	rcu_spawn_nocb_kthreads();
@@ -3539,12 +4502,34 @@
 struct workqueue_struct *rcu_gp_wq;
 struct workqueue_struct *rcu_par_gp_wq;
 
+static void __init kfree_rcu_batch_init(void)
+{
+	struct kfree_rcu_cpu_work *krwp;
+	struct kfree_rcu_cpu *krcp;
+	int cpu, n;
+
+	for_each_possible_cpu(cpu) {
+		krcp = per_cpu_ptr(&krc, cpu);
+		for (n = 0; n < KFREE_N_BATCHES; n++) {
+			krwp = &krcp->krw_arr[n];
+			INIT_RCU_WORK(&krwp->rcu_work, kfree_rcu_work);
+			krwp->krcp = krcp; // Back-pointer to the owning per-CPU state.
+		}
+		INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
+		INIT_WORK(&krcp->page_cache_work, fill_page_cache_func);
+		krcp->initialized = true; // From here on the bulk path may be used.
+	}
+	if (register_shrinker(&kfree_rcu_shrinker))
+		pr_err("Failed to register kfree_rcu() shrinker!\n");
+}
+
 void __init rcu_init(void)
 {
 	int cpu;
 
 	rcu_early_boot_tests();
 
+	kfree_rcu_batch_init();
 	rcu_bootup_announce();
 	rcu_init_geometry();
 	rcu_init_one();
@@ -3571,6 +4556,13 @@
 	rcu_par_gp_wq = alloc_workqueue("rcu_par_gp", WQ_MEM_RECLAIM, 0);
 	WARN_ON(!rcu_par_gp_wq);
 	srcu_init();
+
+	/* Fill in default value for rcutree.qovld boot parameter. */
+	/* -After- the rcu_node ->lock fields are initialized! */
+	if (qovld < 0)
+		qovld_calc = DEFAULT_RCU_QOVLD_MULT * qhimark;
+	else
+		qovld_calc = qovld;
 }
 
 #include "tree_stall.h"
