001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.List; 021import java.util.concurrent.atomic.AtomicBoolean; 022 023import javax.jms.ResourceAllocationException; 024 025import org.apache.activemq.advisory.AdvisorySupport; 026import org.apache.activemq.broker.Broker; 027import org.apache.activemq.broker.BrokerService; 028import org.apache.activemq.broker.ConnectionContext; 029import org.apache.activemq.broker.ProducerBrokerExchange; 030import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 031import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.ActiveMQTopic; 034import org.apache.activemq.command.Message; 035import org.apache.activemq.command.MessageAck; 036import org.apache.activemq.command.MessageDispatchNotification; 037import org.apache.activemq.command.ProducerInfo; 038import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 039import org.apache.activemq.security.SecurityContext; 040import org.apache.activemq.state.ProducerState; 041import org.apache.activemq.store.MessageStore; 042import org.apache.activemq.thread.Scheduler; 043import org.apache.activemq.usage.MemoryUsage; 044import org.apache.activemq.usage.SystemUsage; 045import org.apache.activemq.usage.Usage; 046import org.slf4j.Logger; 047 048/** 049 * 050 */ 051public abstract class BaseDestination implements Destination { 052 /** 053 * The maximum number of messages to page in to the destination from 054 * persistent storage 055 */ 056 public static final int MAX_PAGE_SIZE = 200; 057 public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; 058 public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; 059 public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; 060 public static final int MAX_PRODUCERS_TO_AUDIT = 64; 061 public static final int MAX_AUDIT_DEPTH = 10000; 062 063 protected final AtomicBoolean started = new AtomicBoolean(); 064 protected final ActiveMQDestination destination; 065 protected final Broker broker; 066 protected final MessageStore store; 067 protected SystemUsage systemUsage; 068 protected MemoryUsage memoryUsage; 069 private boolean producerFlowControl = true; 070 private boolean alwaysRetroactive = false; 071 protected boolean warnOnProducerFlowControl = true; 072 protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 073 074 private int maxProducersToAudit = 1024; 075 private int maxAuditDepth = 2048; 076 private boolean enableAudit = true; 077 private int maxPageSize = MAX_PAGE_SIZE; 078 private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE; 079 private boolean useCache = true; 080 private int minimumMessageSize = 1024; 081 private boolean lazyDispatch = false; 082 private boolean advisoryForSlowConsumers; 083 private boolean advisoryForFastProducers; 084 private boolean advisoryForDiscardingMessages; 085 private boolean advisoryWhenFull; 086 private boolean advisoryForDelivery; 087 private boolean advisoryForConsumed; 088 private boolean sendAdvisoryIfNoConsumers; 089 private boolean includeBodyForAdvisory; 090 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 091 protected final BrokerService brokerService; 092 protected final Broker regionBroker; 093 protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; 094 protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; 095 private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; 096 protected int cursorMemoryHighWaterMark = 70; 097 protected int storeUsageHighWaterMark = 100; 098 private SlowConsumerStrategy slowConsumerStrategy; 099 private boolean prioritizedMessages; 100 private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; 101 private boolean gcIfInactive; 102 private boolean gcWithNetworkConsumers; 103 private long lastActiveTime=0l; 104 private boolean reduceMemoryFootprint = false; 105 protected final Scheduler scheduler; 106 private boolean disposed = false; 107 private boolean doOptimzeMessageStorage = true; 108 /* 109 * percentage of in-flight messages above which optimize message store is disabled 110 */ 111 private int optimizeMessageStoreInFlightLimit = 10; 112 private boolean persistJMSRedelivered; 113 114 /** 115 * @param brokerService 116 * @param store 117 * @param destination 118 * @param parentStats 119 * @throws Exception 120 */ 121 public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { 122 this.brokerService = brokerService; 123 this.broker = brokerService.getBroker(); 124 this.store = store; 125 this.destination = destination; 126 // let's copy the enabled property from the parent DestinationStatistics 127 this.destinationStatistics.setEnabled(parentStats.isEnabled()); 128 this.destinationStatistics.setParent(parentStats); 129 this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString()); 130 this.memoryUsage = this.systemUsage.getMemoryUsage(); 131 this.memoryUsage.setUsagePortion(1.0f); 132 this.regionBroker = brokerService.getRegionBroker(); 133 this.scheduler = brokerService.getBroker().getScheduler(); 134 } 135 136 /** 137 * initialize the destination 138 * 139 * @throws Exception 140 */ 141 public void initialize() throws Exception { 142 // Let the store know what usage manager we are using so that he can 143 // flush messages to disk when usage gets high. 144 if (store != null) { 145 store.setMemoryUsage(this.memoryUsage); 146 } 147 } 148 149 /** 150 * @return the producerFlowControl 151 */ 152 @Override 153 public boolean isProducerFlowControl() { 154 return producerFlowControl; 155 } 156 157 /** 158 * @param producerFlowControl the producerFlowControl to set 159 */ 160 @Override 161 public void setProducerFlowControl(boolean producerFlowControl) { 162 this.producerFlowControl = producerFlowControl; 163 } 164 165 @Override 166 public boolean isAlwaysRetroactive() { 167 return alwaysRetroactive; 168 } 169 170 @Override 171 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 172 this.alwaysRetroactive = alwaysRetroactive; 173 } 174 175 /** 176 * Set's the interval at which warnings about producers being blocked by 177 * resource usage will be triggered. Values of 0 or less will disable 178 * warnings 179 * 180 * @param blockedProducerWarningInterval the interval at which warning about 181 * blocked producers will be triggered. 182 */ 183 @Override 184 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 185 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 186 } 187 188 /** 189 * 190 * @return the interval at which warning about blocked producers will be 191 * triggered. 192 */ 193 @Override 194 public long getBlockedProducerWarningInterval() { 195 return blockedProducerWarningInterval; 196 } 197 198 /** 199 * @return the maxProducersToAudit 200 */ 201 @Override 202 public int getMaxProducersToAudit() { 203 return maxProducersToAudit; 204 } 205 206 /** 207 * @param maxProducersToAudit the maxProducersToAudit to set 208 */ 209 @Override 210 public void setMaxProducersToAudit(int maxProducersToAudit) { 211 this.maxProducersToAudit = maxProducersToAudit; 212 } 213 214 /** 215 * @return the maxAuditDepth 216 */ 217 @Override 218 public int getMaxAuditDepth() { 219 return maxAuditDepth; 220 } 221 222 /** 223 * @param maxAuditDepth the maxAuditDepth to set 224 */ 225 @Override 226 public void setMaxAuditDepth(int maxAuditDepth) { 227 this.maxAuditDepth = maxAuditDepth; 228 } 229 230 /** 231 * @return the enableAudit 232 */ 233 @Override 234 public boolean isEnableAudit() { 235 return enableAudit; 236 } 237 238 /** 239 * @param enableAudit the enableAudit to set 240 */ 241 @Override 242 public void setEnableAudit(boolean enableAudit) { 243 this.enableAudit = enableAudit; 244 } 245 246 @Override 247 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 248 destinationStatistics.getProducers().increment(); 249 this.lastActiveTime=0l; 250 } 251 252 @Override 253 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 254 destinationStatistics.getProducers().decrement(); 255 } 256 257 @Override 258 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{ 259 destinationStatistics.getConsumers().increment(); 260 this.lastActiveTime=0l; 261 } 262 263 @Override 264 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{ 265 destinationStatistics.getConsumers().decrement(); 266 this.lastActiveTime=0l; 267 } 268 269 270 @Override 271 public final MemoryUsage getMemoryUsage() { 272 return memoryUsage; 273 } 274 275 @Override 276 public void setMemoryUsage(MemoryUsage memoryUsage) { 277 this.memoryUsage = memoryUsage; 278 } 279 280 @Override 281 public DestinationStatistics getDestinationStatistics() { 282 return destinationStatistics; 283 } 284 285 @Override 286 public ActiveMQDestination getActiveMQDestination() { 287 return destination; 288 } 289 290 @Override 291 public final String getName() { 292 return getActiveMQDestination().getPhysicalName(); 293 } 294 295 @Override 296 public final MessageStore getMessageStore() { 297 return store; 298 } 299 300 @Override 301 public boolean isActive() { 302 boolean isActive = destinationStatistics.getConsumers().getCount() > 0 || 303 destinationStatistics.getProducers().getCount() > 0; 304 if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) { 305 isActive = hasRegularConsumers(getConsumers()); 306 } 307 return isActive; 308 } 309 310 @Override 311 public int getMaxPageSize() { 312 return maxPageSize; 313 } 314 315 @Override 316 public void setMaxPageSize(int maxPageSize) { 317 this.maxPageSize = maxPageSize; 318 } 319 320 @Override 321 public int getMaxBrowsePageSize() { 322 return this.maxBrowsePageSize; 323 } 324 325 @Override 326 public void setMaxBrowsePageSize(int maxPageSize) { 327 this.maxBrowsePageSize = maxPageSize; 328 } 329 330 public int getMaxExpirePageSize() { 331 return this.maxExpirePageSize; 332 } 333 334 public void setMaxExpirePageSize(int maxPageSize) { 335 this.maxExpirePageSize = maxPageSize; 336 } 337 338 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 339 this.expireMessagesPeriod = expireMessagesPeriod; 340 } 341 342 public long getExpireMessagesPeriod() { 343 return expireMessagesPeriod; 344 } 345 346 @Override 347 public boolean isUseCache() { 348 return useCache; 349 } 350 351 @Override 352 public void setUseCache(boolean useCache) { 353 this.useCache = useCache; 354 } 355 356 @Override 357 public int getMinimumMessageSize() { 358 return minimumMessageSize; 359 } 360 361 @Override 362 public void setMinimumMessageSize(int minimumMessageSize) { 363 this.minimumMessageSize = minimumMessageSize; 364 } 365 366 @Override 367 public boolean isLazyDispatch() { 368 return lazyDispatch; 369 } 370 371 @Override 372 public void setLazyDispatch(boolean lazyDispatch) { 373 this.lazyDispatch = lazyDispatch; 374 } 375 376 protected long getDestinationSequenceId() { 377 return regionBroker.getBrokerSequenceId(); 378 } 379 380 /** 381 * @return the advisoryForSlowConsumers 382 */ 383 public boolean isAdvisoryForSlowConsumers() { 384 return advisoryForSlowConsumers; 385 } 386 387 /** 388 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 389 */ 390 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 391 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 392 } 393 394 /** 395 * @return the advisoryForDiscardingMessages 396 */ 397 public boolean isAdvisoryForDiscardingMessages() { 398 return advisoryForDiscardingMessages; 399 } 400 401 /** 402 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to 403 * set 404 */ 405 public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) { 406 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 407 } 408 409 /** 410 * @return the advisoryWhenFull 411 */ 412 public boolean isAdvisoryWhenFull() { 413 return advisoryWhenFull; 414 } 415 416 /** 417 * @param advisoryWhenFull the advisoryWhenFull to set 418 */ 419 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 420 this.advisoryWhenFull = advisoryWhenFull; 421 } 422 423 /** 424 * @return the advisoryForDelivery 425 */ 426 public boolean isAdvisoryForDelivery() { 427 return advisoryForDelivery; 428 } 429 430 /** 431 * @param advisoryForDelivery the advisoryForDelivery to set 432 */ 433 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 434 this.advisoryForDelivery = advisoryForDelivery; 435 } 436 437 /** 438 * @return the advisoryForConsumed 439 */ 440 public boolean isAdvisoryForConsumed() { 441 return advisoryForConsumed; 442 } 443 444 /** 445 * @param advisoryForConsumed the advisoryForConsumed to set 446 */ 447 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 448 this.advisoryForConsumed = advisoryForConsumed; 449 } 450 451 /** 452 * @return the advisdoryForFastProducers 453 */ 454 public boolean isAdvisoryForFastProducers() { 455 return advisoryForFastProducers; 456 } 457 458 /** 459 * @param advisoryForFastProducers the advisdoryForFastProducers to set 460 */ 461 public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) { 462 this.advisoryForFastProducers = advisoryForFastProducers; 463 } 464 465 public boolean isSendAdvisoryIfNoConsumers() { 466 return sendAdvisoryIfNoConsumers; 467 } 468 469 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 470 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 471 } 472 473 public boolean isIncludeBodyForAdvisory() { 474 return includeBodyForAdvisory; 475 } 476 477 public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) { 478 this.includeBodyForAdvisory = includeBodyForAdvisory; 479 } 480 481 /** 482 * @return the dead letter strategy 483 */ 484 @Override 485 public DeadLetterStrategy getDeadLetterStrategy() { 486 return deadLetterStrategy; 487 } 488 489 /** 490 * set the dead letter strategy 491 * 492 * @param deadLetterStrategy 493 */ 494 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 495 this.deadLetterStrategy = deadLetterStrategy; 496 } 497 498 @Override 499 public int getCursorMemoryHighWaterMark() { 500 return this.cursorMemoryHighWaterMark; 501 } 502 503 @Override 504 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 505 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 506 } 507 508 /** 509 * called when message is consumed 510 * 511 * @param context 512 * @param messageReference 513 */ 514 @Override 515 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 516 if (advisoryForConsumed) { 517 broker.messageConsumed(context, messageReference); 518 } 519 } 520 521 /** 522 * Called when message is delivered to the broker 523 * 524 * @param context 525 * @param messageReference 526 */ 527 @Override 528 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 529 this.lastActiveTime = 0L; 530 if (advisoryForDelivery) { 531 broker.messageDelivered(context, messageReference); 532 } 533 } 534 535 /** 536 * Called when a message is discarded - e.g. running low on memory This will 537 * happen only if the policy is enabled - e.g. non durable topics 538 * 539 * @param context 540 * @param messageReference 541 */ 542 @Override 543 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 544 if (advisoryForDiscardingMessages) { 545 broker.messageDiscarded(context, sub, messageReference); 546 } 547 } 548 549 /** 550 * Called when there is a slow consumer 551 * 552 * @param context 553 * @param subs 554 */ 555 @Override 556 public void slowConsumer(ConnectionContext context, Subscription subs) { 557 if (advisoryForSlowConsumers) { 558 broker.slowConsumer(context, this, subs); 559 } 560 if (slowConsumerStrategy != null) { 561 slowConsumerStrategy.slowConsumer(context, subs); 562 } 563 } 564 565 /** 566 * Called to notify a producer is too fast 567 * 568 * @param context 569 * @param producerInfo 570 */ 571 @Override 572 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 573 if (advisoryForFastProducers) { 574 broker.fastProducer(context, producerInfo, getActiveMQDestination()); 575 } 576 } 577 578 /** 579 * Called when a Usage reaches a limit 580 * 581 * @param context 582 * @param usage 583 */ 584 @Override 585 public void isFull(ConnectionContext context, Usage<?> usage) { 586 if (advisoryWhenFull) { 587 broker.isFull(context, this, usage); 588 } 589 } 590 591 @Override 592 public void dispose(ConnectionContext context) throws IOException { 593 if (this.store != null) { 594 this.store.removeAllMessages(context); 595 this.store.dispose(context); 596 } 597 this.destinationStatistics.setParent(null); 598 this.memoryUsage.stop(); 599 this.disposed = true; 600 } 601 602 @Override 603 public boolean isDisposed() { 604 return this.disposed; 605 } 606 607 /** 608 * Provides a hook to allow messages with no consumer to be processed in 609 * some way - such as to send to a dead letter queue or something.. 610 */ 611 protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { 612 if (!msg.isPersistent()) { 613 if (isSendAdvisoryIfNoConsumers()) { 614 // allow messages with no consumers to be dispatched to a dead 615 // letter queue 616 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { 617 618 Message message = msg.copy(); 619 // The original destination and transaction id do not get 620 // filled when the message is first sent, 621 // it is only populated if the message is routed to another 622 // destination like the DLQ 623 if (message.getOriginalDestination() != null) { 624 message.setOriginalDestination(message.getDestination()); 625 } 626 if (message.getOriginalTransactionId() != null) { 627 message.setOriginalTransactionId(message.getTransactionId()); 628 } 629 630 ActiveMQTopic advisoryTopic; 631 if (destination.isQueue()) { 632 advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); 633 } else { 634 advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); 635 } 636 message.setDestination(advisoryTopic); 637 message.setTransactionId(null); 638 639 // Disable flow control for this since since we don't want 640 // to block. 641 boolean originalFlowControl = context.isProducerFlowControl(); 642 try { 643 context.setProducerFlowControl(false); 644 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 645 producerExchange.setMutable(false); 646 producerExchange.setConnectionContext(context); 647 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 648 context.getBroker().send(producerExchange, message); 649 } finally { 650 context.setProducerFlowControl(originalFlowControl); 651 } 652 653 } 654 } 655 } 656 } 657 658 @Override 659 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 660 } 661 662 public final int getStoreUsageHighWaterMark() { 663 return this.storeUsageHighWaterMark; 664 } 665 666 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { 667 this.storeUsageHighWaterMark = storeUsageHighWaterMark; 668 } 669 670 protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { 671 waitForSpace(context, producerBrokerExchange, usage, 100, warning); 672 } 673 674 protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { 675 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 676 getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning); 677 throw new ResourceAllocationException(warning); 678 } 679 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { 680 if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { 681 getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning); 682 throw new ResourceAllocationException(warning); 683 } 684 } else { 685 long start = System.currentTimeMillis(); 686 long nextWarn = start; 687 producerBrokerExchange.blockingOnFlowControl(true); 688 destinationStatistics.getBlockedSends().increment(); 689 while (!usage.waitForSpace(1000, highWaterMark)) { 690 if (context.getStopping().get()) { 691 throw new IOException("Connection closed, send aborted."); 692 } 693 694 long now = System.currentTimeMillis(); 695 if (now >= nextWarn) { 696 getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((now - start) / 1000))}); 697 nextWarn = now + blockedProducerWarningInterval; 698 } 699 } 700 long finish = System.currentTimeMillis(); 701 long totalTimeBlocked = finish - start; 702 destinationStatistics.getBlockedTime().addTime(totalTimeBlocked); 703 producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked); 704 producerBrokerExchange.blockingOnFlowControl(false); 705 } 706 } 707 708 protected abstract Logger getLog(); 709 710 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { 711 this.slowConsumerStrategy = slowConsumerStrategy; 712 } 713 714 @Override 715 public SlowConsumerStrategy getSlowConsumerStrategy() { 716 return this.slowConsumerStrategy; 717 } 718 719 720 @Override 721 public boolean isPrioritizedMessages() { 722 return this.prioritizedMessages; 723 } 724 725 public void setPrioritizedMessages(boolean prioritizedMessages) { 726 this.prioritizedMessages = prioritizedMessages; 727 if (store != null) { 728 store.setPrioritizedMessages(prioritizedMessages); 729 } 730 } 731 732 /** 733 * @return the inactiveTimeoutBeforeGC 734 */ 735 @Override 736 public long getInactiveTimeoutBeforeGC() { 737 return this.inactiveTimeoutBeforeGC; 738 } 739 740 /** 741 * @param inactiveTimeoutBeforeGC the inactiveTimeoutBeforeGC to set 742 */ 743 public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) { 744 this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC; 745 } 746 747 /** 748 * @return the gcIfInactive 749 */ 750 public boolean isGcIfInactive() { 751 return this.gcIfInactive; 752 } 753 754 /** 755 * @param gcIfInactive the gcIfInactive to set 756 */ 757 public void setGcIfInactive(boolean gcIfInactive) { 758 this.gcIfInactive = gcIfInactive; 759 } 760 761 /** 762 * Indicate if it is ok to gc destinations that have only network consumers 763 * @param gcWithNetworkConsumers 764 */ 765 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { 766 this.gcWithNetworkConsumers = gcWithNetworkConsumers; 767 } 768 769 public boolean isGcWithNetworkConsumers() { 770 return gcWithNetworkConsumers; 771 } 772 773 @Override 774 public void markForGC(long timeStamp) { 775 if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false 776 && destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) { 777 this.lastActiveTime = timeStamp; 778 } 779 } 780 781 @Override 782 public boolean canGC() { 783 boolean result = false; 784 final long currentLastActiveTime = this.lastActiveTime; 785 if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) { 786 if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) { 787 result = true; 788 } 789 } 790 return result; 791 } 792 793 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { 794 this.reduceMemoryFootprint = reduceMemoryFootprint; 795 } 796 797 public boolean isReduceMemoryFootprint() { 798 return this.reduceMemoryFootprint; 799 } 800 801 @Override 802 public boolean isDoOptimzeMessageStorage() { 803 return doOptimzeMessageStorage; 804 } 805 806 @Override 807 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 808 this.doOptimzeMessageStorage = doOptimzeMessageStorage; 809 } 810 811 public int getOptimizeMessageStoreInFlightLimit() { 812 return optimizeMessageStoreInFlightLimit; 813 } 814 815 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) { 816 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; 817 } 818 819 820 @Override 821 public abstract List<Subscription> getConsumers(); 822 823 protected boolean hasRegularConsumers(List<Subscription> consumers) { 824 boolean hasRegularConsumers = false; 825 for (Subscription subscription: consumers) { 826 if (!subscription.getConsumerInfo().isNetworkSubscription()) { 827 hasRegularConsumers = true; 828 break; 829 } 830 } 831 return hasRegularConsumers; 832 } 833 834 public ConnectionContext createConnectionContext() { 835 ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); 836 answer.setBroker(this.broker); 837 answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); 838 answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 839 return answer; 840 } 841 842 protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) { 843 // the original ack may be a ranged ack, but we are trying to delete 844 // a specific 845 // message store here so we need to convert to a non ranged ack. 846 if (ack.getMessageCount() > 0) { 847 // Dup the ack 848 MessageAck a = new MessageAck(); 849 ack.copy(a); 850 ack = a; 851 // Convert to non-ranged. 852 ack.setMessageCount(1); 853 } 854 // always use node messageId so we can access entry/data Location 855 ack.setFirstMessageId(node.getMessageId()); 856 ack.setLastMessageId(node.getMessageId()); 857 return ack; 858 } 859 860 protected boolean isDLQ() { 861 return destination.isDLQ(); 862 } 863 864 @Override 865 public void duplicateFromStore(Message message, Subscription durableSub) { 866 ConnectionContext connectionContext = createConnectionContext(); 867 getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId()); 868 Throwable cause = new Throwable("duplicate from store for " + destination); 869 message.setRegionDestination(this); 870 broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause); 871 MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1); 872 messageAck.setPoisonCause(cause); 873 try { 874 acknowledge(connectionContext, durableSub, messageAck, message); 875 } catch (IOException e) { 876 getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck); 877 } 878 } 879 880 public void setPersistJMSRedelivered(boolean persistJMSRedelivered) { 881 this.persistJMSRedelivered = persistJMSRedelivered; 882 } 883 884 public boolean isPersistJMSRedelivered() { 885 return persistJMSRedelivered; 886 } 887}