diff --git a/docs/source/en/_toctree.yml b/docs/source/en/_toctree.yml index 93d9886d85..37b6fa60d2 100644 --- a/docs/source/en/_toctree.yml +++ b/docs/source/en/_toctree.yml @@ -747,6 +747,8 @@ title: DPT - local: model_doc/efficientformer title: EfficientFormer + - local: model_doc/efficientloftr + title: EfficientLoFTR - local: model_doc/efficientnet title: EfficientNet - local: model_doc/eomt diff --git a/docs/source/en/model_doc/auto.md b/docs/source/en/model_doc/auto.md index 0a36c7c0a1..326d8e2d2d 100644 --- a/docs/source/en/model_doc/auto.md +++ b/docs/source/en/model_doc/auto.md @@ -258,6 +258,10 @@ The following auto classes are available for the following computer vision tasks [[autodoc]] AutoModelForKeypointDetection +### AutoModelForKeypointMatching + +[[autodoc]] AutoModelForKeypointMatching + ### AutoModelForMaskedImageModeling [[autodoc]] AutoModelForMaskedImageModeling diff --git a/docs/source/en/model_doc/efficientloftr.md b/docs/source/en/model_doc/efficientloftr.md new file mode 100644 index 0000000000..0e84d94c10 --- /dev/null +++ b/docs/source/en/model_doc/efficientloftr.md @@ -0,0 +1,114 @@ + + +# EfficientLoFTR + +
+PyTorch +
+ +## Overview + +The EfficientLoFTR model was proposed in [Efficient LoFTR: Semi-Dense Local Feature Matching with Sparse-Like Speed](https://arxiv.org/abs/2403.04765) by Yifan Wang, Xingyi He, Sida Peng, Dongli Tan and Xiaowei Zhou. + +This model consists of matching two images together by finding pixel correspondences. It can be used to estimate the pose between them. +This model is useful for tasks such as image matching, homography estimation, etc. + +The abstract from the paper is the following: + +*We present a novel method for efficiently producing semidense matches across images. Previous detector-free matcher +LoFTR has shown remarkable matching capability in handling large-viewpoint change and texture-poor scenarios but suffers +from low efficiency. We revisit its design choices and derive multiple improvements for both efficiency and accuracy. +One key observation is that performing the transformer over the entire feature map is redundant due to shared local +information, therefore we propose an aggregated attention mechanism with adaptive token selection for efficiency. +Furthermore, we find spatial variance exists in LoFTR’s fine correlation module, which is adverse to matching accuracy. +A novel two-stage correlation layer is proposed to achieve accurate subpixel correspondences for accuracy improvement. +Our efficiency optimized model is ∼ 2.5× faster than LoFTR which can even surpass state-of-the-art efficient sparse +matching pipeline SuperPoint + LightGlue. Moreover, extensive experiments show that our method can achieve higher +accuracy compared with competitive semi-dense matchers, with considerable efficiency benefits. This opens up exciting +prospects for large-scale or latency-sensitive applications such as image retrieval and 3D reconstruction. +Project page: [https://zju3dv.github.io/efficientloftr/](https://zju3dv.github.io/efficientloftr/).* + +## How to use + +Here is a quick example of using the model. +```python +import torch + +from transformers import AutoImageProcessor, AutoModelForKeypointMatching +from transformers.image_utils import load_image + + +image1 = load_image("https://raw.githubusercontent.com/magicleap/SuperGluePretrainedNetwork/refs/heads/master/assets/phototourism_sample_images/united_states_capitol_98169888_3347710852.jpg") +image2 = load_image("https://raw.githubusercontent.com/magicleap/SuperGluePretrainedNetwork/refs/heads/master/assets/phototourism_sample_images/united_states_capitol_26757027_6717084061.jpg") + +images = [image1, image2] + +processor = AutoImageProcessor.from_pretrained("stevenbucaille/efficientloftr") +model = AutoModelForKeypointMatching.from_pretrained("stevenbucaille/efficientloftr") + +inputs = processor(images, return_tensors="pt") +with torch.no_grad(): + outputs = model(**inputs) +``` + +You can use the `post_process_keypoint_matching` method from the `ImageProcessor` to get the keypoints and matches in a more readable format: + +```python +image_sizes = [[(image.height, image.width) for image in images]] +outputs = processor.post_process_keypoint_matching(outputs, image_sizes, threshold=0.2) +for i, output in enumerate(outputs): + print("For the image pair", i) + for keypoint0, keypoint1, matching_score in zip( + output["keypoints0"], output["keypoints1"], output["matching_scores"] + ): + print( + f"Keypoint at coordinate {keypoint0.numpy()} in the first image matches with keypoint at coordinate {keypoint1.numpy()} in the second image with a score of {matching_score}." + ) +``` + +From the post processed outputs, you can visualize the matches between the two images using the following code: +```python +images_with_matching = processor.visualize_keypoint_matching(images, outputs) +``` + +![image/png](https://cdn-uploads.huggingface.co/production/uploads/632885ba1558dac67c440aa8/2nJZQlFToCYp_iLurvcZ4.png) + +This model was contributed by [stevenbucaille](https://huggingface.co/stevenbucaille). +The original code can be found [here](https://github.com/zju3dv/EfficientLoFTR). + +## EfficientLoFTRConfig + +[[autodoc]] EfficientLoFTRConfig + +## EfficientLoFTRImageProcessor + +[[autodoc]] EfficientLoFTRImageProcessor + +- preprocess +- post_process_keypoint_matching +- visualize_keypoint_matching + +## EfficientLoFTRModel + +[[autodoc]] EfficientLoFTRModel + +- forward + +## EfficientLoFTRForKeypointMatching + +[[autodoc]] EfficientLoFTRForKeypointMatching + +- forward \ No newline at end of file diff --git a/src/transformers/models/__init__.py b/src/transformers/models/__init__.py index ec8c4dfc8f..738d2ab83c 100644 --- a/src/transformers/models/__init__.py +++ b/src/transformers/models/__init__.py @@ -102,6 +102,7 @@ if TYPE_CHECKING: from .dots1 import * from .dpr import * from .dpt import * + from .efficientloftr import * from .efficientnet import * from .electra import * from .emu3 import * diff --git a/src/transformers/models/auto/configuration_auto.py b/src/transformers/models/auto/configuration_auto.py index b4642b9caf..78c2ad034b 100644 --- a/src/transformers/models/auto/configuration_auto.py +++ b/src/transformers/models/auto/configuration_auto.py @@ -121,6 +121,7 @@ CONFIG_MAPPING_NAMES = OrderedDict[str, str]( ("dpr", "DPRConfig"), ("dpt", "DPTConfig"), ("efficientformer", "EfficientFormerConfig"), + ("efficientloftr", "EfficientLoFTRConfig"), ("efficientnet", "EfficientNetConfig"), ("electra", "ElectraConfig"), ("emu3", "Emu3Config"), @@ -515,6 +516,7 @@ MODEL_NAMES_MAPPING = OrderedDict[str, str]( ("dpr", "DPR"), ("dpt", "DPT"), ("efficientformer", "EfficientFormer"), + ("efficientloftr", "EfficientLoFTR"), ("efficientnet", "EfficientNet"), ("electra", "ELECTRA"), ("emu3", "Emu3"), diff --git a/src/transformers/models/auto/image_processing_auto.py b/src/transformers/models/auto/image_processing_auto.py index 84e6a75b16..775d94b25b 100644 --- a/src/transformers/models/auto/image_processing_auto.py +++ b/src/transformers/models/auto/image_processing_auto.py @@ -85,6 +85,7 @@ else: ("donut-swin", ("DonutImageProcessor", "DonutImageProcessorFast")), ("dpt", ("DPTImageProcessor", "DPTImageProcessorFast")), ("efficientformer", ("EfficientFormerImageProcessor",)), + ("efficientloftr", ("EfficientLoFTRImageProcessor",)), ("efficientnet", ("EfficientNetImageProcessor", "EfficientNetImageProcessorFast")), ("eomt", ("EomtImageProcessor", "EomtImageProcessorFast")), ("flava", ("FlavaImageProcessor", "FlavaImageProcessorFast")), diff --git a/src/transformers/models/auto/modeling_auto.py b/src/transformers/models/auto/modeling_auto.py index 36aaec4d53..20f039b22b 100644 --- a/src/transformers/models/auto/modeling_auto.py +++ b/src/transformers/models/auto/modeling_auto.py @@ -114,6 +114,7 @@ MODEL_MAPPING_NAMES = OrderedDict( ("dpr", "DPRQuestionEncoder"), ("dpt", "DPTModel"), ("efficientformer", "EfficientFormerModel"), + ("efficientloftr", "EfficientLoFTRModel"), ("efficientnet", "EfficientNetModel"), ("electra", "ElectraModel"), ("emu3", "Emu3Model"), @@ -322,7 +323,6 @@ MODEL_MAPPING_NAMES = OrderedDict( ("squeezebert", "SqueezeBertModel"), ("stablelm", "StableLmModel"), ("starcoder2", "Starcoder2Model"), - ("superglue", "SuperGlueForKeypointMatching"), ("swiftformer", "SwiftFormerModel"), ("swin", "SwinModel"), ("swin2sr", "Swin2SRModel"), @@ -1607,6 +1607,13 @@ MODEL_FOR_KEYPOINT_DETECTION_MAPPING_NAMES = OrderedDict( ] ) +MODEL_FOR_KEYPOINT_MATCHING_MAPPING_NAMES = OrderedDict( + [ + ("efficientloftr", "EfficientLoFTRForKeypointMatching"), + ("lightglue", "LightGlueForKeypointMatching"), + ("superglue", "SuperGlueForKeypointMatching"), + ] +) MODEL_FOR_TEXT_ENCODING_MAPPING_NAMES = OrderedDict( [ @@ -1768,6 +1775,8 @@ MODEL_FOR_KEYPOINT_DETECTION_MAPPING = _LazyAutoMapping( CONFIG_MAPPING_NAMES, MODEL_FOR_KEYPOINT_DETECTION_MAPPING_NAMES ) +MODEL_FOR_KEYPOINT_MATCHING_MAPPING = _LazyAutoMapping(CONFIG_MAPPING_NAMES, MODEL_FOR_KEYPOINT_MATCHING_MAPPING_NAMES) + MODEL_FOR_TEXT_ENCODING_MAPPING = _LazyAutoMapping(CONFIG_MAPPING_NAMES, MODEL_FOR_TEXT_ENCODING_MAPPING_NAMES) MODEL_FOR_TIME_SERIES_CLASSIFICATION_MAPPING = _LazyAutoMapping( @@ -1795,6 +1804,10 @@ class AutoModelForKeypointDetection(_BaseAutoModelClass): _model_mapping = MODEL_FOR_KEYPOINT_DETECTION_MAPPING +class AutoModelForKeypointMatching(_BaseAutoModelClass): + _model_mapping = MODEL_FOR_KEYPOINT_MATCHING_MAPPING + + class AutoModelForTextEncoding(_BaseAutoModelClass): _model_mapping = MODEL_FOR_TEXT_ENCODING_MAPPING @@ -2151,6 +2164,7 @@ __all__ = [ "MODEL_FOR_IMAGE_SEGMENTATION_MAPPING", "MODEL_FOR_IMAGE_TO_IMAGE_MAPPING", "MODEL_FOR_KEYPOINT_DETECTION_MAPPING", + "MODEL_FOR_KEYPOINT_MATCHING_MAPPING", "MODEL_FOR_INSTANCE_SEGMENTATION_MAPPING", "MODEL_FOR_MASKED_IMAGE_MODELING_MAPPING", "MODEL_FOR_MASKED_LM_MAPPING", @@ -2196,6 +2210,7 @@ __all__ = [ "AutoModelForImageToImage", "AutoModelForInstanceSegmentation", "AutoModelForKeypointDetection", + "AutoModelForKeypointMatching", "AutoModelForMaskGeneration", "AutoModelForTextEncoding", "AutoModelForMaskedImageModeling", diff --git a/src/transformers/models/efficientloftr/__init__.py b/src/transformers/models/efficientloftr/__init__.py new file mode 100644 index 0000000000..5ded8084c3 --- /dev/null +++ b/src/transformers/models/efficientloftr/__init__.py @@ -0,0 +1,28 @@ +# Copyright 2025 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import TYPE_CHECKING + +from ...utils import _LazyModule +from ...utils.import_utils import define_import_structure + + +if TYPE_CHECKING: + from .configuration_efficientloftr import * + from .image_processing_efficientloftr import * + from .modeling_efficientloftr import * +else: + import sys + + _file = globals()["__file__"] + sys.modules[__name__] = _LazyModule(__name__, _file, define_import_structure(_file), module_spec=__spec__) diff --git a/src/transformers/models/efficientloftr/configuration_efficientloftr.py b/src/transformers/models/efficientloftr/configuration_efficientloftr.py new file mode 100644 index 0000000000..d3e4acde4f --- /dev/null +++ b/src/transformers/models/efficientloftr/configuration_efficientloftr.py @@ -0,0 +1,203 @@ +# Copyright 2025 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Optional + +from ...configuration_utils import PretrainedConfig +from ...modeling_rope_utils import rope_config_validation + + +class EfficientLoFTRConfig(PretrainedConfig): + r""" + This is the configuration class to store the configuration of a [`EffientLoFTRFromKeypointMatching`]. + It is used to instantiate a EfficientLoFTR model according to the specified arguments, defining the model + architecture. Instantiating a configuration with the defaults will yield a similar configuration to that of the + EfficientLoFTR [zju-community/efficientloftr](https://huggingface.co/zju-community/efficientloftr) architecture. + + Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the + documentation from [`PretrainedConfig`] for more information. + + Args: + stage_num_blocks (`List`, *optional*, defaults to [1, 2, 4, 14]): + The number of blocks in each stages + out_features (`List`, *optional*, defaults to [64, 64, 128, 256]): + The number of channels in each stage + stage_stride (`List`, *optional*, defaults to [2, 1, 2, 2]): + The stride used in each stage + hidden_size (`int`, *optional*, defaults to 256): + The dimension of the descriptors. + activation_function (`str`, *optional*, defaults to `"relu"`): + The activation function used in the backbone + q_aggregation_kernel_size (`int`, *optional*, defaults to 4): + The kernel size of the aggregation of query states in the fusion network + kv_aggregation_kernel_size (`int`, *optional*, defaults to 4): + The kernel size of the aggregation of key and value states in the fusion network + q_aggregation_stride (`int`, *optional*, defaults to 4): + The stride of the aggregation of query states in the fusion network + kv_aggregation_stride (`int`, *optional*, defaults to 4): + The stride of the aggregation of key and value states in the fusion network + num_attention_layers (`int`, *optional*, defaults to 4): + Number of attention layers in the LocalFeatureTransformer + num_attention_heads (`int`, *optional*, defaults to 8): + The number of heads in the GNN layers. + attention_dropout (`float`, *optional*, defaults to 0.0): + The dropout ratio for the attention probabilities. + attention_bias (`bool`, *optional*, defaults to `False`): + Whether to use a bias in the query, key, value and output projection layers during attention. + mlp_activation_function (`str`, *optional*, defaults to `"leaky_relu"`): + Activation function used in the attention mlp layer. + coarse_matching_skip_softmax (`bool`, *optional*, defaults to `False`): + Whether to skip softmax or not at the coarse matching step. + coarse_matching_threshold (`float`, *optional*, defaults to 0.2): + The threshold for the minimum score required for a match. + coarse_matching_temperature (`float`, *optional*, defaults to 0.1): + The temperature to apply to the coarse similarity matrix + coarse_matching_border_removal (`int`, *optional*, defaults to 2): + The size of the border to remove during coarse matching + fine_kernel_size (`int`, *optional*, defaults to 8): + Kernel size used for the fine feature matching + batch_norm_eps (`float`, *optional*, defaults to 1e-05): + The epsilon used by the batch normalization layers. + embedding_size (`List`, *optional*, defaults to [15, 20]): + The size (height, width) of the embedding for the position embeddings. + rope_theta (`float`, *optional*, defaults to 10000.0): + The base period of the RoPE embeddings. + partial_rotary_factor (`float`, *optional*, defaults to 4.0): + Dim factor for the RoPE embeddings, in EfficientLoFTR, frequencies should be generated for + the whole hidden_size, so this factor is used to compensate. + rope_scaling (`Dict`, *optional*): + Dictionary containing the scaling configuration for the RoPE embeddings. NOTE: if you apply new rope type + and you expect the model to work on longer `max_position_embeddings`, we recommend you to update this value + accordingly. + Expected contents: + `rope_type` (`str`): + The sub-variant of RoPE to use. Can be one of ['default', 'linear', 'dynamic', 'yarn', 'longrope', + 'llama3', '2d'], with 'default' being the original RoPE implementation. + `dim` (`int`): The dimension of the RoPE embeddings. + fine_matching_slice_dim (`int`, *optional*, defaults to 8): + The size of the slice used to divide the fine features for the first and second fine matching stages. + fine_matching_regress_temperature (`float`, *optional*, defaults to 10.0): + The temperature to apply to the fine similarity matrix + initializer_range (`float`, *optional*, defaults to 0.02): + The standard deviation of the truncated_normal_initializer for initializing all weight matrices. + + Examples: + ```python + >>> from transformers import EfficientLoFTRConfig, EfficientLoFTRForKeypointMatching + + >>> # Initializing a EfficientLoFTR configuration + >>> configuration = EfficientLoFTRConfig() + + >>> # Initializing a model from the EfficientLoFTR configuration + >>> model = EfficientLoFTRForKeypointMatching(configuration) + + >>> # Accessing the model configuration + >>> configuration = model.config + ``` + """ + + model_type = "efficientloftr" + + def __init__( + self, + stage_num_blocks: Optional[list[int]] = None, + out_features: Optional[list[int]] = None, + stage_stride: Optional[list[int]] = None, + hidden_size: int = 256, + activation_function: str = "relu", + q_aggregation_kernel_size: int = 4, + kv_aggregation_kernel_size: int = 4, + q_aggregation_stride: int = 4, + kv_aggregation_stride: int = 4, + num_attention_layers: int = 4, + num_attention_heads: int = 8, + attention_dropout: float = 0.0, + attention_bias: bool = False, + mlp_activation_function: str = "leaky_relu", + coarse_matching_skip_softmax: bool = False, + coarse_matching_threshold: float = 0.2, + coarse_matching_temperature: float = 0.1, + coarse_matching_border_removal: int = 2, + fine_kernel_size: int = 8, + batch_norm_eps: float = 1e-5, + embedding_size: Optional[list[int]] = None, + rope_theta: float = 10000.0, + partial_rotary_factor: float = 4.0, + rope_scaling: Optional[dict] = None, + fine_matching_slice_dim: int = 8, + fine_matching_regress_temperature: float = 10.0, + initializer_range: float = 0.02, + **kwargs, + ): + # Stage level of RepVGG + self.stage_num_blocks = stage_num_blocks if stage_num_blocks is not None else [1, 2, 4, 14] + self.stage_stride = stage_stride if stage_stride is not None else [2, 1, 2, 2] + self.out_features = out_features if out_features is not None else [64, 64, 128, 256] + self.stage_in_channels = [1] + self.out_features[:-1] + + # Block level of RepVGG + self.stage_block_stride = [ + [stride] + [1] * (num_blocks - 1) for stride, num_blocks in zip(self.stage_stride, self.stage_num_blocks) + ] + self.stage_block_out_channels = [ + [self.out_features[stage_idx]] * num_blocks for stage_idx, num_blocks in enumerate(self.stage_num_blocks) + ] + self.stage_block_in_channels = [ + [self.stage_in_channels[stage_idx]] + self.stage_block_out_channels[stage_idx][:-1] + for stage_idx in range(len(self.stage_num_blocks)) + ] + + # Fine matching level of EfficientLoFTR + self.fine_fusion_dims = list(reversed(self.out_features))[:-1] + + self.hidden_size = hidden_size + if self.hidden_size != self.out_features[-1]: + raise ValueError( + f"hidden_size should be equal to the last value in out_features. hidden_size = {self.hidden_size}, out_features = {self.stage_out_channels}" + ) + + self.activation_function = activation_function + self.q_aggregation_kernel_size = q_aggregation_kernel_size + self.kv_aggregation_kernel_size = kv_aggregation_kernel_size + self.q_aggregation_stride = q_aggregation_stride + self.kv_aggregation_stride = kv_aggregation_stride + self.num_attention_layers = num_attention_layers + self.num_attention_heads = num_attention_heads + self.attention_dropout = attention_dropout + self.attention_bias = attention_bias + self.intermediate_size = self.hidden_size * 2 + self.mlp_activation_function = mlp_activation_function + self.coarse_matching_skip_softmax = coarse_matching_skip_softmax + self.coarse_matching_threshold = coarse_matching_threshold + self.coarse_matching_temperature = coarse_matching_temperature + self.coarse_matching_border_removal = coarse_matching_border_removal + self.fine_kernel_size = fine_kernel_size + self.batch_norm_eps = batch_norm_eps + self.fine_matching_slice_dim = fine_matching_slice_dim + self.fine_matching_regress_temperature = fine_matching_regress_temperature + + self.num_key_value_heads = num_attention_heads + self.embedding_size = embedding_size if embedding_size is not None else [15, 20] + self.rope_theta = rope_theta + self.rope_scaling = rope_scaling if rope_scaling is not None else {"rope_type": "default"} + + # for compatibility with "default" rope type + self.partial_rotary_factor = partial_rotary_factor + rope_config_validation(self) + + self.initializer_range = initializer_range + + super().__init__(**kwargs) + + +__all__ = ["EfficientLoFTRConfig"] diff --git a/src/transformers/models/efficientloftr/convert_efficientloftr_to_hf.py b/src/transformers/models/efficientloftr/convert_efficientloftr_to_hf.py new file mode 100644 index 0000000000..d15d07dbb8 --- /dev/null +++ b/src/transformers/models/efficientloftr/convert_efficientloftr_to_hf.py @@ -0,0 +1,257 @@ +# Copyright 2025 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import argparse +import gc +import os +import re + +import torch +from datasets import load_dataset +from huggingface_hub import hf_hub_download + +from transformers.models.efficientloftr.image_processing_efficientloftr import EfficientLoFTRImageProcessor +from transformers.models.efficientloftr.modeling_efficientloftr import ( + EfficientLoFTRConfig, + EfficientLoFTRForKeypointMatching, +) + + +DEFAULT_MODEL_REPO = "stevenbucaille/efficient_loftr_pth" +DEFAULT_FILE = "eloftr.pth" + + +def prepare_imgs(): + dataset = load_dataset("hf-internal-testing/image-matching-test-dataset", split="train") + image0 = dataset[0]["image"] + image2 = dataset[2]["image"] + return [[image2, image0]] + + +def verify_model_outputs(model, device): + images = prepare_imgs() + preprocessor = EfficientLoFTRImageProcessor() + inputs = preprocessor(images=images, return_tensors="pt").to(device) + model.to(device) + model.eval() + with torch.no_grad(): + outputs = model(**inputs, output_hidden_states=True, output_attentions=True) + + predicted_number_of_matches = outputs.matches.shape[-1] + predicted_top10 = torch.topk(outputs.matching_scores[0, 0], k=10) + predicted_top10_matches_indices = predicted_top10.indices + predicted_top10_matching_scores = predicted_top10.values + + expected_number_of_matches = 4800 + expected_matches_shape = torch.Size((len(images), 2, expected_number_of_matches)) + expected_matching_scores_shape = torch.Size((len(images), 2, expected_number_of_matches)) + + expected_top10_matches_indices = torch.tensor( + [1798, 1639, 1401, 1559, 2596, 2362, 2441, 2605, 1643, 2607], dtype=torch.int64 + ).to(device) + expected_top10_matching_scores = torch.tensor( + [0.9563, 0.9355, 0.9265, 0.9091, 0.9071, 0.9062, 0.9000, 0.8978, 0.8908, 0.8853] + ).to(device) + + assert outputs.matches.shape == expected_matches_shape + assert outputs.matching_scores.shape == expected_matching_scores_shape + + torch.testing.assert_close(predicted_top10_matches_indices, expected_top10_matches_indices, rtol=5e-3, atol=5e-3) + torch.testing.assert_close(predicted_top10_matching_scores, expected_top10_matching_scores, rtol=5e-3, atol=5e-3) + + assert predicted_number_of_matches == expected_number_of_matches + + +ORIGINAL_TO_CONVERTED_KEY_MAPPING = { + r"matcher.backbone.layer(\d+).rbr_dense.conv": r"efficientloftr.backbone.stages.\1.blocks.0.conv1.conv", + r"matcher.backbone.layer(\d+).rbr_dense.bn": r"efficientloftr.backbone.stages.\1.blocks.0.conv1.norm", + r"matcher.backbone.layer(\d+).rbr_1x1.conv": r"efficientloftr.backbone.stages.\1.blocks.0.conv2.conv", + r"matcher.backbone.layer(\d+).rbr_1x1.bn": r"efficientloftr.backbone.stages.\1.blocks.0.conv2.norm", + r"matcher.backbone.layer(\d+).(\d+).rbr_dense.conv": r"efficientloftr.backbone.stages.\1.blocks.\2.conv1.conv", + r"matcher.backbone.layer(\d+).(\d+).rbr_dense.bn": r"efficientloftr.backbone.stages.\1.blocks.\2.conv1.norm", + r"matcher.backbone.layer(\d+).(\d+).rbr_1x1.conv": r"efficientloftr.backbone.stages.\1.blocks.\2.conv2.conv", + r"matcher.backbone.layer(\d+).(\d+).rbr_1x1.bn": r"efficientloftr.backbone.stages.\1.blocks.\2.conv2.norm", + r"matcher.backbone.layer(\d+).(\d+).rbr_identity": r"efficientloftr.backbone.stages.\1.blocks.\2.identity", + r"matcher.loftr_coarse.layers.(\d*[02468]).aggregate": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.self_attention.aggregation.q_aggregation", + r"matcher.loftr_coarse.layers.(\d*[02468]).norm1": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.self_attention.aggregation.norm", + r"matcher.loftr_coarse.layers.(\d*[02468]).q_proj": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.self_attention.attention.q_proj", + r"matcher.loftr_coarse.layers.(\d*[02468]).k_proj": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.self_attention.attention.k_proj", + r"matcher.loftr_coarse.layers.(\d*[02468]).v_proj": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.self_attention.attention.v_proj", + r"matcher.loftr_coarse.layers.(\d*[02468]).merge": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.self_attention.attention.o_proj", + r"matcher.loftr_coarse.layers.(\d*[02468]).mlp.(\d+)": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.self_attention.mlp.fc{1 if m.group(2) == '0' else 2}", + r"matcher.loftr_coarse.layers.(\d*[02468]).norm2": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.self_attention.mlp.layer_norm", + r"matcher.loftr_coarse.layers.(\d*[13579]).aggregate": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.cross_attention.aggregation.q_aggregation", + r"matcher.loftr_coarse.layers.(\d*[13579]).norm1": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.cross_attention.aggregation.norm", + r"matcher.loftr_coarse.layers.(\d*[13579]).q_proj": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.cross_attention.attention.q_proj", + r"matcher.loftr_coarse.layers.(\d*[13579]).k_proj": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.cross_attention.attention.k_proj", + r"matcher.loftr_coarse.layers.(\d*[13579]).v_proj": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.cross_attention.attention.v_proj", + r"matcher.loftr_coarse.layers.(\d*[13579]).merge": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.cross_attention.attention.o_proj", + r"matcher.loftr_coarse.layers.(\d*[13579]).mlp.(\d+)": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.cross_attention.mlp.fc{1 if m.group(2) == '0' else 2}", + r"matcher.loftr_coarse.layers.(\d*[13579]).norm2": lambda m: f"efficientloftr.local_feature_transformer.layers.{int(m.group(1)) // 2}.cross_attention.mlp.layer_norm", + r"matcher.fine_preprocess.layer3_outconv": "refinement_layer.out_conv", + r"matcher.fine_preprocess.layer(\d+)_outconv.weight": lambda m: f"refinement_layer.out_conv_layers.{0 if int(m.group(1)) == 2 else m.group(1)}.out_conv1.weight", + r"matcher.fine_preprocess.layer(\d+)_outconv2\.0": lambda m: f"refinement_layer.out_conv_layers.{0 if int(m.group(1)) == 2 else m.group(1)}.out_conv2", + r"matcher.fine_preprocess.layer(\d+)_outconv2\.1": lambda m: f"refinement_layer.out_conv_layers.{0 if int(m.group(1)) == 2 else m.group(1)}.batch_norm", + r"matcher.fine_preprocess.layer(\d+)_outconv2\.3": lambda m: f"refinement_layer.out_conv_layers.{0 if int(m.group(1)) == 2 else m.group(1)}.out_conv3", +} + + +def convert_old_keys_to_new_keys(state_dict_keys: list[str]): + """ + This function should be applied only once, on the concatenated keys to efficiently rename using + the key mappings. + """ + output_dict = {} + if state_dict_keys is not None: + old_text = "\n".join(state_dict_keys) + new_text = old_text + for pattern, replacement in ORIGINAL_TO_CONVERTED_KEY_MAPPING.items(): + if replacement is None: + new_text = re.sub(pattern, "", new_text) # an empty line + continue + new_text = re.sub(pattern, replacement, new_text) + output_dict = dict(zip(old_text.split("\n"), new_text.split("\n"))) + return output_dict + + +@torch.no_grad() +def write_model( + model_path, + model_repo, + file_name, + organization, + safe_serialization=True, + push_to_hub=False, +): + os.makedirs(model_path, exist_ok=True) + # ------------------------------------------------------------ + # EfficientLoFTR config + # ------------------------------------------------------------ + + config = EfficientLoFTRConfig() + config.architectures = ["EfficientLoFTRForKeypointMatching"] + config.save_pretrained(model_path) + print("Model config saved successfully...") + + # ------------------------------------------------------------ + # Convert weights + # ------------------------------------------------------------ + + print(f"Fetching all parameters from the checkpoint at {model_repo}/{file_name}...") + checkpoint_path = hf_hub_download(repo_id=model_repo, filename=file_name) + original_state_dict = torch.load(checkpoint_path, weights_only=True, map_location="cpu")["state_dict"] + + print("Converting model...") + all_keys = list(original_state_dict.keys()) + new_keys = convert_old_keys_to_new_keys(all_keys) + + state_dict = {} + for key in all_keys: + new_key = new_keys[key] + state_dict[new_key] = original_state_dict.pop(key).contiguous().clone() + + del original_state_dict + gc.collect() + + print("Loading the checkpoint in a EfficientLoFTR model...") + + device = "cuda" if torch.cuda.is_available() else "cpu" + with torch.device(device): + model = EfficientLoFTRForKeypointMatching(config) + model.load_state_dict(state_dict) + print("Checkpoint loaded successfully...") + del model.config._name_or_path + + print("Saving the model...") + model.save_pretrained(model_path, safe_serialization=safe_serialization) + del state_dict, model + + # Safety check: reload the converted model + gc.collect() + print("Reloading the model to check if it's saved correctly.") + model = EfficientLoFTRForKeypointMatching.from_pretrained(model_path) + print("Model reloaded successfully.") + + model_name = "efficientloftr" + if model_repo == DEFAULT_MODEL_REPO: + print("Checking the model outputs...") + verify_model_outputs(model, device) + print("Model outputs verified successfully.") + + if push_to_hub: + print("Pushing model to the hub...") + model.push_to_hub( + repo_id=f"{organization}/{model_name}", + commit_message="Add model", + ) + config.push_to_hub(repo_id=f"{organization}/{model_name}", commit_message="Add config") + + write_image_processor(model_path, model_name, organization, push_to_hub=push_to_hub) + + +def write_image_processor(save_dir, model_name, organization, push_to_hub=False): + image_processor = EfficientLoFTRImageProcessor() + image_processor.save_pretrained(save_dir) + + if push_to_hub: + print("Pushing image processor to the hub...") + image_processor.push_to_hub( + repo_id=f"{organization}/{model_name}", + commit_message="Add image processor", + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + # Required parameters + parser.add_argument( + "--repo_id", + default=DEFAULT_MODEL_REPO, + type=str, + help="Model repo ID of the original EfficientLoFTR checkpoint you'd like to convert.", + ) + parser.add_argument( + "--file_name", + default=DEFAULT_FILE, + type=str, + help="File name of the original EfficientLoFTR checkpoint you'd like to convert.", + ) + parser.add_argument( + "--pytorch_dump_folder_path", + default=None, + type=str, + required=True, + help="Path to the output PyTorch model directory.", + ) + parser.add_argument("--save_model", action="store_true", help="Save model to local") + parser.add_argument( + "--push_to_hub", + action="store_true", + help="Push model and image preprocessor to the hub", + ) + parser.add_argument( + "--organization", + default="zju-community", + type=str, + help="Hub organization in which you want the model to be uploaded.", + ) + + args = parser.parse_args() + write_model( + args.pytorch_dump_folder_path, + args.repo_id, + args.file_name, + args.organization, + safe_serialization=True, + push_to_hub=args.push_to_hub, + ) diff --git a/src/transformers/models/efficientloftr/image_processing_efficientloftr.py b/src/transformers/models/efficientloftr/image_processing_efficientloftr.py new file mode 100644 index 0000000000..a1bed128da --- /dev/null +++ b/src/transformers/models/efficientloftr/image_processing_efficientloftr.py @@ -0,0 +1,461 @@ +# Copyright 2025 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Image processor class for SuperPoint.""" + +from typing import Optional, Union + +import numpy as np + +from ... import is_torch_available, is_vision_available +from ...image_processing_utils import BaseImageProcessor, BatchFeature, get_size_dict +from ...image_transforms import resize, to_channel_dimension_format +from ...image_utils import ( + ChannelDimension, + ImageInput, + ImageType, + PILImageResampling, + get_image_type, + infer_channel_dimension_format, + is_pil_image, + is_scaled_image, + is_valid_image, + to_numpy_array, + valid_images, + validate_preprocess_arguments, +) +from ...utils import TensorType, logging, requires_backends + + +if is_torch_available(): + import torch + +if is_vision_available(): + import PIL + from PIL import Image, ImageDraw + + from .modeling_efficientloftr import KeypointMatchingOutput + +logger = logging.get_logger(__name__) + + +# Copied from transformers.models.superpoint.image_processing_superpoint.is_grayscale +def is_grayscale( + image: np.ndarray, + input_data_format: Optional[Union[str, ChannelDimension]] = None, +): + if input_data_format == ChannelDimension.FIRST: + if image.shape[0] == 1: + return True + return np.all(image[0, ...] == image[1, ...]) and np.all(image[1, ...] == image[2, ...]) + elif input_data_format == ChannelDimension.LAST: + if image.shape[-1] == 1: + return True + return np.all(image[..., 0] == image[..., 1]) and np.all(image[..., 1] == image[..., 2]) + + +# Copied from transformers.models.superpoint.image_processing_superpoint.convert_to_grayscale +def convert_to_grayscale( + image: ImageInput, + input_data_format: Optional[Union[str, ChannelDimension]] = None, +) -> ImageInput: + """ + Converts an image to grayscale format using the NTSC formula. Only support numpy and PIL Image. TODO support torch + and tensorflow grayscale conversion + + This function is supposed to return a 1-channel image, but it returns a 3-channel image with the same value in each + channel, because of an issue that is discussed in : + https://github.com/huggingface/transformers/pull/25786#issuecomment-1730176446 + + Args: + image (Image): + The image to convert. + input_data_format (`ChannelDimension` or `str`, *optional*): + The channel dimension format for the input image. + """ + requires_backends(convert_to_grayscale, ["vision"]) + + if isinstance(image, np.ndarray): + if is_grayscale(image, input_data_format=input_data_format): + return image + if input_data_format == ChannelDimension.FIRST: + gray_image = image[0, ...] * 0.2989 + image[1, ...] * 0.5870 + image[2, ...] * 0.1140 + gray_image = np.stack([gray_image] * 3, axis=0) + elif input_data_format == ChannelDimension.LAST: + gray_image = image[..., 0] * 0.2989 + image[..., 1] * 0.5870 + image[..., 2] * 0.1140 + gray_image = np.stack([gray_image] * 3, axis=-1) + return gray_image + + if not isinstance(image, PIL.Image.Image): + return image + + image = image.convert("L") + return image + + +# Copied from transformers.models.superglue.image_processing_superglue.validate_and_format_image_pairs +def validate_and_format_image_pairs(images: ImageInput): + error_message = ( + "Input images must be a one of the following :", + " - A pair of PIL images.", + " - A pair of 3D arrays.", + " - A list of pairs of PIL images.", + " - A list of pairs of 3D arrays.", + ) + + def _is_valid_image(image): + """images is a PIL Image or a 3D array.""" + return is_pil_image(image) or ( + is_valid_image(image) and get_image_type(image) != ImageType.PIL and len(image.shape) == 3 + ) + + if isinstance(images, list): + if len(images) == 2 and all((_is_valid_image(image)) for image in images): + return images + if all( + isinstance(image_pair, list) + and len(image_pair) == 2 + and all(_is_valid_image(image) for image in image_pair) + for image_pair in images + ): + return [image for image_pair in images for image in image_pair] + raise ValueError(error_message) + + +class EfficientLoFTRImageProcessor(BaseImageProcessor): + r""" + Constructs a EfficientLoFTR image processor. + + Args: + do_resize (`bool`, *optional*, defaults to `True`): + Controls whether to resize the image's (height, width) dimensions to the specified `size`. Can be overriden + by `do_resize` in the `preprocess` method. + size (`Dict[str, int]` *optional*, defaults to `{"height": 480, "width": 640}`): + Resolution of the output image after `resize` is applied. Only has an effect if `do_resize` is set to + `True`. Can be overriden by `size` in the `preprocess` method. + resample (`PILImageResampling`, *optional*, defaults to `Resampling.BILINEAR`): + Resampling filter to use if resizing the image. Can be overriden by `resample` in the `preprocess` method. + do_rescale (`bool`, *optional*, defaults to `True`): + Whether to rescale the image by the specified scale `rescale_factor`. Can be overriden by `do_rescale` in + the `preprocess` method. + rescale_factor (`int` or `float`, *optional*, defaults to `1/255`): + Scale factor to use if rescaling the image. Can be overriden by `rescale_factor` in the `preprocess` + method. + do_grayscale (`bool`, *optional*, defaults to `True`): + Whether to convert the image to grayscale. Can be overriden by `do_grayscale` in the `preprocess` method. + """ + + model_input_names = ["pixel_values"] + + def __init__( + self, + do_resize: bool = True, + size: Optional[dict[str, int]] = None, + resample: PILImageResampling = PILImageResampling.BILINEAR, + do_rescale: bool = True, + rescale_factor: float = 1 / 255, + do_grayscale: bool = True, + **kwargs, + ) -> None: + super().__init__(**kwargs) + size = size if size is not None else {"height": 480, "width": 640} + size = get_size_dict(size, default_to_square=False) + + self.do_resize = do_resize + self.size = size + self.resample = resample + self.do_rescale = do_rescale + self.rescale_factor = rescale_factor + self.do_grayscale = do_grayscale + + # Copied from transformers.models.superpoint.image_processing_superpoint.SuperPointImageProcessor.resize + def resize( + self, + image: np.ndarray, + size: dict[str, int], + data_format: Optional[Union[str, ChannelDimension]] = None, + input_data_format: Optional[Union[str, ChannelDimension]] = None, + **kwargs, + ): + """ + Resize an image. + + Args: + image (`np.ndarray`): + Image to resize. + size (`dict[str, int]`): + Dictionary of the form `{"height": int, "width": int}`, specifying the size of the output image. + data_format (`ChannelDimension` or `str`, *optional*): + The channel dimension format of the output image. If not provided, it will be inferred from the input + image. Can be one of: + - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format. + - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format. + - `"none"` or `ChannelDimension.NONE`: image in (height, width) format. + input_data_format (`ChannelDimension` or `str`, *optional*): + The channel dimension format for the input image. If unset, the channel dimension format is inferred + from the input image. Can be one of: + - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format. + - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format. + - `"none"` or `ChannelDimension.NONE`: image in (height, width) format. + """ + size = get_size_dict(size, default_to_square=False) + + return resize( + image, + size=(size["height"], size["width"]), + data_format=data_format, + input_data_format=input_data_format, + **kwargs, + ) + + # Copied from transformers.models.superglue.image_processing_superglue.SuperGlueImageProcessor.preprocess + def preprocess( + self, + images, + do_resize: Optional[bool] = None, + size: Optional[dict[str, int]] = None, + resample: PILImageResampling = None, + do_rescale: Optional[bool] = None, + rescale_factor: Optional[float] = None, + do_grayscale: Optional[bool] = None, + return_tensors: Optional[Union[str, TensorType]] = None, + data_format: ChannelDimension = ChannelDimension.FIRST, + input_data_format: Optional[Union[str, ChannelDimension]] = None, + **kwargs, + ) -> BatchFeature: + """ + Preprocess an image or batch of images. + + Args: + images (`ImageInput`): + Image pairs to preprocess. Expects either a list of 2 images or a list of list of 2 images list with + pixel values ranging from 0 to 255. If passing in images with pixel values between 0 and 1, set + `do_rescale=False`. + do_resize (`bool`, *optional*, defaults to `self.do_resize`): + Whether to resize the image. + size (`dict[str, int]`, *optional*, defaults to `self.size`): + Size of the output image after `resize` has been applied. If `size["shortest_edge"]` >= 384, the image + is resized to `(size["shortest_edge"], size["shortest_edge"])`. Otherwise, the smaller edge of the + image will be matched to `int(size["shortest_edge"]/ crop_pct)`, after which the image is cropped to + `(size["shortest_edge"], size["shortest_edge"])`. Only has an effect if `do_resize` is set to `True`. + resample (`PILImageResampling`, *optional*, defaults to `self.resample`): + Resampling filter to use if resizing the image. This can be one of `PILImageResampling`, filters. Only + has an effect if `do_resize` is set to `True`. + do_rescale (`bool`, *optional*, defaults to `self.do_rescale`): + Whether to rescale the image values between [0 - 1]. + rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`): + Rescale factor to rescale the image by if `do_rescale` is set to `True`. + do_grayscale (`bool`, *optional*, defaults to `self.do_grayscale`): + Whether to convert the image to grayscale. + return_tensors (`str` or `TensorType`, *optional*): + The type of tensors to return. Can be one of: + - Unset: Return a list of `np.ndarray`. + - `TensorType.TENSORFLOW` or `'tf'`: Return a batch of type `tf.Tensor`. + - `TensorType.PYTORCH` or `'pt'`: Return a batch of type `torch.Tensor`. + - `TensorType.NUMPY` or `'np'`: Return a batch of type `np.ndarray`. + - `TensorType.JAX` or `'jax'`: Return a batch of type `jax.numpy.ndarray`. + data_format (`ChannelDimension` or `str`, *optional*, defaults to `ChannelDimension.FIRST`): + The channel dimension format for the output image. Can be one of: + - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format. + - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format. + - Unset: Use the channel dimension format of the input image. + input_data_format (`ChannelDimension` or `str`, *optional*): + The channel dimension format for the input image. If unset, the channel dimension format is inferred + from the input image. Can be one of: + - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format. + - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format. + - `"none"` or `ChannelDimension.NONE`: image in (height, width) format. + """ + + do_resize = do_resize if do_resize is not None else self.do_resize + resample = resample if resample is not None else self.resample + do_rescale = do_rescale if do_rescale is not None else self.do_rescale + rescale_factor = rescale_factor if rescale_factor is not None else self.rescale_factor + do_grayscale = do_grayscale if do_grayscale is not None else self.do_grayscale + + size = size if size is not None else self.size + size = get_size_dict(size, default_to_square=False) + + # Validate and convert the input images into a flattened list of images for all subsequent processing steps. + images = validate_and_format_image_pairs(images) + + if not valid_images(images): + raise ValueError( + "Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, " + "torch.Tensor, tf.Tensor or jax.ndarray." + ) + + validate_preprocess_arguments( + do_resize=do_resize, + size=size, + resample=resample, + do_rescale=do_rescale, + rescale_factor=rescale_factor, + ) + + # All transformations expect numpy arrays. + images = [to_numpy_array(image) for image in images] + + if is_scaled_image(images[0]) and do_rescale: + logger.warning_once( + "It looks like you are trying to rescale already rescaled images. If the input" + " images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again." + ) + + if input_data_format is None: + # We assume that all images have the same channel dimension format. + input_data_format = infer_channel_dimension_format(images[0]) + + all_images = [] + for image in images: + if do_resize: + image = self.resize(image=image, size=size, resample=resample, input_data_format=input_data_format) + + if do_rescale: + image = self.rescale(image=image, scale=rescale_factor, input_data_format=input_data_format) + + if do_grayscale: + image = convert_to_grayscale(image, input_data_format=input_data_format) + + image = to_channel_dimension_format(image, data_format, input_channel_dim=input_data_format) + all_images.append(image) + + # Convert back the flattened list of images into a list of pairs of images. + image_pairs = [all_images[i : i + 2] for i in range(0, len(all_images), 2)] + + data = {"pixel_values": image_pairs} + + return BatchFeature(data=data, tensor_type=return_tensors) + + def post_process_keypoint_matching( + self, + outputs: "KeypointMatchingOutput", + target_sizes: Union[TensorType, list[tuple]], + threshold: float = 0.0, + ) -> list[dict[str, torch.Tensor]]: + """ + Converts the raw output of [`KeypointMatchingOutput`] into lists of keypoints, scores and descriptors + with coordinates absolute to the original image sizes. + Args: + outputs ([`KeypointMatchingOutput`]): + Raw outputs of the model. + target_sizes (`torch.Tensor` or `List[Tuple[Tuple[int, int]]]`, *optional*): + Tensor of shape `(batch_size, 2, 2)` or list of tuples of tuples (`Tuple[int, int]`) containing the + target size `(height, width)` of each image in the batch. This must be the original image size (before + any processing). + threshold (`float`, *optional*, defaults to 0.0): + Threshold to filter out the matches with low scores. + Returns: + `List[Dict]`: A list of dictionaries, each dictionary containing the keypoints in the first and second image + of the pair, the matching scores and the matching indices. + """ + if outputs.matches.shape[0] != len(target_sizes): + raise ValueError("Make sure that you pass in as many target sizes as the batch dimension of the mask") + if not all(len(target_size) == 2 for target_size in target_sizes): + raise ValueError("Each element of target_sizes must contain the size (h, w) of each image of the batch") + + if isinstance(target_sizes, list): + image_pair_sizes = torch.tensor(target_sizes, device=outputs.matches.device) + else: + if target_sizes.shape[1] != 2 or target_sizes.shape[2] != 2: + raise ValueError( + "Each element of target_sizes must contain the size (h, w) of each image of the batch" + ) + image_pair_sizes = target_sizes + + keypoints = outputs.keypoints.clone() + keypoints = keypoints * image_pair_sizes.flip(-1).reshape(-1, 2, 1, 2) + keypoints = keypoints.to(torch.int32) + + results = [] + for keypoints_pair, matches, scores in zip(keypoints, outputs.matches, outputs.matching_scores): + # Filter out matches with low scores + valid_matches = torch.logical_and(scores > threshold, matches > -1) + + matched_keypoints0 = keypoints_pair[0][valid_matches[0]] + matched_keypoints1 = keypoints_pair[1][valid_matches[1]] + matching_scores = scores[0][valid_matches[0]] + + results.append( + { + "keypoints0": matched_keypoints0, + "keypoints1": matched_keypoints1, + "matching_scores": matching_scores, + } + ) + + return results + + def visualize_keypoint_matching( + self, + images: ImageInput, + keypoint_matching_output: list[dict[str, torch.Tensor]], + ) -> list["Image.Image"]: + """ + Plots the image pairs side by side with the detected keypoints as well as the matching between them. + + Args: + images (`ImageInput`): + Image pairs to plot. Same as `EfficientLoFTRImageProcessor.preprocess`. Expects either a list of 2 + images or a list of list of 2 images list with pixel values ranging from 0 to 255. + outputs (List[Dict[str, torch.Tensor]]]): + A post processed keypoint matching output + + Returns: + `List[PIL.Image.Image]`: A list of PIL images, each containing the image pairs side by side with the detected + keypoints as well as the matching between them. + """ + images = validate_and_format_image_pairs(images) + images = [to_numpy_array(image) for image in images] + image_pairs = [images[i : i + 2] for i in range(0, len(images), 2)] + + results = [] + for image_pair, pair_output in zip(image_pairs, keypoint_matching_output): + height0, width0 = image_pair[0].shape[:2] + height1, width1 = image_pair[1].shape[:2] + plot_image = np.zeros((max(height0, height1), width0 + width1, 3), dtype=np.uint8) + plot_image[:height0, :width0] = image_pair[0] + plot_image[:height1, width0:] = image_pair[1] + + plot_image_pil = Image.fromarray(plot_image) + draw = ImageDraw.Draw(plot_image_pil) + + keypoints0_x, keypoints0_y = pair_output["keypoints0"].unbind(1) + keypoints1_x, keypoints1_y = pair_output["keypoints1"].unbind(1) + for keypoint0_x, keypoint0_y, keypoint1_x, keypoint1_y, matching_score in zip( + keypoints0_x, keypoints0_y, keypoints1_x, keypoints1_y, pair_output["matching_scores"] + ): + color = self._get_color(matching_score) + draw.line( + (keypoint0_x, keypoint0_y, keypoint1_x + width0, keypoint1_y), + fill=color, + width=3, + ) + draw.ellipse((keypoint0_x - 2, keypoint0_y - 2, keypoint0_x + 2, keypoint0_y + 2), fill="black") + draw.ellipse( + (keypoint1_x + width0 - 2, keypoint1_y - 2, keypoint1_x + width0 + 2, keypoint1_y + 2), + fill="black", + ) + + results.append(plot_image_pil) + return results + + def _get_color(self, score): + """Maps a score to a color.""" + r = int(255 * (1 - score)) + g = int(255 * score) + b = 0 + return (r, g, b) + + +__all__ = ["EfficientLoFTRImageProcessor"] diff --git a/src/transformers/models/efficientloftr/modeling_efficientloftr.py b/src/transformers/models/efficientloftr/modeling_efficientloftr.py new file mode 100644 index 0000000000..a934c1bb27 --- /dev/null +++ b/src/transformers/models/efficientloftr/modeling_efficientloftr.py @@ -0,0 +1,1302 @@ +# Copyright 2025 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from dataclasses import dataclass +from typing import Callable, Optional, Union + +import torch +from torch import nn + +from ...activations import ACT2CLS, ACT2FN +from ...modeling_layers import GradientCheckpointingLayer +from ...modeling_outputs import BackboneOutput +from ...modeling_rope_utils import ROPE_INIT_FUNCTIONS +from ...modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel +from ...processing_utils import Unpack +from ...utils import ( + ModelOutput, + TransformersKwargs, + auto_docstring, + can_return_tuple, + torch_int, +) +from ...utils.generic import check_model_inputs +from .configuration_efficientloftr import EfficientLoFTRConfig + + +@dataclass +@auto_docstring( + custom_intro=""" + Base class for outputs of keypoint matching models. Due to the nature of keypoint detection and matching, the number + of keypoints is not fixed and can vary from image to image, which makes batching non-trivial. In the batch of + images, the maximum number of matches is set as the dimension of the matches and matching scores. The mask tensor is + used to indicate which values in the keypoints, matches and matching_scores tensors are keypoint matching + information. + """ +) +class KeypointMatchingOutput(ModelOutput): + r""" + matches (`torch.FloatTensor` of shape `(batch_size, 2, num_matches)`): + Index of keypoint matched in the other image. + matching_scores (`torch.FloatTensor` of shape `(batch_size, 2, num_matches)`): + Scores of predicted matches. + keypoints (`torch.FloatTensor` of shape `(batch_size, num_keypoints, 2)`): + Absolute (x, y) coordinates of predicted keypoints in a given image. + hidden_states (`tuple[torch.FloatTensor, ...]`, *optional*): + Tuple of `torch.FloatTensor` (one for the output of each stage) of shape `(batch_size, 2, num_channels, + num_keypoints)`, returned when `output_hidden_states=True` is passed or when + `config.output_hidden_states=True`) + attentions (`tuple[torch.FloatTensor, ...]`, *optional*): + Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, 2, num_heads, num_keypoints, + num_keypoints)`, returned when `output_attentions=True` is passed or when `config.output_attentions=True`) + """ + + matches: Optional[torch.FloatTensor] = None + matching_scores: Optional[torch.FloatTensor] = None + keypoints: Optional[torch.FloatTensor] = None + hidden_states: Optional[tuple[torch.FloatTensor]] = None + attentions: Optional[tuple[torch.FloatTensor]] = None + + +class EfficientLoFTRRotaryEmbedding(nn.Module): + def __init__(self, config: EfficientLoFTRConfig, device=None): + super().__init__() + self.config = config + self.rope_type = config.rope_scaling["rope_type"] + self.rope_init_fn = ROPE_INIT_FUNCTIONS[self.rope_type] + + inv_freq, _ = self.rope_init_fn(self.config, device) + inv_freq_expanded = inv_freq[None, None, None, :].float().expand(1, 1, 1, -1) + + embed_height, embed_width = config.embedding_size + i_indices = torch.ones(embed_height, embed_width).cumsum(0).float().unsqueeze(-1) + j_indices = torch.ones(embed_height, embed_width).cumsum(1).float().unsqueeze(-1) + + emb = torch.zeros(1, embed_height, embed_width, self.config.hidden_size // 2) + emb[:, :, :, 0::2] = i_indices * inv_freq_expanded + emb[:, :, :, 1::2] = j_indices * inv_freq_expanded + + self.register_buffer("inv_freq", emb, persistent=False) + + @torch.no_grad() + def forward( + self, x: torch.Tensor, position_ids: Optional[tuple[torch.LongTensor, torch.LongTensor]] = None + ) -> tuple[torch.Tensor, torch.Tensor]: + device_type = x.device.type if isinstance(x.device.type, str) and x.device.type != "mps" else "cpu" + with torch.autocast(device_type=device_type, enabled=False): # Force float32 + emb = self.inv_freq + sin = emb.sin() + cos = emb.cos() + + sin = sin.repeat_interleave(2, dim=-1) + cos = cos.repeat_interleave(2, dim=-1) + + sin = sin.to(device=x.device, dtype=x.dtype) + cos = cos.to(device=x.device, dtype=x.dtype) + + return cos, sin + + +# Copied from transformers.models.rt_detr_v2.modeling_rt_detr_v2.RTDetrV2ConvNormLayer with RTDetrV2->EfficientLoFTR +class EfficientLoFTRConvNormLayer(nn.Module): + def __init__(self, config, in_channels, out_channels, kernel_size, stride, padding=None, activation=None): + super().__init__() + self.conv = nn.Conv2d( + in_channels, + out_channels, + kernel_size, + stride, + padding=(kernel_size - 1) // 2 if padding is None else padding, + bias=False, + ) + self.norm = nn.BatchNorm2d(out_channels, config.batch_norm_eps) + self.activation = nn.Identity() if activation is None else ACT2CLS[activation]() + + def forward(self, hidden_state): + hidden_state = self.conv(hidden_state) + hidden_state = self.norm(hidden_state) + hidden_state = self.activation(hidden_state) + return hidden_state + + +class EfficientLoFTRRepVGGBlock(GradientCheckpointingLayer): + """ + RepVGG architecture block introduced by the work "RepVGG: Making VGG-style ConvNets Great Again". + """ + + def __init__(self, config: EfficientLoFTRConfig, stage_idx: int, block_idx: int): + super().__init__() + in_channels = config.stage_block_in_channels[stage_idx][block_idx] + out_channels = config.stage_block_out_channels[stage_idx][block_idx] + stride = config.stage_block_stride[stage_idx][block_idx] + activation = config.activation_function + self.conv1 = EfficientLoFTRConvNormLayer( + config, in_channels, out_channels, kernel_size=3, stride=stride, padding=1 + ) + self.conv2 = EfficientLoFTRConvNormLayer( + config, in_channels, out_channels, kernel_size=1, stride=stride, padding=0 + ) + self.identity = nn.BatchNorm2d(in_channels) if in_channels == out_channels and stride == 1 else None + self.activation = nn.Identity() if activation is None else ACT2FN[activation] + + def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: + if self.identity is not None: + identity_out = self.identity(hidden_states) + else: + identity_out = 0 + hidden_states = self.conv1(hidden_states) + self.conv2(hidden_states) + identity_out + hidden_states = self.activation(hidden_states) + return hidden_states + + +class EfficientLoFTRRepVGGStage(nn.Module): + def __init__(self, config: EfficientLoFTRConfig, stage_idx: int): + super().__init__() + self.blocks = nn.ModuleList([]) + for block_idx in range(config.stage_num_blocks[stage_idx]): + self.blocks.append( + EfficientLoFTRRepVGGBlock( + config, + stage_idx, + block_idx, + ) + ) + + def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: + for block in self.blocks: + hidden_states = block(hidden_states) + return hidden_states + + +class EfficientLoFTRepVGG(nn.Module): + def __init__(self, config: EfficientLoFTRConfig): + super().__init__() + + self.stages = nn.ModuleList([]) + + for stage_idx in range(len(config.stage_stride)): + stage = EfficientLoFTRRepVGGStage(config, stage_idx) + self.stages.append(stage) + + def forward(self, hidden_states: torch.Tensor) -> list[torch.Tensor]: + outputs = [] + for stage in self.stages: + hidden_states = stage(hidden_states) + outputs.append(hidden_states) + + # Exclude first stage in outputs + outputs = outputs[1:] + return outputs + + +class EfficientLoFTRAggregationLayer(nn.Module): + def __init__(self, config: EfficientLoFTRConfig): + super().__init__() + + hidden_size = config.hidden_size + + self.q_aggregation = nn.Conv2d( + hidden_size, + hidden_size, + kernel_size=config.q_aggregation_kernel_size, + padding=0, + stride=config.q_aggregation_stride, + bias=False, + groups=hidden_size, + ) + self.kv_aggregation = torch.nn.MaxPool2d( + kernel_size=config.kv_aggregation_kernel_size, stride=config.kv_aggregation_stride + ) + self.norm = nn.LayerNorm(hidden_size) + + def forward( + self, + hidden_states: torch.Tensor, + encoder_hidden_states: Optional[torch.Tensor] = None, + ) -> tuple[torch.Tensor, torch.Tensor]: + query_states = hidden_states + is_cross_attention = encoder_hidden_states is not None + kv_states = encoder_hidden_states if is_cross_attention else hidden_states + + query_states = self.q_aggregation(query_states) + kv_states = self.kv_aggregation(kv_states) + query_states = query_states.permute(0, 2, 3, 1) + kv_states = kv_states.permute(0, 2, 3, 1) + hidden_states = self.norm(query_states) + encoder_hidden_states = self.norm(kv_states) + return hidden_states, encoder_hidden_states + + +# Copied from transformers.models.cohere.modeling_cohere.rotate_half +def rotate_half(x): + # Split and rotate. Note that this function is different from e.g. Llama. + x1 = x[..., ::2] + x2 = x[..., 1::2] + rot_x = torch.stack([-x2, x1], dim=-1).flatten(-2) + return rot_x + + +# Copied from transformers.models.cohere.modeling_cohere.apply_rotary_pos_emb +def apply_rotary_pos_emb(q, k, cos, sin, position_ids=None, unsqueeze_dim=1): + """Applies Rotary Position Embedding to the query and key tensors. + + Args: + q (`torch.Tensor`): The query tensor. + k (`torch.Tensor`): The key tensor. + cos (`torch.Tensor`): The cosine part of the rotary embedding. + sin (`torch.Tensor`): The sine part of the rotary embedding. + position_ids (`torch.Tensor`, *optional*): + Deprecated and unused. + unsqueeze_dim (`int`, *optional*, defaults to 1): + The 'unsqueeze_dim' argument specifies the dimension along which to unsqueeze cos[position_ids] and + sin[position_ids] so that they can be properly broadcasted to the dimensions of q and k. For example, note + that cos[position_ids] and sin[position_ids] have the shape [batch_size, seq_len, head_dim]. Then, if q and + k have the shape [batch_size, heads, seq_len, head_dim], then setting unsqueeze_dim=1 makes + cos[position_ids] and sin[position_ids] broadcastable to the shapes of q and k. Similarly, if q and k have + the shape [batch_size, seq_len, heads, head_dim], then set unsqueeze_dim=2. + Returns: + `tuple(torch.Tensor)` comprising of the query and key tensors rotated using the Rotary Position Embedding. + """ + dtype = q.dtype + q = q.float() + k = k.float() + cos = cos.unsqueeze(unsqueeze_dim) + sin = sin.unsqueeze(unsqueeze_dim) + q_embed = (q * cos) + (rotate_half(q) * sin) + k_embed = (k * cos) + (rotate_half(k) * sin) + return q_embed.to(dtype=dtype), k_embed.to(dtype=dtype) + + +# Copied from transformers.models.cohere.modeling_cohere.repeat_kv +def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor: + """ + This is the equivalent of torch.repeat_interleave(x, dim=1, repeats=n_rep). The hidden states go from (batch, + num_key_value_heads, seqlen, head_dim) to (batch, num_attention_heads, seqlen, head_dim) + """ + batch, num_key_value_heads, slen, head_dim = hidden_states.shape + if n_rep == 1: + return hidden_states + hidden_states = hidden_states[:, :, None, :, :].expand(batch, num_key_value_heads, n_rep, slen, head_dim) + return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim) + + +# Copied from transformers.models.llama.modeling_llama.eager_attention_forward +def eager_attention_forward( + module: nn.Module, + query: torch.Tensor, + key: torch.Tensor, + value: torch.Tensor, + attention_mask: Optional[torch.Tensor], + scaling: float, + dropout: float = 0.0, + **kwargs: Unpack[TransformersKwargs], +): + key_states = repeat_kv(key, module.num_key_value_groups) + value_states = repeat_kv(value, module.num_key_value_groups) + + attn_weights = torch.matmul(query, key_states.transpose(2, 3)) * scaling + if attention_mask is not None: + causal_mask = attention_mask[:, :, :, : key_states.shape[-2]] + attn_weights = attn_weights + causal_mask + + attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query.dtype) + attn_weights = nn.functional.dropout(attn_weights, p=dropout, training=module.training) + attn_output = torch.matmul(attn_weights, value_states) + attn_output = attn_output.transpose(1, 2).contiguous() + + return attn_output, attn_weights + + +class EfficientLoFTRAttention(nn.Module): + """Multi-headed attention from 'Attention Is All You Need' paper""" + + # Copied from transformers.models.llama.modeling_llama.LlamaAttention.__init__ with Llama->EfficientLoFTR + def __init__(self, config: EfficientLoFTRConfig, layer_idx: int): + super().__init__() + self.config = config + self.layer_idx = layer_idx + self.head_dim = getattr(config, "head_dim", config.hidden_size // config.num_attention_heads) + self.num_key_value_groups = config.num_attention_heads // config.num_key_value_heads + self.scaling = self.head_dim**-0.5 + self.attention_dropout = config.attention_dropout + self.is_causal = True + + self.q_proj = nn.Linear( + config.hidden_size, config.num_attention_heads * self.head_dim, bias=config.attention_bias + ) + self.k_proj = nn.Linear( + config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias + ) + self.v_proj = nn.Linear( + config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias + ) + self.o_proj = nn.Linear( + config.num_attention_heads * self.head_dim, config.hidden_size, bias=config.attention_bias + ) + + def forward( + self, + hidden_states: torch.Tensor, + encoder_hidden_states: Optional[torch.Tensor] = None, + position_embeddings: Optional[tuple[torch.Tensor, torch.Tensor]] = None, + **kwargs: Unpack[TransformersKwargs], + ) -> tuple[torch.Tensor, Optional[torch.Tensor]]: + batch_size, seq_len, dim = hidden_states.shape + input_shape = hidden_states.shape[:-1] + + query_states = self.q_proj(hidden_states).view(batch_size, seq_len, -1, dim) + + is_cross_attention = encoder_hidden_states is not None + current_states = encoder_hidden_states if is_cross_attention else hidden_states + + key_states = self.k_proj(current_states).view(batch_size, seq_len, -1, dim) + value_states = self.v_proj(current_states).view(batch_size, seq_len, -1, self.head_dim).transpose(1, 2) + + if position_embeddings is not None: + cos, sin = position_embeddings + query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, unsqueeze_dim=2) + + query_states = query_states.view(batch_size, seq_len, -1, self.head_dim).transpose(1, 2) + key_states = key_states.view(batch_size, seq_len, -1, self.head_dim).transpose(1, 2) + + attention_interface: Callable = eager_attention_forward + if self.config._attn_implementation != "eager": + attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation] + + attn_output, attn_weights = attention_interface( + self, + query_states, + key_states, + value_states, + attention_mask=None, + dropout=0.0 if not self.training else self.attention_dropout, + scaling=self.scaling, + **kwargs, + ) + + attn_output = attn_output.reshape(*input_shape, -1).contiguous() + attn_output = self.o_proj(attn_output) + return attn_output, attn_weights + + +class EfficientLoFTRMLP(nn.Module): + def __init__(self, config: EfficientLoFTRConfig): + super().__init__() + hidden_size = config.hidden_size + intermediate_size = config.intermediate_size + self.fc1 = nn.Linear(hidden_size * 2, intermediate_size, bias=False) + self.activation = ACT2FN[config.mlp_activation_function] + self.fc2 = nn.Linear(intermediate_size, hidden_size, bias=False) + self.layer_norm = nn.LayerNorm(hidden_size) + + def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: + hidden_states = self.fc1(hidden_states) + hidden_states = self.activation(hidden_states) + hidden_states = self.fc2(hidden_states) + hidden_states = self.layer_norm(hidden_states) + return hidden_states + + +class EfficientLoFTRAggregatedAttention(nn.Module): + def __init__(self, config: EfficientLoFTRConfig, layer_idx: int): + super().__init__() + + self.q_aggregation_kernel_size = config.q_aggregation_kernel_size + self.aggregation = EfficientLoFTRAggregationLayer(config) + self.attention = EfficientLoFTRAttention(config, layer_idx) + self.mlp = EfficientLoFTRMLP(config) + + def forward( + self, + hidden_states: torch.Tensor, + encoder_hidden_states: Optional[torch.Tensor] = None, + position_embeddings: Optional[tuple[torch.Tensor, torch.Tensor]] = None, + **kwargs: Unpack[TransformersKwargs], + ) -> torch.Tensor: + batch_size, embed_dim, _, _ = hidden_states.shape + + # Aggregate features + aggregated_hidden_states, aggregated_encoder_hidden_states = self.aggregation( + hidden_states, encoder_hidden_states + ) + _, aggregated_h, aggregated_w, _ = aggregated_hidden_states.shape + + # Multi-head attention + aggregated_hidden_states = aggregated_hidden_states.reshape(batch_size, -1, embed_dim) + aggregated_encoder_hidden_states = aggregated_encoder_hidden_states.reshape(batch_size, -1, embed_dim) + attn_output, _ = self.attention( + aggregated_hidden_states, + aggregated_encoder_hidden_states, + position_embeddings=position_embeddings, + **kwargs, + ) + + # Upsample features + # (batch_size, seq_len, embed_dim) -> (batch_size, embed_dim, h, w) with seq_len = h * w + attn_output = attn_output.permute(0, 2, 1) + attn_output = attn_output.reshape(batch_size, embed_dim, aggregated_h, aggregated_w) + attn_output = torch.nn.functional.interpolate( + attn_output, scale_factor=self.q_aggregation_kernel_size, mode="bilinear", align_corners=False + ) + intermediate_states = torch.cat([hidden_states, attn_output], dim=1) + intermediate_states = intermediate_states.permute(0, 2, 3, 1) + output_states = self.mlp(intermediate_states) + output_states = output_states.permute(0, 3, 1, 2) + + hidden_states = hidden_states + output_states + + return hidden_states + + +class EfficientLoFTRLocalFeatureTransformerLayer(GradientCheckpointingLayer): + def __init__(self, config: EfficientLoFTRConfig, layer_idx: int): + super().__init__() + + self.self_attention = EfficientLoFTRAggregatedAttention(config, layer_idx) + self.cross_attention = EfficientLoFTRAggregatedAttention(config, layer_idx) + + def forward( + self, + hidden_states: torch.Tensor, + position_embeddings: tuple[torch.Tensor, torch.Tensor], + **kwargs: Unpack[TransformersKwargs], + ) -> torch.Tensor: + batch_size, _, embed_dim, height, width = hidden_states.shape + + hidden_states = hidden_states.reshape(-1, embed_dim, height, width) + hidden_states = self.self_attention(hidden_states, position_embeddings=position_embeddings, **kwargs) + + encoder_hidden_states = hidden_states.reshape(-1, 2, embed_dim, height, width) + encoder_hidden_states = encoder_hidden_states.flip(1) + encoder_hidden_states = encoder_hidden_states.reshape(-1, embed_dim, height, width) + + hidden_states = self.cross_attention(hidden_states, encoder_hidden_states, **kwargs) + hidden_states = hidden_states.reshape(batch_size, -1, embed_dim, height, width) + + return hidden_states + + +class EfficientLoFTRLocalFeatureTransformer(nn.Module): + def __init__(self, config: EfficientLoFTRConfig): + super().__init__() + self.layers = nn.ModuleList( + [ + EfficientLoFTRLocalFeatureTransformerLayer(config, layer_idx=i) + for i in range(config.num_attention_layers) + ] + ) + + def forward( + self, + hidden_states: torch.Tensor, + position_embeddings: tuple[torch.Tensor, torch.Tensor], + **kwargs: Unpack[TransformersKwargs], + ) -> torch.Tensor: + for layer in self.layers: + hidden_states = layer(hidden_states, position_embeddings=position_embeddings, **kwargs) + return hidden_states + + +class EfficientLoFTROutConvBlock(nn.Module): + def __init__(self, config: EfficientLoFTRConfig, hidden_size: int, intermediate_size: int): + super().__init__() + + self.out_conv1 = nn.Conv2d(hidden_size, intermediate_size, kernel_size=1, stride=1, padding=0, bias=False) + self.out_conv2 = nn.Conv2d( + intermediate_size, intermediate_size, kernel_size=3, stride=1, padding=1, bias=False + ) + self.batch_norm = nn.BatchNorm2d(intermediate_size) + self.activation = ACT2CLS[config.mlp_activation_function]() + self.out_conv3 = nn.Conv2d(intermediate_size, hidden_size, kernel_size=3, stride=1, padding=1, bias=False) + + def forward(self, hidden_states: torch.Tensor, residual_states: torch.Tensor) -> torch.Tensor: + residual_states = self.out_conv1(residual_states) + residual_states = residual_states + hidden_states + residual_states = self.out_conv2(residual_states) + residual_states = self.batch_norm(residual_states) + residual_states = self.activation(residual_states) + residual_states = self.out_conv3(residual_states) + residual_states = nn.functional.interpolate( + residual_states, scale_factor=2.0, mode="bilinear", align_corners=False + ) + return residual_states + + +class EfficientLoFTRFineFusionLayer(nn.Module): + def __init__(self, config: EfficientLoFTRConfig): + super().__init__() + + self.fine_kernel_size = config.fine_kernel_size + + fine_fusion_dims = config.fine_fusion_dims + self.out_conv = nn.Conv2d( + fine_fusion_dims[0], fine_fusion_dims[0], kernel_size=1, stride=1, padding=0, bias=False + ) + self.out_conv_layers = nn.ModuleList() + for i in range(1, len(fine_fusion_dims)): + out_conv = EfficientLoFTROutConvBlock(config, fine_fusion_dims[i], fine_fusion_dims[i - 1]) + self.out_conv_layers.append(out_conv) + + def forward_pyramid( + self, + hidden_states: torch.Tensor, + residual_states: list[torch.Tensor], + ) -> torch.Tensor: + hidden_states = self.out_conv(hidden_states) + hidden_states = nn.functional.interpolate( + hidden_states, scale_factor=2.0, mode="bilinear", align_corners=False + ) + for i, layer in enumerate(self.out_conv_layers): + hidden_states = layer(hidden_states, residual_states[i]) + + return hidden_states + + def forward( + self, + coarse_features: torch.Tensor, + residual_features: list[torch.Tensor], + ) -> tuple[torch.Tensor, torch.Tensor]: + """ + For each image pair, compute the fine features of pixels. + In both images, compute a patch of fine features center cropped around each coarse pixel. + In the first image, the feature patch is kernel_size large and long. + In the second image, it is (kernel_size + 2) large and long. + """ + batch_size, _, embed_dim, coarse_height, coarse_width = coarse_features.shape + + coarse_features = coarse_features.reshape(-1, embed_dim, coarse_height, coarse_width) + residual_features = list(reversed(residual_features)) + + # 1. Fine feature extraction + fine_features = self.forward_pyramid(coarse_features, residual_features) + _, fine_embed_dim, fine_height, fine_width = fine_features.shape + + fine_features = fine_features.reshape(batch_size, 2, fine_embed_dim, fine_height, fine_width) + fine_features_0 = fine_features[:, 0] + fine_features_1 = fine_features[:, 1] + + # 2. Unfold all local windows in crops + stride = int(fine_height // coarse_height) + fine_features_0 = nn.functional.unfold( + fine_features_0, kernel_size=self.fine_kernel_size, stride=stride, padding=0 + ) + _, _, seq_len = fine_features_0.shape + fine_features_0 = fine_features_0.reshape(batch_size, -1, self.fine_kernel_size**2, seq_len) + fine_features_0 = fine_features_0.permute(0, 3, 2, 1) + + fine_features_1 = nn.functional.unfold( + fine_features_1, kernel_size=self.fine_kernel_size + 2, stride=stride, padding=1 + ) + fine_features_1 = fine_features_1.reshape(batch_size, -1, (self.fine_kernel_size + 2) ** 2, seq_len) + fine_features_1 = fine_features_1.permute(0, 3, 2, 1) + + return fine_features_0, fine_features_1 + + +@auto_docstring +class EfficientLoFTRPreTrainedModel(PreTrainedModel): + """ + An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained + models. + """ + + config_class = EfficientLoFTRConfig + base_model_prefix = "efficientloftr" + main_input_name = "pixel_values" + supports_gradient_checkpointing = True + _supports_flash_attn = True + _supports_sdpa = True + _can_record_outputs = { + "hidden_states": EfficientLoFTRRepVGGBlock, + "attentions": EfficientLoFTRAttention, + } + + def _init_weights(self, module: nn.Module) -> None: + """Initialize the weights""" + if isinstance(module, (nn.Linear, nn.Conv2d, nn.Conv1d, nn.BatchNorm2d)): + module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) + if module.bias is not None: + module.bias.data.zero_() + elif isinstance(module, nn.LayerNorm): + module.bias.data.zero_() + module.weight.data.fill_(1.0) + + # Copied from transformers.models.superpoint.modeling_superpoint.SuperPointPreTrainedModel.extract_one_channel_pixel_values with SuperPoint->EfficientLoFTR + def extract_one_channel_pixel_values(self, pixel_values: torch.FloatTensor) -> torch.FloatTensor: + """ + Assuming pixel_values has shape (batch_size, 3, height, width), and that all channels values are the same, + extract the first channel value to get a tensor of shape (batch_size, 1, height, width) for EfficientLoFTR. This is + a workaround for the issue discussed in : + https://github.com/huggingface/transformers/pull/25786#issuecomment-1730176446 + + Args: + pixel_values: torch.FloatTensor of shape (batch_size, 3, height, width) + + Returns: + pixel_values: torch.FloatTensor of shape (batch_size, 1, height, width) + + """ + return pixel_values[:, 0, :, :][:, None, :, :] + + +@auto_docstring( + custom_intro=""" + EfficientLoFTR model taking images as inputs and outputting the features of the images. + """ +) +class EfficientLoFTRModel(EfficientLoFTRPreTrainedModel): + def __init__(self, config: EfficientLoFTRConfig): + super().__init__(config) + + self.config = config + self.backbone = EfficientLoFTRepVGG(config) + self.local_feature_transformer = EfficientLoFTRLocalFeatureTransformer(config) + self.rotary_emb = EfficientLoFTRRotaryEmbedding(config=config) + + self.post_init() + + @check_model_inputs + @auto_docstring + def forward( + self, + pixel_values: torch.FloatTensor, + labels: Optional[torch.LongTensor] = None, + **kwargs: Unpack[TransformersKwargs], + ) -> BackboneOutput: + r""" + Examples: + + ```python + >>> from transformers import AutoImageProcessor, AutoModel + >>> import torch + >>> from PIL import Image + >>> import requests + + >>> url = "https://github.com/magicleap/SuperGluePretrainedNetwork/blob/master/assets/phototourism_sample_images/london_bridge_78916675_4568141288.jpg?raw=true" + >>> image1 = Image.open(requests.get(url, stream=True).raw) + >>> url = "https://github.com/magicleap/SuperGluePretrainedNetwork/blob/master/assets/phototourism_sample_images/london_bridge_19481797_2295892421.jpg?raw=true" + >>> image2 = Image.open(requests.get(url, stream=True).raw) + >>> images = [image1, image2] + + >>> processor = AutoImageProcessor.from_pretrained("zju-community/efficient_loftr") + >>> model = AutoModel.from_pretrained("zju-community/efficient_loftr") + + >>> with torch.no_grad(): + >>> inputs = processor(images, return_tensors="pt") + >>> outputs = model(**inputs) + ```""" + if labels is not None: + raise ValueError("EfficientLoFTR is not trainable, no labels should be provided.") + + if pixel_values.ndim != 5 or pixel_values.size(1) != 2: + raise ValueError("Input must be a 5D tensor of shape (batch_size, 2, num_channels, height, width)") + + batch_size, _, channels, height, width = pixel_values.shape + pixel_values = pixel_values.reshape(batch_size * 2, channels, height, width) + pixel_values = self.extract_one_channel_pixel_values(pixel_values) + + # 1. Local Feature CNN + features = self.backbone(pixel_values) + # Last stage outputs are coarse outputs + coarse_features = features[-1] + # Rest is residual features used in EfficientLoFTRFineFusionLayer + residual_features = features[:-1] + coarse_embed_dim, coarse_height, coarse_width = coarse_features.shape[-3:] + + # 2. Coarse-level LoFTR module + cos, sin = self.rotary_emb(coarse_features) + cos = cos.expand(batch_size * 2, -1, -1, -1).reshape(batch_size * 2, -1, coarse_embed_dim) + sin = sin.expand(batch_size * 2, -1, -1, -1).reshape(batch_size * 2, -1, coarse_embed_dim) + position_embeddings = (cos, sin) + + coarse_features = coarse_features.reshape(batch_size, 2, coarse_embed_dim, coarse_height, coarse_width) + coarse_features = self.local_feature_transformer( + coarse_features, position_embeddings=position_embeddings, **kwargs + ) + + features = (coarse_features,) + tuple(residual_features) + + return BackboneOutput(feature_maps=features) + + +def mask_border(tensor: torch.Tensor, border_margin: int, value: Union[bool, float, int]) -> torch.Tensor: + """ + Mask a tensor border with a given value + + Args: + tensor (`torch.Tensor` of shape `(batch_size, height_0, width_0, height_1, width_1)`): + The tensor to mask + border_margin (`int`) : + The size of the border + value (`Union[bool, int, float]`): + The value to place in the tensor's borders + + Returns: + tensor (`torch.Tensor` of shape `(batch_size, height_0, width_0, height_1, width_1)`): + The masked tensor + """ + if border_margin <= 0: + return tensor + + tensor[:, :border_margin, :border_margin, :border_margin, :border_margin] = value + tensor[:, -border_margin:, -border_margin:, -border_margin:, -border_margin:] = value + return tensor + + +def create_meshgrid( + height: Union[int, torch.Tensor], + width: Union[int, torch.Tensor], + normalized_coordinates: bool = False, + device: Optional[torch.device] = None, + dtype: Optional[torch.dtype] = None, +) -> torch.Tensor: + """ + Copied from kornia library : kornia/kornia/utils/grid.py:26 + + Generate a coordinate grid for an image. + + When the flag ``normalized_coordinates`` is set to True, the grid is + normalized to be in the range :math:`[-1,1]` to be consistent with the pytorch + function :py:func:`torch.nn.functional.grid_sample`. + + Args: + height (`int`): + The image height (rows). + width (`int`): + The image width (cols). + normalized_coordinates (`bool`): + Whether to normalize coordinates in the range :math:`[-1,1]` in order to be consistent with the + PyTorch function :py:func:`torch.nn.functional.grid_sample`. + device (`torch.device`): + The device on which the grid will be generated. + dtype (`torch.dtype`): + The data type of the generated grid. + + Return: + grid (`torch.Tensor` of shape `(1, height, width, 2)`): + The grid tensor. + + Example: + >>> create_meshgrid(2, 2) + tensor([[[[-1., -1.], + [ 1., -1.]], + + [[-1., 1.], + [ 1., 1.]]]]) + + >>> create_meshgrid(2, 2, normalized_coordinates=False) + tensor([[[[0., 0.], + [1., 0.]], + + [[0., 1.], + [1., 1.]]]]) + + """ + xs = torch.linspace(0, width - 1, width, device=device, dtype=dtype) + ys = torch.linspace(0, height - 1, height, device=device, dtype=dtype) + if normalized_coordinates: + xs = (xs / (width - 1) - 0.5) * 2 + ys = (ys / (height - 1) - 0.5) * 2 + grid = torch.stack(torch.meshgrid(ys, xs, indexing="ij"), dim=-1) + grid = grid.permute(1, 0, 2).unsqueeze(0) + return grid + + +def spatial_expectation2d(input: torch.Tensor, normalized_coordinates: bool = True) -> torch.Tensor: + r""" + Copied from kornia library : kornia/geometry/subpix/dsnt.py:76 + Compute the expectation of coordinate values using spatial probabilities. + + The input heatmap is assumed to represent a valid spatial probability distribution, + which can be achieved using :func:`~kornia.geometry.subpixel.spatial_softmax2d`. + + Args: + input (`torch.Tensor` of shape `(batch_size, embed_dim, height, width)`): + The input tensor representing dense spatial probabilities. + normalized_coordinates (`bool`): + Whether to return the coordinates normalized in the range of :math:`[-1, 1]`. Otherwise, it will return + the coordinates in the range of the input shape. + + Returns: + output (`torch.Tensor` of shape `(batch_size, embed_dim, 2)`) + Expected value of the 2D coordinates. Output order of the coordinates is (x, y). + + Examples: + >>> heatmaps = torch.tensor([[[ + ... [0., 0., 0.], + ... [0., 0., 0.], + ... [0., 1., 0.]]]]) + >>> spatial_expectation2d(heatmaps, False) + tensor([[[1., 2.]]]) + + """ + batch_size, embed_dim, height, width = input.shape + + # Create coordinates grid. + grid = create_meshgrid(height, width, normalized_coordinates, input.device) + grid = grid.to(input.dtype) + + pos_x = grid[..., 0].reshape(-1) + pos_y = grid[..., 1].reshape(-1) + + input_flat = input.view(batch_size, embed_dim, -1) + + # Compute the expectation of the coordinates. + expected_y = torch.sum(pos_y * input_flat, -1, keepdim=True) + expected_x = torch.sum(pos_x * input_flat, -1, keepdim=True) + + output = torch.cat([expected_x, expected_y], -1) + + return output.view(batch_size, embed_dim, 2) + + +@auto_docstring( + custom_intro=""" + EfficientLoFTR model taking images as inputs and outputting the matching of them. + """ +) +class EfficientLoFTRForKeypointMatching(EfficientLoFTRPreTrainedModel): + """EfficientLoFTR dense image matcher + + Given two images, we determine the correspondences by: + 1. Extracting coarse and fine features through a backbone + 2. Transforming coarse features through self and cross attention + 3. Matching coarse features to obtain coarse coordinates of matches + 4. Obtaining full resolution fine features by fusing transformed and backbone coarse features + 5. Refining the coarse matches using fine feature patches centered at each coarse match in a two-stage refinement + + Yifan Wang, Xingyi He, Sida Peng, Dongli Tan and Xiaowei Zhou. + Efficient LoFTR: Semi-Dense Local Feature Matching with Sparse-Like Speed + In CVPR, 2024. https://arxiv.org/abs/2403.04765 + """ + + def __init__(self, config: EfficientLoFTRConfig): + super().__init__(config) + + self.config = config + self.efficientloftr = EfficientLoFTRModel(config) + self.refinement_layer = EfficientLoFTRFineFusionLayer(config) + + self.post_init() + + def _get_matches_from_scores(self, scores: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: + """ + Based on a keypoint score matrix, compute the best keypoint matches between the first and second image. + Since each image pair can have different number of matches, the matches are concatenated together for all pair + in the batch and a batch_indices tensor is returned to specify which match belong to which element in the batch. + + Note: + This step can be done as a postprocessing step, because does not involve any model weights/params. + However, we keep it in the modeling code for consistency with other keypoint matching models AND for + easier torch.compile/torch.export (all ops are in torch). + + Args: + scores (`torch.Tensor` of shape `(batch_size, height_0, width_0, height_1, width_1)`): + Scores of keypoints + + Returns: + matched_indices (`torch.Tensor` of shape `(2, num_matches)`): + Indices representing which pixel in the first image matches which pixel in the second image + matching_scores (`torch.Tensor` of shape `(num_matches,)`): + Scores of each match + """ + batch_size, height0, width0, height1, width1 = scores.shape + + scores = scores.view(batch_size, height0 * width0, height1 * width1) + + # For each keypoint, get the best match + max_0 = scores.max(2, keepdim=True).values + max_1 = scores.max(1, keepdim=True).values + + # 1. Thresholding + mask = scores > self.config.coarse_matching_threshold + + # 2. Border removal + mask = mask.reshape(batch_size, height0, width0, height1, width1) + mask = mask_border(mask, self.config.coarse_matching_border_removal, False) + mask = mask.reshape(batch_size, height0 * width0, height1 * width1) + + # 3. Mutual nearest neighbors + mask = mask * (scores == max_0) * (scores == max_1) + + # 4. Fine coarse matches + masked_scores = scores * mask + matching_scores_0, max_indices_0 = masked_scores.max(1) + matching_scores_1, max_indices_1 = masked_scores.max(2) + + matching_indices = torch.cat([max_indices_0, max_indices_1]).reshape(batch_size, 2, -1) + matching_scores = torch.stack([matching_scores_0, matching_scores_1], dim=1) + + # For the keypoints not meeting the threshold score, set the indices to -1 which corresponds to no matches found + matching_indices = torch.where(matching_scores > 0, matching_indices, -1) + + return matching_indices, matching_scores + + def _coarse_matching( + self, coarse_features: torch.Tensor, coarse_scale: float + ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """ + For each image pair, compute the matching confidence between each coarse element (by default (image_height / 8) + * (image_width / 8 elements)) from the first image to the second image. + + Note: + This step can be done as a postprocessing step, because does not involve any model weights/params. + However, we keep it in the modeling code for consistency with other keypoint matching models AND for + easier torch.compile/torch.export (all ops are in torch). + + Args: + coarse_features (`torch.Tensor` of shape `(batch_size, 2, hidden_size, coarse_height, coarse_width)`): + Coarse features + coarse_scale (`float`): Scale between the image size and the coarse size + + Returns: + keypoints (`torch.Tensor` of shape `(batch_size, 2, num_matches, 2)`): + Keypoints coordinates. + matching_scores (`torch.Tensor` of shape `(batch_size, 2, num_matches)`): + The confidence matching score of each keypoint. + matched_indices (`torch.Tensor` of shape `(batch_size, 2, num_matches)`): + Indices which indicates which keypoint in an image matched with which keypoint in the other image. For + both image in the pair. + """ + batch_size, _, embed_dim, height, width = coarse_features.shape + + # (batch_size, 2, embed_dim, height, width) -> (batch_size, 2, height * width, embed_dim) + coarse_features = coarse_features.permute(0, 1, 3, 4, 2) + coarse_features = coarse_features.reshape(batch_size, 2, -1, embed_dim) + + coarse_features = coarse_features / coarse_features.shape[-1] ** 0.5 + coarse_features_0 = coarse_features[:, 0] + coarse_features_1 = coarse_features[:, 1] + + similarity = coarse_features_0 @ coarse_features_1.transpose(-1, -2) + similarity = similarity / self.config.coarse_matching_temperature + + if self.config.coarse_matching_skip_softmax: + confidence = similarity + else: + confidence = nn.functional.softmax(similarity, 1) * nn.functional.softmax(similarity, 2) + + confidence = confidence.view(batch_size, height, width, height, width) + matched_indices, matching_scores = self._get_matches_from_scores(confidence) + + keypoints = torch.stack([matched_indices % width, matched_indices // width], dim=-1) * coarse_scale + + return keypoints, matching_scores, matched_indices + + def _get_first_stage_fine_matching( + self, + fine_confidence: torch.Tensor, + coarse_matched_keypoints: torch.Tensor, + fine_window_size: int, + fine_scale: float, + ) -> tuple[torch.Tensor, torch.Tensor]: + """ + For each coarse pixel, retrieve the highest fine confidence score and index. + The index represents the matching between a pixel position in the fine window in the first image and a pixel + position in the fine window of the second image. + For example, for a fine_window_size of 64 (8 * 8), the index 2474 represents the matching between the index 38 + (2474 // 64) in the fine window of the first image, and the index 42 in the second image. This means that 38 + which corresponds to the position (4, 6) (4 // 8 and 4 % 8) is matched with the position (5, 2). In this example + the coarse matched coordinate will be shifted to the matched fine coordinates in the first and second image. + + Note: + This step can be done as a postprocessing step, because does not involve any model weights/params. + However, we keep it in the modeling code for consistency with other keypoint matching models AND for + easier torch.compile/torch.export (all ops are in torch). + + Args: + fine_confidence (`torch.Tensor` of shape `(num_matches, fine_window_size, fine_window_size)`): + First stage confidence of matching fine features between the first and the second image + coarse_matched_keypoints (`torch.Tensor` of shape `(2, num_matches, 2)`): + Coarse matched keypoint between the first and the second image. + fine_window_size (`int`): + Size of the window used to refine matches + fine_scale (`float`): + Scale between the size of fine features and coarse features + + Returns: + indices (`torch.Tensor` of shape `(2, num_matches, 1)`): + Indices of the fine coordinate matched in the fine window + fine_matches (`torch.Tensor` of shape `(2, num_matches, 2)`): + Coordinates of matched keypoints after the first fine stage + """ + batch_size, num_keypoints, _, _ = fine_confidence.shape + fine_kernel_size = torch_int(fine_window_size**0.5) + + fine_confidence = fine_confidence.reshape(batch_size, num_keypoints, -1) + values, indices = torch.max(fine_confidence, dim=-1) + indices = indices[..., None] + indices_0 = indices // fine_window_size + indices_1 = indices % fine_window_size + + grid = create_meshgrid( + fine_kernel_size, + fine_kernel_size, + normalized_coordinates=False, + device=fine_confidence.device, + dtype=fine_confidence.dtype, + ) + grid = grid - (fine_kernel_size // 2) + 0.5 + grid = grid.reshape(1, 1, -1, 2).expand(batch_size, num_keypoints, -1, -1) + delta_0 = torch.gather(grid, 1, indices_0.unsqueeze(-1).expand(-1, -1, -1, 2)).squeeze(2) + delta_1 = torch.gather(grid, 1, indices_1.unsqueeze(-1).expand(-1, -1, -1, 2)).squeeze(2) + + fine_matches_0 = coarse_matched_keypoints[:, 0] + delta_0 * fine_scale + fine_matches_1 = coarse_matched_keypoints[:, 1] + delta_1 * fine_scale + + indices = torch.stack([indices_0, indices_1], dim=1) + fine_matches = torch.stack([fine_matches_0, fine_matches_1], dim=1) + + return indices, fine_matches + + def _get_second_stage_fine_matching( + self, + indices: torch.Tensor, + fine_matches: torch.Tensor, + fine_confidence: torch.Tensor, + fine_window_size: int, + fine_scale: float, + ) -> torch.Tensor: + """ + For the given position in their respective fine windows, retrieve the 3x3 fine confidences around this position. + After applying softmax to these confidences, compute the 2D spatial expected coordinates. + Shift the first stage fine matching with these expected coordinates. + + Note: + This step can be done as a postprocessing step, because does not involve any model weights/params. + However, we keep it in the modeling code for consistency with other keypoint matching models AND for + easier torch.compile/torch.export (all ops are in torch). + + Args: + indices (`torch.Tensor` of shape `(batch_size, 2, num_keypoints)`): + Indices representing the position of each keypoint in the fine window + fine_matches (`torch.Tensor` of shape `(2, num_matches, 2)`): + Coordinates of matched keypoints after the first fine stage + fine_confidence (`torch.Tensor` of shape `(num_matches, fine_window_size, fine_window_size)`): + Second stage confidence of matching fine features between the first and the second image + fine_window_size (`int`): + Size of the window used to refine matches + fine_scale (`float`): + Scale between the size of fine features and coarse features + + Returns: + fine_matches (`torch.Tensor` of shape `(2, num_matches, 2)`): + Coordinates of matched keypoints after the second fine stage + """ + batch_size, num_keypoints, _, _ = fine_confidence.shape + fine_kernel_size = torch_int(fine_window_size**0.5) + + indices_0 = indices[:, 0] + indices_1 = indices[:, 1] + indices_1_i = indices_1 // fine_kernel_size + indices_1_j = indices_1 % fine_kernel_size + + # matches_indices, indices_0, indices_1_i, indices_1_j of shape (num_matches, 3, 3) + batch_indices = torch.arange(batch_size, device=indices_0.device).reshape(batch_size, 1, 1, 1) + matches_indices = torch.arange(num_keypoints, device=indices_0.device).reshape(1, num_keypoints, 1, 1) + indices_0 = indices_0[..., None] + indices_1_i = indices_1_i[..., None] + indices_1_j = indices_1_j[..., None] + + delta = create_meshgrid(3, 3, normalized_coordinates=True, device=indices_0.device).to(torch.long) + delta = delta[None, ...] + + indices_1_i = indices_1_i + delta[..., 1] + indices_1_j = indices_1_j + delta[..., 0] + + fine_confidence = fine_confidence.reshape( + batch_size, num_keypoints, fine_window_size, fine_kernel_size + 2, fine_kernel_size + 2 + ) + # (batch_size, seq_len, fine_window_size, fine_kernel_size + 2, fine_kernel_size + 2) -> (batch_size, seq_len, 3, 3) + fine_confidence = fine_confidence[batch_indices, matches_indices, indices_0, indices_1_i, indices_1_j] + fine_confidence = fine_confidence.reshape(batch_size, num_keypoints, 9) + fine_confidence = nn.functional.softmax( + fine_confidence / self.config.fine_matching_regress_temperature, dim=-1 + ) + + heatmap = fine_confidence.reshape(batch_size, num_keypoints, 3, 3) + fine_coordinates_normalized = spatial_expectation2d(heatmap, True)[0] + + fine_matches_0 = fine_matches[:, 0] + fine_matches_1 = fine_matches[:, 1] + (fine_coordinates_normalized * (3 // 2) * fine_scale) + + fine_matches = torch.stack([fine_matches_0, fine_matches_1], dim=1) + + return fine_matches + + def _fine_matching( + self, + fine_features_0: torch.Tensor, + fine_features_1: torch.Tensor, + coarse_matched_keypoints: torch.Tensor, + fine_scale: float, + ) -> torch.Tensor: + """ + For each coarse pixel with a corresponding window of fine features, compute the matching confidence between fine + features in the first image and the second image. + + Fine features are sliced in two part : + - The first part used for the first stage are the first fine_hidden_size - config.fine_matching_slicedim (64 - 8 + = 56 by default) features. + - The second part used for the second stage are the last config.fine_matching_slicedim (8 by default) features. + + Each part is used to compute a fine confidence tensor of the following shape : + (batch_size, (coarse_height * coarse_width), fine_window_size, fine_window_size) + They correspond to the score between each fine pixel in the first image and each fine pixel in the second image. + + Args: + fine_features_0 (`torch.Tensor` of shape `(num_matches, fine_kernel_size ** 2, fine_kernel_size ** 2)`): + Fine features from the first image + fine_features_1 (`torch.Tensor` of shape `(num_matches, (fine_kernel_size + 2) ** 2, (fine_kernel_size + 2) + ** 2)`): + Fine features from the second image + coarse_matched_keypoints (`torch.Tensor` of shape `(2, num_matches, 2)`): + Keypoint coordinates found in coarse matching for the first and second image + fine_scale (`int`): + Scale between the size of fine features and coarse features + + Returns: + fine_coordinates (`torch.Tensor` of shape `(2, num_matches, 2)`): + Matched keypoint between the first and the second image. All matched keypoints are concatenated in the + second dimension. + + """ + batch_size, num_keypoints, fine_window_size, fine_embed_dim = fine_features_0.shape + fine_matching_slice_dim = self.config.fine_matching_slice_dim + + fine_kernel_size = torch_int(fine_window_size**0.5) + + # Split fine features into first and second stage features + split_fine_features_0 = torch.split(fine_features_0, fine_embed_dim - fine_matching_slice_dim, -1) + split_fine_features_1 = torch.split(fine_features_1, fine_embed_dim - fine_matching_slice_dim, -1) + + # Retrieve first stage fine features + fine_features_0 = split_fine_features_0[0] + fine_features_1 = split_fine_features_1[0] + + # Normalize first stage fine features + fine_features_0 = fine_features_0 / fine_features_0.shape[-1] ** 0.5 + fine_features_1 = fine_features_1 / fine_features_1.shape[-1] ** 0.5 + + # Compute first stage confidence + fine_confidence = fine_features_0 @ fine_features_1.transpose(-1, -2) + fine_confidence = nn.functional.softmax(fine_confidence, 1) * nn.functional.softmax(fine_confidence, 2) + fine_confidence = fine_confidence.reshape( + batch_size, num_keypoints, fine_window_size, fine_kernel_size + 2, fine_kernel_size + 2 + ) + fine_confidence = fine_confidence[..., 1:-1, 1:-1] + first_stage_fine_confidence = fine_confidence.reshape( + batch_size, num_keypoints, fine_window_size, fine_window_size + ) + + fine_indices, fine_matches = self._get_first_stage_fine_matching( + first_stage_fine_confidence, + coarse_matched_keypoints, + fine_window_size, + fine_scale, + ) + + # Retrieve second stage fine features + fine_features_0 = split_fine_features_0[1] + fine_features_1 = split_fine_features_1[1] + + # Normalize second stage fine features + fine_features_1 = fine_features_1 / fine_matching_slice_dim**0.5 + + # Compute second stage fine confidence + second_stage_fine_confidence = fine_features_0 @ fine_features_1.transpose(-1, -2) + + fine_coordinates = self._get_second_stage_fine_matching( + fine_indices, + fine_matches, + second_stage_fine_confidence, + fine_window_size, + fine_scale, + ) + + return fine_coordinates + + @auto_docstring + @can_return_tuple + def forward( + self, + pixel_values: torch.FloatTensor, + labels: Optional[torch.LongTensor] = None, + **kwargs: Unpack[TransformersKwargs], + ) -> KeypointMatchingOutput: + r""" + Examples: + + ```python + >>> from transformers import AutoImageProcessor, AutoModel + >>> import torch + >>> from PIL import Image + >>> import requests + + >>> url = "https://github.com/magicleap/SuperGluePretrainedNetwork/blob/master/assets/phototourism_sample_images/london_bridge_78916675_4568141288.jpg?raw=true" + >>> image1 = Image.open(requests.get(url, stream=True).raw) + >>> url = "https://github.com/magicleap/SuperGluePretrainedNetwork/blob/master/assets/phototourism_sample_images/london_bridge_19481797_2295892421.jpg?raw=true" + >>> image2 = Image.open(requests.get(url, stream=True).raw) + >>> images = [image1, image2] + + >>> processor = AutoImageProcessor.from_pretrained("zju-community/efficient_loftr") + >>> model = AutoModel.from_pretrained("zju-community/efficient_loftr") + + >>> with torch.no_grad(): + >>> inputs = processor(images, return_tensors="pt") + >>> outputs = model(**inputs) + ```""" + if labels is not None: + raise ValueError("SuperGlue is not trainable, no labels should be provided.") + + # 1. Extract coarse and residual features + model_outputs: BackboneOutput = self.efficientloftr(pixel_values, **kwargs) + features = model_outputs.feature_maps + + # 2. Compute coarse-level matching + coarse_features = features[0] + coarse_embed_dim, coarse_height, coarse_width = coarse_features.shape[-3:] + batch_size, _, channels, height, width = pixel_values.shape + coarse_scale = height / coarse_height + coarse_keypoints, coarse_matching_scores, coarse_matched_indices = self._coarse_matching( + coarse_features, coarse_scale + ) + + # 3. Fine-level refinement + residual_features = features[1:] + fine_features_0, fine_features_1 = self.refinement_layer(coarse_features, residual_features) + + # Filter fine features with coarse matches indices + _, _, num_keypoints = coarse_matching_scores.shape + batch_indices = torch.arange(batch_size)[..., None] + fine_features_0 = fine_features_0[batch_indices, coarse_matched_indices[:, 0]] + fine_features_1 = fine_features_1[batch_indices, coarse_matched_indices[:, 1]] + + # 4. Computer fine-level matching + fine_height = torch_int(coarse_height * coarse_scale) + fine_scale = height / fine_height + matching_keypoints = self._fine_matching(fine_features_0, fine_features_1, coarse_keypoints, fine_scale) + + matching_keypoints[:, :, :, 0] = matching_keypoints[:, :, :, 0] / width + matching_keypoints[:, :, :, 1] = matching_keypoints[:, :, :, 1] / height + + return KeypointMatchingOutput( + matches=coarse_matched_indices, + matching_scores=coarse_matching_scores, + keypoints=matching_keypoints, + hidden_states=model_outputs.hidden_states, + attentions=model_outputs.attentions, + ) + + +__all__ = ["EfficientLoFTRPreTrainedModel", "EfficientLoFTRModel", "EfficientLoFTRForKeypointMatching"] diff --git a/src/transformers/models/lightglue/image_processing_lightglue.py b/src/transformers/models/lightglue/image_processing_lightglue.py index ca9189210b..124e4b04d5 100644 --- a/src/transformers/models/lightglue/image_processing_lightglue.py +++ b/src/transformers/models/lightglue/image_processing_lightglue.py @@ -51,7 +51,7 @@ logger = logging.get_logger(__name__) def is_grayscale( - image: ImageInput, + image: np.ndarray, input_data_format: Optional[Union[str, ChannelDimension]] = None, ): if input_data_format == ChannelDimension.FIRST: diff --git a/src/transformers/models/superglue/image_processing_superglue.py b/src/transformers/models/superglue/image_processing_superglue.py index ab6108e205..7fd31a905e 100644 --- a/src/transformers/models/superglue/image_processing_superglue.py +++ b/src/transformers/models/superglue/image_processing_superglue.py @@ -53,7 +53,7 @@ logger = logging.get_logger(__name__) # Copied from transformers.models.superpoint.image_processing_superpoint.is_grayscale def is_grayscale( - image: ImageInput, + image: np.ndarray, input_data_format: Optional[Union[str, ChannelDimension]] = None, ): if input_data_format == ChannelDimension.FIRST: diff --git a/src/transformers/models/superpoint/image_processing_superpoint.py b/src/transformers/models/superpoint/image_processing_superpoint.py index dcd0c09f91..9759e751b2 100644 --- a/src/transformers/models/superpoint/image_processing_superpoint.py +++ b/src/transformers/models/superpoint/image_processing_superpoint.py @@ -45,7 +45,7 @@ logger = logging.get_logger(__name__) def is_grayscale( - image: ImageInput, + image: np.ndarray, input_data_format: Optional[Union[str, ChannelDimension]] = None, ): if input_data_format == ChannelDimension.FIRST: diff --git a/src/transformers/utils/generic.py b/src/transformers/utils/generic.py index 9692f705f2..4a007c830f 100644 --- a/src/transformers/utils/generic.py +++ b/src/transformers/utils/generic.py @@ -1075,7 +1075,7 @@ def check_model_inputs(func): if key == "hidden_states": if hasattr(outputs, "vision_hidden_states"): collected_outputs[key] += (outputs.vision_hidden_states,) - else: + elif hasattr(outputs, "last_hidden_state"): collected_outputs[key] += (outputs.last_hidden_state,) outputs[key] = collected_outputs[key] elif key == "attentions": diff --git a/tests/models/efficientloftr/__init__.py b/tests/models/efficientloftr/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/models/efficientloftr/test_image_processing_efficientloftr.py b/tests/models/efficientloftr/test_image_processing_efficientloftr.py new file mode 100644 index 0000000000..ba325aa9c2 --- /dev/null +++ b/tests/models/efficientloftr/test_image_processing_efficientloftr.py @@ -0,0 +1,90 @@ +# Copyright 2025 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import unittest + +from tests.models.superglue.test_image_processing_superglue import ( + SuperGlueImageProcessingTest, + SuperGlueImageProcessingTester, +) +from transformers.testing_utils import require_torch, require_vision +from transformers.utils import is_torch_available, is_vision_available + + +if is_torch_available(): + import numpy as np + import torch + + from transformers.models.efficientloftr.modeling_efficientloftr import KeypointMatchingOutput + +if is_vision_available(): + from transformers import EfficientLoFTRImageProcessor + + +def random_array(size): + return np.random.randint(255, size=size) + + +def random_tensor(size): + return torch.rand(size) + + +class EfficientLoFTRImageProcessingTester(SuperGlueImageProcessingTester): + """Tester for EfficientLoFTRImageProcessor""" + + def __init__( + self, + parent, + batch_size=6, + num_channels=3, + image_size=18, + min_resolution=30, + max_resolution=400, + do_resize=True, + size=None, + do_grayscale=True, + ): + super().__init__( + parent, batch_size, num_channels, image_size, min_resolution, max_resolution, do_resize, size, do_grayscale + ) + + def prepare_keypoint_matching_output(self, pixel_values): + """Prepare a fake output for the keypoint matching model with random matches between 50 keypoints per image.""" + max_number_keypoints = 50 + batch_size = len(pixel_values) + keypoints = torch.zeros((batch_size, 2, max_number_keypoints, 2)) + matches = torch.full((batch_size, 2, max_number_keypoints), -1, dtype=torch.int) + scores = torch.zeros((batch_size, 2, max_number_keypoints)) + for i in range(batch_size): + random_number_keypoints0 = np.random.randint(10, max_number_keypoints) + random_number_keypoints1 = np.random.randint(10, max_number_keypoints) + random_number_matches = np.random.randint(5, min(random_number_keypoints0, random_number_keypoints1)) + keypoints[i, 0, :random_number_keypoints0] = torch.rand((random_number_keypoints0, 2)) + keypoints[i, 1, :random_number_keypoints1] = torch.rand((random_number_keypoints1, 2)) + random_matches_indices0 = torch.randperm(random_number_keypoints1, dtype=torch.int)[:random_number_matches] + random_matches_indices1 = torch.randperm(random_number_keypoints0, dtype=torch.int)[:random_number_matches] + matches[i, 0, random_matches_indices1] = random_matches_indices0 + matches[i, 1, random_matches_indices0] = random_matches_indices1 + scores[i, 0, random_matches_indices1] = torch.rand((random_number_matches,)) + scores[i, 1, random_matches_indices0] = torch.rand((random_number_matches,)) + return KeypointMatchingOutput(keypoints=keypoints, matches=matches, matching_scores=scores) + + +@require_torch +@require_vision +class EfficientLoFTRImageProcessingTest(SuperGlueImageProcessingTest, unittest.TestCase): + image_processing_class = EfficientLoFTRImageProcessor if is_vision_available() else None + + def setUp(self) -> None: + super().setUp() + self.image_processor_tester = EfficientLoFTRImageProcessingTester(self) diff --git a/tests/models/efficientloftr/test_modeling_efficientloftr.py b/tests/models/efficientloftr/test_modeling_efficientloftr.py new file mode 100644 index 0000000000..50c24c41fa --- /dev/null +++ b/tests/models/efficientloftr/test_modeling_efficientloftr.py @@ -0,0 +1,453 @@ +# Copyright 2025 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import inspect +import unittest +from functools import reduce + +from datasets import load_dataset + +from transformers.models.efficientloftr import EfficientLoFTRConfig, EfficientLoFTRModel +from transformers.testing_utils import ( + require_torch, + require_vision, + set_config_for_less_flaky_test, + set_model_for_less_flaky_test, + set_model_tester_for_less_flaky_test, + slow, + torch_device, +) +from transformers.utils import cached_property, is_torch_available, is_vision_available + +from ...test_configuration_common import ConfigTester +from ...test_modeling_common import ModelTesterMixin, floats_tensor + + +if is_torch_available(): + import torch + + from transformers import EfficientLoFTRForKeypointMatching + +if is_vision_available(): + from transformers import AutoImageProcessor + + +class EfficientLoFTRModelTester: + def __init__( + self, + parent, + batch_size=2, + image_width=80, + image_height=60, + stage_num_blocks: list[int] = [1, 1, 1], + out_features: list[int] = [32, 32, 64], + stage_stride: list[int] = [2, 1, 2], + q_aggregation_kernel_size: int = 1, + kv_aggregation_kernel_size: int = 1, + q_aggregation_stride: int = 1, + kv_aggregation_stride: int = 1, + num_attention_layers: int = 2, + num_attention_heads: int = 8, + hidden_size: int = 64, + coarse_matching_threshold: float = 0.0, + fine_kernel_size: int = 2, + coarse_matching_border_removal: int = 0, + ): + self.parent = parent + self.batch_size = batch_size + self.image_width = image_width + self.image_height = image_height + + self.stage_num_blocks = stage_num_blocks + self.out_features = out_features + self.stage_stride = stage_stride + self.q_aggregation_kernel_size = q_aggregation_kernel_size + self.kv_aggregation_kernel_size = kv_aggregation_kernel_size + self.q_aggregation_stride = q_aggregation_stride + self.kv_aggregation_stride = kv_aggregation_stride + self.num_attention_layers = num_attention_layers + self.num_attention_heads = num_attention_heads + self.hidden_size = hidden_size + self.coarse_matching_threshold = coarse_matching_threshold + self.coarse_matching_border_removal = coarse_matching_border_removal + self.fine_kernel_size = fine_kernel_size + + def prepare_config_and_inputs(self): + # EfficientLoFTR expects a grayscale image as input + pixel_values = floats_tensor([self.batch_size, 2, 3, self.image_height, self.image_width]) + config = self.get_config() + return config, pixel_values + + def get_config(self): + return EfficientLoFTRConfig( + stage_num_blocks=self.stage_num_blocks, + out_features=self.out_features, + stage_stride=self.stage_stride, + q_aggregation_kernel_size=self.q_aggregation_kernel_size, + kv_aggregation_kernel_size=self.kv_aggregation_kernel_size, + q_aggregation_stride=self.q_aggregation_stride, + kv_aggregation_stride=self.kv_aggregation_stride, + num_attention_layers=self.num_attention_layers, + num_attention_heads=self.num_attention_heads, + hidden_size=self.hidden_size, + coarse_matching_threshold=self.coarse_matching_threshold, + coarse_matching_border_removal=self.coarse_matching_border_removal, + fine_kernel_size=self.fine_kernel_size, + ) + + def create_and_check_model(self, config, pixel_values): + model = EfficientLoFTRForKeypointMatching(config=config) + model.to(torch_device) + model.eval() + result = model(pixel_values) + maximum_num_matches = result.matches.shape[-1] + self.parent.assertEqual( + result.keypoints.shape, + (self.batch_size, 2, maximum_num_matches, 2), + ) + self.parent.assertEqual( + result.matches.shape, + (self.batch_size, 2, maximum_num_matches), + ) + self.parent.assertEqual( + result.matching_scores.shape, + (self.batch_size, 2, maximum_num_matches), + ) + + def prepare_config_and_inputs_for_common(self): + config_and_inputs = self.prepare_config_and_inputs() + config, pixel_values = config_and_inputs + inputs_dict = {"pixel_values": pixel_values} + return config, inputs_dict + + +@require_torch +class EfficientLoFTRModelTest(ModelTesterMixin, unittest.TestCase): + all_model_classes = (EfficientLoFTRForKeypointMatching, EfficientLoFTRModel) if is_torch_available() else () + + test_pruning = False + test_resize_embeddings = False + test_head_masking = False + has_attentions = True + + def setUp(self): + self.model_tester = EfficientLoFTRModelTester(self) + self.config_tester = ConfigTester(self, config_class=EfficientLoFTRConfig, has_text_modality=False) + + def test_config(self): + self.config_tester.create_and_test_config_to_json_string() + self.config_tester.create_and_test_config_to_json_file() + self.config_tester.create_and_test_config_from_and_save_pretrained() + self.config_tester.create_and_test_config_with_num_labels() + self.config_tester.check_config_can_be_init_without_params() + self.config_tester.check_config_arguments_init() + + @unittest.skip(reason="EfficientLoFTRForKeypointMatching does not use inputs_embeds") + def test_inputs_embeds(self): + pass + + @unittest.skip(reason="EfficientLoFTRForKeypointMatching does not support input and output embeddings") + def test_model_get_set_embeddings(self): + pass + + @unittest.skip(reason="EfficientLoFTRForKeypointMatching does not use feedforward chunking") + def test_feed_forward_chunking(self): + pass + + @unittest.skip(reason="EfficientLoFTRForKeypointMatching is not trainable") + def test_training(self): + pass + + @unittest.skip(reason="EfficientLoFTRForKeypointMatching is not trainable") + def test_training_gradient_checkpointing(self): + pass + + @unittest.skip(reason="EfficientLoFTRForKeypointMatching is not trainable") + def test_training_gradient_checkpointing_use_reentrant(self): + pass + + @unittest.skip(reason="EfficientLoFTRForKeypointMatching is not trainable") + def test_training_gradient_checkpointing_use_reentrant_false(self): + pass + + @unittest.skip(reason="EfficientLoFTR does not output any loss term in the forward pass") + def test_retain_grad_hidden_states_attentions(self): + pass + + def test_model(self): + config_and_inputs = self.model_tester.prepare_config_and_inputs() + self.model_tester.create_and_check_model(*config_and_inputs) + + def test_forward_signature(self): + config, _ = self.model_tester.prepare_config_and_inputs() + + for model_class in self.all_model_classes: + model = model_class(config) + signature = inspect.signature(model.forward) + # signature.parameters is an OrderedDict => so arg_names order is deterministic + arg_names = [*signature.parameters.keys()] + + expected_arg_names = ["pixel_values"] + self.assertListEqual(arg_names[:1], expected_arg_names) + + def test_hidden_states_output(self): + def check_hidden_states_output(inputs_dict, config, model_class): + model = model_class(config) + model.to(torch_device) + model.eval() + + with torch.no_grad(): + outputs = model(**self._prepare_for_class(inputs_dict, model_class)) + + hidden_states = outputs.hidden_states + + expected_num_hidden_states = len(self.model_tester.stage_num_blocks) + self.assertEqual(len(hidden_states), expected_num_hidden_states) + + self.assertListEqual( + list(hidden_states[0].shape[-2:]), + [self.model_tester.image_height // 2, self.model_tester.image_width // 2], + ) + + config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() + + for model_class in self.all_model_classes: + inputs_dict["output_hidden_states"] = True + check_hidden_states_output(inputs_dict, config, model_class) + + # check that output_hidden_states also work using config + del inputs_dict["output_hidden_states"] + config.output_hidden_states = True + + check_hidden_states_output(inputs_dict, config, model_class) + + def test_attention_outputs(self): + def check_attention_output(inputs_dict, config, model_class): + config._attn_implementation = "eager" + model = model_class(config) + model.to(torch_device) + model.eval() + + with torch.no_grad(): + outputs = model(**self._prepare_for_class(inputs_dict, model_class)) + + attentions = outputs.attentions + total_stride = reduce(lambda a, b: a * b, config.stage_stride) + hidden_size = ( + self.model_tester.image_height // total_stride * self.model_tester.image_width // total_stride + ) + + expected_attention_shape = [ + self.model_tester.num_attention_heads, + hidden_size, + hidden_size, + ] + + for i, attention in enumerate(attentions): + self.assertListEqual( + list(attention.shape[-3:]), + expected_attention_shape, + ) + + config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() + + for model_class in self.all_model_classes: + inputs_dict["output_attentions"] = True + check_attention_output(inputs_dict, config, model_class) + + # check that output_hidden_states also work using config + del inputs_dict["output_attentions"] + config.output_attentions = True + + check_attention_output(inputs_dict, config, model_class) + + @slow + def test_model_from_pretrained(self): + from_pretrained_ids = ["stevenbucaille/efficientloftr"] + for model_name in from_pretrained_ids: + model = EfficientLoFTRForKeypointMatching.from_pretrained(model_name) + self.assertIsNotNone(model) + + def test_forward_labels_should_be_none(self): + config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() + for model_class in self.all_model_classes: + model = model_class(config) + model.to(torch_device) + model.eval() + + with torch.no_grad(): + model_inputs = self._prepare_for_class(inputs_dict, model_class) + # Provide an arbitrary sized Tensor as labels to model inputs + model_inputs["labels"] = torch.rand((128, 128)) + + with self.assertRaises(ValueError) as cm: + model(**model_inputs) + self.assertEqual(ValueError, cm.exception.__class__) + + def test_batching_equivalence(self, atol=1e-5, rtol=1e-5): + """ + This test is overwritten because the model outputs do not contain only regressive values but also keypoint + locations. + Similarly to the problem discussed about SuperGlue implementation + [here](https://github.com/huggingface/transformers/pull/29886#issuecomment-2482752787), the consequence of + having different scores for matching, makes the maximum indices differ. These indices are being used to compute + the keypoint coordinates. The keypoint coordinates, in the model outputs, are floating point tensors, so the + original implementation of this test cover this case. But the resulting tensors may have differences exceeding + the relative and absolute tolerance. + Therefore, similarly to SuperGlue integration test, for the key "keypoints" in the model outputs, we check the + number of differences in keypoint coordinates being less than a TODO given number + """ + + def recursive_check(batched_object, single_row_object, model_name, key): + if isinstance(batched_object, (list, tuple)): + for batched_object_value, single_row_object_value in zip(batched_object, single_row_object): + recursive_check(batched_object_value, single_row_object_value, model_name, key) + elif isinstance(batched_object, dict): + for batched_object_value, single_row_object_value in zip( + batched_object.values(), single_row_object.values() + ): + recursive_check(batched_object_value, single_row_object_value, model_name, key) + # do not compare returned loss (0-dim tensor) / codebook ids (int) / caching objects + elif batched_object is None or not isinstance(batched_object, torch.Tensor): + return + elif batched_object.dim() == 0: + return + # do not compare int or bool outputs as they are mostly computed with max/argmax/topk methods which are + # very sensitive to the inputs (e.g. tiny differences may give totally different results) + elif not torch.is_floating_point(batched_object): + return + else: + # indexing the first element does not always work + # e.g. models that output similarity scores of size (N, M) would need to index [0, 0] + slice_ids = [slice(0, index) for index in single_row_object.shape] + batched_row = batched_object[slice_ids] + if key == "keypoints": + batched_row = torch.sum(batched_row, dim=-1) + single_row_object = torch.sum(single_row_object, dim=-1) + tolerance = 0.02 * single_row_object.shape[-1] + self.assertTrue( + torch.sum(~torch.isclose(batched_row, single_row_object, rtol=rtol, atol=atol)) < tolerance + ) + else: + self.assertFalse( + torch.isnan(batched_row).any(), f"Batched output has `nan` in {model_name} for key={key}" + ) + self.assertFalse( + torch.isinf(batched_row).any(), f"Batched output has `inf` in {model_name} for key={key}" + ) + self.assertFalse( + torch.isnan(single_row_object).any(), + f"Single row output has `nan` in {model_name} for key={key}", + ) + self.assertFalse( + torch.isinf(single_row_object).any(), + f"Single row output has `inf` in {model_name} for key={key}", + ) + try: + torch.testing.assert_close(batched_row, single_row_object, atol=atol, rtol=rtol) + except AssertionError as e: + msg = f"Batched and Single row outputs are not equal in {model_name} for key={key}.\n\n" + msg += str(e) + raise AssertionError(msg) + + set_model_tester_for_less_flaky_test(self) + + config, batched_input = self.model_tester.prepare_config_and_inputs_for_common() + set_config_for_less_flaky_test(config) + + for model_class in self.all_model_classes: + config.output_hidden_states = True + + model_name = model_class.__name__ + if hasattr(self.model_tester, "prepare_config_and_inputs_for_model_class"): + config, batched_input = self.model_tester.prepare_config_and_inputs_for_model_class(model_class) + batched_input_prepared = self._prepare_for_class(batched_input, model_class) + model = model_class(config).to(torch_device).eval() + set_model_for_less_flaky_test(model) + + batch_size = self.model_tester.batch_size + single_row_input = {} + for key, value in batched_input_prepared.items(): + if isinstance(value, torch.Tensor) and value.shape[0] % batch_size == 0: + # e.g. musicgen has inputs of size (bs*codebooks). in most cases value.shape[0] == batch_size + single_batch_shape = value.shape[0] // batch_size + single_row_input[key] = value[:single_batch_shape] + else: + single_row_input[key] = value + + with torch.no_grad(): + model_batched_output = model(**batched_input_prepared) + model_row_output = model(**single_row_input) + + if isinstance(model_batched_output, torch.Tensor): + model_batched_output = {"model_output": model_batched_output} + model_row_output = {"model_output": model_row_output} + + for key in model_batched_output: + # DETR starts from zero-init queries to decoder, leading to cos_similarity = `nan` + if hasattr(self, "zero_init_hidden_state") and "decoder_hidden_states" in key: + model_batched_output[key] = model_batched_output[key][1:] + model_row_output[key] = model_row_output[key][1:] + recursive_check(model_batched_output[key], model_row_output[key], model_name, key) + + +def prepare_imgs(): + dataset = load_dataset("hf-internal-testing/image-matching-test-dataset", split="train") + image1 = dataset[0]["image"] + image2 = dataset[1]["image"] + image3 = dataset[2]["image"] + return [[image1, image2], [image3, image2]] + + +@require_torch +@require_vision +class EfficientLoFTRModelIntegrationTest(unittest.TestCase): + @cached_property + def default_image_processor(self): + return AutoImageProcessor.from_pretrained("stevenbucaille/efficientloftr") if is_vision_available() else None + + @slow + def test_inference(self): + model = EfficientLoFTRForKeypointMatching.from_pretrained( + "stevenbucaille/efficientloftr", attn_implementation="eager" + ).to(torch_device) + preprocessor = self.default_image_processor + images = prepare_imgs() + inputs = preprocessor(images=images, return_tensors="pt").to(torch_device) + with torch.no_grad(): + outputs = model(**inputs, output_hidden_states=True, output_attentions=True) + + predicted_top10 = torch.topk(outputs.matching_scores[0, 0], k=10) + predicted_top10_matches_indices = predicted_top10.indices + predicted_top10_matching_scores = predicted_top10.values + + expected_number_of_matches = 4800 + expected_matches_shape = torch.Size((len(images), 2, expected_number_of_matches)) + expected_matching_scores_shape = torch.Size((len(images), 2, expected_number_of_matches)) + + expected_top10_matches_indices = torch.tensor( + [3145, 3065, 3143, 3066, 3144, 1397, 1705, 3151, 2342, 2422], dtype=torch.int64, device=torch_device + ) + expected_top10_matching_scores = torch.tensor( + [0.9997, 0.9996, 0.9996, 0.9995, 0.9995, 0.9995, 0.9994, 0.9994, 0.9994, 0.9994], device=torch_device + ) + + self.assertEqual(outputs.matches.shape, expected_matches_shape) + self.assertEqual(outputs.matching_scores.shape, expected_matching_scores_shape) + + torch.testing.assert_close( + predicted_top10_matches_indices, expected_top10_matches_indices, rtol=5e-3, atol=5e-3 + ) + torch.testing.assert_close( + predicted_top10_matching_scores, expected_top10_matching_scores, rtol=5e-3, atol=5e-3 + )