| | |
| | | import webbrowser |
| | | |
| | | import hupper |
| | | from paste.deploy import ( |
| | | loadapp, |
| | | loadserver, |
| | | ) |
| | | |
| | | from pyramid.compat import PY2 |
| | | from pyramid.compat import configparser |
| | | |
| | | from pyramid.scripts.common import get_config_loader |
| | | from pyramid.scripts.common import parse_vars |
| | | from pyramid.scripts.common import setup_logging |
| | | from pyramid.path import AssetResolver |
| | | from pyramid.settings import aslist |
| | | |
| | |
| | | "passed here.", |
| | | ) |
| | | |
# Collaborators are bound as attributes so tests can swap in fakes. The
# callables are wrapped in ``staticmethod`` so that looking them up through
# an instance does not pass ``self`` as the first argument.
ConfigParser = configparser.ConfigParser
loadapp = staticmethod(loadapp)
loadserver = staticmethod(loadserver)
_get_config_loader = staticmethod(get_config_loader)

# Browser URL; remains None unless a config file supplies ``open_url``.
open_url = None
| | | |
| | |
| | | if self.args.verbose > 0: |
| | | print(msg) |
| | | |
def get_config_vars(self):
    """Return the ``name=value`` command-line variables parsed into a dict."""
    return parse_vars(self.args.config_vars)
def get_config_path(self, loader):
    """Return the absolute filesystem path of the loader's config URI."""
    uri_path = loader.uri.path
    return os.path.abspath(uri_path)
| | | |
| | | def pserve_file_config(self, filename, global_conf=None): |
| | | here = os.path.abspath(os.path.dirname(filename)) |
| | | defaults = {} |
| | | if global_conf: |
| | | defaults.update(global_conf) |
| | | defaults['here'] = here |
| | | |
| | | config = self.ConfigParser(defaults=defaults) |
| | | config.optionxform = str |
| | | config.read(filename) |
| | | try: |
| | | items = dict(config.items('pserve')) |
| | | except configparser.NoSectionError: |
| | | return |
| | | |
| | | watch_files = aslist(items.get('watch_files', ''), flatten=False) |
| | | def pserve_file_config(self, loader, global_conf=None): |
| | | settings = loader.get_settings('pserve', global_conf) |
| | | config_path = self.get_config_path(loader) |
| | | here = os.path.dirname(config_path) |
| | | watch_files = aslist(settings.get('watch_files', ''), flatten=False) |
| | | |
| | | # track file paths relative to the ini file |
| | | resolver = AssetResolver(package=None) |
| | |
| | | self.watch_files.add(os.path.abspath(file)) |
| | | |
| | | # attempt to determine the url of the server |
| | | open_url = items.get('open_url') |
| | | open_url = settings.get('open_url') |
| | | if open_url: |
| | | self.open_url = open_url |
| | | |
def _guess_server_url(self, filename, server_name,
                      global_conf=None):  # pragma: no cover
    """Guess the server URL by parsing an ini file directly.

    ``filename`` is read with ``self.ConfigParser``; the parser defaults
    are ``global_conf`` (if given) plus ``here``, the absolute directory
    of ``filename``.  A falsy ``server_name`` means ``'main'``.

    Returns ``'http://127.0.0.1:<port>'`` when the ``server:<name>``
    section defines ``port``; otherwise returns ``None`` (also when the
    section does not exist).
    """
    server_name = server_name or 'main'
    here = os.path.abspath(os.path.dirname(filename))
    defaults = {}
    if global_conf:
        defaults.update(global_conf)
    defaults['here'] = here

    config = self.ConfigParser(defaults=defaults)
    # keep option names case-sensitive instead of lower-casing them
    config.optionxform = str
    config.read(filename)
    try:
        items = dict(config.items('server:' + server_name))
    except configparser.NoSectionError:
        return None

    if 'port' in items:
        return 'http://127.0.0.1:{port}'.format(**items)
    return None

def guess_server_url(self, loader, server_name, global_conf=None):
    """Guess the server URL from a config loader's server section.

    Asks ``loader.get_settings`` for the ``server:<name>`` section,
    passing ``global_conf`` through unchanged.  A falsy ``server_name``
    means ``'main'``.

    Returns ``'http://127.0.0.1:<port>'`` when the section defines
    ``port``; otherwise returns ``None``.
    """
    server_name = server_name or 'main'
    settings = loader.get_settings('server:' + server_name, global_conf)
    if 'port' in settings:
        return 'http://127.0.0.1:{port}'.format(**settings)
    return None
| | | |
| | | def run(self): # pragma: no cover |
| | | if not self.args.config_uri: |
| | | self.out('You must give a config file') |
| | | return 2 |
| | | config_uri = self.args.config_uri |
| | | config_vars = parse_vars(self.args.config_vars) |
| | | app_spec = self.args.config_uri |
| | | |
| | | vars = self.get_config_vars() |
| | | app_name = self.args.app_name |
| | | |
| | | base = os.getcwd() |
| | | if not self._scheme_re.search(app_spec): |
| | | config_path = os.path.join(base, app_spec) |
| | | app_spec = 'config:' + app_spec |
| | | else: |
| | | config_path = None |
| | | loader = self._get_config_loader(config_uri) |
| | | loader.setup_logging(config_vars) |
| | | |
| | | self.pserve_file_config(loader, global_conf=config_vars) |
| | | |
| | | server_name = self.args.server_name |
| | | if self.args.server: |
| | | server_spec = 'egg:pyramid' |
| | |
| | | else: |
| | | server_spec = app_spec |
| | | |
| | | server_loader = loader |
| | | if server_spec != app_spec: |
| | | server_loader = self.get_config_loader(server_spec) |
| | | |
| | | # do not open the browser on each reload so check hupper first |
| | | if self.args.browser and not hupper.is_active(): |
| | | self.pserve_file_config(config_path, global_conf=vars) |
| | | url = self.open_url |
| | | |
| | | # do not guess the url if the server is sourced from a different |
| | | # location than the config_path |
| | | if not url and server_spec == app_spec: |
| | | url = self._guess_server_url(config_path, server_name, vars) |
| | | if not url: |
| | | url = self.guess_server_url( |
| | | server_loader, server_name, config_vars) |
| | | |
| | | if not url: |
| | | self.out('WARNING: could not determine the server\'s url to ' |
| | |
| | | ) |
| | | return 0 |
| | | |
| | | if config_path: |
| | | setup_logging(config_path, global_conf=vars) |
| | | self.pserve_file_config(config_path, global_conf=vars) |
| | | self.watch_files.add(config_path) |
| | | config_path = self.get_config_path(loader) |
| | | self.watch_files.add(config_path) |
| | | |
| | | server_path = self.get_config_path(server_loader) |
| | | self.watch_files.add(server_path) |
| | | |
| | | if hupper.is_active(): |
| | | reloader = hupper.get_reloader() |
| | | reloader.watch_files(list(self.watch_files)) |
| | | |
| | | server = self.loadserver( |
| | | server_spec, name=server_name, relative_to=base, global_conf=vars) |
| | | server = server_loader.get_wsgi_server(server_name, config_vars) |
| | | |
| | | app = self.loadapp( |
| | | app_spec, name=app_name, relative_to=base, global_conf=vars) |
| | | app = loader.get_wsgi_app(app_name, config_vars) |
| | | |
| | | if self.args.verbose > 0: |
| | | if hasattr(os, 'getpid'): |
| | |
| | | result = self._callFUT( |
| | | '/foo/bar/myapp.ini', 'myapp', options={'a': 'b'}, |
| | | _loader=loader) |
| | | self.assertEqual(loader.uri, '/foo/bar/myapp.ini') |
| | | self.assertEqual(loader.uri.path, '/foo/bar/myapp.ini') |
| | | self.assertEqual(len(loader.calls), 1) |
| | | self.assertEqual(loader.calls[0]['op'], 'app') |
| | | self.assertEqual(loader.calls[0]['name'], 'myapp') |
| | |
| | | result = self._callFUT( |
| | | '/foo/bar/myapp.ini', 'myapp', options={'a': 'b'}, |
| | | _loader=loader) |
| | | self.assertEqual(loader.uri, '/foo/bar/myapp.ini') |
| | | self.assertEqual(loader.uri.path, '/foo/bar/myapp.ini') |
| | | self.assertEqual(len(loader.calls), 1) |
| | | self.assertEqual(loader.calls[0]['op'], 'app_settings') |
| | | self.assertEqual(loader.calls[0]['name'], 'myapp') |
| | |
| | | |
def test_it_no_global_conf(self):
    loader = DummyLoader()
    # Call exactly once: a second call on the same loader would record a
    # second entry and break the single-call assertion below.
    self._callFUT('/abc.ini', _loader=loader)
    # The dummy loader stores a parsed URI, so compare its ``path``.
    self.assertEqual(loader.uri.path, '/abc.ini')
    self.assertEqual(len(loader.calls), 1)
    self.assertEqual(loader.calls[0]['op'], 'logging')
    self.assertEqual(loader.calls[0]['defaults'], None)
| | | |
def test_it_global_conf_empty(self):
    loader = DummyLoader()
    # Call exactly once: a second call on the same loader would record a
    # second entry and break the single-call assertion below.
    self._callFUT('/abc.ini', global_conf={}, _loader=loader)
    # The dummy loader stores a parsed URI, so compare its ``path``.
    self.assertEqual(loader.uri.path, '/abc.ini')
    self.assertEqual(len(loader.calls), 1)
    self.assertEqual(loader.calls[0]['op'], 'logging')
    # An empty dict must be passed through as-is.
    self.assertEqual(loader.calls[0]['defaults'], {})
| | | |
def test_it_global_conf_not_empty(self):
    loader = DummyLoader()
    # Call exactly once: a second call on the same loader would record a
    # second entry and break the single-call assertion below.
    self._callFUT('/abc.ini', global_conf={'key': 'val'}, _loader=loader)
    # The dummy loader stores a parsed URI, so compare its ``path``.
    self.assertEqual(loader.uri.path, '/abc.ini')
    self.assertEqual(len(loader.calls), 1)
    self.assertEqual(loader.calls[0]['op'], 'logging')
    self.assertEqual(loader.calls[0]['defaults'], {'key': 'val'})
| | |
def __init__(self, environ):
    """Keep ``environ`` and start with an empty ``matchdict``."""
    self.matchdict = {}
    self.environ = environ
| | | |
class DummyConfigParser(object):
    """Config parser stand-in: reads nothing, reports every section."""

    def read(self, x):
        # Deliberately a no-op; nothing is loaded.
        return None

    def has_section(self, name):
        # Every section name is reported as present.
        return True
| | | |
class DummyConfigParserModule(object):
    """Module-like holder exposing the dummy parser as ``ConfigParser``."""

    ConfigParser = DummyConfigParser
| | |
| | | self.views = [(None, view, None) for view in views] |
| | | self.__request_attrs__ = attrs |
| | | |
class DummyConfigParser(object):
    """Fake config parser that records its calls and returns canned items.

    ``result`` is what ``items()`` returns; ``None`` makes ``items()``
    raise ``NoSectionError`` instead.
    """

    def __init__(self, result, defaults=None):
        self.result, self.defaults = result, defaults

    def read(self, filename):
        # Remember which file was requested; nothing is actually read.
        self.filename = filename

    def items(self, section):
        # Remember which section was requested.
        self.section = section
        if self.result is not None:
            return self.result
        from pyramid.compat import configparser
        raise configparser.NoSectionError(section)
| | | |
class DummyConfigParserFactory(object):
    """Callable that builds a ``DummyConfigParser`` and remembers it.

    Set ``items`` beforehand to choose what the created parser returns.
    """

    items = None

    def __call__(self, defaults=None):
        # Keep both the defaults and the parser so tests can inspect them.
        self.defaults = defaults
        parser = DummyConfigParser(self.items, defaults)
        self.parser = parser
        return parser
| | | |
class DummyCloser(object):
    """Callable that flags ``called`` on itself once it has been invoked."""

    def __call__(self):
        setattr(self, 'called', True)
| | |
| | | |
| | | |
| | | class DummyLoader(object): |
| | | def __init__(self, settings=None, app_settings=None, app=None): |
| | | def __init__(self, settings=None, app_settings=None, app=None, server=None): |
| | | if not settings: |
| | | settings = {} |
| | | if not app_settings: |
| | |
| | | self.settings = settings |
| | | self.app_settings = app_settings |
| | | self.app = app |
| | | self.server = server |
| | | self.calls = [] |
| | | |
| | | def __call__(self, uri): |
| | | self.uri = uri |
| | | import plaster |
| | | self.uri = plaster.parse_uri(uri) |
| | | return self |
| | | |
| | | def add_call(self, op, name, defaults): |
| | |
| | | self.add_call('app_settings', name, defaults) |
| | | return self.app_settings |
| | | |
| | | def get_wsgi_server(self, name=None, defaults=None): |
| | | self.add_call('server', name, defaults) |
| | | return self.server |
| | | |
| | | def setup_logging(self, defaults): |
| | | self.add_call('logging', None, defaults) |
| | | self.defaults = defaults |
| | |
| | | [('Content-Type', 'text/html; charset=UTF-8')]) |
| | | command.run() |
| | | self.assertEqual(self._path_info, '/') |
| | | self.assertEqual(self.loader.uri, 'development.ini') |
| | | self.assertEqual(self.loader.uri.path, 'development.ini') |
| | | self.assertEqual(self.loader.calls[0]['op'], 'logging') |
| | | self.assertEqual(self.loader.calls[1]['op'], 'app') |
| | | self.assertEqual(self.loader.calls[1]['name'], None) |
| | |
| | | [('Content-Type', 'text/html; charset=UTF-8')]) |
| | | command.run() |
| | | self.assertEqual(self._path_info, '/abc') |
| | | self.assertEqual(self.loader.uri, 'development.ini') |
| | | self.assertEqual(self.loader.uri.path, 'development.ini') |
| | | self.assertEqual(self._out, ['abc']) |
| | | |
| | | def test_command_has_bad_config_header(self): |
| | |
def setUp(self):
    # Fresh parser factory and in-memory output buffer for every test.
    from pyramid.compat import NativeIO
    self.config_factory = dummy.DummyConfigParserFactory()
    self.out_ = NativeIO()
| | | |
def out(self, msg):
    # Collect command output in the buffer so tests can inspect it.
    buffer = self.out_
    buffer.write(msg)
| | | |
def _get_server(*args, **kwargs):
    # Server-factory stub: every argument is ignored (there is no explicit
    # ``self``; anything passed positionally lands in ``args``).
    def server(app):
        # Pretend to serve ``app``; always yields an empty string.
        return ''

    return server
| | | |
| | | def _getTargetClass(self): |
| | | from pyramid.scripts.pserve import PServeCommand |
| | |
| | | effargs.extend(args) |
| | | cmd = self._getTargetClass()(effargs) |
| | | cmd.out = self.out |
| | | cmd.ConfigParser = self.config_factory |
| | | self.loader = dummy.DummyLoader() |
| | | cmd._get_config_loader = self.loader |
| | | return cmd |
| | | |
| | | def test_run_no_args(self): |
| | |
| | | self.assertEqual(result, 2) |
| | | self.assertEqual(self.out_.getvalue(), 'You must give a config file') |
| | | |
def test_config_vars_no_command(self):
    inst = self._makeOne()
    inst.args.config_uri = 'foo'
    inst.args.config_vars = ['a=1', 'b=2']
    # Each ``name=value`` entry becomes one key/value pair.
    parsed = inst.get_config_vars()
    self.assertEqual(parsed, dict(a='1', b='2'))
| | | |
def test_parse_vars_good(self):
    inst = self._makeOne('development.ini', 'a=1', 'b=2')
    inst.loadserver = self._get_server

    app = dummy.DummyApp()

    # Single app factory; an earlier ``get_app(*args, **kwargs)`` that was
    # immediately shadowed by this definition has been dropped as dead code.
    def get_app(name, global_conf):
        app.name = name
        app.global_conf = global_conf
        return app

    self.loader.get_wsgi_app = get_app
    self.loader.server = lambda x: x

    # NOTE(review): ``loadserver``/``loadapp`` are still stubbed although
    # the app is also supplied through the loader -- confirm whether
    # ``run()`` still reads them.
    inst.loadapp = get_app
    inst.run()
    # The ``a=1 b=2`` command-line vars must reach the app factory.
    self.assertEqual(app.global_conf, {'a': '1', 'b': '2'})
| | | |
def test_parse_vars_bad(self):
    # 'a' is not a ``name=value`` pair, so running must raise ValueError.
    inst = self._makeOne('development.ini', 'a')
    inst.loadserver = self._get_server
    with self.assertRaises(ValueError):
        inst.run()
| | | |
| | | def test_config_file_finds_watch_files(self): |
| | | inst = self._makeOne('development.ini') |
| | | self.config_factory.items = [( |
| | | 'watch_files', |
| | | 'foo\n/baz\npyramid.tests.test_scripts:*.py', |
| | | )] |
| | | inst.pserve_file_config('/base/path.ini', global_conf={'a': '1'}) |
| | | self.assertEqual(self.config_factory.defaults, { |
| | | loader = self.loader('/base/path.ini') |
| | | loader.settings = {'pserve': { |
| | | 'watch_files': 'foo\n/baz\npyramid.tests.test_scripts:*.py', |
| | | }} |
| | | inst.pserve_file_config(loader, global_conf={'a': '1'}) |
| | | self.assertEqual(loader.calls[0]['defaults'], { |
| | | 'a': '1', |
| | | 'here': os.path.abspath('/base'), |
| | | }) |
| | | self.assertEqual(inst.watch_files, set([ |
| | | os.path.abspath('/base/foo'), |
| | |
| | | |
def test_config_file_finds_open_url(self):
    inst = self._makeOne('development.ini')
    # ``pserve_file_config`` takes a loader, so feed the settings through
    # the dummy loader rather than through a parsed ini file.
    loader = self.loader('/base/path.ini')
    loader.settings = {'pserve': {
        'open_url': 'http://127.0.0.1:8080/',
    }}
    inst.pserve_file_config(loader, global_conf={'a': '1'})
    # ``global_conf`` is handed to the loader unchanged; presumably the
    # dummy records it verbatim -- no ``here`` key is injected on this path.
    self.assertEqual(loader.calls[0]['defaults'], {
        'a': '1',
    })
    self.assertEqual(inst.open_url, 'http://127.0.0.1:8080/')
| | | |
def test__guess_server_url(self):
    # File-based variant: settings come from the ConfigParser factory.
    inst = self._makeOne('development.ini')
    self.config_factory.items = [(
        'port', '8080',
    )]
    url = inst._guess_server_url(
        '/base/path.ini', 'main', global_conf={'a': '1'})
    # The parser defaults gain ``here``, the directory of the ini file.
    self.assertEqual(self.config_factory.defaults, {
        'a': '1',
        'here': os.path.abspath('/base'),
    })
    self.assertEqual(self.config_factory.parser.section, 'server:main')
    self.assertEqual(url, 'http://127.0.0.1:8080')

def test_guess_server_url(self):
    # Loader-based variant: settings come from the dummy loader.
    inst = self._makeOne('development.ini')
    loader = self.loader('/base/path.ini')
    loader.settings = {'server:foo': {
        'port': '8080',
    }}
    url = inst.guess_server_url(loader, 'foo', global_conf={'a': '1'})
    # ``global_conf`` is handed to the loader unchanged; presumably the
    # dummy records it verbatim -- no ``here`` key is injected on this path.
    self.assertEqual(loader.calls[0]['defaults'], {
        'a': '1',
    })
    self.assertEqual(url, 'http://127.0.0.1:8080')
| | | |
| | | def test_reload_call_hupper_with_correct_args(self): |