001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.IOException;
020import java.net.InetAddress;
021import java.net.InetSocketAddress;
022import java.net.ServerSocket;
023import java.net.Socket;
024import java.net.SocketException;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029import java.nio.channels.ClosedChannelException;
030import java.nio.channels.SelectionKey;
031import java.nio.channels.Selector;
032import java.nio.channels.ServerSocketChannel;
033import java.nio.channels.SocketChannel;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.Set;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.LinkedBlockingQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicInteger;
041
042import javax.net.ServerSocketFactory;
043import javax.net.ssl.SSLServerSocket;
044
045import org.apache.activemq.Service;
046import org.apache.activemq.ThreadPriorities;
047import org.apache.activemq.TransportLoggerSupport;
048import org.apache.activemq.command.BrokerInfo;
049import org.apache.activemq.openwire.OpenWireFormatFactory;
050import org.apache.activemq.transport.Transport;
051import org.apache.activemq.transport.TransportFactory;
052import org.apache.activemq.transport.TransportServer;
053import org.apache.activemq.transport.TransportServerThreadSupport;
054import org.apache.activemq.util.IOExceptionSupport;
055import org.apache.activemq.util.InetAddressUtil;
056import org.apache.activemq.util.IntrospectionSupport;
057import org.apache.activemq.util.ServiceListener;
058import org.apache.activemq.util.ServiceStopper;
059import org.apache.activemq.util.ServiceSupport;
060import org.apache.activemq.wireformat.WireFormat;
061import org.apache.activemq.wireformat.WireFormatFactory;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * A TCP based implementation of {@link TransportServer}
067 */
068public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
069
070    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
071
072    protected volatile ServerSocket serverSocket;
073    protected volatile Selector selector;
074    protected int backlog = 5000;
075    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
076    protected final TcpTransportFactory transportFactory;
077    protected long maxInactivityDuration = 30000;
078    protected long maxInactivityDurationInitalDelay = 10000;
079    protected int minmumWireFormatVersion;
080    protected boolean useQueueForAccept = true;
081    protected boolean allowLinkStealing;
082
083    /**
084     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
085     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
086     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
087     * TransportConnector URIs.
088     */
089    protected boolean trace = false;
090
091    protected int soTimeout = 0;
092    protected int socketBufferSize = 64 * 1024;
093    protected int connectionTimeout = 30000;
094
095    /**
096     * Name of the LogWriter implementation to use. Names are mapped to classes in the
097     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
098     * set in Connection or TransportConnector URIs.
099     */
100    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
101
    /**
     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
     * TransportLogger which is manageable, a TransportLoggerControl MBean will be created.
     */
106    protected boolean dynamicManagement = false;
107
108    /**
109     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
110     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
111     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
112     * TransportConnector URIs.
113     */
114    protected boolean startLogging = true;
115    protected final ServerSocketFactory serverSocketFactory;
116    protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
117    protected Thread socketHandlerThread;
118
119    /**
120     * The maximum number of sockets allowed for this server
121     */
122    protected int maximumConnections = Integer.MAX_VALUE;
123    protected final AtomicInteger currentTransportCount = new AtomicInteger();
124
125    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
126        URISyntaxException {
127        super(location);
128        this.transportFactory = transportFactory;
129        this.serverSocketFactory = serverSocketFactory;
130    }
131
132    public void bind() throws IOException {
133        URI bind = getBindLocation();
134
135        String host = bind.getHost();
136        host = (host == null || host.length() == 0) ? "localhost" : host;
137        InetAddress addr = InetAddress.getByName(host);
138
139        try {
140            serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
141            configureServerSocket(serverSocket);
142        } catch (IOException e) {
143            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
144        }
145        try {
146            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
147                bind.getQuery(), bind.getFragment()));
148        } catch (URISyntaxException e) {
149            // it could be that the host name contains invalid characters such
150            // as _ on unix platforms so lets try use the IP address instead
151            try {
152                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
153                    bind.getQuery(), bind.getFragment()));
154            } catch (URISyntaxException e2) {
155                throw IOExceptionSupport.create(e2);
156            }
157        }
158    }
159
160    private void configureServerSocket(ServerSocket socket) throws SocketException {
161        socket.setSoTimeout(2000);
162        if (transportOptions != null) {
163
164            // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
165            // to SSLServerSocket to configure it has a side effect on the socket rendering it
166            // useless as all suites are enabled many of which are considered as insecure.  We
167            // instead trap that option here and throw an exception.  We should really consider
168            // all invalid options as breaking and not start the transport but the current design
169            // doesn't really allow for this.
170            //
171            //  see: https://issues.apache.org/jira/browse/AMQ-4582
172            //
173            if (socket instanceof SSLServerSocket) {
174                if (transportOptions.containsKey("enabledCipherSuites")) {
175                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");
176
177                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
178                        throw new SocketException(String.format(
179                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
180                    }
181                }
182            }
183
184            //AMQ-6599 - don't strip out set properties on the socket as we need to set them
185            //on the Transport as well later
186            IntrospectionSupport.setProperties(socket, transportOptions, false);
187        }
188    }
189
190    /**
191     * @return Returns the wireFormatFactory.
192     */
193    public WireFormatFactory getWireFormatFactory() {
194        return wireFormatFactory;
195    }
196
197    /**
198     * @param wireFormatFactory
199     *            The wireFormatFactory to set.
200     */
201    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
202        this.wireFormatFactory = wireFormatFactory;
203    }
204
205    /**
206     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
207     * broker.
208     *
209     * @param brokerInfo
210     */
211    @Override
212    public void setBrokerInfo(BrokerInfo brokerInfo) {
213    }
214
215    public long getMaxInactivityDuration() {
216        return maxInactivityDuration;
217    }
218
219    public void setMaxInactivityDuration(long maxInactivityDuration) {
220        this.maxInactivityDuration = maxInactivityDuration;
221    }
222
223    public long getMaxInactivityDurationInitalDelay() {
224        return this.maxInactivityDurationInitalDelay;
225    }
226
227    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
228        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
229    }
230
231    public int getMinmumWireFormatVersion() {
232        return minmumWireFormatVersion;
233    }
234
235    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
236        this.minmumWireFormatVersion = minmumWireFormatVersion;
237    }
238
239    public boolean isTrace() {
240        return trace;
241    }
242
243    public void setTrace(boolean trace) {
244        this.trace = trace;
245    }
246
247    public String getLogWriterName() {
248        return logWriterName;
249    }
250
251    public void setLogWriterName(String logFormat) {
252        this.logWriterName = logFormat;
253    }
254
255    public boolean isDynamicManagement() {
256        return dynamicManagement;
257    }
258
259    public void setDynamicManagement(boolean useJmx) {
260        this.dynamicManagement = useJmx;
261    }
262
263    public boolean isStartLogging() {
264        return startLogging;
265    }
266
267    public void setStartLogging(boolean startLogging) {
268        this.startLogging = startLogging;
269    }
270
271    /**
272     * @return the backlog
273     */
274    public int getBacklog() {
275        return backlog;
276    }
277
278    /**
279     * @param backlog
280     *            the backlog to set
281     */
282    public void setBacklog(int backlog) {
283        this.backlog = backlog;
284    }
285
286    /**
287     * @return the useQueueForAccept
288     */
289    public boolean isUseQueueForAccept() {
290        return useQueueForAccept;
291    }
292
293    /**
294     * @param useQueueForAccept
295     *            the useQueueForAccept to set
296     */
297    public void setUseQueueForAccept(boolean useQueueForAccept) {
298        this.useQueueForAccept = useQueueForAccept;
299    }
300
301    /**
302     * pull Sockets from the ServerSocket
303     */
304    @Override
305    public void run() {
306        if (!isStopped() && !isStopping()) {
307            final ServerSocket serverSocket = this.serverSocket;
308            if (serverSocket == null) {
309                onAcceptError(new IOException("Server started without a valid ServerSocket"));
310            }
311
312            final ServerSocketChannel channel = serverSocket.getChannel();
313            if (channel != null) {
314                doRunWithServerSocketChannel(channel);
315            } else {
316                doRunWithServerSocket(serverSocket);
317            }
318        }
319    }
320
321    private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
322        try {
323            channel.configureBlocking(false);
324            final Selector selector = Selector.open();
325
326            try {
327                channel.register(selector, SelectionKey.OP_ACCEPT);
328            } catch (ClosedChannelException ex) {
329                try {
330                    selector.close();
331                } catch (IOException ignore) {}
332
333                throw ex;
334            }
335
336            // Update object instance for later cleanup.
337            this.selector = selector;
338
339            while (!isStopped()) {
340                int count = selector.select(10);
341
342                if (count == 0) {
343                    continue;
344                }
345
346                Set<SelectionKey> keys = selector.selectedKeys();
347
348                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
349                    final SelectionKey key = i.next();
350                    if (key.isAcceptable()) {
351                        try {
352                            SocketChannel sc = channel.accept();
353                            if (sc != null) {
354                                if (isStopped() || getAcceptListener() == null) {
355                                    sc.close();
356                                } else {
357                                    if (useQueueForAccept) {
358                                        socketQueue.put(sc.socket());
359                                    } else {
360                                        handleSocket(sc.socket());
361                                    }
362                                }
363                            }
364
365                        } catch (SocketTimeoutException ste) {
366                            // expect this to happen
367                        } catch (Exception e) {
368                            e.printStackTrace();
369                            if (!isStopping()) {
370                                onAcceptError(e);
371                            } else if (!isStopped()) {
372                                LOG.warn("run()", e);
373                                onAcceptError(e);
374                            }
375                        }
376                    }
377                    i.remove();
378                }
379            }
380        } catch (IOException ex) {
381            if (!isStopping()) {
382                onAcceptError(ex);
383            } else if (!isStopped()) {
384                LOG.warn("run()", ex);
385                onAcceptError(ex);
386            }
387        }
388    }
389
390    private void doRunWithServerSocket(final ServerSocket serverSocket) {
391        while (!isStopped()) {
392            Socket socket = null;
393            try {
394                socket = serverSocket.accept();
395                if (socket != null) {
396                    if (isStopped() || getAcceptListener() == null) {
397                        socket.close();
398                    } else {
399                        if (useQueueForAccept) {
400                            socketQueue.put(socket);
401                        } else {
402                            handleSocket(socket);
403                        }
404                    }
405                }
406            } catch (SocketTimeoutException ste) {
407                // expect this to happen
408            } catch (Exception e) {
409                if (!isStopping()) {
410                    onAcceptError(e);
411                } else if (!isStopped()) {
412                    LOG.warn("run()", e);
413                    onAcceptError(e);
414                }
415            }
416        }
417    }
418
419    /**
420     * Allow derived classes to override the Transport implementation that this transport server creates.
421     *
422     * @param socket
423     * @param format
424     *
425     * @return a new Transport instance.
426     *
427     * @throws IOException
428     */
429    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
430        return new TcpTransport(format, socket);
431    }
432
433    /**
434     * @return pretty print of this
435     */
436    @Override
437    public String toString() {
438        return "" + getBindLocation();
439    }
440
441    /**
442     * @param socket
443     * @param bindAddress
444     * @return real hostName
445     * @throws UnknownHostException
446     */
447    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
448        String result = null;
449        if (socket.isBound()) {
450            if (socket.getInetAddress().isAnyLocalAddress()) {
451                // make it more human readable and useful, an alternative to 0.0.0.0
452                result = InetAddressUtil.getLocalHostName();
453            } else {
454                result = socket.getInetAddress().getCanonicalHostName();
455            }
456        } else {
457            result = bindAddress.getCanonicalHostName();
458        }
459        return result;
460    }
461
462    @Override
463    protected void doStart() throws Exception {
464        if (useQueueForAccept) {
465            Runnable run = new Runnable() {
466                @Override
467                public void run() {
468                    try {
469                        while (!isStopped() && !isStopping()) {
470                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
471                            if (sock != null) {
472                                try {
473                                    handleSocket(sock);
474                                } catch (Throwable thrown) {
475                                    if (!isStopping()) {
476                                        onAcceptError(new Exception(thrown));
477                                    } else if (!isStopped()) {
478                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
479                                        onAcceptError(new Exception(thrown));
480                                    }
481                                }
482                            }
483                        }
484
485                    } catch (InterruptedException e) {
486                        if (!isStopped() || !isStopping()) {
487                            LOG.info("socketQueue interrupted - stopping");
488                            onAcceptError(e);
489                        }
490                    }
491                }
492            };
493            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
494            socketHandlerThread.setDaemon(true);
495            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
496            socketHandlerThread.start();
497        }
498        super.doStart();
499    }
500
501    @Override
502    protected void doStop(ServiceStopper stopper) throws Exception {
503        Exception firstFailure = null;
504
505        try {
506            if (selector != null) {
507                selector.close();
508                selector = null;
509            }
510        } catch (Exception error) {
511        }
512
513        try {
514            final ServerSocket serverSocket = this.serverSocket;
515            if (serverSocket != null) {
516                this.serverSocket = null;
517                serverSocket.close();
518            }
519        } catch (Exception error) {
520            firstFailure = error;
521        }
522
523        if (socketHandlerThread != null) {
524            socketHandlerThread.interrupt();
525            socketHandlerThread = null;
526        }
527
528        try {
529            super.doStop(stopper);
530        } catch (Exception error) {
531            if (firstFailure != null) {
532                firstFailure = error;
533            }
534        }
535
536        if (firstFailure != null) {
537            throw firstFailure;
538        }
539    }
540
541    @Override
542    public InetSocketAddress getSocketAddress() {
543        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
544    }
545
546    protected void handleSocket(Socket socket) {
547        doHandleSocket(socket);
548    }
549
550    final protected void doHandleSocket(Socket socket) {
551        boolean closeSocket = true;
552        boolean countIncremented = false;
553        try {
554            int currentCount;
555            do {
556                currentCount = currentTransportCount.get();
557                if (currentCount >= this.maximumConnections) {
558                     throw new ExceededMaximumConnectionsException(
559                         "Exceeded the maximum number of allowed client connections. See the '" +
560                         "maximumConnections' property on the TCP transport configuration URI " +
561                         "in the ActiveMQ configuration file (e.g., activemq.xml)");
562                 }
563
564            //Increment this value before configuring the transport
565            //This is necessary because some of the transport servers must read from the
566            //socket during configureTransport() so we want to make sure this value is
567            //accurate as the transport server could pause here waiting for data to be sent from a client
568            } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1));
569            countIncremented = true;
570
571            HashMap<String, Object> options = new HashMap<String, Object>();
572            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
573            options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
574            options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
575            options.put("trace", Boolean.valueOf(trace));
576            options.put("soTimeout", Integer.valueOf(soTimeout));
577            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
578            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
579            options.put("logWriterName", logWriterName);
580            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
581            options.put("startLogging", Boolean.valueOf(startLogging));
582            options.putAll(transportOptions);
583
584            TransportInfo transportInfo = configureTransport(this, socket);
585            closeSocket = false;
586
587            if (transportInfo.transport instanceof ServiceSupport) {
588                ((ServiceSupport) transportInfo.transport).addServiceListener(this);
589            }
590
591            Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
592                    transportInfo.transport, transportInfo.format, options);
593
594            getAcceptListener().onAccept(configuredTransport);
595
596        } catch (SocketTimeoutException ste) {
597            // expect this to happen
598        } catch (Exception e) {
599            if (closeSocket) {
600                try {
601                    //if closing the socket, only decrement the count it was actually incremented
602                    //where it was incremented
603                    if (countIncremented) {
604                        currentTransportCount.decrementAndGet();
605                    }
606                    socket.close();
607                } catch (Exception ignore) {
608                }
609            }
610
611            if (!isStopping()) {
612                onAcceptError(e);
613            } else if (!isStopped()) {
614                LOG.warn("run()", e);
615                onAcceptError(e);
616            }
617        }
618    }
619
620    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
621        WireFormat format = wireFormatFactory.createWireFormat();
622        Transport transport = createTransport(socket, format);
623        return new TransportInfo(format, transport, transportFactory);
624    }
625
626    protected class TransportInfo {
627        final WireFormat format;
628        final Transport transport;
629        final TransportFactory transportFactory;
630
631        public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
632            this.format = format;
633            this.transport = transport;
634            this.transportFactory = transportFactory;
635        }
636    }
637
638    public int getSoTimeout() {
639        return soTimeout;
640    }
641
642    public void setSoTimeout(int soTimeout) {
643        this.soTimeout = soTimeout;
644    }
645
646    public int getSocketBufferSize() {
647        return socketBufferSize;
648    }
649
650    public void setSocketBufferSize(int socketBufferSize) {
651        this.socketBufferSize = socketBufferSize;
652    }
653
654    public int getConnectionTimeout() {
655        return connectionTimeout;
656    }
657
658    public void setConnectionTimeout(int connectionTimeout) {
659        this.connectionTimeout = connectionTimeout;
660    }
661
662    /**
663     * @return the maximumConnections
664     */
665    public int getMaximumConnections() {
666        return maximumConnections;
667    }
668
669    /**
670     * @param maximumConnections
671     *            the maximumConnections to set
672     */
673    public void setMaximumConnections(int maximumConnections) {
674        this.maximumConnections = maximumConnections;
675    }
676
677    public AtomicInteger getCurrentTransportCount() {
678        return currentTransportCount;
679    }
680
681    @Override
682    public void started(Service service) {
683    }
684
685    @Override
686    public void stopped(Service service) {
687        this.currentTransportCount.decrementAndGet();
688    }
689
690    @Override
691    public boolean isSslServer() {
692        return false;
693    }
694
695    @Override
696    public boolean isAllowLinkStealing() {
697        return allowLinkStealing;
698    }
699
700    @Override
701    public void setAllowLinkStealing(boolean allowLinkStealing) {
702        this.allowLinkStealing = allowLinkStealing;
703    }
704}