001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.disk.util;
018
019import java.io.DataInput;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.UTFDataFormatException;
023
024import org.apache.activemq.util.ByteSequence;
025import org.apache.activemq.util.MarshallingSupport;
026
027/**
028 * Optimized ByteArrayInputStream that can be used more than once
029 */
030public final class DataByteArrayInputStream extends InputStream implements DataInput, AutoCloseable {
031
032    private byte[] buf;
033    private int pos;
034    private int offset;
035    private int length;
036
037    private byte[] work;
038
039    /**
040     * Creates a <code>StoreByteArrayInputStream</code>.
041     *
042     * @param buf the input buffer.
043     */
044    public DataByteArrayInputStream(byte buf[]) {
045        this.buf = buf;
046        this.pos = 0;
047        this.offset = 0;
048        this.length = buf.length;
049        this.work = new byte[8];
050    }
051
052    /**
053     * Creates a <code>StoreByteArrayInputStream</code>.
054     *
055     * @param sequence the input buffer.
056     */
057    public DataByteArrayInputStream(ByteSequence sequence) {
058        this.buf = sequence.getData();
059        this.offset = sequence.getOffset();
060        this.pos =  this.offset;
061        this.length = sequence.length;
062        this.work = new byte[8];
063    }
064
065    /**
066     * Creates <code>WireByteArrayInputStream</code> with a minmalist byte
067     * array
068     */
069    public DataByteArrayInputStream() {
070        this(new byte[0]);
071    }
072
073    /**
074     * @return the size
075     */
076    public int size() {
077        return pos - offset;
078    }
079
080    /**
081     * @return the underlying data array
082     */
083    public byte[] getRawData() {
084        return buf;
085    }
086
087    /**
088     * reset the <code>StoreByteArrayInputStream</code> to use an new byte
089     * array
090     *
091     * @param newBuff
092     */
093    public void restart(byte[] newBuff) {
094        buf = newBuff;
095        pos = 0;
096        length = newBuff.length;
097    }
098
099    public void restart() {
100        pos = 0;
101        length = buf.length;
102    }
103
104    /**
105     * reset the <code>StoreByteArrayInputStream</code> to use an new
106     * ByteSequence
107     *
108     * @param sequence
109     */
110    public void restart(ByteSequence sequence) {
111        this.buf = sequence.getData();
112        this.pos = sequence.getOffset();
113        this.length = sequence.getLength();
114    }
115
116    /**
117     * re-start the input stream - reusing the current buffer
118     *
119     * @param size
120     */
121    public void restart(int size) {
122        if (buf == null || buf.length < size) {
123            buf = new byte[size];
124        }
125        restart(buf);
126        this.length = size;
127    }
128
129    /**
130     * Reads the next byte of data from this input stream. The value byte is
131     * returned as an <code>int</code> in the range <code>0</code> to
132     * <code>255</code>. If no byte is available because the end of the
133     * stream has been reached, the value <code>-1</code> is returned.
134     * <p>
135     * This <code>read</code> method cannot block.
136     *
137     * @return the next byte of data, or <code>-1</code> if the end of the
138     *         stream has been reached.
139     */
140    @Override
141    public int read() {
142        return (pos < length) ? (buf[pos++] & 0xff) : -1;
143    }
144
145    /**
146     * Reads up to <code>len</code> bytes of data into an array of bytes from
147     * this input stream.
148     *
149     * @param b the buffer into which the data is read.
150     * @param off the start offset of the data.
151     * @param len the maximum number of bytes read.
152     * @return the total number of bytes read into the buffer, or
153     *         <code>-1</code> if there is no more data because the end of the
154     *         stream has been reached.
155     */
156    @Override
157    public int read(byte b[], int off, int len) {
158        if (b == null) {
159            throw new NullPointerException();
160        }
161        if (pos >= length) {
162            return -1;
163        }
164        if (pos + len > length) {
165            len = length - pos;
166        }
167        if (len <= 0) {
168            return 0;
169        }
170        System.arraycopy(buf, pos, b, off, len);
171        pos += len;
172        return len;
173    }
174
175    /**
176     * @return the number of bytes that can be read from the input stream
177     *         without blocking.
178     */
179    @Override
180    public int available() {
181        return length - pos;
182    }
183
184    @Override
185    public void readFully(byte[] b) {
186        read(b, 0, b.length);
187    }
188
189    @Override
190    public void readFully(byte[] b, int off, int len) {
191        read(b, off, len);
192    }
193
194    @Override
195    public int skipBytes(int n) {
196        if (pos + n > length) {
197            n = length - pos;
198        }
199        if (n < 0) {
200            return 0;
201        }
202        pos += n;
203        return n;
204    }
205
206    @Override
207    public boolean readBoolean() {
208        return read() != 0;
209    }
210
211    @Override
212    public byte readByte() {
213        return (byte)read();
214    }
215
216    @Override
217    public int readUnsignedByte() {
218        return read();
219    }
220
221    @Override
222    public short readShort() {
223        this.read(work, 0, 2);
224        return (short) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
225    }
226
227    @Override
228    public int readUnsignedShort() {
229        this.read(work, 0, 2);
230        return ((work[0] & 0xff) << 8) | (work[1] & 0xff);
231    }
232
233    @Override
234    public char readChar() {
235        this.read(work, 0, 2);
236        return (char) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
237    }
238
239    @Override
240    public int readInt() {
241        this.read(work, 0, 4);
242        return ((work[0] & 0xff) << 24) | ((work[1] & 0xff) << 16) |
243               ((work[2] & 0xff) << 8) | (work[3] & 0xff);
244    }
245
246    @Override
247    public long readLong() {
248        this.read(work, 0, 8);
249
250        int i1 = ((work[0] & 0xff) << 24) | ((work[1] & 0xff) << 16) |
251            ((work[2] & 0xff) << 8) | (work[3] & 0xff);
252        int i2 = ((work[4] & 0xff) << 24) | ((work[5] & 0xff) << 16) |
253            ((work[6] & 0xff) << 8) | (work[7] & 0xff);
254
255        return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL);
256    }
257
258    @Override
259    public float readFloat() throws IOException {
260        return Float.intBitsToFloat(readInt());
261    }
262
263    @Override
264    public double readDouble() throws IOException {
265        return Double.longBitsToDouble(readLong());
266    }
267
268    @Override
269    public String readLine() {
270        int start = pos;
271        while (pos < length) {
272            int c = read();
273            if (c == '\n') {
274                break;
275            }
276            if (c == '\r') {
277                c = read();
278                if (c != '\n' && c != -1) {
279                    pos--;
280                }
281                break;
282            }
283        }
284        return new String(buf, start, pos);
285    }
286
287    @Override
288    public String readUTF() throws IOException {
289        int length = readUnsignedShort();
290        if (pos + length > buf.length) {
291            throw new UTFDataFormatException("bad string");
292        }
293        char chararr[] = new char[length];
294        String result = MarshallingSupport.convertUTF8WithBuf(buf, chararr, pos, length);
295        pos += length;
296        return result;
297    }
298
299    public int getPos() {
300        return pos;
301    }
302
303    public void setPos(int pos) {
304        this.pos = pos;
305    }
306
307    public int getLength() {
308        return length;
309    }
310
311    public void setLength(int length) {
312        this.length = length;
313    }
314}