2018-05-17 12:51:56 +02:00
|
|
|
package org.gcube.data.access.storagehub;
|
|
|
|
|
|
|
|
import java.io.BufferedInputStream;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.io.InputStream;
|
|
|
|
import java.io.PipedInputStream;
|
|
|
|
import java.io.PipedOutputStream;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
public class MultipleOutputStream {
|
|
|
|
|
|
|
|
private Logger logger = LoggerFactory.getLogger(MultipleOutputStream.class);
|
|
|
|
|
|
|
|
private MyPipedInputStream[] pipedInStreams;
|
|
|
|
|
|
|
|
private InputStream is;
|
|
|
|
|
|
|
|
private MyPipedOututStream[] pipedOutStreams;
|
|
|
|
|
|
|
|
private int index=0;
|
2018-10-25 16:33:23 +02:00
|
|
|
|
2018-05-17 12:51:56 +02:00
|
|
|
public MultipleOutputStream(InputStream is, int number) throws IOException{
|
|
|
|
this.is = is;
|
2018-10-25 16:33:23 +02:00
|
|
|
|
|
|
|
|
2018-05-17 12:51:56 +02:00
|
|
|
logger.debug("requested {} piped streams ",number);
|
2018-10-25 16:33:23 +02:00
|
|
|
|
2018-05-17 12:51:56 +02:00
|
|
|
pipedInStreams = new MyPipedInputStream[number];
|
|
|
|
pipedOutStreams = new MyPipedOututStream[number];
|
2018-10-25 16:33:23 +02:00
|
|
|
|
2018-05-17 12:51:56 +02:00
|
|
|
for (int i =0; i<number; i++) {
|
|
|
|
pipedOutStreams[i] = new MyPipedOututStream();
|
|
|
|
pipedInStreams[i] = new MyPipedInputStream(pipedOutStreams[i]);
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2018-10-25 16:33:23 +02:00
|
|
|
public void startWriting() throws IOException{
|
|
|
|
|
|
|
|
BufferedInputStream bis = new BufferedInputStream(is);
|
|
|
|
byte[] buf = new byte[1024*64];
|
|
|
|
int read=-1;
|
|
|
|
int writeTot = 0;
|
|
|
|
while ((read =bis.read(buf))!=-1){
|
|
|
|
for (int i=0; i< pipedInStreams.length; i++) {
|
|
|
|
if (!pipedInStreams[i].isClosed()) {
|
|
|
|
pipedOutStreams[i].write(buf, 0, read);
|
2018-05-17 12:51:56 +02:00
|
|
|
}
|
|
|
|
}
|
2018-10-25 16:33:23 +02:00
|
|
|
|
|
|
|
|
|
|
|
writeTot+= read;
|
|
|
|
if (allOutStreamClosed())
|
|
|
|
break;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
for (int i=0; i< pipedOutStreams.length; i++) {
|
|
|
|
if (!pipedOutStreams[i].isClosed()) {
|
|
|
|
logger.debug("closing outputstream {}",i);
|
|
|
|
pipedOutStreams[i].close();
|
2018-05-17 12:51:56 +02:00
|
|
|
}
|
|
|
|
}
|
2018-10-25 16:33:23 +02:00
|
|
|
|
|
|
|
logger.debug("total written {} ",writeTot);
|
|
|
|
|
2018-05-17 12:51:56 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private boolean allOutStreamClosed() {
|
|
|
|
for (int i=0; i<pipedOutStreams.length; i++) {
|
|
|
|
if (!pipedOutStreams[i].isClosed())
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
public synchronized InputStream get() {
|
|
|
|
logger.debug("requesting piped streams {}",index);
|
|
|
|
if (index>=pipedInStreams.length) return null;
|
|
|
|
return pipedInStreams[index++];
|
|
|
|
}
|
2018-10-25 16:33:23 +02:00
|
|
|
|
2018-05-17 12:51:56 +02:00
|
|
|
|
|
|
|
public class MyPipedOututStream extends PipedOutputStream{
|
|
|
|
|
|
|
|
boolean close = false;
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void close() throws IOException {
|
|
|
|
this.close = true;
|
|
|
|
super.close();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @return the close
|
|
|
|
*/
|
|
|
|
public boolean isClosed() {
|
|
|
|
return close;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void write(byte[] b, int off, int len) throws IOException {
|
|
|
|
try{
|
|
|
|
super.write(b, off, len);
|
|
|
|
}catch(IOException io){
|
|
|
|
this.close = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
public class MyPipedInputStream extends PipedInputStream{
|
|
|
|
|
|
|
|
boolean close = false;
|
|
|
|
|
|
|
|
public MyPipedInputStream(PipedOutputStream src) throws IOException {
|
|
|
|
super(src);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void close() throws IOException {
|
|
|
|
this.close = true;
|
|
|
|
logger.debug(Thread.currentThread().getName()+" close MyPipedInputStream");
|
|
|
|
super.close();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @return the close
|
|
|
|
*/
|
|
|
|
public boolean isClosed() {
|
|
|
|
return close;
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|