I’ve developed a distributed FFmpeg transcoding solution on Google App Engine.
The solution uses a combination of publish/subscribe messaging and Redis queues for distributed communication.
https://github.com/dmzoneill/appengine-ffmpeg
It's composed of two services which scale horizontally (default and worker).
Coordinator (default service and human interface)
#!/usr/bin/python
from flask import Flask
import os
import sys
import glob
import string
import random
import redis
import logging
from gcloud import storage, pubsub
from google.cloud import logging
PROJECT_ID = 'transcode-159215'
# NOTE(review): TOPIC is built here but publish() below targets the
# "ffmpeg-pool" topic instead — confirm which topic name is intended.
TOPIC = 'projects/{}/topics/message'.format(PROJECT_ID)
# Stackdriver logging sink; the worker service writes to the same logger name.
logclient = logging.Client()
logger = logclient.logger( "ffmpeg-pool" )
app = Flask(__name__)
app.config[ "SECRET_KEY" ] = "test"  # placeholder secret; sessions are not used here
app.debug = True
def publish( msg ):
    """Send *msg* to the shared ffmpeg-pool topic, creating the topic on demand."""
    client = pubsub.Client( PROJECT_ID )
    pool_topic = client.topic( "ffmpeg-pool" )
    if not pool_topic.exists():
        pool_topic.create()
    pool_topic.publish( msg )
@app.route( "/readlog" )
def readLog():
    """Return every accumulated log entry (newline-terminated), then clear the log.

    Returns an empty string when the log cannot be read or deleted.
    """
    try:
        # Join once instead of repeated string concatenation; the original's
        # "\n" literal had also been mangled across two source lines.
        msg = "".join( entry.payload + "\n" for entry in logger.list_entries() )
        logger.delete()
        return msg
    except Exception:
        # Narrowed from a bare except: still best-effort, but no longer
        # swallows SystemExit / KeyboardInterrupt.
        return ""
@app.route( "/cleantopic" )
def cleanTopics():
    """Drop and recreate the ffmpeg-pool topic, discarding any queued messages."""
    ps = pubsub.Client( PROJECT_ID )
    pool = ps.topic( "ffmpeg-pool" )
    pool.delete()
    pool.create()
    return "Cleaned topic"
@app.route( "/split" )
def split():
    """Queue a job asking a worker to split the source file into chunks."""
    publish( "split" )
    # Fixed typo in the user-facing response ("spliting" -> "splitting").
    return "File queued for splitting"
@app.route( "/transcode" )
def transcode():
    """Queue a job asking workers to transcode the queued chunks."""
    message = "transcode"
    publish( message )
    return "Job queued for transcoding"
@app.route( "/combine" )
def combine():
    """Queue a job asking a worker to stitch the transcoded chunks together."""
    message = "combine"
    publish( message )
    return "Job queued for combining"
@app.route( "/" )
def home():
    """List the available endpoints as a plain-text index."""
    endpoints = ( "/split", "/transcode", "/combine", "/cleantopic", "/readlog" )
    return " | ".join( endpoints )
# Local development entry point; in production App Engine serves `app` directly.
if __name__ == '__main__':
    app.run(host='127.0.0.1', port=8080, debug=True)
Worker (the transcoding service)
import os
from gcloud import storage, pubsub, logging
import sys
import socket
import time
import redis
import glob
from google.cloud import logging
# Stackdriver logger shared with the coordinator service (same logger name).
logclient = logging.Client()
logger = logclient.logger( "ffmpeg-pool" )
PROJECT_ID = 'transcode-159215'
# NOTE(review): TOPIC is unused below — subscribe() targets "ffmpeg-pool"; confirm.
TOPIC = 'projects/{}/topics/message'.format(PROJECT_ID)
# Pub/sub session state, (re)initialised by subscribe().
psclient = None
pstopic = None
pssub = None
class RedisQueue(object):
    """Simple FIFO queue backed by a Redis list.

    Used to hand chunk filenames from the splitting worker to transcoding workers.
    """

    def __init__( self, name, namespace = 'queue' ):
        # NOTE(review): hard-coded, credential-free Redis endpoint — confirm it
        # is still valid (and secured) before deploying.
        self.__db = redis.Redis( host = "redis-11670.c10.us-east-1-4.ec2.cloud.redislabs.com", port=11670 )
        self.key = '%s:%s' %(namespace, name)

    def qsize( self ):
        """Return the number of items currently queued."""
        return self.__db.llen( self.key )

    def empty( self ):
        """Return True when the queue holds no items."""
        return self.qsize() == 0

    def put( self, item ):
        """Append *item* to the tail of the queue."""
        self.__db.rpush( self.key, item )

    def get( self, block=True, timeout=None ):
        """Pop and return the head item, or None when empty / timed out.

        When *block* is true, waits up to *timeout* seconds (forever if None).
        """
        if block:
            item = self.__db.blpop( self.key, timeout=timeout )
            # blpop returns a (key, value) pair; unwrap the value.
            if item:
                item = item[1]
        else:
            # Bug fix: lpop returns the value directly, not a (key, value)
            # pair — the original applied item[1] here too, which returned
            # only the second byte of the popped value.
            item = self.__db.lpop( self.key )
        return item

    def get_nowait( self ):
        """Non-blocking pop; returns None when the queue is empty."""
        return self.get( False )
def download( rfile ):
    """Fetch *rfile* from the project bucket into /tmp/<rfile>."""
    client = storage.Client( PROJECT_ID )
    bucket = client.bucket( PROJECT_ID + ".appspot.com" )
    blob = bucket.blob( rfile )
    # 'wb', not 'w': media blobs are binary, and text mode corrupts them
    # (newline translation / encoding errors).
    with open( "/tmp/" + rfile, 'wb' ) as f:
        blob.download_to_file( f )
    logger.log_text( "Worker: Downloaded: /tmp/" + rfile )
def upload( rfile ):
    """Upload /tmp/<rfile> to the project bucket under the same name."""
    client = storage.Client( PROJECT_ID )
    bucket = client.bucket( PROJECT_ID + ".appspot.com" )
    # Removed a duplicated `blob = bucket.blob(rfile)` statement.
    blob = bucket.blob( rfile )
    # 'rb' + context manager: binary-safe read, and the handle is closed
    # (the original leaked an open text-mode file object).
    with open( "/tmp/" + rfile, 'rb' ) as f:
        blob.upload_from_file( f )
    logger.log_text( "Worker: Uploaded /tmp/" + rfile )
def transcode( rfile ):
    """Download chunk *rfile*, transcode it to H.265/AAC, upload output-<rfile>.mkv."""
    download( rfile )
    # Clear any stale output from a previous job on this instance.
    os.system( "rm /tmp/output*" )
    # NOTE(review): rfile is interpolated into a shell command; the names come
    # from our own queue, but subprocess.run([...], shell=False) would be safer.
    ret = os.system( "ffmpeg -i /tmp/" + rfile + " -c:v libx265 -preset medium -crf 28 -c:a aac -b:a 128k -strict -2 /tmp/output-" + rfile + ".mkv" )
    if ret:
        # str(ret) is already text; the original's .encode('utf-8') made this
        # concatenation raise TypeError on Python 3.
        logger.log_text( "Worker: convert failed : " + rfile + " - " + str( ret ) )
        return
    upload( "output-" + rfile + ".mkv" )
def split():
    """Split sample.mp4 into ~10s chunks, upload each, and queue each for transcoding."""
    rqueue = RedisQueue( "test" )
    download( "sample.mp4" )
    # Remove chunks left over from a previous split on this instance.
    os.system( "rm -f /tmp/chunk*" )
    ret = os.system( "ffmpeg -i /tmp/sample.mp4 -map 0:a -map 0:v -codec copy -f segment -segment_time 10 -segment_format matroska -v error '/tmp/chunk-%03d.orig'" )
    if ret:
        # Log the failure — transcode() and combine() log theirs; the original
        # returned "Failed" silently, leaving nothing in the shared log.
        logger.log_text( "Worker: split failed - " + str( ret ) )
        return "Failed"
    for rfile in glob.glob( "/tmp/chunk*" ):
        basename = os.path.basename( rfile )
        upload( basename )
        rqueue.put( basename )
def combine():
    """Download every transcoded output-* chunk, concatenate them, upload combined.mkv."""
    client = storage.Client( PROJECT_ID )
    bucket = client.bucket( PROJECT_ID + ".appspot.com" )
    blobs = bucket.list_blobs()
    os.system( "rm /tmp/*" )
    # Sort so segments concatenate in their original order. The original's
    # .encode('utf-8') turned names into bytes, which breaks the text write
    # below on Python 3; blob.name is already text.
    names = sorted( blob.name for blob in blobs if "output" in blob.name )
    with open( '/tmp/combine.lst', 'w' ) as f1:
        for name in names:
            f1.write( "file '/tmp/" + name + "'\n" )
            download( name )
    logger.log_text( "Worker: created combine list: /tmp/combine.lst" )
    ret = os.system( "ffmpeg -f concat -safe 0 -i /tmp/combine.lst -c copy /tmp/combined.mkv" )
    if ret:
        # Fixed log text: the output file is combined.mkv (was "combine.mkv"),
        # and str(ret) is already text (dropped the Py3-breaking .encode).
        logger.log_text( "Worker: combine failed: /tmp/combined.mkv - " + str(ret) )
        return
    upload( "combined.mkv" )
def subscribe():
    """(Re)connect this worker: ensure the shared topic and a host-specific subscription exist."""
    global psclient, pstopic, pssub
    psclient = pubsub.Client( PROJECT_ID )
    pstopic = psclient.topic( "ffmpeg-pool" )
    if not pstopic.exists():
        pstopic.create()
    sub_name = "ffmpeg-worker-" + socket.gethostname()
    pssub = pstopic.subscription( sub_name )
    if not pssub.exists():
        pssub.create()
def handlemessages():
    """Main worker loop: pull pub/sub commands and dispatch split/transcode/combine."""
    global psclient, pstopic, pssub
    rqueue = RedisQueue( 'test' )
    subscribe()
    while True:
        # Blocking pull, up to 110 messages per round trip.
        messages = pssub.pull( return_immediately=False, max_messages=110 )
        for ack_id, message in messages:
            # Normalise curly quotes that may appear in manually published messages.
            # NOTE(review): .encode('utf-8') yields bytes on Python 3, where the
            # subsequent str .replace would fail — this presumably targets Python 2.
            payload = message.data.encode( 'utf-8' ).replace( u"\u2018", "'" ).replace( u"\u2019", "'" )
            logger.log_text( "Worker: Received message: " + payload )
            try:
                # Ack before processing: a crash mid-job loses the message
                # (at-most-once delivery).
                pssub.acknowledge( [ack_id] )
                if payload == "combine":
                    combine()
                elif payload == "split":
                    split()
                else:
                    # Any other payload: drain the Redis chunk queue, transcoding
                    # one chunk per iteration.
                    rfile = rqueue.get()
                    basename = os.path.basename( rfile )
                    logger.log_text( "Worker: Redis popped: " + basename )
                    # NOTE(review): compares against the literal string "None",
                    # not the None object — confirm the producer actually enqueues
                    # that sentinel text, otherwise this loop never terminates.
                    while basename != "None":
                        transcode( basename )
                        rfile = rqueue.get()
                        basename = os.path.basename( rfile )
                        logger.log_text( "Worker: Redis popped: " + rfile )
            except Exception as e:
                # Best-effort recovery: log the error, then rebuild the
                # pub/sub session before the next pull.
                logger.log_text( "Worker: Error: " + e.message )
                sys.stderr.write( e.message )
                subscribe()
        time.sleep( 1 )
# Worker entry point: runs the pull/dispatch loop forever.
if __name__ == '__main__':
    handlemessages()