# Licensed under a 3-clause BSD style license - see LICENSE.rst from urllib.parse import parse_qs from urllib.request import urlopen from astropy.utils.data import get_pkg_data_contents from .standard_profile import SAMPSimpleXMLRPCRequestHandler, ThreadingXMLRPCServer __all__ = [] CROSS_DOMAIN = get_pkg_data_contents("data/crossdomain.xml") CLIENT_ACCESS_POLICY = get_pkg_data_contents("data/clientaccesspolicy.xml") class WebProfileRequestHandler(SAMPSimpleXMLRPCRequestHandler): """ Handler of XMLRPC requests performed through the Web Profile. """ def _send_CORS_header(self): if self.headers.get("Origin") is not None: method = self.headers.get("Access-Control-Request-Method") if method and self.command == "OPTIONS": # Preflight method self.send_header("Content-Length", "0") self.send_header( "Access-Control-Allow-Origin", self.headers.get("Origin") ) self.send_header("Access-Control-Allow-Methods", method) self.send_header("Access-Control-Allow-Headers", "Content-Type") self.send_header("Access-Control-Allow-Credentials", "true") if self.headers.get("Access-Control-Request-Private-Network") == "true": self.send_header("Access-Control-Allow-Private-Network", "true") else: # Simple method self.send_header( "Access-Control-Allow-Origin", self.headers.get("Origin") ) self.send_header("Access-Control-Allow-Headers", "Content-Type") self.send_header("Access-Control-Allow-Credentials", "true") def end_headers(self): self._send_CORS_header() SAMPSimpleXMLRPCRequestHandler.end_headers(self) def _serve_cross_domain_xml(self): cross_domain = False if self.path == "/crossdomain.xml": # Adobe standard response = CROSS_DOMAIN self.send_response(200, "OK") self.send_header("Content-Type", "text/x-cross-domain-policy") self.send_header("Content-Length", f"{len(response)}") self.end_headers() self.wfile.write(response.encode("utf-8")) self.wfile.flush() cross_domain = True elif self.path == "/clientaccesspolicy.xml": # Microsoft standard response = CLIENT_ACCESS_POLICY self.send_response(200, "OK") self.send_header("Content-Type", "text/xml") self.send_header("Content-Length", f"{len(response)}") self.end_headers() self.wfile.write(response.encode("utf-8")) self.wfile.flush() cross_domain = True return cross_domain def do_POST(self): if self._serve_cross_domain_xml(): return return SAMPSimpleXMLRPCRequestHandler.do_POST(self) def do_HEAD(self): if not self.is_http_path_valid(): self.report_404() return if self._serve_cross_domain_xml(): return def do_OPTIONS(self): self.send_response(200, "OK") self.end_headers() def do_GET(self): if not self.is_http_path_valid(): self.report_404() return split_path = self.path.split("?") if split_path[0] in [f"/translator/{clid}" for clid in self.server.clients]: # Request of a file proxying urlpath = parse_qs(split_path[1]) try: proxyfile = urlopen(urlpath["ref"][0]) self.send_response(200, "OK") self.end_headers() self.wfile.write(proxyfile.read()) proxyfile.close() except OSError: self.report_404() return if self._serve_cross_domain_xml(): return def is_http_path_valid(self): valid_paths = ["/clientaccesspolicy.xml", "/crossdomain.xml"] + [ f"/translator/{clid}" for clid in self.server.clients ] return self.path.split("?")[0] in valid_paths class WebProfileXMLRPCServer(ThreadingXMLRPCServer): """ XMLRPC server supporting the SAMP Web Profile. """ def __init__( self, addr, log=None, requestHandler=WebProfileRequestHandler, logRequests=True, allow_none=True, encoding=None, ): self.clients = [] ThreadingXMLRPCServer.__init__( self, addr, log, requestHandler, logRequests, allow_none, encoding ) def add_client(self, client_id): self.clients.append(client_id) def remove_client(self, client_id): try: self.clients.remove(client_id) except ValueError: # No warning here because this method gets called for all clients, # not just web clients, and we expect it to fail for non-web # clients. pass def web_profile_text_dialog(request, queue): samp_name = "unknown" if isinstance(request[0], str): # To support the old protocol version samp_name = request[0] else: samp_name = request[0]["samp.name"] text = f"""A Web application which declares to be Name: {samp_name} Origin: {request[2]} is requesting to be registered with the SAMP Hub. Pay attention that if you permit its registration, such application will acquire all current user privileges, like file read/write. Do you give your consent? [yes|no]""" print(text) answer = input(">>> ") queue.put(answer.lower() in ["yes", "y"])