Skip to content Skip to sidebar Skip to footer

Multiple Named Pipes In Ffmpeg

This question is the follow-up of this question In my application I want to modify various mp3 and then mix them together. I know I could do it with a single command line in FFmpeg

Solution 1:

Using named pipes (Linux only):

Named pipes are required when there are two ore more input streams (that need to be piped from memory buffers). Using named pipes is not trivial at all...

From FFmpeg point of view named pipes are like (non-seekable) input files.

Using named pipes in Python (in Linux): Assume pipe1 is the name of the "named pipe" (e.g. pipe1 = "audio_pipe1").

  1. Create a "named pipe":

    os.mkfifo(pipe1)
    
  2. Open the pipe as "write only" file:

    fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer).
  3. Write the data to the pipe in small chunks. According to this post, the default buffer size of the pipe in most Linux systems is 64KBytes. Because the data is larger than 65536 bytes, we need to write the data to the pipe in small chunks. I decided to use an arbitrary chunk size of 1024 bytes. Pipe writing operation is a "blocking" operation. I solved it by using a "writer" thread:

    defwriter(data, pipe_name, chunk_size):
        # Open the pipes as opening "low level IO" files (open for "open for writing only").
        fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer)for i inrange(0, len(data), chunk_size):
            # Write to named pipe as writing to a "low level IO" file (but write the data in small chunks).
            os.write(fd_pipe, data[i:chunk_size+i])  # Write 1024 bytes of data to fd_pipe
  4. Close the pipe:

    os.close(fd_pipe)
    
  5. Remove (unlink) the named pipe:

    os.unlink(pipe1)
    

Here is the sample from the previous post, using two named pipes:

import subprocess
import os
from threading import Thread


defcreate_samp():
    # Read audio stream from https://freesound.org/data/previews/186/186942_2594536-hq.mp3# Apply adelay audio filter.# Encode the audio in mp3 format.# FFmpeg output is passed to stdout pipe, and stored in sample bytes array.
    sample1 = subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3",
                              "-af", "adelay=15000|15000", "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    # Read second audio sample from https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3
    sample2 = subprocess.run(["ffmpeg", "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3",
                              "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    return sample1, sample2


defwriter(data, pipe_name, chunk_size):
    # Open the pipes as opening files (open for "open for writing only").
    fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer)for i inrange(0, len(data), chunk_size):
        # Write to named pipe as writing to a file (but write the data in small chunks).
        os.write(fd_pipe, data[i:chunk_size+i])  # Write 1024 bytes of data to fd_pipe# Closing the pipes as closing files.
    os.close(fd_pipe)


defrecord(samp1, samp2):
    # Names of the "Named pipes"
    pipe1 = "audio_pipe1"
    pipe2 = "audio_pipe2"# Create "named pipes".
    os.mkfifo(pipe1)
    os.mkfifo(pipe2)

    # Open FFmpeg as sub-process# Use two audio input streams:# 1. Named pipe: "audio_pipe1"# 2. Named pipe: "audio_pipe2"# Merge the two audio streams using amix audio filter.# Store the result to output file: output.mp3
    process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3',
                                "-i", pipe1,
                                "-i", pipe2,
                                "-filter_complex", "amix=inputs=2:duration=longest", "output.mp3"],
                                stdin=subprocess.PIPE)

    # Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes).
    thread1 = Thread(target=writer, args=(samp1, pipe1, 1024))  # thread1 writes samp1 to pipe1
    thread2 = Thread(target=writer, args=(samp2, pipe2, 1024))  # thread2 writes samp2 to pipe2# Start the two threads
    thread1.start()
    thread2.start()

    # Wait for the two writer threads to finish
    thread1.join()
    thread2.join()

    process.wait()  # Wait for FFmpeg sub-process to finish# Remove the "named pipes".
    os.unlink(pipe1)
    os.unlink(pipe2)


sampl1, sampl2 = create_samp()
record(sampl1, sampl2)

Update:

Same solution using a class: Implementing the solution using a class ("NamedPipeWriter" class) is a bit more elegant. The class inherits Thread class, and overrides run method.

You may create a list of multiple objects, and iterate them in a loop, (instead of duplicating the code for each new input stream).

Here is the same solution using a class:

import subprocess
import os
import stat
from threading import Thread


defcreate_samp():
    # Read audio stream from https://freesound.org/data/previews/186/186942_2594536-hq.mp3# Apply adelay audio filter.# Encode the audio in mp3 format.# FFmpeg output is passed to stdout pipe, and stored in sample bytes array.
    sample1 = subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3",
                              "-af", "adelay=15000|15000", "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    # Read second audio sample from https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3
    sample2 = subprocess.run(["ffmpeg", "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3",
                              "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    return sample1, sample2


classNamedPipeWriter(Thread):
    """ Write data (in small chunks) to a named pipe using a thread """def__init__(self, pipe_name, data):
        """ Initialization - get pipe name and data to be written """super().__init__()
        self._pipe_name = pipe_name
        self._chunk_size = 1024
        self._data = data
        

    defrun(self):
        """ Open the pipe, write data in small chunks and close the pipe """
        chunk_size = self._chunk_size
        data = self._data

        # Open the pipes as opening files (open for "open for writing only").
        fd_pipe = os.open(self._pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer)for i inrange(0, len(data), chunk_size):
            # Write to named pipe as writing to a file (but write the data in small chunks).
            os.write(fd_pipe, data[i:chunk_size+i])  # Write 1024 bytes of data to fd_pipe# Closing the pipes as closing files.
        os.close(fd_pipe)


    

defrecord(samp1, samp2):
    # Names of the "Named pipes"
    pipe1 = "audio_pipe1"
    pipe2 = "audio_pipe2"# Create "named pipes".ifnot stat.S_ISFIFO(os.stat(pipe1).st_mode):
        os.mkfifo(pipe1)  # Create the pipe only if not exist.ifnot stat.S_ISFIFO(os.stat(pipe2).st_mode):
        os.mkfifo(pipe2)

    # Open FFmpeg as sub-process# Use two audio input streams:# 1. Named pipe: "audio_pipe1"# 2. Named pipe: "audio_pipe2"# Merge the two audio streams using amix audio filter.# Store the result to output file: output.mp3
    process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3',
                                "-i", pipe1,
                                "-i", pipe2,
                                "-filter_complex", "amix=inputs=2:duration=longest", "output.mp3"],
                                stdin=subprocess.PIPE)

    # Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes).
    named_pipe_writer1 = NamedPipeWriter(pipe1, samp1)
    named_pipe_writer2 = NamedPipeWriter(pipe2, samp2)

    # Start the two threads
    named_pipe_writer1.start()
    named_pipe_writer2.start()

    # Wait for the two writer threads to finish
    named_pipe_writer1.join()
    named_pipe_writer1.join()

    process.wait()  # Wait for FFmpeg sub-process to finish# Remove the "named pipes".
    os.unlink(pipe1)
    os.unlink(pipe2)


sampl1, sampl2 = create_samp()
record(sampl1, sampl2)

Notes:

  • The code was tested in Ubuntu 18.04 (in a virtual machine).

Post a Comment for "Multiple Named Pipes In Ffmpeg"