More work on movie streaming

This commit is contained in:
Knut Forkalsrud 2011-07-09 16:57:30 -07:00
parent a516f02b14
commit 494c0e5930
10 changed files with 402 additions and 162 deletions

View file

@ -10,7 +10,7 @@ public class Chunk {
public Chunk(int capacity, int metaCapacity) {
this.bits = new byte[capacity];
this.bits = new byte[0];
this.meta = new byte[0];
}
}

View file

@ -72,6 +72,19 @@ public class MovieDatabase extends TupleBinding<Chunk> {
}
public void delete(String key) {
int seq = 0;
OperationStatus status = OperationStatus.SUCCESS;
Transaction txn = environment.beginTransaction(null, null);
while (OperationStatus.SUCCESS.equals(status)) {
DatabaseEntry binKey = key(key, seq);
status = db.delete(txn, binKey);
}
txn.commitSync();
}
private DatabaseEntry key(String key, int seq) {
DatabaseEntry returnValue = new DatabaseEntry();
returnValue.setData((key + "#" + seq).getBytes(UTF8));

View file

@ -0,0 +1,9 @@
package org.forkalsrud.album.video;
public interface EncodingProcessListener {
public abstract void chunkAvailable(int chunkNo);
public abstract void codingFinished(int lastCunkNo);
}

View file

@ -0,0 +1,51 @@
package org.forkalsrud.album.video;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.Arrays;
import org.apache.commons.io.IOUtils;
public class ExecUtil {
/**
* The crazy Mac OSX does not even set the PATH to a reasonable value, so
* we have to jump through hoops to guess where we may find the executables
* for mplayer and friends.
*
* @param name
* @return
* @throws IOException
* @throws InterruptedException
*/
public String findExecutableInShellPath(String name) throws IOException, InterruptedException {
String executableForName = name;
ProcessBuilder pb = new ProcessBuilder(Arrays.asList(System.getenv("SHELL")));
pb.redirectErrorStream(true); // send errors to stdout
Process p = pb.start();
PrintStream stdin = new PrintStream(p.getOutputStream());
stdin.print("echo $PATH"); // This is still not entirely portable. Windows would like %PATH%
stdin.close();
InputStream stdout = p.getInputStream();
String searchPath = IOUtils.toString(stdout);
p.waitFor();
String separator = System.getProperty("path.separator");
if (searchPath != null && separator != null && !"".equals(separator)) {
String[] elements = searchPath.split(separator);
for (String path : elements) {
File executable = new File(path, name);
if (executable.isFile()) {
executableForName = executable.getAbsolutePath();
break;
}
}
}
return executableForName;
}
}

View file

@ -1,5 +1,6 @@
package org.forkalsrud.album.video;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
@ -12,6 +13,11 @@ public class FlvFilter extends OutputStream {
private static org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(FlvFilter.class);
public interface FlvReceiver {
public void writeHeader(byte[]... data);
public void writeBody(byte[] data);
}
private final static int FLV_SIZE_TAGHEADER = 11;
private final static int FLV_SIZE_TAGFOOTER = 4;
private final static int FLV_SIZE_FILEHEADER = 13;
@ -20,8 +26,8 @@ public class FlvFilter extends OutputStream {
private final static int FLV_TAG_VIDEO = 9;
private final static int FLV_TAG_SCRIPTDATA = 18;
private OutputStream headerDst;
private OutputStream bodyDst;
private boolean headerWritten = false;
private FlvReceiver receiver;
private byte[] fileHeader = new byte[FLV_SIZE_FILEHEADER];
private int byteCounter = 0;
@ -38,8 +44,14 @@ public class FlvFilter extends OutputStream {
private int currentTagSize;
private byte[] currentBox;
private FlvMetadata metadata = new FlvMetadata();
private FlvMetadata metadata;
private FlvMetadata extraMetadata;
public FlvFilter(FlvReceiver receiver, FlvMetadata metadata) {
this.receiver = receiver;
this.metadata = new FlvMetadata();
this.extraMetadata = metadata;
}
/**
* Receive some bytes of FLV output
@ -87,6 +99,11 @@ public class FlvFilter extends OutputStream {
currentDataSize = decodeUint24(currentBoxHeader, 1);
currentTagTimestamp = decodeTimestamp(currentBoxHeader, 4);
currentTagSize = currentDataSize + FLV_SIZE_TAGHEADER + FLV_SIZE_TAGFOOTER;
if (currentTagType == 0) {
log.info("Encountered tag type zero, exiting.");
currentBox = null;
return;
}
currentBox = new byte[currentTagSize];
for (int i = 0; i < FLV_SIZE_TAGHEADER; i++) {
currentBox[i] = currentBoxHeader[i];
@ -117,32 +134,63 @@ public class FlvFilter extends OutputStream {
}
void processBox() {
void processBox() throws IOException {
switch (currentTagType) {
case FLV_TAG_VIDEO:
// We expect one SCRIPTDATA box immediately following the file header
// Then only audio and video
if (currentTagType == FLV_TAG_SCRIPTDATA) {
if (!headerWritten) {
incomingMetadataLength = currentTagSize;
/*
TODO (knut 09 JUL 2011) generate (extra-)metadata here to enable user injection of metadata
metadata.read(currentBox, FLV_SIZE_TAGHEADER, currentTagSize - FLV_SIZE_TAGFOOTER);
metadata.merge(extraMetadata);
generateHeader();
*/
receiver.writeHeader(fileHeader, currentBox);
headerWritten = true;
} else {
log.warn("SCRIPTDATA out of order");
receiver.writeBody(currentBox);
}
} else if (currentTagType == FLV_TAG_VIDEO || currentTagType == FLV_TAG_AUDIO) {
if (!headerWritten) {
log.error("SCRIPTDATA out of order");
receiver.writeHeader(fileHeader);
headerWritten = true;
}
if (currentTagType == FLV_TAG_VIDEO) {
int flags = decodeUint8(currentBox, FLV_SIZE_TAGHEADER);
boolean isKeyFrame = ((flags >> 4) & 0xf) == 1;
metadata.addVideoFrame(currentTagPos, currentTagTimestamp, isKeyFrame);
break;
case FLV_TAG_SCRIPTDATA:
incomingMetadataLength = currentTagSize;
break;
case FLV_TAG_AUDIO:
} else { // currentTagType == FLV_TAG_AUDIO
metadata.addAudioFrame(currentTagPos, currentTagTimestamp);
break;
default:
}
receiver.writeBody(currentBox);
metadata.setFileSize(currentTagPos + currentTagSize);
} else {
log.error("Unknown box type: " + currentTagType);
break;
receiver.writeBody(currentBox);
}
}
public void generateHeader(OutputStream out) throws IOException {
@Override
public void flush() throws IOException {
if (currentBox != null) {
throw new IOException("Premature end of file");
}
}
public void generateHeader() throws IOException {
int metadataLength = metadata.calculateLength();
ByteArrayOutputStream out = new ByteArrayOutputStream(fileHeader.length + metadataLength);
out.write(fileHeader);
metadata.setFileOffsetDelta(metadata.calculateLength() - incomingMetadataLength);
metadata.setFileOffsetDelta(metadataLength - incomingMetadataLength);
metadata.writeOnMetadata(out);
receiver.writeHeader(out.toByteArray());
}

View file

@ -7,6 +7,8 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.forkalsrud.album.exif.Dimension;
public class FlvMetadata {
private abstract class Attr<T> {
@ -305,13 +307,13 @@ public class FlvMetadata {
out.write(8); // ECMAArray
writeUint32(out, len);
}
/*
private void writeFlvVariableArrayStart(OutputStream out, String name) throws IOException {
writeFlvString(out, name);
out.write(3); // Variable Array
}
*/
private void writeFlvVariableArrayEnd(OutputStream out) throws IOException {
out.write(0);
out.write(0);
@ -369,4 +371,51 @@ public class FlvMetadata {
return timestamp;
}
}
public void setDuration(String durationStr) {
this.duration.set(Double.parseDouble(durationStr));
}
public void setDimension(Dimension d) {
this.width.set(Double.valueOf(d.getWidth()));
this.height.set(Double.valueOf(d.getHeight()));
}
public void merge(FlvMetadata other) {
for (int i = 0; i < attrs.size(); i++) {
Attr their = other.attrs.get(i);
if (their.isPresent()) {
Attr our = this.attrs.get(i);
our.set(their.value);
}
}
}
public Object get(String prop) {
if (prop == null) {
return null;
}
for (Attr a : attrs) {
if (prop.equals(a.name) && a.isPresent()) {
return a.value;
}
}
return null;
}
public void setFileSize(int size) {
this.filesize.set(Double.valueOf(size));
}
public void read(byte[] buf, int offset, int len) {
if (buf[offset] != 2) { //not string
return;
}
}
}

View file

@ -6,10 +6,8 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -39,48 +37,13 @@ public class MovieCoder {
}
public void init() throws Exception {
this.ffmpegExecutable = findExecutableInShellPath("ffmpeg");
this.mplayerExecutable = findExecutableInShellPath("mplayer");
ExecUtil util = new ExecUtil();
this.ffmpegExecutable = util.findExecutableInShellPath("ffmpeg");
this.mplayerExecutable = util.findExecutableInShellPath("mplayer");
}
/**
* The crazy Mac OSX does not even set the PATH to a reasonable value, so
* we have to jump through hoops to guess where we may find the executables
* for mplayer and friends.
*
* @param name
* @return
* @throws IOException
* @throws InterruptedException
*/
private String findExecutableInShellPath(String name) throws IOException, InterruptedException {
String executableForName = name;
ProcessBuilder pb = new ProcessBuilder(Arrays.asList(System.getenv("SHELL")));
pb.redirectErrorStream(true); // send errors to stdout
Process p = pb.start();
PrintStream stdin = new PrintStream(p.getOutputStream());
stdin.print("echo $PATH"); // This is still not entirely portable. Windows would like %PATH%
stdin.close();
InputStream stdout = p.getInputStream();
String searchPath = IOUtils.toString(stdout);
p.waitFor();
String separator = System.getProperty("path.separator");
if (searchPath != null && separator != null && !"".equals(separator)) {
String[] elements = searchPath.split(separator);
for (String path : elements) {
File executable = new File(path, name);
if (executable.isFile()) {
executableForName = executable.getAbsolutePath();
break;
}
}
}
return executableForName;
}
@ -165,17 +128,28 @@ public class MovieCoder {
}
public static interface EncodingProcessListener {
public abstract void chunkAvailable(int chunkNo);
public abstract void codingFinished(int lastCunkNo);
/**
* @param file
* @param thumbnail
* @param targetSize
* @param key
* @return
*/
private synchronized EncodingProcess submitEncodingJob(File file,
Thumbnail thumbnail, Dimension targetSize, String key) {
EncodingProcess ep;
ep = new EncodingProcess(file, thumbnail, targetSize);
currentEncodings.put(key, ep);
return ep;
}
class EncodingProcess implements Runnable {
final int chunkSize = 65536;
class EncodingProcess implements Runnable, FlvFilter.FlvReceiver {
final int chunkSize = 4 * 65536;
File file;
Thumbnail thumbnail;
Dimension targetSize;
@ -184,39 +158,19 @@ public class MovieCoder {
int chunkPos;
int remainingCapacity;
int chunkNo = 0;
FlvFilter filter;
String dbKey;
public EncodingProcess(File file, Thumbnail thumbnail, Dimension size) {
this.file = file;
this.thumbnail = thumbnail;
this.targetSize = size;
this.dbKey = key(file, targetSize);
FlvMetadata meta = new FlvMetadata();
meta.setDuration(thumbnail.getDuration());
this.filter = new FlvFilter(this, meta);
}
/**
*
*/
private void startNewChunk() {
this.currentChunk = new Chunk(chunkSize, 0);
this.chunkPos = 0;
this.remainingCapacity = chunkSize;
}
/**
*
*/
private void endChunk() {
if (currentChunk != null && chunkPos > 0) {
movieDb.store(file.getPath() + ":" + targetSize.getWidth(), chunkNo, currentChunk);
for (EncodingProcessListener listener : listeners) {
listener.chunkAvailable(chunkNo);
}
chunkNo++;
}
}
/*
* We might want to do H.264 instead of the H.263 codec typically used in FLV files.
* http://rob.opendot.cl/index.php/useful-stuff/ffmpeg-x264-encoding-guide/
@ -231,7 +185,6 @@ public class MovieCoder {
* -acodec libfaac -aq 100 \
* <outfile>.mp4
*/
@Override
public void run() {
@ -252,7 +205,7 @@ public class MovieCoder {
ffmpegExecutable, "-i", file.getAbsolutePath(),
// "-aspect", (thumbnail.getSize().getWidth() + ":" + thumbnail.getSize().getHeight()),
"-s", (targetSize.getWidth() + "x" + targetSize.getHeight()),
"-b", "600k",
"-b", "150k",
"-acodec", "libmp3lame", "-ar", "22050", "-vcodec", "flv",
"-g", "150", "-cmp", "2", "-subcmp", "2", "-mbd", "2",
"-flags", "+aic+cbp+mv0+mv4", "-trellis", "1",
@ -268,34 +221,103 @@ public class MovieCoder {
InputStream diagnostic = p.getErrorStream();
new Thread(new ErrorStreamPumper(diagnostic)).start();
byte[] buffer = new byte[65536];
int len;
startNewChunk();
while ((len = movieStream.read(currentChunk.bits, chunkPos, remainingCapacity)) > 0) {
chunkPos += len;
remainingCapacity -= len;
if (remainingCapacity == 0) {
endChunk();
startNewChunk();
while ((len = movieStream.read(buffer)) > 0) {
filter.write(buffer, 0, len);
}
}
endChunk();
filter.flush();
endLastChunk();
} catch (Exception e) {
log.error("uh?", e);
movieDb.delete(dbKey);
} finally {
synchronized (MovieCoder.this) {
currentEncodings.remove(key(file, targetSize));
}
notifyListeners(chunkNo, true);
}
}
@Override
public void writeHeader(byte[]... data) {
int len = 0;
for (byte[] ba : data) {
len += ba.length;
}
Chunk chunk0 = new Chunk(len, 0);
int i = 0;
for (byte[] ba : data) {
System.arraycopy(ba, 0, chunk0.bits, i, ba.length);
i += ba.length;
}
movieDb.store(dbKey, 0, chunk0);
notifyListeners(0, false);
}
private void notifyListeners(int chunkNo, boolean end) {
for (EncodingProcessListener listener : listeners) {
listener.codingFinished(chunkNo - 1);
if (end) {
listener.codingFinished(chunkNo);
} else {
listener.chunkAvailable(chunkNo);
}
}
}
@Override
public void writeBody(byte[] data) {
writeBody(data, 0, data.length);
}
public synchronized void addListener(VideoStreamer videoStreamer) {
private void writeBody(byte[] data, int offset, int len) {
int remainingInput = len;
int inputPos = offset;
while (remainingInput > 0) {
if (currentChunk == null) {
startNewChunk();
}
int copyLen = Math.min(remainingCapacity, remainingInput);
System.arraycopy(data, inputPos, currentChunk.bits, chunkPos, copyLen);
chunkPos += copyLen;
remainingCapacity -= copyLen;
remainingInput -= copyLen;
if (remainingCapacity == 0) {
endChunk();
}
}
}
private void startNewChunk() {
this.chunkNo++;
this.currentChunk = new Chunk(chunkSize, 0);
this.chunkPos = 0;
this.remainingCapacity = chunkSize;
}
private void endChunk() {
movieDb.store(dbKey, chunkNo, currentChunk);
currentChunk = null;
notifyListeners(chunkNo, false);
}
private void endLastChunk() {
if (currentChunk == null) {
return;
}
// reallocate
Chunk last = new Chunk(chunkPos, 0);
System.arraycopy(currentChunk.bits, 0, last.bits, 0, chunkPos);
movieDb.store(key(file, targetSize), chunkNo, last);
currentChunk = null;
notifyListeners(chunkNo, true);
}
public synchronized void addListener(EncodingProcessListener videoStreamer) {
listeners.add(videoStreamer);
}
@ -317,7 +339,7 @@ public class MovieCoder {
LineNumberReader lnr = new LineNumberReader(new InputStreamReader(is));
String line;
while ((line = lnr.readLine()) != null) {
log.warn(line);
log.info(line);
}
} catch (Exception e) {
log.error("stderr?", e);
@ -330,63 +352,69 @@ public class MovieCoder {
// range requests etcetera
public void stream(File file, Thumbnail thumbnail, String size, OutputStream out) throws IOException, InterruptedException {
new VideoStreamer().stream(file, thumbnail, size, out);
grabStream(file, thumbnail, size).stream(out);
}
private String key(File file, Dimension size) {
private static String key(File file, Dimension size) {
return file.getPath() + ":" + size.getWidth();
}
synchronized VideoStreamer grabStream(File file, Thumbnail thumbnail, String size) {
Dimension targetSize = thumbnail.getSize().scale(size);
String key = key(file, targetSize);
// See if we have (at least the beginning of) the file in the DB
// and whether file is currently in the process of being encoded
EncodingProcess ep = currentEncodings.get(key);
Chunk chunk = movieDb.load(key, 0);
// If neither we need to start the encoding process
if (chunk == null && ep == null) {
ep = submitEncodingJob(file, thumbnail, targetSize, key);
new Thread(ep).start();
}
return new VideoStreamer(key, ep, chunk);
}
class VideoStreamer implements EncodingProcessListener {
int chunkNo = 0;
int lastChunkNo = -1;
private int chunkNo = 0;
private int lastChunkNo = -1;
private EncodingProcess ep;
private Chunk chunk;
private String key;
private VideoStreamer(String key, EncodingProcess ep, Chunk chunk0) {
this.key = key;
this.ep = ep;
this.chunk = chunk0;
if (ep != null) {
ep.addListener(this);
}
// Range requests can hook in here
// if we have chunk metadata in chunk0 we can use that to compute the first
// chunk we want and set this.chunkNo accordingly. Otherwise (not likely
// to happen in practice) we just have to wait for each chunk to become
// available and count the number of bytes as we skip them.
//
// The end of the range is also a matter of counting the bytes.
}
private boolean done() {
return lastChunkNo >= 0 && chunkNo > lastChunkNo;
}
public void stream(File file, Thumbnail thumbnail, String size, OutputStream out) throws IOException, InterruptedException {
public void stream(OutputStream out) throws IOException, InterruptedException {
Dimension targetSize = thumbnail.getSize().scale(size);
String key = key(file, targetSize);
log.info("being asked to stream " + file + " size=" + size);
EncodingProcess ep;
Chunk chunk;
synchronized (MovieCoder.this) {
// See if we have (at least the beginning of) the file in the DB
chunk = movieDb.load(key, chunkNo);
// See if file is currently in the process of being encoded
ep = currentEncodings.get(key);
// If neither we need to start the encoding process
if (chunk == null && ep == null) {
ep = new EncodingProcess(file, thumbnail, targetSize);
currentEncodings.put(key, ep);
ep.addListener(this);
} else if (chunk == null && ep != null) {
// Encoding started but no chunk yet written
ep.addListener(this);
} else if (chunk != null && ep == null) {
// Encoding done, just stream from DB
// Here we do not add ourselves as a listener, so we also don't wait down below
// On the other hand we should have updated metadata about the chunks in chunk 0
// so we should use that to set lastChunkNo
} else {
// Encoding partially done, stream what we have and wait for the rest to become available
ep.addListener(this);
}
}
while (!done()) {
while (chunk != null) {
out.write(chunk.bits);
chunk = movieDb.load(key, chunkNo++);
chunk = movieDb.load(key, ++chunkNo);
}
if (!done()) {
if (ep != null) {
@ -404,6 +432,7 @@ public class MovieCoder {
}
}
@Override
public synchronized void chunkAvailable(int chunkNo) {
notify();
@ -412,6 +441,7 @@ public class MovieCoder {
@Override
public synchronized void codingFinished(int lastCunkNo) {
this.lastChunkNo = lastCunkNo;
ep = null;
notify();
}
}

View file

@ -346,9 +346,10 @@ public class AlbumServlet
res.setStatus(HttpServletResponse.SC_OK);
res.setDateHeader("Last-Modified", entry.getPath().lastModified());
res.setDateHeader("Expires", System.currentTimeMillis() + (30 * 24 * 3600 * 1000L)); // 30 days
res.setContentType("application/octet-stream");
res.setContentType("video/x-flv");
movieCoder.stream(entry.getPath(), entry.getThumbnail(), size, res.getOutputStream());
} catch (Exception ex) {
log.error("darn", ex);
throw new RuntimeException("sadness", ex);
}
}

View file

@ -1,6 +1,5 @@
package org.forkalsrud.album.video;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@ -10,14 +9,32 @@ import org.apache.commons.io.IOUtils;
public class FlvFilterTest extends TestCase {
private int bodyLen = 0;
public void testWrite() throws IOException {
final StringBuilder buf = new StringBuilder();
InputStream is = getClass().getResourceAsStream("/VideoAd.flv");
FlvFilter os = new FlvFilter();
FlvFilter.FlvReceiver receiver = new FlvFilter.FlvReceiver() {
@Override
public void writeHeader(byte[]... data) {
int len = 0;
for (byte[] ba : data) {
len += ba.length;
}
buf.append("HEADER[" + len + "]BODY[" + bodyLen + "]");
}
@Override
public void writeBody(byte[] data) {
bodyLen += data.length;
}
};
FlvFilter os = new FlvFilter(receiver, new FlvMetadata());
IOUtils.copy(is, os);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
os.generateHeader(baos);
assertEquals(579, baos.size());
os.generateHeader();
assertEquals("HEADER[252]BODY[0]HEADER[579]BODY[911684]", buf.toString());
}
}

View file

@ -0,0 +1,22 @@
package org.forkalsrud.album.video;
import org.forkalsrud.album.exif.Dimension;
import junit.framework.TestCase;
public class FlvMetadataTest extends TestCase {
public void testMerge() throws Exception {
FlvMetadata md1 = new FlvMetadata();
md1.setDuration("3.14");
md1.setDimension(new Dimension(5, 6));
FlvMetadata md2 = new FlvMetadata();
md2.merge(md1);
assertEquals(Double.valueOf("3.14"), md2.get("duration"));
assertEquals(Double.valueOf("5"), md2.get("width"));
assertEquals(Double.valueOf("6"), md2.get("height"));
}
}