Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions src/DIRAC/Resources/MessageQueue/StompMQConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,14 @@ def setupConnection(self, parameters=None):

try:
# Get IP addresses of brokers
# Start with the IPv6, and randomize it
ipv6_addrInfo = socket.getaddrinfo(host, port, socket.AF_INET6, socket.SOCK_STREAM)
random.shuffle(ipv6_addrInfo)
# Same with IPv4
ipv4_addrInfo = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
random.shuffle(ipv4_addrInfo)

# Create the host_port tuples, keeping the ipv6 in front
addrInfo = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
random.shuffle(addrInfo)
# Prefer IPv6 while preserving the randomized order within each address family
addrInfo.sort(key=lambda address: address[0] != socket.AF_INET6)

# Create the host_port tuples, keeping IPv6 in front
host_and_ports = []
for _family, _socktype, _proto, _canonname, sockaddr in ipv6_addrInfo + ipv4_addrInfo:
for _family, _socktype, _proto, _canonname, sockaddr in addrInfo:
host_and_ports.append((sockaddr[0], sockaddr[1]))

connectionArgs.update({"host_and_ports": host_and_ports})
Expand Down
60 changes: 60 additions & 0 deletions src/DIRAC/Resources/MessageQueue/test/Test_StompMQConnector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Unit tests for the STOMP message queue connector."""

import socket
from unittest import mock

from DIRAC.Resources.MessageQueue.StompMQConnector import StompMQConnector


@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.random.shuffle")
@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.stomp.Connection")
@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.socket.getaddrinfo")
def test_setup_connection_with_ipv4_only(getaddrinfo, connection, _shuffle):
getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("192.0.2.1", 61613)),
]

result = StompMQConnector().setupConnection({"Host": "mq.example", "VHost": "/"})

assert result["OK"]
getaddrinfo.assert_called_once_with("mq.example", 61613, socket.AF_UNSPEC, socket.SOCK_STREAM)
connection.assert_called_once_with(
vhost="/",
keepalive=True,
timeout=60,
heartbeats=(15_000, 15_000),
reconnect_sleep_initial=1,
reconnect_sleep_increase=0.5,
reconnect_sleep_max=120,
reconnect_sleep_jitter=0.1,
reconnect_attempts_max=1e4,
host_and_ports=[("192.0.2.1", 61613)],
)


@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.random.shuffle")
@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.stomp.Connection")
@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.socket.getaddrinfo")
def test_setup_connection_prefers_ipv6(getaddrinfo, connection, _shuffle):
getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("192.0.2.1", 61613)),
(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("2001:db8::1", 61613, 0, 0)),
]

result = StompMQConnector().setupConnection({"Host": "mq.example", "VHost": "/"})

assert result["OK"]
assert connection.call_args.kwargs["host_and_ports"] == [
("2001:db8::1", 61613),
("192.0.2.1", 61613),
]


@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.socket.getaddrinfo")
def test_setup_connection_reports_resolution_failure(getaddrinfo):
getaddrinfo.side_effect = socket.gaierror(socket.EAI_NONAME, "Name or service not known")

result = StompMQConnector().setupConnection({"Host": "missing.example", "VHost": "/"})

assert not result["OK"]
assert "Name or service not known" in result["Message"]
Loading