-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathupdate1.py
More file actions
303 lines (245 loc) · 11.2 KB
/
update1.py
File metadata and controls
303 lines (245 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import math
import argparse
from typing import Dict, Optional, Any, List, Tuple
import pandas as pd
import requests
from dotenv import load_dotenv
# Endpoints used by SpotifyClient.
SPOTIFY_API_BASE = "https://api.spotify.com/v1"
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"

# Column layout of the CSV, in output order. Kept as a list because it is
# used directly as a DataFrame column selector.
HEADERS_ORDER = [
    # track metadata
    "track_id",
    "track_name",
    "artist_name",
    "year",
    "popularity",
    "artwork_url",
    "album_name",
    # audio features
    "acousticness",
    "danceability",
    "duration_ms",
    "energy",
    "instrumentalness",
    "key",
    "liveness",
    "loudness",
    "mode",
    "speechiness",
    "tempo",
    "time_signature",
    "valence",
    # trailing metadata
    "track_url",
    "language",
]
# ---------- Auth + HTTP with retries ----------
class SpotifyClient:
    """Small Spotify Web API client using the client-credentials grant.

    The access token is cached on the instance and re-requested once it is
    within 30 seconds of expiring. Every API call goes through ``_request``,
    which retries on 429, on 500/502/503/504 and on 401.
    """

    def __init__(self, client_id: str, client_secret: str, timeout: int = 15, max_retries: int = 6):
        """Store credentials and retry settings; no network call is made here.

        Args:
            client_id: Spotify application client ID.
            client_secret: Spotify application client secret.
            timeout: Per-request timeout in seconds passed to ``requests``.
            max_retries: Maximum number of attempts per API call.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.timeout = timeout
        self.max_retries = max_retries
        self._session = requests.Session()
        self._token: Optional[str] = None
        self._token_expires_at = 0.0

    @staticmethod
    def _retry_after_seconds(header_value: Optional[str], default: int = 1) -> int:
        """Parse a ``Retry-After`` header value into a non-negative delay.

        A missing or non-integer value (for example the HTTP-date form)
        yields ``default`` instead of raising; negative values are clamped
        to 0 so the result is always safe to pass to ``time.sleep``.
        """
        try:
            seconds = int(header_value)
        except (TypeError, ValueError):
            return default
        return max(seconds, 0)

    def _get_access_token(self) -> str:
        """Return a bearer token, requesting a new one when the cache is stale.

        Raises:
            RuntimeError: If the token endpoint does not answer with 200.
        """
        now = time.time()
        # Reuse the cached token unless it expires within the next 30 seconds.
        if self._token and now < (self._token_expires_at - 30):
            return self._token

        resp = requests.post(
            SPOTIFY_TOKEN_URL,
            data={"grant_type": "client_credentials"},
            auth=(self.client_id, self.client_secret),
            timeout=self.timeout,
        )
        if resp.status_code != 200:
            raise RuntimeError(f"Failed to get access token: {resp.status_code} {resp.text}")

        data = resp.json()
        self._token = data["access_token"]
        self._token_expires_at = now + int(data.get("expires_in", 3600))
        return self._token

    def _request(self, method: str, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send an authenticated request and return the decoded JSON body.

        Args:
            method: HTTP method name.
            path: Absolute URL, or a path appended to ``SPOTIFY_API_BASE``.
            params: Optional query parameters.

        Raises:
            requests.HTTPError: For a non-retryable error status.
            RuntimeError: When every attempt ended in a retryable status.
        """
        url = path if path.startswith("http") else f"{SPOTIFY_API_BASE}{path}"
        backoff = 1.0
        for _attempt in range(self.max_retries):
            token = self._get_access_token()
            headers = {"Authorization": f"Bearer {token}"}
            resp = self._session.request(method, url, headers=headers, params=params, timeout=self.timeout)

            if resp.status_code == 429:
                # Rate limited: wait for the advertised delay plus one second.
                retry_after = self._retry_after_seconds(resp.headers.get("Retry-After"))
                time.sleep(retry_after + 1)
                continue

            if resp.status_code in (500, 502, 503, 504):
                # Server-side error: exponential backoff capped at 16 seconds.
                time.sleep(backoff)
                backoff = min(backoff * 2, 16)
                continue

            if resp.status_code == 401:
                # likely token expired or invalid; clear and retry
                self._token = None
                time.sleep(0.5)
                continue

            resp.raise_for_status()
            return resp.json()

        raise RuntimeError(f"Spotify API failed after {self.max_retries} retries: {url}")

    # ---------- API helpers ----------
    def get_track(self, track_id: str, market: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Fetch one track by ID; return ``None`` when the API answers 404."""
        params = {"market": market} if market else None
        try:
            return self._request("GET", f"/tracks/{track_id}", params=params)
        except requests.HTTPError as e:
            if e.response is not None and e.response.status_code == 404:
                return None
            raise

    def search_track(self, query: str, market: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
        """Run a track search and return the result items (possibly empty)."""
        params = {"q": query, "type": "track", "limit": limit}
        if market:
            params["market"] = market
        data = self._request("GET", "/search", params=params)
        return data.get("tracks", {}).get("items", []) or []

    def get_audio_features_batch(self, ids: List[str]) -> Dict[str, Optional[Dict[str, Any]]]:
        """Fetch audio features for ``ids`` in requests of up to 100 IDs each.

        Returns:
            A dict with one entry per requested ID; the value is the feature
            object from the response, or ``None`` when none was returned.
        """
        res: Dict[str, Optional[Dict[str, Any]]] = {i: None for i in ids}
        for start in range(0, len(ids), 100):
            chunk = ids[start:start + 100]
            data = self._request("GET", "/audio-features", params={"ids": ",".join(chunk)})
            for feat in data.get("audio_features") or []:
                # Entries in the response list may be null; skip those.
                if feat and feat.get("id"):
                    res[feat["id"]] = feat
            # Short pause between consecutive batch requests.
            time.sleep(0.05)
        return res
# ---------- Utilities ----------
def parse_year(release_date: Optional[str]) -> Optional[int]:
    """Return the leading year of a ``YYYY[-MM[-DD]]`` date string.

    Returns ``None`` when ``release_date`` is empty, is not a string, or
    does not start with an integer.
    """
    if not release_date or not isinstance(release_date, str):
        return None
    year_text = release_date.partition("-")[0]
    try:
        return int(year_text)
    except ValueError:
        # int() on a str raises only ValueError; nothing broader is needed.
        return None
def uri_to_url(spotify_uri: Optional[str]) -> Optional[str]:
    """Convert ``spotify:<kind>:<id>`` into an ``open.spotify.com`` URL.

    Returns ``None`` for anything that is not a string made of exactly three
    colon-separated parts starting with ``spotify`` and followed by a
    non-empty kind and ID.
    """
    if not isinstance(spotify_uri, str):
        return None
    parts = spotify_uri.split(":")
    if len(parts) != 3:
        return None
    scheme, kind, ident = parts
    # Reject empty segments so no URL with a missing kind or ID is produced.
    if scheme != "spotify" or not kind or not ident:
        return None
    return f"https://open.spotify.com/{kind}/{ident}"
def best_track_match(items: List[Dict[str, Any]], want_name: str, want_artist: Optional[str]) -> Optional[Dict[str, Any]]:
    """Pick the best search result for a wanted track name and artist.

    Preference order, most popular item first within each tier (ties keep
    the earliest item in ``items``):

    1. case-insensitive exact track-name match that also matches the artist;
    2. any item that matches the artist;
    3. any item at all.

    An item matches the artist when ``want_artist`` is blank or is a
    case-insensitive substring of one of the item's artist names. Missing or
    null ``name``, ``artists`` and ``popularity`` values are treated as
    empty / 0 rather than raising.

    Returns:
        The chosen item, or ``None`` when ``items`` is empty.
    """
    if not items:
        return None

    want_name_l = (want_name or "").strip().lower()
    want_artist_l = (want_artist or "").strip().lower()

    def popularity(it: Dict[str, Any]) -> int:
        return it.get("popularity") or 0

    def has_artist(it: Dict[str, Any]) -> bool:
        if not want_artist_l:
            return True
        return any(
            want_artist_l in (artist.get("name") or "").lower()
            for artist in (it.get("artists") or [])
        )

    def name_matches(it: Dict[str, Any]) -> bool:
        return (it.get("name") or "").strip().lower() == want_name_l

    with_artist = [it for it in items if has_artist(it)]
    exact = [it for it in with_artist if name_matches(it)]

    # max() returns the first maximal element, matching a stable
    # descending sort followed by taking element 0.
    for pool in (exact, with_artist, items):
        if pool:
            return max(pool, key=popularity)
    return None
def ensure_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a frame with exactly the ``HEADERS_ORDER`` columns, in order.

    Columns missing from ``df`` are created and filled with empty strings,
    so text operations on their cells do not hit NaN. Columns not listed in
    ``HEADERS_ORDER`` are dropped. ``df`` itself is not modified.
    """
    return df.reindex(columns=HEADERS_ORDER, fill_value="")
def fill_if_empty(row: pd.Series, key: str, value: Any):
    """Set ``row[key]`` to ``value`` when the cell is missing, NaN or blank.

    A cell is blank when its string form is empty after stripping
    whitespace. ``row`` is modified in place; nothing is returned.
    """
    current = row.get(key)
    is_blank = pd.isna(current) or not str(current).strip()
    if is_blank:
        row[key] = value
# ---------- Main update logic ----------
# CSV columns that are filled from an audio-features object.
_AUDIO_FEATURE_COLUMNS = (
    "acousticness", "danceability", "duration_ms", "energy", "instrumentalness",
    "key", "liveness", "loudness", "mode", "speechiness", "tempo",
    "time_signature", "valence",
)


def _cell_text(value: Any) -> str:
    """Return ``value`` as stripped text, mapping None/NaN to ""."""
    return "" if pd.isna(value) else str(value).strip()


def update_csv(
    csv_path: str,
    client: SpotifyClient,
    market: Optional[str],
    default_language: str,
    start_row: int = 0,
) -> Tuple[int, int]:
    """Fill missing metadata and audio features in a CSV, in place.

    First pass (rows from ``start_row`` onward): look each row up by
    ``track_id``; when that yields nothing, search by track name (and artist
    if present) and adopt the best match, correcting ``track_id``. Empty
    metadata cells are then filled and the IDs of rows that still lack any
    audio-feature value are collected.

    Second pass (all rows): the collected IDs are fetched in batches and
    empty audio-feature cells of every row carrying one of those IDs are
    filled. Existing non-empty cells are never overwritten, apart from a
    ``track_id`` corrected by a search match.

    Args:
        csv_path: CSV file to read and overwrite.
        client: Spotify client used for lookups.
        market: Optional market code forwarded to lookups and searches.
        default_language: Value written to an empty ``language`` cell.
        start_row: Zero-based position of the first row to scan in the first
            pass, e.g. to resume an interrupted run. Defaults to 0 (all rows).

    Returns:
        ``(scanned_rows, updated_rows)``: rows visited in the first pass and
        rows that had at least one audio-feature cell filled.

    Raises:
        ValueError: If ``start_row`` is negative.
    """
    if start_row < 0:
        raise ValueError("start_row must be >= 0")

    df = pd.read_csv(csv_path, dtype=str, keep_default_na=False)
    df = ensure_columns(df)

    scanned_rows = 0
    updated_rows = 0
    # Track IDs whose rows still need audio features.
    missing_feat_ids: List[str] = []

    # First pass: resolve each row to a Spotify track and fill metadata.
    for idx, source_row in df.iloc[start_row:].iterrows():
        scanned_rows += 1
        # iterrows() may hand out a view or a copy; work on an explicit copy
        # and write it back by label so the result does not depend on which.
        row = source_row.copy()

        track_id = _cell_text(row.get("track_id"))
        track_name = _cell_text(row.get("track_name"))
        artist_name = _cell_text(row.get("artist_name"))

        track_obj: Optional[Dict[str, Any]] = None
        if track_id:
            track_obj = client.get_track(track_id, market=market)

        if not track_obj:
            if not track_name and not artist_name:
                # Nothing to search with; an empty query would only error.
                continue
            # fallback: search by name (+ artist if available)
            query = track_name
            if artist_name:
                query = f'track:"{track_name}" artist:"{artist_name}"'
            items = client.search_track(query, market=market, limit=10)
            match = best_track_match(items, want_name=track_name, want_artist=artist_name or None)
            if match:
                track_obj = match
                matched_id = match.get("id") or ""
                # Record the corrected ID on the row itself so that it
                # survives the write-back and is used for the feature fetch.
                if matched_id and matched_id != track_id:
                    row["track_id"] = matched_id

        if not track_obj:
            # Could not find this row on Spotify; skip
            continue

        # Fill metadata if empty
        album = track_obj.get("album") or {}
        images = album.get("images") or []
        artwork_url = (images[0].get("url") or "") if images else ""
        year = parse_year(album.get("release_date"))
        external_urls = track_obj.get("external_urls") or {}
        track_url = external_urls.get("spotify") or uri_to_url(track_obj.get("uri"))
        popularity = track_obj.get("popularity")

        fill_if_empty(row, "album_name", album.get("name") or "")
        fill_if_empty(row, "artwork_url", artwork_url)
        fill_if_empty(row, "popularity", "" if popularity is None else str(popularity))
        fill_if_empty(row, "year", str(year if year is not None else ""))
        fill_if_empty(row, "track_url", track_url or "")
        fill_if_empty(row, "language", default_language)
        df.loc[idx] = row  # persist fills (idx is an index label)

        resolved_id = _cell_text(row.get("track_id"))
        needs_features = any(_cell_text(row.get(col)) == "" for col in _AUDIO_FEATURE_COLUMNS)
        if needs_features and resolved_id:
            missing_feat_ids.append(resolved_id)

    # Second pass: fetch and fill audio features in batches
    missing_feat_ids = list(dict.fromkeys(missing_feat_ids))  # dedupe, preserve order
    feats_map: Dict[str, Optional[Dict[str, Any]]] = {}
    if missing_feat_ids:
        print("fetching")
        feats_map = client.get_audio_features_batch(missing_feat_ids)

    if feats_map:
        # Iterate a snapshot of the ID column so the frame can be written to
        # safely inside the loop.
        for idx, raw_id in df["track_id"].copy().items():
            feats = feats_map.get(_cell_text(raw_id))
            if not feats:
                continue
            filled = False
            for col in _AUDIO_FEATURE_COLUMNS:
                # Only fill where empty
                if _cell_text(df.at[idx, col]) != "":
                    continue
                val = feats.get(col)
                if val is None:
                    continue
                # Write as string to avoid dtype issues on save
                df.at[idx, col] = str(val)
                filled = True
            if filled:
                updated_rows += 1

    # Ensure final column order and write back
    df = ensure_columns(df)
    df.to_csv(csv_path, index=False)
    return scanned_rows, updated_rows
def main():
    """Command-line entry point: update the CSV given by ``--csv`` in place.

    Credentials are read from the ``SPOTIFY_CLIENT_ID`` and
    ``SPOTIFY_CLIENT_SECRET`` environment variables, after ``load_dotenv()``
    has been called.

    Raises:
        SystemExit: If either credential is missing or blank.
    """
    load_dotenv()
    parser = argparse.ArgumentParser(description="Update an existing Spotify CSV with missing metadata & audio features.")
    parser.add_argument("--csv", required=True, help="Path to CSV (will be updated in-place).")
    parser.add_argument("--market", default="IN", help="Market code (e.g., IN, US).")
    parser.add_argument("--default-language", default="Tamil", help="Language to use when empty (default: Tamil).")
    args = parser.parse_args()

    # Read credentials from the environment; never hard-code them here.
    client_id = (os.getenv("SPOTIFY_CLIENT_ID") or "").strip()
    client_secret = (os.getenv("SPOTIFY_CLIENT_SECRET") or "").strip()
    if not client_id or not client_secret:
        raise SystemExit("Please set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET in your environment.")

    client = SpotifyClient(client_id, client_secret)
    scanned, updated = update_csv(
        csv_path=args.csv,
        client=client,
        market=args.market,
        default_language=args.default_language,
    )
    print(f"Scanned rows: {scanned}")
    print(f"Rows with features filled/updated: {updated}")
    print("✅ Update complete.")


if __name__ == "__main__":
    main()