001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.util.Iterator;
020import java.util.Set;
021
022import javax.jms.JMSException;
023
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.broker.region.policy.PolicyEntry;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.ConsumerInfo;
028import org.apache.activemq.command.Message;
029import org.apache.activemq.command.MessageDispatchNotification;
030import org.apache.activemq.thread.TaskRunnerFactory;
031import org.apache.activemq.usage.SystemUsage;
032
033/**
034 * 
035 * 
036 */
037public class QueueRegion extends AbstractRegion {
038
039    public QueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics,
040                       SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
041                       DestinationFactory destinationFactory) {
042        super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
043    }
044
045    public String toString() {
046        return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size()
047               + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
048    }
049
050    protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
051        throws JMSException {
052        ActiveMQDestination destination = info.getDestination();
053        PolicyEntry entry = null;
054        if (destination != null && broker.getDestinationPolicy() != null) {
055            entry = broker.getDestinationPolicy().getEntryFor(destination);
056            
057        }
058        if (info.isBrowser()) {
059            QueueBrowserSubscription sub = new QueueBrowserSubscription(broker,usageManager, context, info);
060            if (entry != null) {
061                entry.configure(broker, usageManager, sub);
062            }
063            return sub;
064        } else {
065            QueueSubscription sub =   new QueueSubscription(broker, usageManager,context, info);
066            if (entry != null) {
067                entry.configure(broker, usageManager, sub);
068            }
069            return sub;
070        }
071    }
072
073    protected Set<ActiveMQDestination> getInactiveDestinations() {
074        Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
075        for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
076            ActiveMQDestination dest = iter.next();
077            if (!dest.isQueue()) {
078                iter.remove();
079            }
080        }
081        return inactiveDestinations;
082    }
083    
084    /*
085     * For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till 
086     * the notification to ensure that the subscription chosen by the master is used.
087     * 
088     * (non-Javadoc)
089     * @see org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
090     */
091    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
092        processDispatchNotificationViaDestination(messageDispatchNotification);
093    }
094}