Module scrapfly.api_response
Classes
class ApiResponse (request: requests.models.Request, response: requests.models.Response)
-
Expand source code
class ApiResponse:
    """Base wrapper around a raw Scrapfly API exchange (request + response).

    Subclasses (scrape / extraction responses) provide ``self.result`` and
    ``self.error``, which several properties here rely on.
    """

    def __init__(self, request: Request, response: Response):
        self.request = request
        self.response = response

    @property
    def headers(self) -> CaseInsensitiveDict:
        """Raw HTTP headers of the Scrapfly API response."""
        return self.response.headers

    @property
    def status_code(self) -> int:
        """
        This is the status code of our API, not the upstream website
        """
        return self.response.status_code

    @property
    def remaining_quota(self) -> Optional[int]:
        """Remaining pre-paid API credit, from the X-Scrapfly-Remaining-Api-Credit header."""
        raw = self.response.headers.get('X-Scrapfly-Remaining-Api-Credit')
        return int(raw) if raw else raw

    @property
    def cost(self) -> Optional[int]:
        """API credit cost of this call, from the X-Scrapfly-Api-Cost header."""
        raw = self.response.headers.get('X-Scrapfly-Api-Cost')
        return int(raw) if raw else raw

    @property
    def duration_ms(self) -> Optional[float]:
        """Server-side processing time in milliseconds, from X-Scrapfly-Response-Time."""
        raw = self.response.headers.get('X-Scrapfly-Response-Time')
        return float(raw) if raw else raw

    @property
    def error_message(self):
        """Human-readable error (or result) summary, with a doc link when one is published."""
        status = self.response.status_code
        if self.error is not None:
            message = "<-- %s | %s - %s." % (status, self.error['code'], self.error['message'])
            if self.error['links']:
                message += " Checkout the related doc: %s" % list(self.error['links'].values())[0]
            return message
        message = "<-- %s | %s." % (status, self.result['message'])
        if self.result.get('links'):
            message += " Checkout the related doc: %s" % ", ".join(self.result['links'])
        return message

    def prevent_extra_usage(self):
        """Raise ExtraUsageForbidden when the pre-paid quota is fully consumed."""
        if self.remaining_quota == 0:
            raise ExtraUsageForbidden(
                message='All Pre Paid Quota Used',
                code='ERR::ACCOUNT::PREVENT_EXTRA_USAGE',
                http_status_code=429,
                is_retryable=False
            )

    def raise_for_result(
        self,
        raise_on_upstream_error: bool,
        error_class: Union[ApiHttpClientError, ScreenshotAPIError, ExtractionAPIError]
    ):
        """Translate a non-2xx API reply into a typed SDK exception.

        5xx replies raise ApiHttpServerError; for screenshot/extraction error
        classes `raise_on_upstream_error` is honoured; anything else raises
        `error_class`. Replies without an `error_id` payload are not raised.
        """
        try:
            self.response.raise_for_status()
        except HTTPError as e:
            if 'error_id' not in self.result:
                return
            if e.response.status_code >= 500:
                raise ApiHttpServerError(
                    request=e.request,
                    response=e.response,
                    message=self.result['message'],
                    code='',
                    resource='',
                    http_status_code=e.response.status_code,
                    documentation_url=self.result.get('links'),
                    api_response=self,
                ) from e
            # respect raise_on_upstream_error with screenshot and extraction only
            if error_class in (ScreenshotAPIError, ExtractionAPIError):
                if raise_on_upstream_error:
                    raise error_class(
                        request=e.request,
                        response=e.response,
                        message=self.result['message'],
                        code='',
                        resource='API',
                        http_status_code=self.result['http_code'],
                        documentation_url=self.result.get('links'),
                        api_response=self,
                    ) from e
            else:
                raise error_class(
                    request=e.request,
                    response=e.response,
                    message=self.result['message'],
                    code='',
                    resource='API',
                    http_status_code=self.result['http_code'],
                    documentation_url=self.result.get('links'),
                    api_response=self,
                ) from e
Subclasses
Instance variables
prop cost : int | None
-
Expand source code
@property def cost(self) -> Optional[int]: cost = self.response.headers.get('X-Scrapfly-Api-Cost') if cost: cost = int(cost) return cost
prop duration_ms : float | None
-
Expand source code
@property def duration_ms(self) -> Optional[float]: duration = self.response.headers.get('X-Scrapfly-Response-Time') if duration: duration = float(duration) return duration
prop error_message
-
Expand source code
@property def error_message(self): if self.error is not None: message = "<-- %s | %s - %s." % (self.response.status_code, self.error['code'], self.error['message']) if self.error['links']: message += " Checkout the related doc: %s" % list(self.error['links'].values())[0] return message message = "<-- %s | %s." % (self.response.status_code, self.result['message']) if self.result.get('links'): message += " Checkout the related doc: %s" % ", ".join(self.result['links']) return message
prop headers : requests.structures.CaseInsensitiveDict
-
Expand source code
@property def headers(self) -> CaseInsensitiveDict: return self.response.headers
prop remaining_quota : int | None
-
Expand source code
@property def remaining_quota(self) -> Optional[int]: remaining_scrape = self.response.headers.get('X-Scrapfly-Remaining-Api-Credit') if remaining_scrape: remaining_scrape = int(remaining_scrape) return remaining_scrape
prop status_code : int
-
Expand source code
@property def status_code(self) -> int: """ This is the status code of our API, not the upstream website """ return self.response.status_code
This is the status code of our API, not the upstream website
Methods
def prevent_extra_usage(self)
-
Expand source code
def prevent_extra_usage(self): if self.remaining_quota == 0: raise ExtraUsageForbidden( message='All Pre Paid Quota Used', code='ERR::ACCOUNT::PREVENT_EXTRA_USAGE', http_status_code=429, is_retryable=False )
def raise_for_result(self,
raise_on_upstream_error: bool,
error_class: ApiHttpClientError | scrapfly.errors.ScreenshotAPIError | scrapfly.errors.ExtractionAPIError)-
Expand source code
def raise_for_result( self, raise_on_upstream_error: bool, error_class: Union[ApiHttpClientError, ScreenshotAPIError, ExtractionAPIError] ): try: self.response.raise_for_status() except HTTPError as e: if 'error_id' in self.result: if e.response.status_code >= 500: raise ApiHttpServerError( request=e.request, response=e.response, message=self.result['message'], code='', resource='', http_status_code=e.response.status_code, documentation_url=self.result.get('links'), api_response=self, ) from e # respect raise_on_upstream_error with screenshot and extraction only elif error_class in (ScreenshotAPIError, ExtractionAPIError): if raise_on_upstream_error: raise error_class( request=e.request, response=e.response, message=self.result['message'], code='', resource='API', http_status_code=self.result['http_code'], documentation_url=self.result.get('links'), api_response=self, ) from e else: raise error_class( request=e.request, response=e.response, message=self.result['message'], code='', resource='API', http_status_code=self.result['http_code'], documentation_url=self.result.get('links'), api_response=self, ) from e
class ExtractionApiResponse (request: requests.models.Request,
response: requests.models.Response,
extraction_config: ExtractionConfig,
api_result: bytes | None = None)-
Expand source code
class ExtractionApiResponse(ApiResponse): def __init__(self, request: Request, response: Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None): super().__init__(request, response) self.extraction_config = extraction_config self.result = self.handle_api_result(api_result) @property def extraction_result(self) -> Optional[Dict]: extraction_result = self.result.get('result', None) if not extraction_result: # handle empty extraction responses return {'data': None, 'content_type': None} else: return extraction_result @property def data(self) -> Union[Dict, List, str]: # depends on the LLM prompt if self.error is None: return self.extraction_result['data'] return None @property def content_type(self) -> Optional[str]: if self.error is None: return self.extraction_result['content_type'] return None @property def extraction_success(self) -> bool: extraction_result = self.extraction_result if extraction_result is None or extraction_result['data'] is None: return False return True @property def error(self) -> Optional[Dict]: if self.extraction_result is None: return self.result return None def _is_api_error(self, api_result: Dict) -> bool: if api_result is None: return True return 'error_id' in api_result def handle_api_result(self, api_result: bytes) -> FrozenDict: if self._is_api_error(api_result=api_result) is True: return FrozenDict(api_result) return FrozenDict({'result': api_result}) def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError): super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
Ancestors
Instance variables
prop content_type : str | None
-
Expand source code
@property def content_type(self) -> Optional[str]: if self.error is None: return self.extraction_result['content_type'] return None
prop data : Dict | List | str
-
Expand source code
@property def data(self) -> Union[Dict, List, str]: # depends on the LLM prompt if self.error is None: return self.extraction_result['data'] return None
prop error : Dict | None
-
Expand source code
@property def error(self) -> Optional[Dict]: if self.extraction_result is None: return self.result return None
prop extraction_result : Dict | None
-
Expand source code
@property def extraction_result(self) -> Optional[Dict]: extraction_result = self.result.get('result', None) if not extraction_result: # handle empty extraction responses return {'data': None, 'content_type': None} else: return extraction_result
prop extraction_success : bool
-
Expand source code
@property def extraction_success(self) -> bool: extraction_result = self.extraction_result if extraction_result is None or extraction_result['data'] is None: return False return True
Methods
def handle_api_result(self, api_result: bytes) ‑> FrozenDict
-
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict: if self._is_api_error(api_result=api_result) is True: return FrozenDict(api_result) return FrozenDict({'result': api_result})
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ExtractionAPIError)-
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError): super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
Inherited members
class ResponseBodyHandler (use_brotli: bool = False, signing_secrets: Tuple[str] | None = None)
-
Expand source code
class ResponseBodyHandler:
    """Negotiates and decodes Scrapfly API/webhook bodies.

    Handles content compression (gzip/deflate, optionally brotli/zstd),
    msgpack/JSON deserialization, and optional HMAC-SHA256 webhook
    signature verification.
    """

    SUPPORTED_COMPRESSION = ['gzip', 'deflate']
    SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']

    class JSONDateTimeDecoder(JSONDecoder):
        # JSON decoder that revives date-like values through _date_parser.
        def __init__(self, *args, **kargs):
            JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs)

    # brotli under perform at same gzip level and upper level destroy the cpu so
    # the trade off do not worth it for most of usage
    def __init__(self, use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None):
        # NOTE(review): this mutates the class-level SUPPORTED_COMPRESSION list,
        # so detected codecs are shared across all instances — confirm intent.
        if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
            try:
                try:
                    import brotlicffi as brotli
                    self.SUPPORTED_COMPRESSION.insert(0, 'br')
                except ImportError:
                    import brotli
                    self.SUPPORTED_COMPRESSION.insert(0, 'br')
            except ImportError:
                pass
        try:
            import zstd
            self.SUPPORTED_COMPRESSION.append('zstd')
        except ImportError:
            pass

        self.content_encoding: str = ', '.join(self.SUPPORTED_COMPRESSION)

        # Webhook signing secrets are given hex-encoded; store raw bytes, deduplicated.
        self._signing_secret: Optional[Tuple[str]] = None
        if signing_secrets:
            _secrets = set()
            for signing_secret in signing_secrets:
                _secrets.add(binascii.unhexlify(signing_secret))
            self._signing_secret = tuple(_secrets)

        try:  # automatically use msgpack if available https://msgpack.org/
            import msgpack
            self.accept = 'application/msgpack;charset=utf-8'
            self.content_type = 'application/msgpack;charset=utf-8'
            self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
        except ImportError:
            self.accept = 'application/json;charset=utf-8'
            self.content_type = 'application/json;charset=utf-8'
            self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)

    def support(self, headers: Dict) -> bool:
        """True when the headers advertise a content type this handler can decode."""
        if 'content-type' not in headers:
            return False
        value = headers['content-type']
        return any(supported in value for supported in self.SUPPORTED_CONTENT_TYPES)

    def verify(self, message: bytes, signature: str) -> bool:
        """Check `signature` against HMAC-SHA256 of `message` for any configured secret.

        BUGFIX: uses hmac.compare_digest instead of `==` so the comparison is
        constant-time and not vulnerable to timing attacks on webhook signatures.
        """
        for signing_secret in self._signing_secret:
            digest = hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper()
            if hmac.compare_digest(digest, signature):
                return True
        return False

    def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
        """Decompress, verify (when secrets are configured) and deserialize a body.

        Unknown encodings/content types pass the payload through untouched.
        """
        if content_encoding == 'gzip' or content_encoding == 'gz':
            import gzip
            content = gzip.decompress(content)
        elif content_encoding == 'deflate':
            import zlib
            content = zlib.decompress(content)
        elif content_encoding == 'brotli' or content_encoding == 'br':
            import brotli
            content = brotli.decompress(content)
        elif content_encoding == 'zstd':
            import zstd
            content = zstd.decompress(content)

        # Signature is checked against the decompressed bytes.
        if self._signing_secret is not None and signature is not None:
            if not self.verify(content, signature):
                raise WebhookSignatureMissMatch()

        if content_type.startswith('application/json'):
            content = loads(content, cls=self.JSONDateTimeDecoder)
        elif content_type.startswith('application/msgpack'):
            import msgpack
            content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

        return content

    def __call__(self, content: bytes, content_type: str) -> Union[str, Dict]:
        """Deserialize `content` per `content_type`; wrap decode failures in EncoderError."""
        content_loader = None
        if content_type.find('application/json') != -1:
            content_loader = partial(loads, cls=self.JSONDateTimeDecoder)
        elif content_type.find('application/msgpack') != -1:
            import msgpack
            content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)

        if content_loader is None:
            raise Exception('Unsupported content type')

        try:
            return content_loader(content)
        except Exception as e:
            # Attach the offending payload to the error, base64-encoded when not UTF-8.
            try:
                raise EncoderError(content=content.decode('utf-8')) from e
            except UnicodeError:
                raise EncoderError(content=base64.b64encode(content).decode('utf-8')) from e
Class variables
var JSONDateTimeDecoder
-
Simple JSON https://json.org decoder
Performs the following translations in decoding by default:
+---------------+-------------------+ | JSON | Python | +===============+===================+ | object | dict | +---------------+-------------------+ | array | list | +---------------+-------------------+ | string | str | +---------------+-------------------+ | number (int) | int | +---------------+-------------------+ | number (real) | float | +---------------+-------------------+ | true | True | +---------------+-------------------+ | false | False | +---------------+-------------------+ | null | None | +---------------+-------------------+
It also understands
NaN
,Infinity
, and-Infinity
as their correspondingfloat
values, which is outside the JSON spec. var SUPPORTED_COMPRESSION
-
Content encodings this handler can decode. Defaults to gzip and deflate; 'br' and 'zstd' are added at runtime when the optional brotli/zstd packages are importable.
var SUPPORTED_CONTENT_TYPES
-
MIME content types this handler accepts: 'application/msgpack' and 'application/json'.
Methods
def read(self,
content: bytes,
content_encoding: str,
content_type: str,
signature: str | None) ‑> Dict-
Expand source code
def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict: if content_encoding == 'gzip' or content_encoding == 'gz': import gzip content = gzip.decompress(content) elif content_encoding == 'deflate': import zlib content = zlib.decompress(content) elif content_encoding == 'brotli' or content_encoding == 'br': import brotli content = brotli.decompress(content) elif content_encoding == 'zstd': import zstd content = zstd.decompress(content) if self._signing_secret is not None and signature is not None: if not self.verify(content, signature): raise WebhookSignatureMissMatch() if content_type.startswith('application/json'): content = loads(content, cls=self.JSONDateTimeDecoder) elif content_type.startswith('application/msgpack'): import msgpack content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False) return content
def support(self, headers: Dict) ‑> bool
-
Expand source code
def support(self, headers: Dict) -> bool: if 'content-type' not in headers: return False for content_type in self.SUPPORTED_CONTENT_TYPES: if headers['content-type'].find(content_type) != -1: return True return False
def verify(self, message: bytes, signature: str) ‑> bool
-
Expand source code
def verify(self, message: bytes, signature: str) -> bool: for signing_secret in self._signing_secret: if hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper() == signature: return True return False
class ScrapeApiResponse (request: requests.models.Request,
response: requests.models.Response,
scrape_config: ScrapeConfig,
api_result: Dict | None = None,
large_object_handler: Callable | None = None)-
Expand source code
class ScrapeApiResponse(ApiResponse):
    """ApiResponse specialisation for the Web Scraping API."""

    scrape_config: ScrapeConfig
    large_object_handler: Callable

    def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler: Optional[Callable] = None):
        super().__init__(request, response)
        self.scrape_config = scrape_config
        self.large_object_handler = large_object_handler

        if self.scrape_config.method == 'HEAD':
            # HEAD replies have no body: synthesize an api_result from the response.
            api_result = {
                'result': {
                    'request_headers': {},
                    'status': 'DONE',
                    # BUGFIX: was `200 >= self.response.status_code < 300`, which is
                    # only true for status <= 200; any 2xx status is meant.
                    'success': 200 <= self.response.status_code < 300,
                    'response_headers': self.response.headers,
                    'status_code': self.response.status_code,
                    'reason': self.response.reason,
                    'format': 'text',
                    'content': ''
                },
                'context': {},
                'config': self.scrape_config.__dict__
            }
            if 'X-Scrapfly-Reject-Code' in self.response.headers:
                api_result['result']['error'] = {
                    'code': self.response.headers['X-Scrapfly-Reject-Code'],
                    'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']),
                    'message': self.response.headers['X-Scrapfly-Reject-Description'],
                    'error_id': self.response.headers['X-Scrapfly-Reject-ID'],
                    'retryable': True if self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes' else False,
                    'doc_url': '',
                    'links': {}
                }
                if 'X-Scrapfly-Reject-Doc' in self.response.headers:
                    api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc']
                    api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc']

        # A plain-string payload indicates a gateway error, not an API result.
        if isinstance(api_result, str):
            raise HttpError(
                request=request,
                response=response,
                message='Bad gateway',
                code=502,
                http_status_code=502,
                is_retryable=True
            )

        self.result = self.handle_api_result(api_result=api_result)

    @property
    def scrape_result(self) -> Optional[Dict]:
        """Scrape payload nested under 'result', or None when absent."""
        return self.result.get('result', None)

    @property
    def config(self) -> Optional[Dict]:
        """Scrape configuration echoed by the API; None when there is no result."""
        if self.scrape_result is None:
            return None
        return self.result['config']

    @property
    def context(self) -> Optional[Dict]:
        """Scrape context reported by the API; None when there is no result."""
        if self.scrape_result is None:
            return None
        return self.result['context']

    @property
    def content(self) -> str:
        """Scraped content, or empty string when there is no result."""
        if self.scrape_result is None:
            return ''
        return self.scrape_result['content']

    @property
    def success(self) -> bool:
        """
        Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful
        if the upstream reply with error status code
        """
        # BUGFIX: was `200 >= self.response.status_code <= 299` (i.e. status <= 200);
        # any 2xx status is meant.
        return 200 <= self.response.status_code <= 299

    @property
    def scrape_success(self) -> bool:
        """Whether the upstream scrape itself succeeded."""
        scrape_result = self.scrape_result
        if not scrape_result:
            return False
        return self.scrape_result['success']

    @property
    def error(self) -> Optional[Dict]:
        """Scrape error payload when the scrape failed; None otherwise."""
        if self.scrape_result is None:
            return None
        if self.scrape_success is False:
            return self.scrape_result['error']

    @property
    def upstream_status_code(self) -> Optional[int]:
        """HTTP status code of the upstream website, when reported."""
        if self.scrape_result is None:
            return None
        if 'status_code' in self.scrape_result:
            return self.scrape_result['status_code']
        return None

    @cached_property
    def soup(self) -> 'BeautifulSoup':
        """BeautifulSoup over the text content (requires scrapfly[parser])."""
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")
        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(self.content, "lxml")
            return soup
        except ImportError as e:
            # NOTE(review): unlike `selector`, the ImportError is swallowed and the
            # property yields None — confirm intent.
            logger.error('You must install scrapfly[parser] to enable this feature')

    @cached_property
    def selector(self) -> 'Selector':
        """Parsel Selector over the text content (requires parsel/scrapy)."""
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")
        try:
            from parsel import Selector
            return Selector(text=self.content)
        except ImportError as e:
            logger.error('You must install parsel or scrapy package to enable this feature')
            raise e

    def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
        """Normalize headers, resolve large-object/binary content, freeze the result."""
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)
        try:
            if isinstance(api_result['config']['headers'], list):
                api_result['config']['headers'] = {}
        except TypeError:
            logger.info(api_result)
            raise
        with suppress(KeyError):
            api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
            api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])
        if self.large_object_handler is not None and api_result['result']['content']:
            content_format = api_result['result']['format']
            if content_format in ['clob', 'blob']:
                api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format)
            elif content_format == 'binary':
                base64_payload = api_result['result']['content']
                if isinstance(base64_payload, bytes):
                    base64_payload = base64_payload.decode('utf-8')
                api_result['result']['content'] = BytesIO(b64decode(base64_payload))
        return FrozenDict(api_result)

    def _is_api_error(self, api_result: Dict) -> bool:
        """Detect API-level errors; for HEAD calls only the reject header is available."""
        if self.scrape_config.method == 'HEAD':
            if 'X-Reject-Reason' in self.response.headers:
                return True
            return False
        if api_result is None:
            return True
        return 'error_id' in api_result

    def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
        """Rebuild a requests.Response mirroring the upstream website's reply
        (status, headers, body, cookies); None when no usable result exists."""
        if _class != Response:
            raise RuntimeError('only Response from requests package is supported at the moment')
        if self.result is None:
            return None
        if self.response.status_code != 200:
            return None
        response = Response()
        response.status_code = self.scrape_result['status_code']
        response.reason = self.scrape_result['reason']
        if self.scrape_result['content']:
            if isinstance(self.scrape_result['content'], BytesIO):
                response._content = self.scrape_result['content'].getvalue()
            elif isinstance(self.scrape_result['content'], bytes):
                response._content = self.scrape_result['content']
            elif isinstance(self.scrape_result['content'], str):
                response._content = self.scrape_result['content'].encode('utf-8')
            else:
                response._content = None
        response.headers.update(self.scrape_result['response_headers'])
        response.url = self.scrape_result['url']
        response.request = Request(
            method=self.config['method'],
            url=self.config['url'],
            headers=self.scrape_result['request_headers'],
            data=self.config['body'] if self.config['body'] else None
        )
        if 'set-cookie' in response.headers:
            for raw_cookie in response.headers['set-cookie']:
                for name, cookie in SimpleCookie(raw_cookie).items():
                    expires = cookie.get('expires')
                    if expires == '':
                        expires = None
                    if expires:
                        try:
                            expires = parse(expires).timestamp()
                        except ValueError:
                            expires = None
                    if type(expires) == str:
                        if '.' in expires:
                            expires = float(expires)
                        else:
                            expires = int(expires)
                    response.cookies.set_cookie(Cookie(
                        version=cookie.get('version') if cookie.get('version') else None,
                        name=name,
                        value=cookie.value,
                        path=cookie.get('path', ''),
                        expires=expires,
                        comment=cookie.get('comment'),
                        domain=cookie.get('domain', ''),
                        secure=cookie.get('secure'),
                        port=None,
                        port_specified=False,
                        domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                        domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                        path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                        discard=False,
                        comment_url=None,
                        rest={
                            'httponly': cookie.get('httponly'),
                            'samesite': cookie.get('samesite'),
                            'max-age': cookie.get('max-age')
                        }
                    ))
        return response

    def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
        """Write the scraped (or given) content to `file`, or to a path derived
        from `path`/`name`/the scraped URL and content type."""
        file_content = content or self.scrape_result['content']
        file_path = None
        file_extension = None
        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]
        if not file:
            if file_extension is None:
                try:
                    mime_type = self.scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'
                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]
                file_extension = '.' + mime_type.split('/')[1]
            if not name:
                name = self.config['url'].split('/')[-1]
            if name.find(file_extension) == -1:
                name += file_extension
            file_path = path + '/' + name if path is not None else name
            if file_path == file_extension:
                # URL yielded no usable file name: derive one from the URL itself.
                url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')
                if url[-1] == '-':
                    url = url[:-1]
                url += file_extension
                file_path = url
            file = open(file_path, 'wb')
        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)
        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)
        logger.info('file %s created' % file_path)

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
        """Raise for API-level errors, then surface scrape-level failures via ErrorFactory."""
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
        if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
            error = ErrorFactory.create(api_response=self)
            if error:
                if isinstance(error, UpstreamHttpError):
                    if raise_on_upstream_error is True:
                        raise error
                else:
                    raise error
Ancestors
Class variables
var large_object_handler : Callable
-
The type of the None singleton.
var scrape_config : ScrapeConfig
-
The type of the None singleton.
Instance variables
prop config : Dict | None
-
Expand source code
@property def config(self) -> Optional[Dict]: if self.scrape_result is None: return None return self.result['config']
prop content : str
-
Expand source code
@property def content(self) -> str: if self.scrape_result is None: return '' return self.scrape_result['content']
prop context : Dict | None
-
Expand source code
@property def context(self) -> Optional[Dict]: if self.scrape_result is None: return None return self.result['context']
prop error : Dict | None
-
Expand source code
@property def error(self) -> Optional[Dict]: if self.scrape_result is None: return None if self.scrape_success is False: return self.scrape_result['error']
prop scrape_result : Dict | None
-
Expand source code
@property def scrape_result(self) -> Optional[Dict]: return self.result.get('result', None)
prop scrape_success : bool
-
Expand source code
@property def scrape_success(self) -> bool: scrape_result = self.scrape_result if not scrape_result: return False return self.scrape_result['success']
var selector : Selector
-
Expand source code
@cached_property def selector(self) -> 'Selector': if self.scrape_result['format'] != 'text': raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content") try: from parsel import Selector return Selector(text=self.content) except ImportError as e: logger.error('You must install parsel or scrapy package to enable this feature') raise e
var soup : BeautifulSoup
-
Expand source code
@cached_property def soup(self) -> 'BeautifulSoup': if self.scrape_result['format'] != 'text': raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content") try: from bs4 import BeautifulSoup soup = BeautifulSoup(self.content, "lxml") return soup except ImportError as e: logger.error('You must install scrapfly[parser] to enable this feature')
prop success : bool
-
Expand source code
@property def success(self) -> bool: """ Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code """ return 200 >= self.response.status_code <= 299
Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code
prop upstream_status_code : int | None
-
Expand source code
@property def upstream_status_code(self) -> Optional[int]: if self.scrape_result is None: return None if 'status_code' in self.scrape_result: return self.scrape_result['status_code'] return None
Methods
def handle_api_result(self, api_result: Dict) ‑> FrozenDict | None
-
Expand source code
def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]: if self._is_api_error(api_result=api_result) is True: return FrozenDict(api_result) try: if isinstance(api_result['config']['headers'], list): api_result['config']['headers'] = {} except TypeError: logger.info(api_result) raise with suppress(KeyError): api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers']) api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers']) if self.large_object_handler is not None and api_result['result']['content']: content_format = api_result['result']['format'] if content_format in ['clob', 'blob']: api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format) elif content_format == 'binary': base64_payload = api_result['result']['content'] if isinstance(base64_payload, bytes): base64_payload = base64_payload.decode('utf-8') api_result['result']['content'] = BytesIO(b64decode(base64_payload)) return FrozenDict(api_result)
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ApiHttpClientError)-
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError): super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class) if self.result['result']['status'] == 'DONE' and self.scrape_success is False: error = ErrorFactory.create(api_response=self) if error: if isinstance(error, UpstreamHttpError): if raise_on_upstream_error is True: raise error else: raise error
def sink(self,
path: str | None = None,
name: str | None = None,
file: typing.TextIO | _io.BytesIO | None = None,
content: str | bytes | None = None)-
Expand source code
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None): file_content = content or self.scrape_result['content'] file_path = None file_extension = None if name: name_parts = name.split('.') if len(name_parts) > 1: file_extension = name_parts[-1] if not file: if file_extension is None: try: mime_type = self.scrape_result['response_headers']['content-type'] except KeyError: mime_type = 'application/octet-stream' if ';' in mime_type: mime_type = mime_type.split(';')[0] file_extension = '.' + mime_type.split('/')[1] if not name: name = self.config['url'].split('/')[-1] if name.find(file_extension) == -1: name += file_extension file_path = path + '/' + name if path is not None else name if file_path == file_extension: url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-') if url[-1] == '-': url = url[:-1] url += file_extension file_path = url file = open(file_path, 'wb') if isinstance(file_content, str): file_content = BytesIO(file_content.encode('utf-8')) elif isinstance(file_content, bytes): file_content = BytesIO(file_content) file_content.seek(0) with file as f: shutil.copyfileobj(file_content, f, length=131072) logger.info('file %s created' % file_path)
def upstream_result_into_response(self) ‑> requests.models.Response | None
-
Expand source code
def upstream_result_into_response(self, _class=Response) -> Optional[Response]: if _class != Response: raise RuntimeError('only Response from requests package is supported at the moment') if self.result is None: return None if self.response.status_code != 200: return None response = Response() response.status_code = self.scrape_result['status_code'] response.reason = self.scrape_result['reason'] if self.scrape_result['content']: if isinstance(self.scrape_result['content'], BytesIO): response._content = self.scrape_result['content'].getvalue() elif isinstance(self.scrape_result['content'], bytes): response._content = self.scrape_result['content'] elif isinstance(self.scrape_result['content'], str): response._content = self.scrape_result['content'].encode('utf-8') else: response._content = None response.headers.update(self.scrape_result['response_headers']) response.url = self.scrape_result['url'] response.request = Request( method=self.config['method'], url=self.config['url'], headers=self.scrape_result['request_headers'], data=self.config['body'] if self.config['body'] else None ) if 'set-cookie' in response.headers: for raw_cookie in response.headers['set-cookie']: for name, cookie in SimpleCookie(raw_cookie).items(): expires = cookie.get('expires') if expires == '': expires = None if expires: try: expires = parse(expires).timestamp() except ValueError: expires = None if type(expires) == str: if '.' 
in expires: expires = float(expires) else: expires = int(expires) response.cookies.set_cookie(Cookie( version=cookie.get('version') if cookie.get('version') else None, name=name, value=cookie.value, path=cookie.get('path', ''), expires=expires, comment=cookie.get('comment'), domain=cookie.get('domain', ''), secure=cookie.get('secure'), port=None, port_specified=False, domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '', domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False, path_specified=cookie.get('path') != '' and cookie.get('path') is not None, discard=False, comment_url=None, rest={ 'httponly': cookie.get('httponly'), 'samesite': cookie.get('samesite'), 'max-age': cookie.get('max-age') } )) return response
Inherited members
class ScreenshotApiResponse (request: requests.models.Request,
response: requests.models.Response,
screenshot_config: ScreenshotConfig,
api_result: bytes | None = None)-
Expand source code
class ScreenshotApiResponse(ApiResponse): def __init__(self, request: Request, response: Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None): super().__init__(request, response) self.screenshot_config = screenshot_config self.result = self.handle_api_result(api_result) @property def image(self) -> Optional[str]: binary = self.result.get('result', None) if binary is None: return '' return binary @property def metadata(self) -> Optional[Dict]: if not self.image: return {} content_type = self.response.headers.get('content-type') extension_name = content_type[content_type.find('/') + 1:].split(';')[0] return { 'extension_name': extension_name, 'upstream-status-code': self.response.headers.get('X-Scrapfly-Upstream-Http-Code'), 'upstream-url': self.response.headers.get('X-Scrapfly-Upstream-Url') } @property def screenshot_success(self) -> bool: if not self.image: return False return True @property def error(self) -> Optional[Dict]: if self.image: return None if self.screenshot_success is False: return self.result def _is_api_error(self, api_result: Dict) -> bool: if api_result is None: return True return 'error_id' in api_result def handle_api_result(self, api_result: bytes) -> FrozenDict: if self._is_api_error(api_result=api_result) is True: return FrozenDict(api_result) return api_result def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError): super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
Ancestors
Instance variables
prop error : Dict | None
-
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Return the raw API payload when the screenshot failed, else None.

    A non-empty image means there is nothing to report; without an image,
    the API result (which carries the error details) is exposed whenever
    screenshot_success is explicitly False.
    """
    # A captured image implies success — no error to surface.
    if self.image:
        return None
    return self.result if self.screenshot_success is False else None
prop image : str | None
-
Expand source code
@property
def image(self) -> Optional[str]:
    """Return the screenshot payload from the API result.

    Falls back to an empty string when the 'result' key is absent or
    None, so callers can truth-test the value directly.
    """
    payload = self.result.get('result')
    return '' if payload is None else payload
prop metadata : Dict | None
-
Expand source code
@property
def metadata(self) -> Optional[Dict]:
    """Summarize the screenshot response: file extension and upstream info.

    Returns {} when no screenshot image is present.
    """
    if not self.image:
        return {}
    headers = self.response.headers
    content_type = headers.get('content-type')
    # Extension = MIME subtype with any ';charset=...' suffix dropped.
    # find('/') + 1 keeps the whole string when no '/' is present.
    extension_name = content_type[content_type.find('/') + 1:].partition(';')[0]
    return {
        'extension_name': extension_name,
        'upstream-status-code': headers.get('X-Scrapfly-Upstream-Http-Code'),
        'upstream-url': headers.get('X-Scrapfly-Upstream-Url'),
    }
prop screenshot_success : bool
-
Expand source code
@property
def screenshot_success(self) -> bool:
    """Whether a screenshot payload is present (truthy image)."""
    return bool(self.image)
Methods
def handle_api_result(self, api_result: bytes) ‑> FrozenDict
-
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict:
    """Freeze error payloads; pass successful binary results through untouched.

    Error payloads (as classified by _is_api_error) are dict-shaped and
    are wrapped in FrozenDict to prevent downstream mutation.
    """
    if not self._is_api_error(api_result=api_result):
        return api_result
    return FrozenDict(api_result)
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ScreenshotAPIError)-
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError): super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
Inherited members