001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.LinkedList; 023import java.util.StringTokenizer; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.regex.Pattern; 026 027import javax.management.ObjectName; 028 029import org.apache.activemq.broker.jmx.ManagedTransportConnector; 030import org.apache.activemq.broker.jmx.ManagementContext; 031import org.apache.activemq.broker.region.ConnectorStatistics; 032import org.apache.activemq.command.BrokerInfo; 033import org.apache.activemq.command.ConnectionControl; 034import org.apache.activemq.security.MessageAuthorizationPolicy; 035import org.apache.activemq.thread.TaskRunnerFactory; 036import org.apache.activemq.transport.Transport; 037import org.apache.activemq.transport.TransportAcceptListener; 038import org.apache.activemq.transport.TransportFactorySupport; 039import org.apache.activemq.transport.TransportServer; 040import org.apache.activemq.transport.discovery.DiscoveryAgent; 041import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 042import org.apache.activemq.util.ServiceStopper; 043import org.apache.activemq.util.ServiceSupport; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * @org.apache.xbean.XBean 049 */ 050public class TransportConnector implements Connector, BrokerServiceAware { 051 052 final Logger LOG = LoggerFactory.getLogger(TransportConnector.class); 053 054 protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>(); 055 protected TransportStatusDetector statusDector; 056 private BrokerService brokerService; 057 private TransportServer server; 058 private URI uri; 059 private BrokerInfo brokerInfo = new BrokerInfo(); 060 private TaskRunnerFactory taskRunnerFactory; 061 private MessageAuthorizationPolicy messageAuthorizationPolicy; 062 private DiscoveryAgent discoveryAgent; 063 private final ConnectorStatistics statistics = new ConnectorStatistics(); 064 private URI discoveryUri; 065 private String name; 066 private boolean disableAsyncDispatch; 067 private boolean enableStatusMonitor = false; 068 private Broker broker; 069 private boolean updateClusterClients = false; 070 private boolean rebalanceClusterClients; 071 private boolean updateClusterClientsOnRemove = false; 072 private String updateClusterFilter; 073 private boolean auditNetworkProducers = false; 074 private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE; 075 private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE; 076 private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy(); 077 private boolean allowLinkStealing; 078 079 LinkedList<String> peerBrokers = new LinkedList<String>(); 080 081 public TransportConnector() { 082 } 083 084 public TransportConnector(TransportServer server) { 085 this(); 086 setServer(server); 087 if (server != null && server.getConnectURI() != null) { 088 URI uri = server.getConnectURI(); 089 if (uri != null && uri.getScheme().equals("vm")) { 090 setEnableStatusMonitor(false); 091 } 092 } 093 } 094 095 /** 096 * @return Returns the connections. 097 */ 098 public CopyOnWriteArrayList<TransportConnection> getConnections() { 099 return connections; 100 } 101 102 /** 103 * Factory method to create a JMX managed version of this transport 104 * connector 105 */ 106 public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException { 107 ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer()); 108 rc.setBrokerInfo(getBrokerInfo()); 109 rc.setDisableAsyncDispatch(isDisableAsyncDispatch()); 110 rc.setDiscoveryAgent(getDiscoveryAgent()); 111 rc.setDiscoveryUri(getDiscoveryUri()); 112 rc.setEnableStatusMonitor(isEnableStatusMonitor()); 113 rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 114 rc.setName(getName()); 115 rc.setTaskRunnerFactory(getTaskRunnerFactory()); 116 rc.setUri(getUri()); 117 rc.setBrokerService(brokerService); 118 rc.setUpdateClusterClients(isUpdateClusterClients()); 119 rc.setRebalanceClusterClients(isRebalanceClusterClients()); 120 rc.setUpdateClusterFilter(getUpdateClusterFilter()); 121 rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove()); 122 rc.setAuditNetworkProducers(isAuditNetworkProducers()); 123 rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection()); 124 rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection()); 125 rc.setPublishedAddressPolicy(getPublishedAddressPolicy()); 126 rc.setAllowLinkStealing(isAllowLinkStealing()); 127 return rc; 128 } 129 130 @Override 131 public BrokerInfo getBrokerInfo() { 132 return brokerInfo; 133 } 134 135 public void setBrokerInfo(BrokerInfo brokerInfo) { 136 this.brokerInfo = brokerInfo; 137 } 138 139 public TransportServer getServer() throws IOException, URISyntaxException { 140 if (server == null) { 141 setServer(createTransportServer()); 142 } 143 return server; 144 } 145 146 public void setServer(TransportServer server) { 147 this.server = server; 148 } 149 150 public URI getUri() { 151 if (uri == null) { 152 try { 153 uri = getConnectUri(); 154 } catch (Throwable e) { 155 } 156 } 157 return uri; 158 } 159 160 /** 161 * Sets the server transport URI to use if there is not a 162 * {@link TransportServer} configured via the 163 * {@link #setServer(TransportServer)} method. This value is used to lazy 164 * create a {@link TransportServer} instance 165 * 166 * @param uri 167 */ 168 public void setUri(URI uri) { 169 this.uri = uri; 170 } 171 172 public TaskRunnerFactory getTaskRunnerFactory() { 173 return taskRunnerFactory; 174 } 175 176 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 177 this.taskRunnerFactory = taskRunnerFactory; 178 } 179 180 /** 181 * @return the statistics for this connector 182 */ 183 @Override 184 public ConnectorStatistics getStatistics() { 185 return statistics; 186 } 187 188 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 189 return messageAuthorizationPolicy; 190 } 191 192 /** 193 * Sets the policy used to decide if the current connection is authorized to 194 * consume a given message 195 */ 196 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 197 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 198 } 199 200 @Override 201 public void start() throws Exception { 202 broker = brokerService.getBroker(); 203 brokerInfo.setBrokerName(broker.getBrokerName()); 204 brokerInfo.setBrokerId(broker.getBrokerId()); 205 brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); 206 brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); 207 brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString()); 208 getServer().setAcceptListener(new TransportAcceptListener() { 209 @Override 210 public void onAccept(final Transport transport) { 211 try { 212 brokerService.getTaskRunnerFactory().execute(new Runnable() { 213 @Override 214 public void run() { 215 try { 216 if (!brokerService.isStopping()) { 217 Connection connection = createConnection(transport); 218 connection.start(); 219 } else { 220 throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); 221 } 222 } catch (Exception e) { 223 String remoteHost = transport.getRemoteAddress(); 224 ServiceSupport.dispose(transport); 225 onAcceptError(e, remoteHost); 226 } 227 } 228 }); 229 } catch (Exception e) { 230 String remoteHost = transport.getRemoteAddress(); 231 ServiceSupport.dispose(transport); 232 onAcceptError(e, remoteHost); 233 } 234 } 235 236 @Override 237 public void onAcceptError(Exception error) { 238 onAcceptError(error, null); 239 } 240 241 private void onAcceptError(Exception error, String remoteHost) { 242 if (brokerService != null && brokerService.isStopping()) { 243 LOG.info("Could not accept connection during shutdown {} : {}", (remoteHost == null ? "" : "from " + remoteHost), error); 244 } else { 245 LOG.error("Could not accept connection {} : {}", (remoteHost == null ? "" : "from " + remoteHost), error); 246 LOG.debug("Reason: " + error, error); 247 } 248 } 249 }); 250 getServer().setBrokerInfo(brokerInfo); 251 getServer().start(); 252 253 DiscoveryAgent da = getDiscoveryAgent(); 254 if (da != null) { 255 da.registerService(getPublishableConnectString()); 256 da.start(); 257 } 258 if (enableStatusMonitor) { 259 this.statusDector = new TransportStatusDetector(this); 260 this.statusDector.start(); 261 } 262 263 LOG.info("Connector {} started", getName()); 264 } 265 266 public String getPublishableConnectString() throws Exception { 267 String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this); 268 LOG.debug("Publishing: {} for broker transport URI: {}", publishableConnectString, getConnectUri()); 269 return publishableConnectString; 270 } 271 272 public URI getPublishableConnectURI() throws Exception { 273 return publishedAddressPolicy.getPublishableConnectURI(this); 274 } 275 276 @Override 277 public void stop() throws Exception { 278 ServiceStopper ss = new ServiceStopper(); 279 if (discoveryAgent != null) { 280 ss.stop(discoveryAgent); 281 } 282 if (server != null) { 283 ss.stop(server); 284 } 285 if (this.statusDector != null) { 286 this.statusDector.stop(); 287 } 288 289 for (TransportConnection connection : connections) { 290 ss.stop(connection); 291 } 292 server = null; 293 ss.throwFirstException(); 294 LOG.info("Connector {} stopped", getName()); 295 } 296 297 // Implementation methods 298 // ------------------------------------------------------------------------- 299 protected Connection createConnection(Transport transport) throws IOException { 300 // prefer to use task runner from broker service as stop task runner, as we can then 301 // tie it to the lifecycle of the broker service 302 TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null 303 : taskRunnerFactory, brokerService.getTaskRunnerFactory()); 304 boolean statEnabled = this.getStatistics().isEnabled(); 305 answer.getStatistics().setEnabled(statEnabled); 306 answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy); 307 return answer; 308 } 309 310 protected TransportServer createTransportServer() throws IOException, URISyntaxException { 311 if (uri == null) { 312 throw new IllegalArgumentException("You must specify either a server or uri property"); 313 } 314 if (brokerService == null) { 315 throw new IllegalArgumentException( 316 "You must specify the brokerService property. Maybe this connector should be added to a broker?"); 317 } 318 return TransportFactorySupport.bind(brokerService, uri); 319 } 320 321 public DiscoveryAgent getDiscoveryAgent() throws IOException { 322 if (discoveryAgent == null) { 323 discoveryAgent = createDiscoveryAgent(); 324 } 325 return discoveryAgent; 326 } 327 328 protected DiscoveryAgent createDiscoveryAgent() throws IOException { 329 if (discoveryUri != null) { 330 DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri); 331 332 if (agent != null && agent instanceof BrokerServiceAware) { 333 ((BrokerServiceAware) agent).setBrokerService(brokerService); 334 } 335 336 return agent; 337 } 338 return null; 339 } 340 341 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 342 this.discoveryAgent = discoveryAgent; 343 } 344 345 public URI getDiscoveryUri() { 346 return discoveryUri; 347 } 348 349 public void setDiscoveryUri(URI discoveryUri) { 350 this.discoveryUri = discoveryUri; 351 } 352 353 public URI getConnectUri() throws IOException, URISyntaxException { 354 if (server != null) { 355 return server.getConnectURI(); 356 } else { 357 return uri; 358 } 359 } 360 361 public void onStarted(TransportConnection connection) { 362 connections.add(connection); 363 } 364 365 public void onStopped(TransportConnection connection) { 366 connections.remove(connection); 367 } 368 369 public String getName() { 370 if (name == null) { 371 uri = getUri(); 372 if (uri != null) { 373 name = uri.toString(); 374 } 375 } 376 return name; 377 } 378 379 public void setName(String name) { 380 this.name = name; 381 } 382 383 @Override 384 public String toString() { 385 String rc = getName(); 386 if (rc == null) { 387 rc = super.toString(); 388 } 389 return rc; 390 } 391 392 protected ConnectionControl getConnectionControl() { 393 boolean rebalance = isRebalanceClusterClients(); 394 String connectedBrokers = ""; 395 String separator = ""; 396 397 if (isUpdateClusterClients()) { 398 synchronized (peerBrokers) { 399 for (String uri : getPeerBrokers()) { 400 connectedBrokers += separator + uri; 401 separator = ","; 402 } 403 404 if (rebalance) { 405 String shuffle = peerBrokers.removeFirst(); 406 peerBrokers.addLast(shuffle); 407 } 408 } 409 } 410 ConnectionControl control = new ConnectionControl(); 411 control.setConnectedBrokers(connectedBrokers); 412 control.setRebalanceConnection(rebalance); 413 return control; 414 } 415 416 public void addPeerBroker(BrokerInfo info) { 417 if (isMatchesClusterFilter(info.getBrokerName())) { 418 synchronized (peerBrokers) { 419 getPeerBrokers().addLast(info.getBrokerURL()); 420 } 421 } 422 } 423 424 public void removePeerBroker(BrokerInfo info) { 425 synchronized (peerBrokers) { 426 getPeerBrokers().remove(info.getBrokerURL()); 427 } 428 } 429 430 public LinkedList<String> getPeerBrokers() { 431 synchronized (peerBrokers) { 432 if (peerBrokers.isEmpty()) { 433 peerBrokers.add(brokerService.getDefaultSocketURIString()); 434 } 435 return peerBrokers; 436 } 437 } 438 439 @Override 440 public void updateClientClusterInfo() { 441 if (isRebalanceClusterClients() || isUpdateClusterClients()) { 442 ConnectionControl control = getConnectionControl(); 443 for (Connection c : this.connections) { 444 c.updateClient(control); 445 if (isRebalanceClusterClients()) { 446 control = getConnectionControl(); 447 } 448 } 449 } 450 } 451 452 private boolean isMatchesClusterFilter(String brokerName) { 453 boolean result = true; 454 String filter = getUpdateClusterFilter(); 455 if (filter != null) { 456 filter = filter.trim(); 457 if (filter.length() > 0) { 458 StringTokenizer tokenizer = new StringTokenizer(filter, ","); 459 while (result && tokenizer.hasMoreTokens()) { 460 String token = tokenizer.nextToken(); 461 result = isMatchesClusterFilter(brokerName, token); 462 } 463 } 464 } 465 466 return result; 467 } 468 469 private boolean isMatchesClusterFilter(String brokerName, String match) { 470 boolean result = true; 471 if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) { 472 result = Pattern.matches(match, brokerName); 473 } 474 return result; 475 } 476 477 public boolean isDisableAsyncDispatch() { 478 return disableAsyncDispatch; 479 } 480 481 public void setDisableAsyncDispatch(boolean disableAsyncDispatch) { 482 this.disableAsyncDispatch = disableAsyncDispatch; 483 } 484 485 /** 486 * @return the enableStatusMonitor 487 */ 488 public boolean isEnableStatusMonitor() { 489 return enableStatusMonitor; 490 } 491 492 /** 493 * @param enableStatusMonitor 494 * the enableStatusMonitor to set 495 */ 496 public void setEnableStatusMonitor(boolean enableStatusMonitor) { 497 this.enableStatusMonitor = enableStatusMonitor; 498 } 499 500 /** 501 * This is called by the BrokerService right before it starts the transport. 502 */ 503 @Override 504 public void setBrokerService(BrokerService brokerService) { 505 this.brokerService = brokerService; 506 } 507 508 public Broker getBroker() { 509 return broker; 510 } 511 512 public BrokerService getBrokerService() { 513 return brokerService; 514 } 515 516 /** 517 * @return the updateClusterClients 518 */ 519 @Override 520 public boolean isUpdateClusterClients() { 521 return this.updateClusterClients; 522 } 523 524 /** 525 * @param updateClusterClients 526 * the updateClusterClients to set 527 */ 528 public void setUpdateClusterClients(boolean updateClusterClients) { 529 this.updateClusterClients = updateClusterClients; 530 } 531 532 /** 533 * @return the rebalanceClusterClients 534 */ 535 @Override 536 public boolean isRebalanceClusterClients() { 537 return this.rebalanceClusterClients; 538 } 539 540 /** 541 * @param rebalanceClusterClients 542 * the rebalanceClusterClients to set 543 */ 544 public void setRebalanceClusterClients(boolean rebalanceClusterClients) { 545 this.rebalanceClusterClients = rebalanceClusterClients; 546 } 547 548 /** 549 * @return the updateClusterClientsOnRemove 550 */ 551 @Override 552 public boolean isUpdateClusterClientsOnRemove() { 553 return this.updateClusterClientsOnRemove; 554 } 555 556 /** 557 * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set 558 */ 559 public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) { 560 this.updateClusterClientsOnRemove = updateClusterClientsOnRemove; 561 } 562 563 /** 564 * @return the updateClusterFilter 565 */ 566 @Override 567 public String getUpdateClusterFilter() { 568 return this.updateClusterFilter; 569 } 570 571 /** 572 * @param updateClusterFilter 573 * the updateClusterFilter to set 574 */ 575 public void setUpdateClusterFilter(String updateClusterFilter) { 576 this.updateClusterFilter = updateClusterFilter; 577 } 578 579 @Override 580 public int connectionCount() { 581 return connections.size(); 582 } 583 584 @Override 585 public boolean isAllowLinkStealing() { 586 return server.isAllowLinkStealing(); 587 } 588 589 public void setAllowLinkStealing (boolean allowLinkStealing) { 590 this.allowLinkStealing=allowLinkStealing; 591 } 592 593 public boolean isAuditNetworkProducers() { 594 return auditNetworkProducers; 595 } 596 597 /** 598 * Enable a producer audit on network connections, Traps the case of a missing send reply and resend. 599 * Note: does not work with conduit=false, networked composite destinations or networked virtual topics 600 * @param auditNetworkProducers 601 */ 602 public void setAuditNetworkProducers(boolean auditNetworkProducers) { 603 this.auditNetworkProducers = auditNetworkProducers; 604 } 605 606 public int getMaximumProducersAllowedPerConnection() { 607 return maximumProducersAllowedPerConnection; 608 } 609 610 public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) { 611 this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection; 612 } 613 614 public int getMaximumConsumersAllowedPerConnection() { 615 return maximumConsumersAllowedPerConnection; 616 } 617 618 public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) { 619 this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection; 620 } 621 622 /** 623 * Gets the currently configured policy for creating the published connection address of this 624 * TransportConnector. 625 * 626 * @return the publishedAddressPolicy 627 */ 628 public PublishedAddressPolicy getPublishedAddressPolicy() { 629 return publishedAddressPolicy; 630 } 631 632 /** 633 * Sets the configured policy for creating the published connection address of this 634 * TransportConnector. 635 * 636 * @return the publishedAddressPolicy 637 */ 638 public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) { 639 this.publishedAddressPolicy = publishedAddressPolicy; 640 } 641}