Source code for Code.webapp

# standard imports
import json
import logging
import os
from threading import Thread
import xmltodict

# plex debugging
try:
    import plexhints  # noqa: F401
except ImportError:
    pass
else:  # the code is running outside of Plex
    from plexhints.core_kit import Core  # core kit
    from plexhints.log_kit import Log  # log kit
    from plexhints.parse_kit import Plist  # parse kit
    from plexhints.prefs_kit import Prefs  # prefs kit

# lib imports
import flask
from flask import Flask, Response, render_template, request, send_from_directory
from flask_babel import Babel
import polib
import requests
from werkzeug.utils import secure_filename

# local imports
from const import bundle_identifier, plex_base_url, plex_token, plugin_directory, plugin_logs_directory, \
    system_plugins_directory
import plugin_manager

bundle_path = Core.bundle_path
if bundle_path.endswith('test.bundle'):
    # use current directory instead, to allow for testing outside of Plex
    bundle_path = os.getcwd()

# setup flask app
app = Flask(
    import_name=__name__,
    root_path=os.path.join(bundle_path, 'Contents', 'Resources', 'web'),
    static_folder=os.path.join(bundle_path, 'Contents', 'Resources', 'web'),
    template_folder=os.path.join(bundle_path, 'Contents', 'Resources', 'web', 'templates')
    )

# remove extra lines rendered jinja templates
app.jinja_env.trim_blocks = True
app.jinja_env.lstrip_blocks = True

# localization
babel = Babel(
    app=app,
    default_locale='en',
    default_timezone='UTC',
    default_domain='plugger',
    configure_jinja=True
)

app.config['BABEL_TRANSLATION_DIRECTORIES'] = os.path.join(bundle_path, 'Contents', 'Strings')

# setup logging for flask
Log.Info('Adding flask log handlers to plex plugin logger')
# Log.Debug('loggers: %s' % logging.Logger.manager.loggerDict.keys())
# Log.Debug('loggers: %s' % logging.Logger.manager.loggerDict)

# get the plugin logger
plugin_logger = logging.getLogger(bundle_identifier)

# replace the app.logger handlers with the plugin logger handlers
app.logger.handlers = plugin_logger.handlers
app.logger.setLevel(plugin_logger.level)

# test message
app.logger.info('flask app logger test message')

try:
    Prefs['bool_log_werkzeug_messages']
except KeyError:
    # this fails when building docs
    pass
else:
    if Prefs['bool_log_werkzeug_messages']:
        # get the werkzeug logger
        werkzeug_logger = logging.getLogger('werkzeug')

        # replace the werkzeug logger handlers with the plugin logger handlers
        werkzeug_logger.handlers = plugin_logger.handlers

        # use the same log level as the plugin logger
        werkzeug_logger.setLevel(plugin_logger.level)

        # test message
        werkzeug_logger.info('werkzeug logger test message')


# default plex headers
PLEX_HEADERS = {
    'X-Plex-Token': plex_token,
}


# global objects
plugin_directories = [
    plugin_directory,
    system_plugins_directory,
]
plugins = dict()

# mime type map
mime_type_map = {
    'gif': 'image/gif',
    'ico': 'image/vnd.microsoft.icon',
    'jpg': 'image/jpeg',
    'jpeg': 'image/jpeg',
    'png': 'image/png',
    'svg': 'image/svg+xml',
}


[docs]@babel.localeselector def get_locale(): # type: () -> str """ Get the locale from the config. Get the locale specified in the config. This does not need to be called as it is done so automatically by `babel`. Returns ------- str The locale. See Also -------- pyra.locales.get_locale : Use this function instead. Examples -------- >>> get_locale() en """ return Prefs['enum_locale']
def start_server(): # use threading to start the flask app... or else web server seems to be killed after a couple of minutes flask_thread = Thread( target=app.run, kwargs=dict( host=Prefs['str_http_host'], port=Prefs['int_http_port'], debug=False, use_reloader=False # reloader doesn't work when running in a separate thread ) ) # start flask application flask_thread.start() def stop_server(): # stop flask server # todo - this doesn't work request.environ.get('werkzeug.server.shutdown')
[docs]@app.route('/', methods=["GET"]) @app.route('/home', methods=["GET"]) def home(): # type: () -> render_template """ Serve the webapp home page. This page is where most of the functionality for Plugger is provided. Returns ------- render_template The rendered page. Notes ----- The following routes trigger this function. - `/` - `/home` Examples -------- >>> home() """ return render_template('home.html', title='Home')
[docs]@app.route("/<path:img>", methods=["GET"]) def image(img): # type: (str) -> flask.send_from_directory """ Get image from static/images directory. Returns ------- flask.send_from_directory The image. Notes ----- The following routes trigger this function. - `/favicon.ico` Examples -------- >>> image('favicon.ico') """ directory = os.path.join(app.static_folder, 'images') filename = os.path.basename(secure_filename(filename=img)) # sanitize the input if os.path.isfile(os.path.join(directory, filename)): file_extension = filename.rsplit('.', 1)[-1] if file_extension in mime_type_map: return send_from_directory(directory=directory, filename=filename, mimetype=mime_type_map[file_extension]) else: return Response(response='Invalid file type', status=400, mimetype='text/plain') else: return Response(response='Image not found', status=404, mimetype='text/plain')
[docs]@app.route('/api/plugin/install/', methods=["POST"]) def install_plugin(): # type: () -> Response """ Install a plugin. .. todo:: Complete this function. """ data = request.get_json(force=True) install_status = plugin_manager.initialize_install(plugin_data=data) Log.Warn('Installing plugins is not yet supported, status: {}'.format(install_status))
# get list of installed plugins in json format
[docs]@app.route('/installed_plugins/', methods=["GET"]) def installed_plugins(): # type: () -> Response """ Serve the list of installed plugins. """ # plugins known to the server plugin_list_xml = requests.get(url='%s/:/plugins' % plex_base_url, headers=PLEX_HEADERS).content # convert the plugin_list xml data to json known_plugin_list = xmltodict.parse(plugin_list_xml)['MediaContainer']['Plugin'] known_plugin_identifiers = [plugin['@identifier'] for plugin in known_plugin_list] # walk plugin directory for plugin_dir in plugin_directories: for plugin in os.listdir(plugin_dir): # set default plugin type and version plugin_type = 'user' version = None # get the path of the plugin plugin_path = os.path.join(plugin_dir, plugin) # get the path to the plist file plist_file_path = os.path.join(plugin_path, 'Contents', 'Info.plist') # for system plugins, set the plugin type and get the version from the VERSION file if plugin_dir == system_plugins_directory: plugin_type = 'system' version_file_path = os.path.join(plugin_path, 'Contents', 'VERSION') if os.path.isfile(version_file_path): version = str(Core.storage.load(filename=version_file_path, binary=False)) # the plugger data file plugger_data = None if plugin_dir == plugin_directory: plugger_data_file_path = os.path.join(plugin_path, 'plugger.json') # load plugger json file if os.path.isfile(plugger_data_file_path): plugger_data = json.loads(s=str(Core.storage.load(filename=plugger_data_file_path, binary=False))) # set the version from the plugger data if plugger_data: version = plugger_data.get('version', None) # get the bundle identifier from the plist file if os.path.isfile(plist_file_path): plist_contents = Plist.ObjectFromString(str(Core.storage.load(filename=plist_file_path, binary=False))) try: plugin_identifier = plist_contents['CFBundleIdentifier'] except KeyError: Log.Error('CFBundleIdentifier not found in plist file: %s' % plist_file_path) else: try: plugin_description = plist_contents['PlexAgentAttributionText'] except KeyError: plugin_description = None if plugin_identifier in known_plugin_identifiers: plugins[plugin_identifier] = dict( bundle=plugin, bundle_identifier=plugin_identifier, name=plugin.split('.bundle')[0], description=plugin_description, path=plugin_path, type=plugin_type, version=version, plugger_data=plugger_data, ) else: Log.Error('Plugin not properly loaded in Plex Media Server: %s' % plugin_identifier) else: Log.Error('Info.plist not found in plugin directory: %s' % plugin_path) return Response(response=json.dumps(plugins, sort_keys=True), status=200, mimetype='application/json')
[docs]@app.route('/logs/', defaults={'plugin_identifier': bundle_identifier}, methods=["GET"]) @app.route('/logs/<path:plugin_identifier>', methods=["GET"]) def logs(plugin_identifier): # type: (str) -> render_template """ Serve the plugin logs. Collect and format the logs for the specified plugin. Parameters ---------- plugin_identifier : str The reverse domain name of the plugin, e.g. `dev.lizardbyte.plugger`. Returns ------- render_template The logs template with the requested information. Notes ----- The following routes trigger this function. - `/logs/` - `/logs/<plugin name>` Examples -------- >>> logs(plugin_identifier='dev.lizardbyte.plugger') """ return render_template('logs.html', title='Logs', plugin_identifier=plugin_identifier)
[docs]@app.route('/log_stream/', defaults={'plugin_identifier': bundle_identifier}, methods=["GET"]) @app.route("/log_stream/<path:plugin_identifier>", methods=["GET"]) def log_stream(plugin_identifier): # type: (str) -> Response """ Serve the plugin logs in plain text. Collect and format the logs for the specified plugin. Parameters ---------- plugin_identifier : str The reverse domain name of the plugin, e.g. `dev.lizardbyte.plugger`. Returns ------- Response The text of the log files. Notes ----- The following routes trigger this function. - `/log_stream/` - `/log_stream/<plugin name>` Examples -------- >>> log_stream(plugin_identifier='dev.lizardbyte.plugger') """ base_log_file = '%s.log' % plugin_identifier combined_log = '' count = 5 while count >= 0: if count > 0: log_file_name = '%s.%s' % (base_log_file, count) else: log_file_name = base_log_file log_file = os.path.join(plugin_logs_directory, log_file_name) if os.path.isfile(log_file): # cannot use normal `with open()` as it does not work inside of Plex plugin framework # must use `str()` or Plex re-writes the final log file with the contents of all log files combined_log += str(Core.storage.load(filename=log_file, binary=False)) count += -1 return Response(combined_log, mimetype="text/plain", content_type="text/event-stream")
[docs]@app.route('/status', methods=["GET"]) def status(): # type: () -> dict """ Check the status of Plugger. This is useful for a healthcheck from Docker, and may have many other uses in the future. Returns ------- dict A dictionary of the status. Examples -------- >>> status() """ web_status = {'result': 'success', 'message': 'Ok'} return web_status
@app.route("/thumbnail/<path:plugin_identifier>", methods=["GET"]) def thumbnail(plugin_identifier): # see if plugin_identifier is in plugins if plugin_identifier in plugins: plugin_path = plugins[plugin_identifier]['path'] else: return Response(response='Plugin not found', status=404, mimetype='text/plain') # try to get the plugin thumbnail plugin_thumbnail = None image_priotity = [ 'icon-default', 'attribution' ] image_extensions = [ 'png', 'jpg', 'jpeg', ] for img in image_priotity: if plugin_thumbnail: break # break first loop for extension in image_extensions: plugin_thumbnail_path = os.path.join(plugin_path, 'Contents', 'Resources', '%s.%s' % ( img, extension)) if os.path.isfile(plugin_thumbnail_path): plugin_thumbnail = (os.path.dirname(plugin_thumbnail_path), os.path.basename(plugin_thumbnail_path)) break # break second loop if not plugin_thumbnail: plugin_thumbnail = (os.path.join(app.static_folder, 'images'), 'default-thumb.png') # get file extension image_extension = plugin_thumbnail[1].split('.')[-1] return send_from_directory(directory=plugin_thumbnail[0], filename=plugin_thumbnail[1], mimetype=mime_type_map[image_extension])
[docs]@app.route("/translations", methods=["GET"]) def translations(): # type: () -> Response """ Serve the translations. Returns ------- Response The translations. Examples -------- >>> translations() """ locale = get_locale() po_files = [ '%s/%s/LC_MESSAGES/plugger.po' % (app.config['BABEL_TRANSLATION_DIRECTORIES'], locale), # selected locale '%s/plugger.po' % app.config['BABEL_TRANSLATION_DIRECTORIES'], # fallback to default domain ] for po_file in po_files: if os.path.isfile(po_file): po = polib.pofile(po_file) # convert the po to json data = dict() for entry in po: if entry.msgid: data[entry.msgid] = entry.msgstr Log.Debug('Translation: %s -> %s' % (entry.msgid, entry.msgstr)) return Response(response=json.dumps(data), status=200, mimetype='application/json')