Clean up thread synchronization.

Make sure encoder and streamer use same dimension.
This commit is contained in:
Erik Forkalsrud 2013-01-23 23:57:16 -08:00
parent e65c87ac86
commit 461adb098f
3 changed files with 68 additions and 60 deletions

View file

@ -99,4 +99,11 @@ public class Dimension {
return outd; return outd;
} }
public Dimension even() {
return new Dimension(this.w % 2 > 0 ? this.w + 1 : this.w,
this.h % 2 > 0 ? this.h + 1 : this.h);
}
} }

View file

@ -283,18 +283,18 @@ public class MovieCoder {
private Chunk currentChunk = null; private Chunk currentChunk = null;
private int chunkPos; private int chunkPos;
private int remainingCapacity; private int remainingCapacity;
private int chunkNo = 0; private volatile int chunkInProgress = 0;
private volatile int chunkAvailable = 0;
private FlvFilter filter; private FlvFilter filter;
private String dbKey; private String dbKey;
private long fileTimestamp; private long fileTimestamp;
private int orientation; private int orientation;
private volatile boolean done = false;
public EncodingProcess(File file, Thumbnail thumbnail, Dimension size) { public EncodingProcess(File file, Thumbnail thumbnail, Dimension size) {
this.file = file; this.file = file;
this.fileTimestamp = file.lastModified(); this.fileTimestamp = file.lastModified();
this.targetSize = size; this.targetSize = size.even();
if (targetSize.getWidth() % 2 > 0) targetSize = new Dimension(targetSize.getWidth() + 1, targetSize.getHeight());
if (targetSize.getHeight() % 2 > 0) targetSize = new Dimension(targetSize.getWidth(), targetSize.getHeight() + 1);
this.dbKey = key(file, targetSize); this.dbKey = key(file, targetSize);
FlvMetadata extraMeta = new FlvMetadata(); FlvMetadata extraMeta = new FlvMetadata();
extraMeta.setDuration(thumbnail.getDuration()); extraMeta.setDuration(thumbnail.getDuration());
@ -381,36 +381,54 @@ public class MovieCoder {
// Generate header again to get updated metadata // Generate header again to get updated metadata
// TODO (knut 30 JUL 2011) Add some metadata about the file in the first chunk of the file // TODO (knut 30 JUL 2011) Add some metadata about the file in the first chunk of the file
// size of first chunk, size of last chunk, number number of chunks between first-last, size of chunks between first-last // size of first chunk, size of last chunk, number number of chunks between first-last, size of chunks between first-last
// This will be used to determine the right chunk number from a given byte offset in a range oritented HTTP request // This will be used to determine the right chunk number from a given byte offset in a range oriented HTTP request
filter.generateHeaderChunk(); filter.generateHeaderChunk();
} catch (Exception e) { } catch (Exception e) {
log.error("uh?", e); log.error("uh?", e);
movieDb.delete(dbKey); movieDb.delete(dbKey);
synchronized (this) {
this.chunkAvailable = Integer.MAX_VALUE;
}
} finally { } finally {
synchronized (MovieCoder.this) { synchronized (MovieCoder.this) {
currentEncodings.remove(key(file, targetSize)); currentEncodings.remove(key(file, targetSize));
} }
notifyListeners(chunkNo, true); done = true;
notifyListeners(chunkAvailable);
} }
} }
/**
* @return number of chunks available.
* -Integer.MAX_VALUE if something went very wrong
* -1 * number of chunks available when we're done.
*/
public synchronized int waitForData() {
if (!this.done) {
try {
this.wait(2000L);
} catch (InterruptedException e) {
// ignore
}
}
int numChunks = this.chunkAvailable + 1;
return this.done ? -numChunks : numChunks;
}
private synchronized void notifyListeners(int chunkNo) {
this.chunkAvailable = chunkNo;
this.notifyAll();
}
@Override @Override
public void writeHeader(byte[] data) { public void writeHeader(byte[] data) {
int len = data.length; int len = data.length;
Chunk chunk0 = new Chunk(fileTimestamp, len, 0); Chunk chunk0 = new Chunk(fileTimestamp, len, 0);
chunk0.bits = data; chunk0.bits = data;
log.info("Writing " + dbKey + " seq 0 (" + chunkInProgress + ") " + data.length);
movieDb.store(dbKey, 0, chunk0); movieDb.store(dbKey, 0, chunk0);
notifyListeners(0, false); notifyListeners(chunkInProgress);
}
private void notifyListeners(int chunkNo, boolean end) {
for (EncodingProcessListener listener : listeners) {
if (end) {
listener.codingFinished(chunkNo);
} else {
listener.chunkAvailable(chunkNo);
}
}
} }
@Override @Override
@ -440,7 +458,7 @@ public class MovieCoder {
private void startNewChunk() { private void startNewChunk() {
this.chunkNo++; this.chunkInProgress++;
this.currentChunk = new Chunk(fileTimestamp, chunkSize, 0); this.currentChunk = new Chunk(fileTimestamp, chunkSize, 0);
this.chunkPos = 0; this.chunkPos = 0;
this.remainingCapacity = chunkSize; this.remainingCapacity = chunkSize;
@ -448,10 +466,11 @@ public class MovieCoder {
private void endChunk() { private void endChunk() {
log.info("Receiving chunk " + chunkNo); log.info("store chunk " + chunkInProgress);
movieDb.store(dbKey, chunkNo, currentChunk); movieDb.store(dbKey, chunkInProgress, currentChunk);
log.info("Writing " + dbKey + " seq " + chunkInProgress + " (" + chunkInProgress + ") " + currentChunk.bits.length);
currentChunk = null; currentChunk = null;
notifyListeners(chunkNo, false); notifyListeners(chunkInProgress);
} }
private void endLastChunk() { private void endLastChunk() {
@ -462,16 +481,16 @@ public class MovieCoder {
// reallocate // reallocate
Chunk last = new Chunk(fileTimestamp, chunkPos, 0); Chunk last = new Chunk(fileTimestamp, chunkPos, 0);
System.arraycopy(currentChunk.bits, 0, last.bits, 0, chunkPos); System.arraycopy(currentChunk.bits, 0, last.bits, 0, chunkPos);
movieDb.store(key(file, targetSize), chunkNo, last); movieDb.store(dbKey, chunkInProgress, last);
log.info("Writing " + dbKey + " seq " + chunkInProgress + " (" + chunkInProgress + ") " + last.bits.length);
currentChunk = null; currentChunk = null;
notifyListeners(chunkNo, true);
} }
public synchronized void addListener(EncodingProcessListener videoStreamer) { public synchronized void addListener(EncodingProcessListener videoStreamer) {
listeners.add(videoStreamer); listeners.add(videoStreamer);
} }
public void removeListener(VideoStreamer videoStreamer) { public synchronized void removeListener(VideoStreamer videoStreamer) {
listeners.remove(videoStreamer); listeners.remove(videoStreamer);
} }
} }
@ -522,7 +541,7 @@ public class MovieCoder {
synchronized VideoStreamer grabStream(File file, Thumbnail thumbnail, String size) { synchronized VideoStreamer grabStream(File file, Thumbnail thumbnail, String size) {
Dimension targetSize = thumbnail.getSize().scale(size); Dimension targetSize = thumbnail.getSize().scale(size).even();
String key = key(file, targetSize); String key = key(file, targetSize);
// See if we have (at least the beginning of) the file in the DB // See if we have (at least the beginning of) the file in the DB
@ -547,21 +566,18 @@ public class MovieCoder {
} }
class VideoStreamer implements EncodingProcessListener { class VideoStreamer {
private int chunkNo = 0; private int chunkNo = 0;
private int lastChunkNo = -1;
private EncodingProcess ep; private EncodingProcess ep;
private Chunk chunk; private Chunk chunk;
private String key; private String key;
private boolean done = false;
private VideoStreamer(String key, EncodingProcess ep, Chunk chunk0) { private VideoStreamer(String key, EncodingProcess ep, Chunk chunk0) {
this.key = key; this.key = key;
this.ep = ep; this.ep = ep;
this.chunk = chunk0; this.chunk = chunk0;
if (ep != null) {
ep.addListener(this);
}
// Range requests can hook in here // Range requests can hook in here
// if we have chunk metadata in chunk0 we can use that to compute the first // if we have chunk metadata in chunk0 we can use that to compute the first
// chunk we want and set this.chunkNo accordingly. Otherwise (not likely // chunk we want and set this.chunkNo accordingly. Otherwise (not likely
@ -571,16 +587,12 @@ public class MovieCoder {
// The end of the range is also a matter of counting the bytes. // The end of the range is also a matter of counting the bytes.
} }
private boolean done() {
return lastChunkNo >= 0 && chunkNo > lastChunkNo;
}
public void stream(OutputStream out) throws IOException, InterruptedException { public void stream(OutputStream out) throws IOException, InterruptedException {
while (!done()) { while (!done) {
if (chunk == null) { if (chunk == null) {
log.info("Looking for " + chunkNo); log.info("Looking for " + key + " - " + chunkNo);
chunk = movieDb.load(key, chunkNo); chunk = movieDb.load(key, chunkNo);
} }
if (chunk != null) { if (chunk != null) {
@ -590,35 +602,23 @@ public class MovieCoder {
chunkNo++; chunkNo++;
continue; continue;
} }
if (!done()) { if (!done) {
if (ep != null) { if (ep != null) {
synchronized (this) { int numChunks = ep.waitForData();
wait(2000L); System.out.println("Got " + numChunks);
} if (numChunks < 0) {
} else { this.done = true;
// We ran out of chunks, so we must be done. }
break; } else {
} // We ran out of chunks, so we must be done.
this.done = true;
break;
}
} }
} }
if (ep != null) {
ep.removeListener(this);
}
log.info("done sending"); log.info("done sending");
} }
@Override
public synchronized void chunkAvailable(int chunkNo) {
notify();
}
@Override
public synchronized void codingFinished(int lastCunkNo) {
this.lastChunkNo = lastCunkNo;
ep = null;
notify();
}
} }

View file

@ -466,6 +466,7 @@ public class AlbumServlet
boolean notModified(HttpServletRequest req, File f) { boolean notModified(HttpServletRequest req, File f) {
long reqDate = req.getDateHeader("If-Modified-Since"); long reqDate = req.getDateHeader("If-Modified-Since");
long fDate = f.lastModified(); long fDate = f.lastModified();
System.out.println("reqDate is " + new Date(reqDate) + " fDate is " + new Date(fDate) + " " + req.getHeader("If-Modified-Since"));
return reqDate > 0 && fDate > 0 && fDate <= reqDate; return reqDate > 0 && fDate > 0 && fDate <= reqDate;
} }