More work on streaming video.

This commit is contained in:
Knut Forkalsrud 2011-07-06 19:17:52 -07:00
parent dee96ab87d
commit a516f02b14
4 changed files with 156 additions and 108 deletions

View file

@ -6,9 +6,11 @@ package org.forkalsrud.album.db;
public class Chunk {
public byte[] bits;
public byte[] meta;
public Chunk(int capacity) {
public Chunk(int capacity, int metaCapacity) {
this.bits = new byte[capacity];
this.bits = new byte[0];
}
}

View file

@ -5,8 +5,6 @@ package org.forkalsrud.album.db;
import java.nio.charset.Charset;
import org.forkalsrud.album.web.CachedImage;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
@ -85,12 +83,16 @@ public class MovieDatabase extends TupleBinding<Chunk> {
public Chunk entryToObject(TupleInput in) {
int version = in.readInt();
if (version != 1) {
throw new RuntimeException("I only understand version 1");
if (version != 1 && version != 2) {
throw new RuntimeException("I only understand versions 1 and 2");
}
int lobLen = in.readInt();
Chunk chunk = new Chunk(lobLen);
int lobLen = in.readInt();
int metaLen = version >= 2 ? in.readInt() : 0;
Chunk chunk = new Chunk(lobLen, metaLen);
in.read(chunk.bits, 0, lobLen);
if (version >= 2) {
in.read(chunk.meta, 0, metaLen);
}
return chunk;
}
@ -98,9 +100,11 @@ public class MovieDatabase extends TupleBinding<Chunk> {
@Override
public void objectToEntry(Chunk chunk, TupleOutput out) {
out.writeInt(1); // version 1
out.writeInt(2); // version 1
out.writeInt(chunk.bits.length);
out.writeInt(chunk.meta.length);
out.write(chunk.bits);
out.write(chunk.meta);
}
}

View file

@ -1,11 +1,8 @@
package org.forkalsrud.album.video;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.LinkedList;
/**
* Separates the FLV header boxes from the body boxes.

View file

@ -8,10 +8,10 @@ import java.io.LineNumberReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -31,6 +31,7 @@ public class MovieCoder {
private String mplayerExecutable;
private PictureScaler pictureScaler;
private MovieDatabase movieDb;
private HashMap<String, EncodingProcess> currentEncodings = new HashMap<String, EncodingProcess>();
public MovieCoder(PictureScaler pictureScaler, MovieDatabase movieDb) {
this.pictureScaler = pictureScaler;
@ -164,40 +165,21 @@ public class MovieCoder {
}
class TailingOutputStream extends OutputStream {
public static interface EncodingProcessListener {
int currentPos;
int remainingBytes;
OutputStream dst;
public abstract void chunkAvailable(int chunkNo);
public TailingOutputStream(OutputStream dst, int startPos) {
this.dst = dst;
this.currentPos = startPos;
this.remainingBytes = Integer.MAX_VALUE;
}
@Override
public void write(final int b) throws IOException {
this.write(new byte[] { (byte) (b & 0xff) }, 0, 1);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
dst.write(b, off, len);
currentPos += len;
remainingBytes -= len;
}
public abstract void codingFinished(int lastCunkNo);
}
class EncodingProcess implements Runnable {
final int chunkSize = 65536;
int currentPos = 0;
File file;
Thumbnail thumbnail;
Dimension targetSize;
LinkedList<TailingOutputStream> consumers = new LinkedList<TailingOutputStream>();
ArrayList<EncodingProcessListener> listeners = new ArrayList<EncodingProcessListener>();
Chunk currentChunk = null;
int chunkPos;
int remainingCapacity;
@ -214,7 +196,7 @@ public class MovieCoder {
*/
private void startNewChunk() {
this.currentChunk = new Chunk(chunkSize);
this.currentChunk = new Chunk(chunkSize, 0);
this.chunkPos = 0;
this.remainingCapacity = chunkSize;
}
@ -223,35 +205,20 @@ public class MovieCoder {
*
*/
private void endChunk() {
if (currentChunk != null && chunkPos > 0) {
movieDb.store(file.getPath() + ":" + targetSize.getWidth(), chunkNo++, currentChunk);
movieDb.store(file.getPath() + ":" + targetSize.getWidth(), chunkNo, currentChunk);
for (EncodingProcessListener listener : listeners) {
listener.chunkAvailable(chunkNo);
}
chunkNo++;
}
}
public synchronized void setEncodedData(byte[] buf, int offset, int length) throws IOException {
for (TailingOutputStream consumer : consumers) {
long bytesToSkip = consumer.currentPos - this.currentPos;
int i = offset;
if (bytesToSkip > 0) {
i += bytesToSkip;
}
int remaining = offset + length - i;
int bytesToCopy = Math.min(remaining, consumer.remainingBytes);
if (bytesToCopy > 0) {
consumer.write(buf, i, bytesToCopy);
} else {
// consumer.done();
}
}
this.currentPos += length;
}
/*
* Transcoding probably needs to go in at some point.
* A suitable command line may be like the one below, as suggested by
* We might want to do H.264 instead of the H.263 codec typically used in FLV files.
* http://rob.opendot.cl/index.php/useful-stuff/ffmpeg-x264-encoding-guide/
* http://rob.opendot.cl/index.php/useful-stuff/encoding-a-flv-video-for-embedded-web-playback/
*
@ -290,40 +257,50 @@ public class MovieCoder {
"-g", "150", "-cmp", "2", "-subcmp", "2", "-mbd", "2",
"-flags", "+aic+cbp+mv0+mv4", "-trellis", "1",
"-f", "flv",
"-metadata", "duration=" + thumbnail.getDuration(),
// "-metadata", "duration=" + thumbnail.getDuration(),
"-");
log.info(pb.command().toString());
pb.redirectErrorStream(false);
Process p = pb.start();
p.getOutputStream().close();
InputStream movieStream = p.getInputStream();
InputStream diagnostic = p.getErrorStream();
new Thread(new ErrorStreamPumper(diagnostic)).start();
int len;
startNewChunk();
while ((len = movieStream.read(currentChunk.bits, chunkPos, remainingCapacity)) > 0) {
log.info(pb.command().toString());
pb.redirectErrorStream(false);
Process p = pb.start();
p.getOutputStream().close();
InputStream movieStream = p.getInputStream();
InputStream diagnostic = p.getErrorStream();
new Thread(new ErrorStreamPumper(diagnostic)).start();
setEncodedData(currentChunk.bits, chunkPos, len);
chunkPos += len;
remainingCapacity -= len;
int len;
startNewChunk();
while ((len = movieStream.read(currentChunk.bits, chunkPos, remainingCapacity)) > 0) {
if (remainingCapacity == 0) {
endChunk();
startNewChunk();
chunkPos += len;
remainingCapacity -= len;
if (remainingCapacity == 0) {
endChunk();
startNewChunk();
}
}
}
endChunk();
endChunk();
} catch (Exception e) {
e.printStackTrace(System.err);
log.error("uh?", e);
} finally {
synchronized (MovieCoder.this) {
currentEncodings.remove(key(file, targetSize));
}
for (EncodingProcessListener listener : listeners) {
listener.codingFinished(chunkNo - 1);
}
}
}
public void streamTo(OutputStream out) {
consumers.add(new TailingOutputStream(out, 0));
run();
consumers.clear();
public synchronized void addListener(VideoStreamer videoStreamer) {
listeners.add(videoStreamer);
}
public void removeListener(VideoStreamer videoStreamer) {
listeners.remove(videoStreamer);
}
}
@ -337,37 +314,105 @@ public class MovieCoder {
@Override
public void run() {
try {
LineNumberReader lnr = new LineNumberReader(new InputStreamReader(is));
String line;
while ((line = lnr.readLine()) != null) {
System.err.println(line);
}
LineNumberReader lnr = new LineNumberReader(new InputStreamReader(is));
String line;
while ((line = lnr.readLine()) != null) {
log.warn(line);
}
} catch (Exception e) {
e.printStackTrace(System.err);
log.error("stderr?", e);
}
}
}
HashMap<String, EncodingProcess> encodingsInProgress;
// TODO (knut 05 JUL 2011) Come up with a better interface for supporting
// range requests etcetera
public void stream(File file, Thumbnail thumbnail, String size, OutputStream out) throws IOException, InterruptedException {
System.out.println("being asked to stream " + file + " size=" + size);
Dimension targetSize = thumbnail.getSize().scale(size);
new EncodingProcess(file, thumbnail, targetSize).streamTo(out);
/*
String key = file.getPath() + ":" + targetSize.getWidth();
int chunkNo = 0;
boolean done = false;
while (!done) {
Chunk chunk = movieDb.load(key, chunkNo++);
if (chunk == null) {
break;
}
out.write(chunk.bits);
}
*/
new VideoStreamer().stream(file, thumbnail, size, out);
}
private String key(File file, Dimension size) {
return file.getPath() + ":" + size.getWidth();
}
class VideoStreamer implements EncodingProcessListener {
int chunkNo = 0;
int lastChunkNo = -1;
private boolean done() {
return lastChunkNo >= 0 && chunkNo > lastChunkNo;
}
public void stream(File file, Thumbnail thumbnail, String size, OutputStream out) throws IOException, InterruptedException {
Dimension targetSize = thumbnail.getSize().scale(size);
String key = key(file, targetSize);
log.info("being asked to stream " + file + " size=" + size);
EncodingProcess ep;
Chunk chunk;
synchronized (MovieCoder.this) {
// See if we have (at least the beginning of) the file in the DB
chunk = movieDb.load(key, chunkNo);
// See if file is currently in the process of being encoded
ep = currentEncodings.get(key);
// If neither we need to start the encoding process
if (chunk == null && ep == null) {
ep = new EncodingProcess(file, thumbnail, targetSize);
currentEncodings.put(key, ep);
ep.addListener(this);
} else if (chunk == null && ep != null) {
// Encoding started but no chunk yet written
ep.addListener(this);
} else if (chunk != null && ep == null) {
// Encoding done, just stream from DB
// Here we do not add ourselves as a listener, so we also don't wait down below
// On the other hand we should have updated metadata about the chunks in chunk 0
// so we should use that to set lastChunkNo
} else {
// Encoding partially done, stream what we have and wait for the rest to become available
ep.addListener(this);
}
}
while (!done()) {
while (chunk != null) {
out.write(chunk.bits);
chunk = movieDb.load(key, chunkNo++);
}
if (!done()) {
if (ep != null) {
synchronized (this) {
wait(2000L);
}
} else {
// We ran out of chunks, so we must be done.
break;
}
}
}
if (ep != null) {
ep.removeListener(this);
}
}
@Override
public synchronized void chunkAvailable(int chunkNo) {
notify();
}
@Override
public synchronized void codingFinished(int lastCunkNo) {
this.lastChunkNo = lastCunkNo;
notify();
}
}
}