001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020import java.util.concurrent.atomic.AtomicLong; 021 022import org.apache.activemq.Service; 023import org.apache.activemq.command.ActiveMQQueue; 024import org.apache.activemq.command.ActiveMQTopic; 025import org.apache.activemq.command.BrokerId; 026import org.apache.activemq.command.BrokerInfo; 027import org.apache.activemq.command.Command; 028import org.apache.activemq.command.ConnectionId; 029import org.apache.activemq.command.ConnectionInfo; 030import org.apache.activemq.command.ConsumerInfo; 031import org.apache.activemq.command.ExceptionResponse; 032import org.apache.activemq.command.Message; 033import org.apache.activemq.command.MessageAck; 034import org.apache.activemq.command.MessageDispatch; 035import org.apache.activemq.command.ProducerInfo; 036import org.apache.activemq.command.Response; 037import org.apache.activemq.command.SessionInfo; 038import org.apache.activemq.command.ShutdownInfo; 039import org.apache.activemq.transport.DefaultTransportListener; 040import org.apache.activemq.transport.FutureResponse; 041import org.apache.activemq.transport.ResponseCallback; 042import org.apache.activemq.transport.Transport; 043import org.apache.activemq.util.IdGenerator; 044import org.apache.activemq.util.ServiceStopper; 045import org.apache.activemq.util.ServiceSupport; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * Forwards all messages from the local broker to the remote broker. 051 * 052 * @org.apache.xbean.XBean 053 * 054 */ 055public class ForwardingBridge implements Service { 056 057 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 058 private static final Logger LOG = LoggerFactory.getLogger(ForwardingBridge.class); 059 060 final AtomicLong enqueueCounter = new AtomicLong(); 061 final AtomicLong dequeueCounter = new AtomicLong(); 062 ConnectionInfo connectionInfo; 063 SessionInfo sessionInfo; 064 ProducerInfo producerInfo; 065 ConsumerInfo queueConsumerInfo; 066 ConsumerInfo topicConsumerInfo; 067 BrokerId localBrokerId; 068 BrokerId remoteBrokerId; 069 BrokerInfo localBrokerInfo; 070 BrokerInfo remoteBrokerInfo; 071 072 private final Transport localBroker; 073 private final Transport remoteBroker; 074 private String clientId; 075 private int prefetchSize = 1000; 076 private boolean dispatchAsync; 077 private String destinationFilter = ">"; 078 private NetworkBridgeListener bridgeFailedListener; 079 private boolean useCompression = false; 080 081 public ForwardingBridge(Transport localBroker, Transport remoteBroker) { 082 this.localBroker = localBroker; 083 this.remoteBroker = remoteBroker; 084 } 085 086 public void start() throws Exception { 087 LOG.info("Starting a network connection between {} and {} has been established.", localBroker, remoteBroker); 088 089 localBroker.setTransportListener(new DefaultTransportListener() { 090 public void onCommand(Object o) { 091 Command command = (Command)o; 092 serviceLocalCommand(command); 093 } 094 095 public void onException(IOException error) { 096 serviceLocalException(error); 097 } 098 }); 099 100 remoteBroker.setTransportListener(new DefaultTransportListener() { 101 public void onCommand(Object o) { 102 Command command = (Command)o; 103 serviceRemoteCommand(command); 104 } 105 106 public void onException(IOException error) { 107 serviceRemoteException(error); 108 } 109 }); 110 111 localBroker.start(); 112 remoteBroker.start(); 113 } 114 115 protected void triggerStartBridge() throws IOException { 116 Thread thead = new Thread() { 117 public void run() { 118 try { 119 startBridge(); 120 } catch (IOException e) { 121 LOG.error("Failed to start network bridge: ", e); 122 } 123 } 124 }; 125 thead.start(); 126 } 127 128 /** 129 * @throws IOException 130 */ 131 final void startBridge() throws IOException { 132 connectionInfo = new ConnectionInfo(); 133 connectionInfo.setConnectionId(new ConnectionId(ID_GENERATOR.generateId())); 134 connectionInfo.setClientId(clientId); 135 localBroker.oneway(connectionInfo); 136 remoteBroker.oneway(connectionInfo); 137 138 sessionInfo = new SessionInfo(connectionInfo, 1); 139 localBroker.oneway(sessionInfo); 140 remoteBroker.oneway(sessionInfo); 141 142 queueConsumerInfo = new ConsumerInfo(sessionInfo, 1); 143 queueConsumerInfo.setDispatchAsync(dispatchAsync); 144 queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter)); 145 queueConsumerInfo.setPrefetchSize(prefetchSize); 146 queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); 147 localBroker.oneway(queueConsumerInfo); 148 149 producerInfo = new ProducerInfo(sessionInfo, 1); 150 producerInfo.setResponseRequired(false); 151 remoteBroker.oneway(producerInfo); 152 153 if (connectionInfo.getClientId() != null) { 154 topicConsumerInfo = new ConsumerInfo(sessionInfo, 2); 155 topicConsumerInfo.setDispatchAsync(dispatchAsync); 156 topicConsumerInfo.setSubscriptionName("topic-bridge"); 157 topicConsumerInfo.setRetroactive(true); 158 topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter)); 159 topicConsumerInfo.setPrefetchSize(prefetchSize); 160 topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); 161 localBroker.oneway(topicConsumerInfo); 162 } 163 164 LOG.info("Network connection between {} and {} has been established.", localBroker, remoteBroker); 165 } 166 167 public void stop() throws Exception { 168 try { 169 if (connectionInfo != null) { 170 localBroker.request(connectionInfo.createRemoveCommand()); 171 remoteBroker.request(connectionInfo.createRemoveCommand()); 172 } 173 localBroker.setTransportListener(null); 174 remoteBroker.setTransportListener(null); 175 localBroker.oneway(new ShutdownInfo()); 176 remoteBroker.oneway(new ShutdownInfo()); 177 } finally { 178 ServiceStopper ss = new ServiceStopper(); 179 ss.stop(localBroker); 180 ss.stop(remoteBroker); 181 ss.throwFirstException(); 182 } 183 } 184 185 public void serviceRemoteException(Throwable error) { 186 LOG.info("Unexpected remote exception: {}", error.getMessage()); 187 LOG.debug("Exception trace: ", error); 188 } 189 190 protected void serviceRemoteCommand(Command command) { 191 try { 192 if (command.isBrokerInfo()) { 193 synchronized (this) { 194 remoteBrokerInfo = (BrokerInfo)command; 195 remoteBrokerId = remoteBrokerInfo.getBrokerId(); 196 if (localBrokerId != null) { 197 if (localBrokerId.equals(remoteBrokerId)) { 198 LOG.info("Disconnecting loop back connection."); 199 ServiceSupport.dispose(this); 200 } else { 201 triggerStartBridge(); 202 } 203 } 204 } 205 } else { 206 LOG.warn("Unexpected remote command: {}", command); 207 } 208 } catch (IOException e) { 209 serviceLocalException(e); 210 } 211 } 212 213 public void serviceLocalException(Throwable error) { 214 LOG.info("Unexpected local exception: {}", error.getMessage()); 215 LOG.debug("Exception trace: ", error); 216 fireBridgeFailed(); 217 } 218 219 protected void serviceLocalCommand(Command command) { 220 try { 221 if (command.isMessageDispatch()) { 222 223 enqueueCounter.incrementAndGet(); 224 225 final MessageDispatch md = (MessageDispatch)command; 226 Message message = md.getMessage(); 227 message.setProducerId(producerInfo.getProducerId()); 228 229 if (message.getOriginalTransactionId() == null) { 230 message.setOriginalTransactionId(message.getTransactionId()); 231 } 232 message.setTransactionId(null); 233 234 if (isUseCompression()) { 235 message.compress(); 236 } 237 238 if (!message.isResponseRequired()) { 239 // If the message was originally sent using async send, we will preserve that 240 // QOS by bridging it using an async send (small chance of message loss). 241 remoteBroker.oneway(message); 242 dequeueCounter.incrementAndGet(); 243 localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); 244 245 } else { 246 247 // The message was not sent using async send, so we should 248 // only ack the local 249 // broker when we get confirmation that the remote broker 250 // has received the message. 251 ResponseCallback callback = new ResponseCallback() { 252 public void onCompletion(FutureResponse future) { 253 try { 254 Response response = future.getResult(); 255 if (response.isException()) { 256 ExceptionResponse er = (ExceptionResponse)response; 257 serviceLocalException(er.getException()); 258 } else { 259 dequeueCounter.incrementAndGet(); 260 localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); 261 } 262 } catch (IOException e) { 263 serviceLocalException(e); 264 } 265 } 266 }; 267 268 remoteBroker.asyncRequest(message, callback); 269 } 270 271 // Ack on every message since we don't know if the broker is 272 // blocked due to memory 273 // usage and is waiting for an Ack to un-block him. 274 275 // Acking a range is more efficient, but also more prone to 276 // locking up a server 277 // Perhaps doing something like the following should be policy 278 // based. 279 // if( 280 // md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) 281 // ) { 282 // queueDispatched++; 283 // if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) 284 // ) { 285 // localBroker.oneway(new MessageAck(md, 286 // MessageAck.STANDARD_ACK_TYPE, queueDispatched)); 287 // queueDispatched=0; 288 // } 289 // } else { 290 // topicDispatched++; 291 // if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) 292 // ) { 293 // localBroker.oneway(new MessageAck(md, 294 // MessageAck.STANDARD_ACK_TYPE, topicDispatched)); 295 // topicDispatched=0; 296 // } 297 // } 298 } else if (command.isBrokerInfo()) { 299 synchronized (this) { 300 localBrokerInfo = (BrokerInfo)command; 301 localBrokerId = localBrokerInfo.getBrokerId(); 302 if (remoteBrokerId != null) { 303 if (remoteBrokerId.equals(localBrokerId)) { 304 LOG.info("Disconnecting loop back connection."); 305 ServiceSupport.dispose(this); 306 } else { 307 triggerStartBridge(); 308 } 309 } 310 } 311 } else { 312 LOG.debug("Unexpected local command: {}", command); 313 } 314 } catch (IOException e) { 315 serviceLocalException(e); 316 } 317 } 318 319 public String getClientId() { 320 return clientId; 321 } 322 323 public void setClientId(String clientId) { 324 this.clientId = clientId; 325 } 326 327 public int getPrefetchSize() { 328 return prefetchSize; 329 } 330 331 public void setPrefetchSize(int prefetchSize) { 332 this.prefetchSize = prefetchSize; 333 } 334 335 public boolean isDispatchAsync() { 336 return dispatchAsync; 337 } 338 339 public void setDispatchAsync(boolean dispatchAsync) { 340 this.dispatchAsync = dispatchAsync; 341 } 342 343 public String getDestinationFilter() { 344 return destinationFilter; 345 } 346 347 public void setDestinationFilter(String destinationFilter) { 348 this.destinationFilter = destinationFilter; 349 } 350 351 public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) { 352 this.bridgeFailedListener = listener; 353 } 354 355 private void fireBridgeFailed() { 356 NetworkBridgeListener l = this.bridgeFailedListener; 357 if (l != null) { 358 l.bridgeFailed(); 359 } 360 } 361 362 public String getRemoteAddress() { 363 return remoteBroker.getRemoteAddress(); 364 } 365 366 public String getLocalAddress() { 367 return localBroker.getRemoteAddress(); 368 } 369 370 public String getLocalBrokerName() { 371 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); 372 } 373 374 public String getRemoteBrokerName() { 375 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); 376 } 377 378 public long getDequeueCounter() { 379 return dequeueCounter.get(); 380 } 381 382 public long getEnqueueCounter() { 383 return enqueueCounter.get(); 384 } 385 386 /** 387 * @param useCompression 388 * True if forwarded Messages should have their bodies compressed. 389 */ 390 public void setUseCompression(boolean useCompression) { 391 this.useCompression = useCompression; 392 } 393 394 /** 395 * @return the vale of the useCompression setting, true if forwarded messages will be compressed. 396 */ 397 public boolean isUseCompression() { 398 return useCompression; 399 } 400}