From df80ddc5cd49ca373555cb3e1d2364d6d4d313a4 Mon Sep 17 00:00:00 2001
From: David Xue
Date: Mon, 4 Mar 2024 11:56:38 -0500
Subject: [PATCH] Data Ingestion Improvement/Cleanup/Bug Fix - Part 2 (#307)

Code is ready for review; evaluations/testing are still in progress.

### Description

- This is **part 2** of the data ingestion improvements. The goal is to investigate potential issues with the data ingestion process, remove noisy data such as badly formatted content or gibberish text, fix bugs, and enforce standards on the process so that costs can be estimated and reduced predictably.

### Technical Changes

- Renamed the original `split.py` file to `chunking_utils.py` (multiple files changed due to the import rename).
- While making that change, found that 2 DAGs had never been updated: the Provider docs and Astro SDK docs incremental ingest DAGs never had a chunking/splitting task (even though the bulk ingest DAG does). Chunking has been added to these 2 DAGs.
- The Airflow Docs extraction process has been improved (see code for details).
- The Provider Docs extraction process has been improved (see code for details).
- The Astro SDK Docs extraction process has been improved (see code for details). Astro SDK docs are not currently being ingested, but the code is fixed since it exists.
- Provider docs ingestion used to pull in all past versions, leading to many duplicate docs in the DB. It has been changed to ingest only the `stable` version of the docs.

### Evaluations

- Answer quality generally improved, with more concise answers and hyperlinks linked when the model knows its answer is not exhaustive.
- Please see the attached files for a quick list of results:
  - [data_ingest_comparison_part_2.csv](https://github.com/astronomer/ask-astro/files/14484682/data_ingest_comparison_part_2.csv)
  - [data_ingest_results_part_2.csv](https://github.com/astronomer/ask-astro/files/14484683/data_ingest_results_part_2.csv)
- Bulk ingest also runs with no issues.

### Related Issues

closes #221
closes #258
closes #295 (Reranker has been addressed in GCP environment variables; the embedding model change was completed in a different PR)
closes #285 (This PR prevents empty docs from being ingested)

--- airflow/Dockerfile | 2 +- .../dags/ingestion/ask-astro-forum-load.py | 4 +- .../ingestion/ask-astro-load-airflow-docs.py | 4 +- .../ingestion/ask-astro-load-astro-cli.py | 4 +- .../ingestion/ask-astro-load-astro-sdk.py | 6 +- .../ask-astro-load-astronomer-docs.py | 4 +- .../ask-astro-load-astronomer-provider.py | 6 +- .../dags/ingestion/ask-astro-load-blogs.py | 4 +- .../ingestion/ask-astro-load-cosmos-docs.py | 4 +- .../dags/ingestion/ask-astro-load-github.py | 4 +- .../dags/ingestion/ask-astro-load-registry.py | 6 +- .../dags/ingestion/ask-astro-load-slack.py | 4 +- .../ingestion/ask-astro-load-stackoverflow.py | 4 +- airflow/dags/ingestion/ask-astro-load.py | 8 +- .../tasks/{split.py => chunking_utils.py} | 76 ++++++++++++++----- airflow/include/tasks/extract/airflow_docs.py | 5 +- .../extract/astronomer_providers_docs.py | 40 +++++++++- .../include/tasks/extract/utils/html_utils.py | 6 +- airflow/tests/test_baseline.py | 4 +- 19 files changed, 139 insertions(+), 56 deletions(-) rename airflow/include/tasks/{split.py => chunking_utils.py} (65%) diff --git a/airflow/Dockerfile b/airflow/Dockerfile index 10d3afd1..7aff2fce 100644 --- a/airflow/Dockerfile +++ b/airflow/Dockerfile @@ -1 +1 @@ -FROM quay.io/astronomer/astro-runtime:10.0.0 +FROM quay.io/astronomer/astro-runtime:10.1.0 diff --git a/airflow/dags/ingestion/ask-astro-forum-load.py
b/airflow/dags/ingestion/ask-astro-forum-load.py index 7e4d4bd1..3e178377 100644 --- a/airflow/dags/ingestion/ask-astro-forum-load.py +++ b/airflow/dags/ingestion/ask-astro-forum-load.py @@ -36,9 +36,9 @@ def get_astro_forum_content(): ), ) def ask_astro_load_astro_forum(): - from include.tasks import split + from include.tasks import chunking_utils - split_docs = task(split.split_html).expand(dfs=[get_astro_forum_content()]) + split_docs = task(chunking_utils.split_html).expand(dfs=[get_astro_forum_content()]) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, diff --git a/airflow/dags/ingestion/ask-astro-load-airflow-docs.py b/airflow/dags/ingestion/ask-astro-load-airflow-docs.py index bb3ec557..332bedad 100644 --- a/airflow/dags/ingestion/ask-astro-load-airflow-docs.py +++ b/airflow/dags/ingestion/ask-astro-load-airflow-docs.py @@ -35,10 +35,10 @@ def ask_astro_load_airflow_docs(): data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator any existing documents that have been updated will be removed and re-added. """ - from include.tasks import split + from include.tasks import chunking_utils from include.tasks.extract import airflow_docs - extracted_airflow_docs = task(split.split_html).expand( + extracted_airflow_docs = task(chunking_utils.split_html).expand( dfs=[airflow_docs.extract_airflow_docs(docs_base_url=airflow_docs_base_url)] ) diff --git a/airflow/dags/ingestion/ask-astro-load-astro-cli.py b/airflow/dags/ingestion/ask-astro-load-astro-cli.py index 20fd2da1..1b5c2054 100644 --- a/airflow/dags/ingestion/ask-astro-load-astro-cli.py +++ b/airflow/dags/ingestion/ask-astro-load-astro-cli.py @@ -32,11 +32,11 @@ def ask_astro_load_astro_cli_docs(): data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator any existing documents that have been updated will be removed and re-added. 
""" - from include.tasks import split + from include.tasks import chunking_utils from include.tasks.extract import astro_cli_docs extract_astro_cli_docs = task(astro_cli_docs.extract_astro_cli_docs)() - split_md_docs = task(split.split_html).expand(dfs=[extract_astro_cli_docs]) + split_md_docs = task(chunking_utils.split_html).expand(dfs=[extract_astro_cli_docs]) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, diff --git a/airflow/dags/ingestion/ask-astro-load-astro-sdk.py b/airflow/dags/ingestion/ask-astro-load-astro-sdk.py index 11d2f7d9..4e1ee463 100644 --- a/airflow/dags/ingestion/ask-astro-load-astro-sdk.py +++ b/airflow/dags/ingestion/ask-astro-load-astro-sdk.py @@ -37,6 +37,10 @@ def get_astro_sdk_content(): ), ) def ask_astro_load_astro_sdk(): + from include.tasks import chunking_utils + + split_docs = task(chunking_utils.split_html).expand(dfs=[get_astro_sdk_content()]) + _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, existing="replace", @@ -45,7 +49,7 @@ def ask_astro_load_astro_sdk(): verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", - ).expand(input_data=[get_astro_sdk_content()]) + ).expand(input_data=[split_docs]) ask_astro_load_astro_sdk() diff --git a/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py index bf6b26cd..a7d62491 100644 --- a/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py +++ b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py @@ -31,12 +31,12 @@ def ask_astro_load_astronomer_docs(): """ This DAG performs incremental load for any new docs in astronomer docs. """ - from include.tasks import split + from include.tasks import chunking_utils from include.tasks.extract.astro_docs import extract_astro_docs astro_docs = task(extract_astro_docs)() - split_html_docs = task(split.split_html).expand(dfs=[astro_docs]) + split_html_docs = task(chunking_utils.split_html).expand(dfs=[astro_docs]) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, diff --git a/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py b/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py index f5d5c65d..28250fe6 100644 --- a/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py +++ b/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py @@ -43,6 +43,10 @@ def ask_astro_load_astronomer_providers(): any existing documents that have been updated will be removed and re-added. """ + from include.tasks import chunking_utils + + split_docs = task(chunking_utils.split_html).expand(dfs=[get_provider_content()]) + _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, existing="replace", @@ -51,7 +55,7 @@ def ask_astro_load_astronomer_providers(): verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", - ).expand(input_data=[get_provider_content()]) + ).expand(input_data=[split_docs]) ask_astro_load_astronomer_providers() diff --git a/airflow/dags/ingestion/ask-astro-load-blogs.py b/airflow/dags/ingestion/ask-astro-load-blogs.py index 84f78019..9a05880c 100644 --- a/airflow/dags/ingestion/ask-astro-load-blogs.py +++ b/airflow/dags/ingestion/ask-astro-load-blogs.py @@ -34,12 +34,12 @@ def ask_astro_load_blogs(): data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator any existing documents that have been updated will be removed and re-added. 
""" - from include.tasks import split + from include.tasks import chunking_utils from include.tasks.extract import blogs blogs_docs = task(blogs.extract_astro_blogs)(blog_cutoff_date=blog_cutoff_date) - split_md_docs = task(split.split_markdown).expand(dfs=[blogs_docs]) + split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[blogs_docs]) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, diff --git a/airflow/dags/ingestion/ask-astro-load-cosmos-docs.py b/airflow/dags/ingestion/ask-astro-load-cosmos-docs.py index ea2bcad0..ac5c6dec 100644 --- a/airflow/dags/ingestion/ask-astro-load-cosmos-docs.py +++ b/airflow/dags/ingestion/ask-astro-load-cosmos-docs.py @@ -33,9 +33,9 @@ def ask_astro_load_cosmos_docs(): any existing documents that have been updated will be removed and re-added. """ - from include.tasks import split + from include.tasks import chunking_utils - split_docs = task(split.split_html).expand(dfs=[extract_cosmos_docs()]) + split_docs = task(chunking_utils.split_html).expand(dfs=[extract_cosmos_docs()]) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, diff --git a/airflow/dags/ingestion/ask-astro-load-github.py b/airflow/dags/ingestion/ask-astro-load-github.py index e68055e2..e5c00776 100644 --- a/airflow/dags/ingestion/ask-astro-load-github.py +++ b/airflow/dags/ingestion/ask-astro-load-github.py @@ -43,7 +43,7 @@ def ask_astro_load_github(): data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator any existing documents that have been updated will be removed and re-added. """ - from include.tasks import split + from include.tasks import chunking_utils from include.tasks.extract import github md_docs = ( @@ -58,7 +58,7 @@ def ask_astro_load_github(): .expand(repo_base=issues_docs_sources) ) - split_md_docs = task(split.split_markdown).expand(dfs=[md_docs, issues_docs]) + split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[md_docs, issues_docs]) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, diff --git a/airflow/dags/ingestion/ask-astro-load-registry.py b/airflow/dags/ingestion/ask-astro-load-registry.py index 480a70cb..d7161263 100644 --- a/airflow/dags/ingestion/ask-astro-load-registry.py +++ b/airflow/dags/ingestion/ask-astro-load-registry.py @@ -32,16 +32,16 @@ def ask_astro_load_registry(): data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator any existing documents that have been updated will be removed and re-added. 
""" - from include.tasks import split + from include.tasks import chunking_utils from include.tasks.extract import registry registry_cells_docs = task(registry.extract_astro_registry_cell_types)() registry_dags_docs = task(registry.extract_astro_registry_dags)() - split_md_docs = task(split.split_markdown).expand(dfs=[registry_cells_docs]) + split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[registry_cells_docs]) - split_code_docs = task(split.split_python).expand(dfs=[registry_dags_docs]) + split_code_docs = task(chunking_utils.split_python).expand(dfs=[registry_dags_docs]) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, diff --git a/airflow/dags/ingestion/ask-astro-load-slack.py b/airflow/dags/ingestion/ask-astro-load-slack.py index 8fbf1b44..ee984644 100644 --- a/airflow/dags/ingestion/ask-astro-load-slack.py +++ b/airflow/dags/ingestion/ask-astro-load-slack.py @@ -42,12 +42,12 @@ def ask_astro_load_slack(): DAG should run nightly to capture threads between archive periods. By using the upsert logic of the weaviate_import decorator any existing documents that have been updated will be removed and re-added. """ - from include.tasks import split + from include.tasks import chunking_utils from include.tasks.extract import slack slack_docs = task(slack.extract_slack).expand(source=slack_channel_sources) - split_md_docs = task(split.split_markdown).expand(dfs=[slack_docs]) + split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[slack_docs]) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, diff --git a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py index 6135f82d..913b2301 100644 --- a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py +++ b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py @@ -38,7 +38,7 @@ def ask_astro_load_stackoverflow(): data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator any existing documents that have been updated will be removed and re-added. 
""" - from include.tasks import split + from include.tasks import chunking_utils from include.tasks.extract import stack_overflow stack_overflow_docs = ( @@ -47,7 +47,7 @@ def ask_astro_load_stackoverflow(): .expand(tag=stackoverflow_tags) ) - split_md_docs = task(split.split_markdown).expand(dfs=[stack_overflow_docs]) + split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[stack_overflow_docs]) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py index a71fd090..e8e909ae 100644 --- a/airflow/dags/ingestion/ask-astro-load.py +++ b/airflow/dags/ingestion/ask-astro-load.py @@ -75,7 +75,7 @@ def ask_astro_load_bulk(): """ - from include.tasks import split + from include.tasks import chunking_utils @task def get_schema_and_process(schema_file: str) -> list: @@ -432,11 +432,11 @@ def import_baseline( python_code_tasks = [registry_dags_docs] - split_md_docs = task(split.split_markdown).expand(dfs=markdown_tasks) + split_md_docs = task(chunking_utils.split_markdown).expand(dfs=markdown_tasks) - split_code_docs = task(split.split_python).expand(dfs=python_code_tasks) + split_code_docs = task(chunking_utils.split_python).expand(dfs=python_code_tasks) - split_html_docs = task(split.split_html).expand(dfs=html_tasks) + split_html_docs = task(chunking_utils.split_html).expand(dfs=html_tasks) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, diff --git a/airflow/include/tasks/split.py b/airflow/include/tasks/chunking_utils.py similarity index 65% rename from airflow/include/tasks/split.py rename to airflow/include/tasks/chunking_utils.py index ba387009..92a3da07 100644 --- a/airflow/include/tasks/split.py +++ b/airflow/include/tasks/chunking_utils.py @@ -1,13 +1,28 @@ from __future__ import annotations +import logging + import pandas as pd +import tiktoken from langchain.schema import Document from langchain.text_splitter import ( - Language, RecursiveCharacterTextSplitter, ) from langchain_community.document_transformers import Html2TextTransformer +logger = logging.getLogger("airflow.task") + +TARGET_CHUNK_SIZE = 2500 + + +def enforce_max_token_len(text: str) -> str: + encoding = tiktoken.get_encoding("cl100k_base") + encoded_text = encoding.encode(text) + if len(encoded_text) > 8191: + logger.info("Token length of string exceeds the max content length of the tokenizer. 
Truncating...") + return encoding.decode(encoded_text[:8191]) + return text + def split_markdown(dfs: list[pd.DataFrame]) -> pd.DataFrame: """ @@ -27,11 +42,25 @@ def split_markdown(dfs: list[pd.DataFrame]) -> pd.DataFrame: df = pd.concat(dfs, axis=0, ignore_index=True) - splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=200, separators=["\n\n", "\n", " ", ""]) + # directly from the langchain splitter library + separators = ["\n\n", "\n", " ", ""] + splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( + # cl100k_base is used for text ada 002 and later embedding models + encoding_name="cl100k_base", + chunk_size=TARGET_CHUNK_SIZE, + chunk_overlap=200, + separators=separators, + is_separator_regex=True, + ) df["doc_chunks"] = df["content"].apply(lambda x: splitter.split_documents([Document(page_content=x)])) df = df.explode("doc_chunks", ignore_index=True) df["content"] = df["doc_chunks"].apply(lambda x: x.page_content) + + # Remove blank doc chunks + df = df[~df["content"].apply(lambda x: x.isspace() or x == "")] + + df["content"] = df["content"].apply(enforce_max_token_len) df.drop(["doc_chunks"], inplace=True, axis=1) df.reset_index(inplace=True, drop=True) @@ -56,15 +85,36 @@ def split_python(dfs: list[pd.DataFrame]) -> pd.DataFrame: df = pd.concat(dfs, axis=0, ignore_index=True) - splitter = RecursiveCharacterTextSplitter.from_language( - language=Language.PYTHON, - # chunk_size=50, - chunk_overlap=0, + # directly from the langchain splitter library + python_separators = [ + # First, try to split along class definitions + "\nclass ", + "\ndef ", + "\n\tdef ", + # Now split by the normal type of lines + "\n\n", + "\n", + " ", + "", + ] + + splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( + # cl100k_base is used for text ada 002 and later embedding models + encoding_name="cl100k_base", + chunk_size=TARGET_CHUNK_SIZE, + chunk_overlap=200, + separators=python_separators, + is_separator_regex=True, ) df["doc_chunks"] = df["content"].apply(lambda x: splitter.split_documents([Document(page_content=x)])) df = df.explode("doc_chunks", ignore_index=True) df["content"] = df["doc_chunks"].apply(lambda x: x.page_content) + + # Remove blank doc chunks + df = df[~df["content"].apply(lambda x: x.isspace() or x == "")] + + df["content"] = df["content"].apply(enforce_max_token_len) df.drop(["doc_chunks"], inplace=True, axis=1) df.reset_index(inplace=True, drop=True) @@ -93,9 +143,10 @@ def split_html(dfs: list[pd.DataFrame]) -> pd.DataFrame: splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( # cl100k_base is used for text ada 002 and later embedding models encoding_name="cl100k_base", - chunk_size=4000, + chunk_size=TARGET_CHUNK_SIZE, chunk_overlap=200, separators=separators, + is_separator_regex=True, ) # Split by chunking first @@ -110,17 +161,8 @@ def split_html(dfs: list[pd.DataFrame]) -> pd.DataFrame: # Remove blank doc chunks df = df[~df["content"].apply(lambda x: x.isspace() or x == "")] + df["content"] = df["content"].apply(enforce_max_token_len) df.drop(["doc_chunks"], inplace=True, axis=1) df.reset_index(inplace=True, drop=True) return df - - -def split_list(urls: list[str], chunk_size: int = 0) -> list[list[str]]: - """ - split the list of string into chunk of list of string - - param urls: URL list we want to chunk - param chunk_size: Max size of chunked list - """ - return [urls[i : i + chunk_size] for i in range(0, len(urls), chunk_size)] diff --git a/airflow/include/tasks/extract/airflow_docs.py 
b/airflow/include/tasks/extract/airflow_docs.py index 7869745e..cb2b74bd 100644 --- a/airflow/include/tasks/extract/airflow_docs.py +++ b/airflow/include/tasks/extract/airflow_docs.py @@ -4,12 +4,11 @@ import urllib.parse import pandas as pd -import requests from bs4 import BeautifulSoup from weaviate.util import generate_uuid5 from airflow.decorators import task -from include.tasks.extract.utils.html_utils import get_internal_links +from include.tasks.extract.utils.html_utils import fetch_page_content, get_internal_links @task @@ -46,7 +45,7 @@ def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]: df = pd.DataFrame(docs_links, columns=["docLink"]) - df["html_content"] = df["docLink"].apply(lambda x: requests.get(x).content) + df["html_content"] = df["docLink"].apply(fetch_page_content) df["content"] = df["html_content"].apply( lambda x: str(BeautifulSoup(x, "html.parser").find(class_="body", role="main")) diff --git a/airflow/include/tasks/extract/astronomer_providers_docs.py b/airflow/include/tasks/extract/astronomer_providers_docs.py index 8eafc053..9e67d5d9 100644 --- a/airflow/include/tasks/extract/astronomer_providers_docs.py +++ b/airflow/include/tasks/extract/astronomer_providers_docs.py @@ -1,14 +1,48 @@ from __future__ import annotations +import re +import urllib.parse + import pandas as pd +from bs4 import BeautifulSoup +from weaviate.util import generate_uuid5 -from include.tasks.extract.utils.html_utils import get_internal_links, urls_to_dataframe +from include.tasks.extract.utils.html_utils import fetch_page_content, get_internal_links def extract_provider_docs() -> list[pd.DataFrame]: exclude_docs = ["_api", "_modules", "_sources", "changelog.html", "genindex.html", "py-modindex.html", "#"] base_url = "https://astronomer-providers.readthedocs.io/en/stable/" - urls = get_internal_links(base_url, exclude_docs) - df = urls_to_dataframe(urls, "astronomer-providers") + all_links = get_internal_links(base_url, exclude_docs) + + docs_url_parts = urllib.parse.urlsplit(base_url) + docs_url_base = f"{docs_url_parts.scheme}://{docs_url_parts.netloc}" + + # make sure we didn't accidentally pickup any unrelated links in recursion + # get rid of older versions of the docs, only care about "stable" version docs + + old_version_doc_pattern = r"/(\d+\.)*\d+/" + + def is_doc_link_invalid(link): + return (docs_url_base not in link) or re.search(old_version_doc_pattern, link) or "/latest/" in link + + invalid_doc_links = {link if is_doc_link_invalid(link) else "" for link in all_links} + docs_links = all_links - invalid_doc_links + + df = pd.DataFrame(docs_links, columns=["docLink"]) + + df["html_content"] = df["docLink"].apply(fetch_page_content) + + df["content"] = df["html_content"].apply( + lambda x: str(BeautifulSoup(x, "html.parser").find(class_="body", role="main")) + ) + df["content"] = df["content"].apply(lambda x: re.sub("ΒΆ", "", x)) + + df["sha"] = df["content"].apply(generate_uuid5) + df["docSource"] = "astronomer-providers" + df.reset_index(drop=True, inplace=True) + + # column order matters for uuid generation + df = df[["docSource", "sha", "content", "docLink"]] return [df] diff --git a/airflow/include/tasks/extract/utils/html_utils.py b/airflow/include/tasks/extract/utils/html_utils.py index c8034676..01a00601 100644 --- a/airflow/include/tasks/extract/utils/html_utils.py +++ b/airflow/include/tasks/extract/utils/html_utils.py @@ -65,13 +65,13 @@ def is_excluded_url(url: str, exclude_literal: list[str]) -> bool: def clean_tags(text_content: str, tags: list[str] | 
None = None) -> str | None: """ - Clean the HTML content by removing script and style tags, collapsing whitespaces, and extracting text. + Clean the HTML content by removing unrelated tags, collapsing whitespaces, and extracting text. param text_content (str): The HTML content to be cleaned. param tags: List of html tag want to clean """ if tags is None: - tags = ["script", "style"] + tags = ["script", "style", "button", "img", "svg"] soup = BeautifulSoup(text_content, "html.parser").find("body") if soup is None: @@ -141,7 +141,7 @@ def get_page_links(url: str, current_page_content: bytes, exclude_literal: list[ continue attempted_urls.add(href) new_page_content = fetch_page_content(href) - if (not new_page_content) or generate_uuid5(new_page_content) in page_content_hash: + if (not new_page_content) or generate_uuid5(new_page_content) in internal_page_hashset: continue logger.info(href) internal_urls.add(href) diff --git a/airflow/tests/test_baseline.py b/airflow/tests/test_baseline.py index 0b3f71d7..460e90f9 100644 --- a/airflow/tests/test_baseline.py +++ b/airflow/tests/test_baseline.py @@ -3,7 +3,7 @@ from textwrap import dedent import pandas as pd -from include.tasks import ingest, split +from include.tasks import chunking_utils, ingest from weaviate_provider.hooks.weaviate import WeaviateHook from weaviate_provider.operators.weaviate import WeaviateCheckSchemaBranchOperator, WeaviateCreateSchemaOperator @@ -164,7 +164,7 @@ def check_original_object(original_doc: list[pd.DataFrame], doc_link: str = test original_doc = get_existing_doc() test_doc = create_test_object(original_doc) - split_md_docs = task(split.split_markdown).expand(dfs=[test_doc]) + split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[test_doc]) _upsert_data = ( task.weaviate_import(ingest.import_upsert_data, weaviate_conn_id=_WEAVIATE_CONN_ID)