View Javadoc

1   package org.musicontroller.streaming;
2   
3   import java.io.FilterOutputStream;
4   import java.io.IOException;
5   import java.io.OutputStream;
6   
7   import org.apache.log4j.Logger;
8   import org.farng.mp3.InvalidTagException;
9   import org.farng.mp3.MP3File;
10  import org.farng.mp3.TagNotFoundException;
11  
12  import edu.emory.mathcs.backport.java.util.Arrays;
13  
14  /**
15   * An MpegOutputStream filters all (meta)data from a stream, so that a clean
16   * stream of Mpeg-frames remains. This class tries to achieve this by detecting
17   * and decoding mp3-frame-headers in the stream. Correct headers and the
18   * subsequent data are let through, whereas incorrect data is filtered. This
19   * method has a downside: metadata in a mp3-file could in theory contain data
20   * which looks like a valid mp3-frame-header too.
21   * The solution is to not only read mp3-frame-headers, but metadata-headers as
22   * well. If we encounter an ID3-header, we just skip the following data.
23   * <ul>
24   * <li>For ID3v1: after "TAG", 125 bytes of metadata follow.</li>
25   * <li>For ID3v2: after "ID3xyfssss", 4*ssss bytes of metadata follow.</li>
26   * </ul>
27  
28   * @author Varienaja
29   */
30  public class MpegOutputStream extends FilterOutputStream {
31  	
32  	private static final Logger log = Logger.getLogger(MpegOutputStream.class);
33  	
34  	private final static int BUFFERSIZE = 4096;
35  	/**
36  	 * The amount of milliseconds the stream should be inspected to detect
37  	 * possible bandwidth-problems.
38  	 */
39  	private final static int LOWBANDWIDTHDETECTIONLENGTH = 10000;
40  	/**
41  	 * The amount of samples to take to detect possible bandwidth-problems.
42  	 */
43  	private final static int LOWBANDWIDTHSAMPLEDEPTH = 10;
44  	
45  	/**
46  	 * The interval, after which we have to take a sample. An Mpeg frame contains
47  	 * 26 ms of sound.
48  	 */
49  	private final static int FRAMEINTERVAL = LOWBANDWIDTHDETECTIONLENGTH / 26 / LOWBANDWIDTHSAMPLEDEPTH;
50  	
51  	/**
52  	 * The amount of lag (in ms) we must experience in order to go into
53  	 * 'panic-mode'. Use negative values!
54  	 */
55  	private final static int MAXLAG = -5000;
56  	
57  	private int count;
58  
59  	/**
60  	 * When we found sync, we use this buffer to store a temporary copy of the
61  	 * mpeg frame. This buffer contains at most one mpeg frame. 
62  	 */
63  	private byte[] testframe;
64  	private State state;
65  	private int[] framelength;
66  	private byte second;
67  	private byte third;
68  	private byte fourth;
69  	private int totalskip;
70  	private int _todiscard;
71  	private byte vMajor;
72  	private byte vMinor;
73  	private byte flags;
74  	private byte[] metadatasize;
75  	/**
76  	 * Holds an integer-bytecount per bitrate. This can be used to calculate
77  	 * the length of the bytes that were streamed; The bitrates that are held
78  	 * are: 32,40,48,56,64,80,96,112,128,160,192,224,256,320kbs.
79  	 */
80  	private int[] _bitratesStreamed;
81  	private long _byteswritten;
82  	
83  	private long _probeTime;
84  	private int _probeLength;
85  	/**
86  	 * The amount of MPEG frames that have been flushed during a certain period
87  	 * of time. A frame contains 26ms of sound.
88  	 */
89  	private int frameCounter;
90  	
91  	/**
92  	 * A cyclic Array, containing the differences between time-streamed (length
93  	 * of the streamed content) and stream-time (time used to stream the said
94  	 * content).
95  	 */
96  	private int _streamDeltas[];
97  	/**
98  	 * Index to the _streamDeltas array.
99  	 */
100 	private int _streamDeltaIndex;
101 
102 	/**
103 	 * All states we can encounter while trying to sync on a MP3 frame-header.
104 	 */
105 	private enum State {
106 		WAITING_FOR_SYNC,
107 		FIRST_SYNC_BYTE_FOUND,
108 		SECOND_SYNC_BYTE_FOUND,
109 		THIRD_SYNC_BYTE_FOUND,
110 		SYNCHRONIZED,
111 		
112 		ID3_I,
113 		ID3_D,
114 		ID3_3,
115 		ID3_vMajor,
116 		ID3_vMinor,
117 		ID3_flags,
118 		ID3_s1,
119 		ID3_s2,
120 		ID3_s3,
121 		ID3_s4,
122 		
123 		TAG_T,
124 		TAG_A,
125 		TAG_G,
126 		
127 		DISCARDING;
128 	}
129 	
130 	/**
131 	 * Creates a new MpegOutputStream.
132 	 * @param out The OutputStream to filter.
133 	 */
134 	public MpegOutputStream(OutputStream out) {
135 		super(out);
136 		//The maximum theoretical length of such a frame is 1440 bytes(?).
137 		//To be sure, I use a few extra bytes here.
138 		testframe = new byte[BUFFERSIZE];
139 		metadatasize = new byte[4];
140 		reset();
141 		log.debug("New MpegOutputStream created");
142 	}
143 	
144 	/**
145 	 * <p>Resets the internal state of the MpegOutputStream. This is
146 	 * particularly useful when skipping songs. In the case of skipping, the
147 	 * stream is cut off at an arbitrary position, causing the mpeg-stream to
148 	 * become distorted.</p>
149 	 * <p>This method also sets the seconds-counter back to 0.</p>
150 	 * <p>When this class is busy copying an mpeg-frame which is only partly
151 	 * handed over, the output can still contain pops. The new frame would come
152 	 * too early, causing bogus-data to be played as music. A call to this reset
153 	 * method invalidates the current mpeg frame, and therefore rules out
154 	 * another source of nasty popping sounds.</p>
155 	 */
156 	public synchronized void reset() {
157 		if (count>0) {
158 			log.debug("Incomplete mpeg frame, skipped " + count + " bytes.");
159 			count = 0;
160 		}
161 		totalskip = 0;
162 		_bitratesStreamed = new int[]{0,0,0,0,0,0,0,0,0,0,0,0,0,0};
163 		_streamDeltas = new int[LOWBANDWIDTHSAMPLEDEPTH];
164 		_streamDeltaIndex = 0;
165 		state = State.WAITING_FOR_SYNC;
166 		_byteswritten = 0L;
167 		_probeTime = System.currentTimeMillis();
168 		_probeLength = 0;
169 		frameCounter = 0;
170 	}
171 	
172 	/*
173 	 * (non-Javadoc)
174 	 * @see java.io.FilterOutputStream#write(int)
175 	 */
176 	public synchronized void write(int b) throws IOException {
177 		byte[] buf = new byte[1];
178 		buf[0] = (byte) b;
179 		write(buf,0,1);
180 	}
181 	/**
182 	 * Disregards bytes until a valid Mpeg-header is found. Calculates the expected frame-length.
183 	 * When the full header + frame is received, the full frame is handed over
184 	 * to the delegating OutputStream. Only correct MP3-frames are written, all other information
185 	 * is disregarded.
186 	 */
187 	public synchronized void write(byte[] b, int off, int len) throws IOException {
188 		if (len==0) return;
189 		int endbyte = len+off;
190 		
191 		if (state==State.WAITING_FOR_SYNC) { //disregard bytes until 0xFF, 'I' or 'T' found
192 			int skip=0;
193 			while (b[off+skip]!=(byte) 0xFF && b[off+skip]!='I' && b[off+skip]!='T' && skip<len-1) {
194 				skip++;
195 			}
196 
197 			totalskip+=skip;
198 			off+=skip;
199 			len-=skip; //new situation
200 			if (b[off]==(byte) 0xFF) {
201 				state = State.FIRST_SYNC_BYTE_FOUND;
202 				len--;
203 				off++;
204 			} else if (b[off]=='T') {
205 				state = State.TAG_T;
206 				len--;
207 				off++;
208 			} else if (b[off]=='I') {
209 				state = State.ID3_I;
210 				len--;
211 				off++;
212 			}
213 		}
214 		
215 		if (state==State.TAG_T) {
216 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
217 				if (b[off]=='A') {
218 					state = State.TAG_A;
219 					len--;
220 					off++;
221 				} else {
222 					state = State.WAITING_FOR_SYNC;
223 					totalskip+=2; //the last two bytes had to be skipped after all.
224 					write(b,off,len); //try to sync again...
225 					return;
226 				}
227 			}
228 		}
229 		if (state==State.TAG_A) {
230 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
231 				if (b[off]=='G') {
232 					log.debug("ID3v1 tag found, skipping 128 bytes.");
233 					state = State.DISCARDING;
234 					len--;
235 					off++;
236 					_todiscard = 125;
237 				} else {
238 					state = State.WAITING_FOR_SYNC;
239 					totalskip+=3; //the last two bytes had to be skipped after all.
240 					write(b,off,len); //try to sync again...
241 					return;
242 				}
243 			}
244 		}
245 		
246 		if (state==State.ID3_I) {
247 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
248 				if (b[off]=='D') {
249 					state = State.ID3_D;
250 					len--;
251 					off++;
252 				} else {
253 					state = State.WAITING_FOR_SYNC;
254 					totalskip+=2; //the last two bytes had to be skipped after all.
255 					write(b,off,len); //try to sync again...
256 					return;
257 				}
258 			}
259 		}
260 		if (state==State.ID3_D) {
261 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
262 				if (b[off]=='3') {
263 					state = State.ID3_3;
264 					len--;
265 					off++;
266 				} else {
267 					state = State.WAITING_FOR_SYNC;
268 					totalskip+=3; //the last two bytes had to be skipped after all.
269 					write(b,off,len); //try to sync again...
270 					return;
271 				}
272 			}
273 		}
274 		if (state==State.ID3_3) {
275 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
276 				vMajor = b[off];
277 				state = State.ID3_vMajor;
278 				len--;
279 				off++;
280 			}
281 		}
282 		if (state==State.ID3_vMajor) {
283 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
284 				vMinor = b[off];
285 				state = State.ID3_vMinor;
286 				len--;
287 				off++;
288 			}
289 		}
290 		if (state==State.ID3_vMinor) {
291 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
292 				flags = b[off];
293 				state = State.ID3_flags;
294 				len--;
295 				off++;
296 			}
297 		}
298 		if (state==State.ID3_flags) {
299 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
300 				metadatasize[0] = b[off];
301 				state = State.ID3_s1;
302 				len--;
303 				off++;
304 			}
305 		}
306 		if (state==State.ID3_s1) {
307 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
308 				metadatasize[1] = b[off];
309 				state = State.ID3_s2;
310 				len--;
311 				off++;
312 			}
313 		}
314 		if (state==State.ID3_s2) {
315 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
316 				metadatasize[2] = b[off];
317 				state = State.ID3_s3;
318 				len--;
319 				off++;
320 			}
321 		}
322 		if (state==State.ID3_s3) {
323 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
324 				metadatasize[3] = b[off];
325 				state = State.DISCARDING;
326 				len--;
327 				off++;
328 				_todiscard = decodeSyncsafeInteger(metadatasize);
329 				if (footerBitSet(flags)) {
330 					_todiscard+=10;
331 				}
332 				log.debug("ID3v2."+vMajor+"."+vMinor+" tag found, skipping " + _todiscard + " bytes.");
333 			}
334 		}
335 		if (state==State.DISCARDING) {
336 			while (endbyte-off>0 && _todiscard>0) { //If there are bytes to read in the buffer b
337 				len--;
338 				off++;
339 				_todiscard--;
340 			}
341 			if (_todiscard==0) {
342 				state = State.WAITING_FOR_SYNC;
343 				write(b,off,len); //try to sync again...
344 				return;
345 			}
346 		}
347 		
348 		if (state==State.FIRST_SYNC_BYTE_FOUND) {
349 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
350 				if ((b[off] & (byte) 0xE0) == (byte) 0xE0) {
351 					second = b[off];
352 					state = State.SECOND_SYNC_BYTE_FOUND;
353 					len--;
354 					off++;
355 				} else {
356 					state = State.WAITING_FOR_SYNC;
357 					totalskip+=2; //the last two bytes had to be skipped after all.
358 					write(b,off,len); //try to sync again...
359 					return;
360 				}
361 			}
362 		}
363 		if (state==State.SECOND_SYNC_BYTE_FOUND) {
364 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
365 				third = b[off];
366 				state = State.THIRD_SYNC_BYTE_FOUND;
367 				len--;
368 				off++;
369 			}
370 		}
371 		if (state==State.THIRD_SYNC_BYTE_FOUND) {
372 			if (endbyte-off>0) { //If there are bytes to read in the buffer b
373 				fourth = b[off];
374 				state = State.SYNCHRONIZED;
375 				len--;
376 				off++;
377 				
378 				testframe[0] = (byte) 0xFF;
379 				testframe[1] = second;
380 				testframe[2] = third;
381 				testframe[3] = fourth;
382 				count=4;
383 				try {
384 					framelength = MP3File.readFrameHeaderCalcLength(testframe);
385 					if (framelength[0]==0) {
386 						state=State.WAITING_FOR_SYNC;
387 						write(b,off,len);
388 						return;
389 			        }
390 			        updateBitrateArray();
391 				} catch (InvalidTagException e) { //We did not have a sync after all, start seeking again.
392 					state=State.WAITING_FOR_SYNC;
393 					totalskip+=4; //the last four bytes had to be skipped after all.
394 					write(b,off,len);
395 					return;
396 				} catch (TagNotFoundException e) {
397 					state=State.WAITING_FOR_SYNC;
398 					totalskip+=4; //the last two bytes had to be skipped after all.
399 					write(b,off,len);
400 					return;
401 				}
402 			}
403 		}
404 		
405 		if (state==State.SYNCHRONIZED) { //accept framelength bytes, flush and go to WAITING_FOR_SYNC again
406 			//Now that we finally know that we're synced again, log how many bytes we skipped:
407 			if (totalskip>0) {
408 				log.debug("Skipped "+totalskip+" bytes of non-Mpeg information.");
409 				totalskip=0;
410 			}
411 			if (len>framelength[0]-count) {
412 				System.arraycopy(b,off,testframe,count,framelength[0]-count);
413 				int c = framelength[0]-count;
414 				count=framelength[0];
415 				flushBuffer();
416 				state = State.WAITING_FOR_SYNC;
417 				//frame is completely written, waiting for new one
418 				write(b,off+c,len-c);
419 				return;
420 			} else {
421 				System.arraycopy(b,off,testframe,count,len);
422 				count+=len;
423 				//partial frame written, catch rest of frame in the next call of write()
424 			}
425 		}
426 	}
427 
428 	/**
429 	 * Updates the bitrate array. The last-read frame is used to update the
430 	 * contents of the array.
431 	 */
432 	private void updateBitrateArray() {
433 		switch (framelength[1]) {
434 		case (32) : _bitratesStreamed[0]+=framelength[0]; break;
435 		case (40) : _bitratesStreamed[1]+=framelength[0]; break;
436 		case (48) : _bitratesStreamed[2]+=framelength[0]; break;
437 		case (56) : _bitratesStreamed[3]+=framelength[0]; break;
438 		case (64) : _bitratesStreamed[4]+=framelength[0]; break;
439 		case (80) : _bitratesStreamed[5]+=framelength[0]; break;
440 		case (96) : _bitratesStreamed[6]+=framelength[0]; break;
441 		case (112) : _bitratesStreamed[7]+=framelength[0]; break;
442 		case (128) : _bitratesStreamed[8]+=framelength[0]; break;
443 		case (160) : _bitratesStreamed[9]+=framelength[0]; break;
444 		case (192) : _bitratesStreamed[10]+=framelength[0]; break;
445 		case (224) : _bitratesStreamed[11]+=framelength[0]; break;
446 		case (256) : _bitratesStreamed[12]+=framelength[0]; break;
447 		case (320) : _bitratesStreamed[13]+=framelength[0]; break;
448 		}
449 	}
450 	
451 	/**
452 	 * Checks whether byte 4 is set in the flags.
453 	 * @param flags2 The flags
454 	 * @return True, when byte 4 is set.
455 	 */
456 	private boolean footerBitSet(byte flags) {
457 		return (flags & (byte) 0x10) == 0x10;
458 	}
459 
460 	/**
461 	 * Decodes 4 bytes, containing a syncsafe integer. (See 
462 	 * http://www.id3.org/id3v2.4.0-structure section 6.2)
463 	 * NOTE: This method does no checking on the size of the array! Make sure
464 	 * you pass 4 bytes, or you'll get an ArrayOutOfBoundsException.
465 	 * @param buffer The bytearray, holding the syncsafe integer.
466 	 * @return The value as a normal int value.
467 	 */
468 	private int decodeSyncsafeInteger(byte[] buffer) {
469 		return (buffer[0] << 21) + (buffer[1] << 14) + (buffer[2] << 7) + buffer[3];
470 	}
471 
472 	/*
473 	 * Flushes all buffered data, but makes sure only complete Mpeg frames are
474 	 * flushed.
475 	 * (non-Javadoc)
476 	 * @see java.io.FilterOutputStream#flush()
477 	 */
478 	public synchronized void flush() throws IOException {
479 		flushBuffer();
480 		out.flush();
481 	}
482 	
483 	/**
484 	 * Writes the contents of the internal buffer to the OutputStream. This
485 	 * method only has effect when there is a complete Mpeg frame in the buffer.
486 	 * @throws IOException if an I/O error occurs.
487 	 */
488 	private void flushBuffer() throws IOException {
489 		if (count==framelength[0]) { //Only flush complete frames!
490 			out.write(testframe,0,framelength[0]);
491 			_byteswritten += count;
492 			count = 0;
493 			
494 			frameCounter++;
495 			if (frameCounter>FRAMEINTERVAL) {
496 				frameCounter = 0;
497 				int streamed = getStreamedLengthInMillis();
498 				int deltaTimeStreamed = streamed - _probeLength;
499 				
500 				long curr = System.currentTimeMillis();
501 				long deltaTime = curr - _probeTime;
502 				if (log.isDebugEnabled()) {
503 					log.debug("Streamed "+ deltaTimeStreamed + "ms of mpeg in " + deltaTime + "ms (" + 8*_byteswritten/deltaTimeStreamed + "kbs)");
504 				}
505 				
506 				int difference = deltaTimeStreamed - (int) deltaTime;
507 				if (difference < -30000) { 
508 					//More than 30 seconds difference: the client probably has
509 					//paused the stream. ignore this measurement
510 					log.info("Huge difference in stream-time vs time streamed: " + difference + "ms. Stream was probably paused.");
511 				} else {
512 					int oldsum = 0;
513 					for (int i : _streamDeltas) {
514 						oldsum += i;
515 					}
516 					int newsum = oldsum - _streamDeltas[_streamDeltaIndex] + difference;
517 					_streamDeltas[_streamDeltaIndex] = difference;
518 					if (++_streamDeltaIndex >= _streamDeltas.length) {
519 						_streamDeltaIndex = 0;
520 					}
521 					if ((oldsum > newsum) && (newsum < MAXLAG)) { 
522 						//We're experiencing more lag than before AND more than our maximum.
523 						//Enter panic mode.
524 						log.info("Low bandwidth! Lagging " +newsum+ "ms. " + Arrays.toString(_streamDeltas) + " Transcode to lower bitrate?");
525 					} else {
526 						//Leave panic mode.
527 					}
528 				}
529 				
530 				_probeTime = curr;
531 				_probeLength = streamed;
532 				_byteswritten = 0;
533 			}
534 		}
535 	}
536 	
537 	/**
538 	 * Gets the playtime.
539 	 * @return The amount of milliseconds that were transported through this object
540 	 * since the last reset.
541 	 */
542 	public int getStreamedLengthInMillis() {
543 		int length = 0;
544 		
545 		length+=_bitratesStreamed[0]/4; // 32kbit/s --> 2 bytes/ms
546 		length+=_bitratesStreamed[1]/5; // 40kbit
547 		length+=_bitratesStreamed[2]/5; // 48kbit
548 		length+=_bitratesStreamed[3]/7; // 56kbit
549 		length+=_bitratesStreamed[4]/8; // 64kbit
550 		length+=_bitratesStreamed[5]/10; // 80kbit
551 		length+=_bitratesStreamed[6]/12; // 96kbit
552 		length+=_bitratesStreamed[7]/14; // 112kbit
553 		length+=_bitratesStreamed[8]/16; // 128kbit
554 		length+=_bitratesStreamed[9]/20; // 160kbit
555 		length+=_bitratesStreamed[10]/24; // 192kbit
556 		length+=_bitratesStreamed[11]/28; // 224 kbit
557 		length+=_bitratesStreamed[12]/32; // 256 kbit
558 		length+=_bitratesStreamed[13]/40; // 320 kbit;
559 
560 		return length;
561 	}
562 	
563 	/**
564 	 * Gets the number of bytes streamed.
565 	 * @return The number of bytes streamed since the last reset.
566 	 */
567 	public long getBytesStreamed() {
568 		return _byteswritten;
569 	}
570 	
571 }