001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.BufferedReader; 020import java.io.File; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InputStreamReader; 024import java.net.URI; 025import java.net.URISyntaxException; 026import java.net.UnknownHostException; 027import java.security.Provider; 028import java.security.Security; 029import java.util.ArrayList; 030import java.util.Date; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Locale; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.CopyOnWriteArrayList; 039import java.util.concurrent.CountDownLatch; 040import java.util.concurrent.LinkedBlockingQueue; 041import java.util.concurrent.RejectedExecutionException; 042import java.util.concurrent.RejectedExecutionHandler; 043import java.util.concurrent.SynchronousQueue; 044import java.util.concurrent.ThreadFactory; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import java.util.concurrent.atomic.AtomicInteger; 049import java.util.concurrent.atomic.AtomicLong; 050 051import javax.annotation.PostConstruct; 052import javax.annotation.PreDestroy; 053import javax.management.InstanceNotFoundException; 054import javax.management.MalformedObjectNameException; 055import javax.management.ObjectName; 056 057import org.apache.activemq.ActiveMQConnectionMetaData; 058import org.apache.activemq.ConfigurationException; 059import org.apache.activemq.Service; 060import org.apache.activemq.advisory.AdvisoryBroker; 061import org.apache.activemq.broker.cluster.ConnectionSplitBroker; 062import org.apache.activemq.broker.jmx.AnnotatedMBean; 063import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 064import org.apache.activemq.broker.jmx.BrokerView; 065import org.apache.activemq.broker.jmx.ConnectorView; 066import org.apache.activemq.broker.jmx.ConnectorViewMBean; 067import org.apache.activemq.broker.jmx.HealthView; 068import org.apache.activemq.broker.jmx.HealthViewMBean; 069import org.apache.activemq.broker.jmx.JmsConnectorView; 070import org.apache.activemq.broker.jmx.JobSchedulerView; 071import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; 072import org.apache.activemq.broker.jmx.Log4JConfigView; 073import org.apache.activemq.broker.jmx.ManagedRegionBroker; 074import org.apache.activemq.broker.jmx.ManagementContext; 075import org.apache.activemq.broker.jmx.NetworkConnectorView; 076import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; 077import org.apache.activemq.broker.jmx.ProxyConnectorView; 078import org.apache.activemq.broker.region.CompositeDestinationInterceptor; 079import org.apache.activemq.broker.region.Destination; 080import org.apache.activemq.broker.region.DestinationFactory; 081import org.apache.activemq.broker.region.DestinationFactoryImpl; 082import org.apache.activemq.broker.region.DestinationInterceptor; 083import org.apache.activemq.broker.region.RegionBroker; 084import org.apache.activemq.broker.region.policy.PolicyMap; 085import org.apache.activemq.broker.region.virtual.MirroredQueue; 086import org.apache.activemq.broker.region.virtual.VirtualDestination; 087import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; 088import org.apache.activemq.broker.region.virtual.VirtualTopic; 089import org.apache.activemq.broker.scheduler.JobSchedulerStore; 090import org.apache.activemq.broker.scheduler.SchedulerBroker; 091import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore; 092import org.apache.activemq.command.ActiveMQDestination; 093import org.apache.activemq.command.ActiveMQQueue; 094import org.apache.activemq.command.BrokerId; 095import org.apache.activemq.command.ProducerInfo; 096import org.apache.activemq.filter.DestinationFilter; 097import org.apache.activemq.network.ConnectionFilter; 098import org.apache.activemq.network.DiscoveryNetworkConnector; 099import org.apache.activemq.network.NetworkConnector; 100import org.apache.activemq.network.jms.JmsConnector; 101import org.apache.activemq.openwire.OpenWireFormat; 102import org.apache.activemq.proxy.ProxyConnector; 103import org.apache.activemq.security.MessageAuthorizationPolicy; 104import org.apache.activemq.selector.SelectorParser; 105import org.apache.activemq.store.JournaledStore; 106import org.apache.activemq.store.PListStore; 107import org.apache.activemq.store.PersistenceAdapter; 108import org.apache.activemq.store.PersistenceAdapterFactory; 109import org.apache.activemq.store.memory.MemoryPersistenceAdapter; 110import org.apache.activemq.thread.Scheduler; 111import org.apache.activemq.thread.TaskRunnerFactory; 112import org.apache.activemq.transport.TransportFactorySupport; 113import org.apache.activemq.transport.TransportServer; 114import org.apache.activemq.transport.vm.VMTransportFactory; 115import org.apache.activemq.usage.StoreUsage; 116import org.apache.activemq.usage.SystemUsage; 117import org.apache.activemq.usage.Usage; 118import org.apache.activemq.util.BrokerSupport; 119import org.apache.activemq.util.DefaultIOExceptionHandler; 120import org.apache.activemq.util.IOExceptionHandler; 121import org.apache.activemq.util.IOExceptionSupport; 122import org.apache.activemq.util.IOHelper; 123import org.apache.activemq.util.InetAddressUtil; 124import org.apache.activemq.util.ServiceStopper; 125import org.apache.activemq.util.StoreUtil; 126import org.apache.activemq.util.ThreadPoolUtils; 127import org.apache.activemq.util.TimeUtils; 128import org.apache.activemq.util.URISupport; 129import org.slf4j.Logger; 130import org.slf4j.LoggerFactory; 131import org.slf4j.MDC; 132 133/** 134 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a 135 * number of transport connectors, network connectors and a bunch of properties 136 * which can be used to configure the broker as its lazily created. 137 * 138 * @org.apache.xbean.XBean 139 */ 140public class BrokerService implements Service { 141 public static final String DEFAULT_PORT = "61616"; 142 public static final String LOCAL_HOST_NAME; 143 public static final String BROKER_VERSION; 144 public static final String DEFAULT_BROKER_NAME = "localhost"; 145 public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; 146 public static final long DEFAULT_START_TIMEOUT = 600000L; 147 148 private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); 149 150 @SuppressWarnings("unused") 151 private static final long serialVersionUID = 7353129142305630237L; 152 153 private boolean useJmx = true; 154 private boolean enableStatistics = true; 155 private boolean persistent = true; 156 private boolean populateJMSXUserID; 157 private boolean useAuthenticatedPrincipalForJMSXUserID; 158 private boolean populateUserNameInMBeans; 159 private long mbeanInvocationTimeout = 0; 160 161 private boolean useShutdownHook = true; 162 private boolean useLoggingForShutdownErrors; 163 private boolean shutdownOnMasterFailure; 164 private boolean shutdownOnSlaveFailure; 165 private boolean waitForSlave; 166 private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT; 167 private boolean passiveSlave; 168 private String brokerName = DEFAULT_BROKER_NAME; 169 private File dataDirectoryFile; 170 private File tmpDataDirectory; 171 private Broker broker; 172 private BrokerView adminView; 173 private ManagementContext managementContext; 174 private ObjectName brokerObjectName; 175 private TaskRunnerFactory taskRunnerFactory; 176 private TaskRunnerFactory persistenceTaskRunnerFactory; 177 private SystemUsage systemUsage; 178 private SystemUsage producerSystemUsage; 179 private SystemUsage consumerSystemUsaage; 180 private PersistenceAdapter persistenceAdapter; 181 private PersistenceAdapterFactory persistenceFactory; 182 protected DestinationFactory destinationFactory; 183 private MessageAuthorizationPolicy messageAuthorizationPolicy; 184 private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>(); 185 private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>(); 186 private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>(); 187 private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>(); 188 private final List<Service> services = new ArrayList<Service>(); 189 private transient Thread shutdownHook; 190 private String[] transportConnectorURIs; 191 private String[] networkConnectorURIs; 192 private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges 193 // to other jms messaging systems 194 private boolean deleteAllMessagesOnStartup; 195 private boolean advisorySupport = true; 196 private URI vmConnectorURI; 197 private String defaultSocketURIString; 198 private PolicyMap destinationPolicy; 199 private final AtomicBoolean started = new AtomicBoolean(false); 200 private final AtomicBoolean stopped = new AtomicBoolean(false); 201 private final AtomicBoolean stopping = new AtomicBoolean(false); 202 private BrokerPlugin[] plugins; 203 private boolean keepDurableSubsActive = true; 204 private boolean useVirtualTopics = true; 205 private boolean useMirroredQueues = false; 206 private boolean useTempMirroredQueues = true; 207 /** 208 * Whether or not virtual destination subscriptions should cause network demand 209 */ 210 private boolean useVirtualDestSubs = false; 211 /** 212 * Whether or not the creation of destinations that match virtual destinations 213 * should cause network demand 214 */ 215 private boolean useVirtualDestSubsOnCreation = false; 216 private BrokerId brokerId; 217 private volatile DestinationInterceptor[] destinationInterceptors; 218 private ActiveMQDestination[] destinations; 219 private PListStore tempDataStore; 220 private int persistenceThreadPriority = Thread.MAX_PRIORITY; 221 private boolean useLocalHostBrokerName; 222 private final CountDownLatch stoppedLatch = new CountDownLatch(1); 223 private final CountDownLatch startedLatch = new CountDownLatch(1); 224 private Broker regionBroker; 225 private int producerSystemUsagePortion = 60; 226 private int consumerSystemUsagePortion = 40; 227 private boolean splitSystemUsageForProducersConsumers; 228 private boolean monitorConnectionSplits = false; 229 private int taskRunnerPriority = Thread.NORM_PRIORITY; 230 private boolean dedicatedTaskRunner; 231 private boolean cacheTempDestinations = false;// useful for failover 232 private int timeBeforePurgeTempDestinations = 5000; 233 private final List<Runnable> shutdownHooks = new ArrayList<Runnable>(); 234 private boolean systemExitOnShutdown; 235 private int systemExitOnShutdownExitCode; 236 private SslContext sslContext; 237 private boolean forceStart = false; 238 private IOExceptionHandler ioExceptionHandler; 239 private boolean schedulerSupport = false; 240 private File schedulerDirectoryFile; 241 private Scheduler scheduler; 242 private ThreadPoolExecutor executor; 243 private int schedulePeriodForDestinationPurge= 0; 244 private int maxPurgedDestinationsPerSweep = 0; 245 private int schedulePeriodForDiskUsageCheck = 0; 246 private int diskUsageCheckRegrowThreshold = -1; 247 private boolean adjustUsageLimits = true; 248 private BrokerContext brokerContext; 249 private boolean networkConnectorStartAsync = false; 250 private boolean allowTempAutoCreationOnSend; 251 private JobSchedulerStore jobSchedulerStore; 252 private final AtomicLong totalConnections = new AtomicLong(); 253 private final AtomicInteger currentConnections = new AtomicInteger(); 254 255 private long offlineDurableSubscriberTimeout = -1; 256 private long offlineDurableSubscriberTaskSchedule = 300000; 257 private DestinationFilter virtualConsumerDestinationFilter; 258 259 private final AtomicBoolean persistenceAdapterStarted = new AtomicBoolean(false); 260 private Throwable startException = null; 261 private boolean startAsync = false; 262 private Date startDate; 263 private boolean slave = true; 264 265 private boolean restartAllowed = true; 266 private boolean restartRequested = false; 267 private boolean rejectDurableConsumers = false; 268 private boolean rollbackOnlyOnAsyncException = true; 269 270 private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; 271 272 static { 273 274 try { 275 ClassLoader loader = BrokerService.class.getClassLoader(); 276 Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider"); 277 Provider bouncycastle = (Provider) clazz.newInstance(); 278 Security.insertProviderAt(bouncycastle, 279 Integer.getInteger("org.apache.activemq.broker.BouncyCastlePosition", 2)); 280 LOG.info("Loaded the Bouncy Castle security provider."); 281 } catch(Throwable e) { 282 // No BouncyCastle found so we use the default Java Security Provider 283 } 284 285 String localHostName = "localhost"; 286 try { 287 localHostName = InetAddressUtil.getLocalHostName(); 288 } catch (UnknownHostException e) { 289 LOG.error("Failed to resolve localhost"); 290 } 291 LOCAL_HOST_NAME = localHostName; 292 293 String version = null; 294 try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) { 295 if (in != null) { 296 try(InputStreamReader isr = new InputStreamReader(in); 297 BufferedReader reader = new BufferedReader(isr)) { 298 version = reader.readLine(); 299 } 300 } 301 } catch (IOException ie) { 302 LOG.warn("Error reading broker version ", ie); 303 } 304 BROKER_VERSION = version; 305 } 306 307 @Override 308 public String toString() { 309 return "BrokerService[" + getBrokerName() + "]"; 310 } 311 312 private String getBrokerVersion() { 313 String version = ActiveMQConnectionMetaData.PROVIDER_VERSION; 314 if (version == null) { 315 version = BROKER_VERSION; 316 } 317 318 return version; 319 } 320 321 /** 322 * Adds a new transport connector for the given bind address 323 * 324 * @return the newly created and added transport connector 325 * @throws Exception 326 */ 327 public TransportConnector addConnector(String bindAddress) throws Exception { 328 return addConnector(new URI(bindAddress)); 329 } 330 331 /** 332 * Adds a new transport connector for the given bind address 333 * 334 * @return the newly created and added transport connector 335 * @throws Exception 336 */ 337 public TransportConnector addConnector(URI bindAddress) throws Exception { 338 return addConnector(createTransportConnector(bindAddress)); 339 } 340 341 /** 342 * Adds a new transport connector for the given TransportServer transport 343 * 344 * @return the newly created and added transport connector 345 * @throws Exception 346 */ 347 public TransportConnector addConnector(TransportServer transport) throws Exception { 348 return addConnector(new TransportConnector(transport)); 349 } 350 351 /** 352 * Adds a new transport connector 353 * 354 * @return the transport connector 355 * @throws Exception 356 */ 357 public TransportConnector addConnector(TransportConnector connector) throws Exception { 358 transportConnectors.add(connector); 359 return connector; 360 } 361 362 /** 363 * Stops and removes a transport connector from the broker. 364 * 365 * @param connector 366 * @return true if the connector has been previously added to the broker 367 * @throws Exception 368 */ 369 public boolean removeConnector(TransportConnector connector) throws Exception { 370 boolean rc = transportConnectors.remove(connector); 371 if (rc) { 372 unregisterConnectorMBean(connector); 373 } 374 return rc; 375 } 376 377 /** 378 * Adds a new network connector using the given discovery address 379 * 380 * @return the newly created and added network connector 381 * @throws Exception 382 */ 383 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { 384 return addNetworkConnector(new URI(discoveryAddress)); 385 } 386 387 /** 388 * Adds a new proxy connector using the given bind address 389 * 390 * @return the newly created and added network connector 391 * @throws Exception 392 */ 393 public ProxyConnector addProxyConnector(String bindAddress) throws Exception { 394 return addProxyConnector(new URI(bindAddress)); 395 } 396 397 /** 398 * Adds a new network connector using the given discovery address 399 * 400 * @return the newly created and added network connector 401 * @throws Exception 402 */ 403 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { 404 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); 405 return addNetworkConnector(connector); 406 } 407 408 /** 409 * Adds a new proxy connector using the given bind address 410 * 411 * @return the newly created and added network connector 412 * @throws Exception 413 */ 414 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { 415 ProxyConnector connector = new ProxyConnector(); 416 connector.setBind(bindAddress); 417 connector.setRemote(new URI("fanout:multicast://default")); 418 return addProxyConnector(connector); 419 } 420 421 /** 422 * Adds a new network connector to connect this broker to a federated 423 * network 424 */ 425 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { 426 connector.setBrokerService(this); 427 connector.setLocalUri(getVmConnectorURI()); 428 // Set a connection filter so that the connector does not establish loop 429 // back connections. 430 connector.setConnectionFilter(new ConnectionFilter() { 431 @Override 432 public boolean connectTo(URI location) { 433 List<TransportConnector> transportConnectors = getTransportConnectors(); 434 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { 435 try { 436 TransportConnector tc = iter.next(); 437 if (location.equals(tc.getConnectUri())) { 438 return false; 439 } 440 } catch (Throwable e) { 441 } 442 } 443 return true; 444 } 445 }); 446 networkConnectors.add(connector); 447 return connector; 448 } 449 450 /** 451 * Removes the given network connector without stopping it. The caller 452 * should call {@link NetworkConnector#stop()} to close the connector 453 */ 454 public boolean removeNetworkConnector(NetworkConnector connector) { 455 boolean answer = networkConnectors.remove(connector); 456 if (answer) { 457 unregisterNetworkConnectorMBean(connector); 458 } 459 return answer; 460 } 461 462 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { 463 URI uri = getVmConnectorURI(); 464 connector.setLocalUri(uri); 465 proxyConnectors.add(connector); 466 if (isUseJmx()) { 467 registerProxyConnectorMBean(connector); 468 } 469 return connector; 470 } 471 472 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { 473 connector.setBrokerService(this); 474 jmsConnectors.add(connector); 475 if (isUseJmx()) { 476 registerJmsConnectorMBean(connector); 477 } 478 return connector; 479 } 480 481 public JmsConnector removeJmsConnector(JmsConnector connector) { 482 if (jmsConnectors.remove(connector)) { 483 return connector; 484 } 485 return null; 486 } 487 488 public void masterFailed() { 489 if (shutdownOnMasterFailure) { 490 LOG.error("The Master has failed ... shutting down"); 491 try { 492 stop(); 493 } catch (Exception e) { 494 LOG.error("Failed to stop for master failure", e); 495 } 496 } else { 497 LOG.warn("Master Failed - starting all connectors"); 498 try { 499 startAllConnectors(); 500 broker.nowMasterBroker(); 501 } catch (Exception e) { 502 LOG.error("Failed to startAllConnectors", e); 503 } 504 } 505 } 506 507 public String getUptime() { 508 long delta = getUptimeMillis(); 509 510 if (delta == 0) { 511 return "not started"; 512 } 513 514 return TimeUtils.printDuration(delta); 515 } 516 517 public long getUptimeMillis() { 518 if (startDate == null) { 519 return 0; 520 } 521 522 return new Date().getTime() - startDate.getTime(); 523 } 524 525 public boolean isStarted() { 526 return started.get() && startedLatch.getCount() == 0; 527 } 528 529 /** 530 * Forces a start of the broker. 531 * By default a BrokerService instance that was 532 * previously stopped using BrokerService.stop() cannot be restarted 533 * using BrokerService.start(). 534 * This method enforces a restart. 535 * It is not recommended to force a restart of the broker and will not work 536 * for most but some very trivial broker configurations. 537 * For restarting a broker instance we recommend to first call stop() on 538 * the old instance and then recreate a new BrokerService instance. 539 * 540 * @param force - if true enforces a restart. 541 * @throws Exception 542 */ 543 public void start(boolean force) throws Exception { 544 forceStart = force; 545 stopped.set(false); 546 started.set(false); 547 start(); 548 } 549 550 // Service interface 551 // ------------------------------------------------------------------------- 552 553 protected boolean shouldAutostart() { 554 return true; 555 } 556 557 /** 558 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 559 * 560 * delegates to autoStart, done to prevent backwards incompatible signature change 561 */ 562 @PostConstruct 563 private void postConstruct() { 564 try { 565 autoStart(); 566 } catch (Exception ex) { 567 throw new RuntimeException(ex); 568 } 569 } 570 571 /** 572 * 573 * @throws Exception 574 * @org. apache.xbean.InitMethod 575 */ 576 public void autoStart() throws Exception { 577 if(shouldAutostart()) { 578 start(); 579 } 580 } 581 582 @Override 583 public void start() throws Exception { 584 if (stopped.get() || !started.compareAndSet(false, true)) { 585 // lets just ignore redundant start() calls 586 // as its way too easy to not be completely sure if start() has been 587 // called or not with the gazillion of different configuration 588 // mechanisms 589 // throw new IllegalStateException("Already started."); 590 return; 591 } 592 593 setStartException(null); 594 stopping.set(false); 595 startDate = new Date(); 596 MDC.put("activemq.broker", brokerName); 597 598 try { 599 checkMemorySystemUsageLimits(); 600 if (systemExitOnShutdown && useShutdownHook) { 601 throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); 602 } 603 processHelperProperties(); 604 if (isUseJmx()) { 605 // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and 606 // we cannot cleanup clear that during shutdown of the broker. 607 MDC.remove("activemq.broker"); 608 try { 609 startManagementContext(); 610 for (NetworkConnector connector : getNetworkConnectors()) { 611 registerNetworkConnectorMBean(connector); 612 } 613 } finally { 614 MDC.put("activemq.broker", brokerName); 615 } 616 } 617 618 // in jvm master slave, lets not publish over existing broker till we get the lock 619 final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance(); 620 if (brokerRegistry.lookup(getBrokerName()) == null) { 621 brokerRegistry.bind(getBrokerName(), BrokerService.this); 622 } 623 startPersistenceAdapter(startAsync); 624 startBroker(startAsync); 625 brokerRegistry.bind(getBrokerName(), BrokerService.this); 626 } catch (Exception e) { 627 LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e); 628 try { 629 if (!stopped.get()) { 630 stop(); 631 } 632 } catch (Exception ex) { 633 LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex); 634 } 635 throw e; 636 } finally { 637 MDC.remove("activemq.broker"); 638 } 639 } 640 641 private void startPersistenceAdapter(boolean async) throws Exception { 642 if (async) { 643 new Thread("Persistence Adapter Starting Thread") { 644 @Override 645 public void run() { 646 try { 647 doStartPersistenceAdapter(); 648 } catch (Throwable e) { 649 setStartException(e); 650 } finally { 651 synchronized (persistenceAdapterStarted) { 652 persistenceAdapterStarted.set(true); 653 persistenceAdapterStarted.notifyAll(); 654 } 655 } 656 } 657 }.start(); 658 } else { 659 doStartPersistenceAdapter(); 660 } 661 } 662 663 private void doStartPersistenceAdapter() throws Exception { 664 PersistenceAdapter persistenceAdapterToStart = getPersistenceAdapter(); 665 if (persistenceAdapterToStart == null) { 666 checkStartException(); 667 throw new ConfigurationException("Cannot start null persistence adapter"); 668 } 669 persistenceAdapterToStart.setUsageManager(getProducerSystemUsage()); 670 persistenceAdapterToStart.setBrokerName(getBrokerName()); 671 LOG.info("Using Persistence Adapter: {}", persistenceAdapterToStart); 672 if (deleteAllMessagesOnStartup) { 673 deleteAllMessages(); 674 } 675 persistenceAdapterToStart.start(); 676 677 getTempDataStore(); 678 if (tempDataStore != null) { 679 try { 680 // start after we have the store lock 681 tempDataStore.start(); 682 } catch (Exception e) { 683 RuntimeException exception = new RuntimeException( 684 "Failed to start temp data store: " + tempDataStore, e); 685 LOG.error(exception.getLocalizedMessage(), e); 686 throw exception; 687 } 688 } 689 690 getJobSchedulerStore(); 691 if (jobSchedulerStore != null) { 692 try { 693 jobSchedulerStore.start(); 694 } catch (Exception e) { 695 RuntimeException exception = new RuntimeException( 696 "Failed to start job scheduler store: " + jobSchedulerStore, e); 697 LOG.error(exception.getLocalizedMessage(), e); 698 throw exception; 699 } 700 } 701 } 702 703 private void startBroker(boolean async) throws Exception { 704 if (async) { 705 new Thread("Broker Starting Thread") { 706 @Override 707 public void run() { 708 try { 709 synchronized (persistenceAdapterStarted) { 710 if (!persistenceAdapterStarted.get()) { 711 persistenceAdapterStarted.wait(); 712 } 713 } 714 doStartBroker(); 715 } catch (Throwable t) { 716 setStartException(t); 717 } 718 } 719 }.start(); 720 } else { 721 doStartBroker(); 722 } 723 } 724 725 private void doStartBroker() throws Exception { 726 checkStartException(); 727 startDestinations(); 728 addShutdownHook(); 729 730 broker = getBroker(); 731 brokerId = broker.getBrokerId(); 732 733 // need to log this after creating the broker so we have its id and name 734 LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId }); 735 broker.start(); 736 737 if (isUseJmx()) { 738 if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) { 739 // try to restart management context 740 // typical for slaves that use the same ports as master 741 managementContext.stop(); 742 startManagementContext(); 743 } 744 ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; 745 managedBroker.setContextBroker(broker); 746 adminView.setBroker(managedBroker); 747 } 748 749 if (ioExceptionHandler == null) { 750 setIoExceptionHandler(new DefaultIOExceptionHandler()); 751 } 752 753 if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) { 754 ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString()); 755 Log4JConfigView log4jConfigView = new Log4JConfigView(); 756 AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName); 757 } 758 759 startAllConnectors(); 760 761 LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); 762 LOG.info("For help or more information please see: http://activemq.apache.org"); 763 764 getBroker().brokerServiceStarted(); 765 checkStoreSystemUsageLimits(); 766 startedLatch.countDown(); 767 getBroker().nowMasterBroker(); 768 } 769 770 /** 771 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 772 * 773 * delegates to stop, done to prevent backwards incompatible signature change 774 */ 775 @PreDestroy 776 private void preDestroy () { 777 try { 778 stop(); 779 } catch (Exception ex) { 780 throw new RuntimeException(); 781 } 782 } 783 784 /** 785 * 786 * @throws Exception 787 * @org.apache .xbean.DestroyMethod 788 */ 789 @Override 790 public void stop() throws Exception { 791 if (!stopping.compareAndSet(false, true)) { 792 LOG.trace("Broker already stopping/stopped"); 793 return; 794 } 795 796 setStartException(new BrokerStoppedException("Stop invoked")); 797 MDC.put("activemq.broker", brokerName); 798 799 if (systemExitOnShutdown) { 800 new Thread() { 801 @Override 802 public void run() { 803 System.exit(systemExitOnShutdownExitCode); 804 } 805 }.start(); 806 } 807 808 LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} ); 809 810 removeShutdownHook(); 811 if (this.scheduler != null) { 812 this.scheduler.stop(); 813 this.scheduler = null; 814 } 815 ServiceStopper stopper = new ServiceStopper(); 816 if (services != null) { 817 for (Service service : services) { 818 stopper.stop(service); 819 } 820 } 821 stopAllConnectors(stopper); 822 this.slave = true; 823 // remove any VMTransports connected 824 // this has to be done after services are stopped, 825 // to avoid timing issue with discovery (spinning up a new instance) 826 BrokerRegistry.getInstance().unbind(getBrokerName()); 827 VMTransportFactory.stopped(getBrokerName()); 828 if (broker != null) { 829 stopper.stop(broker); 830 broker = null; 831 } 832 833 if (jobSchedulerStore != null) { 834 jobSchedulerStore.stop(); 835 jobSchedulerStore = null; 836 } 837 if (tempDataStore != null) { 838 tempDataStore.stop(); 839 tempDataStore = null; 840 } 841 try { 842 stopper.stop(getPersistenceAdapter()); 843 persistenceAdapter = null; 844 if (isUseJmx()) { 845 stopper.stop(managementContext); 846 managementContext = null; 847 } 848 // Clear SelectorParser cache to free memory 849 SelectorParser.clearCache(); 850 } finally { 851 started.set(false); 852 stopped.set(true); 853 stoppedLatch.countDown(); 854 } 855 856 if (this.taskRunnerFactory != null) { 857 this.taskRunnerFactory.shutdown(); 858 this.taskRunnerFactory = null; 859 } 860 if (this.executor != null) { 861 ThreadPoolUtils.shutdownNow(executor); 862 this.executor = null; 863 } 864 865 this.destinationInterceptors = null; 866 this.destinationFactory = null; 867 868 if (startDate != null) { 869 LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()}); 870 } 871 LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); 872 873 synchronized (shutdownHooks) { 874 for (Runnable hook : shutdownHooks) { 875 try { 876 hook.run(); 877 } catch (Throwable e) { 878 stopper.onException(hook, e); 879 } 880 } 881 } 882 883 MDC.remove("activemq.broker"); 884 885 // and clear start date 886 startDate = null; 887 888 stopper.throwFirstException(); 889 } 890 891 public boolean checkQueueSize(String queueName) { 892 long count = 0; 893 long queueSize = 0; 894 Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap(); 895 for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) { 896 if (entry.getKey().isQueue()) { 897 if (entry.getValue().getName().matches(queueName)) { 898 queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount(); 899 count += queueSize; 900 if (queueSize > 0) { 901 LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize); 902 } 903 } 904 } 905 } 906 return count == 0; 907 } 908 909 /** 910 * This method (both connectorName and queueName are using regex to match) 911 * 1. stop the connector (supposed the user input the connector which the 912 * clients connect to) 2. to check whether there is any pending message on 913 * the queues defined by queueName 3. supposedly, after stop the connector, 914 * client should failover to other broker and pending messages should be 915 * forwarded. if no pending messages, the method finally call stop to stop 916 * the broker. 917 * 918 * @param connectorName 919 * @param queueName 920 * @param timeout 921 * @param pollInterval 922 * @throws Exception 923 */ 924 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { 925 if (isUseJmx()) { 926 if (connectorName == null || queueName == null || timeout <= 0) { 927 throw new Exception( 928 "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully."); 929 } 930 if (pollInterval <= 0) { 931 pollInterval = 30; 932 } 933 LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{ 934 connectorName, queueName, timeout, pollInterval 935 }); 936 TransportConnector connector; 937 for (int i = 0; i < transportConnectors.size(); i++) { 938 connector = transportConnectors.get(i); 939 if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) { 940 connector.stop(); 941 } 942 } 943 long start = System.currentTimeMillis(); 944 while (System.currentTimeMillis() - start < timeout * 1000) { 945 // check quesize until it gets zero 946 if (checkQueueSize(queueName)) { 947 stop(); 948 break; 949 } else { 950 Thread.sleep(pollInterval * 1000); 951 } 952 } 953 if (stopped.get()) { 954 LOG.info("Successfully stop the broker."); 955 } else { 956 LOG.info("There is still pending message on the queue. Please check and stop the broker manually."); 957 } 958 } 959 } 960 961 /** 962 * A helper method to block the caller thread until the broker has been 963 * stopped 964 */ 965 public void waitUntilStopped() { 966 while (isStarted() && !stopped.get()) { 967 try { 968 stoppedLatch.await(); 969 } catch (InterruptedException e) { 970 // ignore 971 } 972 } 973 } 974 975 public boolean isStopped() { 976 return stopped.get(); 977 } 978 979 /** 980 * A helper method to block the caller thread until the broker has fully started 981 * @return boolean true if wait succeeded false if broker was not started or was stopped 982 */ 983 public boolean waitUntilStarted() { 984 return waitUntilStarted(DEFAULT_START_TIMEOUT); 985 } 986 987 /** 988 * A helper method to block the caller thread until the broker has fully started 989 * 990 * @param timeout 991 * the amount of time to wait before giving up and returning false. 992 * 993 * @return boolean true if wait succeeded false if broker was not started or was stopped 994 */ 995 public boolean waitUntilStarted(long timeout) { 996 boolean waitSucceeded = isStarted(); 997 long expiration = Math.max(0, timeout + System.currentTimeMillis()); 998 while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) { 999 try { 1000 if (getStartException() != null) { 1001 return waitSucceeded; 1002 } 1003 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); 1004 } catch (InterruptedException ignore) { 1005 } 1006 } 1007 return waitSucceeded; 1008 } 1009 1010 // Properties 1011 // ------------------------------------------------------------------------- 1012 /** 1013 * Returns the message broker 1014 */ 1015 public Broker getBroker() throws Exception { 1016 if (broker == null) { 1017 checkStartException(); 1018 broker = createBroker(); 1019 } 1020 return broker; 1021 } 1022 1023 /** 1024 * Returns the administration view of the broker; used to create and destroy 1025 * resources such as queues and topics. Note this method returns null if JMX 1026 * is disabled. 1027 */ 1028 public BrokerView getAdminView() throws Exception { 1029 if (adminView == null) { 1030 // force lazy creation 1031 getBroker(); 1032 } 1033 return adminView; 1034 } 1035 1036 public void setAdminView(BrokerView adminView) { 1037 this.adminView = adminView; 1038 } 1039 1040 public String getBrokerName() { 1041 return brokerName; 1042 } 1043 1044 /** 1045 * Sets the name of this broker; which must be unique in the network 1046 * 1047 * @param brokerName 1048 */ 1049 private static final String brokerNameReplacedCharsRegExp = "[^a-zA-Z0-9\\.\\_\\-\\:]"; 1050 public void setBrokerName(String brokerName) { 1051 if (brokerName == null) { 1052 throw new NullPointerException("The broker name cannot be null"); 1053 } 1054 String str = brokerName.replaceAll(brokerNameReplacedCharsRegExp, "_"); 1055 if (!str.equals(brokerName)) { 1056 LOG.error("Broker Name: {} contained illegal characters matching regExp: {} - replaced with {}", brokerName, brokerNameReplacedCharsRegExp, str); 1057 } 1058 this.brokerName = str.trim(); 1059 } 1060 1061 public PersistenceAdapterFactory getPersistenceFactory() { 1062 return persistenceFactory; 1063 } 1064 1065 public File getDataDirectoryFile() { 1066 if (dataDirectoryFile == null) { 1067 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); 1068 } 1069 return dataDirectoryFile; 1070 } 1071 1072 public File getBrokerDataDirectory() { 1073 String brokerDir = getBrokerName(); 1074 return new File(getDataDirectoryFile(), brokerDir); 1075 } 1076 1077 /** 1078 * Sets the directory in which the data files will be stored by default for 1079 * the JDBC and Journal persistence adaptors. 1080 * 1081 * @param dataDirectory 1082 * the directory to store data files 1083 */ 1084 public void setDataDirectory(String dataDirectory) { 1085 setDataDirectoryFile(new File(dataDirectory)); 1086 } 1087 1088 /** 1089 * Sets the directory in which the data files will be stored by default for 1090 * the JDBC and Journal persistence adaptors. 1091 * 1092 * @param dataDirectoryFile 1093 * the directory to store data files 1094 */ 1095 public void setDataDirectoryFile(File dataDirectoryFile) { 1096 this.dataDirectoryFile = dataDirectoryFile; 1097 } 1098 1099 /** 1100 * @return the tmpDataDirectory 1101 */ 1102 public File getTmpDataDirectory() { 1103 if (tmpDataDirectory == null) { 1104 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); 1105 } 1106 return tmpDataDirectory; 1107 } 1108 1109 /** 1110 * @param tmpDataDirectory 1111 * the tmpDataDirectory to set 1112 */ 1113 public void setTmpDataDirectory(File tmpDataDirectory) { 1114 this.tmpDataDirectory = tmpDataDirectory; 1115 } 1116 1117 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { 1118 this.persistenceFactory = persistenceFactory; 1119 } 1120 1121 public void setDestinationFactory(DestinationFactory destinationFactory) { 1122 this.destinationFactory = destinationFactory; 1123 } 1124 1125 public boolean isPersistent() { 1126 return persistent; 1127 } 1128 1129 /** 1130 * Sets whether or not persistence is enabled or disabled. 1131 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1132 */ 1133 public void setPersistent(boolean persistent) { 1134 this.persistent = persistent; 1135 } 1136 1137 public boolean isPopulateJMSXUserID() { 1138 return populateJMSXUserID; 1139 } 1140 1141 /** 1142 * Sets whether or not the broker should populate the JMSXUserID header. 1143 */ 1144 public void setPopulateJMSXUserID(boolean populateJMSXUserID) { 1145 this.populateJMSXUserID = populateJMSXUserID; 1146 } 1147 1148 public SystemUsage getSystemUsage() { 1149 try { 1150 if (systemUsage == null) { 1151 1152 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore()); 1153 systemUsage.setExecutor(getExecutor()); 1154 systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB 1155 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1156 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB 1157 systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1158 addService(this.systemUsage); 1159 } 1160 return systemUsage; 1161 } catch (IOException e) { 1162 LOG.error("Cannot create SystemUsage", e); 1163 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e); 1164 } 1165 } 1166 1167 public void setSystemUsage(SystemUsage memoryManager) { 1168 if (this.systemUsage != null) { 1169 removeService(this.systemUsage); 1170 } 1171 this.systemUsage = memoryManager; 1172 if (this.systemUsage.getExecutor()==null) { 1173 this.systemUsage.setExecutor(getExecutor()); 1174 } 1175 addService(this.systemUsage); 1176 } 1177 1178 /** 1179 * @return the consumerUsageManager 1180 * @throws IOException 1181 */ 1182 public SystemUsage getConsumerSystemUsage() throws IOException { 1183 if (this.consumerSystemUsaage == null) { 1184 if (splitSystemUsageForProducersConsumers) { 1185 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); 1186 float portion = consumerSystemUsagePortion / 100f; 1187 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); 1188 addService(this.consumerSystemUsaage); 1189 } else { 1190 consumerSystemUsaage = getSystemUsage(); 1191 } 1192 } 1193 return this.consumerSystemUsaage; 1194 } 1195 1196 /** 1197 * @param consumerSystemUsaage 1198 * the storeSystemUsage to set 1199 */ 1200 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { 1201 if (this.consumerSystemUsaage != null) { 1202 removeService(this.consumerSystemUsaage); 1203 } 1204 this.consumerSystemUsaage = consumerSystemUsaage; 1205 addService(this.consumerSystemUsaage); 1206 } 1207 1208 /** 1209 * @return the producerUsageManager 1210 * @throws IOException 1211 */ 1212 public SystemUsage getProducerSystemUsage() throws IOException { 1213 if (producerSystemUsage == null) { 1214 if (splitSystemUsageForProducersConsumers) { 1215 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); 1216 float portion = producerSystemUsagePortion / 100f; 1217 producerSystemUsage.getMemoryUsage().setUsagePortion(portion); 1218 addService(producerSystemUsage); 1219 } else { 1220 producerSystemUsage = getSystemUsage(); 1221 } 1222 } 1223 return producerSystemUsage; 1224 } 1225 1226 /** 1227 * @param producerUsageManager 1228 * the producerUsageManager to set 1229 */ 1230 public void setProducerSystemUsage(SystemUsage producerUsageManager) { 1231 if (this.producerSystemUsage != null) { 1232 removeService(this.producerSystemUsage); 1233 } 1234 this.producerSystemUsage = producerUsageManager; 1235 addService(this.producerSystemUsage); 1236 } 1237 1238 public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException { 1239 if (persistenceAdapter == null && !hasStartException()) { 1240 persistenceAdapter = createPersistenceAdapter(); 1241 configureService(persistenceAdapter); 1242 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1243 } 1244 return persistenceAdapter; 1245 } 1246 1247 /** 1248 * Sets the persistence adaptor implementation to use for this broker 1249 * 1250 * @throws IOException 1251 */ 1252 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { 1253 if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) { 1254 LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter); 1255 return; 1256 } 1257 this.persistenceAdapter = persistenceAdapter; 1258 configureService(this.persistenceAdapter); 1259 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1260 } 1261 1262 public TaskRunnerFactory getTaskRunnerFactory() { 1263 if (this.taskRunnerFactory == null) { 1264 this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, 1265 isDedicatedTaskRunner()); 1266 this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader()); 1267 } 1268 return this.taskRunnerFactory; 1269 } 1270 1271 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 1272 this.taskRunnerFactory = taskRunnerFactory; 1273 } 1274 1275 public TaskRunnerFactory getPersistenceTaskRunnerFactory() { 1276 if (taskRunnerFactory == null) { 1277 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, 1278 true, 1000, isDedicatedTaskRunner()); 1279 } 1280 return persistenceTaskRunnerFactory; 1281 } 1282 1283 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { 1284 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; 1285 } 1286 1287 public boolean isUseJmx() { 1288 return useJmx; 1289 } 1290 1291 public boolean isEnableStatistics() { 1292 return enableStatistics; 1293 } 1294 1295 /** 1296 * Sets whether or not the Broker's services enable statistics or not. 1297 */ 1298 public void setEnableStatistics(boolean enableStatistics) { 1299 this.enableStatistics = enableStatistics; 1300 } 1301 1302 /** 1303 * Sets whether or not the Broker's services should be exposed into JMX or 1304 * not. 1305 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1306 */ 1307 public void setUseJmx(boolean useJmx) { 1308 this.useJmx = useJmx; 1309 } 1310 1311 public ObjectName getBrokerObjectName() throws MalformedObjectNameException { 1312 if (brokerObjectName == null) { 1313 brokerObjectName = createBrokerObjectName(); 1314 } 1315 return brokerObjectName; 1316 } 1317 1318 /** 1319 * Sets the JMX ObjectName for this broker 1320 */ 1321 public void setBrokerObjectName(ObjectName brokerObjectName) { 1322 this.brokerObjectName = brokerObjectName; 1323 } 1324 1325 public ManagementContext getManagementContext() { 1326 if (managementContext == null) { 1327 checkStartException(); 1328 managementContext = new ManagementContext(); 1329 } 1330 return managementContext; 1331 } 1332 1333 synchronized private void checkStartException() { 1334 if (startException != null) { 1335 throw new BrokerStoppedException(startException); 1336 } 1337 } 1338 1339 synchronized private boolean hasStartException() { 1340 return startException != null; 1341 } 1342 1343 synchronized private void setStartException(Throwable t) { 1344 startException = t; 1345 } 1346 1347 public void setManagementContext(ManagementContext managementContext) { 1348 this.managementContext = managementContext; 1349 } 1350 1351 public NetworkConnector getNetworkConnectorByName(String connectorName) { 1352 for (NetworkConnector connector : networkConnectors) { 1353 if (connector.getName().equals(connectorName)) { 1354 return connector; 1355 } 1356 } 1357 return null; 1358 } 1359 1360 public String[] getNetworkConnectorURIs() { 1361 return networkConnectorURIs; 1362 } 1363 1364 public void setNetworkConnectorURIs(String[] networkConnectorURIs) { 1365 this.networkConnectorURIs = networkConnectorURIs; 1366 } 1367 1368 public TransportConnector getConnectorByName(String connectorName) { 1369 for (TransportConnector connector : transportConnectors) { 1370 if (connector.getName().equals(connectorName)) { 1371 return connector; 1372 } 1373 } 1374 return null; 1375 } 1376 1377 public Map<String, String> getTransportConnectorURIsAsMap() { 1378 Map<String, String> answer = new HashMap<String, String>(); 1379 for (TransportConnector connector : transportConnectors) { 1380 try { 1381 URI uri = connector.getConnectUri(); 1382 if (uri != null) { 1383 String scheme = uri.getScheme(); 1384 if (scheme != null) { 1385 answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString()); 1386 } 1387 } 1388 } catch (Exception e) { 1389 LOG.debug("Failed to read URI to build transportURIsAsMap", e); 1390 } 1391 } 1392 return answer; 1393 } 1394 1395 public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){ 1396 ProducerBrokerExchange result = null; 1397 1398 for (TransportConnector connector : transportConnectors) { 1399 for (TransportConnection tc: connector.getConnections()){ 1400 result = tc.getProducerBrokerExchangeIfExists(producerInfo); 1401 if (result !=null){ 1402 return result; 1403 } 1404 } 1405 } 1406 return result; 1407 } 1408 1409 public String[] getTransportConnectorURIs() { 1410 return transportConnectorURIs; 1411 } 1412 1413 public void setTransportConnectorURIs(String[] transportConnectorURIs) { 1414 this.transportConnectorURIs = transportConnectorURIs; 1415 } 1416 1417 /** 1418 * @return Returns the jmsBridgeConnectors. 1419 */ 1420 public JmsConnector[] getJmsBridgeConnectors() { 1421 return jmsBridgeConnectors; 1422 } 1423 1424 /** 1425 * @param jmsConnectors 1426 * The jmsBridgeConnectors to set. 1427 */ 1428 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { 1429 this.jmsBridgeConnectors = jmsConnectors; 1430 } 1431 1432 public Service[] getServices() { 1433 return services.toArray(new Service[0]); 1434 } 1435 1436 /** 1437 * Sets the services associated with this broker. 1438 */ 1439 public void setServices(Service[] services) { 1440 this.services.clear(); 1441 if (services != null) { 1442 for (int i = 0; i < services.length; i++) { 1443 this.services.add(services[i]); 1444 } 1445 } 1446 } 1447 1448 /** 1449 * Adds a new service so that it will be started as part of the broker 1450 * lifecycle 1451 */ 1452 public void addService(Service service) { 1453 services.add(service); 1454 } 1455 1456 public void removeService(Service service) { 1457 services.remove(service); 1458 } 1459 1460 public boolean isUseLoggingForShutdownErrors() { 1461 return useLoggingForShutdownErrors; 1462 } 1463 1464 /** 1465 * Sets whether or not we should use commons-logging when reporting errors 1466 * when shutting down the broker 1467 */ 1468 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { 1469 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; 1470 } 1471 1472 public boolean isUseShutdownHook() { 1473 return useShutdownHook; 1474 } 1475 1476 /** 1477 * Sets whether or not we should use a shutdown handler to close down the 1478 * broker cleanly if the JVM is terminated. It is recommended you leave this 1479 * enabled. 1480 */ 1481 public void setUseShutdownHook(boolean useShutdownHook) { 1482 this.useShutdownHook = useShutdownHook; 1483 } 1484 1485 public boolean isAdvisorySupport() { 1486 return advisorySupport; 1487 } 1488 1489 /** 1490 * Allows the support of advisory messages to be disabled for performance 1491 * reasons. 1492 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1493 */ 1494 public void setAdvisorySupport(boolean advisorySupport) { 1495 this.advisorySupport = advisorySupport; 1496 } 1497 1498 public List<TransportConnector> getTransportConnectors() { 1499 return new ArrayList<TransportConnector>(transportConnectors); 1500 } 1501 1502 /** 1503 * Sets the transport connectors which this broker will listen on for new 1504 * clients 1505 * 1506 * @org.apache.xbean.Property 1507 * nestedType="org.apache.activemq.broker.TransportConnector" 1508 */ 1509 public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception { 1510 for (TransportConnector connector : transportConnectors) { 1511 addConnector(connector); 1512 } 1513 } 1514 1515 public TransportConnector getTransportConnectorByName(String name){ 1516 for (TransportConnector transportConnector : transportConnectors){ 1517 if (name.equals(transportConnector.getName())){ 1518 return transportConnector; 1519 } 1520 } 1521 return null; 1522 } 1523 1524 public TransportConnector getTransportConnectorByScheme(String scheme){ 1525 for (TransportConnector transportConnector : transportConnectors){ 1526 if (scheme.equals(transportConnector.getUri().getScheme())){ 1527 return transportConnector; 1528 } 1529 } 1530 return null; 1531 } 1532 1533 public List<NetworkConnector> getNetworkConnectors() { 1534 return new ArrayList<NetworkConnector>(networkConnectors); 1535 } 1536 1537 public List<ProxyConnector> getProxyConnectors() { 1538 return new ArrayList<ProxyConnector>(proxyConnectors); 1539 } 1540 1541 /** 1542 * Sets the network connectors which this broker will use to connect to 1543 * other brokers in a federated network 1544 * 1545 * @org.apache.xbean.Property 1546 * nestedType="org.apache.activemq.network.NetworkConnector" 1547 */ 1548 public void setNetworkConnectors(List<?> networkConnectors) throws Exception { 1549 for (Object connector : networkConnectors) { 1550 addNetworkConnector((NetworkConnector) connector); 1551 } 1552 } 1553 1554 /** 1555 * Sets the network connectors which this broker will use to connect to 1556 * other brokers in a federated network 1557 */ 1558 public void setProxyConnectors(List<?> proxyConnectors) throws Exception { 1559 for (Object connector : proxyConnectors) { 1560 addProxyConnector((ProxyConnector) connector); 1561 } 1562 } 1563 1564 public PolicyMap getDestinationPolicy() { 1565 return destinationPolicy; 1566 } 1567 1568 /** 1569 * Sets the destination specific policies available either for exact 1570 * destinations or for wildcard areas of destinations. 1571 */ 1572 public void setDestinationPolicy(PolicyMap policyMap) { 1573 this.destinationPolicy = policyMap; 1574 } 1575 1576 public BrokerPlugin[] getPlugins() { 1577 return plugins; 1578 } 1579 1580 /** 1581 * Sets a number of broker plugins to install such as for security 1582 * authentication or authorization 1583 */ 1584 public void setPlugins(BrokerPlugin[] plugins) { 1585 this.plugins = plugins; 1586 } 1587 1588 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1589 return messageAuthorizationPolicy; 1590 } 1591 1592 /** 1593 * Sets the policy used to decide if the current connection is authorized to 1594 * consume a given message 1595 */ 1596 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1597 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1598 } 1599 1600 /** 1601 * Delete all messages from the persistent store 1602 * 1603 * @throws IOException 1604 */ 1605 public void deleteAllMessages() throws IOException { 1606 getPersistenceAdapter().deleteAllMessages(); 1607 } 1608 1609 public boolean isDeleteAllMessagesOnStartup() { 1610 return deleteAllMessagesOnStartup; 1611 } 1612 1613 /** 1614 * Sets whether or not all messages are deleted on startup - mostly only 1615 * useful for testing. 1616 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1617 */ 1618 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { 1619 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; 1620 } 1621 1622 public URI getVmConnectorURI() { 1623 if (vmConnectorURI == null) { 1624 try { 1625 vmConnectorURI = new URI("vm://" + getBrokerName()); 1626 } catch (URISyntaxException e) { 1627 LOG.error("Badly formed URI from {}", getBrokerName(), e); 1628 } 1629 } 1630 return vmConnectorURI; 1631 } 1632 1633 public void setVmConnectorURI(URI vmConnectorURI) { 1634 this.vmConnectorURI = vmConnectorURI; 1635 } 1636 1637 public String getDefaultSocketURIString() { 1638 if (started.get()) { 1639 if (this.defaultSocketURIString == null) { 1640 for (TransportConnector tc:this.transportConnectors) { 1641 String result = null; 1642 try { 1643 result = tc.getPublishableConnectString(); 1644 } catch (Exception e) { 1645 LOG.warn("Failed to get the ConnectURI for {}", tc, e); 1646 } 1647 if (result != null) { 1648 // find first publishable uri 1649 if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { 1650 this.defaultSocketURIString = result; 1651 break; 1652 } else { 1653 // or use the first defined 1654 if (this.defaultSocketURIString == null) { 1655 this.defaultSocketURIString = result; 1656 } 1657 } 1658 } 1659 } 1660 1661 } 1662 return this.defaultSocketURIString; 1663 } 1664 return null; 1665 } 1666 1667 /** 1668 * @return Returns the shutdownOnMasterFailure. 1669 */ 1670 public boolean isShutdownOnMasterFailure() { 1671 return shutdownOnMasterFailure; 1672 } 1673 1674 /** 1675 * @param shutdownOnMasterFailure 1676 * The shutdownOnMasterFailure to set. 1677 */ 1678 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { 1679 this.shutdownOnMasterFailure = shutdownOnMasterFailure; 1680 } 1681 1682 public boolean isKeepDurableSubsActive() { 1683 return keepDurableSubsActive; 1684 } 1685 1686 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 1687 this.keepDurableSubsActive = keepDurableSubsActive; 1688 } 1689 1690 public boolean isUseVirtualTopics() { 1691 return useVirtualTopics; 1692 } 1693 1694 /** 1695 * Sets whether or not <a 1696 * href="http://activemq.apache.org/virtual-destinations.html">Virtual 1697 * Topics</a> should be supported by default if they have not been 1698 * explicitly configured. 1699 */ 1700 public void setUseVirtualTopics(boolean useVirtualTopics) { 1701 this.useVirtualTopics = useVirtualTopics; 1702 } 1703 1704 public DestinationInterceptor[] getDestinationInterceptors() { 1705 return destinationInterceptors; 1706 } 1707 1708 public boolean isUseMirroredQueues() { 1709 return useMirroredQueues; 1710 } 1711 1712 /** 1713 * Sets whether or not <a 1714 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored 1715 * Queues</a> should be supported by default if they have not been 1716 * explicitly configured. 1717 */ 1718 public void setUseMirroredQueues(boolean useMirroredQueues) { 1719 this.useMirroredQueues = useMirroredQueues; 1720 } 1721 1722 /** 1723 * Sets the destination interceptors to use 1724 */ 1725 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) { 1726 this.destinationInterceptors = destinationInterceptors; 1727 } 1728 1729 public ActiveMQDestination[] getDestinations() { 1730 return destinations; 1731 } 1732 1733 /** 1734 * Sets the destinations which should be loaded/created on startup 1735 */ 1736 public void setDestinations(ActiveMQDestination[] destinations) { 1737 this.destinations = destinations; 1738 } 1739 1740 /** 1741 * @return the tempDataStore 1742 */ 1743 public synchronized PListStore getTempDataStore() { 1744 if (tempDataStore == null) { 1745 if (!isPersistent()) { 1746 return null; 1747 } 1748 1749 try { 1750 PersistenceAdapter pa = getPersistenceAdapter(); 1751 if( pa!=null && pa instanceof PListStore) { 1752 return (PListStore) pa; 1753 } 1754 } catch (IOException e) { 1755 throw new RuntimeException(e); 1756 } 1757 1758 try { 1759 String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl"; 1760 this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance(); 1761 this.tempDataStore.setDirectory(getTmpDataDirectory()); 1762 configureService(tempDataStore); 1763 } catch (Exception e) { 1764 throw new RuntimeException(e); 1765 } 1766 } 1767 return tempDataStore; 1768 } 1769 1770 /** 1771 * @param tempDataStore 1772 * the tempDataStore to set 1773 */ 1774 public void setTempDataStore(PListStore tempDataStore) { 1775 this.tempDataStore = tempDataStore; 1776 configureService(tempDataStore); 1777 } 1778 1779 public int getPersistenceThreadPriority() { 1780 return persistenceThreadPriority; 1781 } 1782 1783 public void setPersistenceThreadPriority(int persistenceThreadPriority) { 1784 this.persistenceThreadPriority = persistenceThreadPriority; 1785 } 1786 1787 /** 1788 * @return the useLocalHostBrokerName 1789 */ 1790 public boolean isUseLocalHostBrokerName() { 1791 return this.useLocalHostBrokerName; 1792 } 1793 1794 /** 1795 * @param useLocalHostBrokerName 1796 * the useLocalHostBrokerName to set 1797 */ 1798 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) { 1799 this.useLocalHostBrokerName = useLocalHostBrokerName; 1800 if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) { 1801 brokerName = LOCAL_HOST_NAME; 1802 } 1803 } 1804 1805 /** 1806 * Looks up and lazily creates if necessary the destination for the given 1807 * JMS name 1808 */ 1809 public Destination getDestination(ActiveMQDestination destination) throws Exception { 1810 return getBroker().addDestination(getAdminConnectionContext(), destination,false); 1811 } 1812 1813 public void removeDestination(ActiveMQDestination destination) throws Exception { 1814 getBroker().removeDestination(getAdminConnectionContext(), destination, 0); 1815 } 1816 1817 public int getProducerSystemUsagePortion() { 1818 return producerSystemUsagePortion; 1819 } 1820 1821 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) { 1822 this.producerSystemUsagePortion = producerSystemUsagePortion; 1823 } 1824 1825 public int getConsumerSystemUsagePortion() { 1826 return consumerSystemUsagePortion; 1827 } 1828 1829 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) { 1830 this.consumerSystemUsagePortion = consumerSystemUsagePortion; 1831 } 1832 1833 public boolean isSplitSystemUsageForProducersConsumers() { 1834 return splitSystemUsageForProducersConsumers; 1835 } 1836 1837 public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { 1838 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; 1839 } 1840 1841 public boolean isMonitorConnectionSplits() { 1842 return monitorConnectionSplits; 1843 } 1844 1845 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { 1846 this.monitorConnectionSplits = monitorConnectionSplits; 1847 } 1848 1849 public int getTaskRunnerPriority() { 1850 return taskRunnerPriority; 1851 } 1852 1853 public void setTaskRunnerPriority(int taskRunnerPriority) { 1854 this.taskRunnerPriority = taskRunnerPriority; 1855 } 1856 1857 public boolean isDedicatedTaskRunner() { 1858 return dedicatedTaskRunner; 1859 } 1860 1861 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 1862 this.dedicatedTaskRunner = dedicatedTaskRunner; 1863 } 1864 1865 public boolean isCacheTempDestinations() { 1866 return cacheTempDestinations; 1867 } 1868 1869 public void setCacheTempDestinations(boolean cacheTempDestinations) { 1870 this.cacheTempDestinations = cacheTempDestinations; 1871 } 1872 1873 public int getTimeBeforePurgeTempDestinations() { 1874 return timeBeforePurgeTempDestinations; 1875 } 1876 1877 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { 1878 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; 1879 } 1880 1881 public boolean isUseTempMirroredQueues() { 1882 return useTempMirroredQueues; 1883 } 1884 1885 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { 1886 this.useTempMirroredQueues = useTempMirroredQueues; 1887 } 1888 1889 public synchronized JobSchedulerStore getJobSchedulerStore() { 1890 1891 // If support is off don't allow any scheduler even is user configured their own. 1892 if (!isSchedulerSupport()) { 1893 return null; 1894 } 1895 1896 // If the user configured their own we use it even if persistence is disabled since 1897 // we don't know anything about their implementation. 1898 if (jobSchedulerStore == null) { 1899 1900 if (!isPersistent()) { 1901 this.jobSchedulerStore = new InMemoryJobSchedulerStore(); 1902 configureService(jobSchedulerStore); 1903 return this.jobSchedulerStore; 1904 } 1905 1906 try { 1907 PersistenceAdapter pa = getPersistenceAdapter(); 1908 if (pa != null) { 1909 this.jobSchedulerStore = pa.createJobSchedulerStore(); 1910 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1911 configureService(jobSchedulerStore); 1912 return this.jobSchedulerStore; 1913 } 1914 } catch (IOException e) { 1915 throw new RuntimeException(e); 1916 } catch (UnsupportedOperationException ex) { 1917 // It's ok if the store doesn't implement a scheduler. 1918 } catch (Exception e) { 1919 throw new RuntimeException(e); 1920 } 1921 1922 try { 1923 PersistenceAdapter pa = getPersistenceAdapter(); 1924 if (pa != null && pa instanceof JobSchedulerStore) { 1925 this.jobSchedulerStore = (JobSchedulerStore) pa; 1926 configureService(jobSchedulerStore); 1927 return this.jobSchedulerStore; 1928 } 1929 } catch (IOException e) { 1930 throw new RuntimeException(e); 1931 } 1932 1933 // Load the KahaDB store as a last resort, this only works if KahaDB is 1934 // included at runtime, otherwise this will fail. User should disable 1935 // scheduler support if this fails. 1936 try { 1937 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 1938 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 1939 jobSchedulerStore = adaptor.createJobSchedulerStore(); 1940 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1941 configureService(jobSchedulerStore); 1942 LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile()); 1943 } catch (Exception e) { 1944 throw new RuntimeException(e); 1945 } 1946 } 1947 return jobSchedulerStore; 1948 } 1949 1950 public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) { 1951 this.jobSchedulerStore = jobSchedulerStore; 1952 configureService(jobSchedulerStore); 1953 } 1954 1955 // 1956 // Implementation methods 1957 // ------------------------------------------------------------------------- 1958 /** 1959 * Handles any lazy-creation helper properties which are added to make 1960 * things easier to configure inside environments such as Spring 1961 * 1962 * @throws Exception 1963 */ 1964 protected void processHelperProperties() throws Exception { 1965 if (transportConnectorURIs != null) { 1966 for (int i = 0; i < transportConnectorURIs.length; i++) { 1967 String uri = transportConnectorURIs[i]; 1968 addConnector(uri); 1969 } 1970 } 1971 if (networkConnectorURIs != null) { 1972 for (int i = 0; i < networkConnectorURIs.length; i++) { 1973 String uri = networkConnectorURIs[i]; 1974 addNetworkConnector(uri); 1975 } 1976 } 1977 if (jmsBridgeConnectors != null) { 1978 for (int i = 0; i < jmsBridgeConnectors.length; i++) { 1979 addJmsConnector(jmsBridgeConnectors[i]); 1980 } 1981 } 1982 } 1983 1984 /** 1985 * Check that the store usage limit is not greater than max usable 1986 * space and adjust if it is 1987 */ 1988 protected void checkStoreUsageLimits() throws Exception { 1989 final SystemUsage usage = getSystemUsage(); 1990 1991 if (getPersistenceAdapter() != null) { 1992 PersistenceAdapter adapter = getPersistenceAdapter(); 1993 checkUsageLimit(adapter.getDirectory(), usage.getStoreUsage(), usage.getStoreUsage().getPercentLimit()); 1994 1995 long maxJournalFileSize = 0; 1996 long storeLimit = usage.getStoreUsage().getLimit(); 1997 1998 if (adapter instanceof JournaledStore) { 1999 maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength(); 2000 } 2001 2002 if (storeLimit > 0 && storeLimit < maxJournalFileSize) { 2003 LOG.error("Store limit is " + storeLimit / (1024 * 1024) + 2004 " mb, whilst the max journal file size for the store is: " + 2005 maxJournalFileSize / (1024 * 1024) + " mb, " + 2006 "the store will not accept any data when used."); 2007 2008 } 2009 } 2010 } 2011 2012 /** 2013 * Check that temporary usage limit is not greater than max usable 2014 * space and adjust if it is 2015 */ 2016 protected void checkTmpStoreUsageLimits() throws Exception { 2017 final SystemUsage usage = getSystemUsage(); 2018 2019 File tmpDir = getTmpDataDirectory(); 2020 2021 if (tmpDir != null) { 2022 checkUsageLimit(tmpDir, usage.getTempUsage(), usage.getTempUsage().getPercentLimit()); 2023 2024 if (isPersistent()) { 2025 long maxJournalFileSize; 2026 2027 PListStore store = usage.getTempUsage().getStore(); 2028 if (store != null && store instanceof JournaledStore) { 2029 maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength(); 2030 } else { 2031 maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH; 2032 } 2033 long storeLimit = usage.getTempUsage().getLimit(); 2034 2035 if (storeLimit > 0 && storeLimit < maxJournalFileSize) { 2036 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + 2037 " mb, whilst the max journal file size for the temporary store is: " + 2038 maxJournalFileSize / (1024 * 1024) + " mb, " + 2039 "the temp store will not accept any data when used."); 2040 } 2041 } 2042 } 2043 } 2044 2045 protected void checkUsageLimit(File dir, Usage<?> storeUsage, int percentLimit) throws ConfigurationException { 2046 if (dir != null) { 2047 dir = StoreUtil.findParentDirectory(dir); 2048 String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store"; 2049 long storeLimit = storeUsage.getLimit(); 2050 long storeCurrent = storeUsage.getUsage(); 2051 long totalSpace = dir.getTotalSpace(); 2052 long totalUsableSpace = dir.getUsableSpace() + storeCurrent; 2053 //compute byte value of the percent limit 2054 long bytePercentLimit = totalSpace * percentLimit / 100; 2055 int oneMeg = 1024 * 1024; 2056 2057 //Check if the store limit is less than the percent Limit that was set and also 2058 //the usable space...this means we can grow the store larger 2059 //Changes in partition size (total space) as well as changes in usable space should 2060 //be detected here 2061 if (diskUsageCheckRegrowThreshold > -1 && percentLimit > 0 2062 && storeLimit < bytePercentLimit && storeLimit < totalUsableSpace){ 2063 2064 // set the limit to be bytePercentLimit or usableSpace if 2065 // usableSpace is less than the percentLimit 2066 long newLimit = bytePercentLimit > totalUsableSpace ? totalUsableSpace : bytePercentLimit; 2067 2068 //To prevent changing too often, check threshold 2069 if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) { 2070 LOG.info("Usable disk space has been increased, attempting to regrow " + storeName + " limit to " 2071 + percentLimit + "% of the partition size."); 2072 storeUsage.setLimit(newLimit); 2073 LOG.info(storeName + " limit has been increased to " + newLimit * 100 / totalSpace 2074 + "% (" + newLimit / oneMeg + " mb) of the partition size."); 2075 } 2076 2077 //check if the limit is too large for the amount of usable space 2078 } else if (storeLimit > totalUsableSpace) { 2079 final String message = storeName + " limit is " + storeLimit / oneMeg 2080 + " mb (current store usage is " + storeCurrent / oneMeg 2081 + " mb). The data directory: " + dir.getAbsolutePath() 2082 + " only has " + totalUsableSpace / oneMeg 2083 + " mb of usable space."; 2084 2085 if (!isAdjustUsageLimits()) { 2086 LOG.error(message); 2087 throw new ConfigurationException(message); 2088 } 2089 2090 if (percentLimit > 0) { 2091 LOG.warn(storeName + " limit has been set to " 2092 + percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)" 2093 + " of the partition size but there is not enough usable space." 2094 + " The current store limit (which may have been adjusted by a" 2095 + " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)" 2096 + " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)" 2097 + " is available - resetting limit"); 2098 } else { 2099 LOG.warn(message + " - resetting to maximum available disk space: " + 2100 totalUsableSpace / oneMeg + " mb"); 2101 } 2102 storeUsage.setLimit(totalUsableSpace); 2103 } 2104 } 2105 } 2106 2107 /** 2108 * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to 2109 * update store and temporary store limits if the amount of available space 2110 * plus current store size is less than the existin configured limit 2111 */ 2112 protected void scheduleDiskUsageLimitsCheck() throws IOException { 2113 if (schedulePeriodForDiskUsageCheck > 0 && 2114 (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) { 2115 Runnable diskLimitCheckTask = new Runnable() { 2116 @Override 2117 public void run() { 2118 try { 2119 checkStoreUsageLimits(); 2120 } catch (Exception e) { 2121 LOG.error("Failed to check persistent disk usage limits", e); 2122 } 2123 2124 try { 2125 checkTmpStoreUsageLimits(); 2126 } catch (Exception e) { 2127 LOG.error("Failed to check temporary store usage limits", e); 2128 } 2129 } 2130 }; 2131 scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck); 2132 } 2133 } 2134 2135 protected void checkMemorySystemUsageLimits() throws Exception { 2136 final SystemUsage usage = getSystemUsage(); 2137 long memLimit = usage.getMemoryUsage().getLimit(); 2138 long jvmLimit = Runtime.getRuntime().maxMemory(); 2139 2140 if (memLimit > jvmLimit) { 2141 final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024) 2142 + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024); 2143 2144 if (adjustUsageLimits) { 2145 usage.getMemoryUsage().setPercentOfJvmHeap(70); 2146 LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb"); 2147 } else { 2148 LOG.error(message); 2149 throw new ConfigurationException(message); 2150 } 2151 } 2152 } 2153 2154 protected void checkStoreSystemUsageLimits() throws Exception { 2155 final SystemUsage usage = getSystemUsage(); 2156 2157 //Check the persistent store and temp store limits if they exist 2158 //and schedule a periodic check to update disk limits if 2159 //schedulePeriodForDiskLimitCheck is set 2160 checkStoreUsageLimits(); 2161 checkTmpStoreUsageLimits(); 2162 scheduleDiskUsageLimitsCheck(); 2163 2164 if (getJobSchedulerStore() != null) { 2165 JobSchedulerStore scheduler = getJobSchedulerStore(); 2166 File schedulerDir = scheduler.getDirectory(); 2167 if (schedulerDir != null) { 2168 2169 String schedulerDirPath = schedulerDir.getAbsolutePath(); 2170 if (!schedulerDir.isAbsolute()) { 2171 schedulerDir = new File(schedulerDirPath); 2172 } 2173 2174 while (schedulerDir != null && !schedulerDir.isDirectory()) { 2175 schedulerDir = schedulerDir.getParentFile(); 2176 } 2177 long schedulerLimit = usage.getJobSchedulerUsage().getLimit(); 2178 long dirFreeSpace = schedulerDir.getUsableSpace(); 2179 if (schedulerLimit > dirFreeSpace) { 2180 LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) + 2181 " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() + 2182 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " + 2183 dirFreeSpace / (1024 * 1024) + " mb."); 2184 usage.getJobSchedulerUsage().setLimit(dirFreeSpace); 2185 } 2186 } 2187 } 2188 } 2189 2190 public void stopAllConnectors(ServiceStopper stopper) { 2191 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2192 NetworkConnector connector = iter.next(); 2193 unregisterNetworkConnectorMBean(connector); 2194 stopper.stop(connector); 2195 } 2196 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2197 ProxyConnector connector = iter.next(); 2198 stopper.stop(connector); 2199 } 2200 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2201 JmsConnector connector = iter.next(); 2202 stopper.stop(connector); 2203 } 2204 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2205 TransportConnector connector = iter.next(); 2206 try { 2207 unregisterConnectorMBean(connector); 2208 } catch (IOException e) { 2209 } 2210 stopper.stop(connector); 2211 } 2212 } 2213 2214 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { 2215 try { 2216 ObjectName objectName = createConnectorObjectName(connector); 2217 connector = connector.asManagedConnector(getManagementContext(), objectName); 2218 ConnectorViewMBean view = new ConnectorView(connector); 2219 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2220 return connector; 2221 } catch (Throwable e) { 2222 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e); 2223 } 2224 } 2225 2226 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { 2227 if (isUseJmx()) { 2228 try { 2229 ObjectName objectName = createConnectorObjectName(connector); 2230 getManagementContext().unregisterMBean(objectName); 2231 } catch (Throwable e) { 2232 throw IOExceptionSupport.create( 2233 "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e); 2234 } 2235 } 2236 } 2237 2238 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2239 return adaptor; 2240 } 2241 2242 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2243 if (isUseJmx()) {} 2244 } 2245 2246 private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { 2247 return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName()); 2248 } 2249 2250 public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { 2251 NetworkConnectorViewMBean view = new NetworkConnectorView(connector); 2252 try { 2253 ObjectName objectName = createNetworkConnectorObjectName(connector); 2254 connector.setObjectName(objectName); 2255 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2256 } catch (Throwable e) { 2257 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); 2258 } 2259 } 2260 2261 public ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { 2262 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName()); 2263 } 2264 2265 public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException { 2266 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport); 2267 } 2268 2269 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { 2270 if (isUseJmx()) { 2271 try { 2272 ObjectName objectName = createNetworkConnectorObjectName(connector); 2273 getManagementContext().unregisterMBean(objectName); 2274 } catch (Exception e) { 2275 LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e); 2276 } 2277 } 2278 } 2279 2280 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { 2281 ProxyConnectorView view = new ProxyConnectorView(connector); 2282 try { 2283 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName()); 2284 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2285 } catch (Throwable e) { 2286 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2287 } 2288 } 2289 2290 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { 2291 JmsConnectorView view = new JmsConnectorView(connector); 2292 try { 2293 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName()); 2294 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2295 } catch (Throwable e) { 2296 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2297 } 2298 } 2299 2300 /** 2301 * Factory method to create a new broker 2302 * 2303 * @throws Exception 2304 */ 2305 protected Broker createBroker() throws Exception { 2306 regionBroker = createRegionBroker(); 2307 Broker broker = addInterceptors(regionBroker); 2308 // Add a filter that will stop access to the broker once stopped 2309 broker = new MutableBrokerFilter(broker) { 2310 Broker old; 2311 2312 @Override 2313 public void stop() throws Exception { 2314 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { 2315 // Just ignore additional stop actions. 2316 @Override 2317 public void stop() throws Exception { 2318 } 2319 }); 2320 old.stop(); 2321 } 2322 2323 @Override 2324 public void start() throws Exception { 2325 if (forceStart && old != null) { 2326 this.next.set(old); 2327 } 2328 getNext().start(); 2329 } 2330 }; 2331 return broker; 2332 } 2333 2334 /** 2335 * Factory method to create the core region broker onto which interceptors 2336 * are added 2337 * 2338 * @throws Exception 2339 */ 2340 protected Broker createRegionBroker() throws Exception { 2341 if (destinationInterceptors == null) { 2342 destinationInterceptors = createDefaultDestinationInterceptor(); 2343 } 2344 configureServices(destinationInterceptors); 2345 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); 2346 if (destinationFactory == null) { 2347 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); 2348 } 2349 return createRegionBroker(destinationInterceptor); 2350 } 2351 2352 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { 2353 RegionBroker regionBroker; 2354 if (isUseJmx()) { 2355 try { 2356 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), 2357 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); 2358 } catch(MalformedObjectNameException me){ 2359 LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me); 2360 throw new IOException(me); 2361 } 2362 } else { 2363 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, 2364 destinationInterceptor,getScheduler(),getExecutor()); 2365 } 2366 destinationFactory.setRegionBroker(regionBroker); 2367 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); 2368 regionBroker.setBrokerName(getBrokerName()); 2369 regionBroker.getDestinationStatistics().setEnabled(enableStatistics); 2370 regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend()); 2371 if (brokerId != null) { 2372 regionBroker.setBrokerId(brokerId); 2373 } 2374 return regionBroker; 2375 } 2376 2377 /** 2378 * Create the default destination interceptor 2379 */ 2380 protected DestinationInterceptor[] createDefaultDestinationInterceptor() { 2381 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>(); 2382 if (isUseVirtualTopics()) { 2383 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); 2384 VirtualTopic virtualTopic = new VirtualTopic(); 2385 virtualTopic.setName("VirtualTopic.>"); 2386 VirtualDestination[] virtualDestinations = { virtualTopic }; 2387 interceptor.setVirtualDestinations(virtualDestinations); 2388 answer.add(interceptor); 2389 } 2390 if (isUseMirroredQueues()) { 2391 MirroredQueue interceptor = new MirroredQueue(); 2392 answer.add(interceptor); 2393 } 2394 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; 2395 answer.toArray(array); 2396 return array; 2397 } 2398 2399 /** 2400 * Strategy method to add interceptors to the broker 2401 * 2402 * @throws IOException 2403 */ 2404 protected Broker addInterceptors(Broker broker) throws Exception { 2405 if (isSchedulerSupport()) { 2406 SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); 2407 if (isUseJmx()) { 2408 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); 2409 try { 2410 ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName()); 2411 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2412 this.adminView.setJMSJobScheduler(objectName); 2413 } catch (Throwable e) { 2414 throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " 2415 + e.getMessage(), e); 2416 } 2417 } 2418 broker = sb; 2419 } 2420 if (isUseJmx()) { 2421 HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker()); 2422 try { 2423 ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName()); 2424 AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName); 2425 } catch (Throwable e) { 2426 throw IOExceptionSupport.create("Status MBean could not be registered in JMX: " 2427 + e.getMessage(), e); 2428 } 2429 } 2430 if (isAdvisorySupport()) { 2431 broker = new AdvisoryBroker(broker); 2432 } 2433 broker = new CompositeDestinationBroker(broker); 2434 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); 2435 if (isPopulateJMSXUserID()) { 2436 UserIDBroker userIDBroker = new UserIDBroker(broker); 2437 userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID()); 2438 broker = userIDBroker; 2439 } 2440 if (isMonitorConnectionSplits()) { 2441 broker = new ConnectionSplitBroker(broker); 2442 } 2443 if (plugins != null) { 2444 for (int i = 0; i < plugins.length; i++) { 2445 BrokerPlugin plugin = plugins[i]; 2446 broker = plugin.installPlugin(broker); 2447 } 2448 } 2449 return broker; 2450 } 2451 2452 protected PersistenceAdapter createPersistenceAdapter() throws IOException { 2453 if (isPersistent()) { 2454 PersistenceAdapterFactory fac = getPersistenceFactory(); 2455 if (fac != null) { 2456 return fac.createPersistenceAdapter(); 2457 } else { 2458 try { 2459 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 2460 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 2461 File dir = new File(getBrokerDataDirectory(),"KahaDB"); 2462 adaptor.setDirectory(dir); 2463 return adaptor; 2464 } catch (Throwable e) { 2465 throw IOExceptionSupport.create(e); 2466 } 2467 } 2468 } else { 2469 return new MemoryPersistenceAdapter(); 2470 } 2471 } 2472 2473 protected ObjectName createBrokerObjectName() throws MalformedObjectNameException { 2474 return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName()); 2475 } 2476 2477 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { 2478 TransportServer transport = TransportFactorySupport.bind(this, brokerURI); 2479 return new TransportConnector(transport); 2480 } 2481 2482 /** 2483 * Extracts the port from the options 2484 */ 2485 protected Object getPort(Map<?,?> options) { 2486 Object port = options.get("port"); 2487 if (port == null) { 2488 port = DEFAULT_PORT; 2489 LOG.warn("No port specified so defaulting to: {}", port); 2490 } 2491 return port; 2492 } 2493 2494 protected void addShutdownHook() { 2495 if (useShutdownHook) { 2496 shutdownHook = new Thread("ActiveMQ ShutdownHook") { 2497 @Override 2498 public void run() { 2499 containerShutdown(); 2500 } 2501 }; 2502 Runtime.getRuntime().addShutdownHook(shutdownHook); 2503 } 2504 } 2505 2506 protected void removeShutdownHook() { 2507 if (shutdownHook != null) { 2508 try { 2509 Runtime.getRuntime().removeShutdownHook(shutdownHook); 2510 } catch (Exception e) { 2511 LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e); 2512 } 2513 } 2514 } 2515 2516 /** 2517 * Sets hooks to be executed when broker shut down 2518 * 2519 * @org.apache.xbean.Property 2520 */ 2521 public void setShutdownHooks(List<Runnable> hooks) throws Exception { 2522 for (Runnable hook : hooks) { 2523 addShutdownHook(hook); 2524 } 2525 } 2526 2527 /** 2528 * Causes a clean shutdown of the container when the VM is being shut down 2529 */ 2530 protected void containerShutdown() { 2531 try { 2532 stop(); 2533 } catch (IOException e) { 2534 Throwable linkedException = e.getCause(); 2535 if (linkedException != null) { 2536 logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException); 2537 } else { 2538 logError("Failed to shut down: " + e, e); 2539 } 2540 if (!useLoggingForShutdownErrors) { 2541 e.printStackTrace(System.err); 2542 } 2543 } catch (Exception e) { 2544 logError("Failed to shut down: " + e, e); 2545 } 2546 } 2547 2548 protected void logError(String message, Throwable e) { 2549 if (useLoggingForShutdownErrors) { 2550 LOG.error("Failed to shut down: " + e); 2551 } else { 2552 System.err.println("Failed to shut down: " + e); 2553 } 2554 } 2555 2556 /** 2557 * Starts any configured destinations on startup 2558 */ 2559 protected void startDestinations() throws Exception { 2560 if (destinations != null) { 2561 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2562 for (int i = 0; i < destinations.length; i++) { 2563 ActiveMQDestination destination = destinations[i]; 2564 getBroker().addDestination(adminConnectionContext, destination,true); 2565 } 2566 } 2567 if (isUseVirtualTopics()) { 2568 startVirtualConsumerDestinations(); 2569 } 2570 } 2571 2572 /** 2573 * Returns the broker's administration connection context used for 2574 * configuring the broker at startup 2575 */ 2576 public ConnectionContext getAdminConnectionContext() throws Exception { 2577 return BrokerSupport.getConnectionContext(getBroker()); 2578 } 2579 2580 protected void startManagementContext() throws Exception { 2581 getManagementContext().setBrokerName(brokerName); 2582 getManagementContext().start(); 2583 adminView = new BrokerView(this, null); 2584 ObjectName objectName = getBrokerObjectName(); 2585 AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName); 2586 } 2587 2588 /** 2589 * Start all transport and network connections, proxies and bridges 2590 * 2591 * @throws Exception 2592 */ 2593 public void startAllConnectors() throws Exception { 2594 final Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations(); 2595 List<TransportConnector> al = new ArrayList<>(); 2596 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2597 TransportConnector connector = iter.next(); 2598 al.add(startTransportConnector(connector)); 2599 } 2600 if (al.size() > 0) { 2601 // let's clear the transportConnectors list and replace it with 2602 // the started transportConnector instances 2603 this.transportConnectors.clear(); 2604 setTransportConnectors(al); 2605 } 2606 this.slave = false; 2607 URI uri = getVmConnectorURI(); 2608 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); 2609 map.put("async", "false"); 2610 map.put("create","false"); 2611 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 2612 2613 if (!stopped.get()) { 2614 ThreadPoolExecutor networkConnectorStartExecutor = null; 2615 if (isNetworkConnectorStartAsync()) { 2616 // spin up as many threads as needed 2617 networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 2618 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 2619 new ThreadFactory() { 2620 int count=0; 2621 @Override 2622 public Thread newThread(Runnable runnable) { 2623 Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); 2624 thread.setDaemon(true); 2625 return thread; 2626 } 2627 }); 2628 } 2629 2630 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2631 final NetworkConnector connector = iter.next(); 2632 connector.setLocalUri(uri); 2633 startNetworkConnector(connector, durableDestinations, networkConnectorStartExecutor); 2634 } 2635 if (networkConnectorStartExecutor != null) { 2636 // executor done when enqueued tasks are complete 2637 ThreadPoolUtils.shutdown(networkConnectorStartExecutor); 2638 } 2639 2640 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2641 ProxyConnector connector = iter.next(); 2642 connector.start(); 2643 } 2644 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2645 JmsConnector connector = iter.next(); 2646 connector.start(); 2647 } 2648 for (Service service : services) { 2649 configureService(service); 2650 service.start(); 2651 } 2652 } 2653 } 2654 2655 public void startNetworkConnector(final NetworkConnector connector, 2656 final ThreadPoolExecutor networkConnectorStartExecutor) throws Exception { 2657 startNetworkConnector(connector, getBroker().getDurableDestinations(), networkConnectorStartExecutor); 2658 } 2659 2660 public void startNetworkConnector(final NetworkConnector connector, 2661 final Set<ActiveMQDestination> durableDestinations, 2662 final ThreadPoolExecutor networkConnectorStartExecutor) throws Exception { 2663 connector.setBrokerName(getBrokerName()); 2664 //set the durable destinations to match the broker if not set on the connector 2665 if (connector.getDurableDestinations() == null) { 2666 connector.setDurableDestinations(durableDestinations); 2667 } 2668 String defaultSocketURI = getDefaultSocketURIString(); 2669 if (defaultSocketURI != null) { 2670 connector.setBrokerURL(defaultSocketURI); 2671 } 2672 //If using the runtime plugin to start a network connector then the mbean needs 2673 //to be added, under normal start it will already exist so check for InstanceNotFoundException 2674 if (isUseJmx()) { 2675 ObjectName networkMbean = createNetworkConnectorObjectName(connector); 2676 try { 2677 getManagementContext().getObjectInstance(networkMbean); 2678 } catch (InstanceNotFoundException e) { 2679 LOG.debug("Network connector MBean {} not found, registering", networkMbean); 2680 registerNetworkConnectorMBean(connector); 2681 } 2682 } 2683 if (networkConnectorStartExecutor != null) { 2684 networkConnectorStartExecutor.execute(new Runnable() { 2685 @Override 2686 public void run() { 2687 try { 2688 LOG.info("Async start of {}", connector); 2689 connector.start(); 2690 } catch(Exception e) { 2691 LOG.error("Async start of network connector: {} failed", connector, e); 2692 } 2693 } 2694 }); 2695 } else { 2696 connector.start(); 2697 } 2698 } 2699 2700 public TransportConnector startTransportConnector(TransportConnector connector) throws Exception { 2701 connector.setBrokerService(this); 2702 connector.setTaskRunnerFactory(getTaskRunnerFactory()); 2703 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); 2704 if (policy != null) { 2705 connector.setMessageAuthorizationPolicy(policy); 2706 } 2707 if (isUseJmx()) { 2708 connector = registerConnectorMBean(connector); 2709 } 2710 connector.getStatistics().setEnabled(enableStatistics); 2711 connector.start(); 2712 return connector; 2713 } 2714 2715 /** 2716 * Perform any custom dependency injection 2717 */ 2718 protected void configureServices(Object[] services) { 2719 for (Object service : services) { 2720 configureService(service); 2721 } 2722 } 2723 2724 /** 2725 * Perform any custom dependency injection 2726 */ 2727 protected void configureService(Object service) { 2728 if (service instanceof BrokerServiceAware) { 2729 BrokerServiceAware serviceAware = (BrokerServiceAware) service; 2730 serviceAware.setBrokerService(this); 2731 } 2732 } 2733 2734 public void handleIOException(IOException exception) { 2735 if (ioExceptionHandler != null) { 2736 ioExceptionHandler.handle(exception); 2737 } else { 2738 LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception); 2739 } 2740 } 2741 2742 protected void startVirtualConsumerDestinations() throws Exception { 2743 checkStartException(); 2744 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2745 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations(); 2746 DestinationFilter filter = getVirtualTopicConsumerDestinationFilter(); 2747 if (!destinations.isEmpty()) { 2748 for (ActiveMQDestination destination : destinations) { 2749 if (filter.matches(destination) == true) { 2750 broker.addDestination(adminConnectionContext, destination, false); 2751 } 2752 } 2753 } 2754 } 2755 2756 private DestinationFilter getVirtualTopicConsumerDestinationFilter() { 2757 // created at startup, so no sync needed 2758 if (virtualConsumerDestinationFilter == null) { 2759 Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>(); 2760 if (destinationInterceptors != null) { 2761 for (DestinationInterceptor interceptor : destinationInterceptors) { 2762 if (interceptor instanceof VirtualDestinationInterceptor) { 2763 VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor; 2764 for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) { 2765 if (virtualDestination instanceof VirtualTopic) { 2766 consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); 2767 } 2768 if (isUseVirtualDestSubs()) { 2769 try { 2770 broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination); 2771 LOG.debug("Adding virtual destination: {}", virtualDestination); 2772 } catch (Exception e) { 2773 LOG.warn("Could not fire virtual destination consumer advisory", e); 2774 } 2775 } 2776 } 2777 } 2778 } 2779 } 2780 ActiveMQQueue filter = new ActiveMQQueue(); 2781 filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{})); 2782 virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter); 2783 } 2784 return virtualConsumerDestinationFilter; 2785 } 2786 2787 protected synchronized ThreadPoolExecutor getExecutor() { 2788 if (this.executor == null) { 2789 this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 2790 2791 private long i = 0; 2792 2793 @Override 2794 public Thread newThread(Runnable runnable) { 2795 this.i++; 2796 Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i); 2797 thread.setDaemon(true); 2798 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 2799 @Override 2800 public void uncaughtException(final Thread t, final Throwable e) { 2801 LOG.error("Error in thread '{}'", t.getName(), e); 2802 } 2803 }); 2804 return thread; 2805 } 2806 }, new RejectedExecutionHandler() { 2807 @Override 2808 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { 2809 try { 2810 executor.getQueue().offer(r, 60, TimeUnit.SECONDS); 2811 } catch (InterruptedException e) { 2812 throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); 2813 } 2814 2815 throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); 2816 } 2817 }); 2818 } 2819 return this.executor; 2820 } 2821 2822 public synchronized Scheduler getScheduler() { 2823 if (this.scheduler==null) { 2824 this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler"); 2825 try { 2826 this.scheduler.start(); 2827 } catch (Exception e) { 2828 LOG.error("Failed to start Scheduler", e); 2829 } 2830 } 2831 return this.scheduler; 2832 } 2833 2834 public Broker getRegionBroker() { 2835 return regionBroker; 2836 } 2837 2838 public void setRegionBroker(Broker regionBroker) { 2839 this.regionBroker = regionBroker; 2840 } 2841 2842 public void addShutdownHook(Runnable hook) { 2843 synchronized (shutdownHooks) { 2844 shutdownHooks.add(hook); 2845 } 2846 } 2847 2848 public void removeShutdownHook(Runnable hook) { 2849 synchronized (shutdownHooks) { 2850 shutdownHooks.remove(hook); 2851 } 2852 } 2853 2854 public boolean isSystemExitOnShutdown() { 2855 return systemExitOnShutdown; 2856 } 2857 2858 /** 2859 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2860 */ 2861 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) { 2862 this.systemExitOnShutdown = systemExitOnShutdown; 2863 } 2864 2865 public int getSystemExitOnShutdownExitCode() { 2866 return systemExitOnShutdownExitCode; 2867 } 2868 2869 public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) { 2870 this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode; 2871 } 2872 2873 public SslContext getSslContext() { 2874 return sslContext; 2875 } 2876 2877 public void setSslContext(SslContext sslContext) { 2878 this.sslContext = sslContext; 2879 } 2880 2881 public boolean isShutdownOnSlaveFailure() { 2882 return shutdownOnSlaveFailure; 2883 } 2884 2885 /** 2886 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2887 */ 2888 public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { 2889 this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; 2890 } 2891 2892 public boolean isWaitForSlave() { 2893 return waitForSlave; 2894 } 2895 2896 /** 2897 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2898 */ 2899 public void setWaitForSlave(boolean waitForSlave) { 2900 this.waitForSlave = waitForSlave; 2901 } 2902 2903 public long getWaitForSlaveTimeout() { 2904 return this.waitForSlaveTimeout; 2905 } 2906 2907 public void setWaitForSlaveTimeout(long waitForSlaveTimeout) { 2908 this.waitForSlaveTimeout = waitForSlaveTimeout; 2909 } 2910 2911 /** 2912 * Get the passiveSlave 2913 * @return the passiveSlave 2914 */ 2915 public boolean isPassiveSlave() { 2916 return this.passiveSlave; 2917 } 2918 2919 /** 2920 * Set the passiveSlave 2921 * @param passiveSlave the passiveSlave to set 2922 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2923 */ 2924 public void setPassiveSlave(boolean passiveSlave) { 2925 this.passiveSlave = passiveSlave; 2926 } 2927 2928 /** 2929 * override the Default IOException handler, called when persistence adapter 2930 * has experiences File or JDBC I/O Exceptions 2931 * 2932 * @param ioExceptionHandler 2933 */ 2934 public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) { 2935 configureService(ioExceptionHandler); 2936 this.ioExceptionHandler = ioExceptionHandler; 2937 } 2938 2939 public IOExceptionHandler getIoExceptionHandler() { 2940 return ioExceptionHandler; 2941 } 2942 2943 /** 2944 * @return the schedulerSupport 2945 */ 2946 public boolean isSchedulerSupport() { 2947 return this.schedulerSupport; 2948 } 2949 2950 /** 2951 * @param schedulerSupport the schedulerSupport to set 2952 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2953 */ 2954 public void setSchedulerSupport(boolean schedulerSupport) { 2955 this.schedulerSupport = schedulerSupport; 2956 } 2957 2958 /** 2959 * @return the schedulerDirectory 2960 */ 2961 public File getSchedulerDirectoryFile() { 2962 if (this.schedulerDirectoryFile == null) { 2963 this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler"); 2964 } 2965 return schedulerDirectoryFile; 2966 } 2967 2968 /** 2969 * @param schedulerDirectory the schedulerDirectory to set 2970 */ 2971 public void setSchedulerDirectoryFile(File schedulerDirectory) { 2972 this.schedulerDirectoryFile = schedulerDirectory; 2973 } 2974 2975 public void setSchedulerDirectory(String schedulerDirectory) { 2976 setSchedulerDirectoryFile(new File(schedulerDirectory)); 2977 } 2978 2979 public int getSchedulePeriodForDestinationPurge() { 2980 return this.schedulePeriodForDestinationPurge; 2981 } 2982 2983 public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) { 2984 this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge; 2985 } 2986 2987 /** 2988 * @param schedulePeriodForDiskUsageCheck 2989 */ 2990 public void setSchedulePeriodForDiskUsageCheck( 2991 int schedulePeriodForDiskUsageCheck) { 2992 this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck; 2993 } 2994 2995 public int getDiskUsageCheckRegrowThreshold() { 2996 return diskUsageCheckRegrowThreshold; 2997 } 2998 2999 /** 3000 * @param diskUsageCheckRegrowThreshold 3001 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 3002 */ 3003 public void setDiskUsageCheckRegrowThreshold(int diskUsageCheckRegrowThreshold) { 3004 this.diskUsageCheckRegrowThreshold = diskUsageCheckRegrowThreshold; 3005 } 3006 3007 public int getMaxPurgedDestinationsPerSweep() { 3008 return this.maxPurgedDestinationsPerSweep; 3009 } 3010 3011 public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) { 3012 this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep; 3013 } 3014 3015 public BrokerContext getBrokerContext() { 3016 return brokerContext; 3017 } 3018 3019 public void setBrokerContext(BrokerContext brokerContext) { 3020 this.brokerContext = brokerContext; 3021 } 3022 3023 public void setBrokerId(String brokerId) { 3024 this.brokerId = new BrokerId(brokerId); 3025 } 3026 3027 public boolean isUseAuthenticatedPrincipalForJMSXUserID() { 3028 return useAuthenticatedPrincipalForJMSXUserID; 3029 } 3030 3031 public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) { 3032 this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID; 3033 } 3034 3035 /** 3036 * Should MBeans that support showing the Authenticated User Name information have this 3037 * value filled in or not. 3038 * 3039 * @return true if user names should be exposed in MBeans 3040 */ 3041 public boolean isPopulateUserNameInMBeans() { 3042 return this.populateUserNameInMBeans; 3043 } 3044 3045 /** 3046 * Sets whether Authenticated User Name information is shown in MBeans that support this field. 3047 * @param value if MBeans should expose user name information. 3048 */ 3049 public void setPopulateUserNameInMBeans(boolean value) { 3050 this.populateUserNameInMBeans = value; 3051 } 3052 3053 /** 3054 * Gets the time in Milliseconds that an invocation of an MBean method will wait before 3055 * failing. The default value is to wait forever (zero). 3056 * 3057 * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). 3058 */ 3059 public long getMbeanInvocationTimeout() { 3060 return mbeanInvocationTimeout; 3061 } 3062 3063 /** 3064 * Gets the time in Milliseconds that an invocation of an MBean method will wait before 3065 * failing. The default value is to wait forever (zero). 3066 * 3067 * @param mbeanInvocationTimeout 3068 * timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). 3069 */ 3070 public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) { 3071 this.mbeanInvocationTimeout = mbeanInvocationTimeout; 3072 } 3073 3074 public boolean isNetworkConnectorStartAsync() { 3075 return networkConnectorStartAsync; 3076 } 3077 3078 public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) { 3079 this.networkConnectorStartAsync = networkConnectorStartAsync; 3080 } 3081 3082 public boolean isAllowTempAutoCreationOnSend() { 3083 return allowTempAutoCreationOnSend; 3084 } 3085 3086 /** 3087 * enable if temp destinations need to be propagated through a network when 3088 * advisorySupport==false. This is used in conjunction with the policy 3089 * gcInactiveDestinations for matching temps so they can get removed 3090 * when inactive 3091 * 3092 * @param allowTempAutoCreationOnSend 3093 */ 3094 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { 3095 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; 3096 } 3097 3098 public long getOfflineDurableSubscriberTimeout() { 3099 return offlineDurableSubscriberTimeout; 3100 } 3101 3102 public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) { 3103 this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout; 3104 } 3105 3106 public long getOfflineDurableSubscriberTaskSchedule() { 3107 return offlineDurableSubscriberTaskSchedule; 3108 } 3109 3110 public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) { 3111 this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; 3112 } 3113 3114 public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { 3115 return isUseVirtualTopics() && destination.isQueue() && 3116 getVirtualTopicConsumerDestinationFilter().matches(destination); 3117 } 3118 3119 synchronized public Throwable getStartException() { 3120 return startException; 3121 } 3122 3123 public boolean isStartAsync() { 3124 return startAsync; 3125 } 3126 3127 public void setStartAsync(boolean startAsync) { 3128 this.startAsync = startAsync; 3129 } 3130 3131 public boolean isSlave() { 3132 return this.slave; 3133 } 3134 3135 public boolean isStopping() { 3136 return this.stopping.get(); 3137 } 3138 3139 /** 3140 * @return true if the broker allowed to restart on shutdown. 3141 */ 3142 public boolean isRestartAllowed() { 3143 return restartAllowed; 3144 } 3145 3146 /** 3147 * Sets if the broker allowed to restart on shutdown. 3148 */ 3149 public void setRestartAllowed(boolean restartAllowed) { 3150 this.restartAllowed = restartAllowed; 3151 } 3152 3153 /** 3154 * A lifecycle manager of the BrokerService should 3155 * inspect this property after a broker shutdown has occurred 3156 * to find out if the broker needs to be re-created and started 3157 * again. 3158 * 3159 * @return true if the broker wants to be restarted after it shuts down. 3160 */ 3161 public boolean isRestartRequested() { 3162 return restartRequested; 3163 } 3164 3165 public void requestRestart() { 3166 this.restartRequested = true; 3167 } 3168 3169 public int getStoreOpenWireVersion() { 3170 return storeOpenWireVersion; 3171 } 3172 3173 public void setStoreOpenWireVersion(int storeOpenWireVersion) { 3174 this.storeOpenWireVersion = storeOpenWireVersion; 3175 } 3176 3177 /** 3178 * @return the current number of connections on this Broker. 3179 */ 3180 public int getCurrentConnections() { 3181 return this.currentConnections.get(); 3182 } 3183 3184 /** 3185 * @return the total number of connections this broker has handled since startup. 3186 */ 3187 public long getTotalConnections() { 3188 return this.totalConnections.get(); 3189 } 3190 3191 public void incrementCurrentConnections() { 3192 this.currentConnections.incrementAndGet(); 3193 } 3194 3195 public void decrementCurrentConnections() { 3196 this.currentConnections.decrementAndGet(); 3197 } 3198 3199 public void incrementTotalConnections() { 3200 this.totalConnections.incrementAndGet(); 3201 } 3202 3203 public boolean isRejectDurableConsumers() { 3204 return rejectDurableConsumers; 3205 } 3206 3207 public void setRejectDurableConsumers(boolean rejectDurableConsumers) { 3208 this.rejectDurableConsumers = rejectDurableConsumers; 3209 } 3210 3211 public boolean isUseVirtualDestSubs() { 3212 return useVirtualDestSubs; 3213 } 3214 3215 public void setUseVirtualDestSubs( 3216 boolean useVirtualDestSubs) { 3217 this.useVirtualDestSubs = useVirtualDestSubs; 3218 } 3219 3220 public boolean isUseVirtualDestSubsOnCreation() { 3221 return useVirtualDestSubsOnCreation; 3222 } 3223 3224 public void setUseVirtualDestSubsOnCreation( 3225 boolean useVirtualDestSubsOnCreation) { 3226 this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation; 3227 } 3228 3229 public boolean isAdjustUsageLimits() { 3230 return adjustUsageLimits; 3231 } 3232 3233 public void setAdjustUsageLimits(boolean adjustUsageLimits) { 3234 this.adjustUsageLimits = adjustUsageLimits; 3235 } 3236 3237 public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) { 3238 this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException; 3239 } 3240 3241 public boolean isRollbackOnlyOnAsyncException() { 3242 return rollbackOnlyOnAsyncException; 3243 } 3244}