1 package org.musicontroller.streaming;
2
3 import java.io.FilterOutputStream;
4 import java.io.IOException;
5 import java.io.OutputStream;
6
7 import org.apache.log4j.Logger;
8 import org.farng.mp3.InvalidTagException;
9 import org.farng.mp3.MP3File;
10 import org.farng.mp3.TagNotFoundException;
11
12 import edu.emory.mathcs.backport.java.util.Arrays;
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 public class MpegOutputStream extends FilterOutputStream {
31
32 private static final Logger log = Logger.getLogger(MpegOutputStream.class);
33
34 private final static int BUFFERSIZE = 4096;
35
36
37
38
39 private final static int LOWBANDWIDTHDETECTIONLENGTH = 10000;
40
41
42
43 private final static int LOWBANDWIDTHSAMPLEDEPTH = 10;
44
45
46
47
48
49 private final static int FRAMEINTERVAL = LOWBANDWIDTHDETECTIONLENGTH / 26 / LOWBANDWIDTHSAMPLEDEPTH;
50
51
52
53
54
55 private final static int MAXLAG = -5000;
56
57 private int count;
58
59
60
61
62
63 private byte[] testframe;
64 private State state;
65 private int[] framelength;
66 private byte second;
67 private byte third;
68 private byte fourth;
69 private int totalskip;
70 private int _todiscard;
71 private byte vMajor;
72 private byte vMinor;
73 private byte flags;
74 private byte[] metadatasize;
75
76
77
78
79
80 private int[] _bitratesStreamed;
81 private long _byteswritten;
82
83 private long _probeTime;
84 private int _probeLength;
85
86
87
88
89 private int frameCounter;
90
91
92
93
94
95
96 private int _streamDeltas[];
97
98
99
100 private int _streamDeltaIndex;
101
102
103
104
105 private enum State {
106 WAITING_FOR_SYNC,
107 FIRST_SYNC_BYTE_FOUND,
108 SECOND_SYNC_BYTE_FOUND,
109 THIRD_SYNC_BYTE_FOUND,
110 SYNCHRONIZED,
111
112 ID3_I,
113 ID3_D,
114 ID3_3,
115 ID3_vMajor,
116 ID3_vMinor,
117 ID3_flags,
118 ID3_s1,
119 ID3_s2,
120 ID3_s3,
121 ID3_s4,
122
123 TAG_T,
124 TAG_A,
125 TAG_G,
126
127 DISCARDING;
128 }
129
130
131
132
133
134 public MpegOutputStream(OutputStream out) {
135 super(out);
136
137
138 testframe = new byte[BUFFERSIZE];
139 metadatasize = new byte[4];
140 reset();
141 log.debug("New MpegOutputStream created");
142 }
143
144
145
146
147
148
149
150
151
152
153
154
155
156 public synchronized void reset() {
157 if (count>0) {
158 log.debug("Incomplete mpeg frame, skipped " + count + " bytes.");
159 count = 0;
160 }
161 totalskip = 0;
162 _bitratesStreamed = new int[]{0,0,0,0,0,0,0,0,0,0,0,0,0,0};
163 _streamDeltas = new int[LOWBANDWIDTHSAMPLEDEPTH];
164 _streamDeltaIndex = 0;
165 state = State.WAITING_FOR_SYNC;
166 _byteswritten = 0L;
167 _probeTime = System.currentTimeMillis();
168 _probeLength = 0;
169 frameCounter = 0;
170 }
171
172
173
174
175
176 public synchronized void write(int b) throws IOException {
177 byte[] buf = new byte[1];
178 buf[0] = (byte) b;
179 write(buf,0,1);
180 }
181
182
183
184
185
186
187 public synchronized void write(byte[] b, int off, int len) throws IOException {
188 if (len==0) return;
189 int endbyte = len+off;
190
191 if (state==State.WAITING_FOR_SYNC) {
192 int skip=0;
193 while (b[off+skip]!=(byte) 0xFF && b[off+skip]!='I' && b[off+skip]!='T' && skip<len-1) {
194 skip++;
195 }
196
197 totalskip+=skip;
198 off+=skip;
199 len-=skip;
200 if (b[off]==(byte) 0xFF) {
201 state = State.FIRST_SYNC_BYTE_FOUND;
202 len--;
203 off++;
204 } else if (b[off]=='T') {
205 state = State.TAG_T;
206 len--;
207 off++;
208 } else if (b[off]=='I') {
209 state = State.ID3_I;
210 len--;
211 off++;
212 }
213 }
214
215 if (state==State.TAG_T) {
216 if (endbyte-off>0) {
217 if (b[off]=='A') {
218 state = State.TAG_A;
219 len--;
220 off++;
221 } else {
222 state = State.WAITING_FOR_SYNC;
223 totalskip+=2;
224 write(b,off,len);
225 return;
226 }
227 }
228 }
229 if (state==State.TAG_A) {
230 if (endbyte-off>0) {
231 if (b[off]=='G') {
232 log.debug("ID3v1 tag found, skipping 128 bytes.");
233 state = State.DISCARDING;
234 len--;
235 off++;
236 _todiscard = 125;
237 } else {
238 state = State.WAITING_FOR_SYNC;
239 totalskip+=3;
240 write(b,off,len);
241 return;
242 }
243 }
244 }
245
246 if (state==State.ID3_I) {
247 if (endbyte-off>0) {
248 if (b[off]=='D') {
249 state = State.ID3_D;
250 len--;
251 off++;
252 } else {
253 state = State.WAITING_FOR_SYNC;
254 totalskip+=2;
255 write(b,off,len);
256 return;
257 }
258 }
259 }
260 if (state==State.ID3_D) {
261 if (endbyte-off>0) {
262 if (b[off]=='3') {
263 state = State.ID3_3;
264 len--;
265 off++;
266 } else {
267 state = State.WAITING_FOR_SYNC;
268 totalskip+=3;
269 write(b,off,len);
270 return;
271 }
272 }
273 }
274 if (state==State.ID3_3) {
275 if (endbyte-off>0) {
276 vMajor = b[off];
277 state = State.ID3_vMajor;
278 len--;
279 off++;
280 }
281 }
282 if (state==State.ID3_vMajor) {
283 if (endbyte-off>0) {
284 vMinor = b[off];
285 state = State.ID3_vMinor;
286 len--;
287 off++;
288 }
289 }
290 if (state==State.ID3_vMinor) {
291 if (endbyte-off>0) {
292 flags = b[off];
293 state = State.ID3_flags;
294 len--;
295 off++;
296 }
297 }
298 if (state==State.ID3_flags) {
299 if (endbyte-off>0) {
300 metadatasize[0] = b[off];
301 state = State.ID3_s1;
302 len--;
303 off++;
304 }
305 }
306 if (state==State.ID3_s1) {
307 if (endbyte-off>0) {
308 metadatasize[1] = b[off];
309 state = State.ID3_s2;
310 len--;
311 off++;
312 }
313 }
314 if (state==State.ID3_s2) {
315 if (endbyte-off>0) {
316 metadatasize[2] = b[off];
317 state = State.ID3_s3;
318 len--;
319 off++;
320 }
321 }
322 if (state==State.ID3_s3) {
323 if (endbyte-off>0) {
324 metadatasize[3] = b[off];
325 state = State.DISCARDING;
326 len--;
327 off++;
328 _todiscard = decodeSyncsafeInteger(metadatasize);
329 if (footerBitSet(flags)) {
330 _todiscard+=10;
331 }
332 log.debug("ID3v2."+vMajor+"."+vMinor+" tag found, skipping " + _todiscard + " bytes.");
333 }
334 }
335 if (state==State.DISCARDING) {
336 while (endbyte-off>0 && _todiscard>0) {
337 len--;
338 off++;
339 _todiscard--;
340 }
341 if (_todiscard==0) {
342 state = State.WAITING_FOR_SYNC;
343 write(b,off,len);
344 return;
345 }
346 }
347
348 if (state==State.FIRST_SYNC_BYTE_FOUND) {
349 if (endbyte-off>0) {
350 if ((b[off] & (byte) 0xE0) == (byte) 0xE0) {
351 second = b[off];
352 state = State.SECOND_SYNC_BYTE_FOUND;
353 len--;
354 off++;
355 } else {
356 state = State.WAITING_FOR_SYNC;
357 totalskip+=2;
358 write(b,off,len);
359 return;
360 }
361 }
362 }
363 if (state==State.SECOND_SYNC_BYTE_FOUND) {
364 if (endbyte-off>0) {
365 third = b[off];
366 state = State.THIRD_SYNC_BYTE_FOUND;
367 len--;
368 off++;
369 }
370 }
371 if (state==State.THIRD_SYNC_BYTE_FOUND) {
372 if (endbyte-off>0) {
373 fourth = b[off];
374 state = State.SYNCHRONIZED;
375 len--;
376 off++;
377
378 testframe[0] = (byte) 0xFF;
379 testframe[1] = second;
380 testframe[2] = third;
381 testframe[3] = fourth;
382 count=4;
383 try {
384 framelength = MP3File.readFrameHeaderCalcLength(testframe);
385 if (framelength[0]==0) {
386 state=State.WAITING_FOR_SYNC;
387 write(b,off,len);
388 return;
389 }
390 updateBitrateArray();
391 } catch (InvalidTagException e) {
392 state=State.WAITING_FOR_SYNC;
393 totalskip+=4;
394 write(b,off,len);
395 return;
396 } catch (TagNotFoundException e) {
397 state=State.WAITING_FOR_SYNC;
398 totalskip+=4;
399 write(b,off,len);
400 return;
401 }
402 }
403 }
404
405 if (state==State.SYNCHRONIZED) {
406
407 if (totalskip>0) {
408 log.debug("Skipped "+totalskip+" bytes of non-Mpeg information.");
409 totalskip=0;
410 }
411 if (len>framelength[0]-count) {
412 System.arraycopy(b,off,testframe,count,framelength[0]-count);
413 int c = framelength[0]-count;
414 count=framelength[0];
415 flushBuffer();
416 state = State.WAITING_FOR_SYNC;
417
418 write(b,off+c,len-c);
419 return;
420 } else {
421 System.arraycopy(b,off,testframe,count,len);
422 count+=len;
423
424 }
425 }
426 }
427
428
429
430
431
432 private void updateBitrateArray() {
433 switch (framelength[1]) {
434 case (32) : _bitratesStreamed[0]+=framelength[0]; break;
435 case (40) : _bitratesStreamed[1]+=framelength[0]; break;
436 case (48) : _bitratesStreamed[2]+=framelength[0]; break;
437 case (56) : _bitratesStreamed[3]+=framelength[0]; break;
438 case (64) : _bitratesStreamed[4]+=framelength[0]; break;
439 case (80) : _bitratesStreamed[5]+=framelength[0]; break;
440 case (96) : _bitratesStreamed[6]+=framelength[0]; break;
441 case (112) : _bitratesStreamed[7]+=framelength[0]; break;
442 case (128) : _bitratesStreamed[8]+=framelength[0]; break;
443 case (160) : _bitratesStreamed[9]+=framelength[0]; break;
444 case (192) : _bitratesStreamed[10]+=framelength[0]; break;
445 case (224) : _bitratesStreamed[11]+=framelength[0]; break;
446 case (256) : _bitratesStreamed[12]+=framelength[0]; break;
447 case (320) : _bitratesStreamed[13]+=framelength[0]; break;
448 }
449 }
450
451
452
453
454
455
456 private boolean footerBitSet(byte flags) {
457 return (flags & (byte) 0x10) == 0x10;
458 }
459
460
461
462
463
464
465
466
467
468 private int decodeSyncsafeInteger(byte[] buffer) {
469 return (buffer[0] << 21) + (buffer[1] << 14) + (buffer[2] << 7) + buffer[3];
470 }
471
472
473
474
475
476
477
478 public synchronized void flush() throws IOException {
479 flushBuffer();
480 out.flush();
481 }
482
483
484
485
486
487
488 private void flushBuffer() throws IOException {
489 if (count==framelength[0]) {
490 out.write(testframe,0,framelength[0]);
491 _byteswritten += count;
492 count = 0;
493
494 frameCounter++;
495 if (frameCounter>FRAMEINTERVAL) {
496 frameCounter = 0;
497 int streamed = getStreamedLengthInMillis();
498 int deltaTimeStreamed = streamed - _probeLength;
499
500 long curr = System.currentTimeMillis();
501 long deltaTime = curr - _probeTime;
502 if (log.isDebugEnabled()) {
503 log.debug("Streamed "+ deltaTimeStreamed + "ms of mpeg in " + deltaTime + "ms (" + 8*_byteswritten/deltaTimeStreamed + "kbs)");
504 }
505
506 int difference = deltaTimeStreamed - (int) deltaTime;
507 if (difference < -30000) {
508
509
510 log.info("Huge difference in stream-time vs time streamed: " + difference + "ms. Stream was probably paused.");
511 } else {
512 int oldsum = 0;
513 for (int i : _streamDeltas) {
514 oldsum += i;
515 }
516 int newsum = oldsum - _streamDeltas[_streamDeltaIndex] + difference;
517 _streamDeltas[_streamDeltaIndex] = difference;
518 if (++_streamDeltaIndex >= _streamDeltas.length) {
519 _streamDeltaIndex = 0;
520 }
521 if ((oldsum > newsum) && (newsum < MAXLAG)) {
522
523
524 log.info("Low bandwidth! Lagging " +newsum+ "ms. " + Arrays.toString(_streamDeltas) + " Transcode to lower bitrate?");
525 } else {
526
527 }
528 }
529
530 _probeTime = curr;
531 _probeLength = streamed;
532 _byteswritten = 0;
533 }
534 }
535 }
536
537
538
539
540
541
542 public int getStreamedLengthInMillis() {
543 int length = 0;
544
545 length+=_bitratesStreamed[0]/4;
546 length+=_bitratesStreamed[1]/5;
547 length+=_bitratesStreamed[2]/5;
548 length+=_bitratesStreamed[3]/7;
549 length+=_bitratesStreamed[4]/8;
550 length+=_bitratesStreamed[5]/10;
551 length+=_bitratesStreamed[6]/12;
552 length+=_bitratesStreamed[7]/14;
553 length+=_bitratesStreamed[8]/16;
554 length+=_bitratesStreamed[9]/20;
555 length+=_bitratesStreamed[10]/24;
556 length+=_bitratesStreamed[11]/28;
557 length+=_bitratesStreamed[12]/32;
558 length+=_bitratesStreamed[13]/40;
559
560 return length;
561 }
562
563
564
565
566
567 public long getBytesStreamed() {
568 return _byteswritten;
569 }
570
571 }