# ov-zero-shot-test.py
import argparse
import json
import os
from typing import Optional

import warnings  # stop spam from framework deprecation notices
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

import torch
from PIL import Image
from torchvision import transforms
from open_clip.tokenizer import HFTokenizer
from open_clip.model import CLIP

# Example using https://huggingface.co/UCSC-VLAA/openvision-vit-large-patch14-224/tree/main
def parse_arguments():
    parser = argparse.ArgumentParser(description='OpenVision Text-Image Test')
    parser.add_argument('--use_model', type=str, default="F:/openvision-vit-large-patch14-224",
                        help="Path to an OpenVision model + config")
    parser.add_argument('--image_dir', type=str, default="testcat",
                        help="Path to image directory")
    return parser.parse_args()

args = parse_arguments()
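# Example invocation (paths below are placeholders for your local setup):
#   python ov-zero-shot-test.py --use_model /models/openvision-vit-large-patch14-224 --image_dir testcat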
# === Configs ===
device = "cuda" if torch.cuda.is_available() else "cpu"
model_path = args.use_model
config_path = f"{model_path}/open_clip_config.json"
image_dir = args.image_dir

# === Load config ===
with open(config_path, "r") as f:
    cfg = json.load(f)

mean = cfg["preprocess_cfg"]["mean"]
std = cfg["preprocess_cfg"]["std"]
model_cfg = cfg["model_cfg"]
vision_cfg = model_cfg["vision_cfg"]
text_cfg = model_cfg["text_cfg"]
context_len = text_cfg["context_length"]
tokenizer_name = text_cfg["hf_tokenizer_name"]
image_size = vision_cfg.get("image_size")
# Everything in model_cfg that isn't a sub-config (e.g. embed_dim) is a top-level CLIP kwarg.
clip_args = {k: v for k, v in model_cfg.items() if k not in ("vision_cfg", "text_cfg")}
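# For orientation, open_clip_config.json has roughly this shape (field values
# here are illustrative; trust the file shipped with the checkpoint):
# {
#   "model_cfg": {
#     "embed_dim": ...,
#     "vision_cfg": {"image_size": 224, "patch_size": 14, ...},
#     "text_cfg": {"context_length": ..., "hf_tokenizer_name": ...}
#   },
#   "preprocess_cfg": {"mean": [...], "std": [...]}
# }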
# === Instantiate / Load Model ===
model = CLIP(vision_cfg=vision_cfg, text_cfg=text_cfg, **clip_args)
state_dict = torch.load(f"{model_path}/open_clip_pytorch_model.bin", map_location=device)
model.load_state_dict(state_dict)
model = model.to(device)
model.eval()
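# Alternative loading path (a sketch, assuming the checkpoint is on the HF Hub
# and your open_clip version supports hf-hub references; needs `import open_clip`):
# model, _, preprocess = open_clip.create_model_and_transforms(
#     'hf-hub:UCSC-VLAA/openvision-vit-large-patch14-224')
# tokenizer = open_clip.get_tokenizer('hf-hub:UCSC-VLAA/openvision-vit-large-patch14-224')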
print("\nVisual Config Used:")
print(f" Pool type: {model.visual.pool_type}")
print(f" Final LN after pool: {model.visual.final_ln_after_pool}")
print(f" Attentional pool: {model.visual.attn_pool is not None}")
print(f" Projection shape: {model.visual.proj.shape}")
print(f" Positional embedding: {model.visual.positional_embedding.shape}")
print(f" CLS token: {model.visual.class_embedding.shape}")
print("\n----------------------------------------\n")
# Uncomment to dump every non-callable attribute of the vision tower:
# print({k: getattr(model.visual, k) for k in dir(model.visual)
#        if not k.startswith("_") and not callable(getattr(model.visual, k))})
# === Preprocess ===
transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    lambda image: image.convert("RGB"),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
])
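# Note: a bare Resize to (image_size, image_size) squashes non-square images.
# open_clip's stock eval transform resizes the short side and center-crops instead;
# a minimal sketch of that variant, if you want to match it:
# transform = transforms.Compose([
#     transforms.Resize(image_size, interpolation=transforms.InterpolationMode.BICUBIC),
#     transforms.CenterCrop(image_size),
#     lambda image: image.convert("RGB"),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=mean, std=std),
# ])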
# === Tokenize ===
texts = ["a photo of a cat", "a photo of a dog", "a photo of a bat", "a photo of a text", "cat", "dog", "bat", "hey", "text"]
tokenizer = HFTokenizer(tokenizer_name, context_length=context_len)
text_tokens = tokenizer(texts).to(device)
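# text_tokens is a LongTensor of shape [len(texts), context_len];
# the HF tokenizer pads/truncates each prompt to the model's context length.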
def text_global_pool(
    x: torch.Tensor,
    text: Optional[torch.Tensor] = None,
    pool_type: str = 'last',
) -> torch.Tensor:
    if pool_type == 'first':
        pooled = x[:, 0]
    elif pool_type == 'last':
        pooled = x[:, -1]
    elif pool_type == 'argmax':
        # take features from the eot embedding (eot_token is the highest number in each sequence)
        assert text is not None
        pooled = x[torch.arange(x.shape[0]), text.argmax(dim=-1)]
    else:
        pooled = x
    return pooled
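# Toy illustration of the pooling modes (values are made up):
# toy = torch.arange(6.).reshape(1, 3, 2)    # [B=1, T=3, D=2]
# text_global_pool(toy, pool_type='first')   # -> tensor([[0., 1.]])
# text_global_pool(toy, pool_type='last')    # -> tensor([[4., 5.]])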
# You can also use model.encode_image(), but here's the exposed forward pass if you need it:
def encode_image_fixed(model, image):
    # Patchify
    x = model.visual.conv1(image)              # [B, C, H, W] → [B, width, gh, gw]
    x = x.reshape(x.shape[0], x.shape[1], -1)  # [B, width, N]
    x = x.permute(0, 2, 1)                     # [B, N, width]
    # Add class token + positional embedding
    cls_token = model.visual.class_embedding.expand(x.shape[0], 1, -1)  # [B, 1, D]
    x = torch.cat([cls_token, x], dim=1)       # [B, N+1, D]
    x = x + model.visual.positional_embedding.to(x.dtype)  # [B, N+1, D]
    # Dropout + LN (patch_dropout is an identity at eval time, so it is skipped here)
    # x = model.visual.patch_dropout(x)
    x = model.visual.ln_pre(x)
    # Transformer
    x = model.visual.transformer(x)
    # Final pooling: mean of patch tokens, excluding CLS, with the final LN applied
    # after pooling (matching the config printed above)
    pooled = x[:, 1:].mean(dim=1)              # [B, D]
    pooled = model.visual.ln_post(pooled)
    pooled = pooled @ model.visual.proj        # [B, output_dim]
    return pooled
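# Optional sanity check (a sketch; should hold for this config, i.e. avg pool,
# no attn_pool, LN after pool, as printed above):
# with torch.no_grad():
#     dummy = torch.randn(1, 3, image_size, image_size, device=device)
#     assert torch.allclose(encode_image_fixed(model, dummy),
#                           model.encode_image(dummy), atol=1e-4)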
# You can also use model.encode_text(), but here's the exposed forward pass if you need it:
def encode_text_fixed(model, text_tokens):
    # Determine dtype used by the transformer's parameters
    cast_dtype = model.transformer.get_cast_dtype()
    # Token embedding + positional embedding
    x = model.token_embedding(text_tokens).to(cast_dtype)  # [B, T, D]
    x = x + model.positional_embedding[:x.size(1)].to(cast_dtype)
    # Transformer with the model's causal self-attention mask
    x = model.transformer(x, attn_mask=model.attn_mask)
    # Final layer norm
    x = model.ln_final(x)  # [B, T, D]
    # Pooling: determined by model.text_pool_type
    x = text_global_pool(x, text_tokens, model.text_pool_type)
    # x = text_global_pool(x, text_tokens, 'last')    # same as model.text_pool_type for this config
    # x = text_global_pool(x, text_tokens, 'argmax')  # wrong for this model; ruins the embeddings
    # Projection (a Linear layer or a plain parameter matrix, depending on config)
    if model.text_projection is not None:
        if isinstance(model.text_projection, torch.nn.Linear):
            x = model.text_projection(x)
        else:
            x = x @ model.text_projection
    return x
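# The same kind of parity check applies here (sketch):
# with torch.no_grad():
#     assert torch.allclose(encode_text_fixed(model, text_tokens),
#                           model.encode_text(text_tokens), atol=1e-4)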
# === Encode texts ===
with torch.no_grad():
    text_features = encode_text_fixed(model, text_tokens)
    text_features /= text_features.norm(dim=-1, keepdim=True)
# === Process all images ===
results = []
print("\n=== Cosine Similarities and Predictions ===")
for filename in os.listdir(image_dir):
    if not filename.lower().endswith((".png", ".jpg", ".jpeg", ".webp")):
        continue
    image_path = os.path.join(image_dir, filename)
    image = Image.open(image_path).convert("RGB")
    image_tensor = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        image_features = encode_image_fixed(model, image_tensor)
        image_features /= image_features.norm(dim=-1, keepdim=True)

    cosine = (image_features @ text_features.T)[0]  # shape: (len(texts),)
    # logit_scale is the learned temperature (typically ≈100 for CLIP-style models)
    logits = model.logit_scale.exp() * cosine
    probs = logits.softmax(dim=-1)
    sorted_indices = cosine.argsort(descending=True)

    print(f"\n--- {filename} ---")
    for idx in sorted_indices:
        label = texts[idx]
        cos = cosine[idx].item()
        prob = probs[idx].item()
        print(f"{label:<25} cosine: {cos:+.4f} prob: {prob:.4%}")

    best_idx = probs.argmax().item()
    best_label = texts[best_idx]
    best_score = probs[best_idx].item()
    results.append((filename, best_label, best_score, probs.cpu().tolist()))
# === Per-Text Best Image ===
print("\n=== Best Image Per Text ===")
num_labels = len(texts)
best_images = [(None, -float("inf")) for _ in range(num_labels)]  # (filename, prob)
for filename, _, _, prob_list in results:
    for i, p in enumerate(prob_list):
        if p > best_images[i][1]:
            best_images[i] = (filename, p)

for i, (fname, p) in enumerate(best_images):
    print(f"{texts[i]:<20} ← {fname:>25} ({p:.4%})")