diff --git a/src/main/java/org/forkalsrud/album/db/Chunk.java b/src/main/java/org/forkalsrud/album/db/Chunk.java index 4b550e2..1bbc2fc 100644 --- a/src/main/java/org/forkalsrud/album/db/Chunk.java +++ b/src/main/java/org/forkalsrud/album/db/Chunk.java @@ -6,9 +6,11 @@ package org.forkalsrud.album.db; public class Chunk { public byte[] bits; + public byte[] meta; - public Chunk(int capacity) { + public Chunk(int capacity, int metaCapacity) { this.bits = new byte[capacity]; + this.bits = new byte[0]; } } \ No newline at end of file diff --git a/src/main/java/org/forkalsrud/album/db/MovieDatabase.java b/src/main/java/org/forkalsrud/album/db/MovieDatabase.java index 3e38049..70888bd 100644 --- a/src/main/java/org/forkalsrud/album/db/MovieDatabase.java +++ b/src/main/java/org/forkalsrud/album/db/MovieDatabase.java @@ -5,8 +5,6 @@ package org.forkalsrud.album.db; import java.nio.charset.Charset; -import org.forkalsrud.album.web.CachedImage; - import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; @@ -85,12 +83,16 @@ public class MovieDatabase extends TupleBinding { public Chunk entryToObject(TupleInput in) { int version = in.readInt(); - if (version != 1) { - throw new RuntimeException("I only understand version 1"); + if (version != 1 && version != 2) { + throw new RuntimeException("I only understand versions 1 and 2"); } - int lobLen = in.readInt(); - Chunk chunk = new Chunk(lobLen); + int lobLen = in.readInt(); + int metaLen = version >= 2 ? in.readInt() : 0; + Chunk chunk = new Chunk(lobLen, metaLen); in.read(chunk.bits, 0, lobLen); + if (version >= 2) { + in.read(chunk.meta, 0, metaLen); + } return chunk; } @@ -98,9 +100,11 @@ public class MovieDatabase extends TupleBinding { @Override public void objectToEntry(Chunk chunk, TupleOutput out) { - out.writeInt(1); // version 1 + out.writeInt(2); // version 1 out.writeInt(chunk.bits.length); + out.writeInt(chunk.meta.length); out.write(chunk.bits); + out.write(chunk.meta); } } diff --git a/src/main/java/org/forkalsrud/album/video/FlvFilter.java b/src/main/java/org/forkalsrud/album/video/FlvFilter.java index f7c0a20..6ef2aff 100644 --- a/src/main/java/org/forkalsrud/album/video/FlvFilter.java +++ b/src/main/java/org/forkalsrud/album/video/FlvFilter.java @@ -1,11 +1,8 @@ package org.forkalsrud.album.video; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.io.UnsupportedEncodingException; import java.util.Arrays; -import java.util.LinkedList; /** * Separates the FLV header boxes from the body boxes. diff --git a/src/main/java/org/forkalsrud/album/video/MovieCoder.java b/src/main/java/org/forkalsrud/album/video/MovieCoder.java index cc37a3c..eaf9d46 100644 --- a/src/main/java/org/forkalsrud/album/video/MovieCoder.java +++ b/src/main/java/org/forkalsrud/album/video/MovieCoder.java @@ -8,10 +8,10 @@ import java.io.LineNumberReader; import java.io.OutputStream; import java.io.PrintStream; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -31,6 +31,7 @@ public class MovieCoder { private String mplayerExecutable; private PictureScaler pictureScaler; private MovieDatabase movieDb; + private HashMap currentEncodings = new HashMap(); public MovieCoder(PictureScaler pictureScaler, MovieDatabase movieDb) { this.pictureScaler = pictureScaler; @@ -164,40 +165,21 @@ public class MovieCoder { } - class TailingOutputStream extends OutputStream { + public static interface EncodingProcessListener { - int currentPos; - int remainingBytes; - OutputStream dst; + public abstract void chunkAvailable(int chunkNo); - public TailingOutputStream(OutputStream dst, int startPos) { - this.dst = dst; - this.currentPos = startPos; - this.remainingBytes = Integer.MAX_VALUE; - } - - @Override - public void write(final int b) throws IOException { - this.write(new byte[] { (byte) (b & 0xff) }, 0, 1); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - dst.write(b, off, len); - currentPos += len; - remainingBytes -= len; - } + public abstract void codingFinished(int lastCunkNo); } - + class EncodingProcess implements Runnable { final int chunkSize = 65536; - int currentPos = 0; File file; Thumbnail thumbnail; Dimension targetSize; - LinkedList consumers = new LinkedList(); + ArrayList listeners = new ArrayList(); Chunk currentChunk = null; int chunkPos; int remainingCapacity; @@ -214,7 +196,7 @@ public class MovieCoder { */ private void startNewChunk() { - this.currentChunk = new Chunk(chunkSize); + this.currentChunk = new Chunk(chunkSize, 0); this.chunkPos = 0; this.remainingCapacity = chunkSize; } @@ -223,35 +205,20 @@ public class MovieCoder { * */ private void endChunk() { + if (currentChunk != null && chunkPos > 0) { - movieDb.store(file.getPath() + ":" + targetSize.getWidth(), chunkNo++, currentChunk); + + movieDb.store(file.getPath() + ":" + targetSize.getWidth(), chunkNo, currentChunk); + for (EncodingProcessListener listener : listeners) { + listener.chunkAvailable(chunkNo); + } + chunkNo++; } } - public synchronized void setEncodedData(byte[] buf, int offset, int length) throws IOException { - - for (TailingOutputStream consumer : consumers) { - - long bytesToSkip = consumer.currentPos - this.currentPos; - int i = offset; - if (bytesToSkip > 0) { - i += bytesToSkip; - } - int remaining = offset + length - i; - int bytesToCopy = Math.min(remaining, consumer.remainingBytes); - if (bytesToCopy > 0) { - consumer.write(buf, i, bytesToCopy); - } else { - // consumer.done(); - } - } - this.currentPos += length; - } - /* - * Transcoding probably needs to go in at some point. - * A suitable command line may be like the one below, as suggested by + * We might want to do H.264 instead of the H.263 codec typically used in FLV files. * http://rob.opendot.cl/index.php/useful-stuff/ffmpeg-x264-encoding-guide/ * http://rob.opendot.cl/index.php/useful-stuff/encoding-a-flv-video-for-embedded-web-playback/ * @@ -290,40 +257,50 @@ public class MovieCoder { "-g", "150", "-cmp", "2", "-subcmp", "2", "-mbd", "2", "-flags", "+aic+cbp+mv0+mv4", "-trellis", "1", "-f", "flv", - "-metadata", "duration=" + thumbnail.getDuration(), +// "-metadata", "duration=" + thumbnail.getDuration(), "-"); - log.info(pb.command().toString()); - pb.redirectErrorStream(false); - Process p = pb.start(); - p.getOutputStream().close(); - InputStream movieStream = p.getInputStream(); - InputStream diagnostic = p.getErrorStream(); - new Thread(new ErrorStreamPumper(diagnostic)).start(); - int len; - startNewChunk(); - while ((len = movieStream.read(currentChunk.bits, chunkPos, remainingCapacity)) > 0) { + log.info(pb.command().toString()); + pb.redirectErrorStream(false); + Process p = pb.start(); + p.getOutputStream().close(); + InputStream movieStream = p.getInputStream(); + InputStream diagnostic = p.getErrorStream(); + new Thread(new ErrorStreamPumper(diagnostic)).start(); - setEncodedData(currentChunk.bits, chunkPos, len); - chunkPos += len; - remainingCapacity -= len; + int len; + startNewChunk(); + while ((len = movieStream.read(currentChunk.bits, chunkPos, remainingCapacity)) > 0) { - if (remainingCapacity == 0) { - endChunk(); - startNewChunk(); + chunkPos += len; + remainingCapacity -= len; + + if (remainingCapacity == 0) { + endChunk(); + startNewChunk(); + } } - } - endChunk(); + endChunk(); } catch (Exception e) { - e.printStackTrace(System.err); + log.error("uh?", e); + } finally { + synchronized (MovieCoder.this) { + currentEncodings.remove(key(file, targetSize)); + } + for (EncodingProcessListener listener : listeners) { + listener.codingFinished(chunkNo - 1); + } } } - public void streamTo(OutputStream out) { - consumers.add(new TailingOutputStream(out, 0)); - run(); - consumers.clear(); + + public synchronized void addListener(VideoStreamer videoStreamer) { + listeners.add(videoStreamer); + } + + public void removeListener(VideoStreamer videoStreamer) { + listeners.remove(videoStreamer); } } @@ -337,37 +314,105 @@ public class MovieCoder { @Override public void run() { try { - LineNumberReader lnr = new LineNumberReader(new InputStreamReader(is)); - String line; - while ((line = lnr.readLine()) != null) { - System.err.println(line); - } + LineNumberReader lnr = new LineNumberReader(new InputStreamReader(is)); + String line; + while ((line = lnr.readLine()) != null) { + log.warn(line); + } } catch (Exception e) { - e.printStackTrace(System.err); + log.error("stderr?", e); } } } - HashMap encodingsInProgress; - - + // TODO (knut 05 JUL 2011) Come up with a better interface for supporting + // range requests etcetera public void stream(File file, Thumbnail thumbnail, String size, OutputStream out) throws IOException, InterruptedException { -System.out.println("being asked to stream " + file + " size=" + size); - Dimension targetSize = thumbnail.getSize().scale(size); - new EncodingProcess(file, thumbnail, targetSize).streamTo(out); - /* - String key = file.getPath() + ":" + targetSize.getWidth(); - int chunkNo = 0; - boolean done = false; - while (!done) { - Chunk chunk = movieDb.load(key, chunkNo++); - if (chunk == null) { - break; - } - out.write(chunk.bits); - } - */ + + new VideoStreamer().stream(file, thumbnail, size, out); } + private String key(File file, Dimension size) { + + return file.getPath() + ":" + size.getWidth(); + } + + + class VideoStreamer implements EncodingProcessListener { + + int chunkNo = 0; + int lastChunkNo = -1; + + private boolean done() { + return lastChunkNo >= 0 && chunkNo > lastChunkNo; + } + + public void stream(File file, Thumbnail thumbnail, String size, OutputStream out) throws IOException, InterruptedException { + + Dimension targetSize = thumbnail.getSize().scale(size); + String key = key(file, targetSize); + + log.info("being asked to stream " + file + " size=" + size); + + EncodingProcess ep; + Chunk chunk; + + synchronized (MovieCoder.this) { + // See if we have (at least the beginning of) the file in the DB + chunk = movieDb.load(key, chunkNo); + + // See if file is currently in the process of being encoded + ep = currentEncodings.get(key); + + // If neither we need to start the encoding process + if (chunk == null && ep == null) { + ep = new EncodingProcess(file, thumbnail, targetSize); + currentEncodings.put(key, ep); + ep.addListener(this); + } else if (chunk == null && ep != null) { + // Encoding started but no chunk yet written + ep.addListener(this); + } else if (chunk != null && ep == null) { + // Encoding done, just stream from DB + // Here we do not add ourselves as a listener, so we also don't wait down below + // On the other hand we should have updated metadata about the chunks in chunk 0 + // so we should use that to set lastChunkNo + } else { + // Encoding partially done, stream what we have and wait for the rest to become available + ep.addListener(this); + } + } + while (!done()) { + while (chunk != null) { + out.write(chunk.bits); + chunk = movieDb.load(key, chunkNo++); + } + if (!done()) { + if (ep != null) { + synchronized (this) { + wait(2000L); + } + } else { + // We ran out of chunks, so we must be done. + break; + } + } + } + if (ep != null) { + ep.removeListener(this); + } + } + + @Override + public synchronized void chunkAvailable(int chunkNo) { + notify(); + } + + @Override + public synchronized void codingFinished(int lastCunkNo) { + this.lastChunkNo = lastCunkNo; + notify(); + } + } }