summaryrefslogtreecommitdiff
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-02-05 08:36:44 -0500
committerDavid S. Miller <davem@davemloft.net>2015-02-05 16:00:03 -0800
commitcb1b728096f54e7408d60fb571944bed00c5b771 (patch)
tree1b1e50e705f4ddd3b36a43ac9067538db3e8233f /net/tipc/socket.c
parent3c724acdd5049907555a831f814bfd5927c3350c (diff)
tipc: eliminate race condition at multicast reception
In a previous commit in this series we resolved a race problem during unicast message reception. Here, we resolve the same problem at multicast reception. We apply the same technique: an input queue serializing the delivery of arriving buffers. The main difference is that here we do it in two steps. First, the broadcast link feeds arriving buffers into the tail of an arrival queue, which head is consumed at the socket level, and where destination lookup is performed. Second, if the lookup is successful, the resulting buffer clones are fed into a second queue, the input queue. This queue is consumed at reception in the socket just like in the unicast case. Both queues are protected by the same lock, -the one of the input queue. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c72
1 files changed, 44 insertions, 28 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 26aec8414ac1..66666805b53c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -776,44 +776,60 @@ new_mtu:
return rc;
}
-/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
+/**
+ * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
+ * @arrvq: queue with arriving messages, to be cloned after destination lookup
+ * @inputq: queue with cloned messages, delivered to socket after dest lookup
+ *
+ * Multi-threaded: parallel calls with reference to same queues may occur
*/
-void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *skb)
+void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
+ struct sk_buff_head *inputq)
{
- struct tipc_msg *msg = buf_msg(skb);
+ struct tipc_msg *msg;
struct tipc_plist dports;
- struct sk_buff *cskb;
u32 portid;
u32 scope = TIPC_CLUSTER_SCOPE;
- struct sk_buff_head msgq;
- uint hsz = skb_headroom(skb) + msg_hdr_sz(msg);
+ struct sk_buff_head tmpq;
+ uint hsz;
+ struct sk_buff *skb, *_skb;
- skb_queue_head_init(&msgq);
+ __skb_queue_head_init(&tmpq);
tipc_plist_init(&dports);
- if (in_own_node(net, msg_orignode(msg)))
- scope = TIPC_NODE_SCOPE;
-
- if (unlikely(!msg_mcast(msg))) {
- pr_warn("Received non-multicast msg in multicast\n");
- goto exit;
- }
- /* Create destination port list: */
- tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
- msg_nameupper(msg), scope, &dports);
- portid = tipc_plist_pop(&dports);
- for (; portid; portid = tipc_plist_pop(&dports)) {
- cskb = __pskb_copy(skb, hsz, GFP_ATOMIC);
- if (!cskb) {
- pr_warn("Failed do clone mcast rcv buffer\n");
- continue;
+ skb = tipc_skb_peek(arrvq, &inputq->lock);
+ for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
+ msg = buf_msg(skb);
+ hsz = skb_headroom(skb) + msg_hdr_sz(msg);
+
+ if (in_own_node(net, msg_orignode(msg)))
+ scope = TIPC_NODE_SCOPE;
+
+ /* Create destination port list and message clones: */
+ tipc_nametbl_mc_translate(net,
+ msg_nametype(msg), msg_namelower(msg),
+ msg_nameupper(msg), scope, &dports);
+ portid = tipc_plist_pop(&dports);
+ for (; portid; portid = tipc_plist_pop(&dports)) {
+ _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
+ if (_skb) {
+ msg_set_destport(buf_msg(_skb), portid);
+ __skb_queue_tail(&tmpq, _skb);
+ continue;
+ }
+ pr_warn("Failed to clone mcast rcv buffer\n");
}
- msg_set_destport(buf_msg(cskb), portid);
- skb_queue_tail(&msgq, cskb);
+ /* Append to inputq if not already done by other thread */
+ spin_lock_bh(&inputq->lock);
+ if (skb_peek(arrvq) == skb) {
+ skb_queue_splice_tail_init(&tmpq, inputq);
+ kfree_skb(__skb_dequeue(arrvq));
+ }
+ spin_unlock_bh(&inputq->lock);
+ __skb_queue_purge(&tmpq);
+ kfree_skb(skb);
}
- tipc_sk_rcv(net, &msgq);
-exit:
- kfree_skb(skb);
+ tipc_sk_rcv(net, inputq);
}
/**