OiO.lk Community platform!

Check the existence of records in the elastic search vector store

Thread starter: zbeedatm (Guest)
I have entries like this in my Elasticsearch index (screenshot: https://i.sstatic.net/8MsCz7zT.png).

It's unstructured data; in this case, the content of a PDF was split into chunks, a LangChain Document was created for each chunk, and each chunk was pushed to the index as a separate vector.

The issue I ran into is that every time I load the PDF and push it, new entries with the same content are added to the index. The code used for that purpose is:

Code:
def push_to_elasticsearch(es_index_name, embeddings, docs):
    elastic_vector_search = ElasticsearchStore(
        # es_cloud_id=es_cloud_id,
        # es_endpoint=es_endpoint,
        # es_apikey=es_apikey,
        index_name=es_index_name,
        embedding=embeddings,
        es_connection=es_connection,
    )

    # Collect the content-hash ids and check which are already indexed
    docs_ids = [doc.metadata["hash_id"] for doc in docs]
    vector_exists_dict = check_vectors_exist_by_hash_id(es_index_name, docs_ids)

    # Keep only the documents whose hash_id is not in the index yet
    idempotency_docs = [doc for doc in docs
                        if not vector_exists_dict.get(doc.metadata["hash_id"], False)]
    # idempotency_docs = [doc for doc in docs
    #                     if not vector_exists_dict.get(calculate_content_hash(doc.page_content), False)]

    print('Len of docs:', len(docs))
    print('Len of idempotency_docs:', len(idempotency_docs))

    # Index only the new documents; the original passed `docs` here and then
    # also called ElasticsearchStore.from_documents(...), which indexed
    # everything a second time and produced the duplicates
    elastic_vector_search.add_documents(documents=idempotency_docs)

    return elastic_vector_search
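The code above assumes every document already carries a content-derived hash_id in its metadata. A minimal sketch of such a helper (calculate_content_hash appears commented out above; this particular definition is my assumption):

```python
import hashlib

def calculate_content_hash(page_content: str) -> str:
    """Deterministic id derived from the chunk text: reloading the same
    PDF yields the same hash_id per chunk, which enables the existence check."""
    return hashlib.sha256(page_content.encode("utf-8")).hexdigest()

# Identical content maps to the same id; different content to a different one
h1 = calculate_content_hash("chunk one")
h2 = calculate_content_hash("chunk one")
h3 = calculate_content_hash("chunk two")
```

This would be set on each chunk before pushing, e.g. doc.metadata["hash_id"] = calculate_content_hash(doc.page_content).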

Since the _id field doesn't exist until a document is indexed, I guess I can't use it to check for existence before pushing. So I added a new hash_id field in the metadata (based on a hash of the content), and I want to use it to search the index before pushing. I haven't worked out the exact implementation yet; this is what I had in mind:

Code:
def check_vectors_exist_by_hash_id(index_name, docs_hash_ids):
    """
    Check whether vectors already exist for a list of document hash ids.

    Args:
        docs_hash_ids (list): List of content-hash ids to check.

    Returns:
        dict: Keys are hash ids, values are True if a vector exists, False otherwise.
    """
    vector_exists_dict = {}
    try:
        # NOTE: this is where it breaks -- mget can only fetch by the _id
        # field (body={"ids": [...]}); it can't look up a metadata field,
        # so "hash_ids" is not a valid request body here
        responses = es_connection.mget(index=index_name, body={"hash_ids": docs_hash_ids})
        for response in responses["docs"]:
            doc_id = response["hash_id"]
            vector_exists_dict[doc_id] = "embedding" in response["_source"]
    except Exception as e:
        print(f"Error checking vector existence: {e}")

    return vector_exists_dict

But I haven't figured out yet how to filter by these hash_ids.
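One way to actually filter by these hash_ids (a sketch, assuming each hash is indexed under metadata.hash_id with a keyword mapping; adjust the field path to your index) is a terms query via search, since mget only resolves _id values:

```python
def build_hash_id_query(hash_ids):
    """Terms query matching documents whose metadata.hash_id is in the list
    (assumes the field is mapped as keyword so exact matching works)."""
    return {
        "query": {"terms": {"metadata.hash_id": hash_ids}},
        "_source": ["metadata.hash_id"],  # only return the id field
        "size": len(hash_ids),
    }

def check_vectors_exist_by_hash_id(es_connection, index_name, hash_ids):
    """Return {hash_id: True/False} with a single search round-trip."""
    resp = es_connection.search(index=index_name, body=build_hash_id_query(hash_ids))
    found = {hit["_source"]["metadata"]["hash_id"] for hit in resp["hits"]["hits"]}
    return {h: h in found for h in hash_ids}
```

Alternatively, if the hash is also passed as the document _id at indexing time (add_documents(documents=docs, ids=hash_ids); if I recall the LangChain API correctly, the store forwards ids to the bulk indexer), then mget with body={"ids": hash_ids} works directly, and re-pushing the same chunk overwrites it instead of duplicating it.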
 