Update Linux to v5.10.109

Sourced from the upstream kernel.org release tarball [1].

[1] https://cdn.kernel.org/pub/linux/kernel/v5.x/linux-5.10.109.tar.xz

Change-Id: I19bca9fc6762d4e63bcf3e4cba88bbe560d9c76c
Signed-off-by: Olivier Deprez <olivier.deprez@arm.com>
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index fbbac9b..8d2c985 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -48,12 +48,13 @@
 #include "group.h"
 #include "trace.h"
 
+#define NAGLE_START_INIT	4
+#define NAGLE_START_MAX		1024
 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
 #define CONN_PROBING_INTV	msecs_to_jiffies(3600000)  /* [ms] => 1 h */
-#define TIPC_FWD_MSG		1
 #define TIPC_MAX_PORT		0xffffffff
 #define TIPC_MIN_PORT		1
-#define TIPC_ACK_RATE		4       /* ACK at 1/4 of of rcv window size */
+#define TIPC_ACK_RATE		4       /* ACK at 1/4 of rcv window size */
 
 enum {
 	TIPC_LISTEN = TCP_LISTEN,
@@ -75,6 +76,7 @@
  * @conn_instance: TIPC instance used when connection was established
  * @published: non-zero if port has one or more associated names
  * @max_pkt: maximum packet size "hint" used when building messages sent by port
+ * @maxnagle: maximum size of msg which can be subject to nagle
  * @portid: unique port identity in TIPC socket hash table
  * @phdr: preformatted message header used when sending messages
  * #cong_links: list of congested links
@@ -97,6 +99,7 @@
 	u32 conn_instance;
 	int published;
 	u32 max_pkt;
+	u32 maxnagle;
 	u32 portid;
 	struct tipc_msg phdr;
 	struct list_head cong_links;
@@ -116,6 +119,13 @@
 	struct tipc_mc_method mc_method;
 	struct rcu_head rcu;
 	struct tipc_group *group;
+	u32 oneway;
+	u32 nagle_start;
+	u16 snd_backlog;
+	u16 msg_acc;
+	u16 pkt_cnt;
+	bool expect_ack;
+	bool nodelay;
 	bool group_is_open;
 };
 
@@ -137,6 +147,8 @@
 static void tipc_sk_remove(struct tipc_sock *tsk);
 static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
+static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack);
+static int tipc_wait_for_connect(struct socket *sock, long *timeo_p);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -184,19 +196,19 @@
 	return msg_importance(&tsk->phdr);
 }
 
-static int tsk_set_importance(struct tipc_sock *tsk, int imp)
-{
-	if (imp > TIPC_CRITICAL_IMPORTANCE)
-		return -EINVAL;
-	msg_set_importance(&tsk->phdr, (u32)imp);
-	return 0;
-}
-
 static struct tipc_sock *tipc_sk(const struct sock *sk)
 {
 	return container_of(sk, struct tipc_sock, sk);
 }
 
+int tsk_set_importance(struct sock *sk, int imp)
+{
+	if (imp > TIPC_CRITICAL_IMPORTANCE)
+		return -EINVAL;
+	msg_set_importance(&tipc_sk(sk)->phdr, (u32)imp);
+	return 0;
+}
+
 static bool tsk_conn_cong(struct tipc_sock *tsk)
 {
 	return tsk->snt_unacked > tsk->snd_win;
@@ -227,6 +239,26 @@
 	return 1;
 }
 
+/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
+ */
+static void tsk_set_nagle(struct tipc_sock *tsk)
+{
+	struct sock *sk = &tsk->sk;
+
+	tsk->maxnagle = 0;
+	if (sk->sk_type != SOCK_STREAM)
+		return;
+	if (tsk->nodelay)
+		return;
+	if (!(tsk->peer_caps & TIPC_NAGLE))
+		return;
+	/* Limit node local buffer size to avoid receive queue overflow */
+	if (tsk->max_pkt == MAX_MSG_SIZE)
+		tsk->maxnagle = 1500;
+	else
+		tsk->maxnagle = tsk->max_pkt;
+}
+
 /**
  * tsk_advance_rx_queue - discard first buffer in socket receive queue
  *
@@ -446,6 +478,8 @@
 
 	tsk = tipc_sk(sk);
 	tsk->max_pkt = MAX_PKT_DEFAULT;
+	tsk->maxnagle = 0;
+	tsk->nagle_start = NAGLE_START_INIT;
 	INIT_LIST_HEAD(&tsk->publications);
 	INIT_LIST_HEAD(&tsk->cong_links);
 	msg = &tsk->phdr;
@@ -512,7 +546,9 @@
 	tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
 					    !tsk_conn_cong(tsk)));
 
-	/* Remove any pending SYN message */
+	/* Push out delayed messages if in Nagle mode */
+	tipc_sk_push_backlog(tsk, false);
+	/* Remove pending SYN */
 	__skb_queue_purge(&sk->sk_write_queue);
 
 	/* Remove partially received buffer if any */
@@ -675,7 +711,6 @@
  * tipc_getname - get port ID of socket or peer socket
  * @sock: socket structure
  * @uaddr: area for returned socket address
- * @uaddr_len: area for returned length of socket address
  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
  *
  * Returns 0 on success, errno otherwise
@@ -748,7 +783,7 @@
 	case TIPC_ESTABLISHED:
 		if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
 			revents |= EPOLLOUT;
-		/* fall through */
+		fallthrough;
 	case TIPC_LISTEN:
 	case TIPC_CONNECTING:
 		if (!skb_queue_empty_lockless(&sk->sk_receive_queue))
@@ -865,7 +900,7 @@
 
 	/* Build message as chain of buffers */
 	__skb_queue_head_init(&pkts);
-	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+	mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);
 	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 	if (unlikely(rc != dlen))
 		return rc;
@@ -1017,7 +1052,7 @@
 
 /**
  * tipc_send_group_bcast - send message to all members in communication group
- * @sk: socket structure
+ * @sock: socket structure
  * @m: message to send
  * @dlen: total length of message data
  * @timeout: timeout to wait for wakeup
@@ -1222,6 +1257,56 @@
 	tipc_sk_rcv(net, inputq);
 }
 
+/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
+ *                         when socket is in Nagle mode
+ */
+static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
+{
+	struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
+	struct sk_buff *skb = skb_peek_tail(txq);
+	struct net *net = sock_net(&tsk->sk);
+	u32 dnode = tsk_peer_node(tsk);
+	int rc;
+
+	if (nagle_ack) {
+		tsk->pkt_cnt += skb_queue_len(txq);
+		if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
+			tsk->oneway = 0;
+			if (tsk->nagle_start < NAGLE_START_MAX)
+				tsk->nagle_start *= 2;
+			tsk->expect_ack = false;
+			pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
+				 tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
+				 tsk->nagle_start);
+		} else {
+			tsk->nagle_start = NAGLE_START_INIT;
+			if (skb) {
+				msg_set_ack_required(buf_msg(skb));
+				tsk->expect_ack = true;
+			} else {
+				tsk->expect_ack = false;
+			}
+		}
+		tsk->msg_acc = 0;
+		tsk->pkt_cnt = 0;
+	}
+
+	if (!skb || tsk->cong_link_cnt)
+		return;
+
+	/* Do not send SYN again after congestion */
+	if (msg_is_syn(buf_msg(skb)))
+		return;
+
+	if (tsk->msg_acc)
+		tsk->pkt_cnt += skb_queue_len(txq);
+	tsk->snt_unacked += tsk->snd_backlog;
+	tsk->snd_backlog = 0;
+	rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
+	if (rc == -ELINKCONG)
+		tsk->cong_link_cnt = 1;
+}
+
 /**
  * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
@@ -1235,7 +1320,7 @@
 	u32 onode = tsk_own_node(tsk);
 	struct sock *sk = &tsk->sk;
 	int mtyp = msg_type(hdr);
-	bool conn_cong;
+	bool was_cong;
 
 	/* Ignore if connection cannot be validated: */
 	if (!tsk_peer_msg(tsk, hdr)) {
@@ -1268,11 +1353,12 @@
 			__skb_queue_tail(xmitq, skb);
 		return;
 	} else if (mtyp == CONN_ACK) {
-		conn_cong = tsk_conn_cong(tsk);
+		was_cong = tsk_conn_cong(tsk);
+		tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
 		tsk->snt_unacked -= msg_conn_ack(hdr);
 		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
 			tsk->snd_win = msg_adv_win(hdr);
-		if (conn_cong)
+		if (was_cong && !tsk_conn_cong(tsk))
 			sk->sk_write_space(sk);
 	} else if (mtyp != CONN_PROBE_REPLY) {
 		pr_warn("Received unknown CONN_PROTO msg\n");
@@ -1406,7 +1492,7 @@
 	}
 
 	__skb_queue_head_init(&pkts);
-	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+	mtu = tipc_node_get_mtu(net, dnode, tsk->portid, true);
 	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 	if (unlikely(rc != dlen))
 		return rc;
@@ -1423,8 +1509,13 @@
 		rc = 0;
 	}
 
-	if (unlikely(syn && !rc))
+	if (unlikely(syn && !rc)) {
 		tipc_set_sk_state(sk, TIPC_CONNECTING);
+		if (dlen && timeout) {
+			timeout = msecs_to_jiffies(timeout);
+			tipc_wait_for_connect(sock, &timeout);
+		}
+	}
 
 	return rc ? rc : dlen;
 }
@@ -1457,21 +1548,22 @@
 	struct sock *sk = sock->sk;
 	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
 	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+	struct sk_buff_head *txq = &sk->sk_write_queue;
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct net *net = sock_net(sk);
-	struct sk_buff_head pkts;
+	struct sk_buff *skb;
 	u32 dnode = tsk_peer_node(tsk);
+	int maxnagle = tsk->maxnagle;
+	int maxpkt = tsk->max_pkt;
 	int send, sent = 0;
-	int rc = 0;
-
-	__skb_queue_head_init(&pkts);
+	int blocks, rc = 0;
 
 	if (unlikely(dlen > INT_MAX))
 		return -EMSGSIZE;
 
 	/* Handle implicit connection setup */
-	if (unlikely(dest)) {
+	if (unlikely(dest && sk->sk_state == TIPC_OPEN)) {
 		rc = __tipc_sendmsg(sock, m, dlen);
 		if (dlen && dlen == rc) {
 			tsk->peer_caps = tipc_node_get_capabilities(net, dnode);
@@ -1487,21 +1579,48 @@
 					 tipc_sk_connected(sk)));
 		if (unlikely(rc))
 			break;
-
 		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
-		rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
-		if (unlikely(rc != send))
-			break;
-
-		trace_tipc_sk_sendstream(sk, skb_peek(&pkts),
+		blocks = tsk->snd_backlog;
+		if (tsk->oneway++ >= tsk->nagle_start && maxnagle &&
+		    send <= maxnagle) {
+			rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
+			if (unlikely(rc < 0))
+				break;
+			blocks += rc;
+			tsk->msg_acc++;
+			if (blocks <= 64 && tsk->expect_ack) {
+				tsk->snd_backlog = blocks;
+				sent += send;
+				break;
+			} else if (blocks > 64) {
+				tsk->pkt_cnt += skb_queue_len(txq);
+			} else {
+				skb = skb_peek_tail(txq);
+				if (skb) {
+					msg_set_ack_required(buf_msg(skb));
+					tsk->expect_ack = true;
+				} else {
+					tsk->expect_ack = false;
+				}
+				tsk->msg_acc = 0;
+				tsk->pkt_cnt = 0;
+			}
+		} else {
+			rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
+			if (unlikely(rc != send))
+				break;
+			blocks += tsk_inc(tsk, send + MIN_H_SIZE);
+		}
+		trace_tipc_sk_sendstream(sk, skb_peek(txq),
 					 TIPC_DUMP_SK_SNDQ, " ");
-		rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+		rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
 		if (unlikely(rc == -ELINKCONG)) {
 			tsk->cong_link_cnt = 1;
 			rc = 0;
 		}
 		if (likely(!rc)) {
-			tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
+			tsk->snt_unacked += blocks;
+			tsk->snd_backlog = 0;
 			sent += send;
 		}
 	} while (sent < dlen && !rc);
@@ -1546,8 +1665,9 @@
 	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
 	tipc_set_sk_state(sk, TIPC_ESTABLISHED);
 	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
-	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
+	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
 	tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+	tsk_set_nagle(tsk);
 	__skb_queue_purge(&sk->sk_write_queue);
 	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
 		return;
@@ -1560,7 +1680,7 @@
 /**
  * tipc_sk_set_orig_addr - capture sender's address for received message
  * @m: descriptor for message info
- * @hdr: received message header
+ * @skb: received message
  *
  * Note: Address is not captured if not requested by receiver.
  */
@@ -1669,22 +1789,21 @@
 	return 0;
 }
 
-static void tipc_sk_send_ack(struct tipc_sock *tsk)
+static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
 {
 	struct sock *sk = &tsk->sk;
-	struct net *net = sock_net(sk);
 	struct sk_buff *skb = NULL;
 	struct tipc_msg *msg;
 	u32 peer_port = tsk_peer_port(tsk);
 	u32 dnode = tsk_peer_node(tsk);
 
 	if (!tipc_sk_connected(sk))
-		return;
+		return NULL;
 	skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
 			      dnode, tsk_own_node(tsk), peer_port,
 			      tsk->portid, TIPC_OK);
 	if (!skb)
-		return;
+		return NULL;
 	msg = buf_msg(skb);
 	msg_set_conn_ack(msg, tsk->rcv_unacked);
 	tsk->rcv_unacked = 0;
@@ -1694,7 +1813,19 @@
 		tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
 		msg_set_adv_win(msg, tsk->rcv_win);
 	}
-	tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
+	return skb;
+}
+
+static void tipc_sk_send_ack(struct tipc_sock *tsk)
+{
+	struct sk_buff *skb;
+
+	skb = tipc_sk_build_ack(tsk);
+	if (!skb)
+		return;
+
+	tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
+			   msg_link_selector(buf_msg(skb)));
 }
 
 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1959,7 +2090,7 @@
 
 		/* Send connection flow control advertisement when applicable */
 		tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
-		if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE))
+		if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
 			tipc_sk_send_ack(tsk);
 
 		/* Exit if all requested data or FIN/error received */
@@ -1991,7 +2122,6 @@
 /**
  * tipc_data_ready - wake up threads to indicate messages have been received
  * @sk: socket
- * @len: the length of messages
  */
 static void tipc_data_ready(struct sock *sk)
 {
@@ -2030,6 +2160,7 @@
 		smp_wmb();
 		tsk->cong_link_cnt--;
 		wakeup = true;
+		tipc_sk_push_backlog(tsk, false);
 		break;
 	case GROUP_PROTOCOL:
 		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
@@ -2052,9 +2183,11 @@
  * tipc_sk_filter_connect - check incoming message for a connection-based socket
  * @tsk: TIPC socket
  * @skb: pointer to message buffer.
+ * @xmitq: for Nagle ACK if any
  * Returns true if message should be added to receive queue, false otherwise
  */
-static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
+static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
+				   struct sk_buff_head *xmitq)
 {
 	struct sock *sk = &tsk->sk;
 	struct net *net = sock_net(sk);
@@ -2069,6 +2202,7 @@
 
 	if (unlikely(msg_mcast(hdr)))
 		return false;
+	tsk->oneway = 0;
 
 	switch (sk->sk_state) {
 	case TIPC_CONNECTING:
@@ -2114,9 +2248,22 @@
 			return true;
 		return false;
 	case TIPC_ESTABLISHED:
+		if (!skb_queue_empty(&sk->sk_write_queue))
+			tipc_sk_push_backlog(tsk, false);
 		/* Accept only connection-based messages sent by peer */
-		if (likely(con_msg && !err && pport == oport && pnode == onode))
+		if (likely(con_msg && !err && pport == oport &&
+			   pnode == onode)) {
+			if (msg_ack_required(hdr)) {
+				struct sk_buff *skb;
+
+				skb = tipc_sk_build_ack(tsk);
+				if (skb) {
+					msg_set_nagle_ack(buf_msg(skb));
+					__skb_queue_tail(xmitq, skb);
+				}
+			}
 			return true;
+		}
 		if (!tsk_peer_msg(tsk, hdr))
 			return false;
 		if (!err)
@@ -2211,7 +2358,7 @@
 	while ((skb = __skb_dequeue(&inputq))) {
 		hdr = buf_msg(skb);
 		limit = rcvbuf_limit(sk, skb);
-		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
+		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb, xmitq)) ||
 		    (!sk_conn && msg_connected(hdr)) ||
 		    (!grp && msg_in_group(hdr)))
 			err = TIPC_ERR_NO_PORT;
@@ -2385,10 +2532,12 @@
 			return -ETIMEDOUT;
 		if (signal_pending(current))
 			return sock_intr_errno(*timeo_p);
+		if (sk->sk_state == TIPC_DISCONNECTING)
+			break;
 
 		add_wait_queue(sk_sleep(sk), &wait);
-		done = sk_wait_event(sk, timeo_p,
-				     sk->sk_state != TIPC_CONNECTING, &wait);
+		done = sk_wait_event(sk, timeo_p, tipc_sk_connected(sk),
+				     &wait);
 		remove_wait_queue(sk_sleep(sk), &wait);
 	} while (!done);
 	return 0;
@@ -2476,7 +2625,7 @@
 		 * case is EINPROGRESS, rather than EALREADY.
 		 */
 		res = -EINPROGRESS;
-		/* fall through */
+		fallthrough;
 	case TIPC_CONNECTING:
 		if (!timeout) {
 			if (previous == TIPC_CONNECTING)
@@ -2553,7 +2702,7 @@
 /**
  * tipc_accept - wait for connection request
  * @sock: listening socket
- * @newsock: new socket that is to be connected
+ * @new_sock: new socket that is to be connected
  * @flags: file-related flags associated with socket
  *
  * Returns 0 on success, errno otherwise
@@ -2562,9 +2711,10 @@
 		       bool kern)
 {
 	struct sock *new_sk, *sk = sock->sk;
-	struct sk_buff *buf;
 	struct tipc_sock *new_tsock;
+	struct msghdr m = {NULL,};
 	struct tipc_msg *msg;
+	struct sk_buff *buf;
 	long timeo;
 	int res;
 
@@ -2602,26 +2752,24 @@
 	/* Connect new socket to it's peer */
 	tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
 
-	tsk_set_importance(new_tsock, msg_importance(msg));
+	tsk_set_importance(new_sk, msg_importance(msg));
 	if (msg_named(msg)) {
 		new_tsock->conn_type = msg_nametype(msg);
 		new_tsock->conn_instance = msg_nameinst(msg);
 	}
 
 	/*
-	 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
-	 * Respond to 'SYN+' by queuing it on new socket.
+	 * Respond to 'SYN-' by discarding it & returning 'ACK'.
+	 * Respond to 'SYN+' by queuing it on new socket & returning 'ACK'.
 	 */
 	if (!msg_data_sz(msg)) {
-		struct msghdr m = {NULL,};
-
 		tsk_advance_rx_queue(sk);
-		__tipc_sendstream(new_sock, &m, 0);
 	} else {
 		__skb_dequeue(&sk->sk_receive_queue);
 		__skb_queue_head(&new_sk->sk_receive_queue, buf);
 		skb_set_owner_r(buf, new_sk);
 	}
+	__tipc_sendstream(new_sock, &m, 0);
 	release_sock(new_sk);
 exit:
 	release_sock(sk);
@@ -2844,7 +2992,7 @@
 	struct tipc_sock *tsk;
 
 	rcu_read_lock();
-	tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
+	tsk = rhashtable_lookup(&tn->sk_rht, &portid, tsk_rht_params);
 	if (tsk)
 		sock_hold(&tsk->sk);
 	rcu_read_unlock();
@@ -2981,7 +3129,7 @@
  * Returns 0 on success, errno otherwise
  */
 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
-			   char __user *ov, unsigned int ol)
+			   sockptr_t ov, unsigned int ol)
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
@@ -2999,19 +3147,20 @@
 	case TIPC_SRC_DROPPABLE:
 	case TIPC_DEST_DROPPABLE:
 	case TIPC_CONN_TIMEOUT:
+	case TIPC_NODELAY:
 		if (ol < sizeof(value))
 			return -EINVAL;
-		if (get_user(value, (u32 __user *)ov))
+		if (copy_from_sockptr(&value, ov, sizeof(u32)))
 			return -EFAULT;
 		break;
 	case TIPC_GROUP_JOIN:
 		if (ol < sizeof(mreq))
 			return -EINVAL;
-		if (copy_from_user(&mreq, ov, sizeof(mreq)))
+		if (copy_from_sockptr(&mreq, ov, sizeof(mreq)))
 			return -EFAULT;
 		break;
 	default:
-		if (ov || ol)
+		if (!sockptr_is_null(ov) || ol)
 			return -EINVAL;
 	}
 
@@ -3019,7 +3168,7 @@
 
 	switch (opt) {
 	case TIPC_IMPORTANCE:
-		res = tsk_set_importance(tsk, value);
+		res = tsk_set_importance(sk, value);
 		break;
 	case TIPC_SRC_DROPPABLE:
 		if (sock->type != SOCK_STREAM)
@@ -3047,6 +3196,10 @@
 	case TIPC_GROUP_LEAVE:
 		res = tipc_sk_leave(tsk);
 		break;
+	case TIPC_NODELAY:
+		tsk->nodelay = !!value;
+		tsk_set_nagle(tsk);
+		break;
 	default:
 		res = -EINVAL;
 	}
@@ -3590,7 +3743,7 @@
 			if (p->key == *last_publ)
 				break;
 		}
-		if (p->key != *last_publ) {
+		if (list_entry_is_head(p, &tsk->publications, binding_sock)) {
 			/* We never set seq or call nl_dump_check_consistent()
 			 * this means that setting prev_seq here will cause the
 			 * consistence check to fail in the netlink callback
@@ -3628,13 +3781,9 @@
 	struct tipc_sock *tsk;
 
 	if (!tsk_portid) {
-		struct nlattr **attrs;
+		struct nlattr **attrs = genl_dumpit_info(cb)->attrs;
 		struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
 
-		err = tipc_nlmsg_parse(cb->nlh, &attrs);
-		if (err)
-			return err;
-
 		if (!attrs[TIPC_NLA_SOCK])
 			return -EINVAL;