001/* 002 * Copyright 2011-2019 Ping Identity Corporation 003 * All Rights Reserved. 004 */ 005/* 006 * Copyright (C) 2011-2019 Ping Identity Corporation 007 * 008 * This program is free software; you can redistribute it and/or modify 009 * it under the terms of the GNU General Public License (GPLv2 only) 010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only) 011 * as published by the Free Software Foundation. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Public License for more details. 017 * 018 * You should have received a copy of the GNU General Public License 019 * along with this program; if not, see <http://www.gnu.org/licenses>. 020 */ 021package com.unboundid.util; 022 023 024 025import java.io.ByteArrayInputStream; 026import java.io.File; 027import java.io.FileInputStream; 028import java.io.InputStream; 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.Iterator; 033 034import static com.unboundid.util.UtilityMessages.*; 035 036 037 038/** 039 * This class provides an input stream implementation that can aggregate 040 * multiple input streams. When reading data from this input stream, it will 041 * read from the first input stream until the end of it is reached, at point it 042 * will close it and start reading from the next one, and so on until all input 043 * streams have been exhausted. Closing the aggregate input stream will cause 044 * all remaining input streams to be closed. 045 */ 046@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE) 047public final class AggregateInputStream 048 extends InputStream 049{ 050 // The currently-active input stream. 051 private volatile InputStream activeInputStream; 052 053 // The iterator that will be used to access the input streams. 054 private final Iterator<InputStream> streamIterator; 055 056 057 058 /** 059 * Creates a new aggregate input stream that will use the provided set of 060 * input streams. 061 * 062 * @param inputStreams The input streams to be used by this aggregate input 063 * stream. It must not be {@code null}. 064 */ 065 public AggregateInputStream(final InputStream... inputStreams) 066 { 067 this(StaticUtils.toList(inputStreams)); 068 } 069 070 071 072 /** 073 * Creates a new aggregate input stream that will use the provided set of 074 * input streams. 075 * 076 * @param inputStreams The input streams to be used by this aggregate input 077 * stream. It must not be {@code null}. 078 */ 079 public AggregateInputStream( 080 final Collection<? extends InputStream> inputStreams) 081 { 082 Validator.ensureNotNull(inputStreams); 083 084 final ArrayList<InputStream> streamList = new ArrayList<>(inputStreams); 085 streamIterator = streamList.iterator(); 086 activeInputStream = null; 087 } 088 089 090 091 /** 092 * Creates a new aggregate input stream that will read data from the specified 093 * files. 094 * 095 * @param files The set of files to be read by this aggregate input stream. 096 * It must not be {@code null}. 097 * 098 * @throws IOException If a problem is encountered while attempting to 099 * create input streams for the provided files. 100 */ 101 public AggregateInputStream(final File... files) 102 throws IOException 103 { 104 this(false, files); 105 } 106 107 108 109 /** 110 * Creates a new aggregate input stream that will read data from the specified 111 * files. 112 * 113 * @param ensureBlankLinesBetweenFiles Indicates whether to ensure that 114 * there is at least one completely 115 * blank line between files. This may 116 * be useful when blank lines are 117 * used as delimiters (for example, when 118 * reading LDIF data), there is a chance 119 * that the files may not end with blank 120 * lines, and the inclusion of extra 121 * blank lines between files will not 122 * cause any harm. 123 * @param files The set of files to be read by this 124 * aggregate input stream. It must not 125 * be {@code null}. 126 * 127 * @throws IOException If a problem is encountered while attempting to 128 * create input streams for the provided files. 129 */ 130 public AggregateInputStream(final boolean ensureBlankLinesBetweenFiles, 131 final File... files) 132 throws IOException 133 { 134 Validator.ensureNotNull(files); 135 136 final ArrayList<InputStream> streamList = new ArrayList<>(2 * files.length); 137 138 IOException ioException = null; 139 for (final File f : files) 140 { 141 if (ensureBlankLinesBetweenFiles && (! streamList.isEmpty())) 142 { 143 final ByteStringBuffer buffer = new ByteStringBuffer(4); 144 buffer.append(StaticUtils.EOL_BYTES); 145 buffer.append(StaticUtils.EOL_BYTES); 146 streamList.add(new ByteArrayInputStream(buffer.toByteArray())); 147 } 148 149 try 150 { 151 streamList.add(new FileInputStream(f)); 152 } 153 catch (final IOException ioe) 154 { 155 Debug.debugException(ioe); 156 ioException = ioe; 157 break; 158 } 159 } 160 161 if (ioException != null) 162 { 163 for (final InputStream s : streamList) 164 { 165 if (s != null) 166 { 167 try 168 { 169 s.close(); 170 } 171 catch (final Exception e) 172 { 173 Debug.debugException(e); 174 } 175 } 176 } 177 178 throw ioException; 179 } 180 181 streamIterator = streamList.iterator(); 182 activeInputStream = null; 183 } 184 185 186 187 /** 188 * Reads the next byte of data from the current active input stream, switching 189 * to the next input stream in the set if appropriate. 190 * 191 * @return The next byte of data that was read, or -1 if all streams have 192 * been exhausted. 193 * 194 * @throws IOException If a problem is encountered while attempting to read 195 * data from an input stream. 196 */ 197 @Override() 198 public int read() 199 throws IOException 200 { 201 while (true) 202 { 203 if (activeInputStream == null) 204 { 205 if (streamIterator.hasNext()) 206 { 207 activeInputStream = streamIterator.next(); 208 continue; 209 } 210 else 211 { 212 return -1; 213 } 214 } 215 216 final int byteRead = activeInputStream.read(); 217 if (byteRead < 0) 218 { 219 activeInputStream.close(); 220 activeInputStream = null; 221 } 222 else 223 { 224 return byteRead; 225 } 226 } 227 } 228 229 230 231 /** 232 * Reads data from the current active input stream into the provided array, 233 * switching to the next input stream in the set if appropriate. 234 * 235 * @param b The array into which the data read should be placed, starting 236 * with an index of zero. It must not be {@code null}. 237 * 238 * @return The number of bytes read into the array, or -1 if all streams have 239 * been exhausted. 240 * 241 * @throws IOException If a problem is encountered while attempting to read 242 * data from an input stream. 243 */ 244 @Override() 245 public int read(final byte[] b) 246 throws IOException 247 { 248 return read(b, 0, b.length); 249 } 250 251 252 253 /** 254 * Reads data from the current active input stream into the provided array, 255 * switching to the next input stream in the set if appropriate. 256 * 257 * @param b The array into which the data read should be placed. It must 258 * not be {@code null}. 259 * @param off The position in the array at which to start writing data. 260 * @param len The maximum number of bytes that may be read. 261 * 262 * @return The number of bytes read into the array, or -1 if all streams have 263 * been exhausted. 264 * 265 * @throws IOException If a problem is encountered while attempting to read 266 * data from an input stream. 267 */ 268 @Override() 269 public int read(final byte[] b, final int off, final int len) 270 throws IOException 271 { 272 while (true) 273 { 274 if (activeInputStream == null) 275 { 276 if (streamIterator.hasNext()) 277 { 278 activeInputStream = streamIterator.next(); 279 continue; 280 } 281 else 282 { 283 return -1; 284 } 285 } 286 287 final int bytesRead = activeInputStream.read(b, off, len); 288 if (bytesRead < 0) 289 { 290 activeInputStream.close(); 291 activeInputStream = null; 292 } 293 else 294 { 295 return bytesRead; 296 } 297 } 298 } 299 300 301 302 /** 303 * Attempts to skip and discard up to the specified number of bytes from the 304 * input stream. 305 * 306 * @param n The number of bytes to attempt to skip. 307 * 308 * @return The number of bytes actually skipped. 309 * 310 * @throws IOException If a problem is encountered while attempting to skip 311 * data from the input stream. 312 */ 313 @Override() 314 public long skip(final long n) 315 throws IOException 316 { 317 if (activeInputStream == null) 318 { 319 if (streamIterator.hasNext()) 320 { 321 activeInputStream = streamIterator.next(); 322 return activeInputStream.skip(n); 323 } 324 else 325 { 326 return 0L; 327 } 328 } 329 else 330 { 331 return activeInputStream.skip(n); 332 } 333 } 334 335 336 337 /** 338 * Retrieves an estimate of the number of bytes that can be read without 339 * blocking. 340 * 341 * @return An estimate of the number of bytes that can be read without 342 * blocking. 343 * 344 * @throws IOException If a problem is encountered while attempting to make 345 * the determination. 346 */ 347 @Override() 348 public int available() 349 throws IOException 350 { 351 if (activeInputStream == null) 352 { 353 if (streamIterator.hasNext()) 354 { 355 activeInputStream = streamIterator.next(); 356 return activeInputStream.available(); 357 } 358 else 359 { 360 return 0; 361 } 362 } 363 else 364 { 365 return activeInputStream.available(); 366 } 367 } 368 369 370 371 /** 372 * Indicates whether this input stream supports the use of the {@code mark} 373 * and {@code reset} methods. This implementation does not support that 374 * capability. 375 * 376 * @return {@code false} to indicate that this input stream implementation 377 * does not support the use of {@code mark} and {@code reset}. 378 */ 379 @Override() 380 public boolean markSupported() 381 { 382 return false; 383 } 384 385 386 387 /** 388 * Marks the current position in the input stream. This input stream does not 389 * support this functionality, so no action will be taken. 390 * 391 * @param readLimit The maximum number of bytes that the caller may wish to 392 * read before being able to reset the stream. 393 */ 394 @Override() 395 public void mark(final int readLimit) 396 { 397 // No implementation is required. 398 } 399 400 401 402 /** 403 * Attempts to reset the position of this input stream to the mark location. 404 * This implementation does not support {@code mark} and {@code reset} 405 * functionality, so this method will always throw an exception. 406 * 407 * @throws IOException To indicate that reset is not supported. 408 */ 409 @Override() 410 public void reset() 411 throws IOException 412 { 413 throw new IOException(ERR_AGGREGATE_INPUT_STREAM_MARK_NOT_SUPPORTED.get()); 414 } 415 416 417 418 /** 419 * Closes this input stream. All associated input streams will be closed. 420 * 421 * @throws IOException If an exception was encountered while attempting to 422 * close any of the associated streams. Note that even 423 * if an exception is encountered, an attempt will be 424 * made to close all streams. 425 */ 426 @Override() 427 public void close() 428 throws IOException 429 { 430 IOException firstException = null; 431 432 if (activeInputStream != null) 433 { 434 try 435 { 436 activeInputStream.close(); 437 } 438 catch (final IOException ioe) 439 { 440 Debug.debugException(ioe); 441 firstException = ioe; 442 } 443 activeInputStream = null; 444 } 445 446 while (streamIterator.hasNext()) 447 { 448 final InputStream s = streamIterator.next(); 449 try 450 { 451 s.close(); 452 } 453 catch (final IOException ioe) 454 { 455 Debug.debugException(ioe); 456 if (firstException == null) 457 { 458 firstException = ioe; 459 } 460 } 461 } 462 463 if (firstException != null) 464 { 465 throw firstException; 466 } 467 } 468}