diff --git a/archivebox/search/utils.py b/archivebox/search/utils.py
index 723c7fb5..4573ca69 100644
--- a/archivebox/search/utils.py
+++ b/archivebox/search/utils.py
@@ -1,23 +1,113 @@
+from html.parser import HTMLParser
+import io
+
 from django.db.models import QuerySet

 from archivebox.util import enforce_types
 from archivebox.config import ANSI

+BLOCK_SIZE = 32768
+
 def log_index_started(url):
     print('{green}[*] Indexing url: {} in the search index {reset}'.format(url, **ANSI))
     print( )

-def get_file_result_content(res, extra_path, use_pwd=False):
+
+class HTMLTextExtractor(HTMLParser):
+
+    TEXT_ATTRS = ["alt", "cite", "href", "label", "list", "placeholder", "title", "value"]
+    NOTEXT_TAGS = ["script", "style", "template"]
+    NOTEXT_HREF = ["data:", "javascript:", "#"]
+
+    def __init__(self):
+        super().__init__()
+
+        self.output = io.StringIO()
+        self._tag_stack = []
+
+    def _is_text_attr(self, name, value):
+        if not isinstance(value, str):
+            return False
+        if name == "href" and any(map(lambda p: value.startswith(p), self.NOTEXT_HREF)):
+            return False
+
+        if name in self.TEXT_ATTRS:
+            return True
+
+        return False
+
+    def _parent_tag(self):
+        try:
+            return self._tag_stack[-1]
+        except IndexError:
+            return None
+
+    def _in_notext_tag(self):
+        return any([t in self._tag_stack for t in self.NOTEXT_TAGS])
+
+    def handle_starttag(self, tag, attrs):
+        self._tag_stack.append(tag)
+
+        # Don't write out attribute values if any ancestor
+        # is in NOTEXT_TAGS
+        if self._in_notext_tag():
+            return
+
+        for name, value in attrs:
+            if self._is_text_attr(name, value):
+                self.output.write(value.strip())
+                self.output.write(" ")
+
+    def handle_endtag(self, tag):
+        orig_stack = self._tag_stack.copy()
+        try:
+            # Keep popping tags until we find the nearest
+            # ancestor matching this end tag
+            while tag != self._tag_stack.pop():
+                pass
+        except IndexError:
+            # Got to the top of the stack, but somehow missed
+            # this end tag -- maybe malformed markup -- restore the
+            # stack
+            self._tag_stack = orig_stack
+
+    def handle_data(self, data):
+        # Don't output text data if any ancestor is in NOTEXT_TAGS
+        if self._in_notext_tag():
+            return
+        if stripped := data.strip():
+            self.output.write(stripped)
+            self.output.write(" ")
+
+    def __str__(self):
+        return self.output.getvalue()
+
+
+def _read_all(file: io.TextIOBase) -> str:
+    return file.read()
+
+
+def _extract_html_text(file: io.TextIOBase) -> str:
+    extractor = HTMLTextExtractor()
+    while (block := file.read(BLOCK_SIZE)):
+        extractor.feed(block)
+    else:
+        extractor.close()
+
+    return str(extractor)
+
+
+def get_file_result_content(res, extra_path, use_pwd=False, *, filter=_read_all):
     if use_pwd:
         fpath = f'{res.pwd}/{res.output}'
     else:
         fpath = f'{res.output}'
-    
+
     if extra_path:
         fpath = f'{fpath}/{extra_path}'

-    with open(fpath, 'r', encoding='utf-8') as file:
-        data = file.read()
+    with open(fpath, 'r', encoding='utf-8', errors='replace') as file:
+        data = filter(file)
     if data:
         return [data]
     return []
@@ -38,7 +128,7 @@ def get_indexable_content(results: QuerySet):
     if method == 'readability':
         return get_file_result_content(res, 'content.txt', use_pwd=True)
     elif method == 'singlefile':
-        return get_file_result_content(res, '', use_pwd=True)
+        return get_file_result_content(res, '', use_pwd=True, filter=_extract_html_text)
     elif method == 'dom':
         return get_file_result_content(res, '', use_pwd=True)
     elif method == 'wget':
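
A minimal usage sketch of the new extractor, not part of the diff above; it assumes archivebox.search.utils is importable and reaches into the private helper _extract_html_text purely for illustration:

    import io
    from archivebox.search.utils import HTMLTextExtractor, _extract_html_text

    html = '<p>Hello <a href="https://example.com" title="Example">world</a></p><script>var x = 1;</script>'

    # Feed markup through the parser directly: text inside script/style/template
    # is dropped, while whitelisted attribute values (alt, href, title, ...) are kept.
    extractor = HTMLTextExtractor()
    extractor.feed(html)
    extractor.close()
    print(str(extractor))  # expected: "Hello https://example.com Example world " (trailing space)

    # Or stream a file object in BLOCK_SIZE chunks, which is what
    # get_file_result_content now does for singlefile results via filter=_extract_html_text:
    print(_extract_html_text(io.StringIO(html)))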