[Model] Cohere2 Vision (#39810)

* Add cohere2_vision to support CohereLabs/command-a-vision-07-2025

* update and add modualr file

* update processors and check with orig impl later

* delete unused files

* image processor reduce LOC and re-use GotOCR2

* update the config to use modular

* model tests pass

* processor fixes

* check model outputs decorator

* address one more comment

* Update tokens. Temp - need to read from tokenizer'

* fix for multi-gpu

* Fix image token handling

* upadte image token expansion logic

* fix a few issues with remote code loading

* not related but modular forces us to change all files now

* Add overview and code sample to cohere vision docs

* add scripts. TMP.

* Update inference script

* Create script

* set dtype in export script

* TO revert: modular export fix

* Fix scripts

* Revert "TO revert: modular export fix"

This reverts commit bdb2f305b61027a05f0032ce70d6ca698879191c.

* Use modular weights

* Upload to hub

Removed OOD weights ad script

* Updated docs

* fix import error

Update docs

Added pipeline test

* Updated docs

* Run modular script

remove modular for config

Added patch_size

Added docstrings in modular

Fix OOM

Add docs, fixup integration tests. 8-gpu passing

* tiny updates

* address comments + fixup

* add test for chat template

* check model outputs workaround

* aya vision fix check model inputs

* Revert "add test for chat template"

This reverts commit 42c756e397f588d76b449ff1f93292d8ee0202d8.

* reveert more changes

* last revert

* skip and merge

* faulty copy from

---------

Co-authored-by: Julian Mack <julian.mack@cohere.com>
Co-authored-by: kyle-cohere <kyle@cohere.com>
This commit is contained in:
Raushan Turganbay
2025-07-31 12:57:34 +02:00
committed by GitHub
parent 6c3f27ba61
commit e1688d28d3
32 changed files with 2375 additions and 48 deletions

View File

@@ -411,6 +411,8 @@
title: Cohere title: Cohere
- local: model_doc/cohere2 - local: model_doc/cohere2
title: Cohere2 title: Cohere2
- local: model_doc/cohere2_vision
title: Cohere2Vision
- local: model_doc/convbert - local: model_doc/convbert
title: ConvBERT title: ConvBERT
- local: model_doc/cpm - local: model_doc/cpm

View File

@@ -0,0 +1,92 @@
# Command A Vision
<div class="flex flex-wrap space-x-1">
<img alt="PyTorch" src="https://img.shields.io/badge/PyTorch-DE3412?style=flat&logo=pytorch&logoColor=white">
<img alt="FlashAttention" src="https://img.shields.io/badge/%E2%9A%A1%EF%B8%8E%20FlashAttention-eae0c8?style=flat">
<img alt="SDPA" src="https://img.shields.io/badge/SDPA-DE3412?style=flat&logo=pytorch&logoColor=white">
<img alt="Tensor parallelism" src="https://img.shields.io/badge/Tensor%20parallelism-06b6d4?style=flat&logoColor=white">
</div>
## Overview
Command A Vision is a state-of-the-art multimodal model designed to seamlessly integrate visual and textual information for a wide range of applications. By combining advanced computer vision techniques with natural language processing capabilities, Command A Vision enables users to analyze, understand, and generate insights from both visual and textual data.
The model excels at tasks including image captioning, visual question answering, document understanding, and chart understanding. This makes it a versatile tool for AI practitioners. Its ability to process complex visual and textual inputs makes it useful in settings where text-only representations are imprecise or unavailable, like real-world image understanding and graphics-heavy document processing.
Command A Vision is built upon a robust architecture that leverages the latest advancements in VLMs. It's highly performant and efficient, even when dealing with large-scale datasets. The model's flexibility makes it suitable for a wide range of use cases, from content moderation and image search to medical imaging analysis and robotics.
## Usage tips
The model and image processor can be loaded as follows:
```python
import torch
from transformers import AutoProcessor, AutoModelForImageTextToText
model_id = "CohereLabs/command-a-vision-07-2025"
processor = AutoProcessor.from_pretrained(model_id)
model = AutoModelForImageTextToText.from_pretrained(
model_id, device_map="auto", torch_dtype=torch.float16
)
# Format message with the Command-A-Vision chat template
messages = [
{
"role": "user",
"content": [
{
"type": "image",
"url": "https://images.pexels.com/photos/1108099/pexels-photo-1108099.jpeg",
},
{"type": "text", "text": "what is in this image?"},
],
},
]
inputs = processor.apply_chat_template(
messages,
padding=True,
add_generation_prompt=True,
tokenize=True,
return_dict=True,
return_tensors="pt",
).to(model.device)
gen_tokens = model.generate(
**inputs,
max_new_tokens=300,
do_sample=True,
temperature=0.3,
)
print(
processor.tokenizer.decode(
gen_tokens[0][inputs.input_ids.shape[1] :], skip_special_tokens=True
)
)
```
## Cohere2VisionConfig
[[autodoc]] Cohere2VisionConfig
## Cohere2VisionForConditionalGeneration
[[autodoc]] Cohere2VisionForConditionalGeneration
- forward
## Cohere2VisionModel
[[autodoc]] Cohere2VisionModel
- forward
## Cohere2VisionImageProcessorFast
[[autodoc]] Cohere2VisionImageProcessorFast
- preprocess
## Cohere2VisionProcessor
[[autodoc]] Cohere2VisionProcessor

View File

@@ -1164,7 +1164,7 @@ class Cache:
while len(self.layers) <= layer_idx: while len(self.layers) <= layer_idx:
kwargs = self.layer_init_kwargs.copy() kwargs = self.layer_init_kwargs.copy()
if self.layer_init_kwargs.get("layer_device_map", None) is not None: if self.layer_init_kwargs.get("layer_device_map", None) is not None:
kwargs["device"] = kwargs.pop("layer_device_map")[layer_idx] kwargs["device"] = kwargs.pop("layer_device_map")[len(self.layers)]
new_layer_class = ( new_layer_class = (
self.layer_classes[len(self.layers)] if isinstance(self.layer_classes, list) else self.layer_classes self.layer_classes[len(self.layers)] if isinstance(self.layer_classes, list) else self.layer_classes

View File

@@ -63,6 +63,7 @@ if TYPE_CHECKING:
from .codegen import * from .codegen import *
from .cohere import * from .cohere import *
from .cohere2 import * from .cohere2 import *
from .cohere2_vision import *
from .colpali import * from .colpali import *
from .colqwen2 import * from .colqwen2 import *
from .conditional_detr import * from .conditional_detr import *

View File

@@ -1106,7 +1106,7 @@ class AriaForConditionalGeneration(AriaPreTrainedModel, GenerationMixin):
self.model.set_decoder(decoder) self.model.set_decoder(decoder)
def get_decoder(self): def get_decoder(self):
return self.model.get_decoder return self.model.get_decoder()
def get_image_features( def get_image_features(
self, self,

View File

@@ -81,6 +81,7 @@ CONFIG_MAPPING_NAMES = OrderedDict[str, str](
("codegen", "CodeGenConfig"), ("codegen", "CodeGenConfig"),
("cohere", "CohereConfig"), ("cohere", "CohereConfig"),
("cohere2", "Cohere2Config"), ("cohere2", "Cohere2Config"),
("cohere2_vision", "Cohere2VisionConfig"),
("colpali", "ColPaliConfig"), ("colpali", "ColPaliConfig"),
("colqwen2", "ColQwen2Config"), ("colqwen2", "ColQwen2Config"),
("conditional_detr", "ConditionalDetrConfig"), ("conditional_detr", "ConditionalDetrConfig"),
@@ -476,6 +477,7 @@ MODEL_NAMES_MAPPING = OrderedDict[str, str](
("codegen", "CodeGen"), ("codegen", "CodeGen"),
("cohere", "Cohere"), ("cohere", "Cohere"),
("cohere2", "Cohere2"), ("cohere2", "Cohere2"),
("cohere2_vision", "Cohere2Vision"),
("colpali", "ColPali"), ("colpali", "ColPali"),
("colqwen2", "ColQwen2"), ("colqwen2", "ColQwen2"),
("conditional_detr", "Conditional DETR"), ("conditional_detr", "Conditional DETR"),

View File

@@ -72,6 +72,7 @@ else:
("chinese_clip", ("ChineseCLIPImageProcessor", "ChineseCLIPImageProcessorFast")), ("chinese_clip", ("ChineseCLIPImageProcessor", "ChineseCLIPImageProcessorFast")),
("clip", ("CLIPImageProcessor", "CLIPImageProcessorFast")), ("clip", ("CLIPImageProcessor", "CLIPImageProcessorFast")),
("clipseg", ("ViTImageProcessor", "ViTImageProcessorFast")), ("clipseg", ("ViTImageProcessor", "ViTImageProcessorFast")),
("cohere2_vision", ("Cohere2VisionImageProcessorFast",)),
("conditional_detr", ("ConditionalDetrImageProcessor", "ConditionalDetrImageProcessorFast")), ("conditional_detr", ("ConditionalDetrImageProcessor", "ConditionalDetrImageProcessorFast")),
("convnext", ("ConvNextImageProcessor", "ConvNextImageProcessorFast")), ("convnext", ("ConvNextImageProcessor", "ConvNextImageProcessorFast")),
("convnextv2", ("ConvNextImageProcessor", "ConvNextImageProcessorFast")), ("convnextv2", ("ConvNextImageProcessor", "ConvNextImageProcessorFast")),

View File

@@ -77,6 +77,7 @@ MODEL_MAPPING_NAMES = OrderedDict(
("codegen", "CodeGenModel"), ("codegen", "CodeGenModel"),
("cohere", "CohereModel"), ("cohere", "CohereModel"),
("cohere2", "Cohere2Model"), ("cohere2", "Cohere2Model"),
("cohere2_vision", "Cohere2VisionModel"),
("conditional_detr", "ConditionalDetrModel"), ("conditional_detr", "ConditionalDetrModel"),
("convbert", "ConvBertModel"), ("convbert", "ConvBertModel"),
("convnext", "ConvNextModel"), ("convnext", "ConvNextModel"),
@@ -944,6 +945,7 @@ MODEL_FOR_IMAGE_TEXT_TO_TEXT_MAPPING_NAMES = OrderedDict(
("blip", "BlipForConditionalGeneration"), ("blip", "BlipForConditionalGeneration"),
("blip-2", "Blip2ForConditionalGeneration"), ("blip-2", "Blip2ForConditionalGeneration"),
("chameleon", "ChameleonForConditionalGeneration"), ("chameleon", "ChameleonForConditionalGeneration"),
("cohere2_vision", "Cohere2VisionForConditionalGeneration"),
("deepseek_vl", "DeepseekVLForConditionalGeneration"), ("deepseek_vl", "DeepseekVLForConditionalGeneration"),
("deepseek_vl_hybrid", "DeepseekVLHybridForConditionalGeneration"), ("deepseek_vl_hybrid", "DeepseekVLHybridForConditionalGeneration"),
("emu3", "Emu3ForConditionalGeneration"), ("emu3", "Emu3ForConditionalGeneration"),

View File

@@ -60,6 +60,7 @@ PROCESSOR_MAPPING_NAMES = OrderedDict(
("clip", "CLIPProcessor"), ("clip", "CLIPProcessor"),
("clipseg", "CLIPSegProcessor"), ("clipseg", "CLIPSegProcessor"),
("clvp", "ClvpProcessor"), ("clvp", "ClvpProcessor"),
("cohere2_vision", "Cohere2VisionProcessor"),
("colpali", "ColPaliProcessor"), ("colpali", "ColPaliProcessor"),
("colqwen2", "ColQwen2Processor"), ("colqwen2", "ColQwen2Processor"),
("deepseek_vl", "DeepseekVLProcessor"), ("deepseek_vl", "DeepseekVLProcessor"),

View File

@@ -33,9 +33,9 @@ class AyaVisionConfig(PretrainedConfig):
documentation from [`PretrainedConfig`] for more information. documentation from [`PretrainedConfig`] for more information.
Args: Args:
vision_config (`Union[AutoConfig, dict]`, *optional*, defaults to `CLIPVisionConfig`): vision_config (`Union[AutoConfig, dict]`, *optional*, defaults to `SiglipVisionConfig`):
The config object or dictionary of the vision backbone. The config object or dictionary of the vision backbone.
text_config (`Union[AutoConfig, dict]`, *optional*, defaults to `LlamaConfig`): text_config (`Union[AutoConfig, dict]`, *optional*, defaults to `Cohere2Config`):
The config object or dictionary of the text backbone. The config object or dictionary of the text backbone.
vision_feature_select_strategy (`str`, *optional*, defaults to `"full"`): vision_feature_select_strategy (`str`, *optional*, defaults to `"full"`):
The feature selection strategy used to select the vision feature from the vision backbone. The feature selection strategy used to select the vision feature from the vision backbone.
@@ -81,7 +81,9 @@ class AyaVisionConfig(PretrainedConfig):
self.vision_feature_layer = vision_feature_layer self.vision_feature_layer = vision_feature_layer
if isinstance(vision_config, dict): if isinstance(vision_config, dict):
vision_config["model_type"] = vision_config.get("model_type", "clip_vision_model") vision_config["model_type"] = (
vision_config["model_type"] if "model_type" in vision_config else "siglip_vision_model"
)
vision_config = CONFIG_MAPPING[vision_config["model_type"]](**vision_config) vision_config = CONFIG_MAPPING[vision_config["model_type"]](**vision_config)
elif vision_config is None: elif vision_config is None:
vision_config = CONFIG_MAPPING["siglip_vision_model"]( vision_config = CONFIG_MAPPING["siglip_vision_model"](
@@ -97,7 +99,7 @@ class AyaVisionConfig(PretrainedConfig):
self.vision_config = vision_config self.vision_config = vision_config
if isinstance(text_config, dict): if isinstance(text_config, dict):
text_config["model_type"] = text_config.get("model_type", "llama") text_config["model_type"] = text_config["model_type"] if "model_type" in text_config else "cohere2"
text_config = CONFIG_MAPPING[text_config["model_type"]](**text_config) text_config = CONFIG_MAPPING[text_config["model_type"]](**text_config)
elif text_config is None: elif text_config is None:
text_config = CONFIG_MAPPING["cohere2"]() text_config = CONFIG_MAPPING["cohere2"]()

View File

@@ -33,6 +33,7 @@ from ...modeling_outputs import BaseModelOutputWithPast, ModelOutput
from ...modeling_utils import PreTrainedModel from ...modeling_utils import PreTrainedModel
from ...processing_utils import Unpack from ...processing_utils import Unpack
from ...utils import TransformersKwargs, auto_docstring, can_return_tuple, is_torchdynamo_compiling from ...utils import TransformersKwargs, auto_docstring, can_return_tuple, is_torchdynamo_compiling
from ...utils.generic import check_model_inputs
from ..auto import AutoModel from ..auto import AutoModel
from .configuration_aya_vision import AyaVisionConfig from .configuration_aya_vision import AyaVisionConfig
@@ -99,6 +100,10 @@ class AyaVisionPreTrainedModel(PreTrainedModel):
_can_compile_fullgraph = False _can_compile_fullgraph = False
_supports_flex_attn = True _supports_flex_attn = True
_supports_attention_backend = True _supports_attention_backend = True
_can_record_outputs = {
"hidden_states": "DecoderLayer",
"attentions": "Attention",
}
@dataclass @dataclass
@@ -237,7 +242,7 @@ class AyaVisionModel(AyaVisionPreTrainedModel):
image_features = self.multi_modal_projector(selected_image_feature) image_features = self.multi_modal_projector(selected_image_feature)
return image_features return image_features
@can_return_tuple @check_model_inputs
@auto_docstring @auto_docstring
def forward( def forward(
self, self,
@@ -250,17 +255,9 @@ class AyaVisionModel(AyaVisionPreTrainedModel):
vision_feature_layer: Optional[Union[int, list[int]]] = None, vision_feature_layer: Optional[Union[int, list[int]]] = None,
vision_feature_select_strategy: Optional[str] = None, vision_feature_select_strategy: Optional[str] = None,
use_cache: Optional[bool] = None, use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None, cache_position: Optional[torch.LongTensor] = None,
**kwargs: Unpack[FlashAttentionKwargs], **kwargs: Unpack[FlashAttentionKwargs],
) -> Union[tuple, AyaVisionModelOutputWithPast]: ) -> Union[tuple, AyaVisionModelOutputWithPast]:
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
vision_feature_layer = ( vision_feature_layer = (
vision_feature_layer if vision_feature_layer is not None else self.config.vision_feature_layer vision_feature_layer if vision_feature_layer is not None else self.config.vision_feature_layer
) )
@@ -308,9 +305,6 @@ class AyaVisionModel(AyaVisionPreTrainedModel):
past_key_values=past_key_values, past_key_values=past_key_values,
inputs_embeds=inputs_embeds, inputs_embeds=inputs_embeds,
use_cache=use_cache, use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=True,
cache_position=cache_position, cache_position=cache_position,
**kwargs, **kwargs,
) )
@@ -357,7 +351,7 @@ class AyaVisionForConditionalGeneration(AyaVisionPreTrainedModel, GenerationMixi
self.model.set_decoder(decoder) self.model.set_decoder(decoder)
def get_decoder(self): def get_decoder(self):
return self.model.get_decoder return self.model.get_decoder()
def get_image_features( def get_image_features(
self, self,

View File

@@ -32,7 +32,8 @@ from ...activations import ACT2FN
from ...cache_utils import Cache from ...cache_utils import Cache
from ...modeling_flash_attention_utils import FlashAttentionKwargs from ...modeling_flash_attention_utils import FlashAttentionKwargs
from ...processing_utils import Unpack from ...processing_utils import Unpack
from ...utils import auto_docstring, can_return_tuple, is_torchdynamo_compiling, logging from ...utils import auto_docstring, is_torchdynamo_compiling, logging
from ...utils.generic import check_model_inputs
from .configuration_aya_vision import AyaVisionConfig from .configuration_aya_vision import AyaVisionConfig
@@ -91,6 +92,10 @@ class AyaVisionMultiModalProjector(nn.Module):
class AyaVisionPreTrainedModel(LlavaPreTrainedModel): class AyaVisionPreTrainedModel(LlavaPreTrainedModel):
_can_compile_fullgraph = False _can_compile_fullgraph = False
_can_record_outputs = {
"hidden_states": "DecoderLayer",
"attentions": "Attention",
}
class AyaVisionCausalLMOutputWithPast(LlavaCausalLMOutputWithPast): class AyaVisionCausalLMOutputWithPast(LlavaCausalLMOutputWithPast):
@@ -158,7 +163,7 @@ class AyaVisionModel(LlavaModel):
image_features = self.multi_modal_projector(selected_image_feature) image_features = self.multi_modal_projector(selected_image_feature)
return image_features return image_features
@can_return_tuple @check_model_inputs
@auto_docstring @auto_docstring
def forward( def forward(
self, self,
@@ -171,17 +176,9 @@ class AyaVisionModel(LlavaModel):
vision_feature_layer: Optional[Union[int, list[int]]] = None, vision_feature_layer: Optional[Union[int, list[int]]] = None,
vision_feature_select_strategy: Optional[str] = None, vision_feature_select_strategy: Optional[str] = None,
use_cache: Optional[bool] = None, use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None, cache_position: Optional[torch.LongTensor] = None,
**kwargs: Unpack[FlashAttentionKwargs], **kwargs: Unpack[FlashAttentionKwargs],
) -> Union[tuple, AyaVisionModelOutputWithPast]: ) -> Union[tuple, AyaVisionModelOutputWithPast]:
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
vision_feature_layer = ( vision_feature_layer = (
vision_feature_layer if vision_feature_layer is not None else self.config.vision_feature_layer vision_feature_layer if vision_feature_layer is not None else self.config.vision_feature_layer
) )
@@ -229,9 +226,6 @@ class AyaVisionModel(LlavaModel):
past_key_values=past_key_values, past_key_values=past_key_values,
inputs_embeds=inputs_embeds, inputs_embeds=inputs_embeds,
use_cache=use_cache, use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=True,
cache_position=cache_position, cache_position=cache_position,
**kwargs, **kwargs,
) )

View File

@@ -0,0 +1,29 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING
from ...utils import _LazyModule
from ...utils.import_utils import define_import_structure
if TYPE_CHECKING:
from .configuration_cohere2_vision import *
from .image_processing_cohere2_vision_fast import *
from .modeling_cohere2_vision import *
from .processing_cohere2_vision import *
else:
import sys
_file = globals()["__file__"]
sys.modules[__name__] = _LazyModule(__name__, _file, define_import_structure(_file), module_spec=__spec__)

View File

@@ -0,0 +1,84 @@
# Copyright 2025 the Cohere Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ...configuration_utils import PretrainedConfig
from ..auto import CONFIG_MAPPING, AutoConfig
class Cohere2VisionConfig(PretrainedConfig):
r"""
This is the configuration class to store the configuration of a [`Cohere2VisionForConditionalGeneration`]. It is used to instantiate an
Cohere2 Vision model according to the specified arguments, defining the model architecture.
[CohereLabs/command-a-vision-07-2025](https://huggingface.co/CohereLabs/command-a-vision-07-2025)
Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
documentation from [`PretrainedConfig`] for more information.
Args:
vision_config (`Union[AutoConfig, dict]`, *optional*, defaults to `SiglipVisionConfig`):
The config object or dictionary of the vision backbone.
text_config (`Union[AutoConfig, dict]`, *optional*, defaults to `Cohere2Config`):
The config object or dictionary of the text backbone.
downsample_factor (`int`, *optional*, defaults to 2):
The factor by which to downsample the input image.
image_token_id (`int`, *optional*, defaults to 255036):
The token ID to use as placeholder for the image input.
alignment_intermediate_size (`int`, *optional*, defaults to 36864):
The size of the intermediate layer for alignment.
"""
model_type = "cohere2_vision"
sub_configs = {"text_config": AutoConfig, "vision_config": AutoConfig}
def __init__(
self,
vision_config=None,
text_config=None,
downsample_factor=2,
image_token_id=255036,
alignment_intermediate_size=36864,
**kwargs,
):
super().__init__(**kwargs)
self.downsample_factor = downsample_factor
self.image_token_id = image_token_id
self.alignment_intermediate_size = alignment_intermediate_size
if isinstance(vision_config, dict):
vision_config["model_type"] = (
vision_config["model_type"] if "model_type" in vision_config else "siglip_vision_model"
)
vision_config = CONFIG_MAPPING[vision_config["model_type"]](**vision_config)
elif vision_config is None:
vision_config = CONFIG_MAPPING["siglip_vision_model"](
hidden_size=1152,
intermediate_size=3072,
image_size=512,
num_hidden_layers=27,
num_attention_heads=12,
)
self.vision_config = vision_config
if isinstance(text_config, dict):
text_config["model_type"] = text_config["model_type"] if "model_type" in text_config else "cohere2"
text_config = CONFIG_MAPPING[text_config["model_type"]](**text_config)
elif text_config is None:
text_config = CONFIG_MAPPING["cohere2"](tie_word_embeddings=True)
self.text_config = text_config
__all__ = ["Cohere2VisionConfig"]

View File

@@ -0,0 +1,309 @@
# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
# This file was automatically generated from src/transformers/models/cohere2_vision/modular_cohere2_vision.py.
# Do NOT edit this file manually as any edits will be overwritten by the generation of
# the file from the modular. If any change should be done, please apply the change to the
# modular_cohere2_vision.py file directly. One of our CI enforces this.
# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
# coding=utf-8
# Copyright 2025 the Cohere Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import lru_cache
from typing import Optional, Union
import numpy as np
import torch
from ...image_processing_utils import BatchFeature
from ...image_processing_utils_fast import (
BaseImageProcessorFast,
DefaultFastImageProcessorKwargs,
group_images_by_shape,
reorder_images,
)
from ...image_utils import OPENAI_CLIP_MEAN, OPENAI_CLIP_STD, ImageInput, PILImageResampling, SizeDict
from ...processing_utils import Unpack
from ...utils import TensorType, auto_docstring, is_torchvision_v2_available
if is_torchvision_v2_available():
from torchvision.transforms.v2 import functional as F
else:
from torchvision.transforms import functional as F
class Cohere2VisionFastImageProcessorKwargs(DefaultFastImageProcessorKwargs):
"""
crop_to_patches (`bool`, *optional*, defaults to `False`):
Whether to crop the image to patches. Can be overridden by the `crop_to_patches` parameter in the
`preprocess` method.
min_patches (`int`, *optional*, defaults to 1):
The minimum number of patches to be extracted from the image. Only has an effect if `crop_to_patches` is
set to `True`. Can be overridden by the `min_patches` parameter in the `preprocess` method.
max_patches (`int`, *optional*, defaults to 12):
The maximum number of patches to be extracted from the image. Only has an effect if `crop_to_patches` is
set to `True`. Can be overridden by the `max_patches` parameter in the `preprocess` method.
"""
crop_to_patches: Optional[bool]
min_patches: Optional[int]
max_patches: Optional[int]
@lru_cache(maxsize=10)
def get_all_supported_aspect_ratios(max_image_tiles: int) -> list[tuple[int, int]]:
"""
Computes all allowed aspect ratios for a given maximum number of input tiles.
This function calculates all possible arrangements of tiles that can be formed
within the constraint of the maximum number of tiles. Each arrangement is
represented by its aspect ratio (width/height) and the corresponding tile configuration.
Args:
max_image_tiles (`int`):
The maximum number of tiles allowed.
Returns:
`list[tuple[int, int]]`: A list of tuples, each tuple representing a valid (width, height)
configuration in terms of number of tiles.
Example:
>>> get_all_supported_aspect_ratios(4)
[(1, 1), (1, 2), (1, 3), (1, 4), (2, 1), (2, 2), (3, 1), (4, 1)]
"""
aspect_ratios = []
for width in range(1, max_image_tiles + 1):
for height in range(1, max_image_tiles + 1):
if width * height <= max_image_tiles:
aspect_ratios.append((width, height))
return aspect_ratios
def get_optimal_tiled_canvas(
original_image_size: tuple[int, int],
target_tile_size: tuple[int, int],
min_image_tiles: int,
max_image_tiles: int,
) -> tuple[int, int]:
possible_resolutions = get_all_supported_aspect_ratios(max_image_tiles)
possible_resolutions = sorted(possible_resolutions, key=lambda x: x[0] * x[1])
image_height, image_width = original_image_size
patch_size_height, patch_size_width = target_tile_size # (height == width)
candidate_resolutions = np.array(possible_resolutions) * patch_size_height
original_size = np.stack([image_height, image_width])
required_scales = candidate_resolutions / original_size
required_scale = np.min(required_scales, axis=-1, keepdims=True) # [n_resolutions, 1]
if np.all(required_scale < 1):
# We are forced to downscale, so try to minimize the amount of downscaling
best_grid = possible_resolutions[np.argmax(required_scale)]
else:
# Pick the resolution that required the least upscaling so that it most closely fits the image
required_scale = np.where(required_scale < 1.0, 10e9, required_scale)
best_grid = possible_resolutions[np.argmin(required_scale)]
return best_grid
@auto_docstring
class Cohere2VisionImageProcessorFast(BaseImageProcessorFast):
resample = PILImageResampling.BICUBIC
image_mean = OPENAI_CLIP_MEAN
image_std = OPENAI_CLIP_STD
size = {"height": 512, "width": 512}
do_resize = True
do_rescale = True
do_normalize = True
do_convert_rgb = True
crop_to_patches = True
min_patches = 1
max_patches = 12
valid_kwargs = Cohere2VisionFastImageProcessorKwargs
patch_size = 16
def __init__(self, **kwargs: Unpack[Cohere2VisionFastImageProcessorKwargs]):
super().__init__(**kwargs)
@auto_docstring
def preprocess(self, images: ImageInput, **kwargs: Unpack[Cohere2VisionFastImageProcessorKwargs]) -> BatchFeature:
return super().preprocess(images, **kwargs)
def crop_image_to_patches(
self,
images: "torch.Tensor",
min_patches: int,
max_patches: int,
use_thumbnail: bool = True,
patch_size: Optional[Union[tuple, int, dict]] = None,
interpolation: Optional["F.InterpolationMode"] = None,
):
"""
Crop the images to patches and return a list of cropped images.
The number of patches and their grid arrangement are determined by the original image size,
the target patch size and the minimum and maximum number of patches.
The aspect ratio of the patches grid is chosen to be the closest to the original image aspect ratio.
Args:
images (`torch.Tensor`):
The images to be cropped.
min_patches (`int`):
The minimum number of patches to be extracted from the image.
max_patches (`int`):
The maximum number of patches to be extracted from the image.
use_thumbnail (`bool`, *optional*, defaults to `True`):
Whether to add a thumbnail image to the list of cropped patches.
patch_size (`int`, `tuple[int, int]`, `dict`, *optional*):
The size of the output patches.
The format of the image data. If `None`, the format is inferred from the input image.
Returns:
list[`PIL.Image.Image`] or list[np.ndarray]: The list of cropped images.
"""
patch_size_height, patch_size_width = patch_size.height, patch_size.width
original_height, original_width = images.shape[-2:]
# find the closest aspect ratio to the target
num_columns, num_rows = get_optimal_tiled_canvas(
(original_height, original_width), (patch_size_height, patch_size_width), min_patches, max_patches
)
# calculate the target width and height
target_width = patch_size_width * num_columns
target_height = patch_size_height * num_rows
num_blocks = num_columns * num_rows
# resize the image so that each patch is of patch_size
resized_image = self.resize(
images, SizeDict(height=target_height, width=target_width), interpolation=interpolation
)
# split the image into patches
processed_images = []
for i in range(num_blocks):
column = i % num_columns
row = i // num_columns
box = (
column * patch_size_width,
row * patch_size_height,
(column + 1) * patch_size_width,
(row + 1) * patch_size_height,
)
# split the image
patch_image = resized_image[..., box[1] : box[3], box[0] : box[2]]
processed_images.append(patch_image)
if use_thumbnail and len(processed_images) != 1:
thumbnail_img = self.resize(images, patch_size, interpolation=interpolation)
processed_images.append(thumbnail_img)
processed_images = torch.stack(processed_images, dim=0).transpose(0, 1).contiguous()
return processed_images
def _preprocess(
self,
images: list["torch.Tensor"],
do_resize: bool,
size: SizeDict,
crop_to_patches: bool,
min_patches: int,
max_patches: int,
interpolation: Optional["F.InterpolationMode"],
do_center_crop: bool,
crop_size: SizeDict,
do_rescale: bool,
rescale_factor: float,
do_normalize: bool,
image_mean: Optional[Union[float, list[float]]],
image_std: Optional[Union[float, list[float]]],
disable_grouping: Optional[bool],
return_tensors: Optional[Union[str, TensorType]],
) -> BatchFeature:
if crop_to_patches:
grouped_images, grouped_images_index = group_images_by_shape(images, disable_grouping=disable_grouping)
processed_images_grouped = {}
num_patches = {}
for shape, stacked_images in grouped_images.items():
stacked_images = self.crop_image_to_patches(
stacked_images,
min_patches,
max_patches,
patch_size=size,
interpolation=interpolation,
)
processed_images_grouped[shape] = stacked_images
num_patches[shape] = [stacked_images.shape[1]] * stacked_images.shape[0]
images = reorder_images(processed_images_grouped, grouped_images_index)
images = [image for images_list in images for image in images_list]
num_patches = reorder_images(num_patches, grouped_images_index)
else:
num_patches = [1] * len(images)
# Group images by size for batched resizing
grouped_images, grouped_images_index = group_images_by_shape(images, disable_grouping=disable_grouping)
resized_images_grouped = {}
for shape, stacked_images in grouped_images.items():
if do_resize:
stacked_images = self.resize(image=stacked_images, size=size, interpolation=interpolation)
resized_images_grouped[shape] = stacked_images
resized_images = reorder_images(resized_images_grouped, grouped_images_index)
# Group images by size for further processing
# Needed in case do_resize is False, or resize returns images with different sizes
grouped_images, grouped_images_index = group_images_by_shape(resized_images, disable_grouping=disable_grouping)
processed_images_grouped = {}
for shape, stacked_images in grouped_images.items():
if do_center_crop:
stacked_images = self.center_crop(stacked_images, crop_size)
# Fused rescale and normalize
stacked_images = self.rescale_and_normalize(
stacked_images, do_rescale, rescale_factor, do_normalize, image_mean, image_std
)
processed_images_grouped[shape] = stacked_images
processed_images = reorder_images(processed_images_grouped, grouped_images_index)
processed_images = torch.stack(processed_images, dim=0) if return_tensors else processed_images
return BatchFeature(
data={"pixel_values": processed_images, "num_patches": num_patches}, tensor_type=return_tensors
)
def get_number_of_image_patches(self, height: int, width: int, images_kwargs=None):
"""
A utility that returns number patches for a given image size.
Args:
height (`int`):
Height of the input image.
width (`int`):
Width of the input image.
images_kwargs (`dict`, *optional*)
Any kwargs to override defaults of the image processor.
Returns:
`int`: Number of patches per image.
"""
min_patches = images_kwargs.get("min_patches", self.min_patches)
max_patches = images_kwargs.get("max_patches", self.max_patches)
patch_size = images_kwargs.get("patch_size", self.size)
crop_to_patches = images_kwargs.get("crop_to_patches", self.crop_to_patches)
num_patches = 1
if crop_to_patches and max_patches > 1:
num_columns, num_rows = get_optimal_tiled_canvas(
(height, width), (patch_size["height"], patch_size["width"]), min_patches, max_patches
)
num_patches += num_columns * num_rows
return num_patches
__all__ = ["Cohere2VisionImageProcessorFast"]

View File

@@ -0,0 +1,432 @@
# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
# This file was automatically generated from src/transformers/models/cohere2_vision/modular_cohere2_vision.py.
# Do NOT edit this file manually as any edits will be overwritten by the generation of
# the file from the modular. If any change should be done, please apply the change to the
# modular_cohere2_vision.py file directly. One of our CI enforces this.
# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
# coding=utf-8
# Copyright 2025 the Cohere Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from typing import Optional, Union
import torch
from torch import nn
from ...cache_utils import Cache
from ...generation import GenerationMixin
from ...modeling_flash_attention_utils import FlashAttentionKwargs
from ...modeling_outputs import BaseModelOutputWithPast, ModelOutput
from ...modeling_utils import PreTrainedModel
from ...processing_utils import Unpack
from ...utils import TransformersKwargs, auto_docstring
from ...utils.generic import check_model_inputs
from ..auto import AutoModel
from .configuration_cohere2_vision import Cohere2VisionConfig
class Cohere2VisionMultiModalProjector(nn.Module):
def __init__(self, config: Cohere2VisionConfig):
super().__init__()
self.config = config
self.downsample_factor = config.downsample_factor
self.intermediate_size = config.alignment_intermediate_size
self.linear_1 = nn.Linear(
config.vision_config.hidden_size * (config.downsample_factor**2), self.intermediate_size, bias=True
)
self.act = nn.SiLU()
self.linear_2 = nn.Linear(self.intermediate_size // 2, config.text_config.hidden_size, bias=True)
def pixel_shuffle(self, image_features): # B, S, D
batch_size, seq_length, feature_dim = image_features.shape
height = width = int(seq_length**0.5)
image_features = image_features.reshape(image_features.shape[0], width, height, -1)
channels = image_features.shape[-1]
image_features = image_features.reshape(
batch_size, width, int(height / self.downsample_factor), int(channels * self.downsample_factor)
)
image_features = image_features.permute(0, 2, 1, 3)
image_features = image_features.reshape(
batch_size, int(height / self.downsample_factor), int(width / self.downsample_factor), -1
)
image_features = image_features.permute(0, 2, 1, 3)
return image_features
def forward(self, image_features):
image_features = self.pixel_shuffle(image_features)
hidden_states = self.linear_1(image_features)
# Split along last dimension and apply SwiGLU
x, gate = hidden_states.chunk(2, dim=-1)
hidden_states = self.act(gate) * x
hidden_states = self.linear_2(hidden_states)
return hidden_states
@dataclass
@auto_docstring(
custom_intro="""
Base class for Cohere2Vision outputs, with hidden states and attentions.
"""
)
class Cohere2VisionModelOutputWithPast(BaseModelOutputWithPast):
r"""
past_key_values (`Cache`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape
`(batch_size, num_heads, sequence_length, embed_size_per_head)`)
Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see
`past_key_values` input) to speed up sequential decoding.
image_hidden_states (`torch.FloatTensor`, *optional*):
A `torch.FloatTensor` of size `(batch_size, num_images, sequence_length, hidden_size)`.
image_hidden_states of the model produced by the vision encoder and after projecting the last hidden state.
"""
image_hidden_states: Optional[torch.FloatTensor] = None
@dataclass
@auto_docstring(
custom_intro="""
Base class for Cohere2Vision causal language model (or autoregressive) outputs.
"""
)
class Cohere2VisionCausalLMOutputWithPast(ModelOutput):
r"""
loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided):
Language modeling loss (for next-token prediction).
logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`):
Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax).
past_key_values (`Cache`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape
`(batch_size, num_heads, sequence_length, embed_size_per_head)`)
Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see
`past_key_values` input) to speed up sequential decoding.
image_hidden_states (`torch.FloatTensor`, *optional*):
A `torch.FloatTensor` of size `(batch_size, num_images, sequence_length, hidden_size)`.
image_hidden_states of the model produced by the vision encoder and after projecting the last hidden state.
"""
loss: Optional[torch.FloatTensor] = None
logits: Optional[torch.FloatTensor] = None
past_key_values: Optional[list[torch.FloatTensor]] = None
hidden_states: Optional[tuple[torch.FloatTensor]] = None
attentions: Optional[tuple[torch.FloatTensor]] = None
image_hidden_states: Optional[torch.FloatTensor] = None
@auto_docstring
class Cohere2VisionPreTrainedModel(PreTrainedModel):
config: Cohere2VisionConfig
base_model_prefix = ""
supports_gradient_checkpointing = True
_skip_keys_device_placement = "past_key_values"
_supports_flash_attn = True
_supports_sdpa = True
_can_compile_fullgraph = False
_supports_flex_attn = True
_supports_attention_backend = True
_can_record_outputs = {
"hidden_states": "DecoderLayer",
"attentions": "Attention",
}
@auto_docstring(
custom_intro="""
The Cohere2Vision model which consists of a vision backbone and a language model, without a language modeling head.
"""
)
class Cohere2VisionModel(Cohere2VisionPreTrainedModel):
_checkpoint_conversion_mapping = {}
def __init__(self, config: Cohere2VisionConfig):
super().__init__(config)
self.vision_tower = AutoModel.from_config(config.vision_config)
self.multi_modal_projector = Cohere2VisionMultiModalProjector(config)
self.language_model = AutoModel.from_config(config.text_config)
self.post_init()
def get_input_embeddings(self):
return self.language_model.get_input_embeddings()
def set_input_embeddings(self, value):
self.language_model.set_input_embeddings(value)
def set_decoder(self, decoder):
self.language_model = decoder
def get_decoder(self):
return self.language_model
def get_image_features(
self,
pixel_values: torch.FloatTensor,
image_num_patches: torch.Tensor,
):
"""
Obtains image last hidden states from the vision tower and apply multimodal projection.
Args:
pixel_values (`torch.FloatTensor]` of shape `(batch_size, num_patches, channels, height, width)`)
The tensors corresponding to the input images.
image_num_patches (`torch.Tensor` of shape `(num_images)`)
Number of patches for each image.
Returns:
image_features (List[`torch.Tensor`]): List of image feature tensor, each contains all the visual feature of all patches
and are of shape `(num_patches, image_length, embed_dim)`).
"""
image_features = self.vision_tower(pixel_values, output_hidden_states=True)
selected_image_feature = image_features.last_hidden_state
image_features = self.multi_modal_projector(selected_image_feature)
return image_features
@check_model_inputs
@auto_docstring
def forward(
self,
input_ids: torch.LongTensor = None,
pixel_values: torch.FloatTensor = None,
image_num_patches: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
**kwargs: Unpack[FlashAttentionKwargs],
) -> Union[tuple, Cohere2VisionModelOutputWithPast]:
r"""
image_num_patches (`torch.Tensor` of shape `(num_images,)`):
Number of patches per input image.
"""
if (input_ids is None) ^ (inputs_embeds is not None):
raise ValueError("You must specify exactly one of input_ids or inputs_embeds")
if inputs_embeds is None:
inputs_embeds = self.get_input_embeddings()(input_ids)
if pixel_values is not None:
image_features = self.get_image_features(pixel_values, image_num_patches=image_num_patches)
image_features = image_features.to(inputs_embeds.device, inputs_embeds.dtype)
if input_ids is None:
special_image_mask = inputs_embeds == self.get_input_embeddings()(
torch.tensor(self.config.image_token_id, dtype=torch.long, device=inputs_embeds.device)
)
special_image_mask = special_image_mask.all(-1)
else:
special_image_mask = input_ids == self.config.image_token_id
special_image_mask = special_image_mask.unsqueeze(-1).expand_as(inputs_embeds).to(inputs_embeds.device)
inputs_embeds = inputs_embeds.masked_scatter(special_image_mask, image_features)
outputs = self.language_model(
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
cache_position=cache_position,
**kwargs,
)
return Cohere2VisionModelOutputWithPast(
last_hidden_state=outputs.last_hidden_state,
past_key_values=outputs.past_key_values,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
image_hidden_states=image_features if pixel_values is not None else None,
)
@auto_docstring(
custom_intro="""
The COHERE2_VISION model which consists of a vision backbone and a language model.
"""
)
class Cohere2VisionForConditionalGeneration(Cohere2VisionPreTrainedModel, GenerationMixin):
_checkpoint_conversion_mapping = {}
_tied_weights_keys = ["lm_head.weight"]
def __init__(self, config: Cohere2VisionConfig):
super().__init__(config)
self.model = Cohere2VisionModel(config)
self.lm_head = nn.Linear(config.text_config.hidden_size, config.text_config.vocab_size, bias=False)
self.post_init()
def get_input_embeddings(self):
return self.model.get_input_embeddings()
def set_input_embeddings(self, value):
self.model.set_input_embeddings(value)
def get_output_embeddings(self) -> nn.Module:
return self.lm_head
def set_decoder(self, decoder):
self.model.set_decoder(decoder)
def get_decoder(self):
return self.model.get_decoder()
def get_image_features(
self,
pixel_values: torch.FloatTensor,
image_num_patches: torch.Tensor,
):
return self.model.get_image_features(
pixel_values=pixel_values,
image_num_patches=image_num_patches,
)
# Make modules available throught conditional class for BC
@property
def language_model(self):
return self.model.language_model
@property
def vision_tower(self):
return self.model.vision_tower
@property
def multi_modal_projector(self):
return self.model.multi_modal_projector
@check_model_inputs
@auto_docstring
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
pixel_values: Optional[torch.FloatTensor] = None,
image_num_patches: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
logits_to_keep: Union[int, torch.Tensor] = 0,
image_sizes: Optional[torch.Tensor] = None,
**kwargs: Unpack[TransformersKwargs],
) -> Union[tuple, Cohere2VisionCausalLMOutputWithPast]:
r"""
image_num_patches (`torch.Tensor` of shape `(num_images,)`):
Number of patches per input image.
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
(masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.
Example:
```python
>>> from transformers import AutoProcessor, Cohere2VisionForConditionalGeneration
>>> import torch
>>> processor = AutoProcessor.from_pretrained("CohereLabs/command-a-vision-07-2025", use_fast=True)
>>> model = Cohere2VisionForConditionalGeneration.from_pretrained("CohereLabs/command-a-vision-07-2025", device_map="auto")
>>> messages = [
... {
... "role": "user",
... "content": [
... {
... "type": "image",
... "url": "https://images.pexels.com/photos/1108099/pexels-photo-1108099.jpeg",
... },
... {"type": "text", "text": "what is in this image?"},
... ],
... },
... ]
>>> inputs = processor.apply_chat_template(
... messages, padding=True, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt",
... ).to(model.device)
>>> gen_tokens = model.generate(**inputs, max_new_tokens=300, do_sample=True, temperature=0.3)
>>> processor.tokenizer.decode(gen_tokens[0][inputs.input_ids.shape[1]:], skip_special_tokens=True)
```"""
outputs = self.model(
input_ids=input_ids,
pixel_values=pixel_values,
image_num_patches=image_num_patches,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
cache_position=cache_position,
image_sizes=image_sizes,
**kwargs,
)
hidden_states = outputs[0]
# Only compute necessary logits, and do not upcast them to float if we are not computing the loss
slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep
logits = self.lm_head(hidden_states[:, slice_indices, :])
loss = None
if labels is not None:
loss = self.loss_function(
logits=logits, labels=labels, vocab_size=self.config.text_config.vocab_size, **kwargs
)
return Cohere2VisionCausalLMOutputWithPast(
loss=loss,
logits=logits,
past_key_values=outputs.past_key_values,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
image_hidden_states=outputs.image_hidden_states,
)
def prepare_inputs_for_generation(
self,
input_ids,
past_key_values=None,
inputs_embeds=None,
pixel_values=None,
attention_mask=None,
cache_position=None,
logits_to_keep=None,
**kwargs,
):
# Overwritten -- in specific circumstances we don't want to forward image inputs to the model
model_inputs = super().prepare_inputs_for_generation(
input_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
attention_mask=attention_mask,
cache_position=cache_position,
logits_to_keep=logits_to_keep,
**kwargs,
)
if cache_position[0] == 0:
# If we're in cached decoding stage, pixel values should be None because input ids do not contain special image token anymore
# Otherwise we need pixel values to be passed to model
model_inputs["pixel_values"] = pixel_values
return model_inputs
__all__ = ["Cohere2VisionForConditionalGeneration", "Cohere2VisionPreTrainedModel", "Cohere2VisionModel"]

View File

@@ -0,0 +1,351 @@
# coding=utf-8
# Copyright 2025 the Cohere Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""PyTorch AyaVision model."""
from functools import lru_cache
from typing import Optional, Union
import numpy as np
import torch
from torch import nn
from transformers.models.aya_vision.modeling_aya_vision import (
AyaVisionCausalLMOutputWithPast,
AyaVisionForConditionalGeneration,
AyaVisionModel,
AyaVisionModelOutputWithPast,
)
from transformers.models.got_ocr2.image_processing_got_ocr2_fast import GotOcr2ImageProcessorFast
from ...cache_utils import Cache
from ...modeling_flash_attention_utils import FlashAttentionKwargs
from ...processing_utils import Unpack
from ...utils import (
TransformersKwargs,
auto_docstring,
logging,
)
from ...utils.generic import check_model_inputs
from .configuration_cohere2_vision import Cohere2VisionConfig
logger = logging.get_logger(__name__)
class Cohere2VisionMultiModalProjector(nn.Module):
def __init__(self, config: Cohere2VisionConfig):
super().__init__()
self.config = config
self.downsample_factor = config.downsample_factor
self.intermediate_size = config.alignment_intermediate_size
self.linear_1 = nn.Linear(
config.vision_config.hidden_size * (config.downsample_factor**2), self.intermediate_size, bias=True
)
self.act = nn.SiLU()
self.linear_2 = nn.Linear(self.intermediate_size // 2, config.text_config.hidden_size, bias=True)
def pixel_shuffle(self, image_features): # B, S, D
batch_size, seq_length, feature_dim = image_features.shape
height = width = int(seq_length**0.5)
image_features = image_features.reshape(image_features.shape[0], width, height, -1)
channels = image_features.shape[-1]
image_features = image_features.reshape(
batch_size, width, int(height / self.downsample_factor), int(channels * self.downsample_factor)
)
image_features = image_features.permute(0, 2, 1, 3)
image_features = image_features.reshape(
batch_size, int(height / self.downsample_factor), int(width / self.downsample_factor), -1
)
image_features = image_features.permute(0, 2, 1, 3)
return image_features
def forward(self, image_features):
image_features = self.pixel_shuffle(image_features)
hidden_states = self.linear_1(image_features)
# Split along last dimension and apply SwiGLU
x, gate = hidden_states.chunk(2, dim=-1)
hidden_states = self.act(gate) * x
hidden_states = self.linear_2(hidden_states)
return hidden_states
class Cohere2VisionModelOutputWithPast(AyaVisionModelOutputWithPast):
pass
class Cohere2VisionCausalLMOutputWithPast(AyaVisionCausalLMOutputWithPast):
pass
class Cohere2VisionModel(AyaVisionModel):
_checkpoint_conversion_mapping = {}
def get_image_features(
self,
pixel_values: torch.FloatTensor,
image_num_patches: torch.Tensor,
):
"""
Obtains image last hidden states from the vision tower and apply multimodal projection.
Args:
pixel_values (`torch.FloatTensor]` of shape `(batch_size, num_patches, channels, height, width)`)
The tensors corresponding to the input images.
image_num_patches (`torch.Tensor` of shape `(num_images)`)
Number of patches for each image.
Returns:
image_features (List[`torch.Tensor`]): List of image feature tensor, each contains all the visual feature of all patches
and are of shape `(num_patches, image_length, embed_dim)`).
"""
image_features = self.vision_tower(pixel_values, output_hidden_states=True)
selected_image_feature = image_features.last_hidden_state
image_features = self.multi_modal_projector(selected_image_feature)
return image_features
@check_model_inputs
@auto_docstring
def forward(
self,
input_ids: torch.LongTensor = None,
pixel_values: torch.FloatTensor = None,
image_num_patches: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
**kwargs: Unpack[FlashAttentionKwargs],
) -> Union[tuple, Cohere2VisionModelOutputWithPast]:
r"""
image_num_patches (`torch.Tensor` of shape `(num_images,)`):
Number of patches per input image.
"""
if (input_ids is None) ^ (inputs_embeds is not None):
raise ValueError("You must specify exactly one of input_ids or inputs_embeds")
if inputs_embeds is None:
inputs_embeds = self.get_input_embeddings()(input_ids)
if pixel_values is not None:
image_features = self.get_image_features(pixel_values, image_num_patches=image_num_patches)
image_features = image_features.to(inputs_embeds.device, inputs_embeds.dtype)
if input_ids is None:
special_image_mask = inputs_embeds == self.get_input_embeddings()(
torch.tensor(self.config.image_token_id, dtype=torch.long, device=inputs_embeds.device)
)
special_image_mask = special_image_mask.all(-1)
else:
special_image_mask = input_ids == self.config.image_token_id
special_image_mask = special_image_mask.unsqueeze(-1).expand_as(inputs_embeds).to(inputs_embeds.device)
inputs_embeds = inputs_embeds.masked_scatter(special_image_mask, image_features)
outputs = self.language_model(
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
cache_position=cache_position,
**kwargs,
)
return Cohere2VisionModelOutputWithPast(
last_hidden_state=outputs.last_hidden_state,
past_key_values=outputs.past_key_values,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
image_hidden_states=image_features if pixel_values is not None else None,
)
class Cohere2VisionForConditionalGeneration(AyaVisionForConditionalGeneration):
_checkpoint_conversion_mapping = {}
def get_image_features(
self,
pixel_values: torch.FloatTensor,
image_num_patches: torch.Tensor,
):
return self.model.get_image_features(
pixel_values=pixel_values,
image_num_patches=image_num_patches,
)
@check_model_inputs
@auto_docstring
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
pixel_values: Optional[torch.FloatTensor] = None,
image_num_patches: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
logits_to_keep: Union[int, torch.Tensor] = 0,
image_sizes: Optional[torch.Tensor] = None,
**kwargs: Unpack[TransformersKwargs],
) -> Union[tuple, Cohere2VisionCausalLMOutputWithPast]:
r"""
image_num_patches (`torch.Tensor` of shape `(num_images,)`):
Number of patches per input image.
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
(masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.
Example:
```python
>>> from transformers import AutoProcessor, Cohere2VisionForConditionalGeneration
>>> import torch
>>> processor = AutoProcessor.from_pretrained("CohereLabs/command-a-vision-07-2025", use_fast=True)
>>> model = Cohere2VisionForConditionalGeneration.from_pretrained("CohereLabs/command-a-vision-07-2025", device_map="auto")
>>> messages = [
... {
... "role": "user",
... "content": [
... {
... "type": "image",
... "url": "https://images.pexels.com/photos/1108099/pexels-photo-1108099.jpeg",
... },
... {"type": "text", "text": "what is in this image?"},
... ],
... },
... ]
>>> inputs = processor.apply_chat_template(
... messages, padding=True, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt",
... ).to(model.device)
>>> gen_tokens = model.generate(**inputs, max_new_tokens=300, do_sample=True, temperature=0.3)
>>> processor.tokenizer.decode(gen_tokens[0][inputs.input_ids.shape[1]:], skip_special_tokens=True)
```"""
outputs = self.model(
input_ids=input_ids,
pixel_values=pixel_values,
image_num_patches=image_num_patches,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
cache_position=cache_position,
image_sizes=image_sizes,
**kwargs,
)
hidden_states = outputs[0]
# Only compute necessary logits, and do not upcast them to float if we are not computing the loss
slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep
logits = self.lm_head(hidden_states[:, slice_indices, :])
loss = None
if labels is not None:
loss = self.loss_function(
logits=logits, labels=labels, vocab_size=self.config.text_config.vocab_size, **kwargs
)
return Cohere2VisionCausalLMOutputWithPast(
loss=loss,
logits=logits,
past_key_values=outputs.past_key_values,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
image_hidden_states=outputs.image_hidden_states,
)
@lru_cache(maxsize=10)
def get_all_supported_aspect_ratios(max_image_tiles: int) -> list[tuple[int, int]]:
"""
Computes all allowed aspect ratios for a given maximum number of input tiles.
This function calculates all possible arrangements of tiles that can be formed
within the constraint of the maximum number of tiles. Each arrangement is
represented by its aspect ratio (width/height) and the corresponding tile configuration.
Args:
max_image_tiles (`int`):
The maximum number of tiles allowed.
Returns:
`list[tuple[int, int]]`: A list of tuples, each tuple representing a valid (width, height)
configuration in terms of number of tiles.
Example:
>>> get_all_supported_aspect_ratios(4)
[(1, 1), (1, 2), (1, 3), (1, 4), (2, 1), (2, 2), (3, 1), (4, 1)]
"""
aspect_ratios = []
for width in range(1, max_image_tiles + 1):
for height in range(1, max_image_tiles + 1):
if width * height <= max_image_tiles:
aspect_ratios.append((width, height))
return aspect_ratios
def get_optimal_tiled_canvas(
original_image_size: tuple[int, int],
target_tile_size: tuple[int, int],
min_image_tiles: int,
max_image_tiles: int,
) -> tuple[int, int]:
possible_resolutions = get_all_supported_aspect_ratios(max_image_tiles)
possible_resolutions = sorted(possible_resolutions, key=lambda x: x[0] * x[1])
image_height, image_width = original_image_size
patch_size_height, patch_size_width = target_tile_size # (height == width)
candidate_resolutions = np.array(possible_resolutions) * patch_size_height
original_size = np.stack([image_height, image_width])
required_scales = candidate_resolutions / original_size
required_scale = np.min(required_scales, axis=-1, keepdims=True) # [n_resolutions, 1]
if np.all(required_scale < 1):
# We are forced to downscale, so try to minimize the amount of downscaling
best_grid = possible_resolutions[np.argmax(required_scale)]
else:
# Pick the resolution that required the least upscaling so that it most closely fits the image
required_scale = np.where(required_scale < 1.0, 10e9, required_scale)
best_grid = possible_resolutions[np.argmin(required_scale)]
return best_grid
@auto_docstring
class Cohere2VisionImageProcessorFast(GotOcr2ImageProcessorFast):
size = {"height": 512, "width": 512}
min_patches = 1
max_patches = 12
crop_to_patches = True
patch_size = 16
__all__ = [
"Cohere2VisionForConditionalGeneration",
"Cohere2VisionPreTrainedModel", # noqa: F822
"Cohere2VisionModel",
"Cohere2VisionImageProcessorFast",
]

View File

@@ -0,0 +1,216 @@
# coding=utf-8
# Copyright 2025 HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Union
import numpy as np
from ...image_processing_utils import BatchFeature
from ...image_utils import ImageInput
from ...processing_utils import ImagesKwargs, MultiModalData, ProcessingKwargs, ProcessorMixin, Unpack
from ...tokenization_utils_base import PreTokenizedInput, TextInput
class Cohere2VisionImagesKwargs(ImagesKwargs, total=False):
max_patches: Optional[int]
class Cohere2VisionProcessorKwargs(ProcessingKwargs, total=False):
images_kwargs: Cohere2VisionImagesKwargs
_defaults = {
"text_kwargs": {
"padding_side": "left",
"padding": True,
"return_mm_token_type_ids": False,
},
}
class Cohere2VisionProcessor(ProcessorMixin):
r"""
Constructs a Cohere2Vision processor which wraps a [`AutoImageProcessor`] and
[`PretrainedTokenizerFast`] tokenizer into a single processor that inherits both the image processor and
tokenizer functionalities. See the [`~Cohere2VisionProcessor.__call__`] and [`~Cohere2VisionProcessor.decode`] for more information.
Args:
image_processor ([`AutoImageProcessor`], *optional*):
The image processor is a required input.
tokenizer ([`PreTrainedTokenizer`, `PreTrainedTokenizerFast`], *optional*):
The tokenizer is a required input.
chat_template (`str`, *optional*): A Jinja template which will be used to convert lists of messages
in a chat into a tokenizable string.
"""
attributes = ["image_processor", "tokenizer"]
image_processor_class = "AutoImageProcessor"
tokenizer_class = "AutoTokenizer"
def __init__(
self,
image_processor=None,
tokenizer=None,
chat_template=None,
**kwargs,
):
super().__init__(image_processor, tokenizer, chat_template=chat_template)
self.patch_size = self.image_processor.patch_size
self.boi_token = tokenizer.boi_token
self.eoi_token = tokenizer.eoi_token
self.image_token = tokenizer.image_token
self.img_line_break_token = tokenizer.img_line_break_token
self.image_token_id = tokenizer.image_token_id
self.image_ids = tokenizer.convert_tokens_to_ids(
[
self.image_token,
self.boi_token,
self.eoi_token,
self.img_line_break_token,
]
)
def __call__(
self,
images: Optional[ImageInput] = None,
text: Optional[Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]]] = None,
**kwargs: Unpack[Cohere2VisionProcessorKwargs],
) -> BatchFeature:
"""
Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text`
and `kwargs` arguments to PreTrainedTokenizerFast's [`~PreTrainedTokenizerFast.__call__`] to encode the text.
To prepare the vision inputs, this method forwards the `images` and `kwargs` arguments to
GotOcr2ImageProcessor's [`~GotOcr2ImageProcessor.__call__`] if `images` is not `None`.
Args:
images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `list[PIL.Image.Image]`, `list[np.ndarray]`, `list[torch.Tensor]`):
The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
tensor. Both channels-first and channels-last formats are supported.
text (`str`, `list[str]`, `list[list[str]]`):
The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
(pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
`is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
return_tensors (`str` or [`~utils.TensorType`], *optional*):
If set, will return tensors of a particular framework. Acceptable values are:
- `'tf'`: Return TensorFlow `tf.constant` objects.
- `'pt'`: Return PyTorch `torch.Tensor` objects.
- `'np'`: Return NumPy `np.ndarray` objects.
- `'jax'`: Return JAX `jnp.ndarray` objects.
Returns:
[`BatchFeature`]: A [`BatchFeature`] with the following fields:
- **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
- **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
`return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
`None`).
- **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
"""
if text is None:
raise ValueError("You have to specify text.")
elif not isinstance(text, (list, tuple)):
text = [text]
output_kwargs = self._merge_kwargs(
Cohere2VisionProcessorKwargs,
tokenizer_init_kwargs=self.tokenizer.init_kwargs,
**kwargs,
)
# Process images
image_inputs = {}
if images is not None:
image_inputs = self.image_processor(images=images, **output_kwargs["images_kwargs"])
batch_num_patches = iter(image_inputs.pop("num_patches"))
processed_text = []
for sample in text:
while self.image_token in sample:
num_patches = next(batch_num_patches)
img_patches_per_tile = int(self.patch_size**2)
img_string = f"{self.boi_token}"
for idx in range(1, num_patches):
img_string += "<placeholder>" * img_patches_per_tile + self.img_line_break_token
img_string += "<placeholder>" * img_patches_per_tile + self.img_line_break_token
img_string += f"{self.eoi_token}"
sample = sample.replace(self.image_token, img_string, 1)
processed_text.append(sample)
text = [sample.replace("<placeholder>", self.image_token) for sample in processed_text]
return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)
return_mm_token_type_ids = output_kwargs["text_kwargs"].pop("return_mm_token_type_ids", False)
text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"], return_tensors=None)
if return_mm_token_type_ids:
array_ids = np.array(text_inputs["input_ids"])
mm_token_type_ids = np.zeros_like(text_inputs["input_ids"])
mm_token_type_ids[np.isin(array_ids, self.image_ids)] = 1
text_inputs["mm_token_type_ids"] = mm_token_type_ids.tolist()
return BatchFeature(data={**text_inputs, **image_inputs}, tensor_type=return_tensors)
def _get_num_multimodal_tokens(self, image_sizes=None, **kwargs):
"""
Computes the number of placeholder tokens needed for multimodal inputs with the given sizes.
Args:
image_sizes (`list[list[int]]`, *optional*):
The input sizes formatted as (height, width) per each image.
Returns:
`MultiModalData`: A `MultiModalData` object holding number of tokens per each of the provided
input modalities, along with other useful data.
"""
vision_data = {}
if image_sizes is not None:
images_kwargs = Cohere2VisionProcessorKwargs._defaults.get("images_kwargs", {})
images_kwargs.update(kwargs)
num_image_patches = [
self.image_processor.get_number_of_image_patches(*image_size, images_kwargs)
for image_size in image_sizes
]
token_per_patch = int(self.patch_size**2)
num_image_tokens = [
2 + sum(token_per_patch + 1 for _ in range(num_patches)) for num_patches in num_image_patches
] # Add +2 and +1 for BOI/EOI and image break tokens
vision_data.update({"num_image_tokens": num_image_tokens, "num_image_patches": num_image_patches})
return MultiModalData(**vision_data)
def batch_decode(self, *args, **kwargs):
"""
This method forwards all its arguments to PreTrainedTokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please
refer to the docstring of this method for more information.
"""
return self.tokenizer.batch_decode(*args, **kwargs)
def decode(self, *args, **kwargs):
"""
This method forwards all its arguments to PreTrainedTokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to
the docstring of this method for more information.
"""
return self.tokenizer.decode(*args, **kwargs)
@property
def model_input_names(self):
tokenizer_input_names = self.tokenizer.model_input_names
image_processor_input_names = self.image_processor.model_input_names
return list(tokenizer_input_names) + list(image_processor_input_names)
__all__ = ["Cohere2VisionProcessor"]

View File

@@ -45,7 +45,7 @@ if is_torchvision_available():
from torchvision.transforms import functional as F from torchvision.transforms import functional as F
class GotOcr2ImageProcessorKwargs(DefaultFastImageProcessorKwargs): class GotOcr2FastImageProcessorKwargs(DefaultFastImageProcessorKwargs):
""" """
crop_to_patches (`bool`, *optional*, defaults to `False`): crop_to_patches (`bool`, *optional*, defaults to `False`):
Whether to crop the image to patches. Can be overridden by the `crop_to_patches` parameter in the Whether to crop the image to patches. Can be overridden by the `crop_to_patches` parameter in the
@@ -76,13 +76,13 @@ class GotOcr2ImageProcessorFast(BaseImageProcessorFast):
crop_to_patches = False crop_to_patches = False
min_patches = 1 min_patches = 1
max_patches = 12 max_patches = 12
valid_kwargs = GotOcr2ImageProcessorKwargs valid_kwargs = GotOcr2FastImageProcessorKwargs
def __init__(self, **kwargs: Unpack[GotOcr2ImageProcessorKwargs]): def __init__(self, **kwargs: Unpack[GotOcr2FastImageProcessorKwargs]):
super().__init__(**kwargs) super().__init__(**kwargs)
@auto_docstring @auto_docstring
def preprocess(self, images: ImageInput, **kwargs: Unpack[GotOcr2ImageProcessorKwargs]) -> BatchFeature: def preprocess(self, images: ImageInput, **kwargs: Unpack[GotOcr2FastImageProcessorKwargs]) -> BatchFeature:
return super().preprocess(images, **kwargs) return super().preprocess(images, **kwargs)
def crop_image_to_patches( def crop_image_to_patches(

View File

@@ -678,7 +678,7 @@ class GotOcr2ForConditionalGeneration(GotOcr2PreTrainedModel, GenerationMixin):
self.model.set_decoder(decoder) self.model.set_decoder(decoder)
def get_decoder(self): def get_decoder(self):
return self.model.get_decoder return self.model.get_decoder()
def get_image_features( def get_image_features(
self, self,

View File

@@ -824,7 +824,7 @@ class InternVLForConditionalGeneration(InternVLPreTrainedModel, GenerationMixin)
self.model.set_decoder(decoder) self.model.set_decoder(decoder)
def get_decoder(self): def get_decoder(self):
return self.model.get_decoder return self.model.get_decoder()
def get_image_features( def get_image_features(
self, self,

View File

@@ -341,7 +341,7 @@ class LlavaForConditionalGeneration(LlavaPreTrainedModel, GenerationMixin):
self.model.set_decoder(decoder) self.model.set_decoder(decoder)
def get_decoder(self): def get_decoder(self):
return self.model.get_decoder return self.model.get_decoder()
def get_image_features( def get_image_features(
self, self,

View File

@@ -378,7 +378,7 @@ class Mistral3ForConditionalGeneration(Mistral3PreTrainedModel, GenerationMixin)
self.model.set_decoder(decoder) self.model.set_decoder(decoder)
def get_decoder(self): def get_decoder(self):
return self.model.get_decoder return self.model.get_decoder()
def get_image_features( def get_image_features(
self, self,

View File

@@ -316,7 +316,7 @@ class PerceptionLMForConditionalGeneration(PerceptionLMPreTrainedModel, Generati
self.model.set_decoder(decoder) self.model.set_decoder(decoder)
def get_decoder(self): def get_decoder(self):
return self.model.get_decoder return self.model.get_decoder()
@can_return_tuple @can_return_tuple
@auto_docstring @auto_docstring

View File

@@ -303,7 +303,7 @@ class VipLlavaForConditionalGeneration(VipLlavaPreTrainedModel, GenerationMixin)
self.model.set_decoder(decoder) self.model.set_decoder(decoder)
def get_decoder(self): def get_decoder(self):
return self.model.get_decoder return self.model.get_decoder()
def get_image_features( def get_image_features(
self, pixel_values: torch.FloatTensor, vision_feature_layers: Optional[Union[int, list[int]]] = None self, pixel_values: torch.FloatTensor, vision_feature_layers: Optional[Union[int, list[int]]] = None

View File

@@ -974,11 +974,13 @@ class OutputRecorder:
target_class (Type): The class (e.g., nn.Module) to which the hook will be attached. target_class (Type): The class (e.g., nn.Module) to which the hook will be attached.
index (Optional[int]): If the output is a tuple/list, optionally record only at a specific index. index (Optional[int]): If the output is a tuple/list, optionally record only at a specific index.
layer_name (Optional[str]): Name of the submodule to target (if needed), e.g., "transformer.layer.3.attn". layer_name (Optional[str]): Name of the submodule to target (if needed), e.g., "transformer.layer.3.attn".
class_name (Optional[str]): Name of the class to which the hook will be attached. Could be the suffix of class name in some cases.
""" """
target_class: "type[torch.nn.Module]" target_class: "type[torch.nn.Module]"
index: Optional[int] = 0 index: Optional[int] = 0
layer_name: Optional[str] = None layer_name: Optional[str] = None
class_name: Optional[str] = None
def check_model_inputs(func): def check_model_inputs(func):
@@ -1049,12 +1051,17 @@ def check_model_inputs(func):
for specs in layer_specs: for specs in layer_specs:
if not isinstance(specs, OutputRecorder): if not isinstance(specs, OutputRecorder):
index = 0 if "hidden_states" in key else 1 index = 0 if "hidden_states" in key else 1
specs = OutputRecorder(target_class=specs, index=index) class_name = None if not isinstance(specs, str) else specs
target_class = specs if not isinstance(specs, str) else None
specs = OutputRecorder(target_class=target_class, index=index, class_name=class_name)
capture_tasks.append((key, specs)) capture_tasks.append((key, specs))
for name, module in self.named_modules(): for name, module in self.named_modules():
for key, specs in capture_tasks: for key, specs in capture_tasks:
if isinstance(module, specs.target_class): # The second check is for multimodals where only backbone layer suffix is available
if (specs.target_class is not None and isinstance(module, specs.target_class)) or (
specs.class_name is not None and name.endswith(specs.class_name)
):
if specs.layer_name is not None and specs.layer_name not in name: if specs.layer_name is not None and specs.layer_name not in name:
continue continue
# Monkey patch forward # Monkey patch forward

View File

@@ -167,7 +167,10 @@ def list_repo_templates(
return [ return [
entry.path.removeprefix(f"{CHAT_TEMPLATE_DIR}/") entry.path.removeprefix(f"{CHAT_TEMPLATE_DIR}/")
for entry in list_repo_tree( for entry in list_repo_tree(
repo_id=repo_id, revision=revision, path_in_repo=CHAT_TEMPLATE_DIR, recursive=False repo_id=repo_id,
revision=revision,
path_in_repo=CHAT_TEMPLATE_DIR,
recursive=False,
) )
if entry.path.endswith(".jinja") if entry.path.endswith(".jinja")
] ]

View File

View File

@@ -0,0 +1,192 @@
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
from transformers.testing_utils import require_torch, require_vision
from transformers.utils import is_torch_available, is_torchvision_available, is_vision_available
from ...test_image_processing_common import ImageProcessingTestMixin, prepare_image_inputs
if is_torch_available():
import torch
if is_vision_available():
from PIL import Image
if is_torchvision_available():
from transformers import Cohere2VisionImageProcessorFast
class Cohere2VisionImageProcessingTester(unittest.TestCase):
def __init__(
self,
parent,
batch_size=7,
num_channels=3,
image_size=18,
min_resolution=30,
max_resolution=400,
do_resize=True,
size=None,
do_normalize=True,
image_mean=[0.48145466, 0.4578275, 0.40821073],
image_std=[0.26862954, 0.26130258, 0.27577711],
do_convert_rgb=True,
):
super().__init__()
size = size if size is not None else {"height": 30, "width": 30}
self.parent = parent
self.batch_size = batch_size
self.num_channels = num_channels
self.image_size = image_size
self.min_resolution = min_resolution
self.max_resolution = max_resolution
self.do_resize = do_resize
self.size = size
self.do_normalize = do_normalize
self.image_mean = image_mean
self.image_std = image_std
self.do_convert_rgb = do_convert_rgb
def prepare_image_processor_dict(self):
return {
"do_resize": self.do_resize,
"size": self.size,
"do_normalize": self.do_normalize,
"image_mean": self.image_mean,
"image_std": self.image_std,
"do_convert_rgb": self.do_convert_rgb,
}
def prepare_image_inputs(self, equal_resolution=False, numpify=False, torchify=False):
return prepare_image_inputs(
batch_size=self.batch_size,
num_channels=self.num_channels,
min_resolution=self.min_resolution,
max_resolution=self.max_resolution,
equal_resolution=equal_resolution,
numpify=numpify,
torchify=torchify,
)
@require_torch
@require_vision
class Cohere2VisionProcessingTest(ImageProcessingTestMixin, unittest.TestCase):
fast_image_processing_class = Cohere2VisionImageProcessorFast if is_torchvision_available() else None
test_slow_image_processor = False
def setUp(self):
super().setUp()
self.image_processor_tester = Cohere2VisionImageProcessingTester(self)
@property
def image_processor_dict(self):
return self.image_processor_tester.prepare_image_processor_dict()
def test_image_processor_properties(self):
for image_processing_class in self.image_processor_list:
image_processor = image_processing_class(**self.image_processor_dict)
self.assertTrue(hasattr(image_processor, "do_resize"))
self.assertTrue(hasattr(image_processor, "size"))
self.assertTrue(hasattr(image_processor, "do_normalize"))
self.assertTrue(hasattr(image_processor, "image_mean"))
self.assertTrue(hasattr(image_processor, "image_std"))
self.assertTrue(hasattr(image_processor, "do_convert_rgb"))
def test_call_pil(self):
for image_processing_class in self.image_processor_list:
# Initialize image_processing
image_processing = image_processing_class(**self.image_processor_dict)
# create random PIL images
image_inputs = self.image_processor_tester.prepare_image_inputs(equal_resolution=True)
for image in image_inputs:
self.assertIsInstance(image, Image.Image)
# Test not batched input
encoded_images = image_processing(image_inputs[0], return_tensors="pt").pixel_values
self.assertEqual(tuple(encoded_images.shape), (10, 3, 30, 30))
# Test batched
encoded_images = image_processing(image_inputs, return_tensors="pt").pixel_values
self.assertEqual(tuple(encoded_images.shape), (70, 3, 30, 30))
def test_call_numpy(self):
for image_processing_class in self.image_processor_list:
# Initialize image_processing
image_processing = image_processing_class(**self.image_processor_dict)
# create random numpy tensors
image_inputs = self.image_processor_tester.prepare_image_inputs(equal_resolution=True, numpify=True)
for image in image_inputs:
self.assertIsInstance(image, np.ndarray)
# Test not batched input
encoded_images = image_processing(image_inputs[0], return_tensors="pt").pixel_values
self.assertEqual(tuple(encoded_images.shape), (10, 3, 30, 30))
# Test batched
encoded_images = image_processing(image_inputs, return_tensors="pt").pixel_values
self.assertEqual(tuple(encoded_images.shape), (70, 3, 30, 30))
def test_call_pytorch(self):
for image_processing_class in self.image_processor_list:
# Initialize image_processing
image_processing = image_processing_class(**self.image_processor_dict)
# create random PyTorch tensors
image_inputs = self.image_processor_tester.prepare_image_inputs(equal_resolution=True, torchify=True)
for image in image_inputs:
self.assertIsInstance(image, torch.Tensor)
# Test not batched input
encoded_images = image_processing(image_inputs[0], return_tensors="pt").pixel_values
self.assertEqual(tuple(encoded_images.shape), (10, 3, 30, 30))
# Test batched
encoded_images = image_processing(image_inputs, return_tensors="pt").pixel_values
self.assertEqual(tuple(encoded_images.shape), (70, 3, 30, 30))
def test_call_numpy_4_channels(self):
for image_processing_class in self.image_processor_list:
# Test that can process images which have an arbitrary number of channels
# Initialize image_processing
image_processor = image_processing_class(**self.image_processor_dict)
# create random numpy tensors
self.image_processor_tester.num_channels = 4
image_inputs = self.image_processor_tester.prepare_image_inputs(equal_resolution=True, numpify=True)
# Test not batched input
encoded_images = image_processor(
image_inputs[0],
return_tensors="pt",
input_data_format="channels_last",
image_mean=0,
image_std=1,
).pixel_values
self.assertEqual(tuple(encoded_images.shape), (10, 4, 30, 30))
# Test batched
encoded_images = image_processor(
image_inputs,
return_tensors="pt",
input_data_format="channels_last",
image_mean=0,
image_std=1,
).pixel_values
self.assertEqual(tuple(encoded_images.shape), (70, 4, 30, 30))

View File

@@ -0,0 +1,470 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the PyTorch GotOcr2 model."""
import unittest
from transformers import (
AutoProcessor,
Cohere2VisionConfig,
is_torch_available,
)
from transformers.testing_utils import (
Expectations,
cleanup,
get_device_properties,
require_deterministic_for_xpu,
require_read_token,
require_torch,
require_torch_accelerator,
slow,
torch_device,
)
from ...generation.test_utils import GenerationTesterMixin
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, floats_tensor, ids_tensor
from ...test_pipeline_mixin import PipelineTesterMixin
if is_torch_available():
import torch
from transformers import (
Cohere2VisionForConditionalGeneration,
Cohere2VisionModel,
)
class Cohere2VisionText2TextModelTester:
def __init__(
self,
parent,
batch_size=3,
seq_length=7,
downsample_factor=2,
alignment_intermediate_size=32,
ignore_index=-100,
image_token_id=2,
num_channels=3,
image_size=64,
is_training=True,
text_config={
"model_type": "cohere2",
"vocab_size": 99,
"hidden_size": 128,
"intermediate_size": 37,
"num_hidden_layers": 4,
"num_attention_heads": 4,
"output_channels": 64,
"hidden_act": "silu",
"max_position_embeddings": 512,
"tie_word_embeddings": True,
"bos_token_id": 0,
"eos_token_id": 0,
"pad_token_id": 0,
},
vision_config={
"model_type": "siglip_vision_model",
"hidden_size": 32,
"num_hidden_layers": 2,
"num_attention_heads": 4,
"intermediate_size": 128,
"image_size": 64,
"patch_size": 8,
"vision_use_head": False,
},
):
self.parent = parent
self.ignore_index = ignore_index
self.bos_token_id = text_config["bos_token_id"]
self.eos_token_id = text_config["eos_token_id"]
self.pad_token_id = text_config["pad_token_id"]
self.image_token_id = image_token_id
self.text_config = text_config
self.vision_config = vision_config
self.batch_size = batch_size
self.downsample_factor = downsample_factor
self.alignment_intermediate_size = alignment_intermediate_size
self.is_training = is_training
self.num_channels = num_channels
self.image_size = image_size
self.image_seq_length = 16
self.seq_length = seq_length + self.image_seq_length
self.num_hidden_layers = text_config["num_hidden_layers"]
self.vocab_size = text_config["vocab_size"]
self.hidden_size = text_config["hidden_size"]
self.num_attention_heads = text_config["num_attention_heads"]
def get_config(self):
return Cohere2VisionConfig(
text_config=self.text_config,
vision_config=self.vision_config,
image_token_id=self.image_token_id,
downsample_factor=self.downsample_factor,
alignment_intermediate_size=self.alignment_intermediate_size,
)
def prepare_config_and_inputs(self):
config = self.get_config()
pixel_values = floats_tensor([self.batch_size, self.num_channels, self.image_size, self.image_size])
image_num_patches = torch.tensor([1] * self.batch_size).to(torch_device)
return config, pixel_values, image_num_patches
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
config, pixel_values, image_num_patches = config_and_inputs
input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
attention_mask = torch.ones(input_ids.shape, dtype=torch.long, device=torch_device)
input_ids[input_ids == self.image_token_id] = self.pad_token_id
input_ids[:, : self.image_seq_length] = self.image_token_id
inputs_dict = {
"pixel_values": pixel_values,
"input_ids": input_ids,
"attention_mask": attention_mask,
"image_num_patches": image_num_patches,
}
return config, inputs_dict
@require_torch
class Cohere2ModelTest(ModelTesterMixin, GenerationTesterMixin, PipelineTesterMixin, unittest.TestCase):
all_model_classes = (
(
Cohere2VisionModel,
Cohere2VisionForConditionalGeneration,
)
if is_torch_available()
else ()
)
all_generative_model_classes = (Cohere2VisionForConditionalGeneration,) if is_torch_available() else ()
pipeline_model_mapping = (
{
"image-text-to-text": Cohere2VisionForConditionalGeneration,
}
if is_torch_available()
else {}
)
fx_compatible = False
test_pruning = False
test_torchscript = False
test_head_masking = False
_is_composite = True
def setUp(self):
self.model_tester = Cohere2VisionText2TextModelTester(self)
self.config_tester = ConfigTester(self, config_class=Cohere2VisionConfig, has_text_modality=False)
def test_config(self):
self.config_tester.run_common_tests()
@unittest.skip(reason="Siglip backbone uses the same initialization scheme as the Flax original implementation")
def test_initialization(self):
pass
@require_read_token
@require_torch
class Cohere2IntegrationTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.model_checkpoint = "CohereLabs/command-a-vision-07-2025"
cls.model = None
@classmethod
def tearDownClass(cls):
del cls.model
cleanup(torch_device, gc_collect=True)
def tearDown(self):
cleanup(torch_device, gc_collect=True)
@classmethod
def get_model(cls):
# Use 4-bit on T4
device_type, major, _ = get_device_properties()
load_in_4bit = (device_type == "cuda") and (major < 8)
torch_dtype = None if load_in_4bit else torch.float16
if cls.model is None:
cls.model = Cohere2VisionForConditionalGeneration.from_pretrained(
cls.model_checkpoint,
device_map="auto",
torch_dtype=torch_dtype,
load_in_4bit=load_in_4bit,
)
return cls.model
@slow
@require_torch_accelerator
def test_model_integration_forward(self):
processor = AutoProcessor.from_pretrained(self.model_checkpoint)
model = self.get_model()
messages = [
{
"role": "user",
"content": [
{"type": "image", "url": "http://images.cocodataset.org/val2017/000000039769.jpg"},
{"type": "text", "text": "Please describe the image explicitly."},
],
}
]
inputs = processor.apply_chat_template(
messages, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt"
).to(torch_device, dtype=torch.float16)
# Forward
with torch.inference_mode():
output = model(**inputs)
actual_logits = output.logits[0, -1, :5].cpu()
EXPECTED_LOGITS = Expectations(
{
("xpu", 3): [0.4109, 0.1532, 0.8018, 2.1328, 0.5483],
# 4-bit
("cuda", 7): [0.1097, 0.3481, 3.8340, 9.7969, 2.0488],
("cuda", 8): [2.4277, 1.6875, 1.8789, 2.1875, 1.9375],
}
) # fmt: skip
expected_logits = torch.tensor(EXPECTED_LOGITS.get_expectation(), dtype=torch.float16)
self.assertTrue(
torch.allclose(actual_logits, expected_logits, atol=0.1),
f"Actual logits: {actual_logits}"
f"\nExpected logits: {expected_logits}"
f"\nDifference: {torch.abs(actual_logits - expected_logits)}",
)
@slow
@require_torch_accelerator
@require_deterministic_for_xpu
def test_model_integration_generate_text_only(self):
processor = AutoProcessor.from_pretrained(self.model_checkpoint)
model = self.get_model()
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": "Write a haiku"},
],
}
]
inputs = processor.apply_chat_template(
messages, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt"
).to(torch_device, dtype=torch.float16)
with torch.no_grad():
generate_ids = model.generate(**inputs, max_new_tokens=25, do_sample=False)
decoded_output = processor.decode(
generate_ids[0, inputs["input_ids"].shape[1] :], skip_special_tokens=True
)
expected_outputs = Expectations(
{
("xpu", 3): "Whispers on the breeze,\nLeaves dance under moonlit skies,\nNature's quiet song.",
# 4-bit
("cuda", 7): "Sure, here's a haiku for you:\n\nMorning dew sparkles,\nPetals unfold in sunlight,\n",
("cuda", 8): "**Haiku**\n\n*Softly falls the snow*\n*Blanketing the earth in white*\n*",
}
) # fmt: skip
expected_output = expected_outputs.get_expectation()
self.assertEqual(decoded_output, expected_output)
@slow
@require_torch_accelerator
@require_deterministic_for_xpu
def test_model_integration_generate_chat_template(self):
processor = AutoProcessor.from_pretrained(self.model_checkpoint)
model = self.get_model()
messages = [
{
"role": "user",
"content": [
{"type": "image", "url": "http://images.cocodataset.org/val2017/000000039769.jpg"},
{"type": "text", "text": "Please describe the image explicitly."},
],
}
]
inputs = processor.apply_chat_template(
messages, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt"
).to(torch_device, dtype=torch.float16)
with torch.no_grad():
generate_ids = model.generate(**inputs, max_new_tokens=20, do_sample=False)
decoded_output = processor.decode(
generate_ids[0, inputs["input_ids"].shape[1] :], skip_special_tokens=True
)
expected_outputs = Expectations(
{
("xpu", 3): 'The image depicts a cozy scene of two cats resting on a bright pink blanket. The cats,',
# 4-bit
("cuda", 7): 'The image depicts two cats comfortably resting on a pink blanket spread across a sofa. The cats,',
("cuda", 8): 'The image depicts two cats lying on a bright pink blanket that covers a red couch. The cat',
}
) # fmt: skip
expected_output = expected_outputs.get_expectation()
self.assertEqual(decoded_output, expected_output)
@slow
@require_torch_accelerator
def test_model_integration_batched_generate(self):
processor = AutoProcessor.from_pretrained(self.model_checkpoint)
model = self.get_model()
# Prepare inputs
messages = [
[
{
"role": "user",
"content": [
{"type": "image", "url": "https://llava-vl.github.io/static/images/view.jpg"},
{"type": "text", "text": "Write a haiku for this image"},
],
},
],
[
{
"role": "user",
"content": [
{"type": "image", "url": "https://www.ilankelman.org/stopsigns/australia.jpg"},
{"type": "text", "text": "Describe this image"},
],
},
],
]
inputs = processor.apply_chat_template(
messages, padding=True, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt"
).to(model.device, dtype=torch.float16)
output = model.generate(**inputs, do_sample=False, max_new_tokens=25)
# Check first output
decoded_output = processor.decode(output[0, inputs["input_ids"].shape[1] :], skip_special_tokens=True)
expected_outputs = Expectations(
{
("xpu", 3): "Wooden path to water,\nMountains echo in stillness,\nPeaceful forest lake.",
# 4-bit
("cuda", 7): "Wooden bridge stretches\nMirrored lake below, mountains rise\nPeaceful, serene",
("cuda", 8): 'Dock stretches to calm, \nMountains whisper through the trees, \nLake mirrors the sky.',
}
) # fmt: skip
expected_output = expected_outputs.get_expectation()
self.assertEqual(
decoded_output,
expected_output,
f"Decoded output: {decoded_output}\nExpected output: {expected_output}",
)
# Check second output
decoded_output = processor.decode(output[1, inputs["input_ids"].shape[1] :], skip_special_tokens=True)
expected_outputs = Expectations(
{
("xpu", 3): 'This image captures a vibrant street scene in a bustling urban area, likely in an Asian city. The focal point is a',
# 4-bit
("cuda", 7): 'This vibrant image captures a bustling street scene in a multicultural urban area, featuring a traditional Chinese gate adorned with intricate red and',
("cuda", 8): 'The image depicts a vibrant street scene in what appears to be a Chinatown district, likely in an urban area. The focal',
}
) # fmt: skip
expected_output = expected_outputs.get_expectation()
self.assertEqual(
decoded_output,
expected_output,
f"Decoded output: {decoded_output}\nExpected output: {expected_output}",
)
@slow
@require_torch_accelerator
@require_deterministic_for_xpu
def test_model_integration_batched_generate_multi_image(self):
processor = AutoProcessor.from_pretrained(self.model_checkpoint)
model = self.get_model()
# Prepare inputs
messages = [
[
{
"role": "user",
"content": [
{"type": "image", "url": "https://llava-vl.github.io/static/images/view.jpg"},
{"type": "text", "text": "Write a haiku for this image"},
],
},
],
[
{
"role": "user",
"content": [
{
"type": "image",
"url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg",
},
{
"type": "image",
"url": "https://thumbs.dreamstime.com/b/golden-gate-bridge-san-francisco-purple-flowers-california-echium-candicans-36805947.jpg",
},
{
"type": "text",
"text": "These images depict two different landmarks. Can you identify them?",
},
],
},
],
]
inputs = processor.apply_chat_template(
messages, padding=True, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt"
).to(model.device, dtype=torch.float16)
output = model.generate(**inputs, do_sample=False, max_new_tokens=25)
# Check first output
decoded_output = processor.decode(output[0, inputs["input_ids"].shape[1] :], skip_special_tokens=True)
# Batching seems to alter the output slightly, but it is also the case in the original implementation. This seems to be expected: https://github.com/huggingface/transformers/issues/23017#issuecomment-1649630232
expected_outputs = Expectations(
{
("xpu", 3): "Wooden path to water,\nMountains echo in stillness,\nPeaceful forest lake.",
("cuda", 7): 'Wooden bridge stretches\nMirrored lake below, mountains rise\nPeaceful, serene',
("cuda", 8): 'Dock stretches to calm, \nMountains whisper through the trees, \nLake mirrors the sky.',
}
) # fmt: skip
expected_output = expected_outputs.get_expectation()
self.assertEqual(
decoded_output,
expected_output,
f"Decoded output: {decoded_output}\nExpected output: {expected_output}",
)
# Check second output
decoded_output = processor.decode(output[1, inputs["input_ids"].shape[1] :], skip_special_tokens=True)
expected_outputs = Expectations(
{
("xpu", 3): "The first image showcases the Statue of Liberty, a colossal neoclassical sculpture on Liberty Island in New York Harbor. Standing at ",
("cuda", 7): 'The first image showcases the Statue of Liberty, a monumental sculpture located on Liberty Island in New York Harbor. Standing atop a',
("cuda", 8): 'The two landmarks depicted in the images are the Statue of Liberty and the Golden Gate Bridge. \n\n1. **Statue',
}
) # fmt: skip
expected_output = expected_outputs.get_expectation()
self.assertEqual(
decoded_output,
expected_output,
f"Decoded output: {decoded_output}\nExpected output: {expected_output}",
)

View File

@@ -0,0 +1,139 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import shutil
import tempfile
import unittest
from transformers import AutoProcessor, AutoTokenizer, Cohere2VisionProcessor
from transformers.testing_utils import require_read_token, require_torch, require_vision
from transformers.utils import is_torch_available, is_torchvision_available
from ...test_processing_common import ProcessorTesterMixin
if is_torch_available():
import torch
if is_torchvision_available():
from transformers import Cohere2VisionImageProcessorFast
@require_read_token
@require_vision
@unittest.skip("Model not released yet!")
class Cohere2VisionProcessorTest(ProcessorTesterMixin, unittest.TestCase):
processor_class = Cohere2VisionProcessor
@classmethod
def setUpClass(cls):
cls.tmpdirname = tempfile.mkdtemp()
image_processor = Cohere2VisionImageProcessorFast(
size={"height": 20, "width": 20},
max_patches=3,
)
tokenizer = AutoTokenizer.from_pretrained("CohereLabs/command-a-vision-07-2025")
processor_kwargs = cls.prepare_processor_dict()
processor = Cohere2VisionProcessor(
image_processor=image_processor,
tokenizer=tokenizer,
**processor_kwargs,
)
processor.save_pretrained(cls.tmpdirname)
cls.image_token = processor.image_token
@staticmethod
def prepare_processor_dict():
return {}
def get_tokenizer(self, **kwargs):
return AutoProcessor.from_pretrained(self.tmpdirname, **kwargs).tokenizer
def get_image_processor(self, **kwargs):
return AutoProcessor.from_pretrained(self.tmpdirname, **kwargs).image_processor
def get_processor(self, **kwargs):
return AutoProcessor.from_pretrained(self.tmpdirname, **kwargs)
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tmpdirname, ignore_errors=True)
@require_torch
def test_process_interleaved_images_videos(self):
processor = self.get_processor()
messages = [
[
{
"role": "user",
"content": [
{
"type": "image",
"url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg",
},
{
"type": "image",
"url": "https://thumbs.dreamstime.com/b/golden-gate-bridge-san-francisco-purple-flowers-california-echium-candicans-36805947.jpg",
},
{"type": "text", "text": "What are the differences between these two images?"},
],
},
],
[
{
"role": "user",
"content": [
{
"type": "image",
"url": "https://llava-vl.github.io/static/images/view.jpg",
},
{"type": "text", "text": "Write a haiku for this image"},
],
}
],
]
inputs_batched = processor.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_dict=True,
return_tensors="pt",
padding=True,
)
# Process non batched inputs to check if the pixel_values and input_ids are reconstructed in the correct order when batched together
images_patches_index = 0
for i, message in enumerate(messages):
inputs = processor.apply_chat_template(
message,
add_generation_prompt=True,
tokenize=True,
return_dict=True,
return_tensors="pt",
padding=True,
)
# We slice with [-inputs["input_ids"].shape[1] :] as the input_ids are left padded
torch.testing.assert_close(
inputs["input_ids"][0], inputs_batched["input_ids"][i][-inputs["input_ids"].shape[1] :]
)
torch.testing.assert_close(
inputs["pixel_values"],
inputs_batched["pixel_values"][
images_patches_index : images_patches_index + inputs["pixel_values"].shape[0]
],
)
images_patches_index += inputs["pixel_values"].shape[0]

View File

@@ -4677,9 +4677,13 @@ class ModelTesterMixin:
sub_config = getattr(config, key) sub_config = getattr(config, key)
update_config_for_flex(sub_config) update_config_for_flex(sub_config)
model = model_class(config).to(device=torch_device) if model_class._can_set_attn_implementation():
model.set_attn_implementation("flex_attention") model = model_class(config).to(device=torch_device)
self.assertTrue(model.config._attn_implementation == "flex_attention") model.set_attn_implementation("flex_attention")
self.assertTrue(model.config._attn_implementation == "flex_attention")
else:
config._attn_implementation = "flex_attention"
model = model_class(config).to(device=torch_device)
# Elaborate workaround for encoder-decoder models as some do not specify their main input # Elaborate workaround for encoder-decoder models as some do not specify their main input
dummy_inputs = {model.main_input_name: inputs_dict[model.main_input_name].to(torch_device)} dummy_inputs = {model.main_input_name: inputs_dict[model.main_input_name].to(torch_device)}