001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.IOException;
020import java.util.zip.DataFormatException;
021
022import javax.jms.JMSException;
023
024import org.apache.activemq.command.ActiveMQDestination;
025import org.apache.activemq.command.ActiveMQMessage;
026import org.apache.activemq.command.ConsumerInfo;
027import org.apache.activemq.command.MessageAck;
028import org.apache.activemq.command.MessageDispatch;
029import org.fusesource.mqtt.client.QoS;
030import org.fusesource.mqtt.codec.PUBLISH;
031
032/**
033 * Keeps track of the MQTT client subscription so that acking is correctly done.
034 */
035public class MQTTSubscription {
036
037    private final MQTTProtocolConverter protocolConverter;
038
039    private final ConsumerInfo consumerInfo;
040    private final String topicName;
041    private final QoS qos;
042
043    public MQTTSubscription(MQTTProtocolConverter protocolConverter, String topicName, QoS qos, ConsumerInfo consumerInfo) {
044        this.protocolConverter = protocolConverter;
045        this.consumerInfo = consumerInfo;
046        this.qos = qos;
047        this.topicName = topicName;
048    }
049
050    /**
051     * Create a {@link MessageAck} that will acknowledge the given {@link MessageDispatch}.
052     *
053     * @param md
054     *        the {@link MessageDispatch} to acknowledge.
055     *
056     * @return a new {@link MessageAck} command to acknowledge the message.
057     */
058    public MessageAck createMessageAck(MessageDispatch md) {
059        return new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
060    }
061
062    /**
063     * Creates a PUBLISH command that can be sent to a remote client from an
064     * incoming {@link ActiveMQMessage} instance.
065     *
066     * @param message
067     *        the message to convert to a PUBLISH command.
068     *
069     * @return a new PUBLISH command that is populated from the {@link ActiveMQMessage}.
070     *
071     * @throws DataFormatException
072     * @throws IOException
073     * @throws JMSException
074     */
075    public PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException {
076        PUBLISH publish = protocolConverter.convertMessage(message);
077        if (publish.qos().ordinal() > this.qos.ordinal()) {
078            publish.qos(this.qos);
079        }
080        switch (publish.qos()) {
081            case AT_LEAST_ONCE:
082            case EXACTLY_ONCE:
083                // set packet id, and optionally dup flag
084                protocolConverter.getPacketIdGenerator().setPacketId(protocolConverter.getClientId(), this, message, publish);
085            case AT_MOST_ONCE:
086        }
087        return publish;
088    }
089
090    /**
091     * Given a PUBLISH command determine if it will expect an ACK based on the
092     * QoS of the Publish command and the QoS of this subscription.
093     *
094     * @param publish
095     *        The publish command to inspect.
096     *
097     * @return true if the client will expect an PUBACK for this PUBLISH.
098     */
099    public boolean expectAck(PUBLISH publish) {
100        QoS publishQoS = publish.qos();
101        if (publishQoS.compareTo(this.qos) > 0){
102            publishQoS = this.qos;
103        }
104        return !publishQoS.equals(QoS.AT_MOST_ONCE);
105    }
106
107    /**
108     * @returns the original topic name value the client used when subscribing.
109     */
110    public String getTopicName() {
111        return this.topicName;
112    }
113
114    /**
115     * The real {@link ActiveMQDestination} that this subscription is assigned.
116     *
117     * @return the real {@link ActiveMQDestination} assigned to this subscription.
118     */
119    public ActiveMQDestination getDestination() {
120        return consumerInfo.getDestination();
121    }
122
123    /**
124     * Gets the {@link ConsumerInfo} that describes the subscription sent to ActiveMQ.
125     *
126     * @return the {@link ConsumerInfo} used to create this subscription.
127     */
128    public ConsumerInfo getConsumerInfo() {
129        return consumerInfo;
130    }
131
132    /**
133     * @return the assigned QoS value for this subscription.
134     */
135    public QoS getQoS() {
136        return qos;
137    }
138
139    @Override
140    public String toString() {
141        return "MQTT Sub: topic[" + topicName + "] -> [" + consumerInfo.getDestination() + "]";
142    }
143}