001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.util.ArrayList;
020import java.util.List;
021
022import javax.jms.Connection;
023import javax.jms.ConnectionConsumer;
024import javax.jms.ConnectionMetaData;
025import javax.jms.Destination;
026import javax.jms.ExceptionListener;
027import javax.jms.IllegalStateException;
028import javax.jms.JMSException;
029import javax.jms.Queue;
030import javax.jms.QueueConnection;
031import javax.jms.QueueSession;
032import javax.jms.ServerSessionPool;
033import javax.jms.Session;
034import javax.jms.Topic;
035import javax.jms.TopicConnection;
036import javax.jms.TopicSession;
037import javax.resource.spi.ConnectionRequestInfo;
038import org.apache.activemq.ActiveMQQueueSession;
039import org.apache.activemq.ActiveMQSession;
040import org.apache.activemq.ActiveMQTopicSession;
041
042/**
043 * Acts as a pass through proxy for a JMS Connection object. It intercepts
044 * events that are of interest of the ActiveMQManagedConnection.
045 *
046 * 
047 */
048public class ManagedConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener {
049
050    private ActiveMQManagedConnection managedConnection;
051    private final List<ManagedSessionProxy> sessions = new ArrayList<ManagedSessionProxy>();
052    private ExceptionListener exceptionListener;
053    private ActiveMQConnectionRequestInfo info;
054
055    public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection, ConnectionRequestInfo info) {
056        this.managedConnection = managedConnection;
057        if (info instanceof ActiveMQConnectionRequestInfo) {
058            this.info = (ActiveMQConnectionRequestInfo) info;
059        }
060    }
061
062    /**
063     * Used to let the ActiveMQManagedConnection that this connection handel is
064     * not needed by the app.
065     *
066     * @throws JMSException
067     */
068    public void close() throws JMSException {
069        if (managedConnection != null) {
070            managedConnection.proxyClosedEvent(this);
071        }
072    }
073
074    /**
075     * Called by the ActiveMQManagedConnection to invalidate this proxy.
076     */
077    public void cleanup() {
078        exceptionListener = null;
079        managedConnection = null;
080        synchronized (sessions) {
081            for (ManagedSessionProxy p : sessions) {
082                try {
083                    //TODO is this dangerous?  should we copy the list before iterating?
084                    p.cleanup();
085                } catch (JMSException ignore) {
086                }
087            }
088            sessions.clear();
089        }
090    }
091
092    /**
093     * @return "physical" underlying activemq connection, if proxy is associated with a managed connection
094     * @throws javax.jms.JMSException if managed connection is null
095     */
096    private Connection getConnection() throws JMSException {
097        if (managedConnection == null) {
098            throw new IllegalStateException("The Connection is closed");
099        }
100        return managedConnection.getPhysicalConnection();
101    }
102
103    /**
104     * @param transacted      Whether session is transacted
105     * @param acknowledgeMode session acknowledge mode
106     * @return session proxy
107     * @throws JMSException on error
108     */
109    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
110        return createSessionProxy(transacted, acknowledgeMode);
111    }
112
113    /**
114     * @param transacted      Whether session is transacted
115     * @param acknowledgeMode session acknowledge mode
116     * @return session proxy
117     * @throws JMSException on error
118     */
119    private ManagedSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException {
120        ActiveMQSession session;
121        if (info != null && info.isUseSessionArgs()) {
122            session = (ActiveMQSession) getConnection().createSession(transacted, transacted ? Session.SESSION_TRANSACTED : acknowledgeMode);
123        } else {
124            session = (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
125        }
126        ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext());
127        session.setTransactionContext(txContext);
128        ManagedSessionProxy p = new ManagedSessionProxy(session, this);
129        p.setUseSharedTxContext(managedConnection.isInManagedTx());
130        synchronized (sessions) {
131            sessions.add(p);
132        }
133        return p;
134    }
135
136    protected void sessionClosed(ManagedSessionProxy session) {
137        synchronized (sessions) {
138            sessions.remove(session);
139        }
140    }
141
142    public void setUseSharedTxContext(boolean enable) throws JMSException {
143        synchronized (sessions) {
144            for (ManagedSessionProxy p : sessions) {
145                p.setUseSharedTxContext(enable);
146            }
147        }
148    }
149
150    /**
151     * @param transacted      Whether session is transacted
152     * @param acknowledgeMode session acknowledge mode
153     * @return session proxy
154     * @throws JMSException on error
155     */
156    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
157        return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode));
158    }
159
160    /**
161     * @param transacted      Whether session is transacted
162     * @param acknowledgeMode session acknowledge mode
163     * @return session proxy
164     * @throws JMSException on error
165     */
166    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
167        return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode));
168    }
169
170    /**
171     * @return client id from delegate
172     * @throws JMSException
173     */
174    public String getClientID() throws JMSException {
175        return getConnection().getClientID();
176    }
177
178    /**
179     * @return exception listener from delegate
180     * @throws JMSException
181     */
182    public ExceptionListener getExceptionListener() throws JMSException {
183        return getConnection().getExceptionListener();
184    }
185
186    /**
187     * @return connection metadata from delegate
188     * @throws JMSException
189     */
190    public ConnectionMetaData getMetaData() throws JMSException {
191        return getConnection().getMetaData();
192    }
193
194    /**
195     * Sets client id on delegate
196     * @param clientID new clientId
197     * @throws JMSException
198     */
199    public void setClientID(String clientID) throws JMSException {
200        getConnection().setClientID(clientID);
201    }
202
203    /**
204     * sets exception listener on delegate
205     * @param listener new listener
206     * @throws JMSException
207     */
208    public void setExceptionListener(ExceptionListener listener) throws JMSException {
209        getConnection();
210        exceptionListener = listener;
211    }
212
213    /**
214     * @throws JMSException
215     */
216    public void start() throws JMSException {
217        getConnection().start();
218    }
219
220    /**
221     * @throws JMSException
222     */
223    public void stop() throws JMSException {
224        getConnection().stop();
225    }
226
227    /**
228     * @param queue
229     * @param messageSelector
230     * @param sessionPool
231     * @param maxMessages
232     * @return
233     * @throws JMSException
234     */
235    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
236        throw new JMSException("Not Supported.");
237    }
238
239    /**
240     * @param topic
241     * @param messageSelector
242     * @param sessionPool
243     * @param maxMessages
244     * @return
245     * @throws JMSException
246     */
247    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
248        throw new JMSException("Not Supported.");
249    }
250
251    /**
252     * @param destination
253     * @param messageSelector
254     * @param sessionPool
255     * @param maxMessages
256     * @return
257     * @throws JMSException
258     */
259    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
260        throw new JMSException("Not Supported.");
261    }
262
263    /**
264     * @param topic
265     * @param subscriptionName
266     * @param messageSelector
267     * @param sessionPool
268     * @param maxMessages
269     * @return
270     * @throws JMSException
271     */
272    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
273        throw new JMSException("Not Supported.");
274    }
275
276    /**
277     * @return Returns the managedConnection.
278     */
279    public ActiveMQManagedConnection getManagedConnection() {
280        return managedConnection;
281    }
282
283    public void onException(JMSException e) {
284        if (exceptionListener != null && managedConnection != null) {
285            try {
286                exceptionListener.onException(e);
287            } catch (Throwable ignore) {
288                // We can never trust user code so ignore any exceptions.
289            }
290        }
291    }
292
293}