The following technique could be seen as something of a game changer in how many modern data scientists work. While it is common to work with structured and unstructured text, it is far less common to work with raw binary data, the reason being the gap between computer science and data science. Textual processing is limited to a standard set of operations that most will be familiar with: acquiring, parsing, storing, and so on. Instead of restricting ourselves to these operations, we will work directly with audio, transforming and enriching uninformed signal data into an informed transcription. In doing this, we enable a new type of data pipeline, analogous to teaching a computer to hear the voice in audio files.
A second (breakthrough) idea that we encourage here is a shift in thinking around how data scientists engage with Hadoop and big data today. While many still consider these technologies to be just another database, we want to showcase the vast array of possibilities that become available with these tools. After all, no one laughs at the data scientist who can train a machine to talk to customers or make sense of call center recordings.
The first thing to consider is the audio file format. The .wav files can be processed pretty much as they are using the AudioSystem library (from javax.sound), while .mp3 files would require pre-processing using external codec libraries. If we read a file from an InputStream, we can create an output byte array containing the audio signal as follows:
def readFile(song: String) = {
  val is = new FileInputStream(song)
  processSong(is)
}

def processSong(stream: InputStream): Array[Byte] = {
  val bufferedIn = new BufferedInputStream(stream)
  val out = new ByteArrayOutputStream
  val audioInputStream = AudioSystem.getAudioInputStream(bufferedIn)
  val format = audioInputStream.getFormat
  val sizeTmp =
    Math.rint((format.getFrameRate * format.getFrameSize) /
      format.getFrameRate).toInt
  // Round the buffer size up to a whole number of frames
  val size = (sizeTmp + format.getFrameSize) - (sizeTmp % format.getFrameSize)
  val buffer = new Array[Byte](size)
  var available = true
  var totalRead = 0
  while (available) {
    val c = audioInputStream.read(buffer, 0, size)
    if (c > -1) {
      totalRead += c
      out.write(buffer, 0, c)
    } else {
      available = false
    }
  }
  audioInputStream.close()
  out.close()
  out.toByteArray
}
Songs are usually encoded using a sample rate of 44.1 kHz which, according to the Nyquist theorem, is slightly more than twice the highest frequency that the human ear can perceive (hearing covers the range from 20 Hz to 20 kHz).
For more information about the Nyquist theorem, please visit: http://redwood.berkeley.edu/bruno/npb261/aliasing.pdf.
In order to represent the sound that a human being can hear, we need around 44,100 samples per second, hence 176,400 bytes per second for stereo (two channels of two-byte samples). This value is the byte frequency computed below:
val format = audioInputStream.getFormat
val sampleRate = format.getSampleRate
val sizeTmp =
  Math.rint((format.getFrameRate * format.getFrameSize) /
    format.getFrameRate).toInt
val size = (sizeTmp + format.getFrameSize) - (sizeTmp % format.getFrameSize)
// bytes per second = bytes per frame * frames per second
val byteFreq = format.getFrameSize * format.getFrameRate.toInt
Finally, we access the audio signal by processing the output byte array and plotting the first few bytes of our sample data (in this case, Figure 1 shows the Mario Bros theme song). Note that the timestamp can be retrieved using both the byte index and the byte frequency, like so:
val data: Array[Byte] = processSong(inputStream)

val timeDomain: Array[(Double, Int)] = data
  .zipWithIndex
  .map { case (b, idx) =>
    (minTime + idx * 1000L / byteFreq.toDouble, b.toInt)
  }
For convenience, we wrap all these audio characteristics into an Audio case class (shown in the following snippet), to which we will add additional utility methods as we go along in the chapter:
case class Audio(data: Array[Byte],
                 byteFreq: Int,
                 sampleRate: Float,
                 minTime: Long,
                 id: Int = 0) {

  def duration: Double =
    (data.length + 1) * 1000L / byteFreq.toDouble

  def timeDomain: Array[(Double, Int)] = data
    .zipWithIndex
    .map { case (b, idx) =>
      (minTime + idx * 1000L / byteFreq.toDouble, b.toInt)
    }

  def findPeak: Float = {
    val freqDomain = frequencyDomain()
    freqDomain
      .sortBy(_._2)
      .reverse
      .map(_._1)
      .head
  }

  // Next to come
}
Now that we have created functions to extract audio signals from .wav files (via a FileInputStream), the natural next step is to use them to process the remaining records stored on HDFS. As already highlighted in previous chapters, this isn't a difficult task once the logic works on a single record. In fact, Spark comes with a utility to process binary data out of the box, so we simply plug in the following function:
def read(library: String, sc: SparkContext) = {
  sc.binaryFiles(library)
    .filter { case (filename, stream) =>
      filename.endsWith(".wav")
    }
    .map { case (filename, stream) =>
      val audio = processSong(stream.open())
      (filename, audio)
    }
}

val audioRDD: RDD[(String, Audio)] = read(library, sc)
We make sure we only send .wav files to our processor and get back a new RDD made of a filename (the song name) and its corresponding Audio case class (including the extracted audio signal).
The binaryFiles method of Spark reads each file as a whole (without splitting it) and outputs an RDD containing both the file path and its corresponding input stream. It is therefore advisable to work on relatively small files (perhaps just a few megabytes each), as large files clearly affect memory consumption and hence performance.
Accessing the audio time domain is a great achievement, but sadly it is of little value on its own. However, we can use it to better understand what the signal truly represents, that is, to extract the hidden frequencies it comprises. Naturally, we can convert the time domain signal to the frequency domain using a Fourier transform.
You can learn more about Fourier transform at http://www.phys.hawaii.edu/~jgl/p274/fourier_intro_Shatkay.pdf.
As a summary, without going into too much detail or having to tackle the complex equations, the basic assumption that Joseph Fourier made in his legendary and eponymous formula is that all signals are made of an infinite accumulation of sine waves of different frequencies and phases.
The discrete Fourier transform (DFT) is the summation of different sine waves and can be represented by the following equation:

X(k) = Σ_{n=0}^{N-1} x(n) · e^(-i2πkn/N), for k = 0, ..., N-1
Although this algorithm is trivial to implement using a brute-force approach, it is highly inefficient, O(n²), since for each of the n data points we have to compute a sum of n terms. A three-minute song would therefore generate (3 × 60 × 176,400)² ≈ 10^15 operations. Instead, Cooley and Tukey contributed the Fast Fourier transform (FFT), a divide-and-conquer approach to the DFT that reduces the overall time complexity to O(n log n).
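To make the summation and its quadratic cost concrete, here is a deliberately naive DFT written in Scala. It is for illustration only (the chapter uses the commons-math3 FFT instead), and the object name NaiveDft is ours:

```scala
// Brute-force O(n^2) DFT: one inner sum of n terms per frequency bin k.
// Shown only to illustrate the DFT equation; never use this on real audio.
object NaiveDft {
  // Returns (real, imaginary) pairs, one per frequency bin k.
  def dft(x: Array[Double]): Array[(Double, Double)] = {
    val n = x.length
    Array.tabulate(n) { k =>
      var re = 0.0
      var im = 0.0
      var i = 0
      while (i < n) {
        // x(i) * e^(-i 2π k i / n), expanded into cos/sin
        val angle = -2.0 * math.Pi * k * i / n
        re += x(i) * math.cos(angle)
        im += x(i) * math.sin(angle)
        i += 1
      }
      (re, im)
    }
  }
}
```

For instance, a constant signal of four ones puts all its energy in bin 0 (the DC component), which is a quick sanity check for any DFT implementation.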
The official paper describing the Cooley and Tukey algorithm can be found online: http://www.ams.org/journals/mcom/1965-19-090/S0025-5718-1965-0178586-1/S0025-5718-1965-0178586-1.pdf
Fortunately for us, there are existing FFT implementations available, so we will compute the FFT using the Java-based library provided by org.apache.commons.math3. When using this library, we only need to ensure that our input data is padded with zeros so that the total length is a power of two and can be divided into odd and even sequences:
def fft(): Array[Complex] = {
  val array = Audio.paddingToPowerOf2(data)
  val transformer = new FastFourierTransformer(DftNormalization.STANDARD)
  transformer.transform(array.map(_.toDouble), TransformType.FORWARD)
}
This returns an array of Complex numbers, each made of real and imaginary parts, which can easily be converted to a frequency and an amplitude (or magnitude) as follows. According to the Nyquist theorem, we only need half of the frequencies:
def frequencyDomain(): Array[(Float, Double)] = {
  val t = fft()
  t.take(t.length / 2) // Nyquist
    .zipWithIndex
    .map { case (c, idx) =>
      val freq = (idx + 1) * sampleRate / t.length
      val amplitude = sqrt(pow(c.getReal, 2) + pow(c.getImaginary, 2))
      val db = 20 * log10(amplitude)
      (freq, db)
    }
}
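The paddingToPowerOf2 helper called inside fft() is not shown in the chapter, so here is a minimal sketch of what such a companion-object method could look like; the body is our assumption, not the book's implementation:

```scala
// Hypothetical sketch of the Audio companion's paddingToPowerOf2 helper:
// right-pad the signal with zero bytes up to the next power of two so the
// FFT's divide-and-conquer recursion always splits into even halves.
object Audio {
  def paddingToPowerOf2(data: Array[Byte]): Array[Byte] = {
    val n = data.length
    // Smallest power of two greater than or equal to n
    var size = 1
    while (size < n) size <<= 1
    val padded = new Array[Byte](size) // zero-initialized by the JVM
    Array.copy(data, 0, padded, 0, n)
    padded
  }
}
```

Zero-padding does not add energy to the signal; it only interpolates the spectrum onto a finer frequency grid, which is why it is safe to apply before the FFT.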
Finally, we include these functions in the Audio case class and plot the frequency domain for the first few seconds of the Mario Bros theme song:
In Figure 2, significant peaks can be seen in the medium-high frequency range (between 4 kHz and 7 kHz), and we will use these as a fingerprint for the song.
Although more efficient, the FFT is still an expensive operation due to its high memory consumption (remember, a typical three-minute song has around 3 × 60 × 176,400 points to process). This becomes especially problematic when applied to a large number of data points and so must be taken into consideration for large-scale processing.
Instead of looking at the full spectrum, we sample our songs using a time window. In fact, a full FFT wouldn't be of much use anyway, since we want to know when each major frequency was heard. We therefore iteratively split each Audio instance into smaller Audio objects covering 20-millisecond samples. This timeframe should be small enough for the purpose of analysis, meaning small enough that the FFT can be computed, yet dense enough to ensure that sufficient frequencies are extracted to provide an adequate audio fingerprint. The resulting 20-millisecond chunks will drastically increase the overall size of our RDD:
def sampleByTime(duration: Double = 20.0d,
                 padding: Boolean = true): List[Audio] = {
  val size = (duration * byteFreq / 1000.0f).toInt
  sample(size, padding)
}

def sample(size: Int = math.pow(2, 20).toInt,
           padding: Boolean = true): List[Audio] = {
  Audio.sample(data, size, padding)
    .zipWithIndex
    .map { case (sampleAudio, idx) =>
      val firstByte = idx * size
      val firstTime = firstByte * 1000L / byteFreq.toLong
      Audio(sampleAudio, byteFreq, sampleRate, firstTime, idx)
    }
}

val sampleRDD = audioRDD.flatMap { case (song, audio) =>
  audio.sampleByTime().map { sample =>
    (song, sample)
  }
}
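The Audio.sample companion helper called above is not listed in the chapter; the following is a hedged sketch of one plausible implementation (the object name AudioSampler and the body are ours): chop the byte array into fixed-size chunks and, when requested, zero-pad the final partial chunk so every sample has the same length.

```scala
// Hypothetical sketch of the chunking helper used by Audio.sample:
// split the raw signal into fixed-size windows, zero-padding the tail.
object AudioSampler {
  def sample(data: Array[Byte],
             size: Int,
             padding: Boolean): List[Array[Byte]] = {
    data.grouped(size).map { chunk =>
      if (padding && chunk.length < size) chunk.padTo(size, 0.toByte)
      else chunk
    }.toList
  }
}
```

Padding the last chunk matters because the FFT applied to each window expects every sample to have the same (power-of-two-paddable) length.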
While this is not our primary focus, one could rebuild the full FFT spectrum of the entire signal by recombining samples with inner and outer FFT and applying a twiddle factor https://en.wikipedia.org/wiki/Twiddle_factor. This could be useful when processing large records with a limited amount of available memory.
Now that we have multiple samples at regular time intervals, we can extract frequency signatures using the FFT. In order to generate a sample signature, instead of using the exact peak (which could be imprecise), we try to find the closest note in different frequency bands. This offers only an approximation, but in doing so it overcomes any noise issues in the original signal, since noise would otherwise interfere with our signatures.
We look at the following frequency bands: 20-60 Hz, 60-250 Hz, 250-2,000 Hz, 2-4 kHz, and 4-6 kHz, and find the closest note according to a frequency reference table. These bands are not random; they correspond to the ranges of different musical instruments (for example, the double bass spans roughly 50 to 200 Hz, and the piccolo 500 Hz to 5 kHz).
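The band lookup and closest-note search just described are used later via two Audio companion helpers whose bodies are not shown in the chapter. Here is a hedged sketch of both (the object name NoteHelpers is ours; the band boundaries follow the text, and the note lookup assumes twelve-tone equal temperament relative to A4 = 440 Hz):

```scala
// Hypothetical sketches of getFrequencyBand and findClosestNote.
object NoteHelpers {
  // Map a frequency to one of the five bands described in the text
  // (0..4), or -1 when it falls outside all of them.
  def getFrequencyBand(freq: Float): Int = freq match {
    case f if f >= 20 && f < 60      => 0
    case f if f >= 60 && f < 250     => 1
    case f if f >= 250 && f < 2000   => 2
    case f if f >= 2000 && f < 4000  => 3
    case f if f >= 4000 && f <= 6000 => 4
    case _                           => -1
  }

  private val names =
    Array("A", "A#", "B", "C", "C#", "D", "D#", "E", "F", "F#", "G", "G#")

  // In equal temperament, the pitch n semitones above A4 is 440 * 2^(n/12),
  // so we invert that relation and round to the nearest semitone.
  def findClosestNote(freq: Float): String = {
    val semitones =
      math.round(12.0 * math.log(freq / 440.0) / math.log(2.0)).toInt
    val idx = ((semitones % 12) + 12) % 12
    names(idx)
  }
}
```

For example, 43 Hz rounds to the note F (a low octave), which matches the peak the text reports for the first Mario Bros sample.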
Figure 4 shows the first sample of our Mario Bros theme song in a lower frequency band. We can see that the highest magnitude, at around 43 Hz, corresponds to the prime octave of the note F:
For each sample, we build a hash composed of five letters (such as [E-D#-A-B-B-F]) corresponding to the strongest note (the highest peak) in each of the preceding frequency bands. We consider this hash a fingerprint for that particular 20-millisecond time window. We then build a new RDD made of hash values as follows (we include a hashing function within our Audio case class):
def hash: String = {
  val freqDomain = frequencyDomain()
  freqDomain.groupBy { case (fq, db) =>
    Audio.getFrequencyBand(fq)
  }.map { case (bucket, frequencies) =>
    val (dominant, _) = frequencies.map { case (fq, db) =>
      (Audio.findClosestNote(fq), db)
    }.sortBy { case (note, db) =>
      db
    }.last
    (bucket, dominant)
  }.toList
    .sortBy(_._1)
    .map(_._2)
    .mkString("-")
}

/*
001 Amadeus Mozart - Requiem (K. 626)   E-D#-A-B-B-F
001 Amadeus Mozart - Requiem (K. 626)   G#-D-F#-B-B-F
001 Amadeus Mozart - Requiem (K. 626)   F#-F#-C-B-C-F
001 Amadeus Mozart - Requiem (K. 626)   E-F-F#-B-B-F
001 Amadeus Mozart - Requiem (K. 626)   E-F#-C#-B-B-F
001 Amadeus Mozart - Requiem (K. 626)   B-E-F-A#-C#-F
*/
Now we group all song IDs sharing the same hash in order to build an RDD of unique hashes:
case class HashSongsPair(
  hash: String,
  songs: List[String]
)

val hashRDD = sampleRDD.map { case (id, sample) =>
  (sample.hash, id)
}
  .groupByKey()
  .map { case (hash, songs) =>
    HashSongsPair(hash, songs.toList)
  }
Our assumption is that when a hash is defined in a song at a particular time window, similar songs could potentially share similar hashes, but only two songs having all the same hashes (and in the same order) would be truly identical; someone could share part of my DNA, but only someone with the exact same DNA would be a perfect clone of me.
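The intuition above can be made concrete with a simple overlap measure: a Jaccard-style score over the two songs' hash sets approximates "shared DNA", while a score of 1 on identical ordered sequences means the songs match exactly. This scoring function is our illustration, not part of the chapter's pipeline:

```scala
// Illustrative only: Jaccard similarity over the hash sets of two songs.
// 0.0 means no shared fingerprints, 1.0 means the same set of fingerprints.
object HashOverlap {
  def similarity(a: Seq[String], b: Seq[String]): Double = {
    val (sa, sb) = (a.toSet, b.toSet)
    if (sa.isEmpty && sb.isEmpty) 0.0
    else sa.intersect(sb).size.toDouble / sa.union(sb).size
  }
}
```

Note that the set-based score ignores hash order and repetition, so it captures "how much DNA is shared" rather than full identity.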
If a music aficionado feels blessed listening to Tchaikovsky's concerto in D, can we recommend Pachelbel's Canon in D just because the two share a musical cadence (that is, common frequencies around D)?
Is it valid (and feasible) to recommend playlists that are only based on certain frequency bands? Surely the frequencies themselves would not be enough to fully describe a song. What about tempo, timbre, or rhythm? Is this model complete enough to accurately represent all the nuances of musical diversity and range? Probably not, but for the purpose of data science, it's worth investigating anyway!