Add Doge model (#35891)

* Add Doge Model

* Fix code quality

* Rollback an error commit

* Fix config for open-source weights

* Revert "Fix config for open-source weights"

This reverts commit 229cdcac10a6a4274d1dd13b729bc14c98eb0c76.

* Add modular_doge

* Update Doge inherits from Llama

* Fix import bug

* [docs] Add usage of doge model

* Fix Doge import pretrainedconfig from modeling_utils to configuration_utils

* [docs] remove trust remote code from doge

* Fix dynamo bug in doge model

* Update docstrings

* Import apply_rotary_pos_emb and repeat_kv from Llama

* Fix all nits

* Fix code quality

* Fix some bugs

* Fix code quality

* Remove inherited `_update_causal_mask` from Llama
This leads to incorrect weight initialization.

* Fix the wrong tensor orderings in DogeCDMoE

* Fix attention mask bug
We have to provide attention_mask for dynamic mask computation

* Modify most implementations to inherit from Llama
But there are two problems:
1. `flex_attention_forward` is not updated properly
2. `Example` error in the forward method of DogeForCausalLM

* Modify CDMoE for batch efficient implementation

* Uniform MoE configuration names, just like QwenMoE

* Fix code quality

* Fix code quality

* Fix code quality

* Add tp plan of CDMoE Module

* Hybrid DMA with sliding window

* Update valid tokens greater than window size

* Fix code quality

* Add `convert_doge_weights_to_hf`

* Fix STATE_DICT_MAPPING in convert_doge_weights_to_hf.py

* Fix nits in modular_doge

* Fix code quality

* Fix all nits

* Fix all nits

* Make sure the attention function is updated inside the class

* Fix code quality issues in the Doge model and add a test for it

* Fix `test_generate`

* Fix code quality

* Fix nits following suggestions

* Fix code quality

* Fix code quality issues

* Fix nits

* Fix code quality nits

* Fix the missing parameters in the configuration.

* Fix the missing parameters in the configuration.

* Fix nits

* Add initialization of attention

* Fix last nits

* Simplify dynamic mask generation logic

* Rename router_logits to gate_logits for matching latest changes of MixtralModel

* Rename typings for matching latest changes of MixtralModel

* Fixes typo in comment

* Update src/transformers/models/doge/modular_doge.py

Co-authored-by: Arthur <48595927+ArthurZucker@users.noreply.github.com>

* Fix code quality issues to match other modular

* Fix code quality issues to match other modular

* Fix the static compilation errors

* Update model weights link

* Fix code quality issues to match other modular

* reapply modular and support for new outputs

* style

* simplify a lot

* fix import location

* reapply modular

* fix

* fix integration test

---------

Co-authored-by: Arthur <48595927+ArthurZucker@users.noreply.github.com>
Co-authored-by: Cyril Vallez <cyril.vallez@huggingface.co>
Co-authored-by: Cyril Vallez <cyril.vallez@gmail.com>
This commit is contained in:
Jingze Shi
2025-07-08 17:44:29 +08:00
committed by GitHub
parent d370bc64c6
commit d8590b4b0c
11 changed files with 2624 additions and 0 deletions


@@ -429,6 +429,8 @@
title: DiffLlama
- local: model_doc/distilbert
title: DistilBERT
- local: model_doc/doge
title: Doge
- local: model_doc/dots1
title: dots1
- local: model_doc/dpr


@@ -0,0 +1,103 @@
<!--Copyright 2025 The HuggingFace Team. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
⚠️ Note that this file is in Markdown but contains specific syntax for our doc-builder (similar to MDX) that may not be
rendered properly in your Markdown viewer.
-->
# Doge
## Overview
Doge is a series of small language models based on the [Doge](https://github.com/SmallDoges/small-doge) architecture. The architecture aims to combine the advantages of state-space and self-attention algorithms: it computes dynamic masks from cached value states using the zero-order hold method, which addresses the tendency of existing mainstream language models to get lost in context. The models are pre-trained on the `smollm-corpus` with the `wsd_scheduler` scheduler, so training can continue on new datasets, or sparsely activated feed-forward networks can be added, starting from stable-stage checkpoints.
<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/refs%2Fpr%2F426/transformers/model_doc/doge_architecture.png" alt="drawing" width="600"/>
As shown in the figure above, the sequence transformation part of the Doge architecture uses `Dynamic Mask Attention`. It can be understood as self-attention conditioned on value states during training, and as a state-space model without past-state decay during inference, which addresses the problem of existing Transformers or SSMs getting lost in long text. The state transformation part of Doge uses `Cross Domain Mixture of Experts`, which consists of dense linear layers and sparse embedding layers. Sparse parameters can be added on top of a dense weight checkpoint to continue training without retraining the entire model, which reduces the cost of iterating on the model. In addition, Doge uses `RMSNorm` and `Residual` with learnable parameters to adapt the gradient range of deep models.
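To make the dynamic-mask idea more concrete, here is a minimal pure-Python sketch of the behaviour suggested by the `keep_window_size` option of `DogeConfig`: when there are more keys than the window size, only the highest-scoring keys stay visible. The function name `keep_top_keys` and the toy scores are hypothetical, plain lists stand in for tensors, and the actual Doge implementation may differ.

```python
import math


def keep_top_keys(scores, keep_window_size):
    """Return an additive mask: 0.0 for kept keys, -inf for masked keys.

    Hypothetical helper for illustration only; not part of transformers.
    """
    # Short sequences are left untouched: masking only starts once the
    # number of keys exceeds the window size.
    if len(scores) <= keep_window_size:
        return [0.0] * len(scores)
    # Rank key positions by score and keep the best `keep_window_size` of them.
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    kept = set(ranked[:keep_window_size])
    return [0.0 if i in kept else -math.inf for i in range(len(scores))]


scores = [0.1, 0.9, 0.3, 0.7]
mask = keep_top_keys(scores, keep_window_size=2)
```

The resulting mask is added to the attention scores before the softmax, so masked keys receive zero attention weight.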
Check out all Doge model checkpoints [here](https://huggingface.co/collections/SmallDoge/doge-slm-679cc991f027c4a3abbded4a).
## Usage
<details>
<summary>Using Doge-Base for text generation</summary>
```python
from transformers import AutoTokenizer, AutoModelForCausalLM
tokenizer = AutoTokenizer.from_pretrained("SmallDoge/Doge-20M")
model = AutoModelForCausalLM.from_pretrained("SmallDoge/Doge-20M")
inputs = tokenizer("Hey how are you doing?", return_tensors="pt")
outputs = model.generate(**inputs, max_new_tokens=100)
print(tokenizer.batch_decode(outputs))
```
</details>
<details>
<summary>Using Doge-Instruct for question answering</summary>
```python
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig, TextStreamer
tokenizer = AutoTokenizer.from_pretrained("SmallDoge/Doge-20M-Instruct")
model = AutoModelForCausalLM.from_pretrained("SmallDoge/Doge-20M-Instruct")
generation_config = GenerationConfig(
    max_new_tokens=100,
    use_cache=True,
    do_sample=True,
    temperature=0.8,
    top_p=0.9,
    repetition_penalty=1.0
)
streamer = TextStreamer(tokenizer=tokenizer, skip_prompt=True)
prompt = "Hi, how are you doing today?"
conversation = [
    {"role": "user", "content": prompt}
]
inputs = tokenizer.apply_chat_template(
    conversation=conversation,
    tokenize=True,
    return_tensors="pt",
)
outputs = model.generate(
    inputs,
    tokenizer=tokenizer,
    generation_config=generation_config,
    streamer=streamer
)
```
</details>
## DogeConfig
[[autodoc]] DogeConfig
## DogeModel
[[autodoc]] DogeModel
- forward
## DogeForCausalLM
[[autodoc]] DogeForCausalLM
- forward
## DogeForSequenceClassification
[[autodoc]] DogeForSequenceClassification
- forward


@@ -112,6 +112,7 @@ CONFIG_MAPPING_NAMES = OrderedDict[str, str](
("dinov2", "Dinov2Config"),
("dinov2_with_registers", "Dinov2WithRegistersConfig"),
("distilbert", "DistilBertConfig"),
("doge", "DogeConfig"),
("donut-swin", "DonutSwinConfig"),
("dots1", "Dots1Config"),
("dpr", "DPRConfig"),
@@ -493,6 +494,7 @@ MODEL_NAMES_MAPPING = OrderedDict[str, str](
("dinov2_with_registers", "DINOv2 with Registers"),
("distilbert", "DistilBERT"),
("dit", "DiT"),
("doge", "Doge"),
("donut-swin", "DonutSwin"),
("dots1", "dots1"),
("dpr", "DPR"),


@@ -105,6 +105,7 @@ MODEL_MAPPING_NAMES = OrderedDict(
("dinov2", "Dinov2Model"),
("dinov2_with_registers", "Dinov2WithRegistersModel"),
("distilbert", "DistilBertModel"),
("doge", "DogeModel"),
("donut-swin", "DonutSwinModel"),
("dots1", "Dots1Model"),
("dpr", "DPRQuestionEncoder"),
@@ -576,6 +577,7 @@ MODEL_FOR_CAUSAL_LM_MAPPING_NAMES = OrderedDict(
("dbrx", "DbrxForCausalLM"),
("deepseek_v3", "DeepseekV3ForCausalLM"),
("diffllama", "DiffLlamaForCausalLM"),
("doge", "DogeForCausalLM"),
("dots1", "Dots1ForCausalLM"),
("electra", "ElectraForCausalLM"),
("emu3", "Emu3ForCausalLM"),
@@ -1105,6 +1107,7 @@ MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING_NAMES = OrderedDict(
("deberta-v2", "DebertaV2ForSequenceClassification"),
("diffllama", "DiffLlamaForSequenceClassification"),
("distilbert", "DistilBertForSequenceClassification"),
("doge", "DogeForSequenceClassification"),
("electra", "ElectraForSequenceClassification"),
("ernie", "ErnieForSequenceClassification"),
("ernie_m", "ErnieMForSequenceClassification"),


@@ -0,0 +1,28 @@
# coding=utf-8
# Copyright 2025 Jingze Shi and the HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING
from ...utils import _LazyModule
from ...utils.import_utils import define_import_structure
if TYPE_CHECKING:
    from .configuration_doge import *
    from .modeling_doge import *
else:
    import sys
    _file = globals()["__file__"]
    sys.modules[__name__] = _LazyModule(__name__, _file, define_import_structure(_file), module_spec=__spec__)


@@ -0,0 +1,241 @@
# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
# This file was automatically generated from src/transformers/models/doge/modular_doge.py.
# Do NOT edit this file manually as any edits will be overwritten by the generation of
# the file from the modular. If any change should be done, please apply the change to the
# modular_doge.py file directly. One of our CI enforces this.
# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
# coding=utf-8
# Copyright 2025 Jingze Shi and the HuggingFace Inc. team. All rights reserved.
#
# The Doge family of small language models is trained by SmallDoge Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ...configuration_utils import PretrainedConfig
from ...modeling_rope_utils import rope_config_validation
class DogeConfig(PretrainedConfig):
r"""
This is the configuration class to store the configuration of a [`DogeModel`]. It is used to instantiate a Doge
model according to the specified arguments, defining the model architecture like [SmallDoge/Doge-320M](https://huggingface.co/SmallDoge/Doge-320M).
Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
documentation from [`PretrainedConfig`] for more information.
Args:
vocab_size (`int`, *optional*, defaults to 32768):
Vocabulary size of the Doge model. Defines the number of different tokens that can be represented by the `input_ids` passed when calling [`DogeModel`].
hidden_size (`int`, *optional*, defaults to 1024):
Dimension of the hidden representations.
intermediate_size (`int`, *optional*, defaults to 2048):
Dimension of the MLP representations.
num_hidden_layers (`int`, *optional*, defaults to 32):
Number of hidden layers in the Transformer decoder.
hidden_dropout (`float`, *optional*, defaults to 0.0):
Dropout probability for each sequence transformation and state transformation module.
hidden_act (`str` or `function`, *optional*, defaults to `"silu"`):
The non-linear activation function (function or string) in the decoder.
initializer_range (`float`, *optional*, defaults to 0.02):
The standard deviation of the truncated_normal_initializer for initializing all weight matrices.
rms_norm_eps (`float`, *optional*, defaults to 1e-06):
The epsilon used by the rms normalization layers.
use_cache (`bool`, *optional*, defaults to `True`):
Whether or not the model should return the last key/values attentions (not used by all models). Only
relevant if `config.is_decoder=True`.
tie_word_embeddings (`bool`, *optional*, defaults to `False`):
Whether the model's input and output word embeddings should be tied.
max_position_embeddings (`int`, *optional*, defaults to 2048):
The maximum sequence length that this model might ever be used with.
rope_theta (`float`, *optional*, defaults to 10000.0):
The base period of the RoPE embeddings.
rope_scaling (`Dict`, *optional*):
Dictionary containing the scaling configuration for the RoPE embeddings.
NOTE: if you apply new rope type and you expect the model to work on longer `max_position_embeddings`, we recommend you to update this value accordingly.
Doge family of small models use `{ 'rope_type': 'dynamic', 'factor': 4.0, 'original_max_position_embeddings': 2048 }` as the default value.
Expected contents:
`rope_type` (`str`):
The sub-variant of RoPE to use. Can be one of ['default', 'linear', 'dynamic', 'yarn', 'longrope', 'llama3'], with 'default' being the original RoPE implementation.
`factor` (`float`, *optional*):
Used with all rope types except 'default'. The scaling factor to apply to the RoPE embeddings.
In most scaling types, a `factor` of x will enable the model to handle sequences of length x * original maximum pre-trained length.
`original_max_position_embeddings` (`int`, *optional*):
Used with 'dynamic', 'longrope' and 'llama3'.
The original max position embeddings used during pretraining.
`attention_factor` (`float`, *optional*):
Used with 'yarn' and 'longrope'. The scaling factor to be applied on the attention
computation.
If unspecified, it defaults to value recommended by the implementation, using the `factor` field to infer the suggested value.
`beta_fast` (`float`, *optional*):
Only used with 'yarn'. Parameter to set the boundary for extrapolation (only) in the linear
ramp function. If unspecified, it defaults to 32.
`beta_slow` (`float`, *optional*):
Only used with 'yarn'. Parameter to set the boundary for interpolation (only) in the linear
ramp function. If unspecified, it defaults to 1.
`short_factor` (`List[float]`, *optional*):
Only used with 'longrope'. The scaling factor to be applied to short contexts (<`original_max_position_embeddings`).
Must be a list of numbers with the same length as the hidden size divided by the number of attention heads divided by 2
`long_factor` (`List[float]`, *optional*):
Only used with 'longrope'. The scaling factor to be applied to long contexts (>`original_max_position_embeddings`).
Must be a list of numbers with the same length as the hidden size divided by the number of attention heads divided by 2
`low_freq_factor` (`float`, *optional*):
Only used with 'llama3'. Scaling factor applied to low frequency components of the RoPE
`high_freq_factor` (`float`, *optional*):
Only used with 'llama3'. Scaling factor applied to high frequency components of the RoPE
num_attention_heads (`int`, *optional*, defaults to 8):
Number of attention heads for each attention layer in the Transformer decoder.
num_key_value_heads (`int`, *optional*):
This is the number of key_value heads that should be used to implement Grouped Query Attention.
If `num_key_value_heads=num_attention_heads`, the model will use Multi Head Attention (MHA), if
`num_key_value_heads=1` the model will use Multi Query Attention (MQA) otherwise GQA is used.
When converting a multi-head checkpoint to a GQA checkpoint, each group key and value head should be constructed by mean-pooling all the original heads within that group.
For more details, check out [this paper](https://arxiv.org/pdf/2305.13245.pdf).
If it is not specified, will default to `num_attention_heads`.
attention_bias (`bool`, *optional*, defaults to `False`):
Whether to use a bias in the query, key, value and output projection layers during self-attention.
attention_dropout (`float`, *optional*, defaults to 0.0):
The dropout ratio for the attention probabilities.
mlp_bias (`bool`, *optional*, defaults to `False`):
Whether to use a bias in up_proj, down_proj and gate_proj layers in the MLP layers.
sliding_window (`int`, *optional*):
Sliding window attention window size. If not specified, will default to `None`.
keep_window_size (`int`, *optional*, defaults to 2048):
The window size of tokens that are not dynamically masked, and dynamic masking is only performed when the sequence length exceeds this value.
is_moe (`bool`, *optional*, defaults to `False`):
Whether to use the Cross Domain Mixture of Experts. If `True`, the MoE inherits the MLP for its initialization.
num_experts (`int`, *optional*, defaults to 16384):
Number of routed experts in the model. This is only used when `is_moe=True`.
num_experts_per_tok (`int`, *optional*, defaults to 64):
Number of selected experts to route per-token.
norm_topk_prob (`bool`, *optional*, defaults to `False`):
Whether to normalize the topk probabilities.
output_router_logits (`bool`, *optional*, defaults to `False`):
Whether or not the router logits should be returned by the model. Enabling this will also
allow the model to output the auxiliary loss, including load balancing loss and router z-loss.
router_aux_loss_coef (`float`, *optional*, defaults to 0.001):
The aux loss factor for the total loss.
```python
>>> from transformers import DogeConfig, DogeModel
>>> # Initializing a Doge-320M style configuration
>>> configuration = DogeConfig()
>>> # Initializing a model from the Doge-320M style configuration
>>> model = DogeModel(configuration)
>>> # Accessing the model configuration
>>> configuration = model.config
```"""
model_type = "doge"
keys_to_ignore_at_inference = ["past_key_values"]
# Default tensor parallel plan for base model `DogeModel`
base_model_tp_plan = {
"layers.*.self_attn.q_proj": "colwise",
"layers.*.self_attn.k_proj": "colwise",
"layers.*.self_attn.v_proj": "colwise",
"layers.*.self_attn.dt_proj": "rowwise",
"layers.*.self_attn.o_proj": "rowwise",
"layers.*.input_layernorm.weight": "sequence_parallel",
"layers.*.input_residual.weight": "sequence_parallel",
"layers.*.post_attention_layernorm.weight": "sequence_parallel",
"layers.*.post_attention_residual.weight": "sequence_parallel",
"norm.weight": "sequence_parallel",
"layers.*.mlp.gate_proj": "colwise",
"layers.*.mlp.up_proj": "colwise",
"layers.*.mlp.down_proj": "rowwise",
"layers.*.mlp.router_gate": "colwise_rep",
"layers.*.mlp.down_embed": "rowwise_rep",
"layers.*.mlp.up_embed": "rowwise_rep",
}
base_model_pp_plan = {
"embed_tokens": (["input_ids"], ["inputs_embeds"]),
"layers": (["hidden_states", "attention_mask"], ["hidden_states"]),
"norm": (["hidden_states"], ["hidden_states"]),
}
def __init__(
self,
vocab_size=32768,
hidden_size=1024,
intermediate_size=2048,
num_hidden_layers=32,
hidden_dropout=0.0,
hidden_act="silu",
initializer_range=0.02,
rms_norm_eps=1e-06,
use_cache=True,
tie_word_embeddings=False,
max_position_embeddings=2048,
rope_theta=10000.0,
rope_scaling=None,
num_attention_heads=8,
num_key_value_heads=None,
attention_bias=False,
attention_dropout=0.0,
mlp_bias=False,
sliding_window=None,
keep_window_size=2048,
is_moe=False,
num_experts=16384,
num_experts_per_tok=64,
norm_topk_prob=False,
output_router_logits=False,
router_aux_loss_coef=0.001,
**kwargs,
):
self.vocab_size = vocab_size
self.hidden_size = hidden_size
self.intermediate_size = intermediate_size
self.num_hidden_layers = num_hidden_layers
self.hidden_dropout = hidden_dropout
self.hidden_act = hidden_act
self.initializer_range = initializer_range
self.rms_norm_eps = rms_norm_eps
self.use_cache = use_cache
self.max_position_embeddings = max_position_embeddings
self.rope_theta = rope_theta
self.rope_scaling = rope_scaling
self.num_attention_heads = num_attention_heads
self.num_key_value_heads = num_key_value_heads
self.attention_bias = attention_bias
self.attention_dropout = attention_dropout
self.mlp_bias = mlp_bias
self.sliding_window = sliding_window
self.keep_window_size = keep_window_size
self.is_moe = is_moe
self.num_experts = num_experts
self.num_experts_per_tok = num_experts_per_tok
self.norm_topk_prob = norm_topk_prob
self.output_router_logits = output_router_logits
self.router_aux_loss_coef = router_aux_loss_coef
# Validate the correctness of rotary position embeddings parameters
# BC: if there is a 'type' field, copy it to 'rope_type'.
if self.rope_scaling is not None and "type" in self.rope_scaling:
self.rope_scaling["rope_type"] = self.rope_scaling["type"]
rope_config_validation(self)
# for backward compatibility
if num_key_value_heads is None:
self.num_key_value_heads = num_attention_heads
super().__init__(
tie_word_embeddings=tie_word_embeddings,
**kwargs,
)
__all__ = ["DogeConfig"]
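The two backward-compatibility rules applied in `DogeConfig.__init__` (a legacy `type` key mirrored into `rope_type`, and `num_key_value_heads` falling back to `num_attention_heads`) can be sketched in isolation. The helper `normalize_doge_kwargs` below is a hypothetical name used only for this illustration; unlike the constructor, it copies the dictionary instead of updating it in place.

```python
def normalize_doge_kwargs(num_attention_heads=8, num_key_value_heads=None, rope_scaling=None):
    """Hypothetical helper mirroring two defaults handled by the config constructor."""
    # Legacy configs may store the RoPE variant under "type"; mirror it to "rope_type".
    if rope_scaling is not None and "type" in rope_scaling:
        rope_scaling = dict(rope_scaling)  # copy so the caller's dict is left untouched
        rope_scaling["rope_type"] = rope_scaling["type"]
    # Without an explicit value, every attention head gets its own key/value head (MHA).
    if num_key_value_heads is None:
        num_key_value_heads = num_attention_heads
    return num_key_value_heads, rope_scaling


default_heads, default_rope = normalize_doge_kwargs()
gqa_heads, legacy_rope = normalize_doge_kwargs(
    num_attention_heads=8,
    num_key_value_heads=2,
    rope_scaling={"type": "dynamic", "factor": 4.0},
)
```

With 8 attention heads and 2 key/value heads, each key/value head is shared by a group of 4 query heads, which is the grouped-query setting described in the docstring.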


@@ -0,0 +1,126 @@
import argparse
import json
import os
import re
import torch
from safetensors.torch import load_file
from transformers import DogeConfig, DogeForCausalLM
# fmt: off
# `None` means we drop the key
STATE_DICT_MAPPING = {
# CausalLM keys
r"^lm_head.weight": r"lm_head.weight",
# Model keys
r"^model.word_embed.weight": r"model.embed_tokens.weight",
r"^model.rotary_emb.rotary_emb": r"model.rotary_emb.rotary_emb",
r"^model.final_layernorm.weight": r"model.norm.weight",
# Layers keys
r"^model.layers.(\d+).pre_layernorm.weight": r"model.layers.\1.input_layernorm.weight",
r"^model.layers.(\d+).pre_residual.weight": r"model.layers.\1.input_residual",
r"^model.layers.(\d+).post_layernorm.weight": r"model.layers.\1.post_attention_layernorm.weight",
r"^model.layers.(\d+).post_residual.weight": r"model.layers.\1.post_attention_residual",
# Attention keys
r"^model.layers.(\d+).self_attn.q_proj.weight": r"model.layers.\1.self_attn.q_proj.weight",
r"^model.layers.(\d+).self_attn.k_proj.weight": r"model.layers.\1.self_attn.k_proj.weight",
r"^model.layers.(\d+).self_attn.v_proj.weight": r"model.layers.\1.self_attn.v_proj.weight",
r"^model.layers.(\d+).self_attn.A": r"model.layers.\1.self_attn.A",
r"^model.layers.(\d+).self_attn.dt_proj.weight": r"model.layers.\1.self_attn.dt_proj.weight",
r"^model.layers.(\d+).self_attn.o_proj.weight": r"model.layers.\1.self_attn.o_proj.weight",
# Feedforward keys
r"^model.layers.(\d+).feed_forward.gate_proj.weight": r"model.layers.\1.mlp.gate_proj.weight",
r"^model.layers.(\d+).feed_forward.up_proj.weight": r"model.layers.\1.mlp.up_proj.weight",
r"^model.layers.(\d+).feed_forward.down_proj.weight": r"model.layers.\1.mlp.down_proj.weight",
r"^model.layers.(\d+).feed_forward.router_gate.weight": r"model.layers.\1.mlp.router_gate.weight",
r"^model.layers.(\d+).feed_forward.router_gate.bias": None,
r"^model.layers.(\d+).feed_forward.down_embed.weight": r"model.layers.\1.mlp.down_embed.weight",
r"^model.layers.(\d+).feed_forward.up_embed.weight": r"model.layers.\1.mlp.up_embed.weight",
}
# fmt: on
def load_weights(input_dir: str):
    safetensor_files = [os.path.join(input_dir, x) for x in os.listdir(input_dir) if x.endswith(".safetensors")]
    all_weights = {}
    if safetensor_files:
        if len(safetensor_files) == 1:
            tensors = load_file(safetensor_files[0])
            all_weights.update(tensors)
            return all_weights
        # Sharded checkpoints: order the files by their shard index before merging
        safetensor_files = sorted(safetensor_files, key=lambda x: int(x.rsplit("-", 3)[1]))
        for file in safetensor_files:
            tensors = load_file(file)
            all_weights.update(tensors)
        return all_weights
    else:
        raise ValueError("No .safetensors files found in the specified directory.")
def map_old_key_to_new(old_key):
    for pattern, replacement in STATE_DICT_MAPPING.items():
        if replacement is None:
            if re.fullmatch(pattern, old_key):
                return None
        else:
            new_key, n_replace = re.subn(pattern, replacement, old_key)
            # Early exit of the loop
            if n_replace > 0:
                return new_key
    raise ValueError(f"Key: {old_key} could not be mapped (check the mapping).")
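As an illustration of how a regex table of this kind drives the renaming, here is a self-contained sketch that uses a three-entry subset of the mapping. `DEMO_MAPPING` and `map_key` are hypothetical names chosen for this example; the logic follows the same rules (a `None` replacement drops the key, the first matching pattern wins, unmatched keys raise).

```python
import re

# Subset of the conversion table, for illustration only.
DEMO_MAPPING = {
    r"^model.word_embed.weight": r"model.embed_tokens.weight",
    r"^model.layers.(\d+).feed_forward.up_proj.weight": r"model.layers.\1.mlp.up_proj.weight",
    r"^model.layers.(\d+).feed_forward.router_gate.bias": None,
}


def map_key(old_key):
    for pattern, replacement in DEMO_MAPPING.items():
        if replacement is None:
            # A `None` replacement means the key is dropped from the new state dict.
            if re.fullmatch(pattern, old_key):
                return None
        else:
            new_key, n_replace = re.subn(pattern, replacement, old_key)
            if n_replace > 0:
                return new_key
    raise ValueError(f"Key: {old_key} could not be mapped (check the mapping).")


renamed = map_key("model.layers.3.feed_forward.up_proj.weight")
dropped = map_key("model.layers.0.feed_forward.router_gate.bias")
```

The captured layer index (`\1`) is carried over unchanged, so only the module names are rewritten.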
def convert_state_dict(original_state_dict: dict, config: DogeConfig):
    new_dict = {}
    for old_key, value in original_state_dict.items():
        new_key = map_old_key_to_new(old_key)
        if new_key is None:
            continue
        new_dict[new_key] = value
    return new_dict
def convert_doge_model(input_dir, output_dir):
    # Load and convert config
    with open(os.path.join(input_dir, "config.json")) as f:
        config = json.load(f)
    config = DogeConfig(**config)
    config.save_pretrained(output_dir)
    # Load and convert weights
    original_state_dict = load_weights(input_dir)
    new_dict = convert_state_dict(original_state_dict, config)
    with torch.device("meta"):
        model = DogeForCausalLM(config)
    if config.tie_word_embeddings:
        new_dict["lm_head.weight"] = new_dict["model.embed_tokens.weight"]
    model.load_state_dict(new_dict, strict=True, assign=True)
    model.save_pretrained(output_dir)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "input_dir",
        type=str,
        help="Location of the local folder copied from the Hub.",
    )
    parser.add_argument(
        "output_dir",
        type=str,
        help="Location to write HF model.",
    )
    args = parser.parse_args()
    convert_doge_model(args.input_dir, args.output_dir)


@@ -0,0 +1,947 @@
# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
# This file was automatically generated from src/transformers/models/doge/modular_doge.py.
# Do NOT edit this file manually as any edits will be overwritten by the generation of
# the file from the modular. If any change should be done, please apply the change to the
# modular_doge.py file directly. One of our CI enforces this.
# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
# coding=utf-8
# Copyright 2025 Jingze Shi and the HuggingFace Inc. team. All rights reserved.
#
# The Doge family of small language models is trained by SmallDoge Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from typing import Callable, Optional, Union
import torch
import torch.nn.functional as F
from torch import nn
from ...activations import ACT2FN
from ...cache_utils import Cache, DynamicCache
from ...generation import GenerationMixin
from ...integrations import use_kernel_forward_from_hub
from ...integrations.flex_attention import compile_friendly_flex_attention
from ...masking_utils import create_causal_mask, create_sliding_window_causal_mask
from ...modeling_layers import GradientCheckpointingLayer
from ...modeling_outputs import (
BaseModelOutputWithPast,
MoeCausalLMOutputWithPast,
MoeModelOutputWithPast,
SequenceClassifierOutputWithPast,
)
from ...modeling_rope_utils import ROPE_INIT_FUNCTIONS, dynamic_rope_update
from ...modeling_utils import AttentionInterface, PreTrainedModel
from ...processing_utils import Unpack
from ...utils import TransformersKwargs, auto_docstring, can_return_tuple, is_torch_flex_attn_available, logging
from ...utils.generic import OutputRecorder, check_model_inputs
from .configuration_doge import DogeConfig
if is_torch_flex_attn_available():
from torch.nn.attention.flex_attention import BlockMask
logger = logging.get_logger(__name__)
@use_kernel_forward_from_hub("RMSNorm")
class DogeRMSNorm(nn.Module):
    def __init__(self, hidden_size, eps=1e-6):
        """
        DogeRMSNorm is equivalent to T5LayerNorm
        """
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.variance_epsilon = eps
    def forward(self, hidden_states):
        input_dtype = hidden_states.dtype
        hidden_states = hidden_states.to(torch.float32)
        variance = hidden_states.pow(2).mean(-1, keepdim=True)
        hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)
        return self.weight * hidden_states.to(input_dtype)
    def extra_repr(self):
        return f"{tuple(self.weight.shape)}, eps={self.variance_epsilon}"
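The arithmetic performed by the RMS normalization can be checked by hand on a small vector. The following pure-Python sketch uses a hypothetical `rms_norm` function with plain lists standing in for tensors; it ignores the dtype casting done by the module.

```python
import math


def rms_norm(x, weight, eps=1e-6):
    """Scale `x` by the reciprocal root-mean-square of its entries, then by `weight`."""
    mean_square = sum(v * v for v in x) / len(x)
    inv_rms = 1.0 / math.sqrt(mean_square + eps)
    return [w * v * inv_rms for v, w in zip(x, weight)]


# mean_square = (9 + 16) / 2 = 12.5, so each entry is divided by about 3.5355.
normed = rms_norm([3.0, 4.0], weight=[1.0, 1.0])
normed_mean_square = sum(v * v for v in normed) / len(normed)
```

With unit weights, the output has a mean square of approximately 1, which is the point of the normalization; unlike LayerNorm, no mean is subtracted.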
class DogeRotaryEmbedding(nn.Module):
    def __init__(self, config: DogeConfig, device=None):
        super().__init__()
        # BC: "rope_type" was originally "type"
        if hasattr(config, "rope_scaling") and isinstance(config.rope_scaling, dict):
            self.rope_type = config.rope_scaling.get("rope_type", config.rope_scaling.get("type"))
        else:
            self.rope_type = "default"
        self.max_seq_len_cached = config.max_position_embeddings
        self.original_max_seq_len = config.max_position_embeddings
        self.config = config
        self.rope_init_fn = ROPE_INIT_FUNCTIONS[self.rope_type]
        inv_freq, self.attention_scaling = self.rope_init_fn(self.config, device)
        self.register_buffer("inv_freq", inv_freq, persistent=False)
        self.original_inv_freq = self.inv_freq
    @torch.no_grad()
    @dynamic_rope_update  # power user: used with advanced RoPE types (e.g. dynamic rope)
    def forward(self, x, position_ids):
        inv_freq_expanded = self.inv_freq[None, :, None].float().expand(position_ids.shape[0], -1, 1).to(x.device)
        position_ids_expanded = position_ids[:, None, :].float()
        device_type = x.device.type if isinstance(x.device.type, str) and x.device.type != "mps" else "cpu"
        with torch.autocast(device_type=device_type, enabled=False):  # Force float32
            freqs = (inv_freq_expanded.float() @ position_ids_expanded.float()).transpose(1, 2)
            emb = torch.cat((freqs, freqs), dim=-1)
            cos = emb.cos() * self.attention_scaling
            sin = emb.sin() * self.attention_scaling
        return cos.to(dtype=x.dtype), sin.to(dtype=x.dtype)
def rotate_half(x):
    """Rotates half the hidden dims of the input."""
    x1 = x[..., : x.shape[-1] // 2]
    x2 = x[..., x.shape[-1] // 2 :]
    return torch.cat((-x2, x1), dim=-1)
def apply_rotary_pos_emb(q, k, cos, sin, position_ids=None, unsqueeze_dim=1):
    """Applies Rotary Position Embedding to the query and key tensors.
    Args:
        q (`torch.Tensor`): The query tensor.
        k (`torch.Tensor`): The key tensor.
        cos (`torch.Tensor`): The cosine part of the rotary embedding.
        sin (`torch.Tensor`): The sine part of the rotary embedding.
        position_ids (`torch.Tensor`, *optional*):
            Deprecated and unused.
        unsqueeze_dim (`int`, *optional*, defaults to 1):
            The 'unsqueeze_dim' argument specifies the dimension along which to unsqueeze cos[position_ids] and
            sin[position_ids] so that they can be properly broadcasted to the dimensions of q and k. For example, note
            that cos[position_ids] and sin[position_ids] have the shape [batch_size, seq_len, head_dim]. Then, if q and
            k have the shape [batch_size, heads, seq_len, head_dim], then setting unsqueeze_dim=1 makes
            cos[position_ids] and sin[position_ids] broadcastable to the shapes of q and k. Similarly, if q and k have
            the shape [batch_size, seq_len, heads, head_dim], then set unsqueeze_dim=2.
    Returns:
        `tuple(torch.Tensor)` comprising of the query and key tensors rotated using the Rotary Position Embedding.
    """
    cos = cos.unsqueeze(unsqueeze_dim)
    sin = sin.unsqueeze(unsqueeze_dim)
    q_embed = (q * cos) + (rotate_half(q) * sin)
    k_embed = (k * cos) + (rotate_half(k) * sin)
    return q_embed, k_embed
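The rotation `x * cos + rotate_half(x) * sin` can be reproduced on a single 4-dimensional vector to see why it encodes relative position. The sketch below is pure Python with hypothetical helper names (`apply_rope`, `dot`); it assumes the default RoPE frequencies `theta ** (-2i / dim)` with `theta = 10000.0` and no attention scaling.

```python
import math


def rotate_half(x):
    """Move the second half of the vector (negated) in front of the first half."""
    half = len(x) // 2
    return [-v for v in x[half:]] + x[:half]


def apply_rope(x, position, theta=10000.0):
    dim = len(x)
    inv_freq = [theta ** (-2.0 * i / dim) for i in range(dim // 2)]
    # Duplicating the angles mirrors `torch.cat((freqs, freqs), dim=-1)`.
    angles = [position * f for f in inv_freq] * 2
    rotated = rotate_half(x)
    return [v * math.cos(a) + r * math.sin(a) for v, r, a in zip(x, rotated, angles)]


def dot(a, b):
    return sum(u * v for u, v in zip(a, b))


q = [1.0, 2.0, 3.0, 4.0]
k = [0.5, -1.0, 2.0, 0.25]
# Both pairs of positions are 2 apart, so the attention scores should agree.
score_far = dot(apply_rope(q, 5), apply_rope(k, 3))
score_near = dot(apply_rope(q, 2), apply_rope(k, 0))
```

Each pair `(x[i], x[i + dim/2])` is rotated by an angle proportional to the position, so the dot product between a rotated query and a rotated key depends only on the distance between their positions.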
def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor:
    """
    This is the equivalent of torch.repeat_interleave(x, dim=1, repeats=n_rep). The hidden states go from (batch,
    num_key_value_heads, seqlen, head_dim) to (batch, num_attention_heads, seqlen, head_dim)
    """
    batch, num_key_value_heads, slen, head_dim = hidden_states.shape
    if n_rep == 1:
        return hidden_states
    hidden_states = hidden_states[:, :, None, :, :].expand(batch, num_key_value_heads, n_rep, slen, head_dim)
    return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim)
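# Illustrative sketch (not library code): the head ordering produced by `repeat_kv`, shown
# on a plain list of head labels instead of a tensor. The function name `repeat_kv_heads`
# and the string labels are hypothetical stand-ins for the head dimension.

```python
def repeat_kv_heads(heads, n_rep):
    """Repeat each key/value head n_rep times in place (interleaved, not tiled)."""
    return [head for head in heads for _ in range(n_rep)]


# Two key/value heads shared by four attention heads (n_rep = 2).
expanded = repeat_kv_heads(["kv0", "kv1"], 2)
```

# The interleaved order matters: attention heads 0 and 1 read `kv0`, heads 2 and 3 read `kv1`.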
def eager_attention_forward(
    module: nn.Module,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attention_mask: Optional[torch.Tensor],
    scaling: float,
    dropout: float = 0.0,
    **kwargs: Unpack[TransformersKwargs],
):
    key_states = repeat_kv(key, module.num_key_value_groups)
    value_states = repeat_kv(value, module.num_key_value_groups)
    attn_weights = torch.matmul(query, key_states.transpose(2, 3)) * scaling
    if attention_mask is not None:
        causal_mask = attention_mask[:, :, :, : key_states.shape[-2]]
        attn_weights = attn_weights + causal_mask
    attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query.dtype)
    attn_weights = nn.functional.dropout(attn_weights, p=dropout, training=module.training)
    attn_output = torch.matmul(attn_weights, value_states)
    attn_output = attn_output.transpose(1, 2).contiguous()
    return attn_output, attn_weights
def flex_attention_forward(
    module: nn.Module,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attention_mask: Union[torch.Tensor, "BlockMask"],
    scaling: Optional[float] = None,
    softcap: Optional[float] = None,
    head_mask: Optional[torch.Tensor] = None,
    **kwargs,
) -> tuple[torch.Tensor, torch.Tensor]:
    block_mask = None
    causal_mask = None
    if isinstance(attention_mask, BlockMask):
        block_mask = attention_mask
    else:
        causal_mask = attention_mask
    if causal_mask is not None:
        causal_mask = causal_mask[:, :, :, : key.shape[-2]]

    def score_mod(score, batch_idx, head_idx, q_idx, kv_idx):
        if softcap is not None:
            score = softcap * torch.tanh(score / softcap)
        if causal_mask is not None:
            score = score + causal_mask[batch_idx][head_idx][q_idx][kv_idx]
        if head_mask is not None:
            score = score + head_mask[batch_idx][head_idx][0][0]
        return score

    attn_output, attention_weights = compile_friendly_flex_attention(
        query,
        key,
        value,
        score_mod=score_mod,
        block_mask=block_mask,
        enable_gqa=True,
        scale=scaling,
        # Last time checked on PyTorch == 2.5.1: Flex Attention always computes the lse regardless.
        # For simplification, we thus always return it as no additional computations are introduced.
        return_lse=True,
    )
    # lse is returned in float32
    attention_weights = attention_weights.to(value.dtype)
    attn_output = attn_output.transpose(1, 2).contiguous()
    return attn_output, attention_weights
ALL_ATTENTION_FUNCTIONS = AttentionInterface()
ALL_ATTENTION_FUNCTIONS["doge_flex_attention"] = flex_attention_forward
class DogeAttention(nn.Module):
def __init__(self, config: DogeConfig, layer_idx: Optional[int] = None):
super().__init__()
self.config = config
self.layer_idx = layer_idx
self.head_dim = getattr(config, "head_dim", config.hidden_size // config.num_attention_heads)
self.num_key_value_groups = config.num_attention_heads // config.num_key_value_heads
self.scaling = self.head_dim**-0.5
self.attention_dropout = config.attention_dropout
self.keep_window_size = config.keep_window_size
self.q_proj = nn.Linear(
config.hidden_size, config.num_attention_heads * self.head_dim, bias=config.attention_bias
)
self.k_proj = nn.Linear(
config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias
)
self.v_proj = nn.Linear(
config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias
)
# dynamic mask for the QK^T attention weights matrix
self.A = nn.Parameter(torch.zeros(config.num_key_value_heads))
self.dt_proj = nn.Linear(
config.num_key_value_heads * self.head_dim, config.num_key_value_heads, bias=config.attention_bias
)
self.o_proj = nn.Linear(
config.num_attention_heads * self.head_dim, config.hidden_size, bias=config.attention_bias
)
self.q_norm = DogeRMSNorm(self.head_dim, eps=config.rms_norm_eps)
self.k_norm = DogeRMSNorm(self.head_dim, eps=config.rms_norm_eps)
def forward(
self,
hidden_states: torch.Tensor,
position_embeddings: tuple[torch.Tensor, torch.Tensor],
attention_mask: Optional[torch.Tensor] = None,
past_key_value: Optional[Cache] = None,
cache_position: Optional[torch.LongTensor] = None,
**kwargs,
) -> tuple[torch.Tensor, Optional[torch.Tensor], Optional[tuple[torch.Tensor]]]:
input_shape = hidden_states.shape[:-1]
hidden_shape = (*input_shape, -1, self.head_dim)
query_states = self.q_norm(self.q_proj(hidden_states).view(hidden_shape)).transpose(1, 2)
key_states = self.k_norm(self.k_proj(hidden_states).view(hidden_shape)).transpose(1, 2)
value_states = self.v_proj(hidden_states).view(hidden_shape).transpose(1, 2)
cos, sin = position_embeddings
query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin)
if past_key_value is not None:
# sin and cos are specific to RoPE models; cache_position needed for the static cache
cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}
key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs)
# calculate dynamic mask from value_states
dt_states = self.dt_proj(
value_states.transpose(1, 2).reshape(value_states.shape[0], value_states.shape[-2], -1)
)
dt_states = torch.exp(self.A * F.softplus(dt_states)).transpose(-1, -2)
attn_mask = self.prepare_dynamic_mask(
hidden_states=hidden_states,
dt_states=dt_states,
keep_window_size=self.keep_window_size,
attention_mask=attention_mask,
)
attn_mask = repeat_kv(attn_mask, self.num_key_value_groups)
attention_interface: Callable = eager_attention_forward
if self.config._attn_implementation != "eager":
attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation]
attn_output, attn_weights = attention_interface(
self,
query_states,
key_states,
value_states,
attention_mask=attn_mask,
dropout=0.0 if not self.training else self.attention_dropout,
scaling=self.scaling,
**kwargs,
)
attn_output = attn_output.reshape(*input_shape, -1).contiguous()
attn_output = self.o_proj(attn_output)
return attn_output, attn_weights
    def prepare_dynamic_mask(
        self,
        hidden_states: torch.Tensor,
        dt_states: torch.Tensor,
        keep_window_size: int = 2048,
        attention_mask: Optional[torch.Tensor] = None,
    ):
        """
        The core idea of DMA is to compute a dynamic attention mask that hides low-scoring key positions, which
        yields sparse attention. This method combines `dt_states` with `attention_mask` to build the final `attn_mask`.

        Args:
            hidden_states (`torch.Tensor`): The input hidden_states, used to determine the minimum value of the current input precision.
            dt_states (`torch.Tensor`): dt_states of shape `(batch_size, num_heads, key_sequence_length)`.
            keep_window_size (`int`): The number of key positions kept per query. Dynamic masking is applied only when the key sequence length exceeds this value.
            attention_mask (`torch.Tensor`, *optional*): attention mask of shape `(batch_size, 1, query_sequence_length, key_sequence_length)`.
        """
        min_dtype = torch.finfo(hidden_states.dtype).min
        dtype = hidden_states.dtype
        attn_mask = dt_states[:, :, None, :].expand(
            -1, -1, hidden_states.shape[1], -1
        )  # [batch_size, num_heads, query_len, key_len]
        if attention_mask is not None and not isinstance(attention_mask, BlockMask):
            if attention_mask.dtype == torch.bool:
                dtype = hidden_states.dtype
                attention_mask = torch.where(
                    attention_mask, torch.tensor(0.0, device=attention_mask.device, dtype=dtype), min_dtype
                )
            attn_mask = attn_mask.masked_fill(attention_mask[:, :, :, : attn_mask.shape[-1]] != 0, min_dtype)
        if attn_mask.shape[-1] > keep_window_size:
            active_mask = torch.zeros_like(attn_mask, dtype=dtype, device=attn_mask.device)
            topk_indices = torch.topk(attn_mask, keep_window_size, dim=-1, largest=True, sorted=False).indices
            active_mask = active_mask.scatter(-1, topk_indices, 1.0)
            attn_mask = attn_mask.masked_fill(active_mask == 0.0, min_dtype)
        return attn_mask
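# Illustrative sketch (not library code): the top-k "keep window" step of `prepare_dynamic_mask`
# for a single query row, in pure Python. It assumes `-inf` as the masked value, whereas the
# method above fills with the minimum finite value of the dtype. `keep_top_k` is a hypothetical name.

```python
import math


def keep_top_k(scores, keep_window_size, masked_value=-math.inf):
    """Keep the keep_window_size largest scores and mask every other key position."""
    if len(scores) <= keep_window_size:
        # Short sequences are left untouched, mirroring the length check in the method.
        return list(scores)
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    keep = set(ranked[:keep_window_size])
    return [s if i in keep else masked_value for i, s in enumerate(scores)]


row = [0.1, 0.9, 0.5, 0.3]
sparse_row = keep_top_k(row, keep_window_size=2)
dense_row = keep_top_k(row, keep_window_size=8)
```

# After the softmax, masked positions receive (near-)zero weight, so each query attends to at
# most `keep_window_size` keys.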
class DogeMLP(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.hidden_size = config.hidden_size
self.intermediate_size = config.intermediate_size
self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=config.mlp_bias)
self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=config.mlp_bias)
self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=config.mlp_bias)
self.act_fn = ACT2FN[config.hidden_act]
def forward(self, x):
down_proj = self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x))
return down_proj
class DogeCDMoE(nn.Module):
def __init__(self, config: DogeConfig):
super().__init__()
self.hidden_size = config.hidden_size
self.intermediate_size = config.intermediate_size
self.act_fn = ACT2FN[config.hidden_act]
self.num_experts = config.num_experts
self.num_keys = math.floor(math.sqrt(self.num_experts))
self.top_k = config.num_experts_per_tok
self.norm_topk_prob = config.norm_topk_prob
# shared expert
self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=config.mlp_bias)
self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=config.mlp_bias)
self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=config.mlp_bias)
# router gate for retrieval experts
self.router_gate = nn.Linear(self.hidden_size, self.num_keys * 2, bias=False)
# routed experts
self.down_embed = nn.Embedding(self.num_experts, self.hidden_size)
self.up_embed = nn.Embedding(self.num_experts, self.hidden_size)
def forward(
self,
hidden_states: torch.Tensor,
**kwargs,
) -> torch.Tensor:
bsz, seq_len, _ = hidden_states.shape
# get routing logits with router gate
router_logits = self.router_gate(hidden_states).view(2, bsz * seq_len, -1)
# get experts with the highest routing logits
(scores_x, scores_y), (indices_x, indices_y) = router_logits.topk(self.num_keys, dim=-1)
all_scores = scores_x.unsqueeze(-1) + scores_y.unsqueeze(-2)
all_indices = indices_x.unsqueeze(-1) * self.num_keys + indices_y.unsqueeze(-2)
all_scores = all_scores.view(*all_scores.shape[:-2], -1)
all_indices = all_indices.view(*all_indices.shape[:-2], -1)
scores, position_indices = all_scores.topk(self.top_k, dim=-1)
indices = all_indices.gather(-1, position_indices)
routing_weights = F.softmax(scores, dim=-1)
if self.norm_topk_prob:
routing_weights /= routing_weights.sum(dim=-1, keepdim=True)
# mix routed experts states with shared expert states
down_embed = self.down_embed(indices)
up_embed = self.up_embed(indices)
experts_weights = torch.matmul(down_embed, hidden_states.view(bsz * seq_len, -1, 1)).view(bsz * seq_len, -1)
experts_weights = self.act_fn(experts_weights) * routing_weights
experts_states = torch.matmul(experts_weights.view(bsz * seq_len, 1, -1), up_embed).view(bsz, seq_len, -1)
hidden_states = self.down_proj(self.act_fn(self.gate_proj(hidden_states)) * self.up_proj(hidden_states))
hidden_states = hidden_states + experts_states
return hidden_states, router_logits
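# Illustrative sketch (not library code): how two sets of `num_keys` scores are combined into
# `num_keys * num_keys` candidate experts, as in the routing above. Each candidate's score is
# `score_x + score_y` and its expert id is `index_x * num_keys + index_y`. The sketch assumes a
# single token and plain Python lists; `product_key_top_k` is a hypothetical name.

```python
from itertools import product


def product_key_top_k(scores_x, scores_y, top_k):
    """Return the top_k (score, expert_id) pairs over the Cartesian product of two key sets."""
    num_keys = len(scores_x)
    candidates = [
        (sx + sy, ix * num_keys + iy)
        for (ix, sx), (iy, sy) in product(enumerate(scores_x), enumerate(scores_y))
    ]
    candidates.sort(key=lambda pair: pair[0], reverse=True)
    return candidates[:top_k]


# num_keys = 2, so there are 4 candidate experts with ids 0..3.
selected = product_key_top_k([0.1, 2.0], [1.0, 0.5], top_k=2)
expert_ids = [expert_id for _, expert_id in selected]
```

# The router only needs `2 * num_keys` logits per token to address `num_keys ** 2` experts,
# which is why `router_gate` projects to `num_keys * 2` outputs.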
class DogeDecoderLayer(GradientCheckpointingLayer):
def __init__(self, config: DogeConfig, layer_idx: Optional[int] = None):
super().__init__()
self.hidden_dropout = config.hidden_dropout
self.input_layernorm = DogeRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.self_attn = DogeAttention(config=config, layer_idx=layer_idx)
self.input_residual = nn.Parameter(torch.ones(config.hidden_size))
self.post_attention_layernorm = DogeRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.mlp = DogeMLP(config) if not config.is_moe else DogeCDMoE(config)
self.post_attention_residual = nn.Parameter(torch.ones(config.hidden_size))
def forward(
self,
hidden_states: torch.Tensor,
position_embeddings: tuple[torch.Tensor, torch.Tensor],
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_value: Optional[tuple[torch.Tensor]] = None,
use_cache: Optional[bool] = False,
cache_position: Optional[torch.LongTensor] = None,
**kwargs: Unpack[TransformersKwargs],
) -> tuple[torch.FloatTensor, Optional[tuple[torch.FloatTensor, torch.FloatTensor]]]:
# sequence transformation
residual = hidden_states
hidden_states = self.input_layernorm(hidden_states)
hidden_states, self_attn_weights = self.self_attn(
hidden_states=hidden_states,
position_embeddings=position_embeddings,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_value=past_key_value,
use_cache=use_cache,
cache_position=cache_position,
**kwargs,
)
hidden_states = F.dropout(hidden_states, p=self.hidden_dropout, training=self.training)
hidden_states = self.input_residual * residual + hidden_states
# state transformation
residual = hidden_states
hidden_states = self.post_attention_layernorm(hidden_states)
hidden_states = self.mlp(hidden_states)
hidden_states = F.dropout(hidden_states, p=self.hidden_dropout, training=self.training)
hidden_states = self.post_attention_residual * residual + hidden_states
return hidden_states
@auto_docstring
class DogePreTrainedModel(PreTrainedModel):
config_class = DogeConfig
base_model_prefix = "model"
supports_gradient_checkpointing = True
_no_split_modules = ["DogeDecoderLayer"]
_skip_keys_device_placement = ["past_key_values"]
_supports_flash_attn_2 = False
_supports_sdpa = True
_supports_flex_attn = True
_supports_cache_class = True
_supports_quantized_cache = True
_supports_static_cache = False
_supports_attention_backend = True
_can_record_outputs = {
"router_logits": OutputRecorder(DogeCDMoE, index=1),
"hidden_states": DogeDecoderLayer,
"attentions": DogeAttention,
}
_supports_flash_attn_3 = False
    def _init_weights(self, module):
        """Initialize the weights"""
        std = self.config.initializer_range
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, DogeRMSNorm):
            module.weight.data.fill_(1.0)
        if isinstance(module, DogeAttention):
            if hasattr(module, "A"):
                module.A.data.zero_()
        elif isinstance(module, DogeDecoderLayer):
            if hasattr(module, "input_residual"):
                module.input_residual.data.fill_(1.0)
            if hasattr(module, "post_attention_residual"):
                module.post_attention_residual.data.fill_(1.0)
@auto_docstring
class DogeModel(DogePreTrainedModel):
def __init__(self, config: DogeConfig):
super().__init__(config)
self.padding_idx = config.pad_token_id
self.vocab_size = config.vocab_size
self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx)
self.layers = nn.ModuleList(
[DogeDecoderLayer(config, layer_idx) for layer_idx in range(config.num_hidden_layers)]
)
self.norm = DogeRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.rotary_emb = DogeRotaryEmbedding(config=config)
self.gradient_checkpointing = False
# Initialize weights and apply final processing
self.post_init()
def get_input_embeddings(self):
return self.embed_tokens
def set_input_embeddings(self, value):
self.embed_tokens = value
@check_model_inputs
@auto_docstring
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
**kwargs: Unpack[TransformersKwargs],
) -> MoeModelOutputWithPast:
if (input_ids is None) ^ (inputs_embeds is not None):
raise ValueError("You must specify exactly one of input_ids or inputs_embeds")
if use_cache and past_key_values is None:
past_key_values = DynamicCache()
if inputs_embeds is None:
inputs_embeds = self.embed_tokens(input_ids)
if cache_position is None:
past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0
cache_position = torch.arange(
past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device
)
if position_ids is None:
position_ids = cache_position.unsqueeze(0)
mask_function = create_causal_mask if self.config.sliding_window is None else create_sliding_window_causal_mask
causal_mask = mask_function(
config=self.config,
input_embeds=inputs_embeds,
attention_mask=attention_mask,
cache_position=cache_position,
past_key_values=past_key_values,
position_ids=position_ids,
)
hidden_states = inputs_embeds
# create position embeddings to be shared across the decoder layers
position_embeddings = self.rotary_emb(hidden_states, position_ids)
for decoder_layer in self.layers[: self.config.num_hidden_layers]:
hidden_states = decoder_layer(
hidden_states,
position_embeddings=position_embeddings,
attention_mask=causal_mask,
position_ids=position_ids,
past_key_value=past_key_values,
use_cache=use_cache,
cache_position=cache_position,
**kwargs,
)
hidden_states = self.norm(hidden_states)
return MoeModelOutputWithPast( # only diff with Mistral is the output type, we need MoE
last_hidden_state=hidden_states,
past_key_values=past_key_values,
)
def load_balancing_loss_func(
    gate_logits: Union[torch.Tensor, tuple[torch.Tensor], None],
    num_experts: Optional[int] = None,
    num_keys: Optional[int] = None,
    top_k: int = 2,
    attention_mask: Optional[torch.Tensor] = None,
) -> Union[torch.Tensor, int]:
    r"""
    Computes auxiliary load balancing loss as in Switch Transformer - implemented in PyTorch.

    See Switch Transformer (https://arxiv.org/abs/2101.03961) for more details. This function implements the loss
    function presented in equations (4) - (6) of the paper. It aims at penalizing cases where the routing between
    experts is too unbalanced.

    Args:
        gate_logits:
            Logits from the `router_gate`, should be a tuple of model.config.num_hidden_layers tensors of
            shape [2, batch_size * sequence_length, num_keys].
        num_experts:
            Number of experts
        num_keys:
            Number of keys
        top_k:
            The number of experts to route per-token, can be also interpreted as the `top-k` routing
            parameter.
        attention_mask (`torch.Tensor`, *optional*):
            The attention_mask used in forward function
            shape [batch_size, sequence_length] if not None.

    Returns:
        The auxiliary loss.
    """
    if gate_logits is None or not isinstance(gate_logits, tuple):
        return 0

    compute_dtype = gate_logits[0].dtype
    compute_device = gate_logits[0].device
    all_expert_indices = []
    all_routing_weights = []

    for layer_gate_logits in gate_logits:
        layer_gate_logits = layer_gate_logits.to(compute_device)

        (scores_x, scores_y), (indices_x, indices_y) = layer_gate_logits.topk(num_keys, dim=-1)

        all_scores = scores_x.unsqueeze(-1) + scores_y.unsqueeze(-2)
        all_indices = indices_x.unsqueeze(-1) * num_keys + indices_y.unsqueeze(-2)
        all_scores = all_scores.view(*all_scores.shape[:-2], -1)
        all_indices = all_indices.view(*all_indices.shape[:-2], -1)

        _, position_indices = all_scores.topk(top_k, dim=-1)
        expert_indices = all_indices.gather(-1, position_indices)

        routing_weights = F.softmax(all_scores, dim=-1)

        all_expert_indices.append(expert_indices)
        all_routing_weights.append(routing_weights)
    all_expert_indices = torch.cat(all_expert_indices, dim=0)
    all_routing_weights = torch.cat(all_routing_weights, dim=0)

    if attention_mask is None:
        # Compute the percentage of tokens routed to each expert
        all_expert_indices = all_expert_indices.view(-1)
        tokens_per_expert = torch.zeros(num_experts, dtype=compute_dtype, device=compute_device)
        pad = torch.ones_like(all_expert_indices, dtype=compute_dtype, device=compute_device)
        tokens_per_expert = tokens_per_expert.scatter_add_(0, all_expert_indices, pad) / all_expert_indices.shape[0]

        # Compute the average probability of routing to these experts
        router_prob_per_expert = torch.mean(all_routing_weights, dim=0)
    else:
        batch_size, sequence_length = attention_mask.shape
        num_hidden_layers = len(gate_logits)

        # Compute the mask that masks all padding tokens as 0 with the same shape of expert_mask
        expert_attention_mask = (
            attention_mask[None, :, :, None]
            .expand((num_hidden_layers, batch_size, sequence_length, top_k))
            .reshape(-1)
            .to(compute_device)
        )
        all_expert_indices = all_expert_indices.view(-1)[expert_attention_mask.bool()]

        # Compute the percentage of tokens routed to each expert
        tokens_per_expert = torch.zeros(num_experts, dtype=compute_dtype, device=compute_device)
        pad = torch.ones_like(all_expert_indices, dtype=compute_dtype, device=compute_device)
        tokens_per_expert = tokens_per_expert.scatter_add_(0, all_expert_indices, pad) / torch.sum(
            expert_attention_mask
        )

        # Compute the mask that masks all padding tokens as 0 with the same shape of tokens_per_expert
        router_per_expert_attention_mask = (
            attention_mask[None, :, :, None]
            .expand((num_hidden_layers, batch_size, sequence_length, num_experts))
            .reshape(-1, num_experts)
            .to(compute_device)
        )

        # Compute the average probability of routing to these experts
        router_prob_per_expert = torch.sum(all_routing_weights * router_per_expert_attention_mask, dim=0) / torch.sum(
            router_per_expert_attention_mask, dim=0
        )

    overall_loss = torch.sum(tokens_per_expert * router_prob_per_expert)
    return overall_loss * num_experts
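# Illustrative sketch (not library code): the quantity returned above, `num_experts * sum_i f_i * P_i`,
# computed in pure Python. Here `f_i` is the fraction of routing slots assigned to expert i and `P_i`
# is the mean router probability for expert i. The sketch assumes top-1 routing, no padding mask, and
# hand-written probabilities; `load_balancing_loss` is a hypothetical name.

```python
def load_balancing_loss(expert_assignments, routing_probs, num_experts):
    """Switch-style auxiliary loss for one flat list of tokens."""
    total = len(expert_assignments)
    # f_i: share of tokens dispatched to each expert.
    fraction = [expert_assignments.count(e) / total for e in range(num_experts)]
    # P_i: average probability the router gives to each expert.
    mean_prob = [sum(p[e] for p in routing_probs) / len(routing_probs) for e in range(num_experts)]
    return num_experts * sum(f * p for f, p in zip(fraction, mean_prob))


# Two tokens, two experts: one perfectly balanced router and one fully collapsed router.
balanced = load_balancing_loss([0, 1], [[0.5, 0.5], [0.5, 0.5]], num_experts=2)
collapsed = load_balancing_loss([0, 0], [[1.0, 0.0], [1.0, 0.0]], num_experts=2)
```

# Uniform routing gives the minimum value of 1, and sending every token to one expert gives
# `num_experts`, so minimizing the term pushes the router toward balanced expert usage.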
@auto_docstring
class DogeForCausalLM(DogePreTrainedModel, GenerationMixin):
_tied_weights_keys = ["lm_head.weight"]
_tp_plan = {"lm_head": "colwise_rep"}
_pp_plan = {"lm_head": (["hidden_states"], ["logits"])}
def __init__(self, config):
super().__init__(config)
self.model = DogeModel(config)
self.vocab_size = config.vocab_size
self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
self.router_aux_loss_coef = config.router_aux_loss_coef
self.num_experts = config.num_experts
self.num_experts_per_tok = config.num_experts_per_tok
# Initialize weights and apply final processing
self.post_init()
def get_input_embeddings(self):
return self.model.embed_tokens
def set_input_embeddings(self, value):
self.model.embed_tokens = value
def get_output_embeddings(self):
return self.lm_head
def set_output_embeddings(self, new_embeddings):
self.lm_head = new_embeddings
def set_decoder(self, decoder):
self.model = decoder
def get_decoder(self):
return self.model
@can_return_tuple
@auto_docstring
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[list[torch.FloatTensor]] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
logits_to_keep: Union[int, torch.Tensor] = 0,
output_router_logits: Optional[bool] = None,
**kwargs: Unpack[TransformersKwargs],
) -> MoeCausalLMOutputWithPast:
r"""
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
(masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.
Example:
```python
>>> from transformers import AutoTokenizer, DogeForCausalLM
>>> model = DogeForCausalLM.from_pretrained("SmallDoge/Doge-320M")
>>> tokenizer = AutoTokenizer.from_pretrained("SmallDoge/Doge-320M")
>>> prompt = "Hey, are you conscious? Can you talk to me?"
>>> inputs = tokenizer(prompt, return_tensors="pt")
>>> # Generate
>>> generate_ids = model.generate(inputs.input_ids, max_length=30)
>>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
"Hey, are you conscious? Can you talk to me?\nI'm not conscious, but I can talk to you."
```"""
output_router_logits = (
output_router_logits if output_router_logits is not None else self.config.output_router_logits
)
# decoder outputs consist of (dec_features, layer_state, dec_hidden, dec_attn)
outputs: MoeModelOutputWithPast = self.model(
input_ids=input_ids,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
cache_position=cache_position,
**kwargs,
)
hidden_states = outputs.last_hidden_state
# Only compute necessary logits, and do not upcast them to float if we are not computing the loss
slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep
logits = self.lm_head(hidden_states[:, slice_indices, :])
loss = None
if labels is not None:
loss = self.loss_function(logits, labels, self.vocab_size, **kwargs)
aux_loss = None
if output_router_logits:
aux_loss = load_balancing_loss_func(
outputs.router_logits,
self.num_experts,
math.floor(math.sqrt(self.num_experts)),
self.num_experts_per_tok,
attention_mask,
)
if labels is not None:
loss += self.router_aux_loss_coef * aux_loss.to(loss.device) # make sure to reside in the same device
return MoeCausalLMOutputWithPast(
loss=loss,
aux_loss=aux_loss,
logits=logits,
past_key_values=outputs.past_key_values,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
router_logits=outputs.router_logits,
)
@auto_docstring(
custom_intro="""
The Doge Model transformer with a sequence classification head on top (linear layer).
[`DogeForSequenceClassification`] uses the last token in order to do the classification, as other causal models
(e.g. GPT-2) do.
Since it does classification on the last token, it requires to know the position of the last token. If a
`pad_token_id` is defined in the configuration, it finds the last token that is not a padding token in each row. If
no `pad_token_id` is defined, it simply takes the last value in each row of the batch. Since it cannot guess the
padding tokens when `inputs_embeds` are passed instead of `input_ids`, it does the same (take the last value in
each row of the batch).
"""
)
class DogeForSequenceClassification(DogePreTrainedModel):
def __init__(self, config):
super().__init__(config)
self.num_labels = config.num_labels
self.model = DogeModel(config)
self.score = nn.Linear(config.hidden_size, self.num_labels, bias=False)
# Initialize weights and apply final processing
self.post_init()
def get_input_embeddings(self):
return self.model.embed_tokens
def set_input_embeddings(self, value):
self.model.embed_tokens = value
@can_return_tuple
@auto_docstring
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
**kwargs: Unpack[TransformersKwargs],
) -> SequenceClassifierOutputWithPast:
r"""
labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
`config.num_labels > 1` a classification loss is computed (Cross-Entropy).
"""
transformer_outputs: BaseModelOutputWithPast = self.model(
input_ids,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
**kwargs,
)
hidden_states = transformer_outputs.last_hidden_state
logits = self.score(hidden_states)
if input_ids is not None:
batch_size = input_ids.shape[0]
else:
batch_size = inputs_embeds.shape[0]
if self.config.pad_token_id is None and batch_size != 1:
raise ValueError("Cannot handle batch sizes > 1 if no padding token is defined.")
if self.config.pad_token_id is None:
last_non_pad_token = -1
elif input_ids is not None:
# To handle both left- and right- padding, we take the rightmost token that is not equal to pad_token_id
non_pad_mask = (input_ids != self.config.pad_token_id).to(logits.device, torch.int32)
token_indices = torch.arange(input_ids.shape[-1], device=logits.device, dtype=torch.int32)
last_non_pad_token = (token_indices * non_pad_mask).argmax(-1)
else:
last_non_pad_token = -1
logger.warning_once(
f"{self.__class__.__name__} will not detect padding tokens in `inputs_embeds`. Results may be "
"unexpected if using padding tokens in conjunction with `inputs_embeds.`"
)
pooled_logits = logits[torch.arange(batch_size, device=logits.device), last_non_pad_token]
loss = None
if labels is not None:
loss = self.loss_function(logits=logits, labels=labels, pooled_logits=pooled_logits, config=self.config)
return SequenceClassifierOutputWithPast(
loss=loss,
logits=pooled_logits,
past_key_values=transformer_outputs.past_key_values,
hidden_states=transformer_outputs.hidden_states,
attentions=transformer_outputs.attentions,
)
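# Illustrative sketch (not library code): the "rightmost non-padding token" lookup used above,
# `(token_indices * non_pad_mask).argmax(-1)`, for a single row in pure Python. It works for both
# left- and right-padded rows. `last_non_pad_index` is a hypothetical name and pad id 0 is an assumption.

```python
def last_non_pad_index(input_ids, pad_token_id):
    """Index of the rightmost token that is not the padding token."""
    # Padding positions contribute 0, real tokens contribute their own index.
    weighted = [i * (tok != pad_token_id) for i, tok in enumerate(input_ids)]
    # Equivalent to argmax: the first position holding the largest weighted index.
    return max(range(len(weighted)), key=weighted.__getitem__)


right_padded = last_non_pad_index([5, 6, 7, 0, 0], pad_token_id=0)
left_padded = last_non_pad_index([0, 0, 5, 6], pad_token_id=0)
```

# The classification head then reads the logits at that position for each row of the batch.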
__all__ = ["DogeForCausalLM", "DogeModel", "DogePreTrainedModel", "DogeForSequenceClassification"]

@@ -0,0 +1,799 @@
# coding=utf-8
# Copyright 2025 Jingze Shi and the HuggingFace Inc. team. All rights reserved.
#
# The Doge family of small language models is trained by SmallDoge Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""PyTorch Doge model."""
import math
from typing import Callable, Optional, Union
import torch
import torch.nn.functional as F
from torch import nn
from ...activations import ACT2FN
from ...cache_utils import Cache
from ...configuration_utils import PretrainedConfig
from ...integrations.flex_attention import compile_friendly_flex_attention
from ...modeling_layers import GradientCheckpointingLayer
from ...modeling_outputs import MoeCausalLMOutputWithPast, MoeModelOutputWithPast
from ...modeling_rope_utils import rope_config_validation
from ...modeling_utils import AttentionInterface
from ...processing_utils import Unpack
from ...utils import TransformersKwargs, is_torch_flex_attn_available
from ...utils.generic import OutputRecorder
from ..llama.modeling_llama import (
LlamaForSequenceClassification,
LlamaMLP,
LlamaPreTrainedModel,
LlamaRMSNorm,
LlamaRotaryEmbedding,
apply_rotary_pos_emb,
eager_attention_forward,
repeat_kv,
)
from ..mixtral.modeling_mixtral import MixtralForCausalLM, MixtralModel
if is_torch_flex_attn_available():
from torch.nn.attention.flex_attention import BlockMask
class DogeConfig(PretrainedConfig):
r"""
This is the configuration class to store the configuration of a [`DogeModel`]. It is used to instantiate a Doge
model according to the specified arguments, defining the model architecture like [SmallDoge/Doge-320M](https://huggingface.co/SmallDoge/Doge-320M).
Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
documentation from [`PretrainedConfig`] for more information.
Args:
vocab_size (`int`, *optional*, defaults to 32768):
Vocabulary size of the Doge model. Defines the number of different tokens that can be represented by the `input_ids` passed when calling [`DogeModel`].
hidden_size (`int`, *optional*, defaults to 1024):
Dimension of the hidden representations.
intermediate_size (`int`, *optional*, defaults to 2048):
Dimension of the MLP representations.
num_hidden_layers (`int`, *optional*, defaults to 32):
Number of hidden layers in the Transformer decoder.
hidden_dropout (`float`, *optional*, defaults to 0.0):
Dropout probability for each sequence transformation and state transformation module.
hidden_act (`str` or `function`, *optional*, defaults to `"silu"`):
The non-linear activation function (function or string) in the decoder.
initializer_range (`float`, *optional*, defaults to 0.02):
The standard deviation of the truncated_normal_initializer for initializing all weight matrices.
rms_norm_eps (`float`, *optional*, defaults to 1e-06):
The epsilon used by the rms normalization layers.
use_cache (`bool`, *optional*, defaults to `True`):
Whether or not the model should return the last key/values attentions (not used by all models). Only
relevant if `config.is_decoder=True`.
tie_word_embeddings (`bool`, *optional*, defaults to `False`):
Whether the model's input and output word embeddings should be tied.
max_position_embeddings (`int`, *optional*, defaults to 2048):
The maximum sequence length that this model might ever be used with.
rope_theta (`float`, *optional*, defaults to 10000.0):
The base period of the RoPE embeddings.
rope_scaling (`Dict`, *optional*):
Dictionary containing the scaling configuration for the RoPE embeddings.
NOTE: if you apply a new rope type and expect the model to work on longer `max_position_embeddings`, we recommend updating this value accordingly.
The Doge family of small models uses `{ 'rope_type': 'dynamic', 'factor': 4.0, 'original_max_position_embeddings': 2048 }` as the default value.
Expected contents:
`rope_type` (`str`):
The sub-variant of RoPE to use. Can be one of ['default', 'linear', 'dynamic', 'yarn', 'longrope', 'llama3'], with 'default' being the original RoPE implementation.
`factor` (`float`, *optional*):
Used with all rope types except 'default'. The scaling factor to apply to the RoPE embeddings.
In most scaling types, a `factor` of x will enable the model to handle sequences of length x * original maximum pre-trained length.
`original_max_position_embeddings` (`int`, *optional*):
Used with 'dynamic', 'longrope' and 'llama3'.
The original max position embeddings used during pretraining.
`attention_factor` (`float`, *optional*):
Used with 'yarn' and 'longrope'. The scaling factor to be applied on the attention
computation.
If unspecified, it defaults to the value recommended by the implementation, using the `factor` field to infer the suggested value.
`beta_fast` (`float`, *optional*):
Only used with 'yarn'. Parameter to set the boundary for extrapolation (only) in the linear
ramp function. If unspecified, it defaults to 32.
`beta_slow` (`float`, *optional*):
Only used with 'yarn'. Parameter to set the boundary for interpolation (only) in the linear
ramp function. If unspecified, it defaults to 1.
`short_factor` (`List[float]`, *optional*):
Only used with 'longrope'. The scaling factor to be applied to short contexts (<`original_max_position_embeddings`).
Must be a list of numbers with the same length as the hidden size divided by the number of attention heads divided by 2
`long_factor` (`List[float]`, *optional*):
Only used with 'longrope'. The scaling factor to be applied to long contexts (>`original_max_position_embeddings`).
Must be a list of numbers with the same length as the hidden size divided by the number of attention heads divided by 2
`low_freq_factor` (`float`, *optional*):
Only used with 'llama3'. Scaling factor applied to low frequency components of the RoPE
`high_freq_factor` (`float`, *optional*):
Only used with 'llama3'. Scaling factor applied to high frequency components of the RoPE
num_attention_heads (`int`, *optional*, defaults to 8):
Number of attention heads for each attention layer in the Transformer decoder.
num_key_value_heads (`int`, *optional*):
This is the number of key_value heads that should be used to implement Grouped Query Attention.
If `num_key_value_heads=num_attention_heads`, the model will use Multi Head Attention (MHA), if
`num_key_value_heads=1` the model will use Multi Query Attention (MQA) otherwise GQA is used.
When converting a multi-head checkpoint to a GQA checkpoint, each group key and value head should be constructed by meanpooling all the original heads within that group.
For more details, check out [this paper](https://arxiv.org/pdf/2305.13245.pdf).
If it is not specified, will default to `num_attention_heads`.
attention_bias (`bool`, *optional*, defaults to `False`):
Whether to use a bias in the query, key, value and output projection layers during self-attention.
attention_dropout (`float`, *optional*, defaults to 0.0):
The dropout ratio for the attention probabilities.
mlp_bias (`bool`, *optional*, defaults to `False`):
Whether to use a bias in up_proj, down_proj and gate_proj layers in the MLP layers.
sliding_window (`int`, *optional*):
Sliding window attention window size. If not specified, will default to `None`.
keep_window_size (`int`, *optional*, defaults to 2048):
The window size of tokens that are not dynamically masked, and dynamic masking is only performed when the sequence length exceeds this value.
is_moe (`bool`, *optional*, defaults to `False`):
Whether to use the Cross Domain Mixture of Experts. If `True`, the MoE inherits the MLP for its initialization.
num_experts (`int`, *optional*, defaults to 16384):
Number of routed experts in the model. This is only used when `is_moe=True`.
num_experts_per_tok (`int`, *optional*, defaults to 64):
Number of selected experts to route per-token.
norm_topk_prob (`bool`, *optional*, defaults to `False`):
Whether to normalize the topk probabilities.
output_router_logits (`bool`, *optional*, defaults to `False`):
Whether or not the router logits should be returned by the model. Enabling this will also
allow the model to output the auxiliary loss, including load balancing loss and router z-loss.
router_aux_loss_coef (`float`, *optional*, defaults to 0.001):
The aux loss factor for the total loss.
```python
>>> from transformers import DogeConfig, DogeModel
>>> # Initializing a Doge-320M style configuration
>>> configuration = DogeConfig()
>>> # Initializing a model from the Doge-320M style configuration
>>> model = DogeModel(configuration)
>>> # Accessing the model configuration
>>> configuration = model.config
```"""
model_type = "doge"
keys_to_ignore_at_inference = ["past_key_values"]
# Default tensor parallel plan for base model `DogeModel`
base_model_tp_plan = {
"layers.*.self_attn.q_proj": "colwise",
"layers.*.self_attn.k_proj": "colwise",
"layers.*.self_attn.v_proj": "colwise",
"layers.*.self_attn.dt_proj": "rowwise",
"layers.*.self_attn.o_proj": "rowwise",
"layers.*.input_layernorm.weight": "sequence_parallel",
"layers.*.input_residual.weight": "sequence_parallel",
"layers.*.post_attention_layernorm.weight": "sequence_parallel",
"layers.*.post_attention_residual.weight": "sequence_parallel",
"norm.weight": "sequence_parallel",
"layers.*.mlp.gate_proj": "colwise",
"layers.*.mlp.up_proj": "colwise",
"layers.*.mlp.down_proj": "rowwise",
"layers.*.mlp.router_gate": "colwise_rep",
"layers.*.mlp.down_embed": "rowwise_rep",
"layers.*.mlp.up_embed": "rowwise_rep",
}
base_model_pp_plan = {
"embed_tokens": (["input_ids"], ["inputs_embeds"]),
"layers": (["hidden_states", "attention_mask"], ["hidden_states"]),
"norm": (["hidden_states"], ["hidden_states"]),
}
def __init__(
self,
vocab_size=32768,
hidden_size=1024,
intermediate_size=2048,
num_hidden_layers=32,
hidden_dropout=0.0,
hidden_act="silu",
initializer_range=0.02,
rms_norm_eps=1e-06,
use_cache=True,
tie_word_embeddings=False,
max_position_embeddings=2048,
rope_theta=10000.0,
rope_scaling=None,
num_attention_heads=8,
num_key_value_heads=None,
attention_bias=False,
attention_dropout=0.0,
mlp_bias=False,
sliding_window=None,
keep_window_size=2048,
is_moe=False,
num_experts=16384,
num_experts_per_tok=64,
norm_topk_prob=False,
output_router_logits=False,
router_aux_loss_coef=0.001,
**kwargs,
):
self.vocab_size = vocab_size
self.hidden_size = hidden_size
self.intermediate_size = intermediate_size
self.num_hidden_layers = num_hidden_layers
self.hidden_dropout = hidden_dropout
self.hidden_act = hidden_act
self.initializer_range = initializer_range
self.rms_norm_eps = rms_norm_eps
self.use_cache = use_cache
self.max_position_embeddings = max_position_embeddings
self.rope_theta = rope_theta
self.rope_scaling = rope_scaling
self.num_attention_heads = num_attention_heads
self.num_key_value_heads = num_key_value_heads
self.attention_bias = attention_bias
self.attention_dropout = attention_dropout
self.mlp_bias = mlp_bias
self.sliding_window = sliding_window
self.keep_window_size = keep_window_size
self.is_moe = is_moe
self.num_experts = num_experts
self.num_experts_per_tok = num_experts_per_tok
self.norm_topk_prob = norm_topk_prob
self.output_router_logits = output_router_logits
self.router_aux_loss_coef = router_aux_loss_coef
# Validate the correctness of rotary position embeddings parameters
# BC: if there is a 'type' field, copy it to 'rope_type'.
if self.rope_scaling is not None and "type" in self.rope_scaling:
self.rope_scaling["rope_type"] = self.rope_scaling["type"]
rope_config_validation(self)
# for backward compatibility
if num_key_value_heads is None:
self.num_key_value_heads = num_attention_heads
super().__init__(
tie_word_embeddings=tie_word_embeddings,
**kwargs,
)
class DogeRMSNorm(LlamaRMSNorm):
pass
class DogeRotaryEmbedding(LlamaRotaryEmbedding):
pass
def flex_attention_forward(
module: nn.Module,
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
attention_mask: Union[torch.Tensor, "BlockMask"],
scaling: Optional[float] = None,
softcap: Optional[float] = None,
head_mask: Optional[torch.Tensor] = None,
**kwargs,
) -> tuple[torch.Tensor, torch.Tensor]:
block_mask = None
causal_mask = None
if isinstance(attention_mask, BlockMask):
block_mask = attention_mask
else:
causal_mask = attention_mask
if causal_mask is not None:
causal_mask = causal_mask[:, :, :, : key.shape[-2]]
def score_mod(score, batch_idx, head_idx, q_idx, kv_idx):
if softcap is not None:
score = softcap * torch.tanh(score / softcap)
if causal_mask is not None:
score = score + causal_mask[batch_idx][head_idx][q_idx][kv_idx]
if head_mask is not None:
score = score + head_mask[batch_idx][head_idx][0][0]
return score
attn_output, attention_weights = compile_friendly_flex_attention(
query,
key,
value,
score_mod=score_mod,
block_mask=block_mask,
enable_gqa=True,
scale=scaling,
# Last time checked on PyTorch == 2.5.1: Flex Attention always computes the lse regardless.
# For simplification, we thus always return it as no additional computations are introduced.
return_lse=True,
)
# lse is returned in float32
attention_weights = attention_weights.to(value.dtype)
attn_output = attn_output.transpose(1, 2).contiguous()
return attn_output, attention_weights
ALL_ATTENTION_FUNCTIONS = AttentionInterface()
ALL_ATTENTION_FUNCTIONS["doge_flex_attention"] = flex_attention_forward
class DogeAttention(nn.Module):
def __init__(self, config: DogeConfig, layer_idx: Optional[int] = None):
super().__init__()
self.config = config
self.layer_idx = layer_idx
self.head_dim = getattr(config, "head_dim", config.hidden_size // config.num_attention_heads)
self.num_key_value_groups = config.num_attention_heads // config.num_key_value_heads
self.scaling = self.head_dim**-0.5
self.attention_dropout = config.attention_dropout
self.keep_window_size = config.keep_window_size
self.q_proj = nn.Linear(
config.hidden_size, config.num_attention_heads * self.head_dim, bias=config.attention_bias
)
self.k_proj = nn.Linear(
config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias
)
self.v_proj = nn.Linear(
config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias
)
# dynamic mask for the QK^T attention weights matrix
self.A = nn.Parameter(torch.zeros(config.num_key_value_heads))
self.dt_proj = nn.Linear(
config.num_key_value_heads * self.head_dim, config.num_key_value_heads, bias=config.attention_bias
)
self.o_proj = nn.Linear(
config.num_attention_heads * self.head_dim, config.hidden_size, bias=config.attention_bias
)
self.q_norm = DogeRMSNorm(self.head_dim, eps=config.rms_norm_eps)
self.k_norm = DogeRMSNorm(self.head_dim, eps=config.rms_norm_eps)
def forward(
self,
hidden_states: torch.Tensor,
position_embeddings: tuple[torch.Tensor, torch.Tensor],
attention_mask: Optional[torch.Tensor] = None,
past_key_value: Optional[Cache] = None,
cache_position: Optional[torch.LongTensor] = None,
**kwargs,
) -> tuple[torch.Tensor, Optional[torch.Tensor]]:
input_shape = hidden_states.shape[:-1]
hidden_shape = (*input_shape, -1, self.head_dim)
query_states = self.q_norm(self.q_proj(hidden_states).view(hidden_shape)).transpose(1, 2)
key_states = self.k_norm(self.k_proj(hidden_states).view(hidden_shape)).transpose(1, 2)
value_states = self.v_proj(hidden_states).view(hidden_shape).transpose(1, 2)
cos, sin = position_embeddings
query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin)
if past_key_value is not None:
# sin and cos are specific to RoPE models; cache_position needed for the static cache
cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}
key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs)
# calculate dynamic mask from value_states
dt_states = self.dt_proj(
value_states.transpose(1, 2).reshape(value_states.shape[0], value_states.shape[-2], -1)
)
dt_states = torch.exp(self.A * F.softplus(dt_states)).transpose(-1, -2)
attn_mask = self.prepare_dynamic_mask(
hidden_states=hidden_states,
dt_states=dt_states,
keep_window_size=self.keep_window_size,
attention_mask=attention_mask,
)
attn_mask = repeat_kv(attn_mask, self.num_key_value_groups)
attention_interface: Callable = eager_attention_forward
if self.config._attn_implementation != "eager":
attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation]
attn_output, attn_weights = attention_interface(
self,
query_states,
key_states,
value_states,
attention_mask=attn_mask,
dropout=0.0 if not self.training else self.attention_dropout,
scaling=self.scaling,
**kwargs,
)
attn_output = attn_output.reshape(*input_shape, -1).contiguous()
attn_output = self.o_proj(attn_output)
return attn_output, attn_weights
def prepare_dynamic_mask(
self,
hidden_states: torch.Tensor,
dt_states: torch.Tensor,
keep_window_size: int = 2048,
attention_mask: Optional[torch.Tensor] = None,
):
"""
The core idea of DMA is to compute a dynamic attention mask that hides the lowest-valued key positions, which yields sparse attention.
Combine `dt_states` with `attention_mask` to generate the final `attn_mask`.
Args:
hidden_states (`torch.Tensor`): The input hidden_states, used to determine the minimum value of the current input precision.
dt_states (`torch.Tensor`): dt_states of shape `(batch_size, num_heads, key_sequence_length)`.
keep_window_size (`int`): The window size of tokens that are not dynamically masked, and dynamic masking is only performed when the sequence length exceeds this value.
attention_mask (`torch.Tensor`, *optional*): attention mask of shape `(batch_size, 1, query_sequence_length, key_sequence_length)`.
"""
min_dtype = torch.finfo(hidden_states.dtype).min
dtype = hidden_states.dtype
attn_mask = dt_states[:, :, None, :].expand(
-1, -1, hidden_states.shape[1], -1
) # [batch_size, num_heads, query_len, key_len]
if attention_mask is not None and not isinstance(attention_mask, BlockMask):
if attention_mask.dtype == torch.bool:
attention_mask = torch.where(
attention_mask, torch.tensor(0.0, device=attention_mask.device, dtype=dtype), min_dtype
)
attn_mask = attn_mask.masked_fill(attention_mask[:, :, :, : attn_mask.shape[-1]] != 0, min_dtype)
if attn_mask.shape[-1] > keep_window_size:
active_mask = torch.zeros_like(attn_mask, dtype=dtype, device=attn_mask.device)
topk_indices = torch.topk(attn_mask, keep_window_size, dim=-1, largest=True, sorted=False).indices
active_mask = active_mask.scatter(-1, topk_indices, 1.0)
attn_mask = attn_mask.masked_fill(active_mask == 0.0, min_dtype)
return attn_mask
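# --- Illustrative sketch (not part of the Doge implementation) ---
# A framework-free view of the keep-window step in `prepare_dynamic_mask`: when a row
# has more key positions than `keep_window_size`, only the highest-valued positions keep
# their value and the rest are filled with a large negative number.
# `sketch_keep_window` and `SKETCH_MIN` are hypothetical names used only here; the real
# method works on batched tensors via `torch.topk` and `masked_fill`.
SKETCH_MIN = -1e9  # assumed stand-in for torch.finfo(dtype).min


def sketch_keep_window(row, keep_window_size):
    """Return a copy of `row` with everything outside the top-k set to SKETCH_MIN."""
    if len(row) <= keep_window_size:
        return list(row)  # short rows are left untouched
    # indices of the `keep_window_size` largest values
    ranked = sorted(range(len(row)), key=lambda i: row[i], reverse=True)
    keep = set(ranked[:keep_window_size])
    return [value if i in keep else SKETCH_MIN for i, value in enumerate(row)]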
class DogeMLP(LlamaMLP):
pass
class DogeCDMoE(nn.Module):
def __init__(self, config: DogeConfig):
super().__init__()
self.hidden_size = config.hidden_size
self.intermediate_size = config.intermediate_size
self.act_fn = ACT2FN[config.hidden_act]
self.num_experts = config.num_experts
self.num_keys = math.floor(math.sqrt(self.num_experts))
self.top_k = config.num_experts_per_tok
self.norm_topk_prob = config.norm_topk_prob
# shared expert
self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=config.mlp_bias)
self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=config.mlp_bias)
self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=config.mlp_bias)
# router gate for retrieval experts
self.router_gate = nn.Linear(self.hidden_size, self.num_keys * 2, bias=False)
# routed experts
self.down_embed = nn.Embedding(self.num_experts, self.hidden_size)
self.up_embed = nn.Embedding(self.num_experts, self.hidden_size)
def forward(
self,
hidden_states: torch.Tensor,
**kwargs,
) -> tuple[torch.Tensor, torch.Tensor]:
bsz, seq_len, _ = hidden_states.shape
# get routing logits with router gate
router_logits = self.router_gate(hidden_states).view(2, bsz * seq_len, -1)
# get experts with the highest routing logits
(scores_x, scores_y), (indices_x, indices_y) = router_logits.topk(self.num_keys, dim=-1)
all_scores = scores_x.unsqueeze(-1) + scores_y.unsqueeze(-2)
all_indices = indices_x.unsqueeze(-1) * self.num_keys + indices_y.unsqueeze(-2)
all_scores = all_scores.view(*all_scores.shape[:-2], -1)
all_indices = all_indices.view(*all_indices.shape[:-2], -1)
scores, position_indices = all_scores.topk(self.top_k, dim=-1)
indices = all_indices.gather(-1, position_indices)
routing_weights = F.softmax(scores, dim=-1)
if self.norm_topk_prob:
routing_weights /= routing_weights.sum(dim=-1, keepdim=True)
# mix routed experts states with shared expert states
down_embed = self.down_embed(indices)
up_embed = self.up_embed(indices)
experts_weights = torch.matmul(down_embed, hidden_states.view(bsz * seq_len, -1, 1)).view(bsz * seq_len, -1)
experts_weights = self.act_fn(experts_weights) * routing_weights
experts_states = torch.matmul(experts_weights.view(bsz * seq_len, 1, -1), up_embed).view(bsz, seq_len, -1)
hidden_states = self.down_proj(self.act_fn(self.gate_proj(hidden_states)) * self.up_proj(hidden_states))
hidden_states = hidden_states + experts_states
return hidden_states, router_logits
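# --- Illustrative sketch (not part of the Doge implementation) ---
# How `DogeCDMoE.forward` turns two small key sets into expert ids: every pair (x, y) of
# key indices addresses expert `x * num_keys + y` with score `score_x + score_y`, and the
# `top_k` best pairs are routed. Plain Python lists stand in for tensors, and
# `sketch_product_key_topk` is a hypothetical helper for a single token.
def sketch_product_key_topk(scores_x, scores_y, top_k):
    """Return [(expert_index, combined_score), ...] for the `top_k` best key pairs."""
    num_keys = len(scores_y)
    pairs = [
        (ix * num_keys + iy, sx + sy)
        for ix, sx in enumerate(scores_x)
        for iy, sy in enumerate(scores_y)
    ]
    # highest combined score first, then truncate to the routed experts
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)[:top_k]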
class DogeDecoderLayer(GradientCheckpointingLayer):
def __init__(self, config: DogeConfig, layer_idx: Optional[int] = None):
super().__init__()
self.hidden_dropout = config.hidden_dropout
self.input_layernorm = DogeRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.self_attn = DogeAttention(config=config, layer_idx=layer_idx)
self.input_residual = nn.Parameter(torch.ones(config.hidden_size))
self.post_attention_layernorm = DogeRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.mlp = DogeMLP(config) if not config.is_moe else DogeCDMoE(config)
self.post_attention_residual = nn.Parameter(torch.ones(config.hidden_size))
def forward(
self,
hidden_states: torch.Tensor,
position_embeddings: tuple[torch.Tensor, torch.Tensor],
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_value: Optional[tuple[torch.Tensor]] = None,
use_cache: Optional[bool] = False,
cache_position: Optional[torch.LongTensor] = None,
**kwargs: Unpack[TransformersKwargs],
) -> torch.FloatTensor:
# sequence transformation
residual = hidden_states
hidden_states = self.input_layernorm(hidden_states)
hidden_states, self_attn_weights = self.self_attn(
hidden_states=hidden_states,
position_embeddings=position_embeddings,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_value=past_key_value,
use_cache=use_cache,
cache_position=cache_position,
**kwargs,
)
hidden_states = F.dropout(hidden_states, p=self.hidden_dropout, training=self.training)
hidden_states = self.input_residual * residual + hidden_states
# state transformation
residual = hidden_states
hidden_states = self.post_attention_layernorm(hidden_states)
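hidden_states = self.mlp(hidden_states)
# DogeCDMoE returns (hidden_states, router_logits); keep only the hidden states here
hidden_states = hidden_states[0] if isinstance(hidden_states, tuple) else hidden_states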
hidden_states = F.dropout(hidden_states, p=self.hidden_dropout, training=self.training)
hidden_states = self.post_attention_residual * residual + hidden_states
return hidden_states
class DogePreTrainedModel(LlamaPreTrainedModel):
_supports_flash_attn_3 = False
_supports_flash_attn_2 = False
_supports_static_cache = False
_can_record_outputs = {
"router_logits": OutputRecorder(DogeCDMoE, index=1),
"hidden_states": DogeDecoderLayer,
"attentions": DogeAttention,
}
def _init_weights(self, module):
"""Initialize the weights"""
super()._init_weights(module)
if isinstance(module, DogeAttention):
if hasattr(module, "A"):
module.A.data.zero_()
elif isinstance(module, DogeDecoderLayer):
if hasattr(module, "input_residual"):
module.input_residual.data.fill_(1.0)
if hasattr(module, "post_attention_residual"):
module.post_attention_residual.data.fill_(1.0)
class DogeModel(MixtralModel):
pass
def load_balancing_loss_func(
gate_logits: Union[torch.Tensor, tuple[torch.Tensor], None],
num_experts: Optional[int] = None,
num_keys: Optional[int] = None,
top_k: int = 2,
attention_mask: Optional[torch.Tensor] = None,
) -> Union[torch.Tensor, int]:
r"""
Computes auxiliary load balancing loss as in Switch Transformer - implemented in PyTorch.
See Switch Transformer (https://arxiv.org/abs/2101.03961) for more details. This function implements the loss
function presented in equations (4) - (6) of the paper. It aims at penalizing cases where the routing between
experts is too unbalanced.
Args:
gate_logits:
Logits from the `router_gate`, should be a tuple of model.config.num_hidden_layers tensors of
shape [2, batch_size * sequence_length, num_keys].
num_experts:
Number of experts
num_keys:
Number of keys
top_k:
The number of experts to route per-token, can also be interpreted as the `top-k` routing
parameter.
attention_mask (`torch.Tensor`, *optional*):
The attention_mask used in forward function
shape [batch_size X sequence_length] if not None.
Returns:
The auxiliary loss.
"""
if gate_logits is None or not isinstance(gate_logits, tuple):
return 0
compute_dtype = gate_logits[0].dtype
compute_device = gate_logits[0].device
all_expert_indices = []
all_routing_weights = []
for layer_gate_logits in gate_logits:
layer_gate_logits = layer_gate_logits.to(compute_device)
(scores_x, scores_y), (indices_x, indices_y) = layer_gate_logits.topk(num_keys, dim=-1)
all_scores = scores_x.unsqueeze(-1) + scores_y.unsqueeze(-2)
all_indices = indices_x.unsqueeze(-1) * num_keys + indices_y.unsqueeze(-2)
all_scores = all_scores.view(*all_scores.shape[:-2], -1)
all_indices = all_indices.view(*all_indices.shape[:-2], -1)
_, position_indices = all_scores.topk(top_k, dim=-1)
expert_indices = all_indices.gather(-1, position_indices)
routing_weights = F.softmax(all_scores, dim=-1)
all_expert_indices.append(expert_indices)
all_routing_weights.append(routing_weights)
all_expert_indices = torch.cat(all_expert_indices, dim=0)
all_routing_weights = torch.cat(all_routing_weights, dim=0)
if attention_mask is None:
# Compute the percentage of tokens routed to each expert
all_expert_indices = all_expert_indices.view(-1)
tokens_per_expert = torch.zeros(num_experts, dtype=compute_dtype, device=compute_device)
pad = torch.ones_like(all_expert_indices, dtype=compute_dtype, device=compute_device)
tokens_per_expert = tokens_per_expert.scatter_add_(0, all_expert_indices, pad) / all_expert_indices.shape[0]
# Compute the average probability of routing to these experts
router_prob_per_expert = torch.mean(all_routing_weights, dim=0)
else:
batch_size, sequence_length = attention_mask.shape
num_hidden_layers = len(gate_logits)
# Compute the mask that masks all padding tokens as 0 with the same shape of expert_mask
expert_attention_mask = (
attention_mask[None, :, :, None]
.expand((num_hidden_layers, batch_size, sequence_length, top_k))
.reshape(-1)
.to(compute_device)
)
all_expert_indices = all_expert_indices.view(-1)[expert_attention_mask.bool()]
# Compute the percentage of tokens routed to each expert
tokens_per_expert = torch.zeros(num_experts, dtype=compute_dtype, device=compute_device)
pad = torch.ones_like(all_expert_indices, dtype=compute_dtype, device=compute_device)
tokens_per_expert = tokens_per_expert.scatter_add_(0, all_expert_indices, pad) / torch.sum(
expert_attention_mask
)
# Compute the mask that masks all padding tokens as 0 with the same shape of tokens_per_expert
router_per_expert_attention_mask = (
attention_mask[None, :, :, None]
.expand((num_hidden_layers, batch_size, sequence_length, num_experts))
.reshape(-1, num_experts)
.to(compute_device)
)
# Compute the average probability of routing to these experts
router_prob_per_expert = torch.sum(all_routing_weights * router_per_expert_attention_mask, dim=0) / torch.sum(
router_per_expert_attention_mask, dim=0
)
overall_loss = torch.sum(tokens_per_expert * router_prob_per_expert)
return overall_loss * num_experts
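# --- Illustrative sketch (not part of the Doge implementation) ---
# The unmasked branch of `load_balancing_loss_func` reduces to
# num_experts * sum_i(f_i * P_i), where f_i is the fraction of routed slots that went to
# expert i and P_i is the mean routing probability of expert i. The helper below redoes
# that arithmetic with plain lists; `sketch_load_balancing_loss` is a hypothetical name.
def sketch_load_balancing_loss(expert_indices, routing_probs, num_experts):
    """expert_indices: chosen expert ids per token; routing_probs: one probability row per token."""
    slots = [index for token in expert_indices for index in token]
    fraction = [slots.count(i) / len(slots) for i in range(num_experts)]
    mean_prob = [sum(row[i] for row in routing_probs) / len(routing_probs) for i in range(num_experts)]
    return num_experts * sum(f * p for f, p in zip(fraction, mean_prob))


# In this toy setting a perfectly balanced router scores 1.0, while a router that
# collapses onto a single expert scores `num_experts`.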
class DogeForCausalLM(MixtralForCausalLM):
def __init__(self, config):
super().__init__(config)
self.model = DogeModel(config)
self.num_experts = config.num_experts
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[list[torch.FloatTensor]] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
logits_to_keep: Union[int, torch.Tensor] = 0,
output_router_logits: Optional[bool] = None,
**kwargs: Unpack[TransformersKwargs],
) -> MoeCausalLMOutputWithPast:
r"""
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
(masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.
Example:
```python
>>> from transformers import AutoTokenizer, DogeForCausalLM
>>> model = DogeForCausalLM.from_pretrained("SmallDoge/Doge-320M")
>>> tokenizer = AutoTokenizer.from_pretrained("SmallDoge/Doge-320M")
>>> prompt = "Hey, are you conscious? Can you talk to me?"
>>> inputs = tokenizer(prompt, return_tensors="pt")
>>> # Generate
>>> generate_ids = model.generate(inputs.input_ids, max_length=30)
>>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
"Hey, are you conscious? Can you talk to me?\nI'm not conscious, but I can talk to you."
```"""
output_router_logits = (
output_router_logits if output_router_logits is not None else self.config.output_router_logits
)
# decoder outputs consist of (dec_features, layer_state, dec_hidden, dec_attn)
outputs: MoeModelOutputWithPast = self.model(
input_ids=input_ids,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
cache_position=cache_position,
**kwargs,
)
hidden_states = outputs.last_hidden_state
# Only compute necessary logits, and do not upcast them to float if we are not computing the loss
slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep
logits = self.lm_head(hidden_states[:, slice_indices, :])
loss = None
if labels is not None:
loss = self.loss_function(logits, labels, self.vocab_size, **kwargs)
aux_loss = None
if output_router_logits:
aux_loss = load_balancing_loss_func(
outputs.router_logits,
self.num_experts,
math.floor(math.sqrt(self.num_experts)),
self.num_experts_per_tok,
attention_mask,
)
if labels is not None:
loss += self.router_aux_loss_coef * aux_loss.to(loss.device) # make sure to reside in the same device
return MoeCausalLMOutputWithPast(
loss=loss,
aux_loss=aux_loss,
logits=logits,
past_key_values=outputs.past_key_values,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
router_logits=outputs.router_logits,
)
class DogeForSequenceClassification(LlamaForSequenceClassification):
pass
__all__ = [
"DogeConfig",
"DogeForCausalLM",
"DogeModel",
"DogePreTrainedModel",
"DogeForSequenceClassification",
]

@@ -0,0 +1,373 @@
# coding=utf-8
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the PyTorch Doge model."""
import unittest
from transformers import AutoTokenizer, DogeConfig, is_torch_available, set_seed
from transformers.testing_utils import (
require_read_token,
require_torch,
require_torch_accelerator,
slow,
torch_device,
)
from ...generation.test_utils import GenerationTesterMixin
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, ids_tensor
from ...test_pipeline_mixin import PipelineTesterMixin
if is_torch_available():
import torch
from transformers import (
DogeForCausalLM,
DogeForSequenceClassification,
DogeModel,
)
class DogeModelTester:
def __init__(
self,
parent,
batch_size=8,
seq_length=16,
is_training=True,
use_input_mask=True,
use_token_type_ids=False,
use_labels=True,
vocab_size=128,
hidden_size=32,
num_hidden_layers=2,
num_attention_heads=4,
intermediate_size=64,
hidden_act="silu",
max_position_embeddings=512,
type_vocab_size=16,
type_sequence_label_size=2,
initializer_range=0.02,
num_labels=3,
pad_token_id=0,
scope=None,
):
self.parent = parent
self.batch_size = batch_size
self.seq_length = seq_length
self.is_training = is_training
self.use_input_mask = use_input_mask
self.use_token_type_ids = use_token_type_ids
self.use_labels = use_labels
self.vocab_size = vocab_size
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.hidden_act = hidden_act
self.max_position_embeddings = max_position_embeddings
self.type_vocab_size = type_vocab_size
self.type_sequence_label_size = type_sequence_label_size
self.initializer_range = initializer_range
self.num_labels = num_labels
self.pad_token_id = pad_token_id
self.scope = scope
def prepare_config_and_inputs(self):
input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
input_mask = None
if self.use_input_mask:
input_mask = torch.tril(torch.ones_like(input_ids).to(torch_device))
token_type_ids = None
if self.use_token_type_ids:
token_type_ids = ids_tensor([self.batch_size, self.seq_length], self.type_vocab_size)
sequence_labels = None
token_labels = None
if self.use_labels:
sequence_labels = ids_tensor([self.batch_size], self.type_sequence_label_size)
token_labels = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
config = self.get_config()
return config, input_ids, token_type_ids, input_mask, sequence_labels, token_labels
def get_config(self):
return DogeConfig(
vocab_size=self.vocab_size,
hidden_size=self.hidden_size,
num_hidden_layers=self.num_hidden_layers,
num_attention_heads=self.num_attention_heads,
intermediate_size=self.intermediate_size,
hidden_act=self.hidden_act,
max_position_embeddings=self.max_position_embeddings,
type_vocab_size=self.type_vocab_size,
is_decoder=False,
initializer_range=self.initializer_range,
pad_token_id=self.pad_token_id,
)
def create_and_check_model(self, config, input_ids, token_type_ids, input_mask, sequence_labels, token_labels):
model = DogeModel(config=config)
model.to(torch_device)
model.eval()
result = model(input_ids, attention_mask=input_mask)
result = model(input_ids)
self.parent.assertEqual(result.last_hidden_state.shape, (self.batch_size, self.seq_length, self.hidden_size))
def create_and_check_model_as_decoder(
self,
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
encoder_hidden_states,
encoder_attention_mask,
):
config.add_cross_attention = True
model = DogeModel(config)
model.to(torch_device)
model.eval()
result = model(
input_ids,
attention_mask=input_mask,
encoder_hidden_states=encoder_hidden_states,
encoder_attention_mask=encoder_attention_mask,
)
result = model(
input_ids,
attention_mask=input_mask,
encoder_hidden_states=encoder_hidden_states,
)
result = model(input_ids, attention_mask=input_mask)
self.parent.assertEqual(result.last_hidden_state.shape, (self.batch_size, self.seq_length, self.hidden_size))
def create_and_check_for_causal_lm(
self,
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
encoder_hidden_states,
encoder_attention_mask,
):
model = DogeForCausalLM(config=config)
model.to(torch_device)
model.eval()
result = model(input_ids, attention_mask=input_mask, labels=token_labels)
self.parent.assertEqual(result.logits.shape, (self.batch_size, self.seq_length, self.vocab_size))
def create_and_check_decoder_model_past_large_inputs(
self,
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
encoder_hidden_states,
encoder_attention_mask,
):
config.is_decoder = True
config.add_cross_attention = True
model = DogeForCausalLM(config=config)
model.to(torch_device)
model.eval()
# first forward pass
outputs = model(
input_ids,
attention_mask=input_mask,
encoder_hidden_states=encoder_hidden_states,
encoder_attention_mask=encoder_attention_mask,
use_cache=True,
)
past_key_values = outputs.past_key_values
# create hypothetical multiple next tokens and extend next_input_ids with them
next_tokens = ids_tensor((self.batch_size, 3), config.vocab_size)
next_mask = ids_tensor((self.batch_size, 3), vocab_size=2)
# append to next input_ids and next attention_mask
next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
next_attention_mask = torch.cat([input_mask, next_mask], dim=-1)
output_from_no_past = model(
next_input_ids,
attention_mask=next_attention_mask,
encoder_hidden_states=encoder_hidden_states,
encoder_attention_mask=encoder_attention_mask,
output_hidden_states=True,
)["hidden_states"][0]
output_from_past = model(
next_tokens,
attention_mask=next_attention_mask,
encoder_hidden_states=encoder_hidden_states,
encoder_attention_mask=encoder_attention_mask,
past_key_values=past_key_values,
output_hidden_states=True,
)["hidden_states"][0]
# select random slice
random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()
self.parent.assertTrue(output_from_past_slice.shape[1] == next_tokens.shape[1])
# test that outputs are equal for slice
self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
(
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
) = config_and_inputs
inputs_dict = {"input_ids": input_ids, "attention_mask": input_mask}
return config, inputs_dict
@require_torch
class DogeModelTest(ModelTesterMixin, GenerationTesterMixin, PipelineTesterMixin, unittest.TestCase):
all_model_classes = (
(
DogeModel,
DogeForCausalLM,
DogeForSequenceClassification,
)
if is_torch_available()
else ()
)
all_generative_model_classes = (DogeForCausalLM,) if is_torch_available() else ()
pipeline_model_mapping = (
{
"feature-extraction": DogeModel,
"text-classification": DogeForSequenceClassification,
"text-generation": DogeForCausalLM,
"zero-shot": DogeForSequenceClassification,
}
if is_torch_available()
else {}
)
has_attentions = False
test_headmasking = False
test_pruning = False
test_torchscript = False
fx_compatible = False
# Need to use `0.8` instead of `0.9` for `test_cpu_offload`
# This is because we are hitting edge cases with the causal_mask buffer
model_split_percents = [0.5, 0.7, 0.8]
# used in `test_torch_compile_for_training`
_torch_compile_train_cls = DogeForCausalLM if is_torch_available() else None
def setUp(self):
self.model_tester = DogeModelTester(self)
self.config_tester = ConfigTester(self, config_class=DogeConfig, hidden_size=32)
def test_config(self):
self.config_tester.run_common_tests()
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
def test_doge_sequence_classification_model(self):
config, input_dict = self.model_tester.prepare_config_and_inputs_for_common()
config.num_labels = 3
input_ids = input_dict["input_ids"]
attention_mask = input_ids.ne(1).to(torch_device)
sequence_labels = ids_tensor([self.model_tester.batch_size], self.model_tester.type_sequence_label_size)
model = DogeForSequenceClassification(config)
model.to(torch_device)
model.eval()
result = model(input_ids, attention_mask=attention_mask, labels=sequence_labels)
self.assertEqual(result.logits.shape, (self.model_tester.batch_size, self.model_tester.num_labels))
def test_doge_sequence_classification_model_for_single_label(self):
config, input_dict = self.model_tester.prepare_config_and_inputs_for_common()
config.num_labels = 3
config.problem_type = "single_label_classification"
input_ids = input_dict["input_ids"]
attention_mask = input_ids.ne(1).to(torch_device)
sequence_labels = ids_tensor([self.model_tester.batch_size], self.model_tester.type_sequence_label_size)
model = DogeForSequenceClassification(config)
model.to(torch_device)
model.eval()
result = model(input_ids, attention_mask=attention_mask, labels=sequence_labels)
self.assertEqual(result.logits.shape, (self.model_tester.batch_size, self.model_tester.num_labels))
def test_doge_sequence_classification_model_for_multi_label(self):
config, input_dict = self.model_tester.prepare_config_and_inputs_for_common()
config.num_labels = 3
config.problem_type = "multi_label_classification"
input_ids = input_dict["input_ids"]
attention_mask = input_ids.ne(1).to(torch_device)
sequence_labels = ids_tensor(
[self.model_tester.batch_size, config.num_labels], self.model_tester.type_sequence_label_size
).to(torch.float)
model = DogeForSequenceClassification(config)
model.to(torch_device)
model.eval()
result = model(input_ids, attention_mask=attention_mask, labels=sequence_labels)
self.assertEqual(result.logits.shape, (self.model_tester.batch_size, self.model_tester.num_labels))
@unittest.skip(reason="Doge buffers include complex numbers, which breaks this test")
def test_save_load_fast_init_from_base(self):
pass
@require_torch_accelerator
class DogeIntegrationTest(unittest.TestCase):
# This variable records which CUDA device our runners are using (A10 or T4).
# Depending on the hardware we get different logits / generations
cuda_compute_capability_major_version = None
@classmethod
def setUpClass(cls):
if is_torch_available() and torch.cuda.is_available():
# 8 is for A100 / A10 and 7 for T4
cls.cuda_compute_capability_major_version = torch.cuda.get_device_capability()[0]
@slow
@require_read_token
def test_Doge_20M_hard(self):
"""
An integration test for Doge-20M. It compares a long generated output against a reference so that subtle numerical differences are caught.
"""
EXPECTED_TEXT = "Here's everything I know about dogs. Dogs is the best animal in the world. It is a very popular and popular dog in the United States. It is a very popular"
tokenizer = AutoTokenizer.from_pretrained("SmallDoge/Doge-20M")
model = DogeForCausalLM.from_pretrained("SmallDoge/Doge-20M", device_map="auto", torch_dtype=torch.bfloat16)
input_text = ["Here's everything I know about dogs. Dogs is the best animal in the"]
set_seed(0)
model_inputs = tokenizer(input_text, return_tensors="pt").to(model.device)
generated_ids = model.generate(**model_inputs, max_new_tokens=20, do_sample=False)
generated_text = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
self.assertEqual(generated_text, EXPECTED_TEXT)