Michael Merickel
2017-03-30 3e489b740b1836c81af240cba579245ab18177da
update pserve
5 files modified
235 ■■■■■ changed files
pyramid/scripts/pserve.py 103 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_paster.py 26 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_scripts/dummy.py 33 ●●●● patch | view | raw | blame | history
pyramid/tests/test_scripts/test_prequest.py 4 ●●●● patch | view | raw | blame | history
pyramid/tests/test_scripts/test_pserve.py 69 ●●●●● patch | view | raw | blame | history
pyramid/scripts/pserve.py
@@ -18,16 +18,11 @@
import webbrowser
import hupper
from paste.deploy import (
    loadapp,
    loadserver,
)
from pyramid.compat import PY2
from pyramid.compat import configparser
from pyramid.scripts.common import get_config_loader
from pyramid.scripts.common import parse_vars
from pyramid.scripts.common import setup_logging
from pyramid.path import AssetResolver
from pyramid.settings import aslist
@@ -113,9 +108,7 @@
             "passed here.",
        )
    ConfigParser = configparser.ConfigParser  # testing
    loadapp = staticmethod(loadapp)  # testing
    loadserver = staticmethod(loadserver)  # testing
    _get_config_loader = staticmethod(get_config_loader)  # for testing
    open_url = None
@@ -133,26 +126,14 @@
        if self.args.verbose > 0:
            print(msg)
    def get_config_vars(self):
        restvars = self.args.config_vars
        return parse_vars(restvars)
    def get_config_path(self, loader):
        return os.path.abspath(loader.uri.path)
    def pserve_file_config(self, filename, global_conf=None):
        here = os.path.abspath(os.path.dirname(filename))
        defaults = {}
        if global_conf:
            defaults.update(global_conf)
        defaults['here'] = here
        config = self.ConfigParser(defaults=defaults)
        config.optionxform = str
        config.read(filename)
        try:
            items = dict(config.items('pserve'))
        except configparser.NoSectionError:
            return
        watch_files = aslist(items.get('watch_files', ''), flatten=False)
    def pserve_file_config(self, loader, global_conf=None):
        settings = loader.get_settings('pserve', global_conf)
        config_path = self.get_config_path(loader)
        here = os.path.dirname(config_path)
        watch_files = aslist(settings.get('watch_files', ''), flatten=False)
        # track file paths relative to the ini file
        resolver = AssetResolver(package=None)
@@ -164,45 +145,30 @@
            self.watch_files.add(os.path.abspath(file))
        # attempt to determine the url of the server
        open_url = items.get('open_url')
        open_url = settings.get('open_url')
        if open_url:
            self.open_url = open_url
    def _guess_server_url(self, filename, server_name,
                          global_conf=None):  # pragma: no cover
    def guess_server_url(self, loader, server_name, global_conf=None):
        server_name = server_name or 'main'
        here = os.path.abspath(os.path.dirname(filename))
        defaults = {}
        if global_conf:
            defaults.update(global_conf)
        defaults['here'] = here
        config = self.ConfigParser(defaults=defaults)
        config.optionxform = str
        config.read(filename)
        try:
            items = dict(config.items('server:' + server_name))
        except configparser.NoSectionError:
            return
        if 'port' in items:
            return 'http://127.0.0.1:{port}'.format(**items)
        settings = loader.get_settings('server:' + server_name, global_conf)
        if 'port' in settings:
            return 'http://127.0.0.1:{port}'.format(**settings)
    def run(self):  # pragma: no cover
        if not self.args.config_uri:
            self.out('You must give a config file')
            return 2
        config_uri = self.args.config_uri
        config_vars = parse_vars(self.args.config_vars)
        app_spec = self.args.config_uri
        vars = self.get_config_vars()
        app_name = self.args.app_name
        base = os.getcwd()
        if not self._scheme_re.search(app_spec):
            config_path = os.path.join(base, app_spec)
            app_spec = 'config:' + app_spec
        else:
            config_path = None
        loader = self._get_config_loader(config_uri)
        loader.setup_logging(config_vars)
        self.pserve_file_config(loader, global_conf=config_vars)
        server_name = self.args.server_name
        if self.args.server:
            server_spec = 'egg:pyramid'
@@ -211,15 +177,17 @@
        else:
            server_spec = app_spec
        server_loader = loader
        if server_spec != app_spec:
            server_loader = self.get_config_loader(server_spec)
        # do not open the browser on each reload so check hupper first
        if self.args.browser and not hupper.is_active():
            self.pserve_file_config(config_path, global_conf=vars)
            url = self.open_url
            # do not guess the url if the server is sourced from a different
            # location than the config_path
            if not url and server_spec == app_spec:
                url = self._guess_server_url(config_path, server_name, vars)
            if not url:
                url = self.guess_server_url(
                    server_loader, server_name, config_vars)
            if not url:
                self.out('WARNING: could not determine the server\'s url to '
@@ -246,20 +214,19 @@
            )
            return 0
        if config_path:
            setup_logging(config_path, global_conf=vars)
            self.pserve_file_config(config_path, global_conf=vars)
            self.watch_files.add(config_path)
        config_path = self.get_config_path(loader)
        self.watch_files.add(config_path)
        server_path = self.get_config_path(server_loader)
        self.watch_files.add(server_path)
        if hupper.is_active():
            reloader = hupper.get_reloader()
            reloader.watch_files(list(self.watch_files))
        server = self.loadserver(
            server_spec, name=server_name, relative_to=base, global_conf=vars)
        server = server_loader.get_wsgi_server(server_name, config_vars)
        app = self.loadapp(
            app_spec, name=app_name, relative_to=base, global_conf=vars)
        app = loader.get_wsgi_app(app_name, config_vars)
        if self.args.verbose > 0:
            if hasattr(os, 'getpid'):
pyramid/tests/test_paster.py
@@ -22,7 +22,7 @@
        result = self._callFUT(
            '/foo/bar/myapp.ini', 'myapp', options={'a': 'b'},
            _loader=loader)
        self.assertEqual(loader.uri, '/foo/bar/myapp.ini')
        self.assertEqual(loader.uri.path, '/foo/bar/myapp.ini')
        self.assertEqual(len(loader.calls), 1)
        self.assertEqual(loader.calls[0]['op'], 'app')
        self.assertEqual(loader.calls[0]['name'], 'myapp')
@@ -54,7 +54,7 @@
        result = self._callFUT(
            '/foo/bar/myapp.ini', 'myapp', options={'a': 'b'},
            _loader=loader)
        self.assertEqual(loader.uri, '/foo/bar/myapp.ini')
        self.assertEqual(loader.uri.path, '/foo/bar/myapp.ini')
        self.assertEqual(len(loader.calls), 1)
        self.assertEqual(loader.calls[0]['op'], 'app_settings')
        self.assertEqual(loader.calls[0]['name'], 'myapp')
@@ -81,24 +81,24 @@
    def test_it_no_global_conf(self):
        loader = DummyLoader()
        self._callFUT('/abc', _loader=loader)
        self.assertEqual(loader.uri, '/abc')
        self._callFUT('/abc.ini', _loader=loader)
        self.assertEqual(loader.uri.path, '/abc.ini')
        self.assertEqual(len(loader.calls), 1)
        self.assertEqual(loader.calls[0]['op'], 'logging')
        self.assertEqual(loader.calls[0]['defaults'], None)
    def test_it_global_conf_empty(self):
        loader = DummyLoader()
        self._callFUT('/abc', global_conf={}, _loader=loader)
        self.assertEqual(loader.uri, '/abc')
        self._callFUT('/abc.ini', global_conf={}, _loader=loader)
        self.assertEqual(loader.uri.path, '/abc.ini')
        self.assertEqual(len(loader.calls), 1)
        self.assertEqual(loader.calls[0]['op'], 'logging')
        self.assertEqual(loader.calls[0]['defaults'], {})
    def test_it_global_conf_not_empty(self):
        loader = DummyLoader()
        self._callFUT('/abc', global_conf={'key': 'val'}, _loader=loader)
        self.assertEqual(loader.uri, '/abc')
        self._callFUT('/abc.ini', global_conf={'key': 'val'}, _loader=loader)
        self.assertEqual(loader.uri.path, '/abc.ini')
        self.assertEqual(len(loader.calls), 1)
        self.assertEqual(loader.calls[0]['op'], 'logging')
        self.assertEqual(loader.calls[0]['defaults'], {'key': 'val'})
@@ -166,13 +166,3 @@
    def __init__(self, environ):
        self.environ = environ
        self.matchdict = {}
class DummyConfigParser(object):
    def read(self, x):
        pass
    def has_section(self, name):
        return True
class DummyConfigParserModule(object):
    ConfigParser = DummyConfigParser
pyramid/tests/test_scripts/dummy.py
@@ -81,29 +81,6 @@
        self.views = [(None, view, None) for view in views]
        self.__request_attrs__ = attrs
class DummyConfigParser(object):
    def __init__(self, result, defaults=None):
        self.result = result
        self.defaults = defaults
    def read(self, filename):
        self.filename = filename
    def items(self, section):
        self.section = section
        if self.result is None:
            from pyramid.compat import configparser
            raise configparser.NoSectionError(section)
        return self.result
class DummyConfigParserFactory(object):
    items = None
    def __call__(self, defaults=None):
        self.defaults = defaults
        self.parser = DummyConfigParser(self.items, defaults)
        return self.parser
class DummyCloser(object):
    def __call__(self):
        self.called = True
@@ -171,7 +148,7 @@
class DummyLoader(object):
    def __init__(self, settings=None, app_settings=None, app=None):
    def __init__(self, settings=None, app_settings=None, app=None, server=None):
        if not settings:
            settings = {}
        if not app_settings:
@@ -179,10 +156,12 @@
        self.settings = settings
        self.app_settings = app_settings
        self.app = app
        self.server = server
        self.calls = []
    def __call__(self, uri):
        self.uri = uri
        import plaster
        self.uri = plaster.parse_uri(uri)
        return self
    def add_call(self, op, name, defaults):
@@ -200,6 +179,10 @@
        self.add_call('app_settings', name, defaults)
        return self.app_settings
    def get_wsgi_server(self, name=None, defaults=None):
        self.add_call('server', name, defaults)
        return self.server
    def setup_logging(self, defaults):
        self.add_call('logging', None, defaults)
        self.defaults = defaults
pyramid/tests/test_scripts/test_prequest.py
@@ -33,7 +33,7 @@
                [('Content-Type', 'text/html; charset=UTF-8')])
        command.run()
        self.assertEqual(self._path_info, '/')
        self.assertEqual(self.loader.uri, 'development.ini')
        self.assertEqual(self.loader.uri.path, 'development.ini')
        self.assertEqual(self.loader.calls[0]['op'], 'logging')
        self.assertEqual(self.loader.calls[1]['op'], 'app')
        self.assertEqual(self.loader.calls[1]['name'], None)
@@ -44,7 +44,7 @@
                [('Content-Type', 'text/html; charset=UTF-8')])
        command.run()
        self.assertEqual(self._path_info, '/abc')
        self.assertEqual(self.loader.uri, 'development.ini')
        self.assertEqual(self.loader.uri.path, 'development.ini')
        self.assertEqual(self._out, ['abc'])
    def test_command_has_bad_config_header(self):
pyramid/tests/test_scripts/test_pserve.py
@@ -10,15 +10,9 @@
    def setUp(self):
        from pyramid.compat import NativeIO
        self.out_ = NativeIO()
        self.config_factory = dummy.DummyConfigParserFactory()
    def out(self, msg):
        self.out_.write(msg)
    def _get_server(*args, **kwargs):
        def server(app):
            return ''
        return server
    def _getTargetClass(self):
        from pyramid.scripts.pserve import PServeCommand
@@ -29,7 +23,8 @@
        effargs.extend(args)
        cmd = self._getTargetClass()(effargs)
        cmd.out = self.out
        cmd.ConfigParser = self.config_factory
        self.loader = dummy.DummyLoader()
        cmd._get_config_loader = self.loader
        return cmd
    def test_run_no_args(self):
@@ -38,41 +33,33 @@
        self.assertEqual(result, 2)
        self.assertEqual(self.out_.getvalue(), 'You must give a config file')
    def test_config_vars_no_command(self):
        inst = self._makeOne()
        inst.args.config_uri = 'foo'
        inst.args.config_vars = ['a=1', 'b=2']
        result = inst.get_config_vars()
        self.assertEqual(result, {'a': '1', 'b': '2'})
    def test_parse_vars_good(self):
        inst = self._makeOne('development.ini', 'a=1', 'b=2')
        inst.loadserver = self._get_server
        app = dummy.DummyApp()
        def get_app(*args, **kwargs):
            app.global_conf = kwargs.get('global_conf', None)
        def get_app(name, global_conf):
            app.name = name
            app.global_conf = global_conf
            return app
        self.loader.get_wsgi_app = get_app
        self.loader.server = lambda x: x
        inst.loadapp = get_app
        inst.run()
        self.assertEqual(app.global_conf, {'a': '1', 'b': '2'})
    def test_parse_vars_bad(self):
        inst = self._makeOne('development.ini', 'a')
        inst.loadserver = self._get_server
        self.assertRaises(ValueError, inst.run)
    def test_config_file_finds_watch_files(self):
        inst = self._makeOne('development.ini')
        self.config_factory.items = [(
            'watch_files',
            'foo\n/baz\npyramid.tests.test_scripts:*.py',
        )]
        inst.pserve_file_config('/base/path.ini', global_conf={'a': '1'})
        self.assertEqual(self.config_factory.defaults, {
        loader = self.loader('/base/path.ini')
        loader.settings = {'pserve': {
            'watch_files': 'foo\n/baz\npyramid.tests.test_scripts:*.py',
        }}
        inst.pserve_file_config(loader, global_conf={'a': '1'})
        self.assertEqual(loader.calls[0]['defaults'], {
            'a': '1',
            'here': os.path.abspath('/base'),
        })
        self.assertEqual(inst.watch_files, set([
            os.path.abspath('/base/foo'),
@@ -82,28 +69,26 @@
    def test_config_file_finds_open_url(self):
        inst = self._makeOne('development.ini')
        self.config_factory.items = [(
            'open_url', 'http://127.0.0.1:8080/',
        )]
        inst.pserve_file_config('/base/path.ini', global_conf={'a': '1'})
        self.assertEqual(self.config_factory.defaults, {
        loader = self.loader('/base/path.ini')
        loader.settings = {'pserve': {
            'open_url': 'http://127.0.0.1:8080/',
        }}
        inst.pserve_file_config(loader, global_conf={'a': '1'})
        self.assertEqual(loader.calls[0]['defaults'], {
            'a': '1',
            'here': os.path.abspath('/base'),
        })
        self.assertEqual(inst.open_url, 'http://127.0.0.1:8080/')
    def test__guess_server_url(self):
    def test_guess_server_url(self):
        inst = self._makeOne('development.ini')
        self.config_factory.items = [(
            'port', '8080',
        )]
        url = inst._guess_server_url(
            '/base/path.ini', 'main', global_conf={'a': '1'})
        self.assertEqual(self.config_factory.defaults, {
        loader = self.loader('/base/path.ini')
        loader.settings = {'server:foo': {
            'port': '8080',
        }}
        url = inst.guess_server_url(loader, 'foo', global_conf={'a': '1'})
        self.assertEqual(loader.calls[0]['defaults'], {
            'a': '1',
            'here': os.path.abspath('/base'),
        })
        self.assertEqual(self.config_factory.parser.section, 'server:main')
        self.assertEqual(url, 'http://127.0.0.1:8080')
    def test_reload_call_hupper_with_correct_args(self):