Milvus

MilvusDocumentRetriever

Bases: Retriever, MilvusVectorStoreParams

Document Retriever using Milvus.

This class implements a document retriever that uses Milvus as the underlying vector store. It extends the VectorStoreNode class and provides functionality to retrieve documents based on vector similarity.

Attributes:

Name                Type                              Description
group               Literal[RETRIEVERS]               The group the node belongs to.
name                str                               The name of the node.
vector_store        MilvusVectorStore | None          The MilvusVectorStore instance.
filters             dict[str, Any] | None             Filters to apply when retrieving documents.
top_k               int                               The maximum number of documents to retrieve.
document_retriever  MilvusDocumentRetrieverComponent  The document retriever component.

Parameters:

Name      Type  Description                                   Default
**kwargs        Keyword arguments for initializing the node.  {}
Source code in dynamiq/nodes/retrievers/milvus.py
class MilvusDocumentRetriever(Retriever, MilvusVectorStoreParams):
    """
    Document Retriever using Milvus.

    This class implements a document retriever that uses Milvus as the underlying vector store.
    It extends the VectorStoreNode class and provides functionality to retrieve documents
    based on vector similarity.

    Attributes:
        group (Literal[NodeGroup.RETRIEVERS]): The group the node belongs to.
        name (str): The name of the node.
        vector_store (MilvusVectorStore | None): The MilvusVectorStore instance.
        filters (dict[str, Any] | None): Filters to apply when retrieving documents.
        top_k (int): The maximum number of documents to retrieve.
        document_retriever (MilvusDocumentRetrieverComponent): The document retriever component.

    Args:
        **kwargs: Keyword arguments for initializing the node.
    """

    name: str = "MilvusDocumentRetriever"
    connection: Milvus | None = None
    vector_store: MilvusVectorStore | None = None
    document_retriever: MilvusDocumentRetrieverComponent | None = None

    def __init__(self, **kwargs):
        """
        Initialize the MilvusDocumentRetriever.

        If neither vector_store nor connection is provided in kwargs, a default Milvus connection will be created.

        Args:
            **kwargs: Keyword arguments for initializing the node.
        """
        if kwargs.get("vector_store") is None and kwargs.get("connection") is None:
            kwargs["connection"] = Milvus()
        super().__init__(**kwargs)

    @property
    def vector_store_cls(self):
        return MilvusVectorStore

    @property
    def vector_store_params(self):
        return self.model_dump(include=set(MilvusVectorStoreParams.model_fields)) | {
            "connection": self.connection,
            "client": self.client,
        }

    def init_components(self, connection_manager: ConnectionManager | None = None):
        """
        Initialize the components of the MilvusDocumentRetriever.

        This method sets up the document retriever component if it hasn't been initialized yet.

        Args:
            connection_manager (ConnectionManager): The connection manager to use.
                Defaults to a new ConnectionManager instance.
        """
        connection_manager = connection_manager or ConnectionManager()
        super().init_components(connection_manager)
        if self.document_retriever is None:
            self.document_retriever = MilvusDocumentRetrieverComponent(
                vector_store=self.vector_store, filters=self.filters, top_k=self.top_k
            )

    def execute(self, input_data: RetrieverInputSchema, config: RunnableConfig = None, **kwargs) -> dict[str, Any]:
        """
        Execute the document retrieval process.

        This method takes an input embedding, retrieves similar documents using the
        document retriever component, and returns the retrieved documents.

        Args:
            input_data (RetrieverInputSchema): The input data containing the query embedding.
            config (RunnableConfig, optional): The configuration for the execution.
            **kwargs: Additional keyword arguments.

        Returns:
            dict[str, Any]: A dictionary containing the retrieved documents.
        """
        config = ensure_config(config)
        self.run_on_node_execute_run(config.callbacks, **kwargs)

        query_embedding = input_data.embedding
        query = input_data.query
        content_key = input_data.content_key
        embedding_key = input_data.embedding_key
        filters = input_data.filters or self.filters
        top_k = input_data.top_k or self.top_k

        output = self.document_retriever.run(
            query_embedding,
            query=query,
            filters=filters,
            top_k=top_k,
            content_key=content_key,
            embedding_key=embedding_key,
        )

        return {
            "documents": output["documents"],
        }

__init__(**kwargs)

Initialize the MilvusDocumentRetriever.

If neither vector_store nor connection is provided in kwargs, a default Milvus connection will be created.

Parameters:

Name      Type  Description                                   Default
**kwargs        Keyword arguments for initializing the node.  {}
Source code in dynamiq/nodes/retrievers/milvus.py
def __init__(self, **kwargs):
    """
    Initialize the MilvusDocumentRetriever.

    If neither vector_store nor connection is provided in kwargs, a default Milvus connection will be created.

    Args:
        **kwargs: Keyword arguments for initializing the node.
    """
    if kwargs.get("vector_store") is None and kwargs.get("connection") is None:
        kwargs["connection"] = Milvus()
    super().__init__(**kwargs)
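
The fallback rule above can be sketched in isolation. The example below is a minimal illustration with stand-in classes, not the dynamiq implementation: `FakeConnection` and `FakeRetriever` are hypothetical names, and only the `kwargs.get(...) is None` check is taken from the source shown above.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeConnection:
    """Hypothetical stand-in for a connection object such as Milvus()."""

    uri: str = "default-uri"


class FakeRetriever:
    """Hypothetical stand-in that mirrors the constructor's fallback check."""

    def __init__(self, **kwargs: Any):
        # Create a default connection only when the caller supplied neither
        # a ready-made vector store nor an explicit connection.
        if kwargs.get("vector_store") is None and kwargs.get("connection") is None:
            kwargs["connection"] = FakeConnection()
        self.vector_store = kwargs.get("vector_store")
        self.connection = kwargs.get("connection")


default = FakeRetriever()
explicit = FakeRetriever(connection=FakeConnection(uri="custom-uri"))
with_store = FakeRetriever(vector_store=object())
explicit_none = FakeRetriever(connection=None)
```

Because the check uses `kwargs.get(...) is None`, passing `connection=None` explicitly behaves the same as omitting the argument: in this sketch `explicit_none` also receives the default connection, while `with_store` receives none because a vector store was supplied.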

execute(input_data, config=None, **kwargs)

Execute the document retrieval process.

This method takes an input embedding, retrieves similar documents using the document retriever component, and returns them. When input_data supplies filters or top_k, those values are used; otherwise the node's own filters and top_k apply.

Parameters:

Name        Type                  Description                                     Default
input_data  RetrieverInputSchema  The input data containing the query embedding.  required
config      RunnableConfig        The configuration for the execution.            None
**kwargs                          Additional keyword arguments.                   {}

Returns:

Type            Description
dict[str, Any]  A dictionary whose "documents" key holds the retrieved documents.

Source code in dynamiq/nodes/retrievers/milvus.py
def execute(self, input_data: RetrieverInputSchema, config: RunnableConfig = None, **kwargs) -> dict[str, Any]:
    """
    Execute the document retrieval process.

    This method takes an input embedding, retrieves similar documents using the
    document retriever component, and returns the retrieved documents.

    Args:
        input_data (RetrieverInputSchema): The input data containing the query embedding.
        config (RunnableConfig, optional): The configuration for the execution.
        **kwargs: Additional keyword arguments.

    Returns:
        dict[str, Any]: A dictionary containing the retrieved documents.
    """
    config = ensure_config(config)
    self.run_on_node_execute_run(config.callbacks, **kwargs)

    query_embedding = input_data.embedding
    query = input_data.query
    content_key = input_data.content_key
    embedding_key = input_data.embedding_key
    filters = input_data.filters or self.filters
    top_k = input_data.top_k or self.top_k

    output = self.document_retriever.run(
        query_embedding,
        query=query,
        filters=filters,
        top_k=top_k,
        content_key=content_key,
        embedding_key=embedding_key,
    )

    return {
        "documents": output["documents"],
    }
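
The listing above resolves `filters` and `top_k` with Python's `or`, so a per-call value wins only when it is truthy. The sketch below isolates that pattern; `resolve` and the sample values are hypothetical, and whether RetrieverInputSchema accepts values such as `0` or `{}` at all is not shown in the source.

```python
from typing import Any


def resolve(per_call: Any, node_default: Any) -> Any:
    """Mirror the `input_data.x or self.x` pattern: use per_call when truthy."""
    return per_call or node_default


node_filters = {"lang": "en"}
node_top_k = 10

# A per-call value overrides the node-level default.
override_top_k = resolve(3, node_top_k)

# A missing (None) per-call value falls back to the node-level default.
fallback_top_k = resolve(None, node_top_k)

# Falsy per-call values (0, empty dict) also fall back, which is a property
# of `or` rather than of anything specific to Milvus.
zero_top_k = resolve(0, node_top_k)
empty_filters = resolve({}, node_filters)
```

A practical consequence, under the assumption that such values pass validation, is that an empty filter dict on the call cannot be used to switch off filters configured on the node.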

init_components(connection_manager=None)

Initialize the components of the MilvusDocumentRetriever.

This method sets up the document retriever component if it hasn't been initialized yet.

Parameters:

Name                Type               Description                                                                   Default
connection_manager  ConnectionManager  The connection manager to use. Defaults to a new ConnectionManager instance.  None
Source code in dynamiq/nodes/retrievers/milvus.py
def init_components(self, connection_manager: ConnectionManager | None = None):
    """
    Initialize the components of the MilvusDocumentRetriever.

    This method sets up the document retriever component if it hasn't been initialized yet.

    Args:
        connection_manager (ConnectionManager): The connection manager to use.
            Defaults to a new ConnectionManager instance.
    """
    connection_manager = connection_manager or ConnectionManager()
    super().init_components(connection_manager)
    if self.document_retriever is None:
        self.document_retriever = MilvusDocumentRetrieverComponent(
            vector_store=self.vector_store, filters=self.filters, top_k=self.top_k
        )