98 changes: 98 additions & 0 deletions examples/qwen3/conf/train/32b_te_cx_gems_nsys.yaml
@@ -0,0 +1,98 @@
system:
  no_shared_fs: ${experiment.runner.no_shared_fs}
  num_workers: 2
  tensor_model_parallel_size: 8
  pipeline_model_parallel_size: 1
  expert_model_parallel_size: 1
  context_parallel_size: 1
  sequence_parallel: true
  use_distributed_optimizer: true
  overlap_grad_reduce: true
  overlap_param_gather: true
  # profiling
  profile: true
  profile_step_start: 5
  profile_step_end: 15
  profile_ranks: [0,7,8,15]
  precision:
    bf16: true
    attention_softmax_in_fp32: true
    accumulate_allreduce_grads_in_fp32: true
  logging:
    log_interval: 1
    tensorboard_log_interval: 1
    wandb_project: ${experiment.exp_name}
    wandb_exp_name: ${experiment.exp_name}
    log_timers_to_tensorboard: true
    log_validation_ppl_to_tensorboard: true
    log_throughput: true
    log_params_norm: true
    log_num_zeros_in_grad: true
    log_memory_to_tensorboard: true
  checkpoint:
    save_interval: ${experiment.save_steps}
    load: ${experiment.load}
    ckpt_format: ${experiment.ckpt_format}

model:
  transformer_impl: transformer_engine
  use_transformer_engine_fl: true
  flag_gems_log_path: /share/project/lixianduo/scale_gems_cx/gems_te_cx_gems_log
  flag_gems_unused: ['index_put', 'index_put_']
  num_layers: 4
  hidden_size: 5120
  ffn_hidden_size: 25600
  num_attention_heads: 64
  kv_channels: 128
  group_query_attention: true
  num_query_groups: 8
  seq_length: 4096
  max_position_embeddings: 40960
  norm_epsilon: 1e-6
  use_rotary_position_embeddings: true
  rotary_base: 1000000
  swiglu: true
  normalization: RMSNorm
  qk_layernorm: true
  init_method_std: 0.02
  attention_dropout: 0.0
  hidden_dropout: 0.0
  untie_embeddings_and_output_weights: true
  no_position_embedding: true
  no_rope_fusion: true
  disable_bias_linear: true

  # training
  seed: ${experiment.seed}
  finetune: false
  micro_batch_size: 1
  global_batch_size: 8 #2048
  eval_iters: 0
  train_iters: 102400

  optimizer:
    clip_grad: 1.0
    weight_decay: 0.1
    adam_beta1: 0.9
    adam_beta2: 0.95
    lr_scheduler:
      lr: 3.0e-3
      min_lr: 3.0e-4
      lr_warmup_fraction: 0.1
      lr_decay_style: WSD
      lr_wsd_decay_style: cosine
      lr_wsd_decay_iters: 10

data:
  reset_position_ids: True
  reset_attention_mask: True
  # data_path: /share/project/lixianduo/demo_data/pile/pile_wikipedia_demo
  data_path: /share/project/lizhiyu/hetero_data/HQ_wo_fim/Nemotron-CC-high-actual-actual-high_text_document
  split: 1
  no_mmap_bin_files: true
  tokenizer:
    legacy_tokenizer: true
    tokenizer_type: QwenTokenizerFS
    tokenizer_path: /share/project/lixianduo/qwentokenizer
    vocab_size: 151851
    make_vocab_size_divisible_by: 64
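The `profile*` keys above open an nsys capture window: training steps 5 through 15 on ranks 0, 7, 8 and 15. A minimal sketch of how such a window is typically gated inside a training loop, assuming nsys is launched with `--capture-range=cudaProfilerApi`; the function and constant names below are illustrative, not FlagScale's own:

```python
import torch
import torch.distributed as dist

PROFILE_STEP_START, PROFILE_STEP_END = 5, 15   # mirrors profile_step_start / profile_step_end
PROFILE_RANKS = {0, 7, 8, 15}                  # mirrors profile_ranks

def maybe_profile(step: int) -> None:
    """Start/stop the CUDA profiler so nsys records only the configured step window."""
    rank = dist.get_rank() if dist.is_initialized() else 0
    if rank not in PROFILE_RANKS:
        return
    if step == PROFILE_STEP_START:
        torch.cuda.profiler.start()   # nsys with --capture-range=cudaProfilerApi begins recording here
    elif step == PROFILE_STEP_END:
        torch.cuda.profiler.stop()    # recording ends; later steps are not captured
```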
36 changes: 36 additions & 0 deletions examples/qwen3/conf/train_te_cx_gems_nsys.yaml
@@ -0,0 +1,36 @@
defaults:
  - _self_
  # - train: 30b_a3b
  - train: 32b_te_cx_gems_nsys

experiment:
  # exp_name: Qwen3-30b-a3b-Train
  exp_name: Qwen3-32b-Train
  seed: 42
  save_steps: 10000
  load: None
  exp_dir: /share/project/lixianduo/scale_gems_cx/experiments_te_cx_gems_nsys/${experiment.exp_name}
  ckpt_format: torch
  task:
    type: train
    backend: megatron
    entrypoint: flagscale/train/train_gpt.py
  runner:
    per_node_task: false
    no_shared_fs: false
    rdzv_backend: static
    hostfile: /share/project/lixianduo/scale_gems_cx/1node_hostfile
    ssh_port: 7878
  cmds:
    before_start: ulimit -n 1048576 && source /root/miniconda3/bin/activate /share/project/lixianduo/envs/flagscale-train-gems-cx
  envs:
    LOGLEVEL: "INFO"
    CUDA_VISIBLE_DEVICES: "0,1,2,3,4,5,6,7"
    CUDA_DEVICE_MAX_CONNECTIONS: 1
    USE_NSYS_PROFILE: True

action: run

hydra:
  run:
    dir: ${experiment.exp_dir}/hydra
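The `${...}` references in both config files are Hydra/OmegaConf interpolations that resolve against other keys at load time. A tiny stand-alone sketch of how they resolve, using an illustrative path rather than the real `exp_dir`:

```python
from omegaconf import OmegaConf

# Minimal stand-in for the config above; only the keys needed to show interpolation.
cfg = OmegaConf.create(
    {
        "experiment": {
            "exp_name": "Qwen3-32b-Train",
            "exp_dir": "/tmp/experiments/${experiment.exp_name}",  # illustrative path
        },
        "hydra": {"run": {"dir": "${experiment.exp_dir}/hydra"}},
    }
)
resolved = OmegaConf.to_container(cfg, resolve=True)
print(resolved["hydra"]["run"]["dir"])  # -> /tmp/experiments/Qwen3-32b-Train/hydra
```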
@@ -0,0 +1,106 @@
diff --git a/megatron/core/tensor_parallel/layers.py b/megatron/core/tensor_parallel/layers.py
index 5c6f34b70..ae07dc556 100644
--- a/megatron/core/tensor_parallel/layers.py
+++ b/megatron/core/tensor_parallel/layers.py
@@ -448,6 +448,7 @@ class LinearWithGradAccumulationAndAsyncCommunication(torch.autograd.Function):
grad_output_buffer,
wgrad_deferral_limit,
tp_group,
+ use_transformer_engine_fl,
):
"""Forward."""
if gradient_accumulation_fusion and hasattr(weight, "main_grad"):
@@ -466,6 +467,7 @@ class LinearWithGradAccumulationAndAsyncCommunication(torch.autograd.Function):
ctx.wgrad_deferral_limit = wgrad_deferral_limit
ctx.grad_output_buffer = grad_output_buffer
ctx.tp_group = tp_group
+ ctx.use_transformer_engine_fl = use_transformer_engine_fl

if sequence_parallel:
dim_size = list(input.size())
@@ -556,16 +558,23 @@ class LinearWithGradAccumulationAndAsyncCommunication(torch.autograd.Function):
if hasattr(weight, "__fsdp_param__"):
weight.main_grad = weight.get_main_grad()

- if weight.main_grad.dtype == torch.float32:
- fused_weight_gradient_mlp_cuda.wgrad_gemm_accum_fp32(
- total_input, grad_output, weight.main_grad
- )
- elif weight.main_grad.dtype in (torch.float16, torch.bfloat16):
- fused_weight_gradient_mlp_cuda.wgrad_gemm_accum_fp16(
- total_input, grad_output, weight.main_grad
- )
+ if not ctx.use_transformer_engine_fl:
+ if weight.main_grad.dtype == torch.float32:
+ fused_weight_gradient_mlp_cuda.wgrad_gemm_accum_fp32(
+ total_input, grad_output, weight.main_grad
+ )
+ elif weight.main_grad.dtype in (torch.float16, torch.bfloat16):
+ fused_weight_gradient_mlp_cuda.wgrad_gemm_accum_fp16(
+ total_input, grad_output, weight.main_grad
+ )
+ else:
+ raise RuntimeError("Unsupported gradient type for gradient accumulation fusion")
else:
- raise RuntimeError("Unsupported gradient type for gradient accumulation fusion")
+ if weight.main_grad.dtype in (torch.float32, torch.float16, torch.bfloat16):
+ grad_weight = torch.matmul(grad_output.t(), total_input)
+ weight.main_grad += grad_weight.view_as(weight.main_grad)
+ else:
+ raise RuntimeError("Unsupported gradient type for gradient accumulation fusion")

if hasattr(weight, "grad_added_to_main_grad"):
# When overlap_grad_reduce is True, need to ensure that backward hooks
@@ -607,12 +616,12 @@ class LinearWithGradAccumulationAndAsyncCommunication(torch.autograd.Function):
handle.wait()
# Need to return None's as gradient has to flow for all the input arguments
# provided during forward
- return (sub_grad_input, grad_weight, grad_bias, None, None, None, None, None, None)
+ return (sub_grad_input, grad_weight, grad_bias, None, None, None, None, None, None, None, None)

if ctx.allreduce_dgrad:
handle.wait()

- return grad_input, grad_weight, grad_bias, None, None, None, None, None, None
+ return grad_input, grad_weight, grad_bias, None, None, None, None, None, None, None, None


def linear_with_grad_accumulation_and_async_allreduce(
@@ -626,6 +635,7 @@ def linear_with_grad_accumulation_and_async_allreduce(
wgrad_deferral_limit: Optional[int] = 0,
async_grad_allreduce: Optional[bool] = None,
tp_group: Optional[torch.distributed.ProcessGroup] = None,
+ use_transformer_engine_fl: Optional[bool] = False,
) -> torch.Tensor:
"""Linear layer execution with asynchronous communication and
gradient accumulation fusion in backprop.
@@ -711,6 +721,7 @@ def linear_with_grad_accumulation_and_async_allreduce(
grad_output_buffer,
wgrad_deferral_limit,
tp_group,
+ use_transformer_engine_fl,
]

if not linear_with_grad_accumulation_and_async_allreduce.warned:
@@ -807,6 +818,7 @@ class ColumnParallelLinear(torch.nn.Module):
tp_group: Optional[torch.distributed.ProcessGroup] = None,
):
super(ColumnParallelLinear, self).__init__()
+ print(f"[ColumnParallelLinear], {config.transformer_impl=}")

# Keep input parameters
self.input_size = input_size
@@ -938,6 +950,8 @@ class ColumnParallelLinear(torch.nn.Module):
if not weight.requires_grad:
return linear_with_frozen_weight(input, weight, *args, **kwargs)
else:
+ if self.config.use_transformer_engine_fl:
+ kwargs['use_transformer_engine_fl'] = True
return linear_with_grad_accumulation_and_async_allreduce(input, weight, *args, **kwargs)

def forward(
@@ -1298,3 +1312,4 @@ class RowParallelLinear(torch.nn.Module):
f"{type(self).__name__}(in_features={self.input_size}, "
f"out_features={self.output_size}, bias={use_bias}, TP={tp})"
)
+
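The core change in this patch is a fallback weight-gradient path: when `use_transformer_engine_fl` is set, the fused `fused_weight_gradient_mlp_cuda` kernels are bypassed and the weight gradient is accumulated into `weight.main_grad` with a plain matmul. A stand-alone sketch of that unfused accumulation and a check that it matches dW = dY^T X; shapes and names here are illustrative:

```python
import torch

def wgrad_accumulate_unfused(total_input, grad_output, main_grad):
    """Unfused path from the patch: dW = dY^T @ X, accumulated into main_grad in place."""
    grad_weight = torch.matmul(grad_output.t(), total_input)
    main_grad += grad_weight.view_as(main_grad)

# Shapes as in a [tokens, in] -> [tokens, out] linear: X is [t, in], dY is [t, out], W is [out, in].
t, d_in, d_out = 16, 8, 4
x = torch.randn(t, d_in)
dy = torch.randn(t, d_out)
main_grad = torch.zeros(d_out, d_in)
wgrad_accumulate_unfused(x, dy, main_grad)
assert torch.allclose(main_grad, dy.t() @ x)
```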
@@ -1,5 +1,5 @@
diff --git a/megatron/core/transformer/transformer_config.py b/megatron/core/transformer/transformer_config.py
-index 5ff62f74c..4bea0c328 100644
+index 5ff62f74c..3c0571b2d 100644
--- a/megatron/core/transformer/transformer_config.py
+++ b/megatron/core/transformer/transformer_config.py
@@ -317,6 +317,15 @@ class TransformerConfig(ModelParallelConfig):
@@ -41,7 +41,7 @@ index 5ff62f74c..4bea0c328 100644
flash_decode: bool = False
""" Use the optimized flash decoding kernel during inference. """

-@@ -705,6 +723,26 @@ class TransformerConfig(ModelParallelConfig):
+@@ -705,6 +723,31 @@ class TransformerConfig(ModelParallelConfig):
"""Transformer implementation to use.
Options are 'transformer_engine' for Transformer Engine and 'local' for MCore."""

@@ -64,11 +64,16 @@ index 5ff62f74c..4bea0c328 100644
+ """Lora a init method"""
+ lora_out_init_method: Optional[str] = None
+ """Lora b init method"""
+
+ ####################
+ # TransformerEngine-FL
+ ####################
+ use_transformer_engine_fl: Optional[bool] = False
+
def __post_init__(self):
"""Python dataclass method that is used to modify attributes after initialization.
See https://docs.python.org/3/library/dataclasses.html#post-init-processing for more
-@@ -1481,6 +1519,9 @@ class TransformerConfig(ModelParallelConfig):
+@@ -1481,6 +1524,9 @@ class TransformerConfig(ModelParallelConfig):
f"the number of layers ({self.num_layers})"
)

@@ -78,3 +83,8 @@ index 5ff62f74c..4bea0c328 100644

@dataclass
class MLATransformerConfig(TransformerConfig):
@@ -1569,3 +1615,4 @@ class MLATransformerConfig(TransformerConfig):
assert (
self.apply_rope_fusion is False
), "Rope Fusion is not compatible with caching latents"
+
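This patch threads `use_transformer_engine_fl` through `TransformerConfig` as a dataclass field; the collapsed hunks hide the surrounding fields and the added `__post_init__` lines. A minimal, self-contained sketch of that config-field pattern; the class name and the validation shown are illustrative assumptions, not the actual Megatron-LM code:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class SketchTransformerConfig:
    # Only the two fields relevant to the patch; the real TransformerConfig has many more.
    transformer_impl: str = "transformer_engine"
    use_transformer_engine_fl: Optional[bool] = False

    def __post_init__(self):
        # Hypothetical consistency check: the FL path only makes sense with the TE backend.
        if self.use_transformer_engine_fl and self.transformer_impl != "transformer_engine":
            raise ValueError(
                "use_transformer_engine_fl requires transformer_impl='transformer_engine'"
            )

cfg = SketchTransformerConfig(use_transformer_engine_fl=True)
```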