001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.io.File; 020import java.io.IOException; 021import java.net.URI; 022import java.util.HashMap; 023import java.util.Map; 024import java.util.NoSuchElementException; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import javax.management.ObjectName; 028 029import org.apache.activemq.ActiveMQConnectionMetaData; 030import org.apache.activemq.broker.BrokerService; 031import org.apache.activemq.broker.ConnectionContext; 032import org.apache.activemq.broker.TransportConnector; 033import org.apache.activemq.broker.region.Subscription; 034import org.apache.activemq.command.ActiveMQQueue; 035import org.apache.activemq.command.ActiveMQTopic; 036import org.apache.activemq.command.ConsumerId; 037import org.apache.activemq.command.ConsumerInfo; 038import org.apache.activemq.command.RemoveSubscriptionInfo; 039import org.apache.activemq.network.NetworkConnector; 040import org.apache.activemq.util.BrokerSupport; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044public class BrokerView implements BrokerViewMBean { 045 046 private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class); 047 048 ManagedRegionBroker broker; 049 050 private final BrokerService brokerService; 051 private final AtomicInteger sessionIdCounter = new AtomicInteger(0); 052 private ObjectName jmsJobScheduler; 053 054 public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception { 055 this.brokerService = brokerService; 056 this.broker = managedBroker; 057 } 058 059 public ManagedRegionBroker getBroker() { 060 return broker; 061 } 062 063 public void setBroker(ManagedRegionBroker broker) { 064 this.broker = broker; 065 } 066 067 @Override 068 public String getBrokerId() { 069 return safeGetBroker().getBrokerId().toString(); 070 } 071 072 @Override 073 public String getBrokerName() { 074 return safeGetBroker().getBrokerName(); 075 } 076 077 @Override 078 public String getBrokerVersion() { 079 return ActiveMQConnectionMetaData.PROVIDER_VERSION; 080 } 081 082 @Override 083 public String getUptime() { 084 return brokerService.getUptime(); 085 } 086 087 @Override 088 public long getUptimeMillis() { 089 return brokerService.getUptimeMillis(); 090 } 091 092 @Override 093 public int getCurrentConnectionsCount() { 094 return brokerService.getCurrentConnections(); 095 } 096 097 @Override 098 public long getTotalConnectionsCount() { 099 return brokerService.getTotalConnections(); 100 } 101 102 @Override 103 public void gc() throws Exception { 104 brokerService.getBroker().gc(); 105 try { 106 brokerService.getPersistenceAdapter().checkpoint(true); 107 } catch (IOException e) { 108 LOG.error("Failed to checkpoint persistence adapter on gc request", e); 109 } 110 } 111 112 @Override 113 public void start() throws Exception { 114 brokerService.start(); 115 } 116 117 @Override 118 public void stop() throws Exception { 119 brokerService.stop(); 120 } 121 122 @Override 123 public void restart() throws Exception { 124 if (brokerService.isRestartAllowed()) { 125 brokerService.requestRestart(); 126 brokerService.stop(); 127 } else { 128 throw new Exception("Restart is not allowed"); 129 } 130 } 131 132 @Override 133 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { 134 brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval); 135 } 136 137 @Override 138 public long getTotalEnqueueCount() { 139 return safeGetBroker().getDestinationStatistics().getEnqueues().getCount(); 140 } 141 142 @Override 143 public long getTotalDequeueCount() { 144 return safeGetBroker().getDestinationStatistics().getDequeues().getCount(); 145 } 146 147 @Override 148 public long getTotalConsumerCount() { 149 return safeGetBroker().getDestinationStatistics().getConsumers().getCount(); 150 } 151 152 @Override 153 public long getTotalProducerCount() { 154 return safeGetBroker().getDestinationStatistics().getProducers().getCount(); 155 } 156 157 @Override 158 public long getTotalMessageCount() { 159 return safeGetBroker().getDestinationStatistics().getMessages().getCount(); 160 } 161 162 /** 163 * @return the average size of a message (bytes) 164 */ 165 @Override 166 public long getAverageMessageSize() { 167 // we are okay with the size without decimals so cast to long 168 return (long) safeGetBroker().getDestinationStatistics().getMessageSize().getAverageSize(); 169 } 170 171 /** 172 * @return the max size of a message (bytes) 173 */ 174 @Override 175 public long getMaxMessageSize() { 176 return safeGetBroker().getDestinationStatistics().getMessageSize().getMaxSize(); 177 } 178 179 /** 180 * @return the min size of a message (bytes) 181 */ 182 @Override 183 public long getMinMessageSize() { 184 return safeGetBroker().getDestinationStatistics().getMessageSize().getMinSize(); 185 } 186 187 public long getTotalMessagesCached() { 188 return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount(); 189 } 190 191 @Override 192 public int getMemoryPercentUsage() { 193 return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage(); 194 } 195 196 @Override 197 public long getMemoryLimit() { 198 return brokerService.getSystemUsage().getMemoryUsage().getLimit(); 199 } 200 201 @Override 202 public void setMemoryLimit(long limit) { 203 brokerService.getSystemUsage().getMemoryUsage().setLimit(limit); 204 } 205 206 @Override 207 public long getStoreLimit() { 208 return brokerService.getSystemUsage().getStoreUsage().getLimit(); 209 } 210 211 @Override 212 public int getStorePercentUsage() { 213 return brokerService.getSystemUsage().getStoreUsage().getPercentUsage(); 214 } 215 216 @Override 217 public long getTempLimit() { 218 return brokerService.getSystemUsage().getTempUsage().getLimit(); 219 } 220 221 @Override 222 public int getTempPercentUsage() { 223 return brokerService.getSystemUsage().getTempUsage().getPercentUsage(); 224 } 225 226 @Override 227 public long getJobSchedulerStoreLimit() { 228 return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit(); 229 } 230 231 @Override 232 public int getJobSchedulerStorePercentUsage() { 233 return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage(); 234 } 235 236 @Override 237 public void setStoreLimit(long limit) { 238 brokerService.getSystemUsage().getStoreUsage().setLimit(limit); 239 } 240 241 @Override 242 public void setTempLimit(long limit) { 243 brokerService.getSystemUsage().getTempUsage().setLimit(limit); 244 } 245 246 @Override 247 public void setJobSchedulerStoreLimit(long limit) { 248 brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit); 249 } 250 251 @Override 252 public void resetStatistics() { 253 safeGetBroker().getDestinationStatistics().reset(); 254 } 255 256 @Override 257 public void enableStatistics() { 258 safeGetBroker().getDestinationStatistics().setEnabled(true); 259 } 260 261 @Override 262 public void disableStatistics() { 263 safeGetBroker().getDestinationStatistics().setEnabled(false); 264 } 265 266 @Override 267 public boolean isStatisticsEnabled() { 268 return safeGetBroker().getDestinationStatistics().isEnabled(); 269 } 270 271 @Override 272 public boolean isPersistent() { 273 return brokerService.isPersistent(); 274 } 275 276 @Override 277 public void terminateJVM(int exitCode) { 278 System.exit(exitCode); 279 } 280 281 @Override 282 public ObjectName[] getTopics() { 283 return safeGetBroker().getTopicsNonSuppressed(); 284 } 285 286 @Override 287 public ObjectName[] getQueues() { 288 return safeGetBroker().getQueuesNonSuppressed(); 289 } 290 291 @Override 292 public ObjectName[] getTemporaryTopics() { 293 return safeGetBroker().getTemporaryTopicsNonSuppressed(); 294 } 295 296 @Override 297 public ObjectName[] getTemporaryQueues() { 298 return safeGetBroker().getTemporaryQueuesNonSuppressed(); 299 } 300 301 @Override 302 public ObjectName[] getTopicSubscribers() { 303 return safeGetBroker().getTopicSubscribersNonSuppressed(); 304 } 305 306 @Override 307 public ObjectName[] getDurableTopicSubscribers() { 308 return safeGetBroker().getDurableTopicSubscribersNonSuppressed(); 309 } 310 311 @Override 312 public ObjectName[] getQueueSubscribers() { 313 return safeGetBroker().getQueueSubscribersNonSuppressed(); 314 } 315 316 @Override 317 public ObjectName[] getTemporaryTopicSubscribers() { 318 return safeGetBroker().getTemporaryTopicSubscribersNonSuppressed(); 319 } 320 321 @Override 322 public ObjectName[] getTemporaryQueueSubscribers() { 323 return safeGetBroker().getTemporaryQueueSubscribersNonSuppressed(); 324 } 325 326 @Override 327 public ObjectName[] getInactiveDurableTopicSubscribers() { 328 return safeGetBroker().getInactiveDurableTopicSubscribersNonSuppressed(); 329 } 330 331 @Override 332 public ObjectName[] getTopicProducers() { 333 return safeGetBroker().getTopicProducersNonSuppressed(); 334 } 335 336 @Override 337 public ObjectName[] getQueueProducers() { 338 return safeGetBroker().getQueueProducersNonSuppressed(); 339 } 340 341 @Override 342 public ObjectName[] getTemporaryTopicProducers() { 343 return safeGetBroker().getTemporaryTopicProducersNonSuppressed(); 344 } 345 346 @Override 347 public ObjectName[] getTemporaryQueueProducers() { 348 return safeGetBroker().getTemporaryQueueProducersNonSuppressed(); 349 } 350 351 @Override 352 public ObjectName[] getDynamicDestinationProducers() { 353 return safeGetBroker().getDynamicDestinationProducersNonSuppressed(); 354 } 355 356 @Override 357 public String addConnector(String discoveryAddress) throws Exception { 358 TransportConnector connector = brokerService.addConnector(discoveryAddress); 359 if (connector == null) { 360 throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress); 361 } 362 brokerService.startTransportConnector(connector); 363 return connector.getName(); 364 } 365 366 @Override 367 public String addNetworkConnector(String discoveryAddress) throws Exception { 368 NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress); 369 if (connector == null) { 370 throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress); 371 } 372 brokerService.registerNetworkConnectorMBean(connector); 373 connector.start(); 374 return connector.getName(); 375 } 376 377 @Override 378 public boolean removeConnector(String connectorName) throws Exception { 379 TransportConnector connector = brokerService.getConnectorByName(connectorName); 380 if (connector == null) { 381 throw new NoSuchElementException("Not connector matched the given name: " + connectorName); 382 } 383 connector.stop(); 384 return brokerService.removeConnector(connector); 385 } 386 387 @Override 388 public boolean removeNetworkConnector(String connectorName) throws Exception { 389 NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName); 390 if (connector == null) { 391 throw new NoSuchElementException("Not connector matched the given name: " + connectorName); 392 } 393 connector.stop(); 394 return brokerService.removeNetworkConnector(connector); 395 } 396 397 @Override 398 public void addTopic(String name) throws Exception { 399 safeGetBroker().getContextBroker() 400 .addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), true); 401 } 402 403 @Override 404 public void addQueue(String name) throws Exception { 405 safeGetBroker().getContextBroker() 406 .addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), true); 407 } 408 409 @Override 410 public void removeTopic(String name) throws Exception { 411 safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000); 412 } 413 414 @Override 415 public void removeQueue(String name) throws Exception { 416 safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000); 417 } 418 419 @Override 420 public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception { 421 ConnectionContext context = getConnectionContext(); 422 context.setBroker(safeGetBroker()); 423 context.setClientId(clientId); 424 ConsumerInfo info = new ConsumerInfo(); 425 ConsumerId consumerId = new ConsumerId(); 426 consumerId.setConnectionId(clientId); 427 consumerId.setSessionId(sessionIdCounter.incrementAndGet()); 428 consumerId.setValue(0); 429 info.setConsumerId(consumerId); 430 info.setDestination(new ActiveMQTopic(topicName)); 431 info.setSubscriptionName(subscriberName); 432 info.setSelector(selector); 433 Subscription subscription = safeGetBroker().addConsumer(context, info); 434 safeGetBroker().removeConsumer(context, info); 435 if (subscription != null) { 436 return subscription.getObjectName(); 437 } 438 return null; 439 } 440 441 @Override 442 public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception { 443 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 444 info.setClientId(clientId); 445 info.setSubscriptionName(subscriberName); 446 ConnectionContext context = getConnectionContext(); 447 context.setBroker(safeGetBroker()); 448 context.setClientId(clientId); 449 brokerService.getBroker().removeSubscription(context, info); 450 } 451 452 @Override 453 public void reloadLog4jProperties() throws Throwable { 454 Log4JConfigView.doReloadLog4jProperties(); 455 } 456 457 @Override 458 public Map<String, String> getTransportConnectors() { 459 Map<String, String> answer = new HashMap<String, String>(); 460 try { 461 for (TransportConnector connector : brokerService.getTransportConnectors()) { 462 answer.put(connector.getName(), connector.getConnectUri().toString()); 463 } 464 } catch (Exception e) { 465 LOG.debug("Failed to read URI to build transport connectors map", e); 466 } 467 return answer; 468 } 469 470 @Override 471 public String getTransportConnectorByType(String type) { 472 return brokerService.getTransportConnectorURIsAsMap().get(type); 473 } 474 475 @Override 476 public String getVMURL() { 477 URI answer = brokerService.getVmConnectorURI(); 478 return answer != null ? answer.toString() : ""; 479 } 480 481 @Override 482 public String getDataDirectory() { 483 File file = brokerService.getDataDirectoryFile(); 484 try { 485 return file != null ? file.getCanonicalPath() : ""; 486 } catch (IOException e) { 487 return ""; 488 } 489 } 490 491 @Override 492 public ObjectName getJMSJobScheduler() { 493 return this.jmsJobScheduler; 494 } 495 496 public void setJMSJobScheduler(ObjectName name) { 497 this.jmsJobScheduler = name; 498 } 499 500 @Override 501 public boolean isSlave() { 502 return brokerService.isSlave(); 503 } 504 505 private ManagedRegionBroker safeGetBroker() { 506 if (broker == null) { 507 throw new IllegalStateException("Broker is not yet started."); 508 } 509 510 return broker; 511 } 512 513 private ConnectionContext getConnectionContext() { 514 ConnectionContext context; 515 if (broker == null) { 516 context = new ConnectionContext(); 517 } else { 518 ConnectionContext sharedContext = BrokerSupport.getConnectionContext(broker.getContextBroker()); 519 // Make a local copy of the sharedContext. We do this because we do 520 // not want to set a clientId on the 521 // global sharedContext. Taking a copy of the sharedContext is a 522 // good way to make sure that we are not 523 // messing up the shared context 524 context = sharedContext.copy(); 525 } 526 527 return context; 528 } 529}