66import ivan .solscanbot .dto .external .meta .TokenMetaResponseDto ;
77import ivan .solscanbot .dto .internal .BalanceActivity ;
88import ivan .solscanbot .dto .internal .MonitoredAddress ;
9+ import ivan .solscanbot .dto .internal .Token ;
910import ivan .solscanbot .mapper .ActivityMapper ;
11+ import ivan .solscanbot .mapper .TokenMapper ;
1012import ivan .solscanbot .repository .ActivityRepository ;
1113import ivan .solscanbot .repository .MonitoredAddressRepository ;
14+ import ivan .solscanbot .repository .TokenRepository ;
1215import java .math .BigDecimal ;
1316import java .math .MathContext ;
1417import java .math .RoundingMode ;
15- import java .util .ArrayList ;
18+ import java .util .Collections ;
1619import java .util .List ;
1720import java .util .Map ;
1821import java .util .Optional ;
1922import java .util .Set ;
2023import java .util .concurrent .ConcurrentHashMap ;
2124import java .util .concurrent .atomic .AtomicInteger ;
2225import java .util .stream .Collectors ;
26+ import java .util .stream .IntStream ;
2327import lombok .RequiredArgsConstructor ;
2428import lombok .extern .slf4j .Slf4j ;
2529import org .springframework .scheduling .annotation .Scheduled ;
@@ -32,24 +36,25 @@ public class MonitoringService {
3236 private static final int CHUNK_SIZE = 20 ;
3337 private static final String SOLSCAN_ACCOUNT_URL = "https://solscan.io/account/" ;
3438 private static final String SOLSCAN_TOKEN_URL = "https://solscan.io/token/" ;
35- private static final String FIRST_TRACKING_ADDRESS =
36- "APZmQqyytWLMFioMsskqhWrGJCd9Fpo7L2f2YhdpSe6U" ;
3739
3840 private final MonitoredAddressRepository addressRepository ;
3941 private final TelegramBot telegramBot ;
4042 private final SolScanServiceImpl solScanService ;
4143 private final ActivityRepository activityRepository ;
44+ private final TokenRepository tokenRepository ;
4245 private final ActivityMapper activityMapper ;
46+ private final TokenMapper tokenMapper ;
47+ private RateLimiter rateLimiter = RateLimiter .create (1.0 );
4348
44- @ Scheduled (fixedDelay = 300000 )
49+ @ Scheduled (fixedDelay = 60000 )
4550 public void newActivityFound () {
46- RateLimiter rateLimiter = RateLimiter .create (5.0 );
4751 addressRepository .findAll ().forEach (address -> {
4852 try {
4953 rateLimiter .acquire ();
5054 Set <BalanceActivity > activities = fetchAndProcessActivities (address );
5155 if (!activities .isEmpty ()) {
5256 activityRepository .saveAll (activities );
57+ log .info ("New activity found" );
5358 sendTelegramNotification (address , activities );
5459 }
5560 } catch (Exception e ) {
@@ -61,102 +66,127 @@ public void newActivityFound() {
6166 private Set <BalanceActivity > fetchAndProcessActivities (MonitoredAddress address ) {
6267 Set <SingleBalanceActivityResponseDto > newActivities =
6368 solScanService .getNewBalanceActivities (address .getAddress ())
64- .stream ()
65- .filter (act -> BigDecimal .ZERO .equals (act .getPreBalance ()))
66- .collect (Collectors .toSet ());
67-
69+ .stream ()
70+ .filter (act -> BigDecimal .ZERO .equals (act .getPreBalance ()))
71+ .collect (Collectors .toSet ());
72+ log .info ("Fetching new activities for address {}" , address .getAddress ());
73+ if (newActivities .isEmpty ()) {
74+ return Collections .emptySet ();
75+ }
6876 List <String > tokenAddresses = newActivities .stream ()
6977 .map (SingleBalanceActivityResponseDto ::getTokenAddress )
78+ .distinct ()
7079 .toList ();
7180
7281 Map <String , TokenMetaResponseDto > metaMap = batchFetchTokenMetadata (tokenAddresses );
7382
7483 return newActivities .stream ()
75- .map (activityMapper ::toModel )
76- .map (act -> enrichWithTokenMeta (act , metaMap .get (act .getTokenAddress ())))
77- .filter (tok -> tok .getValueInUsd ().compareTo (BigDecimal .valueOf (100 )) > 0 )
84+ .map (dto -> {
85+ TokenMetaResponseDto meta = metaMap .get (dto .getTokenAddress ());
86+ if (meta == null ) {
87+ return Optional .<BalanceActivity >empty ();
88+ }
89+ return enrichWithTokenMeta (dto , meta );
90+ })
91+ .filter (Optional ::isPresent )
92+ .map (Optional ::get )
7893 .peek (act -> act .setMonitoredAddress (address ))
7994 .collect (Collectors .toSet ());
8095 }
8196
8297 private Map <String , TokenMetaResponseDto > batchFetchTokenMetadata (List <String > tokenAddresses ) {
8398 Map <String , TokenMetaResponseDto > metaMap = new ConcurrentHashMap <>();
84- List <String > addressList = new ArrayList <>(tokenAddresses );
85- for (int i = 0 ; i < addressList .size (); i += CHUNK_SIZE ) {
86- List <String > chunk = addressList .subList (
87- i , Math .min (i + CHUNK_SIZE , addressList .size ()));
99+ log .info ("Fetching token metadata for addresses {}" , tokenAddresses );
100+ if (tokenAddresses .isEmpty ()) {
101+ return metaMap ;
102+ }
103+
104+ List <List <String >> chunks = partitionList (tokenAddresses );
105+ chunks .parallelStream ().forEach (chunk -> {
88106 try {
89107 Map <String , TokenMetaResponseDto > chunkResults =
90108 solScanService .getMetaMapFromAddresses (chunk );
91109 metaMap .putAll (chunkResults );
92110 } catch (Exception e ) {
93- log .error ( "Failed to fetch batch metadata for chunk: {}" , chunk , e );
111+ log .warn ( "Batch metadata fetch failed, falling back to individual requests" , e );
94112 fetchTokenMetadata (chunk , metaMap );
95113 }
96- }
114+ });
115+
97116 return metaMap ;
98117 }
99118
119+ private <T > List <List <T >> partitionList (List <T > list ) {
120+ return IntStream .range (0 , (list .size () + CHUNK_SIZE - 1 ) / CHUNK_SIZE )
121+ .mapToObj (i -> list .subList (
122+ i * CHUNK_SIZE , Math .min (list .size (), (i + 1 ) * CHUNK_SIZE )))
123+ .collect (Collectors .toList ());
124+ }
125+
100126 private void fetchTokenMetadata (List <String > tokenAddresses ,
101127 Map <String , TokenMetaResponseDto > metaMap ) {
102128 tokenAddresses .forEach (address -> {
103- TokenMetaResponseDto meta = solScanService .getTokenMeta (address );
104- metaMap .put (address , meta );
129+ try {
130+ TokenMetaResponseDto meta = solScanService .getTokenMeta (address );
131+ if (meta != null ) {
132+ metaMap .put (address , meta );
133+ }
134+ } catch (Exception e ) {
135+ log .error ("Failed to fetch metadata for token: {}" , address , e );
136+ }
105137 });
106138 }
107139
108- private BalanceActivity enrichWithTokenMeta (BalanceActivity act , TokenMetaResponseDto meta ) {
140+ private Optional <BalanceActivity > enrichWithTokenMeta (
141+ SingleBalanceActivityResponseDto dto ,
142+ TokenMetaResponseDto meta
143+ ) {
109144 BigDecimal price = Optional .ofNullable (meta .getPrice ()).orElse (BigDecimal .ZERO );
110145 int decimals = meta .getDecimals () > 0 ? meta .getDecimals () : 9 ;
111- BigDecimal normalizedAmount = act .getAmount ()
146+
147+ BigDecimal normalizedAmount = dto .getAmount ()
112148 .divide (BigDecimal .TEN .pow (decimals ), MathContext .DECIMAL32 );
113- act .setValueInUsd (normalizedAmount .multiply (price ))
114- .setTokenName (meta .getName ())
115- .setTokenSymbol (meta .getSymbol ());
116- return act ;
149+ BigDecimal valueInUsd = normalizedAmount .multiply (price );
150+ if (valueInUsd .compareTo (BigDecimal .valueOf (1 )) > 0 ) {
151+ Token token = tokenRepository .findByAddress (meta .getAddress ())
152+ .orElseGet (() -> tokenRepository .save (tokenMapper .toModelFromMetaDto (meta )));
153+
154+ BalanceActivity act = activityMapper .toModel (dto , token );
155+ act .setValueInUsd (valueInUsd );
156+ log .info ("Saving the Token: {} and creating Balance Activity entity: {}." ,
157+ token .getName (), act .getValueInUsd ());
158+ return Optional .of (act );
159+ }
160+ return Optional .empty ();
117161 }
118162
119163 private void sendTelegramNotification (MonitoredAddress address ,
120164 Set <BalanceActivity > activities ) {
121- AtomicInteger count = new AtomicInteger (1 );
122- String tokens = activities .stream ()
123- .map (act -> String .format (
124- "%d. Token: %s\n USD Value: $%s\n Token link: [%s](%s#balanceChanges)\n " ,
125- count .getAndIncrement (),
126- act .getTokenName (),
127- act .getValueInUsd ().setScale (2 , RoundingMode .HALF_UP ),
128- shortenAddress (act .getTokenAddress ()),
129- SOLSCAN_TOKEN_URL + act .getTokenAddress ()
130- ))
131- .collect (Collectors .joining ());
165+ String tokens = formatTokensMessage (activities );
132166 String message = String .format ("New activity for address: [%s](%s#balanceChanges)\n %s" ,
133167 shortenAddress (address .getAddress ()),
134168 SOLSCAN_ACCOUNT_URL + address .getAddress (),
135169 tokens );
136170 telegramBot .sendNotification (address .getChatId (), message );
137171 }
138172
173+ private String formatTokensMessage (Set <BalanceActivity > balanceActivities ) {
174+ AtomicInteger counter = new AtomicInteger (1 );
175+ return balanceActivities .stream ()
176+ .map (act -> String .format (
177+ "%d. Token: %s\n USD Value: $%s\n Token link: [%s](%s#balanceChanges)\n " ,
178+ counter .getAndIncrement (),
179+ act .getToken ().getName (),
180+ act .getValueInUsd ().setScale (2 , RoundingMode .HALF_UP ),
181+ shortenAddress (act .getToken ().getAddress ()),
182+ SOLSCAN_TOKEN_URL + act .getToken ().getAddress ()
183+ ))
184+ .collect (Collectors .joining ());
185+ }
186+
139187 private String shortenAddress (String address ) {
140188 return address .length () > 8
141189 ? address .substring (0 , 4 ) + "..." + address .substring (address .length () - 4 )
142190 : address ;
143191 }
144-
145- /*public void monitorAddress() {
146- try {
147- if (solScanService.newTokenTransfer(FIRST_TRACKING_ADDRESS)) {
148- Set<Long> ids =
149- addressRepository.findAll()
150- .stream()
151- .map(MonitoredAddress::getChatId)
152- .collect(Collectors.toSet());
153- log.info("New transfer detected, sending notification");
154- for (Long id : ids) {
155- telegramBot.sendNotification(id, "!!!");
156- }
157- }
158- } catch (Exception e) {
159- log.error("Error in scheduled task", e);
160- }
161- }*/
162192}
0 commit comments