001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.tcp; 018 019import java.io.IOException; 020import java.net.InetAddress; 021import java.net.InetSocketAddress; 022import java.net.ServerSocket; 023import java.net.Socket; 024import java.net.SocketException; 025import java.net.SocketTimeoutException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.UnknownHostException; 029import java.nio.channels.ClosedChannelException; 030import java.nio.channels.SelectionKey; 031import java.nio.channels.Selector; 032import java.nio.channels.ServerSocketChannel; 033import java.nio.channels.SocketChannel; 034import java.util.HashMap; 035import java.util.Iterator; 036import java.util.Set; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.LinkedBlockingQueue; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicInteger; 041 042import javax.net.ServerSocketFactory; 043import javax.net.ssl.SSLServerSocket; 044 045import org.apache.activemq.Service; 046import org.apache.activemq.ThreadPriorities; 047import org.apache.activemq.TransportLoggerSupport; 048import org.apache.activemq.command.BrokerInfo; 049import org.apache.activemq.openwire.OpenWireFormatFactory; 050import org.apache.activemq.transport.Transport; 051import org.apache.activemq.transport.TransportFactory; 052import org.apache.activemq.transport.TransportServer; 053import org.apache.activemq.transport.TransportServerThreadSupport; 054import org.apache.activemq.util.IOExceptionSupport; 055import org.apache.activemq.util.InetAddressUtil; 056import org.apache.activemq.util.IntrospectionSupport; 057import org.apache.activemq.util.ServiceListener; 058import org.apache.activemq.util.ServiceStopper; 059import org.apache.activemq.util.ServiceSupport; 060import org.apache.activemq.wireformat.WireFormat; 061import org.apache.activemq.wireformat.WireFormatFactory; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * A TCP based implementation of {@link TransportServer} 067 */ 068public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener { 069 070 private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); 071 072 protected volatile ServerSocket serverSocket; 073 protected volatile Selector selector; 074 protected int backlog = 5000; 075 protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); 076 protected final TcpTransportFactory transportFactory; 077 protected long maxInactivityDuration = 30000; 078 protected long maxInactivityDurationInitalDelay = 10000; 079 protected int minmumWireFormatVersion; 080 protected boolean useQueueForAccept = true; 081 protected boolean allowLinkStealing; 082 083 /** 084 * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer 085 * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer, 086 * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or 087 * TransportConnector URIs. 088 */ 089 protected boolean trace = false; 090 091 protected int soTimeout = 0; 092 protected int socketBufferSize = 64 * 1024; 093 protected int connectionTimeout = 30000; 094 095 /** 096 * Name of the LogWriter implementation to use. Names are mapped to classes in the 097 * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably 098 * set in Connection or TransportConnector URIs. 099 */ 100 protected String logWriterName = TransportLoggerSupport.defaultLogWriterName; 101 102 /** 103 * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1 104 * TransportLogger which is manageable, a TransportLoggerControl MBean will me created. 105 */ 106 protected boolean dynamicManagement = false; 107 108 /** 109 * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log. 110 * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the 111 * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or 112 * TransportConnector URIs. 113 */ 114 protected boolean startLogging = true; 115 protected final ServerSocketFactory serverSocketFactory; 116 protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); 117 protected Thread socketHandlerThread; 118 119 /** 120 * The maximum number of sockets allowed for this server 121 */ 122 protected int maximumConnections = Integer.MAX_VALUE; 123 protected final AtomicInteger currentTransportCount = new AtomicInteger(); 124 125 public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, 126 URISyntaxException { 127 super(location); 128 this.transportFactory = transportFactory; 129 this.serverSocketFactory = serverSocketFactory; 130 } 131 132 public void bind() throws IOException { 133 URI bind = getBindLocation(); 134 135 String host = bind.getHost(); 136 host = (host == null || host.length() == 0) ? "localhost" : host; 137 InetAddress addr = InetAddress.getByName(host); 138 139 try { 140 serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); 141 configureServerSocket(serverSocket); 142 } catch (IOException e) { 143 throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); 144 } 145 try { 146 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), 147 bind.getQuery(), bind.getFragment())); 148 } catch (URISyntaxException e) { 149 // it could be that the host name contains invalid characters such 150 // as _ on unix platforms so lets try use the IP address instead 151 try { 152 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), 153 bind.getQuery(), bind.getFragment())); 154 } catch (URISyntaxException e2) { 155 throw IOExceptionSupport.create(e2); 156 } 157 } 158 } 159 160 private void configureServerSocket(ServerSocket socket) throws SocketException { 161 socket.setSoTimeout(2000); 162 if (transportOptions != null) { 163 164 // If the enabledCipherSuites option is invalid we don't want to ignore it as the call 165 // to SSLServerSocket to configure it has a side effect on the socket rendering it 166 // useless as all suites are enabled many of which are considered as insecure. We 167 // instead trap that option here and throw an exception. We should really consider 168 // all invalid options as breaking and not start the transport but the current design 169 // doesn't really allow for this. 170 // 171 // see: https://issues.apache.org/jira/browse/AMQ-4582 172 // 173 if (socket instanceof SSLServerSocket) { 174 if (transportOptions.containsKey("enabledCipherSuites")) { 175 Object cipherSuites = transportOptions.remove("enabledCipherSuites"); 176 177 if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) { 178 throw new SocketException(String.format( 179 "Invalid transport options {enabledCipherSuites=%s}", cipherSuites)); 180 } 181 } 182 } 183 184 //AMQ-6599 - don't strip out set properties on the socket as we need to set them 185 //on the Transport as well later 186 IntrospectionSupport.setProperties(socket, transportOptions, false); 187 } 188 } 189 190 /** 191 * @return Returns the wireFormatFactory. 192 */ 193 public WireFormatFactory getWireFormatFactory() { 194 return wireFormatFactory; 195 } 196 197 /** 198 * @param wireFormatFactory 199 * The wireFormatFactory to set. 200 */ 201 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { 202 this.wireFormatFactory = wireFormatFactory; 203 } 204 205 /** 206 * Associates a broker info with the transport server so that the transport can do discovery advertisements of the 207 * broker. 208 * 209 * @param brokerInfo 210 */ 211 @Override 212 public void setBrokerInfo(BrokerInfo brokerInfo) { 213 } 214 215 public long getMaxInactivityDuration() { 216 return maxInactivityDuration; 217 } 218 219 public void setMaxInactivityDuration(long maxInactivityDuration) { 220 this.maxInactivityDuration = maxInactivityDuration; 221 } 222 223 public long getMaxInactivityDurationInitalDelay() { 224 return this.maxInactivityDurationInitalDelay; 225 } 226 227 public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) { 228 this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; 229 } 230 231 public int getMinmumWireFormatVersion() { 232 return minmumWireFormatVersion; 233 } 234 235 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 236 this.minmumWireFormatVersion = minmumWireFormatVersion; 237 } 238 239 public boolean isTrace() { 240 return trace; 241 } 242 243 public void setTrace(boolean trace) { 244 this.trace = trace; 245 } 246 247 public String getLogWriterName() { 248 return logWriterName; 249 } 250 251 public void setLogWriterName(String logFormat) { 252 this.logWriterName = logFormat; 253 } 254 255 public boolean isDynamicManagement() { 256 return dynamicManagement; 257 } 258 259 public void setDynamicManagement(boolean useJmx) { 260 this.dynamicManagement = useJmx; 261 } 262 263 public boolean isStartLogging() { 264 return startLogging; 265 } 266 267 public void setStartLogging(boolean startLogging) { 268 this.startLogging = startLogging; 269 } 270 271 /** 272 * @return the backlog 273 */ 274 public int getBacklog() { 275 return backlog; 276 } 277 278 /** 279 * @param backlog 280 * the backlog to set 281 */ 282 public void setBacklog(int backlog) { 283 this.backlog = backlog; 284 } 285 286 /** 287 * @return the useQueueForAccept 288 */ 289 public boolean isUseQueueForAccept() { 290 return useQueueForAccept; 291 } 292 293 /** 294 * @param useQueueForAccept 295 * the useQueueForAccept to set 296 */ 297 public void setUseQueueForAccept(boolean useQueueForAccept) { 298 this.useQueueForAccept = useQueueForAccept; 299 } 300 301 /** 302 * pull Sockets from the ServerSocket 303 */ 304 @Override 305 public void run() { 306 if (!isStopped() && !isStopping()) { 307 final ServerSocket serverSocket = this.serverSocket; 308 if (serverSocket == null) { 309 onAcceptError(new IOException("Server started without a valid ServerSocket")); 310 } 311 312 final ServerSocketChannel channel = serverSocket.getChannel(); 313 if (channel != null) { 314 doRunWithServerSocketChannel(channel); 315 } else { 316 doRunWithServerSocket(serverSocket); 317 } 318 } 319 } 320 321 private void doRunWithServerSocketChannel(final ServerSocketChannel channel) { 322 try { 323 channel.configureBlocking(false); 324 final Selector selector = Selector.open(); 325 326 try { 327 channel.register(selector, SelectionKey.OP_ACCEPT); 328 } catch (ClosedChannelException ex) { 329 try { 330 selector.close(); 331 } catch (IOException ignore) {} 332 333 throw ex; 334 } 335 336 // Update object instance for later cleanup. 337 this.selector = selector; 338 339 while (!isStopped()) { 340 int count = selector.select(10); 341 342 if (count == 0) { 343 continue; 344 } 345 346 Set<SelectionKey> keys = selector.selectedKeys(); 347 348 for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) { 349 final SelectionKey key = i.next(); 350 if (key.isAcceptable()) { 351 try { 352 SocketChannel sc = channel.accept(); 353 if (sc != null) { 354 if (isStopped() || getAcceptListener() == null) { 355 sc.close(); 356 } else { 357 if (useQueueForAccept) { 358 socketQueue.put(sc.socket()); 359 } else { 360 handleSocket(sc.socket()); 361 } 362 } 363 } 364 365 } catch (SocketTimeoutException ste) { 366 // expect this to happen 367 } catch (Exception e) { 368 e.printStackTrace(); 369 if (!isStopping()) { 370 onAcceptError(e); 371 } else if (!isStopped()) { 372 LOG.warn("run()", e); 373 onAcceptError(e); 374 } 375 } 376 } 377 i.remove(); 378 } 379 } 380 } catch (IOException ex) { 381 if (!isStopping()) { 382 onAcceptError(ex); 383 } else if (!isStopped()) { 384 LOG.warn("run()", ex); 385 onAcceptError(ex); 386 } 387 } 388 } 389 390 private void doRunWithServerSocket(final ServerSocket serverSocket) { 391 while (!isStopped()) { 392 Socket socket = null; 393 try { 394 socket = serverSocket.accept(); 395 if (socket != null) { 396 if (isStopped() || getAcceptListener() == null) { 397 socket.close(); 398 } else { 399 if (useQueueForAccept) { 400 socketQueue.put(socket); 401 } else { 402 handleSocket(socket); 403 } 404 } 405 } 406 } catch (SocketTimeoutException ste) { 407 // expect this to happen 408 } catch (Exception e) { 409 if (!isStopping()) { 410 onAcceptError(e); 411 } else if (!isStopped()) { 412 LOG.warn("run()", e); 413 onAcceptError(e); 414 } 415 } 416 } 417 } 418 419 /** 420 * Allow derived classes to override the Transport implementation that this transport server creates. 421 * 422 * @param socket 423 * @param format 424 * 425 * @return a new Transport instance. 426 * 427 * @throws IOException 428 */ 429 protected Transport createTransport(Socket socket, WireFormat format) throws IOException { 430 return new TcpTransport(format, socket); 431 } 432 433 /** 434 * @return pretty print of this 435 */ 436 @Override 437 public String toString() { 438 return "" + getBindLocation(); 439 } 440 441 /** 442 * @param socket 443 * @param bindAddress 444 * @return real hostName 445 * @throws UnknownHostException 446 */ 447 protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException { 448 String result = null; 449 if (socket.isBound()) { 450 if (socket.getInetAddress().isAnyLocalAddress()) { 451 // make it more human readable and useful, an alternative to 0.0.0.0 452 result = InetAddressUtil.getLocalHostName(); 453 } else { 454 result = socket.getInetAddress().getCanonicalHostName(); 455 } 456 } else { 457 result = bindAddress.getCanonicalHostName(); 458 } 459 return result; 460 } 461 462 @Override 463 protected void doStart() throws Exception { 464 if (useQueueForAccept) { 465 Runnable run = new Runnable() { 466 @Override 467 public void run() { 468 try { 469 while (!isStopped() && !isStopping()) { 470 Socket sock = socketQueue.poll(1, TimeUnit.SECONDS); 471 if (sock != null) { 472 try { 473 handleSocket(sock); 474 } catch (Throwable thrown) { 475 if (!isStopping()) { 476 onAcceptError(new Exception(thrown)); 477 } else if (!isStopped()) { 478 LOG.warn("Unexpected error thrown during accept handling: ", thrown); 479 onAcceptError(new Exception(thrown)); 480 } 481 } 482 } 483 } 484 485 } catch (InterruptedException e) { 486 if (!isStopped() || !isStopping()) { 487 LOG.info("socketQueue interrupted - stopping"); 488 onAcceptError(e); 489 } 490 } 491 } 492 }; 493 socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize()); 494 socketHandlerThread.setDaemon(true); 495 socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1); 496 socketHandlerThread.start(); 497 } 498 super.doStart(); 499 } 500 501 @Override 502 protected void doStop(ServiceStopper stopper) throws Exception { 503 Exception firstFailure = null; 504 505 try { 506 if (selector != null) { 507 selector.close(); 508 selector = null; 509 } 510 } catch (Exception error) { 511 } 512 513 try { 514 final ServerSocket serverSocket = this.serverSocket; 515 if (serverSocket != null) { 516 this.serverSocket = null; 517 serverSocket.close(); 518 } 519 } catch (Exception error) { 520 firstFailure = error; 521 } 522 523 if (socketHandlerThread != null) { 524 socketHandlerThread.interrupt(); 525 socketHandlerThread = null; 526 } 527 528 try { 529 super.doStop(stopper); 530 } catch (Exception error) { 531 if (firstFailure != null) { 532 firstFailure = error; 533 } 534 } 535 536 if (firstFailure != null) { 537 throw firstFailure; 538 } 539 } 540 541 @Override 542 public InetSocketAddress getSocketAddress() { 543 return (InetSocketAddress) serverSocket.getLocalSocketAddress(); 544 } 545 546 protected void handleSocket(Socket socket) { 547 doHandleSocket(socket); 548 } 549 550 final protected void doHandleSocket(Socket socket) { 551 boolean closeSocket = true; 552 boolean countIncremented = false; 553 try { 554 int currentCount; 555 do { 556 currentCount = currentTransportCount.get(); 557 if (currentCount >= this.maximumConnections) { 558 throw new ExceededMaximumConnectionsException( 559 "Exceeded the maximum number of allowed client connections. See the '" + 560 "maximumConnections' property on the TCP transport configuration URI " + 561 "in the ActiveMQ configuration file (e.g., activemq.xml)"); 562 } 563 564 //Increment this value before configuring the transport 565 //This is necessary because some of the transport servers must read from the 566 //socket during configureTransport() so we want to make sure this value is 567 //accurate as the transport server could pause here waiting for data to be sent from a client 568 } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1)); 569 countIncremented = true; 570 571 HashMap<String, Object> options = new HashMap<String, Object>(); 572 options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); 573 options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay)); 574 options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion)); 575 options.put("trace", Boolean.valueOf(trace)); 576 options.put("soTimeout", Integer.valueOf(soTimeout)); 577 options.put("socketBufferSize", Integer.valueOf(socketBufferSize)); 578 options.put("connectionTimeout", Integer.valueOf(connectionTimeout)); 579 options.put("logWriterName", logWriterName); 580 options.put("dynamicManagement", Boolean.valueOf(dynamicManagement)); 581 options.put("startLogging", Boolean.valueOf(startLogging)); 582 options.putAll(transportOptions); 583 584 TransportInfo transportInfo = configureTransport(this, socket); 585 closeSocket = false; 586 587 if (transportInfo.transport instanceof ServiceSupport) { 588 ((ServiceSupport) transportInfo.transport).addServiceListener(this); 589 } 590 591 Transport configuredTransport = transportInfo.transportFactory.serverConfigure( 592 transportInfo.transport, transportInfo.format, options); 593 594 getAcceptListener().onAccept(configuredTransport); 595 596 } catch (SocketTimeoutException ste) { 597 // expect this to happen 598 } catch (Exception e) { 599 if (closeSocket) { 600 try { 601 //if closing the socket, only decrement the count it was actually incremented 602 //where it was incremented 603 if (countIncremented) { 604 currentTransportCount.decrementAndGet(); 605 } 606 socket.close(); 607 } catch (Exception ignore) { 608 } 609 } 610 611 if (!isStopping()) { 612 onAcceptError(e); 613 } else if (!isStopped()) { 614 LOG.warn("run()", e); 615 onAcceptError(e); 616 } 617 } 618 } 619 620 protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { 621 WireFormat format = wireFormatFactory.createWireFormat(); 622 Transport transport = createTransport(socket, format); 623 return new TransportInfo(format, transport, transportFactory); 624 } 625 626 protected class TransportInfo { 627 final WireFormat format; 628 final Transport transport; 629 final TransportFactory transportFactory; 630 631 public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) { 632 this.format = format; 633 this.transport = transport; 634 this.transportFactory = transportFactory; 635 } 636 } 637 638 public int getSoTimeout() { 639 return soTimeout; 640 } 641 642 public void setSoTimeout(int soTimeout) { 643 this.soTimeout = soTimeout; 644 } 645 646 public int getSocketBufferSize() { 647 return socketBufferSize; 648 } 649 650 public void setSocketBufferSize(int socketBufferSize) { 651 this.socketBufferSize = socketBufferSize; 652 } 653 654 public int getConnectionTimeout() { 655 return connectionTimeout; 656 } 657 658 public void setConnectionTimeout(int connectionTimeout) { 659 this.connectionTimeout = connectionTimeout; 660 } 661 662 /** 663 * @return the maximumConnections 664 */ 665 public int getMaximumConnections() { 666 return maximumConnections; 667 } 668 669 /** 670 * @param maximumConnections 671 * the maximumConnections to set 672 */ 673 public void setMaximumConnections(int maximumConnections) { 674 this.maximumConnections = maximumConnections; 675 } 676 677 public AtomicInteger getCurrentTransportCount() { 678 return currentTransportCount; 679 } 680 681 @Override 682 public void started(Service service) { 683 } 684 685 @Override 686 public void stopped(Service service) { 687 this.currentTransportCount.decrementAndGet(); 688 } 689 690 @Override 691 public boolean isSslServer() { 692 return false; 693 } 694 695 @Override 696 public boolean isAllowLinkStealing() { 697 return allowLinkStealing; 698 } 699 700 @Override 701 public void setAllowLinkStealing(boolean allowLinkStealing) { 702 this.allowLinkStealing = allowLinkStealing; 703 } 704}