diff --git a/.github/workflows/version_scanner.yml b/.github/workflows/version_scanner.yml index 52f813e67995..078e4259e491 100644 --- a/.github/workflows/version_scanner.yml +++ b/.github/workflows/version_scanner.yml @@ -35,7 +35,7 @@ jobs: # Uses -o to output a detailed, raw CSV to a file # Uses --stdout to print a slim, easier to parse summary to the GitHub Actions UI # Uses --soft-fail to temporarily limit causing CI/CD failures during the migration to full operation. - python scripts/version_scanner/version_scanner.py -d python -v 3.7 --stdout -o version_scanner_output.csv --soft-fail + python scripts/version_scanner/version_scanner.py --matrix-file scripts/version_scanner/matrix.yaml --package-file scripts/version_scanner/example-list-non-generated-packages.txt --stdout -o version_scanner_output.csv --soft-fail - name: Upload CSV Results if: always() diff --git a/scripts/version_scanner/example-list-non-generated-packages.txt b/scripts/version_scanner/example-list-non-generated-packages.txt new file mode 100644 index 000000000000..bfc1e3fe8658 --- /dev/null +++ b/scripts/version_scanner/example-list-non-generated-packages.txt @@ -0,0 +1,31 @@ +packages/bigframes +packages/bigquery-magics +packages/db-dtypes +packages/django-google-spanner +packages/gapic-generator +packages/google-api-core +# packages/google-api-python-client # non-monorepo, ignore for now. +packages/google-auth +packages/google-auth-httplib2 +packages/google-auth-oauthlib +packages/google-cloud-bigquery +packages/pandas-gbq +packages/google-cloud-bigtable +packages/google-cloud-core +packages/google-crc32c +packages/google-cloud-datastore +packages/google-cloud-dns +packages/google-cloud-documentai-toolbox +packages/google-cloud-error-reporting +packages/google-cloud-firestore +packages/google-cloud-logging +packages/google-cloud-ndb +packages/google-cloud-pubsub +packages/google-cloud-runtimeconfig +packages/google-cloud-spanner +packages/google-cloud-storage +packages/google-cloud-testutils +packages/google-resumable-media +packages/proto-plus +packages/sqlalchemy-bigquery +packages/sqlalchemy-spanner diff --git a/scripts/version_scanner/matrix.yaml b/scripts/version_scanner/matrix.yaml new file mode 100644 index 000000000000..0f50b31fd48f --- /dev/null +++ b/scripts/version_scanner/matrix.yaml @@ -0,0 +1,4 @@ +python: + - "3.7" + - "3.8" + - "3.9" diff --git a/scripts/version_scanner/small_package_list.txt b/scripts/version_scanner/small_package_list.txt deleted file mode 100644 index 8c9a4f39e879..000000000000 --- a/scripts/version_scanner/small_package_list.txt +++ /dev/null @@ -1,6 +0,0 @@ -# Example package list for filtering scanning targets via the --package-file option. -packages/google-cloud-access-context-manager -packages/google-cloud-bigtable -packages/google-cloud-biglake-hive -packages/google-cloud-documentai-toolbox -packages/google-cloud-core diff --git a/scripts/version_scanner/tests/integration/test_scanner_integration.py b/scripts/version_scanner/tests/integration/test_scanner_integration.py index 36eff2402d38..3ce6d2cd3ab1 100644 --- a/scripts/version_scanner/tests/integration/test_scanner_integration.py +++ b/scripts/version_scanner/tests/integration/test_scanner_integration.py @@ -32,7 +32,8 @@ def test_integration_scan(tmp_path): "-v", "3.7", "-p", data_dir, "--config", config_path, - "-o", "scanner_report.csv" + "-o", "scanner_report.csv", + "--soft-fail" ] result = subprocess.run(cmd, cwd=tmp_path, capture_output=True, text=True, check=True) diff --git a/scripts/version_scanner/tests/unit/test_version_scanner.py b/scripts/version_scanner/tests/unit/test_version_scanner.py index f76b66d4d55d..820bcc9c20cc 100644 --- a/scripts/version_scanner/tests/unit/test_version_scanner.py +++ b/scripts/version_scanner/tests/unit/test_version_scanner.py @@ -32,6 +32,62 @@ format_for_console ) +@pytest.fixture +def sample_match(): + return { + "file_name": "setup.py", + "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", + "repo_path": "packages/pkg_a/setup.py", + "package_name": "pkg_a", + "rule_name": "python_requires_check", + "line_number": "123", + "matched_string": "3.7", + "context_line": "python_requires = '>=3.7'", + "dependency": "python", + "version": "3.7" + } + + +@pytest.mark.parametrize( + "exception_to_raise, required, silent_missing, expected_exit, expected_output, expected_return", + [ + (None, True, False, False, None, "file content"), # Success + (FileNotFoundError(), True, True, False, None, None), # Silent missing FileNotFoundError + (FileNotFoundError(), True, False, True, "Error: Test_desc not found", None), # Required FileNotFoundError + (FileNotFoundError(), False, False, False, "Warning: Test_desc not found", None), # Optional FileNotFoundError + (PermissionError(), True, False, True, "Error: Permission denied reading test_desc", None), # Required PermissionError + (PermissionError(), False, False, False, "Warning: Permission denied reading test_desc", None), # Optional PermissionError + (IOError("disk full"), True, False, True, "Error reading test_desc", None), # Required IOError + (IOError("disk full"), False, False, False, "Warning: Error reading test_desc", None), # Optional IOError + (ValueError("invalid bytes"), True, False, True, "Error reading test_desc", None), # Required ValueError + (ValueError("invalid bytes"), False, False, False, "Warning: Error reading test_desc", None), # Optional ValueError + ] +) +def test_safe_read_file_scenarios( + capsys, exception_to_raise, required, silent_missing, expected_exit, expected_output, expected_return +): + from version_scanner import _safe_read_file + + if exception_to_raise: + mock_open = mock.mock_open() + mock_open.side_effect = exception_to_raise + else: + mock_open = mock.mock_open(read_data="file content") + + with patch("builtins.open", mock_open): + if expected_exit: + with pytest.raises(SystemExit) as excinfo: + _safe_read_file("dummy.txt", required=required, description="test_desc", silent_missing=silent_missing) + assert excinfo.value.code == 1 + else: + res = _safe_read_file("dummy.txt", required=required, description="test_desc", silent_missing=silent_missing) + assert res == expected_return + + if expected_output: + captured = capsys.readouterr() + assert expected_output in captured.err + + # Test ConfigManager @pytest.mark.parametrize("dependency, version, expected", [ ( @@ -92,6 +148,56 @@ def test_scan_file_ignores_pragma(tmp_path): results = scan_file(str(test_file), rules) assert len(results) == 0 + +def test_scan_file_ignores_targeted_pragma(tmp_path): + test_file = tmp_path / "test.py" + test_file.write_text("python_requires = '>=3.7' # version-scanner: ignore-rule=python_requires_check:3.7\n") + + rules = [ + { + "name": "python_requires_check", + "pattern": re.compile(r"python_requires\s*=\s*['\"]>=3\.7['\"]"), + "version": "3.7" + } + ] + + results = scan_file(str(test_file), rules) + assert len(results) == 0 + + +def test_scan_file_handles_non_string_rule_name_and_version(tmp_path): + test_file = tmp_path / "test.py" + test_file.write_text("python_requires = '>=3.7' # version-scanner: ignore-rule=123:3.7\n") + + rules = [ + { + "name": 123, # Integer rule name + "pattern": re.compile(r"python_requires\s*=\s*['\"]>=3\.7['\"]"), + "version": 3.7 # Float version + } + ] + + # This should not raise TypeError + results = scan_file(str(test_file), rules) + assert len(results) == 0 + + + +def test_scan_file_does_not_ignore_mismatched_targeted_pragma(tmp_path): + test_file = tmp_path / "test.py" + test_file.write_text("python_requires = '>=3.7' # version-scanner: ignore-rule=python_requires_check:3.8\n") + + rules = [ + { + "name": "python_requires_check", + "pattern": re.compile(r"python_requires\s*=\s*['\"]>=3\.7['\"]"), + "version": "3.7" + } + ] + + results = scan_file(str(test_file), rules) + assert len(results) == 1 + def test_scan_file_ignores_next_line(tmp_path): test_file = tmp_path / "test.py" test_file.write_text("# version-scanner: ignore-next-line\npython_requires = '>=3.7'\n") @@ -310,6 +416,53 @@ def test_scan_repository_ignores_version_scanner(tmp_path): assert len(results) == 0 +def test_scan_repository_wildcard_ignores(tmp_path): + # Create files + (tmp_path / "test.jpg").write_text("dummy version 3.7\n") + (tmp_path / "test.py").write_text("python_requires = '>=3.7'\n") + + rules = [ + {"name": "python_requires_check", "pattern": "python_requires\\s*=\\s*['\"]>=3\\.7['\"]"}, + {"name": "explicit_version_string", "pattern": "3\\.7"} + ] + + from version_scanner import scan_repository + # Without ignore + results = scan_repository(str(tmp_path), rules) + assert len(results) >= 2 + + # With wildcard ignore for *.jpg + results_ignored = scan_repository(str(tmp_path), rules, ignore_dirs=["*.jpg"]) + # test.jpg should be ignored completely + for match in results_ignored: + assert not match["file_path"].endswith("test.jpg") + + +def test__should_ignore(): + from version_scanner import _should_ignore + + ignore_patterns = [ + ".git", + "*.jpg", + "packages/pkg_a/.nox", + "*.egg-info" + ] + + # Exact match + assert _should_ignore(".git", ".git", ignore_patterns) is True + # Case insensitivity + assert _should_ignore(".GIT", ".GIT", ignore_patterns) is True + # Wildcard match + assert _should_ignore("some/path/image.jpg", "image.jpg", ignore_patterns) is True + assert _should_ignore("image.JPG", "image.JPG", ignore_patterns) is True + # Subpath match + assert _should_ignore("packages/pkg_a/.nox", ".nox", ignore_patterns) is True + # Wildcard directory match + assert _should_ignore("google_cloud_pubsub.egg-info", "google_cloud_pubsub.egg-info", ignore_patterns) is True + # Negative match + assert _should_ignore("setup.py", "setup.py", ignore_patterns) is False + + def test_load_ignore_file(tmp_path): from version_scanner import load_ignore_file @@ -682,34 +835,13 @@ def test_safe_int(): assert _safe_int(None) == 0 assert _safe_int("abc") == 0 -def test_format_for_raw_csv_handles_empty_line_number(): - match = { - "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", - "repo_path": "packages/pkg_a/setup.py", - "package_name": "pkg_a", - "rule_name": "python_requires_check", - "line_number": "", - "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'" - } - formatted = format_for_raw_csv(match) +def test_format_for_raw_csv_handles_empty_line_number(sample_match): + sample_match["line_number"] = "" + formatted = format_for_raw_csv(sample_match) assert formatted["line_number"] == 0 -def test_format_for_raw_csv(): - match = { - "file_name": "setup.py", - "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", - "repo_path": "packages/pkg_a/setup.py", - "package_name": "pkg_a", - "rule_name": "python_requires_check", - "line_number": "123", - "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'", - "dependency": "python", - "version": "3.7" - } - - formatted = format_for_raw_csv(match) +def test_format_for_raw_csv(sample_match): + formatted = format_for_raw_csv(sample_match) assert formatted["file_name"] == "setup.py" assert formatted["file_path"] == "google-cloud-python/main/packages/pkg_a/setup.py" @@ -721,38 +853,14 @@ def test_format_for_raw_csv(): assert formatted["dependency"] == "python" assert formatted["version"] == "3.7" -def test_format_for_raw_csv_fallback_filename(): - match = { - "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", - "repo_path": "packages/pkg_a/setup.py", - "package_name": "pkg_a", - "rule_name": "python_requires_check", - "line_number": "123", - "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'", - "dependency": "python", - "version": "3.7" - } - - formatted = format_for_raw_csv(match) +def test_format_for_raw_csv_fallback_filename(sample_match): + del sample_match["file_name"] + formatted = format_for_raw_csv(sample_match) assert formatted["file_name"] == "setup.py" -def test_format_for_spreadsheet(): - match = { - "file_name": "setup.py", - "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", - "repo_path": "packages/pkg_a/setup.py", - "package_name": "pkg_a", - "rule_name": "python_requires_check", - "line_number": 123, - "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'", - "dependency": "python", - "version": "3.7" - } - +def test_format_for_spreadsheet(sample_match): # Without github_repo - formatted_no_repo = format_for_spreadsheet(match) + formatted_no_repo = format_for_spreadsheet(sample_match) assert formatted_no_repo["file_name"] == "setup.py" assert formatted_no_repo["line_number"] == 123 assert formatted_no_repo["matched_string"] == '="3.7"' # Decimal protection formula @@ -760,25 +868,127 @@ def test_format_for_spreadsheet(): assert formatted_no_repo["version"] == "3.7" # With github_repo - formatted_repo = format_for_spreadsheet(match, github_repo="https://github.com/user/repo", branch="main") + formatted_repo = format_for_spreadsheet(sample_match, github_repo="https://github.com/user/repo", branch="main") expected_url = "https://github.com/user/repo/blob/main/packages/pkg_a/setup.py#L123" assert formatted_repo["line_number"] == f'=HYPERLINK("{expected_url}", "123")' assert formatted_repo["matched_string"] == '="3.7"' -def test_format_for_console(): - match = { - "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", - "repo_path": "packages/pkg_a/setup.py", - "package_name": "pkg_a", - "rule_name": "python_requires_check", - "line_number": 123, - "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'" - } - - log_str = format_for_console(match) +def test_format_for_console(sample_match): + log_str = format_for_console(sample_match) assert "google-cloud-python/main/packages/pkg_a/setup.py:123" in log_str assert "[python_requires_check]" in log_str assert "3.7" in log_str assert "python_requires = " not in log_str # Slim format doesn't print context line + +def test_parse_matrix_file(tmp_path): + from version_scanner import parse_matrix_file + yaml_file = tmp_path / "matrix.yaml" + yaml_file.write_text(""" +python: + - "3.7" + - "3.8" +protobuf: "4.25.8" +""") + targets = parse_matrix_file(str(yaml_file)) + assert targets == [("python", "3.7"), ("python", "3.8"), ("protobuf", "4.25.8")] + +@pytest.mark.parametrize( + "file_content, file_exists", + [ + (None, False), # File not found + ("invalid: {", True), # Invalid YAML + ("- not_a_mapping", True), # Invalid structure (list instead of map) + ("python:\n - null", True), # Invalid version type (null/None value) + ("python:\n - 3.10", True), # Invalid version type (float instead of string in list) + ("python: 3.10", True), # Invalid version type (float instead of string) + ] +) +def test_parse_matrix_file_failures(tmp_path, file_content, file_exists): + from version_scanner import parse_matrix_file + + if file_exists: + yaml_file = tmp_path / "matrix_failures.yaml" + yaml_file.write_text(file_content) + path = str(yaml_file) + else: + path = "nonexistent_file.yaml" + + with pytest.raises(SystemExit) as excinfo: + parse_matrix_file(path) + assert excinfo.value.code == 1 + +def test_scan_repository_multi_targets(tmp_path): + # Setup files in tmp repository + file1 = tmp_path / "packages" / "pkg1" / "setup.py" + file1.parent.mkdir(parents=True) + file1.write_text("python_requires = '>=3.7'\n") + + file2 = tmp_path / "packages" / "pkg2" / "requirements.txt" + file2.parent.mkdir(parents=True) + file2.write_text("protobuf==4.25.8\n") + + # Let's mock a config file with rules for both python and protobuf + config_file = tmp_path / "regex_config.yaml" + config_file.write_text(""" +rules: + - name: python_requires_check + applies_to: + - python + rules: + - python_requires\\s*=\\s*['\"]>={version}['\"] + - name: protobuf_check + applies_to: + - protobuf + rules: + - protobuf=={version} +""") + + from version_scanner import ConfigManager, scan_repository + + targets = [("python", "3.7"), ("protobuf", "4.25.8")] + rules = [] + for dep, ver in targets: + cm = ConfigManager(str(config_file), dep, ver) + rules.extend(cm.load_config()) + + results = scan_repository(str(tmp_path), rules, targets=targets) + + # We should have 2 matches + assert len(results) == 2 + + # Match for python + python_match = [r for r in results if r["dependency"] == "python"] + assert len(python_match) == 1 + assert python_match[0]["version"] == "3.7" + assert python_match[0]["rule_name"] == "python_requires_check" + + # Match for protobuf + protobuf_match = [r for r in results if r["dependency"] == "protobuf"] + assert len(protobuf_match) == 1 + assert protobuf_match[0]["version"] == "4.25.8" + assert protobuf_match[0]["rule_name"] == "protobuf_check" + + +@pytest.mark.parametrize( + "args, expected_error_msg", + [ + # Mixing -m/--matrix-file with -d or -v + (['version_scanner.py', '-m', 'matrix.yaml', '-d', 'python'], "Cannot specify -d/--dependency or -v/--version when using -m/--matrix-file"), + (['version_scanner.py', '-m', 'matrix.yaml', '-v', '3.7'], "Cannot specify -d/--dependency or -v/--version when using -m/--matrix-file"), + (['version_scanner.py', '-m', 'matrix.yaml', '-d', 'python', '-v', '3.7'], "Cannot specify -d/--dependency or -v/--version when using -m/--matrix-file"), + # Missing either -d or -v when not using -m + (['version_scanner.py', '-d', 'python'], "Must specify both -d/--dependency and -v/--version when not using -m/--matrix-file"), + (['version_scanner.py', '-v', '3.7'], "Must specify both -d/--dependency and -v/--version when not using -m/--matrix-file"), + (['version_scanner.py'], "Must specify both -d/--dependency and -v/--version when not using -m/--matrix-file"), + ] +) +def test_main_cli_validation(capsys, args, expected_error_msg): + from version_scanner import main + with mock.patch('sys.argv', args): + with pytest.raises(SystemExit) as excinfo: + main() + assert excinfo.value.code == 2 + captured = capsys.readouterr() + assert expected_error_msg in captured.err + diff --git a/scripts/version_scanner/version_scanner.py b/scripts/version_scanner/version_scanner.py index 484a6eacacae..c86721cfe133 100644 --- a/scripts/version_scanner/version_scanner.py +++ b/scripts/version_scanner/version_scanner.py @@ -20,12 +20,61 @@ import argparse import csv import datetime +import fnmatch import os import re import sys -from typing import Dict, List, Tuple, Any +from typing import Dict, List, Tuple, Any, Optional import yaml + +def _safe_read_file( + file_path: str, + required: bool = True, + description: str = "file", + silent_missing: bool = False +) -> Optional[str]: + """ + Safely reads file content and handles common file errors. + + Args: + file_path: Path to the file. + required: If True, exits the program with code 1 on read failure. + If False, prints a warning (or ignores) and returns None. + description: Description of the file type for error logging. + silent_missing: If True, silently ignores FileNotFoundError (returns None). + + Returns: + The file content string, or None if reading failed/was ignored. + """ + try: + with open(file_path, 'r', encoding='utf-8') as f: + return f.read() + except FileNotFoundError: + if silent_missing: + return None + if required: + print(f"Error: {description.capitalize()} not found: {file_path}", file=sys.stderr) + sys.exit(1) + else: + print(f"Warning: {description.capitalize()} not found: {file_path}", file=sys.stderr) + return None + except PermissionError: + if required: + print(f"Error: Permission denied reading {description}: {file_path}", file=sys.stderr) + sys.exit(1) + else: + print(f"Warning: Permission denied reading {description}: {file_path}", file=sys.stderr) + return None + except (IOError, ValueError) as e: + if required: + print(f"Error reading {description} {file_path}: {e}", file=sys.stderr) + sys.exit(1) + else: + print(f"Warning: Error reading {description} {file_path}: {e}", file=sys.stderr) + return None + + class ConfigManager: """ Handles loading, validation, and interpolation of the regex configuration rules. @@ -106,15 +155,9 @@ def _compute_variables(self) -> Dict[str, str]: def load_config(self) -> List[Dict[str, str]]: """Load and resolve rules from config.""" + content = _safe_read_file(self.config_path, required=True, description="config file") try: - with open(self.config_path, 'r', encoding='utf-8') as f: - config = yaml.safe_load(f) - except FileNotFoundError: - print(f"Error: Config file not found: {self.config_path}", file=sys.stderr) - sys.exit(1) - except PermissionError: - print(f"Error: Permission denied reading config file: {self.config_path}", file=sys.stderr) - sys.exit(1) + config = yaml.safe_load(content) except yaml.YAMLError as e: print(f"Error parsing config file: {e}", file=sys.stderr) sys.exit(1) @@ -137,7 +180,9 @@ def load_config(self) -> List[Dict[str, str]]: resolved_pattern = template.strip().format(**self.variables) resolved_rules.append({ "name": name, - "pattern": resolved_pattern + "pattern": resolved_pattern, + "dependency": self.dependency, + "version": self.version }) except KeyError as e: print(f"Warning: Missing variable for interpolation in rule {name}: {e}", file=sys.stderr) @@ -169,16 +214,23 @@ def scan_file(file_path: str, compiled_rules: List[Dict[str, re.Pattern]]) -> Li if "version-scanner: ignore-next-line" in line: skip_next = True continue - if "version-scanner: ignore" in line: + if "version-scanner: ignore" in line and "version-scanner: ignore-rule" not in line and "version-scanner: ignore-next-line" not in line: continue for rule in compiled_rules: match = rule["pattern"].search(line) if match: + version = rule.get("version") + if version: + pragma_pattern = rf"version-scanner\s*:\s*ignore-rule\s*=\s*{re.escape(str(rule['name']))}\s*:\s*{re.escape(str(version))}" + if re.search(pragma_pattern, line, re.IGNORECASE): + continue results.append({ "rule_name": rule["name"], "line_number": line_num, "matched_string": match.group(0).strip(), - "context_line": line.strip() + "context_line": line.strip(), + "dependency": rule.get("dependency", ""), + "version": rule.get("version", "") }) except IOError as e: print(f"Warning: Could not read file {file_path}: {e}", file=sys.stderr) @@ -326,14 +378,12 @@ def load_ignore_file(file_path: str) -> List[str]: Read ignore paths from a file. """ ignore_dirs = [] - try: - with open(file_path, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - if line and not line.startswith('#'): - ignore_dirs.append(line) - except FileNotFoundError: - pass + content = _safe_read_file(file_path, required=False, silent_missing=True) + if content: + for line in content.splitlines(): + line = line.strip() + if line and not line.startswith('#'): + ignore_dirs.append(line) return ignore_dirs @@ -445,30 +495,44 @@ def read_package_file(file_path: str) -> List[str]: A list of package paths. """ packages = [] - try: - with open(file_path, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - if line and not line.startswith('#'): - packages.append(line) - except FileNotFoundError: - print(f"Error: Package file not found: {file_path}", file=sys.stderr) - sys.exit(1) - except PermissionError: - print(f"Error: Permission denied reading package file: {file_path}", file=sys.stderr) - sys.exit(1) - except IOError as e: - print(f"Error reading package file: {e}", file=sys.stderr) - sys.exit(1) + content = _safe_read_file(file_path, required=True, description="package file") + if content: + for line in content.splitlines(): + line = line.strip() + if line and not line.startswith('#'): + packages.append(line) return packages +def _should_ignore(rel_path: str, name: str, ignore_patterns: List[str]) -> bool: + """Check if a file or directory matches any of the ignore patterns.""" + if not ignore_patterns: + return False + name_lower = name.lower() + rel_path_norm = rel_path.replace(os.sep, '/').lower() + + for pattern in ignore_patterns: + pattern_lower = pattern.lower() + if '/' in pattern: + if pattern_lower.startswith('/'): + p = pattern_lower[1:] + else: + p = pattern_lower + if fnmatch.fnmatchcase(rel_path_norm, p) or fnmatch.fnmatchcase(rel_path_norm, f"*/{p}"): + return True + else: + if fnmatch.fnmatchcase(name_lower, pattern_lower): + return True + return False + + def scan_repository( root_path: str, - rules: List[Dict[str, str]], + rules: List[Dict[str, Any]], target_packages: List[str] = None, ignore_dirs: List[str] = None, - version_string: str = None + version_string: str = None, + targets: List[Tuple[str, str]] = None ) -> List[Dict[str, Any]]: """ Scans the repository directory tree applying resolved regex patterns to files. @@ -487,21 +551,29 @@ def scan_repository( performs a full recursive scan of the repository. ignore_dirs: Optional list of directory names or glob-like files to ignore (case-insensitive). version_string: Optional target version string (e.g. "3.7") to scan for in filenames. + targets: Optional list of (dependency, version) tuples. Returns: - A list of dictionaries detailing each match: 'file_path', 'repo_path', - 'package_name', 'rule_name', 'line_number', 'matched_string', 'context_line'. + A list of dictionaries detailing each match. """ - ignore_lower = {i.lower() for i in ignore_dirs} if ignore_dirs else set() results = [] + filename_targets = [] + if targets: + filename_targets = targets + elif version_string: + dep = rules[0].get("dependency") if rules else None + filename_targets = [(dep, version_string)] + # Compile patterns once here compiled_rules = [] for rule in rules: try: compiled_rules.append({ "name": rule["name"], - "pattern": re.compile(rule["pattern"], re.IGNORECASE) + "pattern": re.compile(rule["pattern"], re.IGNORECASE), + "dependency": rule.get("dependency", ""), + "version": rule.get("version", "") }) except re.error as e: print(f"Error compiling regex for rule {rule['name']}: {e}", file=sys.stderr) @@ -512,13 +584,23 @@ def scan_repository( print(f"Filtering for packages: {target_packages}") for root, dirs, files in os.walk(root_path): + rel_root = os.path.relpath(root, root_path) + + # Helper to construct relative path for ignore matching + def get_rel_path(name): + return name if rel_root == "." else os.path.join(rel_root, name) + # Prune ignore directories (case-insensitive) - dirs[:] = [d for d in dirs if d.lower() not in ignore_lower] + dirs[:] = [ + d for d in dirs + if not _should_ignore(get_rel_path(d), d, ignore_dirs) + ] # Filter ignore files (case-insensitive) - files = [f for f in files if f.lower() not in ignore_lower] - - rel_root = os.path.relpath(root, root_path) + files = [ + f for f in files + if not _should_ignore(get_rel_path(f), f, ignore_dirs) + ] # Layout-agnostic generic subdirectory filtering if target_packages: @@ -541,13 +623,16 @@ def scan_repository( matches = scan_file(file_path, compiled_rules) # Add filename match if applicable - if version_string and version_string in file: - matches.append({ - "rule_name": "filename_match", - "line_number": 0, - "matched_string": version_string, - "context_line": f"Filename contains {version_string}" - }) + for dep, ver in filename_targets: + if ver and ver in file: + matches.append({ + "rule_name": "filename_match", + "line_number": 0, + "matched_string": ver, + "context_line": f"Filename contains {ver}", + "dependency": dep or "", + "version": ver + }) # Compute display path and package name rel_file_path = os.path.relpath(file_path, root_path) @@ -576,6 +661,41 @@ def scan_repository( return results +def parse_matrix_file(file_path: str) -> List[Tuple[str, str]]: + """ + Parses a YAML matrix file into a list of (dependency, version) tuples. + """ + content = _safe_read_file(file_path, required=True, description="matrix file") + try: + raw_matrix = yaml.safe_load(content) + except Exception as e: + print(f"Error parsing matrix YAML mapping: {e}", file=sys.stderr) + sys.exit(1) + + if not isinstance(raw_matrix, dict): + print("Error: Matrix file content must resolve to a YAML mapping", file=sys.stderr) + sys.exit(1) + + targets = [] + for dep, versions in raw_matrix.items(): + if isinstance(versions, list): + for v in versions: + if v is None or isinstance(v, (dict, list)): + print(f"Error: Invalid version '{v}' for dependency '{dep}'", file=sys.stderr) + sys.exit(1) + if not isinstance(v, str): + print(f"Error: Version '{v}' for dependency '{dep}' must be specified as a quoted string to prevent YAML parsing issues (e.g., 3.10 parsed as 3.1).", file=sys.stderr) + sys.exit(1) + targets.append((str(dep), v)) + elif isinstance(versions, str): + targets.append((str(dep), versions)) + else: + print(f"Error: Invalid version '{versions}' for dependency '{dep}'. Versions must be specified as quoted strings.", file=sys.stderr) + sys.exit(1) + + return targets + + def main(): script_dir = os.path.dirname(os.path.abspath(__file__)) default_config = os.path.join(script_dir, "regex_config.yaml") @@ -586,16 +706,19 @@ def main(): parser.add_argument( "-d", "--dependency", - required=True, help="Name of the dependency (e.g., python, protobuf)" ) parser.add_argument( "-v", "--version", - required=True, help="Specific version to search for (e.g., 3.7, 4.25.8)" ) + parser.add_argument( + "-m", "--matrix-file", + help="Path to a YAML file containing target dependencies and versions." + ) + parser.add_argument( "-p", "--path", default=".", @@ -659,6 +782,25 @@ def main(): args = parser.parse_args() + # Validation of required inputs + has_matrix_file = bool(args.matrix_file) + if has_matrix_file: + if args.dependency or args.version: + parser.error("Cannot specify -d/--dependency or -v/--version when using -m/--matrix-file") + else: + if not (args.dependency and args.version): + parser.error("Must specify both -d/--dependency and -v/--version when not using -m/--matrix-file") + + targets = [] + if has_matrix_file: + targets = parse_matrix_file(args.matrix_file) + else: + targets = [(args.dependency, args.version)] + + if not targets: + print("Error: No targets resolved to scan.", file=sys.stderr) + sys.exit(1) + # Resolve target packages if filtering is requested target_packages = [] if args.package: @@ -670,7 +812,12 @@ def main(): elif args.package_file: target_packages = read_package_file(args.package_file) - print(f"Starting scan for dependency: {args.dependency} version: {args.version}") + if has_matrix_file: + print("Starting scan for multiple targets:") + for dep, ver in targets: + print(f" - {dep}: {ver}") + else: + print(f"Starting scan for dependency: {args.dependency} version: {args.version}") print(f"Root path: {args.path}") print("Targets to scan:") if target_packages: @@ -681,8 +828,10 @@ def main(): print(f"Using config: {args.config}") # Load and resolve rules - config_manager = ConfigManager(args.config, args.dependency, args.version) - rules = config_manager.load_config() + rules = [] + for dep, ver in targets: + config_manager = ConfigManager(args.config, dep, ver) + rules.extend(config_manager.load_config()) @@ -695,7 +844,14 @@ def main(): print(f"Loaded {len(ignore_dirs)} ignore patterns from {ignore_file_path}") # Scan repository - all_matches = scan_repository(args.path, rules, target_packages, ignore_dirs, version_string=args.version) + all_matches = scan_repository( + args.path, + rules, + target_packages, + ignore_dirs, + version_string=(None if has_matrix_file else args.version), + targets=targets + ) print(f"\nFound {len(all_matches)} matches.") display_matches = all_matches if args.stdout else all_matches[:10] @@ -717,7 +873,11 @@ def main(): script_dir = os.path.dirname(os.path.abspath(__file__)) results_dir = os.path.join(script_dir, "results") os.makedirs(results_dir, exist_ok=True) - output_path = os.path.join(results_dir, f"{args.dependency}-{args.version}-{timestamp}.csv") + if has_matrix_file: + base_name = os.path.splitext(os.path.basename(args.matrix_file))[0] + output_path = os.path.join(results_dir, f"{base_name}-{timestamp}.csv") + else: + output_path = os.path.join(results_dir, f"{args.dependency}-{args.version}-{timestamp}.csv") write_csv_report(output_path, all_matches)