001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.IOException;
020import java.util.Map;
021import java.util.concurrent.ConcurrentHashMap;
022import java.util.concurrent.ConcurrentMap;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.zip.DataFormatException;
025import java.util.zip.Inflater;
026
027import javax.jms.Destination;
028import javax.jms.InvalidClientIDException;
029import javax.jms.JMSException;
030import javax.jms.Message;
031import javax.security.auth.login.CredentialException;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
036import org.apache.activemq.command.ActiveMQBytesMessage;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ActiveMQMapMessage;
039import org.apache.activemq.command.ActiveMQMessage;
040import org.apache.activemq.command.ActiveMQTextMessage;
041import org.apache.activemq.command.Command;
042import org.apache.activemq.command.ConnectionError;
043import org.apache.activemq.command.ConnectionId;
044import org.apache.activemq.command.ConnectionInfo;
045import org.apache.activemq.command.ExceptionResponse;
046import org.apache.activemq.command.MessageAck;
047import org.apache.activemq.command.MessageDispatch;
048import org.apache.activemq.command.MessageId;
049import org.apache.activemq.command.ProducerId;
050import org.apache.activemq.command.ProducerInfo;
051import org.apache.activemq.command.Response;
052import org.apache.activemq.command.SessionId;
053import org.apache.activemq.command.SessionInfo;
054import org.apache.activemq.command.ShutdownInfo;
055import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy;
056import org.apache.activemq.util.ByteArrayOutputStream;
057import org.apache.activemq.util.ByteSequence;
058import org.apache.activemq.util.FactoryFinder;
059import org.apache.activemq.util.IOExceptionSupport;
060import org.apache.activemq.util.IdGenerator;
061import org.apache.activemq.util.JMSExceptionSupport;
062import org.apache.activemq.util.LRUCache;
063import org.apache.activemq.util.LongSequenceGenerator;
064import org.fusesource.hawtbuf.Buffer;
065import org.fusesource.hawtbuf.UTF8Buffer;
066import org.fusesource.mqtt.client.QoS;
067import org.fusesource.mqtt.client.Topic;
068import org.fusesource.mqtt.codec.CONNACK;
069import org.fusesource.mqtt.codec.CONNECT;
070import org.fusesource.mqtt.codec.DISCONNECT;
071import org.fusesource.mqtt.codec.MQTTFrame;
072import org.fusesource.mqtt.codec.PINGREQ;
073import org.fusesource.mqtt.codec.PINGRESP;
074import org.fusesource.mqtt.codec.PUBACK;
075import org.fusesource.mqtt.codec.PUBCOMP;
076import org.fusesource.mqtt.codec.PUBLISH;
077import org.fusesource.mqtt.codec.PUBREC;
078import org.fusesource.mqtt.codec.PUBREL;
079import org.fusesource.mqtt.codec.SUBACK;
080import org.fusesource.mqtt.codec.SUBSCRIBE;
081import org.fusesource.mqtt.codec.UNSUBACK;
082import org.fusesource.mqtt.codec.UNSUBSCRIBE;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
086public class MQTTProtocolConverter {
087
    private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);

    // Name of the int message property that carries the MQTT QoS ordinal of a published message.
    public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
    // Protocol level values compared against CONNECT.version() while connecting.
    public static final int V3_1 = 3;
    public static final int V3_1_1 = 4;

    public static final String SINGLE_LEVEL_WILDCARD = "+";
    public static final String MULTI_LEVEL_WILDCARD = "#";

    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
    // Pre-encoded PINGRESP frame, reused for every PINGREQ answered by onMQTTCommand.
    private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
    // Fraction of the keep-alive interval that is used as the read grace time.
    private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5;
    // Capacity passed to each of the LRU caches declared below.
    static final int DEFAULT_CACHE_SIZE = 5000;

    // Broker-side identities for this converter; the session and producer ids derive from the connection id.
    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final SessionId sessionId = new SessionId(connectionId, -1);
    private final ProducerId producerId = new ProducerId(sessionId, 1);
    // Source of the sequence numbers used to build MessageIds for inbound PUBLISH frames.
    private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator();

    // Pending response callbacks keyed by command id: registered in sendToActiveMQ, removed in onActiveMQCommand.
    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
    // MQTT topic name -> ActiveMQ destination cache; accessed while synchronized on the map itself.
    private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
    // ActiveMQ destination -> MQTT topic name cache; accessed while synchronized on the map itself.
    private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE);

    // Broker acks parked under the MQTT packet id until the client's PUBACK/PUBCOMP arrives.
    private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
    // Consulted by onMQTTPubRel; NOTE(review): the code that populates it is not in this section - confirm.
    private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);

    private final MQTTTransport mqttTransport;
    private final BrokerService brokerService;

    // Guards lastCommandId, the counter behind generateCommandId().
    private final Object commnadIdMutex = new Object();
    private int lastCommandId;
    // Set to true once the broker accepted the connection and the CONNACK was prepared; cleared on disconnect.
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final ConnectionInfo connectionInfo = new ConnectionInfo();
    // The CONNECT frame most recently handed to onMQTTConnect (also the source of the will message).
    private CONNECT connect;
    private String clientId;
    // Keep-alive in milliseconds applied when a client requests a keep-alive of 0; 0 means no default.
    private long defaultKeepAlive;
    private int activeMQSubscriptionPrefetch = -1;
    private final MQTTPacketIdGenerator packetIdGenerator;
    // When false, sendToActiveMQ drops messages addressed to topics the strategy reports as control topics.
    private boolean publishDollarTopics;

    // Protocol version taken from the client's CONNECT frame in onMQTTConnect.
    public int version;

    private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");

    /*
     * Subscription strategy configuration element.
     *   > mqtt-default-subscriptions
     *   > mqtt-virtual-topic-subscriptions
     */
    private String subscriptionStrategyName = "mqtt-default-subscriptions";
    private MQTTSubscriptionStrategy subsciptionStrategy;
139
140    public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
141        this.mqttTransport = mqttTransport;
142        this.brokerService = brokerService;
143        this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService);
144        this.defaultKeepAlive = 0;
145    }
146
147    int generateCommandId() {
148        synchronized (commnadIdMutex) {
149            return lastCommandId++;
150        }
151    }
152
153    public void sendToActiveMQ(Command command, ResponseHandler handler) {
154
155        // Lets intercept message send requests..
156        if (command instanceof ActiveMQMessage) {
157            ActiveMQMessage msg = (ActiveMQMessage) command;
158            try {
159                if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) {
160                    // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
161                    // specification requirements for system assigned destinations.
162                    if (handler != null) {
163                        try {
164                            handler.onResponse(this, new Response());
165                        } catch (IOException e) {
166                            LOG.warn("Failed to send command " + command, e);
167                        }
168                    }
169                    return;
170                }
171            } catch (IOException e) {
172                LOG.warn("Failed to send command " + command, e);
173            }
174        }
175
176        command.setCommandId(generateCommandId());
177        if (handler != null) {
178            command.setResponseRequired(true);
179            resposeHandlers.put(command.getCommandId(), handler);
180        }
181        getMQTTTransport().sendToActiveMQ(command);
182    }
183
184    void sendToMQTT(MQTTFrame frame) {
185        try {
186            mqttTransport.sendToMQTT(frame);
187        } catch (IOException e) {
188            LOG.warn("Failed to send frame " + frame, e);
189        }
190    }
191
192    /**
193     * Convert a MQTT command
194     */
195    public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
196        switch (frame.messageType()) {
197            case PINGREQ.TYPE:
198                LOG.debug("Received a ping from client: " + getClientId());
199                checkConnected();
200                sendToMQTT(PING_RESP_FRAME);
201                LOG.debug("Sent Ping Response to " + getClientId());
202                break;
203            case CONNECT.TYPE:
204                CONNECT connect = new CONNECT().decode(frame);
205                onMQTTConnect(connect);
206                LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version());
207                break;
208            case DISCONNECT.TYPE:
209                LOG.debug("MQTT Client {} disconnecting", getClientId());
210                onMQTTDisconnect();
211                break;
212            case SUBSCRIBE.TYPE:
213                onSubscribe(new SUBSCRIBE().decode(frame));
214                break;
215            case UNSUBSCRIBE.TYPE:
216                onUnSubscribe(new UNSUBSCRIBE().decode(frame));
217                break;
218            case PUBLISH.TYPE:
219                onMQTTPublish(new PUBLISH().decode(frame));
220                break;
221            case PUBACK.TYPE:
222                onMQTTPubAck(new PUBACK().decode(frame));
223                break;
224            case PUBREC.TYPE:
225                onMQTTPubRec(new PUBREC().decode(frame));
226                break;
227            case PUBREL.TYPE:
228                onMQTTPubRel(new PUBREL().decode(frame));
229                break;
230            case PUBCOMP.TYPE:
231                onMQTTPubComp(new PUBCOMP().decode(frame));
232                break;
233            default:
234                handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
235        }
236    }
237
238    void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
239        if (connected.get()) {
240            throw new MQTTProtocolException("Already connected.");
241        }
242        this.connect = connect;
243
244        // The Server MUST respond to the CONNECT Packet with a CONNACK return code 0x01
245        // (unacceptable protocol level) and then disconnect the Client if the Protocol Level
246        // is not supported by the Server [MQTT-3.1.2-2].
247        if (connect.version() < 3 || connect.version() > 4) {
248            CONNACK ack = new CONNACK();
249            ack.code(CONNACK.Code.CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION);
250            try {
251                getMQTTTransport().sendToMQTT(ack.encode());
252                getMQTTTransport().onException(IOExceptionSupport.create("Unsupported or invalid protocol version", null));
253            } catch (IOException e) {
254                getMQTTTransport().onException(IOExceptionSupport.create(e));
255            }
256            return;
257        }
258
259        String clientId = "";
260        if (connect.clientId() != null) {
261            clientId = connect.clientId().toString();
262        }
263
264        String userName = null;
265        if (connect.userName() != null) {
266            userName = connect.userName().toString();
267        }
268        String passswd = null;
269        if (connect.password() != null) {
270
271            if (userName == null && connect.version() != V3_1) {
272                // [MQTT-3.1.2-22]: If the user name is not present then the
273                // password must also be absent.
274                // [MQTT-3.1.4-1]: would seem to imply we don't send a CONNACK here.
275                getMQTTTransport().onException(IOExceptionSupport.create("Password given without a user name", null));
276                return;
277            }
278
279            passswd = connect.password().toString();
280        }
281
282        version = connect.version();
283
284        configureInactivityMonitor(connect.keepAlive());
285
286        connectionInfo.setConnectionId(connectionId);
287        if (clientId != null && !clientId.isEmpty()) {
288            connectionInfo.setClientId(clientId);
289        } else {
290            // Clean Session MUST be set for 0 length Client Id
291            if (!connect.cleanSession()) {
292                CONNACK ack = new CONNACK();
293                ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
294                try {
295                    getMQTTTransport().sendToMQTT(ack.encode());
296                    getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null));
297                } catch (IOException e) {
298                    getMQTTTransport().onException(IOExceptionSupport.create(e));
299                }
300                return;
301            }
302            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
303        }
304
305        connectionInfo.setResponseRequired(true);
306        connectionInfo.setUserName(userName);
307        connectionInfo.setPassword(passswd);
308        connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
309
310        sendToActiveMQ(connectionInfo, new ResponseHandler() {
311            @Override
312            public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
313
314                if (response.isException()) {
315                    // If the connection attempt fails we close the socket.
316                    Throwable exception = ((ExceptionResponse) response).getException();
317                    //let the client know
318                    CONNACK ack = new CONNACK();
319                    if (exception instanceof InvalidClientIDException) {
320                        ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
321                    } else if (exception instanceof SecurityException) {
322                        ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
323                    } else if (exception instanceof CredentialException) {
324                        ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
325                    } else {
326                        ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
327                    }
328                    getMQTTTransport().sendToMQTT(ack.encode());
329                    getMQTTTransport().onException(IOExceptionSupport.create(exception));
330                    return;
331                }
332
333                final SessionInfo sessionInfo = new SessionInfo(sessionId);
334                sendToActiveMQ(sessionInfo, null);
335
336                final ProducerInfo producerInfo = new ProducerInfo(producerId);
337                sendToActiveMQ(producerInfo, new ResponseHandler() {
338                    @Override
339                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
340
341                        if (response.isException()) {
342                            // If the connection attempt fails we close the socket.
343                            Throwable exception = ((ExceptionResponse) response).getException();
344                            CONNACK ack = new CONNACK();
345                            ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
346                            getMQTTTransport().sendToMQTT(ack.encode());
347                            getMQTTTransport().onException(IOExceptionSupport.create(exception));
348                            return;
349                        }
350
351                        CONNACK ack = new CONNACK();
352                        ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
353                        connected.set(true);
354                        getMQTTTransport().sendToMQTT(ack.encode());
355
356                        if (connect.cleanSession()) {
357                            packetIdGenerator.stopClientSession(getClientId());
358                        } else {
359                            packetIdGenerator.startClientSession(getClientId());
360                        }
361
362                        findSubscriptionStrategy().onConnect(connect);
363                    }
364                });
365            }
366        });
367    }
368
369    void onMQTTDisconnect() throws MQTTProtocolException {
370        if (connected.compareAndSet(true, false)) {
371            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
372            sendToActiveMQ(new ShutdownInfo(), null);
373        }
374        stopTransport();
375    }
376
377    void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
378        checkConnected();
379        LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}",
380                  command.messageId(), clientId, connectionInfo.getConnectionId());
381        Topic[] topics = command.topics();
382        if (topics != null) {
383            byte[] qos = new byte[topics.length];
384            for (int i = 0; i < topics.length; i++) {
385                MQTTProtocolSupport.validate(topics[i].name().toString());
386                try {
387                    qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]);
388                } catch (IOException e) {
389                    throw new MQTTProtocolException("Failed to process subscription request", true, e);
390                }
391            }
392            SUBACK ack = new SUBACK();
393            ack.messageId(command.messageId());
394            ack.grantedQos(qos);
395            try {
396                getMQTTTransport().sendToMQTT(ack.encode());
397            } catch (IOException e) {
398                LOG.warn("Couldn't send SUBACK for " + command, e);
399            }
400        } else {
401            LOG.warn("No topics defined for Subscription " + command);
402            throw new MQTTProtocolException("SUBSCRIBE command received with no topic filter");
403        }
404    }
405
406    public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
407        checkConnected();
408        if (command.qos() != QoS.AT_LEAST_ONCE && (version != V3_1 || publishDollarTopics != true)) {
409            throw new MQTTProtocolException("Failed to process unsubscribe request", true, new Exception("UNSUBSCRIBE frame not properly formatted, QoS"));
410        }
411        UTF8Buffer[] topics = command.topics();
412        if (topics != null) {
413            for (UTF8Buffer topic : topics) {
414                MQTTProtocolSupport.validate(topic.toString());
415                try {
416                    findSubscriptionStrategy().onUnSubscribe(topic.toString());
417                } catch (IOException e) {
418                    throw new MQTTProtocolException("Failed to process unsubscribe request", true, e);
419                }
420            }
421            UNSUBACK ack = new UNSUBACK();
422            ack.messageId(command.messageId());
423            sendToMQTT(ack.encode());
424        } else {
425            LOG.warn("No topics defined for Subscription " + command);
426            throw new MQTTProtocolException("UNSUBSCRIBE command received with no topic filter");
427        }
428    }
429
430    /**
431     * Dispatch an ActiveMQ command
432     */
433    public void onActiveMQCommand(Command command) throws Exception {
434        if (command.isResponse()) {
435            Response response = (Response) command;
436            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
437            if (rh != null) {
438                rh.onResponse(this, response);
439            } else {
440                // Pass down any unexpected errors. Should this close the connection?
441                if (response.isException()) {
442                    Throwable exception = ((ExceptionResponse) response).getException();
443                    handleException(exception, null);
444                }
445            }
446        } else if (command.isMessageDispatch()) {
447            MessageDispatch md = (MessageDispatch) command;
448            MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId());
449            if (sub != null) {
450                MessageAck ack = sub.createMessageAck(md);
451                PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
452                switch (publish.qos()) {
453                    case AT_LEAST_ONCE:
454                    case EXACTLY_ONCE:
455                        publish.dup(publish.dup() ? true : md.getMessage().isRedelivered());
456                    case AT_MOST_ONCE:
457                }
458                if (ack != null && sub.expectAck(publish)) {
459                    synchronized (consumerAcks) {
460                        consumerAcks.put(publish.messageId(), ack);
461                    }
462                }
463                LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}",
464                          publish.messageId(), clientId, connectionInfo.getConnectionId());
465                getMQTTTransport().sendToMQTT(publish.encode());
466                if (ack != null && !sub.expectAck(publish)) {
467                    getMQTTTransport().sendToActiveMQ(ack);
468                }
469            }
470        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
471            // Pass down any unexpected async errors. Should this close the connection?
472            Throwable exception = ((ConnectionError) command).getException();
473            handleException(exception, null);
474        } else if (command.isBrokerInfo()) {
475            //ignore
476        } else {
477            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
478        }
479    }
480
481    void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
482        checkConnected();
483        LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}",
484                  command.messageId(), clientId, connectionInfo.getConnectionId());
485        //Both version 3.1 and 3.1.1 do not allow the topic name to contain a wildcard in the publish packet
486        if (containsMqttWildcard(command.topicName().toString())) {
487            // [MQTT-3.3.2-2]: The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters
488            getMQTTTransport().onException(IOExceptionSupport.create("The topic name must not contain wildcard characters.", null));
489            return;
490        }
491        ActiveMQMessage message = convertMessage(command);
492        message.setProducerId(producerId);
493        message.onSend();
494        sendToActiveMQ(message, createResponseHandler(command));
495    }
496
497    void onMQTTPubAck(PUBACK command) {
498        short messageId = command.messageId();
499        LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}",
500                  messageId, clientId, connectionInfo.getConnectionId());
501        packetIdGenerator.ackPacketId(getClientId(), messageId);
502        MessageAck ack;
503        synchronized (consumerAcks) {
504            ack = consumerAcks.remove(messageId);
505        }
506        if (ack != null) {
507            getMQTTTransport().sendToActiveMQ(ack);
508        }
509    }
510
511    void onMQTTPubRec(PUBREC commnand) {
512        //from a subscriber - send a PUBREL in response
513        PUBREL pubrel = new PUBREL();
514        pubrel.messageId(commnand.messageId());
515        sendToMQTT(pubrel.encode());
516    }
517
518    void onMQTTPubRel(PUBREL command) {
519        PUBREC ack;
520        synchronized (publisherRecs) {
521            ack = publisherRecs.remove(command.messageId());
522        }
523        if (ack == null) {
524            LOG.warn("Unknown PUBREL: {} received", command.messageId());
525        }
526        PUBCOMP pubcomp = new PUBCOMP();
527        pubcomp.messageId(command.messageId());
528        sendToMQTT(pubcomp.encode());
529    }
530
531    void onMQTTPubComp(PUBCOMP command) {
532        short messageId = command.messageId();
533        packetIdGenerator.ackPacketId(getClientId(), messageId);
534        MessageAck ack;
535        synchronized (consumerAcks) {
536            ack = consumerAcks.remove(messageId);
537        }
538        if (ack != null) {
539            getMQTTTransport().sendToActiveMQ(ack);
540        }
541    }
542
543    ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
544        ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
545
546        msg.setProducerId(producerId);
547        MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId());
548        msg.setMessageId(id);
549        LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
550                command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId());
551        msg.setTimestamp(System.currentTimeMillis());
552        msg.setPriority((byte) Message.DEFAULT_PRIORITY);
553        msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain());
554        msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
555        if (command.retain()) {
556            msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
557        }
558
559        ActiveMQDestination destination;
560        synchronized (activeMQDestinationMap) {
561            destination = activeMQDestinationMap.get(command.topicName());
562            if (destination == null) {
563                String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
564                try {
565                    destination = findSubscriptionStrategy().onSend(topicName);
566                } catch (IOException e) {
567                    throw JMSExceptionSupport.create(e);
568                }
569
570                activeMQDestinationMap.put(command.topicName().toString(), destination);
571            }
572        }
573
574        msg.setJMSDestination(destination);
575        msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
576        return msg;
577    }
578
579    public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException {
580        PUBLISH result = new PUBLISH();
581        // packet id is set in MQTTSubscription
582        QoS qoS;
583        if (message.propertyExists(QOS_PROPERTY_NAME)) {
584            int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
585            qoS = QoS.values()[ordinal];
586
587        } else {
588            qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
589        }
590        result.qos(qoS);
591        if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) {
592            result.retain(true);
593        }
594
595        String topicName;
596        synchronized (mqttTopicMap) {
597            topicName = mqttTopicMap.get(message.getJMSDestination());
598            if (topicName == null) {
599                String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination());
600                topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
601                mqttTopicMap.put(message.getJMSDestination(), topicName);
602            }
603        }
604        result.topicName(new UTF8Buffer(topicName));
605
606        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
607            ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
608            msg.setReadOnlyBody(true);
609            String messageText = msg.getText();
610            if (messageText != null) {
611                result.payload(new Buffer(messageText.getBytes("UTF-8")));
612            }
613        } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
614            ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
615            msg.setReadOnlyBody(true);
616            byte[] data = new byte[(int) msg.getBodyLength()];
617            msg.readBytes(data);
618            result.payload(new Buffer(data));
619        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
620            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
621            msg.setReadOnlyBody(true);
622            Map<String, Object> map = msg.getContentMap();
623            if (map != null) {
624                result.payload(new Buffer(map.toString().getBytes("UTF-8")));
625            }
626        } else {
627            ByteSequence byteSequence = message.getContent();
628            if (byteSequence != null && byteSequence.getLength() > 0) {
629                if (message.isCompressed()) {
630                    Inflater inflater = new Inflater();
631                    inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
632                    byte[] data = new byte[4096];
633                    int read;
634                    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
635                    while ((read = inflater.inflate(data)) != 0) {
636                        bytesOut.write(data, 0, read);
637                    }
638                    byteSequence = bytesOut.toByteSequence();
639                    bytesOut.close();
640                }
641                result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
642            }
643        }
644        LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
645                result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId());
646        return result;
647    }
648
649    public MQTTTransport getMQTTTransport() {
650        return mqttTransport;
651    }
652
653    boolean willSent = false;
654    public void onTransportError() {
655        if (connect != null) {
656            if (connected.get()) {
657                if (connect.willTopic() != null && connect.willMessage() != null && !willSent) {
658                    willSent = true;
659                    try {
660                        PUBLISH publish = new PUBLISH();
661                        publish.topicName(connect.willTopic());
662                        publish.qos(connect.willQos());
663                        publish.messageId(packetIdGenerator.getNextSequenceId(getClientId()));
664                        publish.payload(connect.willMessage());
665                        publish.retain(connect.willRetain());
666                        ActiveMQMessage message = convertMessage(publish);
667                        message.setProducerId(producerId);
668                        message.onSend();
669
670                        sendToActiveMQ(message, null);
671                    } catch (Exception e) {
672                        LOG.warn("Failed to publish Will Message " + connect.willMessage());
673                    }
674                }
675                // remove connection info
676                sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
677            }
678        }
679    }
680
681    void configureInactivityMonitor(short keepAliveSeconds) {
682        MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
683
684        // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false,
685        // then ignore configuring it because it won't exist
686        if (monitor == null) {
687            return;
688        }
689
690        // Client has sent a valid CONNECT frame, we can stop the connect checker.
691        monitor.stopConnectChecker();
692
693        long keepAliveMS = keepAliveSeconds * 1000;
694
695        LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS);
696
697        try {
698            // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
699
700            // we'll observe the server-side configured default value (note, no grace period)
701            if (keepAliveMS == 0 && defaultKeepAlive > 0) {
702                keepAliveMS = defaultKeepAlive;
703            }
704
705            long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
706
707            monitor.setProtocolConverter(this);
708            monitor.setReadKeepAliveTime(keepAliveMS);
709            monitor.setReadGraceTime(readGracePeriod);
710            monitor.startReadChecker();
711
712            LOG.debug("MQTT Client {} established heart beat of  {} ms ({} ms + {} ms grace period)",
713                      new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod });
714        } catch (Exception ex) {
715            LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
716        }
717    }
718
719    void handleException(Throwable exception, MQTTFrame command) {
720        LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
721        LOG.debug("Exception detail", exception);
722
723        if (connected.get() && connectionInfo != null) {
724            connected.set(false);
725            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
726        }
727        stopTransport();
728    }
729
730    void checkConnected() throws MQTTProtocolException {
731        if (!connected.get()) {
732            throw new MQTTProtocolException("Not connected.");
733        }
734    }
735
736    private void stopTransport() {
737        try {
738            getMQTTTransport().stop();
739        } catch (Throwable e) {
740            LOG.debug("Failed to stop MQTT transport ", e);
741        }
742    }
743
744    ResponseHandler createResponseHandler(final PUBLISH command) {
745        if (command != null) {
746            return new ResponseHandler() {
747                @Override
748                public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
749                    if (response.isException()) {
750                        Throwable error = ((ExceptionResponse) response).getException();
751                        LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage());
752                        LOG.trace("Error trace: {}", error);
753                    }
754
755                    switch (command.qos()) {
756                        case AT_LEAST_ONCE:
757                            PUBACK ack = new PUBACK();
758                            ack.messageId(command.messageId());
759                            LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
760                                      command.messageId(), clientId, connectionInfo.getConnectionId());
761                            converter.getMQTTTransport().sendToMQTT(ack.encode());
762                            break;
763                        case EXACTLY_ONCE:
764                            PUBREC req = new PUBREC();
765                            req.messageId(command.messageId());
766                            synchronized (publisherRecs) {
767                                publisherRecs.put(command.messageId(), req);
768                            }
769                            LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}",
770                                      command.messageId(), clientId, connectionInfo.getConnectionId());
771                            converter.getMQTTTransport().sendToMQTT(req.encode());
772                            break;
773                        default:
774                            break;
775                    }
776                }
777            };
778        }
779        return null;
780    }
781
782    public long getDefaultKeepAlive() {
783        return defaultKeepAlive;
784    }
785
786    /**
787     * Set the default keep alive time (in milliseconds) that would be used if configured on server side
788     * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
789     * @param keepAlive the keepAlive in milliseconds
790     */
791    public void setDefaultKeepAlive(long keepAlive) {
792        this.defaultKeepAlive = keepAlive;
793    }
794
795    public int getActiveMQSubscriptionPrefetch() {
796        return activeMQSubscriptionPrefetch;
797    }
798
799    /**
800     * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
801     * The default = 1
802     *
803     * @param activeMQSubscriptionPrefetch
804     *        set the prefetch for the corresponding ActiveMQ subscription
805     */
806    public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
807        this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
808    }
809
810    public MQTTPacketIdGenerator getPacketIdGenerator() {
811        return packetIdGenerator;
812    }
813
814    public void setPublishDollarTopics(boolean publishDollarTopics) {
815        this.publishDollarTopics = publishDollarTopics;
816    }
817
818    public boolean getPublishDollarTopics() {
819        return publishDollarTopics;
820    }
821
822    public ConnectionId getConnectionId() {
823        return connectionId;
824    }
825
826    public SessionId getSessionId() {
827        return sessionId;
828    }
829
830    public boolean isCleanSession() {
831        return this.connect.cleanSession();
832    }
833
834    public String getSubscriptionStrategy() {
835        return subscriptionStrategyName;
836    }
837
838    public void setSubscriptionStrategy(String name) {
839        this.subscriptionStrategyName = name;
840    }
841
842    public String getClientId() {
843        if (clientId == null) {
844            if (connect != null && connect.clientId() != null) {
845                clientId = connect.clientId().toString();
846            } else {
847                clientId = "";
848            }
849        }
850        return clientId;
851    }
852
853    protected boolean containsMqttWildcard(String value) {
854        return value != null && (value.contains(SINGLE_LEVEL_WILDCARD) ||
855                value.contains(MULTI_LEVEL_WILDCARD));
856    }
857
858    protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException {
859        if (subsciptionStrategy == null) {
860            synchronized (STRATAGY_FINDER) {
861                if (subsciptionStrategy != null) {
862                    return subsciptionStrategy;
863                }
864
865                MQTTSubscriptionStrategy strategy = null;
866                if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) {
867                    try {
868                        strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName);
869                        LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName);
870                        if (strategy instanceof BrokerServiceAware) {
871                            ((BrokerServiceAware)strategy).setBrokerService(brokerService);
872                        }
873                        strategy.initialize(this);
874                    } catch (Exception e) {
875                        throw IOExceptionSupport.create(e);
876                    }
877                } else {
878                    throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName);
879                }
880
881                this.subsciptionStrategy = strategy;
882            }
883        }
884        return subsciptionStrategy;
885    }
886}