

LXMERT — transformers 3.2.0 documentation[huggingface 库已经收纳LXMERT模型!!!]
作者在readme 中写道:The logs and model snapshots will be saved under folder snap/vqa/vqa_lxr955. The validation result after training will be around 69.7% to 70.2%. 结果是可以复现的。



bash run/vqa_finetune.bash 0 vqa_lxr955

在 vqa_data 中读取 train.json 并与图像特征组成 dataset,通过 `__getitem__` 逐条取出样本,再由数据加载器(loader)组装成 batch。

"""
A VQA data example in json file:
    {
        "answer_type": "other",
        "img_id": "COCO_train2014_000000458752",
        "label": {"net": 1},
        "question_id": 458752000,
        "question_type": "what is this",
        "sent": "What is this photo taken looking through?"
    }

An example in obj36 tsv:
FIELDNAMES = ["img_id", "img_h", "img_w", "objects_id", "objects_conf",
              "attrs_id", "attrs_conf", "num_boxes", "boxes", "features"]
FIELDNAMES would be keys in the dict returned by load_obj_tsv.
"""


class VQATorchDataset(Dataset):
    """Torch dataset pairing each VQA question with its image features.

    Expects ``self.data``, ``self.imgid2img`` and ``self.raw_dataset`` to
    have been set up by the constructor (not shown in this excerpt).
    """

    def __getitem__(self, item: int):
        """Return one training/evaluation example.

        :param item: index into ``self.data``.
        :return: ``(ques_id, feats, boxes, ques, target)`` when the datum
            carries a ``'label'`` entry, otherwise
            ``(ques_id, feats, boxes, ques)``. ``boxes`` are divided by the
            image width/height; ``target`` is a float tensor of length
            ``self.raw_dataset.num_answers`` holding the score of each
            labelled answer.
        :raises AssertionError: if the number of boxes/features disagrees
            with ``num_boxes`` or a normalized coordinate falls outside
            (-1e-5, 1 + 1e-5).
        """
        datum = self.data[item]

        img_id = datum['img_id']
        ques_id = datum['question_id']
        ques = datum['sent']

        # Get image info. Copy the arrays so the in-place normalization
        # below never touches the cached entries in self.imgid2img.
        img_info = self.imgid2img[img_id]
        obj_num = img_info['num_boxes']
        feats = img_info['features'].copy()
        boxes = img_info['boxes'].copy()
        assert obj_num == len(boxes) == len(feats)

        # Normalize the boxes (to 0 ~ 1): columns 0 and 2 are divided by the
        # image width, columns 1 and 3 by the image height.
        img_h, img_w = img_info['img_h'], img_info['img_w']
        boxes[:, (0, 2)] /= img_w
        boxes[:, (1, 3)] /= img_h
        np.testing.assert_array_less(boxes, 1 + 1e-5)
        np.testing.assert_array_less(-boxes, 0 + 1e-5)

        # Provide label (target)
        if 'label' in datum:
            label = datum['label']
            target = torch.zeros(self.raw_dataset.num_answers)
            for ans, score in label.items():
                target[self.raw_dataset.ans2label[ans]] = score
            return ques_id, feats, boxes, ques, target
        else:
            return ques_id, feats, boxes, ques

一个训练样例对应一个问题id, 一个图片的目标框的特征向量,目标框的坐标,问题语句,标签

        # Model: the answer classifier is sized by the number of answers in
        # the training dataset.
        self.model = VQAModel(self.train_tuple.dataset.num_answers)

说明整个模型是VQAModel 搭起来。


下面可以看到使用self.model(feats, boxes, sent)方法获得整个LXMERT 模型的输出值,并与真值进行loss 计算,并进行反向传播。

    def train(self, train_tuple, eval_tuple):
        """Run the training loop over ``args.epochs`` epochs.

        :param train_tuple: ``(dataset, loader, evaluator)`` used for training.
        :param eval_tuple: not used in the visible part of this excerpt.

        NOTE(review): the excerpt ends inside the batch loop; ``evaluator``,
        ``eval_tuple`` and ``best_valid`` are presumably consumed by the
        omitted remainder (logging / validation) - confirm against the full
        source.
        """
        dset, loader, evaluator = train_tuple
        # Optionally wrap the batch iterator in a tqdm progress bar.
        iter_wrapper = (lambda x: tqdm(x, total=len(loader))) if args.tqdm else (lambda x: x)

        best_valid = 0.
        for epoch in range(args.epochs):
            # Predicted answer per question id, rebuilt every epoch.
            quesid2ans = {}
            for i, (ques_id, feats, boxes, sent, target) in iter_wrapper(enumerate(loader)):
                self.model.train()
                self.optim.zero_grad()

                feats, boxes, target = feats.cuda(), boxes.cuda(), target.cuda()
                logit = self.model(feats, boxes, sent)
                assert logit.dim() == target.dim() == 2
                loss = self.bce_loss(logit, target)
                # Scale the loss by the number of answer classes.
                loss = loss * logit.size(1)

                loss.backward()
                # Clip the gradient norm at 5 before the optimizer step.
                nn.utils.clip_grad_norm_(self.model.parameters(), 5.)
                self.optim.step()

                # Record the highest-scoring answer for every question.
                _, label = logit.max(1)
                for qid, label_idx in zip(ques_id, label.cpu().numpy()):
                    ans = dset.label2ans[label_idx]
                    quesid2ans[qid.item()] = ans

VQAModel 类如下。从代码中可以看出,模型总共有两个组件:一个是 LXRTEncoder,一个是 logit_fc。这里我们细看一下 LXRTEncoder。lxrt_encoder 在 forward 函数中只返回单个值,这是因为这里默认 mode='x',只使用模型输出的跨模态交互向量 pooled_output(红框指示的位置);作者的代码也支持返回其他特征,此时以元组 (feat_seq, pooled_output) 的形式返回。

# Max length including <bos> and <eos>
MAX_VQA_LENGTH = 20


class VQAModel(nn.Module):
    """VQA classifier: an LXRT encoder followed by an answer-logit head."""

    def __init__(self, num_answers):
        """
        :param num_answers: size of the answer vocabulary, i.e. the width of
            the output logits.
        """
        super().__init__()

        # Build LXRT encoder
        self.lxrt_encoder = LXRTEncoder(
            args,
            max_seq_length=MAX_VQA_LENGTH
        )
        hid_dim = self.lxrt_encoder.dim

        # VQA Answer heads
        self.logit_fc = nn.Sequential(
            nn.Linear(hid_dim, hid_dim * 2),
            GeLU(),
            BertLayerNorm(hid_dim * 2, eps=1e-12),
            nn.Linear(hid_dim * 2, num_answers)
        )
        # Initialize the head with the encoder model's weight-init routine.
        self.logit_fc.apply(self.lxrt_encoder.model.init_bert_weights)

    def forward(self, feat, pos, sent):
        """
        b -- batch_size, o -- object_number, f -- visual_feature_size

        :param feat: (b, o, f)
        :param pos:  (b, o, 4)
        :param sent: (b,) Type -- list of string
        :return: (b, num_answer) The logit of each answers.
        """
        x = self.lxrt_encoder(sent, (feat, pos))
        logit = self.logit_fc(x)

        return logit



import os

import torch
import torch.nn as nn

from lxrt.tokenization import BertTokenizer
from lxrt.modeling import LXRTFeatureExtraction as VisualBertForLXRFeature, VISUAL_CONFIG
class LXRTEncoder(nn.Module):
    """Sentence-level wrapper: tokenizes raw sentences and runs the LXRT model."""

    def __init__(self, args, max_seq_length, mode='x'):
        """
        :param args: run configuration; forwarded to ``set_visual_config`` and
            read for ``from_scratch``.
        :param max_seq_length: fixed token length every sentence is
            truncated/padded to.
        :param mode: forwarded to the underlying model's ``from_pretrained``.
        """
        super().__init__()
        self.max_seq_length = max_seq_length
        set_visual_config(args)

        # Using the bert tokenizer
        self.tokenizer = BertTokenizer.from_pretrained(
            "bert-base-uncased",
            do_lower_case=True
        )

        # Build LXRT Model
        self.model = VisualBertForLXRFeature.from_pretrained(
            "bert-base-uncased",
            mode=mode
        )

        if args.from_scratch:
            print("initializing all the weights")
            self.model.apply(self.model.init_bert_weights)

    def multi_gpu(self):
        """Wrap the underlying model in ``nn.DataParallel``."""
        self.model = nn.DataParallel(self.model)

    @property
    def dim(self):
        """Width of the encoder output consumed by downstream heads."""
        return 768

    def forward(self, sents, feats, visual_attention_mask=None):
        """Encode a batch of sentences together with their visual features.

        :param sents: iterable of raw sentence strings.
        :param feats: visual features, passed through as ``visual_feats``.
        :param visual_attention_mask: optional mask, passed through unchanged.
        :return: whatever the underlying model returns for its ``mode``.
        """
        train_features = convert_sents_to_features(
            sents, self.max_seq_length, self.tokenizer)

        # Build the text tensors directly on the model's device rather than
        # hard-coding CUDA, so the encoder follows wherever the model lives.
        device = next(self.model.parameters()).device
        input_ids = torch.tensor(
            [f.input_ids for f in train_features], dtype=torch.long, device=device)
        input_mask = torch.tensor(
            [f.input_mask for f in train_features], dtype=torch.long, device=device)
        segment_ids = torch.tensor(
            [f.segment_ids for f in train_features], dtype=torch.long, device=device)

        output = self.model(input_ids, segment_ids, input_mask,
                            visual_feats=feats,
                            visual_attention_mask=visual_attention_mask)
        return output

convert_sents_to_features 函数:将问题语句进行分词处理,得到 tokens_a,并在超过 max_seq_length - 2(即 20 - 2 = 18)个 token 时截断;再在首尾加上 [CLS] 和 [SEP],将每个 token 对应的 id 添加到 input_ids 列表中,长度不足 max_seq_length(20)的部分用 0 填充。

def convert_sents_to_features(sents, max_seq_length, tokenizer):
    """Convert raw sentences into fixed-length ``InputFeatures``.

    :param sents: iterable of sentence strings.
    :param max_seq_length: total length of every output sequence, including
        the ``[CLS]`` and ``[SEP]`` tokens.
    :param tokenizer: object providing ``tokenize`` and
        ``convert_tokens_to_ids``.
    :return: list with one ``InputFeatures`` per sentence, each holding
        ``input_ids``, ``input_mask`` and ``segment_ids`` of length
        ``max_seq_length``.
    """
    features = []
    for sent in sents:
        tokens_a = tokenizer.tokenize(sent.strip())

        # Account for [CLS] and [SEP] with "- 2"
        if len(tokens_a) > max_seq_length - 2:
            tokens_a = tokens_a[:(max_seq_length - 2)]

        # Keep segment id which allows loading BERT-weights.
        tokens = ["[CLS]"] + tokens_a + ["[SEP]"]
        segment_ids = [0] * len(tokens)

        input_ids = tokenizer.convert_tokens_to_ids(tokens)

        # The mask has 1 for real tokens and 0 for padding tokens. Only real
        # tokens are attended to.
        input_mask = [1] * len(input_ids)

        # Zero-pad up to the sequence length.
        padding = [0] * (max_seq_length - len(input_ids))
        input_ids += padding
        input_mask += padding
        segment_ids += padding

        assert len(input_ids) == max_seq_length
        assert len(input_mask) == max_seq_length
        assert len(segment_ids) == max_seq_length

        features.append(
            InputFeatures(input_ids=input_ids,
                          input_mask=input_mask,
                          segment_ids=segment_ids))
    return features

我们还是从model 中进行着手: 即VisualBertForLXRFeature , 原名:LXRTFeatureExtraction。在前面可以看出mode 传入的时候是“x”,所以默认返回的是pooled_output

self.model = VisualBertForLXRFeature.from_pretrained(

class LXRTFeatureExtraction(BertPreTrainedModel):
    """LXRT model wrapper that returns features selected by ``mode``."""

    def __init__(self, config, mode='lxr'):
        """
        :param config: model configuration, forwarded to the base class and
            to ``LXRTModel``.
        :param mode: string selecting what ``forward`` returns:
            exactly ``'x'`` -> the pooled output only;
            contains ``'x'`` and also ``'l'`` or ``'r'`` -> the tuple
            ``(feat_seq, pooled_output)``;
            contains ``'l'`` or ``'r'`` but no ``'x'`` -> ``feat_seq`` only.
        """
        super().__init__(config)
        self.bert = LXRTModel(config)
        self.mode = mode
        self.apply(self.init_bert_weights)

    def forward(self, input_ids, token_type_ids=None, attention_mask=None, visual_feats=None,
                visual_attention_mask=None):
        """Run the LXRT model and return the outputs chosen by ``self.mode``.

        Falls through and returns ``None`` when ``self.mode`` matches none of
        the branches below.
        """
        feat_seq, pooled_output = self.bert(input_ids, token_type_ids, attention_mask,
                                            visual_feats=visual_feats,
                                            visual_attention_mask=visual_attention_mask)
        if 'x' == self.mode:
            return pooled_output
        elif 'x' in self.mode and ('l' in self.mode or 'r' in self.mode):
            return feat_seq, pooled_output
        elif 'l' in self.mode or 'r' in self.mode:
            return feat_seq


下钻到核心模型 self.bert = LXRTModel(config)。从 forward 可以看出,将 lang_feats 送入 pooler 处理即可得到跨模态向量 pooled_output(即文中所说的 cross_feats,红框标注的地方)。

class LXRTModel(BertPreTrainedModel):
    """LXRT Model."""

    def __init__(self, config):
        super().__init__(config)
        self.embeddings = BertEmbeddings(config)
        self.encoder = LXRTEncoder(config)
        self.pooler = BertPooler(config)
        self.apply(self.init_bert_weights)

    def forward(self, input_ids, token_type_ids=None, attention_mask=None,
                visual_feats=None, visual_attention_mask=None):
        """Embed the tokens, run the encoder and pool the language output.

        :param input_ids: token ids.
        :param token_type_ids: segment ids; defaults to all zeros.
        :param attention_mask: 1 for tokens to attend to, 0 for masked ones;
            defaults to all ones.
        :param visual_feats: visual input handed to the encoder as
            ``visn_feats``.
        :param visual_attention_mask: optional mask for the visual input.
        :return: ``((lang_feats, visn_feats), pooled_output)``.
        """
        if attention_mask is None:
            attention_mask = torch.ones_like(input_ids)
        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)

        # We create a 3D attention mask from a 2D tensor mask.
        # Sizes are [batch_size, 1, 1, to_seq_length]
        # So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length]
        # this attention mask is more simple than the triangular masking of causal attention
        # used in OpenAI GPT, we just need to prepare the broadcast dimension here.
        extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)

        # Since attention_mask is 1.0 for positions we want to attend and 0.0 for
        # masked positions, this operation will create a tensor which is 0.0 for
        # positions we want to attend and -10000.0 for masked positions.
        # Since we are adding it to the raw scores before the softmax, this is
        # effectively the same as removing these entirely.
        param_dtype = next(self.parameters()).dtype  # fp16 compatibility
        extended_attention_mask = extended_attention_mask.to(dtype=param_dtype)
        extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0

        # Process the visual attention mask
        if visual_attention_mask is not None:
            extended_visual_attention_mask = visual_attention_mask.unsqueeze(1).unsqueeze(2)
            extended_visual_attention_mask = extended_visual_attention_mask.to(dtype=param_dtype)
            extended_visual_attention_mask = (1.0 - extended_visual_attention_mask) * -10000.0
        else:
            extended_visual_attention_mask = None

        # Positional Word Embeddings
        embedding_output = self.embeddings(input_ids, token_type_ids)

        # Run LXRT backbone
        lang_feats, visn_feats = self.encoder(
            embedding_output,
            extended_attention_mask,
            visn_feats=visual_feats,
            visn_attention_mask=extended_visual_attention_mask)
        pooled_output = self.pooler(lang_feats)

        return (lang_feats, visn_feats), pooled_output

BertPooler 类

class BertPooler(nn.Module):
    """Pool a sequence of hidden states into one vector per example."""

    def __init__(self, config):
        """
        :param config: object exposing ``hidden_size``, used as both the
            input and output width of the dense layer.
        """
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.activation = nn.Tanh()

    def forward(self, hidden_states):
        """Return ``tanh(dense(hidden_states[:, 0]))``.

        :param hidden_states: tensor indexed as ``[batch, position, ...]``.
        :return: the pooled output, one vector per batch entry.
        """
        # We "pool" the model by simply taking the hidden state corresponding
        # to the first token.
        first_token_tensor = hidden_states[:, 0]
        pooled_output = self.dense(first_token_tensor)
        pooled_output = self.activation(pooled_output)
        return pooled_output


这里就是模型堆叠的具体方式:首先两个模态各自堆叠若干层(语言层 self.layer 和视觉层 r_layers),然后再堆叠若干层跨模态交互层 x_layers,最后返回 lang_feats, visn_feats。

class LXRTEncoder(nn.Module):
    """Stack of language, visual (relational) and cross-modality layers."""

    def __init__(self, config):
        """
        :param config: layer configuration handed to every sub-layer; the
            layer counts themselves are read from ``VISUAL_CONFIG``.
        """
        super().__init__()

        # Obj-level image embedding layer
        self.visn_fc = VisualFeatEncoder(config)

        # Number of layers
        self.num_l_layers = VISUAL_CONFIG.l_layers
        self.num_x_layers = VISUAL_CONFIG.x_layers
        self.num_r_layers = VISUAL_CONFIG.r_layers
        print("LXRT encoder with %d l_layers, %d x_layers, and %d r_layers." %
              (self.num_l_layers, self.num_x_layers, self.num_r_layers))

        # Layers
        # Using self.layer instead of self.l_layer to support loading BERT weights.
        self.layer = nn.ModuleList(
            [BertLayer(config) for _ in range(self.num_l_layers)]
        )
        self.x_layers = nn.ModuleList(
            [LXRTXLayer(config) for _ in range(self.num_x_layers)]
        )
        self.r_layers = nn.ModuleList(
            [BertLayer(config) for _ in range(self.num_r_layers)]
        )

    def forward(self, lang_feats, lang_attention_mask,
                visn_feats, visn_attention_mask=None):
        """Run both single-modality stacks, then the cross-modality stack.

        :param lang_feats: language features (already embedded by the caller).
        :param lang_attention_mask: mask passed to every language layer.
        :param visn_feats: raw visual input, embedded here by ``visn_fc``.
        :param visn_attention_mask: optional mask for the visual layers.
        :return: ``(lang_feats, visn_feats)`` after all layers.
        """
        # Run visual embedding layer
        # Note: Word embedding layer was executed outside this module.
        #       Keep this design to allow loading BERT weights.
        visn_feats = self.visn_fc(visn_feats)

        # Run language layers
        for layer_module in self.layer:
            lang_feats = layer_module(lang_feats, lang_attention_mask)

        # Run relational layers
        for layer_module in self.r_layers:
            visn_feats = layer_module(visn_feats, visn_attention_mask)

        # Run cross-modality layers
        for layer_module in self.x_layers:
            lang_feats, visn_feats = layer_module(lang_feats, lang_attention_mask,
                                                  visn_feats, visn_attention_mask)

        return lang_feats, visn_feats

6.具体看一个LXMERT VQA 数据集的初始化init函数

这里的重点是如何将问题和图片建立对应关系:每条问题数据都带有 img_id,通过 img_id 与图像特征关联起来。

def _load_json(path):
    """Read and parse one JSON file, closing the handle afterwards."""
    with open(path) as f:
        return json.load(f)


class VQADataset:
    """
    A VQA data example in json file:
        {
            "answer_type": "other",
            "img_id": "COCO_train2014_000000458752",
            "label": {"net": 1},
            "question_id": 458752000,
            "question_type": "what is this",
            "sent": "What is this photo taken looking through?"
        }
    """

    def __init__(self, splits: str):
        """
        :param splits: comma-separated split names; each one is loaded from
            ``data/vqa/<split>.json`` (relative to the working directory).
        """
        self.name = splits
        self.splits = splits.split(',')

        # Loading datasets
        self.data = []
        for split in self.splits:
            self.data.extend(_load_json("data/vqa/%s.json" % split))
        print("Load %d data from split(s) %s." % (len(self.data), self.name))

        # Convert list to dict (for evaluation)
        self.id2datum = {
            datum['question_id']: datum
            for datum in self.data
        }

        # Answers
        self.ans2label = _load_json("data/vqa/trainval_ans2label.json")
        self.label2ans = _load_json("data/vqa/trainval_label2ans.json")
        assert len(self.ans2label) == len(self.label2ans)

    @property
    def num_answers(self):
        """Number of distinct answers in the answer vocabulary."""
        return len(self.ans2label)

    def __len__(self):
        return len(self.data)


"""
An example in obj36 tsv:
FIELDNAMES = ["img_id", "img_h", "img_w", "objects_id", "objects_conf",
              "attrs_id", "attrs_conf", "num_boxes", "boxes", "features"]
FIELDNAMES would be keys in the dict returned by load_obj_tsv.
"""


class VQATorchDataset(Dataset):
    """Torch dataset restricted to questions whose image features were loaded."""

    def __init__(self, dataset: VQADataset):
        """
        :param dataset: the raw question dataset; its ``splits`` decide which
            feature files are loaded and its ``data`` is filtered by them.
        """
        super().__init__()
        self.raw_dataset = dataset

        # Optional cap on the number of images loaded per split.
        if args.tiny:
            topk = TINY_IMG_NUM
        elif args.fast:
            topk = FAST_IMG_NUM
        else:
            topk = None

        # Loading detection features to img_data
        img_data = []
        for split in dataset.splits:
            # Minival is 5K images in MS COCO, which is used in evaluating VQA/LXMERT-pre-training.
            # It is saved as the top 5K features in val2014_***.tsv
            load_topk = 5000 if (split == 'minival' and topk is None) else topk
            img_data.extend(load_obj_tsv(
                os.path.join(MSCOCO_IMGFEAT_ROOT, '%s_obj36.tsv' % (SPLIT2NAME[split])),
                topk=load_topk))

        # Convert img list to dict
        self.imgid2img = {img_datum['img_id']: img_datum for img_datum in img_data}

        # Only kept the data with loaded image features
        self.data = [
            datum for datum in self.raw_dataset.data
            if datum['img_id'] in self.imgid2img
        ]
        print("Use %d data in torch dataset" % (len(self.data)))
        print()

具体代码:大佬的github 部分

从最后几行可以看出,作者以问题为基本样本单位:先按 split 加载图像特征,再只保留 img_id 在已加载图像特征中的问题,然后一起训练网络。


np.testing.assert_array_less 的用法

检查第一个数组的每个元素是否都严格小于第二个参数(对应位置的元素);只要有元素不满足“小于”,就会抛出 AssertionError。

在 lxmert 中,先将 box 坐标归一化,再用这个函数检查归一化后的数值都小于 1.00001(同时检查 -boxes 小于 0.00001,即坐标基本不为负)。

Mismatched 说明总共多少个元素不符合这个条件!


