infrahub_sdk.client
Classes
InfrahubClient
GraphQL Client to interact with Infrahub.
Methods:
get
get(self, kind: type[SchemaType], raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaType | None
Show 6 other overloads
get
get(self, kind: type[SchemaType], raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaType
get
get(self, kind: type[SchemaType], raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaType
get
get(self, kind: str, raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNode | None
get
get(self, kind: str, raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNode
get
get(self, kind: str, raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNode
get
get(self, kind: str | type[SchemaType], raise_when_missing: bool = True, at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, id: str | None = None, hfid: list[str] | None = None, include: list[str] | None = None, exclude: list[str] | None = None, populate_store: bool = True, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, include_metadata: bool = False, query_name: str | None = None, **kwargs: Any) -> InfrahubNode | SchemaType | None
delete
delete(self, kind: str | type[SchemaType], id: str, branch: str | None = None) -> None
create
create(self, kind: str | type[SchemaType], data: dict | None = None, branch: str | None = None, timeout: int | None = None, **kwargs: Any) -> InfrahubNode | SchemaType
Show 2 other overloads
get_version
get_version(self) -> str
Return the Infrahub version.
get_user
get_user(self) -> dict
Return user information.
get_user_permissions
get_user_permissions(self) -> dict
Return user permissions.
count
count(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, partial_match: bool = False, query_name: str | None = None, **kwargs: Any) -> int
Return the number of nodes of a given kind.
traverse_paths
traverse_paths(self, source: str | InfrahubNode, destination: str | InfrahubNode) -> PathTraversalResult
Find the shortest path(s) between two nodes in the graph.
Kind filters (kind_filter, excluded_kinds, included_kinds) accept
kind-name strings and/or generated protocol classes. relationship_filter
matches schema relationship identifiers (for example dcimconnector__dcimendpoint),
not the per-side names shown in the result.
Requires Infrahub 1.10 or later.
Args:
source: Node to start from, as a UUID string or anInfrahubNodeinstance.destination: Node to reach, as a UUID string or anInfrahubNodeinstance.max_depth: Maximum number of relationship hops to explore.max_paths: Maximum number of paths to return.kind_filter: Only traverse through nodes of these kinds.relationship_filter: Only traverse through these schema relationship identifiers.excluded_namespaces: Schema namespaces to exclude from traversal.excluded_kinds: Node kinds to exclude from traversal.included_kinds: Node kinds to re-include when otherwise excluded by default.branch: Name of the branch to query from. Defaults to default_branch.at: Time of the query. Defaults to now.timeout: Overrides the default GraphQL timeout, in seconds.
Raises:
VersionNotSupportedError: If the server does not support graph traversal (pre-1.10).GraphQLError: When the GraphQL response contains errors (e.g. unknown node).
path_exists
path_exists(self, source: str | InfrahubNode, destination: str | InfrahubNode) -> bool
Return whether at least one path connects source to destination.
Convenience wrapper around :meth:traverse_paths for checks: it requests a single
path (the cheapest way to answer "is there a path?") and returns True if one was
found. Accepts the same source/destination and filter arguments as traverse_paths.
Requires Infrahub 1.10 or later.
Args:
source: Node to start from, as a UUID string or anInfrahubNodeinstance.destination: Node to reach, as a UUID string or anInfrahubNodeinstance.max_depth: Maximum number of relationship hops to explore.kind_filter: Only traverse through nodes of these kinds.relationship_filter: Only traverse through these schema relationship identifiers.excluded_namespaces: Schema namespaces to exclude from traversal.excluded_kinds: Node kinds to exclude from traversal.included_kinds: Node kinds to re-include when otherwise excluded by default.branch: Name of the branch to query from. Defaults to default_branch.at: Time of the query. Defaults to now.timeout: Overrides the default GraphQL timeout, in seconds.
Raises:
VersionNotSupportedError: If the server does not support graph traversal (pre-1.10).GraphQLError: When the GraphQL response contains errors (e.g. unknown node).
reachable_nodes
reachable_nodes(self, source: str | InfrahubNode, target_kinds: list[str | type[SchemaType]]) -> ReachableNodesResult
Find all nodes of the given kinds reachable from a source node.
target_kinds accepts kind-name strings and/or generated protocol classes.
Requires Infrahub 1.10 or later.
Args:
source: Node to start from, as a UUID string or anInfrahubNodeinstance.target_kinds: Kinds of nodes to look for, as kind-name strings or protocol classes.max_depth: Maximum number of relationship hops to explore.max_results: Maximum number of reachable nodes to return.max_paths: Maximum number of paths to compute per reachable node.shortest_paths_only: When True, only return the shortest path(s) to each node.branch: Name of the branch to query from. Defaults to default_branch.at: Time of the query. Defaults to now.timeout: Overrides the default GraphQL timeout, in seconds.
Raises:
VersionNotSupportedError: If the server does not support graph traversal (pre-1.10).GraphQLError: When the GraphQL response contains errors (e.g. unknown node).
get_many
get_many(self, spec: Mapping[str, Mapping[str, Sequence[str]]]) -> dict[str, list[InfrahubNode]]
Fetch nodes of multiple kinds in a single GraphQL operation.
spec maps each kind to a dict with a non-empty ids list (node UUIDs)
and an optional attributes list (attribute names to populate on the
returned nodes). One aliased subselection is emitted per kind, so the cost
is a single round-trip regardless of how many kinds the spec contains.
Returned nodes only have the requested attributes populated; relationships
are not fetched. For full nodes, follow up with :meth:get or :meth:filters.
Args:
spec: Mapping of kind name to a sub-mapping with keysids(list of node UUIDs, required and non-empty) andattributes(list of attribute names, optional - defaults to none, in which case only the id and typename are returned).branch: Name of the branch to query from. Defaults todefault_branch.at: Time of the query. Defaults to now.timeout: Overrides the default GraphQL timeout, in seconds.populate_store: When True (default), every hydrated node is added toclient.storeso later lookups by id can skip the network.
Raises:
ValidationError: If the spec cannot be compiled into a query (empty, missingids, malformed kind/attribute identifiers).GraphQLError: If the server rejects the query (for example, an unknown kind or attribute name in the loaded schema).
all
all(self, kind: type[SchemaType], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ...) -> list[SchemaType]
Show 2 other overloads
all
all(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ...) -> list[InfrahubNode]
all
all(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, query_name: str | None = None) -> list[InfrahubNode] | list[SchemaType]
Retrieve all nodes of a given kind.
Args:
kind: kind of the nodes to queryat: Time of the query. Defaults to Now.branch: Name of the branch to query from. Defaults to default_branch.populate_store: Flag to indicate whether to populate the store with the retrieved nodes.timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.offset: The offset for pagination.limit: The limit for pagination.include: List of attributes or relationships to include in the query.exclude: List of attributes or relationships to exclude from the query.fragment: Flag to use GraphQL fragments for generic schemas.prefetch_relationships: Flag to indicate whether to pre-fetch related node data.parallel: Whether to use parallel processing for the query.order: Ordering related options. Settingdisable=Trueenhances performances.include_metadata: If True, includes node_metadata and relationship_metadata in the query.query_name: If provided is used as the GraphQL operation name else All_<kind> is used.
Returns:
- list[InfrahubNode]: List of Nodes
filters
filters(self, kind: type[SchemaType], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> list[SchemaType]
Show 2 other overloads
filters
filters(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> list[InfrahubNode]
filters
filters(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, partial_match: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, query_name: str | None = None, **kwargs: Any) -> list[InfrahubNode] | list[SchemaType]
Retrieve nodes of a given kind based on provided filters.
Args:
kind: kind of the nodes to queryat: Time of the query. Defaults to Now.branch: Name of the branch to query from. Defaults to default_branch.timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.populate_store: Flag to indicate whether to populate the store with the retrieved nodes.offset: The offset for pagination.limit: The limit for pagination.include: List of attributes or relationships to include in the query.exclude: List of attributes or relationships to exclude from the query.fragment: Flag to use GraphQL fragments for generic schemas.prefetch_relationships: Flag to indicate whether to pre-fetch related node data.partial_match: Allow partial match of filter criteria for the query.parallel: Whether to use parallel processing for the query.order: Ordering related options. Settingdisable=Trueenhances performances.include_metadata: If True, includes node_metadata and relationship_metadata in the query.query_name: If provided is used as the GraphQL operation name else Filters_<kind> is used.**kwargs: Additional filter criteria for the query.
Returns:
- list[InfrahubNodeSync]: List of Nodes that match the given filters.
clone
clone(self, branch: str | None = None) -> InfrahubClient
Return a cloned version of the client using the same configuration.
execute_graphql
execute_graphql(self, query: str, variables: dict | None = None, branch_name: str | None = None, at: str | Timestamp | None = None, timeout: int | None = None, tracker: str | None = None, operation_name: str | None = None) -> dict
Execute a GraphQL query (or mutation).
If retry_on_failure is True, the query will retry until the server becomes reachable.
Args:
query: GraphQL Query to execute, can be a query or a mutationvariables: Variables to pass along with the GraphQL query. Defaults to None.branch_name: Name of the branch on which the query will be executed. Defaults to None.at: Time when the query should be executed. Defaults to None.timeout: Timeout in second for the query. Defaults to None.operation_name: GraphQL operation name, sent asoperationNamein the request payload so tracing/observability tools can identify the operation. Defaults to None.
Returns:
- The GraphQL data payload (response["data"]).
Raises:
GraphQLError: When the GraphQL response contains errors.ServerNotReachableError: If the server is not reachable after exhausting retries.AuthenticationError: If the server returns a 401 or 403 response.URLNotFoundError: If the server returns a 404 response.Error: If the response is unexpectedly missing.
refresh_login
refresh_login(self) -> None
login
login(self, refresh: bool = False) -> None
query_gql_query
query_gql_query(self, name: str, variables: dict | None = None, update_group: bool = False, subscribers: list[str] | None = None, params: dict | None = None, branch_name: str | None = None, at: str | None = None, timeout: int | None = None, tracker: str | None = None) -> dict
create_diff
create_diff(self, branch: str, name: str, from_time: datetime, to_time: datetime, wait_until_completion: bool = True) -> bool | str
get_diff_summary
get_diff_summary(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> list[NodeDiff]
get_diff_tree
get_diff_tree(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> DiffTreeData | None
Get complete diff tree with metadata and nodes.
Returns None if no diff exists.
Raises:
ValueError: Iffrom_timeis later thanto_time.
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> SchemaType | None
Show 2 other overloads
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> CoreNode | None
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType] | None = None, identifier: str | None = None, prefix_length: int | None = None, address_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None) -> CoreNode | SchemaType | None
Allocate a new IP address by using the provided resource pool.
Args:
resource_pool: Node corresponding to the pool to allocate resources from.identifier: Value to perform idempotent allocation, the same resource will be returned for a given identifier.prefix_length: Length of the prefix to set on the address to allocate.address_type: Kind of the address to allocate.data: A key/value map to use to set attributes values on the allocated address.branch: Name of the branch to allocate from. Defaults to default_branch.timeout: Flag to indicate whether to populate the store with the retrieved nodes.tracker: The offset for pagination.
Returns:
- Node corresponding to the allocated resource.
Raises:
ValueError: Ifresource_poolis not aCoreIPAddressPool.
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> SchemaType | None
Show 2 other overloads
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> CoreNode | None
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType] | None = None, identifier: str | None = None, prefix_length: int | None = None, member_type: str | None = None, prefix_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None) -> CoreNode | SchemaType | None
Allocate a new IP prefix by using the provided resource pool.
Args:
resource_pool: Node corresponding to the pool to allocate resources from.identifier: Value to perform idempotent allocation, the same resource will be returned for a given identifier.prefix_length: Length of the prefix to allocate.member_type: Member type of the prefix to allocate.prefix_type: Kind of the prefix to allocate.data: A key/value map to use to set attributes values on the allocated prefix.branch: Name of the branch to allocate from. Defaults to default_branch.timeout: Flag to indicate whether to populate the store with the retrieved nodes.tracker: The offset for pagination.
Returns:
- Node corresponding to the allocated resource.
Raises:
ValueError: Ifresource_poolis not aCoreIPPrefixPool.
create_batch
create_batch(self, return_exceptions: bool = False) -> InfrahubBatch
get_list_repositories
get_list_repositories(self, branches: dict[str, BranchData] | None = None, kind: str = 'CoreGenericRepository') -> dict[str, RepositoryData]
repository_update_commit
repository_update_commit(self, branch_name: str, repository_id: str, commit: str, is_read_only: bool = False) -> bool
convert_object_type
convert_object_type(self, node_id: str, target_kind: str, branch: str | None = None, fields_mapping: dict[str, ConversionFieldInput] | None = None) -> InfrahubNode
Convert a given node to another kind on a given branch.
fields_mapping keys are target fields names and its values indicate how to fill in these fields.
Any mandatory field not having an equivalent field in the source kind should be specified in this
mapping. See https://docs.infrahub.app/guides/object-conversion for more information.
InfrahubClientSync
Methods:
get
get(self, kind: type[SchemaTypeSync], raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaTypeSync | None
Show 6 other overloads
get
get(self, kind: type[SchemaTypeSync], raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaTypeSync
get
get(self, kind: type[SchemaTypeSync], raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaTypeSync
get
get(self, kind: str, raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNodeSync | None
get
get(self, kind: str, raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNodeSync
get
get(self, kind: str, raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNodeSync
get
get(self, kind: str | type[SchemaTypeSync], raise_when_missing: bool = True, at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, id: str | None = None, hfid: list[str] | None = None, include: list[str] | None = None, exclude: list[str] | None = None, populate_store: bool = True, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, include_metadata: bool = False, query_name: str | None = None, **kwargs: Any) -> InfrahubNodeSync | SchemaTypeSync | None
delete
delete(self, kind: str | type[SchemaTypeSync], id: str, branch: str | None = None) -> None
create
create(self, kind: str | type[SchemaTypeSync], data: dict | None = None, branch: str | None = None, timeout: int | None = None, **kwargs: Any) -> InfrahubNodeSync | SchemaTypeSync
Show 2 other overloads
get_version
get_version(self) -> str
Return the Infrahub version.
get_user
get_user(self) -> dict
Return user information.
get_user_permissions
get_user_permissions(self) -> dict
Return user permissions.
clone
clone(self, branch: str | None = None) -> InfrahubClientSync
Return a cloned version of the client using the same configuration.
execute_graphql
execute_graphql(self, query: str, variables: dict | None = None, branch_name: str | None = None, at: str | Timestamp | None = None, timeout: int | None = None, tracker: str | None = None, operation_name: str | None = None) -> dict
Execute a GraphQL query (or mutation).
If retry_on_failure is True, the query will retry until the server becomes reachable.
Args:
query: GraphQL Query to execute, can be a query or a mutationvariables: Variables to pass along with the GraphQL query. Defaults to None.branch_name: Name of the branch on which the query will be executed. Defaults to None.at: Time when the query should be executed. Defaults to None.timeout: Timeout in second for the query. Defaults to None.operation_name: GraphQL operation name, sent asoperationNamein the request payload so tracing/observability tools can identify the operation. Defaults to None.
Returns:
- The GraphQL data payload (
response["data"]).
Raises:
GraphQLError: When the GraphQL response contains errors.ServerNotReachableError: If the server is not reachable after exhausting retries.AuthenticationError: If the server returns a 401 or 403 response.URLNotFoundError: If the server returns a 404 response.Error: If the response is unexpectedly missing.
count
count(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, partial_match: bool = False, query_name: str | None = None, **kwargs: Any) -> int
Return the number of nodes of a given kind.
traverse_paths
traverse_paths(self, source: str | InfrahubNodeSync, destination: str | InfrahubNodeSync) -> PathTraversalResult
Find the shortest path(s) between two nodes in the graph.
Kind filters (kind_filter, excluded_kinds, included_kinds) accept
kind-name strings and/or generated protocol classes. relationship_filter
matches schema relationship identifiers (for example dcimconnector__dcimendpoint),
not the per-side names shown in the result.
Requires Infrahub 1.10 or later.
Args:
source: Node to start from, as a UUID string or anInfrahubNodeinstance.destination: Node to reach, as a UUID string or anInfrahubNodeinstance.max_depth: Maximum number of relationship hops to explore.max_paths: Maximum number of paths to return.kind_filter: Only traverse through nodes of these kinds.relationship_filter: Only traverse through these schema relationship identifiers.excluded_namespaces: Schema namespaces to exclude from traversal.excluded_kinds: Node kinds to exclude from traversal.included_kinds: Node kinds to re-include when otherwise excluded by default.branch: Name of the branch to query from. Defaults to default_branch.at: Time of the query. Defaults to now.timeout: Overrides the default GraphQL timeout, in seconds.
Raises:
VersionNotSupportedError: If the server does not support graph traversal (pre-1.10).GraphQLError: When the GraphQL response contains errors (e.g. unknown node).
path_exists
path_exists(self, source: str | InfrahubNodeSync, destination: str | InfrahubNodeSync) -> bool
Return whether at least one path connects source to destination.
Convenience wrapper around :meth:traverse_paths for checks: it requests a single
path (the cheapest way to answer "is there a path?") and returns True if one was
found. Accepts the same source/destination and filter arguments as traverse_paths.
Requires Infrahub 1.10 or later.
Args:
source: Node to start from, as a UUID string or anInfrahubNodeinstance.destination: Node to reach, as a UUID string or anInfrahubNodeinstance.max_depth: Maximum number of relationship hops to explore.kind_filter: Only traverse through nodes of these kinds.relationship_filter: Only traverse through these schema relationship identifiers.excluded_namespaces: Schema namespaces to exclude from traversal.excluded_kinds: Node kinds to exclude from traversal.included_kinds: Node kinds to re-include when otherwise excluded by default.branch: Name of the branch to query from. Defaults to default_branch.at: Time of the query. Defaults to now.timeout: Overrides the default GraphQL timeout, in seconds.
Raises:
VersionNotSupportedError: If the server does not support graph traversal (pre-1.10).GraphQLError: When the GraphQL response contains errors (e.g. unknown node).
reachable_nodes
reachable_nodes(self, source: str | InfrahubNodeSync, target_kinds: list[str | type[SchemaTypeSync]]) -> ReachableNodesResult
Find all nodes of the given kinds reachable from a source node.
target_kinds accepts kind-name strings and/or generated protocol classes.
Requires Infrahub 1.10 or later.
Args:
source: Node to start from, as a UUID string or anInfrahubNodeinstance.target_kinds: Kinds of nodes to look for, as kind-name strings or protocol classes.max_depth: Maximum number of relationship hops to explore.max_results: Maximum number of reachable nodes to return.max_paths: Maximum number of paths to compute per reachable node.shortest_paths_only: When True, only return the shortest path(s) to each node.branch: Name of the branch to query from. Defaults to default_branch.at: Time of the query. Defaults to now.timeout: Overrides the default GraphQL timeout, in seconds.
Raises:
VersionNotSupportedError: If the server does not support graph traversal (pre-1.10).GraphQLError: When the GraphQL response contains errors (e.g. unknown node).
get_many
get_many(self, spec: Mapping[str, Mapping[str, Sequence[str]]]) -> dict[str, list[InfrahubNodeSync]]
Fetch nodes of multiple kinds in a single GraphQL operation.
spec maps each kind to a dict with a non-empty ids list (node UUIDs)
and an optional attributes list (attribute names to populate on the
returned nodes). One aliased subselection is emitted per kind, so the cost
is a single round-trip regardless of how many kinds the spec contains.
Returned nodes only have the requested attributes populated; relationships
are not fetched. For full nodes, follow up with :meth:get or :meth:filters.
Args:
spec: Mapping of kind name to a sub-mapping with keysids(list of node UUIDs, required and non-empty) andattributes(list of attribute names, optional - defaults to none, in which case only the id and typename are returned).branch: Name of the branch to query from. Defaults todefault_branch.at: Time of the query. Defaults to now.timeout: Overrides the default GraphQL timeout, in seconds.populate_store: When True (default), every hydrated node is added toclient.storeso later lookups by id can skip the network.
Raises:
ValidationError: If the spec cannot be compiled into a query (empty, missingids, malformed kind/attribute identifiers).GraphQLError: If the server rejects the query (for example, an unknown kind or attribute name in the loaded schema).
all
all(self, kind: type[SchemaTypeSync], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ...) -> list[SchemaTypeSync]
Show 2 other overloads
all
all(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ...) -> list[InfrahubNodeSync]
all
all(self, kind: str | type[SchemaTypeSync], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, query_name: str | None = None) -> list[InfrahubNodeSync] | list[SchemaTypeSync]
Retrieve all nodes of a given kind.
Args:
kind: kind of the nodes to queryat: Time of the query. Defaults to Now.branch: Name of the branch to query from. Defaults to default_branch.timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.populate_store: Flag to indicate whether to populate the store with the retrieved nodes.offset: The offset for pagination.limit: The limit for pagination.include: List of attributes or relationships to include in the query.exclude: List of attributes or relationships to exclude from the query.fragment: Flag to use GraphQL fragments for generic schemas.prefetch_relationships: Flag to indicate whether to pre-fetch related node data.parallel: Whether to use parallel processing for the query.order: Ordering related options. Settingdisable=Trueenhances performances.include_metadata: If True, includes node_metadata and relationship_metadata in the query.query_name: If provided is used as the GraphQL operation name else All_<kind> is used.
Returns:
- list[InfrahubNodeSync]: List of Nodes
filters
filters(self, kind: type[SchemaTypeSync], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> list[SchemaTypeSync]
Show 2 other overloads
filters
filters(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> list[InfrahubNodeSync]
filters
filters(self, kind: str | type[SchemaTypeSync], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, partial_match: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, query_name: str | None = None, **kwargs: Any) -> list[InfrahubNodeSync] | list[SchemaTypeSync]
Retrieve nodes of a given kind based on provided filters.
Args:
kind: kind of the nodes to queryat: Time of the query. Defaults to Now.branch: Name of the branch to query from. Defaults to default_branch.timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.populate_store: Flag to indicate whether to populate the store with the retrieved nodes.offset: The offset for pagination.limit: The limit for pagination.include: List of attributes or relationships to include in the query.exclude: List of attributes or relationships to exclude from the query.fragment: Flag to use GraphQL fragments for generic schemas.prefetch_relationships: Flag to indicate whether to pre-fetch related node data.partial_match: Allow partial match of filter criteria for the query.parallel: Whether to use parallel processing for the query.order: Ordering related options. Settingdisable=Trueenhances performances.include_metadata: If True, includes node_metadata and relationship_metadata in the query.query_name: If provided is used as the GraphQL operation name else Filters_<kind> is used.**kwargs: Additional filter criteria for the query.
Returns:
- list[InfrahubNodeSync]: List of Nodes that match the given filters.
create_batch
create_batch(self, return_exceptions: bool = False) -> InfrahubBatchSync
Create a batch to execute multiple queries concurrently.
Executing the batch will be performed using a thread pool, meaning it cannot guarantee the execution order. It is not recommended to use such batch to manipulate objects that depend on each others.
get_list_repositories
get_list_repositories(self, branches: dict[str, BranchData] | None = None, kind: str = 'CoreGenericRepository') -> dict[str, RepositoryData]
query_gql_query
query_gql_query(self, name: str, variables: dict | None = None, update_group: bool = False, subscribers: list[str] | None = None, params: dict | None = None, branch_name: str | None = None, at: str | None = None, timeout: int | None = None, tracker: str | None = None) -> dict
create_diff
create_diff(self, branch: str, name: str, from_time: datetime, to_time: datetime, wait_until_completion: bool = True) -> bool | str
get_diff_summary
get_diff_summary(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> list[NodeDiff]
get_diff_tree
get_diff_tree(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> DiffTreeData | None
Get complete diff tree with metadata and nodes.
Returns None if no diff exists.
Raises:
ValueError: Iffrom_timeis later thanto_time.
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> SchemaTypeSync | None
Show 2 other overloads
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> CoreNodeSync | None
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync] | None = None, identifier: str | None = None, prefix_length: int | None = None, address_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None) -> CoreNodeSync | SchemaTypeSync | None
Allocate a new IP address by using the provided resource pool.
Args:
resource_pool: Node corresponding to the pool to allocate resources from.identifier: Value to perform idempotent allocation, the same resource will be returned for a given identifier.prefix_length: Length of the prefix to set on the address to allocate.address_type: Kind of the address to allocate.data: A key/value map to use to set attributes values on the allocated address.branch: Name of the branch to allocate from. Defaults to default_branch.timeout: Flag to indicate whether to populate the store with the retrieved nodes.tracker: The offset for pagination.
Returns:
- Node corresponding to the allocated resource.
Raises:
ValueError: Ifresource_poolis not aCoreIPAddressPool.
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> SchemaTypeSync | None
Show 2 other overloads
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> CoreNodeSync | None
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync] | None = None, identifier: str | None = None, prefix_length: int | None = None, member_type: str | None = None, prefix_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None) -> CoreNodeSync | SchemaTypeSync | None
Allocate a new IP prefix by using the provided resource pool.
Args:
resource_pool: Node corresponding to the pool to allocate resources from.identifier: Value to perform idempotent allocation, the same resource will be returned for a given identifier.size: Length of the prefix to allocate.member_type: Member type of the prefix to allocate.prefix_type: Kind of the prefix to allocate.data: A key/value map to use to set attributes values on the allocated prefix.branch: Name of the branch to allocate from. Defaults to default_branch.timeout: Flag to indicate whether to populate the store with the retrieved nodes.tracker: The offset for pagination.
Returns:
- Node corresponding to the allocated resource.
Raises:
ValueError: Ifresource_poolis not aCoreIPPrefixPool.
repository_update_commit
repository_update_commit(self, branch_name: str, repository_id: str, commit: str, is_read_only: bool = False) -> bool
refresh_login
refresh_login(self) -> None
login
login(self, refresh: bool = False) -> None
convert_object_type
convert_object_type(self, node_id: str, target_kind: str, branch: str | None = None, fields_mapping: dict[str, ConversionFieldInput] | None = None) -> InfrahubNodeSync
Convert a given node to another kind on a given branch.
fields_mapping keys are target fields names and its values indicate how to fill in these fields.
Any mandatory field not having an equivalent field in the source kind should be specified in this
mapping. See https://docs.infrahub.app/guides/object-conversion for more information.
ProcessRelationsNode
ProxyConfig
ProxyConfigSync
ProcessRelationsNodeSync
BaseClient
Base class for InfrahubClient and InfrahubClientSync.
Methods:
request_context
request_context(self) -> RequestContext | None
request_context
request_context(self, request_context: RequestContext) -> None
start_tracking
start_tracking(self, identifier: str | None = None, params: dict[str, Any] | None = None, delete_unused_nodes: bool = False, group_type: str | None = None, group_params: dict[str, Any] | None = None, branch: str | None = None) -> Self
set_context_properties
set_context_properties(self, identifier: str, params: dict[str, str] | None = None, delete_unused_nodes: bool = True, reset: bool = True, group_type: str | None = None, group_params: dict[str, Any] | None = None, branch: str | None = None) -> None
Functions
handle_relogin
handle_relogin(func: Callable[..., Coroutine[Any, Any, httpx.Response]]) -> Callable[..., Coroutine[Any, Any, httpx.Response]]
handle_relogin_sync
handle_relogin_sync(func: Callable[..., httpx.Response]) -> Callable[..., httpx.Response]
get_kind_as_string
get_kind_as_string(kind: str | type[SchemaType | SchemaTypeSync]) -> str
compile_get_many_query
compile_get_many_query(spec: Mapping[str, Mapping[str, Sequence[str]]]) -> tuple[str, dict[str, list[str]], list[str]]
Compile a kind-to-spec mapping into a single aliased GraphQL query.
Each entry of the input maps a kind name to a sub-mapping with an
ids list (required, non-empty) and an optional attributes list.
Returns (query, variables, kinds_ordered). The query is one operation with one
aliased block per kind (k0, k1, ...) and one [ID] variable per kind
(ids_0, ids_1, ...). Ids and attribute names are deduplicated and sorted
so the emitted query is deterministic.
Raises:
ValidationError: If the spec is empty, has emptyidsfor a kind, omitsidsentirely, or contains kind/attribute names that are not valid GraphQL identifiers.