diff --git a/src/DIRAC/Resources/MessageQueue/StompMQConnector.py b/src/DIRAC/Resources/MessageQueue/StompMQConnector.py index 6bae7effa30..5271e133d2a 100644 --- a/src/DIRAC/Resources/MessageQueue/StompMQConnector.py +++ b/src/DIRAC/Resources/MessageQueue/StompMQConnector.py @@ -122,16 +122,14 @@ def setupConnection(self, parameters=None): try: # Get IP addresses of brokers - # Start with the IPv6, and randomize it - ipv6_addrInfo = socket.getaddrinfo(host, port, socket.AF_INET6, socket.SOCK_STREAM) - random.shuffle(ipv6_addrInfo) - # Same with IPv4 - ipv4_addrInfo = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM) - random.shuffle(ipv4_addrInfo) - - # Create the host_port tuples, keeping the ipv6 in front + addrInfo = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM) + random.shuffle(addrInfo) + # Prefer IPv6 while preserving the randomized order within each address family + addrInfo.sort(key=lambda address: address[0] != socket.AF_INET6) + + # Create the host_port tuples, keeping IPv6 in front host_and_ports = [] - for _family, _socktype, _proto, _canonname, sockaddr in ipv6_addrInfo + ipv4_addrInfo: + for _family, _socktype, _proto, _canonname, sockaddr in addrInfo: host_and_ports.append((sockaddr[0], sockaddr[1])) connectionArgs.update({"host_and_ports": host_and_ports}) diff --git a/src/DIRAC/Resources/MessageQueue/test/Test_StompMQConnector.py b/src/DIRAC/Resources/MessageQueue/test/Test_StompMQConnector.py new file mode 100644 index 00000000000..1ee0d1be5c1 --- /dev/null +++ b/src/DIRAC/Resources/MessageQueue/test/Test_StompMQConnector.py @@ -0,0 +1,60 @@ +"""Unit tests for the STOMP message queue connector.""" + +import socket +from unittest import mock + +from DIRAC.Resources.MessageQueue.StompMQConnector import StompMQConnector + + +@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.random.shuffle") +@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.stomp.Connection") +@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.socket.getaddrinfo") +def test_setup_connection_with_ipv4_only(getaddrinfo, connection, _shuffle): + getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("192.0.2.1", 61613)), + ] + + result = StompMQConnector().setupConnection({"Host": "mq.example", "VHost": "/"}) + + assert result["OK"] + getaddrinfo.assert_called_once_with("mq.example", 61613, socket.AF_UNSPEC, socket.SOCK_STREAM) + connection.assert_called_once_with( + vhost="/", + keepalive=True, + timeout=60, + heartbeats=(15_000, 15_000), + reconnect_sleep_initial=1, + reconnect_sleep_increase=0.5, + reconnect_sleep_max=120, + reconnect_sleep_jitter=0.1, + reconnect_attempts_max=1e4, + host_and_ports=[("192.0.2.1", 61613)], + ) + + +@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.random.shuffle") +@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.stomp.Connection") +@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.socket.getaddrinfo") +def test_setup_connection_prefers_ipv6(getaddrinfo, connection, _shuffle): + getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("192.0.2.1", 61613)), + (socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("2001:db8::1", 61613, 0, 0)), + ] + + result = StompMQConnector().setupConnection({"Host": "mq.example", "VHost": "/"}) + + assert result["OK"] + assert connection.call_args.kwargs["host_and_ports"] == [ + ("2001:db8::1", 61613), + ("192.0.2.1", 61613), + ] + + +@mock.patch("DIRAC.Resources.MessageQueue.StompMQConnector.socket.getaddrinfo") +def test_setup_connection_reports_resolution_failure(getaddrinfo): + getaddrinfo.side_effect = socket.gaierror(socket.EAI_NONAME, "Name or service not known") + + result = StompMQConnector().setupConnection({"Host": "missing.example", "VHost": "/"}) + + assert not result["OK"] + assert "Name or service not known" in result["Message"]