88import org .zstack .core .singleflight .ExternalSingleFlightMsg ;
99import org .zstack .core .singleflight .ExternalSingleFlightReply ;
1010import org .zstack .core .singleflight .MultiNodeSingleFlightImpl ;
11- import org .zstack .core .thread .ThreadFacade ;
11+ import org .zstack .core .thread .* ;
1212import org .zstack .header .AbstractService ;
1313import org .zstack .header .core .*;
1414import org .zstack .header .core .progress .ChainInfo ;
@@ -41,6 +41,8 @@ public class CoreManagerImpl extends AbstractService implements CoreManager {
4141 @ Autowired
4242 private ThreadFacade thdf ;
4343 @ Autowired
44+ private DispatchQueue dpq ;
45+ @ Autowired
4446 private MultiNodeSingleFlightImpl singleFlight ;
4547
4648 @ Override
@@ -55,6 +57,8 @@ public void handleMessage(Message msg) {
5557 private void handleApiMessage (APIMessage msg ) {
5658 if (msg instanceof APIGetChainTaskMsg ) {
5759 handleMessage ((APIGetChainTaskMsg ) msg );
60+ } else if (msg instanceof APIShowPoolStatusMsg ) {
61+ handleMessage ((APIShowPoolStatusMsg ) msg );
5862 }
5963 }
6064
@@ -134,6 +138,43 @@ public void done(ErrorCodeList errorCodeList) {
134138 });
135139 }
136140
141+ private void handleMessage (APIShowPoolStatusMsg msg ) {
142+ APIShowPoolStatusReply reply = new APIShowPoolStatusReply ();
143+
144+ // Thread pool stats
145+ if (thdf instanceof ThreadFacadeMXBean ) {
146+ ThreadPoolStatistic tps = ((ThreadFacadeMXBean ) thdf ).getThreadPoolStatistic ();
147+ reply .addEntry (new PoolStatusEntry ("main-thread-pool" , "ThreadPool" ,
148+ tps .getActiveThreadNum (), (int ) tps .getQueuedTaskNum (), (int ) tps .getMaxPoolSize ()));
149+ }
150+
151+ // Chain task stats (top 20 by pending)
152+ Map <String , ChainTaskStatistic > chainStats = dpq .getChainTaskStatistics ();
153+ chainStats .entrySet ().stream ()
154+ .sorted ((a , b ) -> Long .compare (b .getValue ().getPendingTaskNum (), a .getValue ().getPendingTaskNum ()))
155+ .limit (20 )
156+ .forEach (e -> {
157+ ChainTaskStatistic s = e .getValue ();
158+ reply .addEntry (new PoolStatusEntry (
159+ "chain:" + e .getKey (), "ChainTask" ,
160+ s .getCurrentRunningThreadNum (), (int ) s .getPendingTaskNum (), s .getSyncLevel ()));
161+ });
162+
163+ // Sync task stats (top 20 by pending)
164+ Map <String , SyncTaskStatistic > syncStats = dpq .getSyncTaskStatistics ();
165+ syncStats .entrySet ().stream ()
166+ .sorted ((a , b ) -> Long .compare (b .getValue ().getPendingTaskNum (), a .getValue ().getPendingTaskNum ()))
167+ .limit (20 )
168+ .forEach (e -> {
169+ SyncTaskStatistic s = e .getValue ();
170+ reply .addEntry (new PoolStatusEntry (
171+ "sync:" + e .getKey (), "SyncTask" ,
172+ s .getCurrentRunningThreadNum (), (int ) s .getPendingTaskNum (), s .getSyncLevel ()));
173+ });
174+
175+ bus .reply (msg , reply );
176+ }
177+
137178 private void handle (GetLocalTaskMsg msg ) {
138179 GetLocalTaskReply reply = new GetLocalTaskReply ();
139180 Map <String , ChainInfo > results = msg .getSyncSignatures ().stream ()
0 commit comments