001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.io.IOException; 020import java.util.LinkedList; 021import java.util.List; 022import org.apache.activemq.ActiveMQMessageAudit; 023import org.apache.activemq.Service; 024import org.apache.activemq.broker.ConnectionContext; 025import org.apache.activemq.broker.region.Destination; 026import org.apache.activemq.broker.region.MessageReference; 027import org.apache.activemq.command.MessageId; 028import org.apache.activemq.usage.SystemUsage; 029 030/** 031 * Interface to pending message (messages awaiting disptach to a consumer) 032 * cursor 033 * 034 * 035 */ 036public interface PendingMessageCursor extends Service { 037 038 static final long INFINITE_WAIT = 0; 039 040 /** 041 * Add a destination 042 * 043 * @param context 044 * @param destination 045 * @throws Exception 046 */ 047 void add(ConnectionContext context, Destination destination) throws Exception; 048 049 /** 050 * remove a destination 051 * 052 * @param context 053 * @param destination 054 * @throws Exception 055 */ 056 List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception; 057 058 /** 059 * @return true if there are no pending messages 060 */ 061 boolean isEmpty(); 062 063 /** 064 * check if a Destination is Empty for this cursor 065 * 066 * @param destination 067 * @return true id the Destination is empty 068 */ 069 boolean isEmpty(Destination destination); 070 071 /** 072 * reset the cursor 073 */ 074 void reset(); 075 076 /** 077 * hint to the cursor to release any locks it might have grabbed after a 078 * reset 079 */ 080 void release(); 081 082 /** 083 * add message to await dispatch 084 * 085 * @param node 086 * @return boolean true if successful, false if cursor traps a duplicate 087 * @throws IOException 088 * @throws Exception 089 */ 090 boolean addMessageLast(MessageReference node) throws Exception; 091 092 /** 093 * add message to await dispatch - if it can 094 * 095 * @param node 096 * @param maxWaitTime 097 * @return true if successful 098 * @throws IOException 099 * @throws Exception 100 */ 101 boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception; 102 103 /** 104 * add message to await dispatch 105 * 106 * @param node 107 * @throws Exception 108 */ 109 void addMessageFirst(MessageReference node) throws Exception; 110 111 /** 112 * Add a message recovered from a retroactive policy 113 * 114 * @param node 115 * @throws Exception 116 */ 117 void addRecoveredMessage(MessageReference node) throws Exception; 118 119 /** 120 * @return true if there pending messages to dispatch 121 */ 122 boolean hasNext(); 123 124 /** 125 * @return the next pending message with its reference count increment 126 */ 127 MessageReference next(); 128 129 /** 130 * remove the message at the cursor position 131 */ 132 void remove(); 133 134 /** 135 * @return the number of pending messages 136 */ 137 int size(); 138 139 long messageSize(); 140 141 /** 142 * clear all pending messages 143 */ 144 void clear(); 145 146 /** 147 * Informs the Broker if the subscription needs to intervention to recover 148 * it's state e.g. DurableTopicSubscriber may do 149 * 150 * @return true if recovery required 151 */ 152 boolean isRecoveryRequired(); 153 154 /** 155 * @return the maximum batch size 156 */ 157 int getMaxBatchSize(); 158 159 /** 160 * Set the max batch size 161 * 162 * @param maxBatchSize 163 */ 164 void setMaxBatchSize(int maxBatchSize); 165 166 /** 167 * Give the cursor a hint that we are about to remove messages from memory 168 * only 169 */ 170 void resetForGC(); 171 172 /** 173 * remove a node 174 * 175 * @param node 176 */ 177 void remove(MessageReference node); 178 179 /** 180 * free up any internal buffers 181 */ 182 void gc(); 183 184 /** 185 * Set the UsageManager 186 * 187 * @param systemUsage 188 * @see org.apache.activemq.usage.SystemUsage 189 */ 190 void setSystemUsage(SystemUsage systemUsage); 191 192 /** 193 * @return the usageManager 194 */ 195 SystemUsage getSystemUsage(); 196 197 /** 198 * @return the memoryUsageHighWaterMark 199 */ 200 int getMemoryUsageHighWaterMark(); 201 202 /** 203 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 204 */ 205 void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark); 206 207 /** 208 * @return true if the cursor is full 209 */ 210 boolean isFull(); 211 212 /** 213 * @return true if the cursor has space to page messages into 214 */ 215 public boolean hasSpace(); 216 217 /** 218 * @return true if the cursor has buffered messages ready to deliver 219 */ 220 boolean hasMessagesBufferedToDeliver(); 221 222 /** 223 * destroy the cursor 224 * 225 * @throws Exception 226 */ 227 void destroy() throws Exception; 228 229 /** 230 * Page in a restricted number of messages and increment the reference count 231 * 232 * @param maxItems 233 * @return a list of paged in messages 234 */ 235 LinkedList<MessageReference> pageInList(int maxItems); 236 237 /** 238 * set the maximum number of producers to track at one time 239 * @param value 240 */ 241 void setMaxProducersToAudit(int value); 242 243 /** 244 * @return the maximum number of producers to audit 245 */ 246 int getMaxProducersToAudit(); 247 248 /** 249 * Set the maximum depth of message ids to track 250 * @param depth 251 */ 252 void setMaxAuditDepth(int depth); 253 254 /** 255 * @return the audit depth 256 */ 257 int getMaxAuditDepth(); 258 259 /** 260 * @return the enableAudit 261 */ 262 public boolean isEnableAudit(); 263 /** 264 * @param enableAudit the enableAudit to set 265 */ 266 public void setEnableAudit(boolean enableAudit); 267 268 /** 269 * @return true if the underlying state of this cursor 270 * disappears when the broker shuts down 271 */ 272 public boolean isTransient(); 273 274 275 /** 276 * set the audit 277 * @param audit 278 */ 279 public void setMessageAudit(ActiveMQMessageAudit audit); 280 281 282 /** 283 * @return the audit - could be null 284 */ 285 public ActiveMQMessageAudit getMessageAudit(); 286 287 /** 288 * use a cache to improve performance 289 * @param useCache 290 */ 291 public void setUseCache(boolean useCache); 292 293 /** 294 * @return true if a cache may be used 295 */ 296 public boolean isUseCache(); 297 298 /** 299 * remove from auditing the message id 300 * @param id 301 */ 302 public void rollback(MessageId id); 303 304 /** 305 * @return true if cache is being used 306 */ 307 public boolean isCacheEnabled(); 308 309 public void rebase(); 310 311}