Skip to main content

infrahub_sdk.client

Functions

handle_relogin

handle_relogin(func: Callable[..., Coroutine[Any, Any, httpx.Response]])

handle_relogin_sync

handle_relogin_sync(func: Callable[..., httpx.Response])

Classes

ProcessRelationsNode

ProcessRelationsNodeSync

BaseClient

Base class for InfrahubClient and InfrahubClientSync

Methods:

request_context

request_context(self) -> RequestContext | None

request_context

request_context(self, request_context: RequestContext) -> None

start_tracking

start_tracking(self, identifier: str | None = None, params: dict[str, Any] | None = None, delete_unused_nodes: bool = False, group_type: str | None = None, group_params: dict[str, Any] | None = None, branch: str | None = None) -> Self

set_context_properties

set_context_properties(self, identifier: str, params: dict[str, str] | None = None, delete_unused_nodes: bool = True, reset: bool = True, group_type: str | None = None, group_params: dict[str, Any] | None = None, branch: str | None = None) -> None

InfrahubClient

GraphQL Client to interact with Infrahub.

Methods:

get_version

get_version(self) -> str

Return the Infrahub version.

get_user

get_user(self) -> dict

Return user information

get_user_permissions

get_user_permissions(self) -> dict

Return user permissions

create

create(self, kind: str, data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> InfrahubNode

create

create(self, kind: type[SchemaType], data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> SchemaType

create

create(self, kind: str | type[SchemaType], data: dict | None = None, branch: str | None = None, timeout: int | None = None, **kwargs: Any) -> InfrahubNode | SchemaType

delete

delete(self, kind: str | type[SchemaType], id: str, branch: str | None = None) -> None

get

get(self, kind: type[SchemaType], raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> SchemaType | None

get

get(self, kind: type[SchemaType], raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> SchemaType

get

get(self, kind: type[SchemaType], raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> SchemaType

get

get(self, kind: str, raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> InfrahubNode | None

get

get(self, kind: str, raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> InfrahubNode

get

get(self, kind: str, raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> InfrahubNode

get

get(self, kind: str | type[SchemaType], raise_when_missing: bool = True, at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, id: str | None = None, hfid: list[str] | None = None, include: list[str] | None = None, exclude: list[str] | None = None, populate_store: bool = True, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, **kwargs: Any) -> InfrahubNode | SchemaType | None

count

count(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, partial_match: bool = False, **kwargs: Any) -> int

Return the number of nodes of a given kind.

all

all(self, kind: type[SchemaType], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ...) -> list[SchemaType]

all

all(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ...) -> list[InfrahubNode]

all

all(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None) -> list[InfrahubNode] | list[SchemaType]

Retrieve all nodes of a given kind

Args:

  • kind: kind of the nodes to query
  • at: Time of the query. Defaults to Now.
  • branch: Name of the branch to query from. Defaults to default_branch.
  • populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • offset: The offset for pagination.
  • limit: The limit for pagination.
  • include: List of attributes or relationships to include in the query.
  • exclude: List of attributes or relationships to exclude from the query.
  • fragment: Flag to use GraphQL fragments for generic schemas.
  • prefetch_relationships: Flag to indicate whether to prefetch related node data.
  • parallel: Whether to use parallel processing for the query.
  • order: Ordering related options. Setting disable=True improves performance.

Returns:

  • list[InfrahubNode]: List of Nodes

filters

filters(self, kind: type[SchemaType], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., **kwargs: Any) -> list[SchemaType]

filters

filters(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., **kwargs: Any) -> list[InfrahubNode]

filters

filters(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, partial_match: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, **kwargs: Any) -> list[InfrahubNode] | list[SchemaType]

Retrieve nodes of a given kind based on provided filters.

Args:

  • kind: kind of the nodes to query
  • at: Time of the query. Defaults to Now.
  • branch: Name of the branch to query from. Defaults to default_branch.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
  • offset: The offset for pagination.
  • limit: The limit for pagination.
  • include: List of attributes or relationships to include in the query.
  • exclude: List of attributes or relationships to exclude from the query.
  • fragment: Flag to use GraphQL fragments for generic schemas.
  • prefetch_relationships: Flag to indicate whether to prefetch related node data.
  • partial_match: Allow partial match of filter criteria for the query.
  • parallel: Whether to use parallel processing for the query.
  • order: Ordering related options. Setting disable=True improves performance.
  • **kwargs: Additional filter criteria for the query.

Returns:

  • list[InfrahubNode]: List of Nodes that match the given filters.

clone

clone(self, branch: str | None = None) -> InfrahubClient

Return a cloned version of the client using the same configuration

execute_graphql

execute_graphql(self, query: str, variables: dict | None = None, branch_name: str | None = None, at: str | Timestamp | None = None, timeout: int | None = None, raise_for_error: bool = True, tracker: str | None = None) -> dict

Execute a GraphQL query (or mutation). If retry_on_failure is True, the query will retry until the server becomes reachable.

Args:

  • query: GraphQL Query to execute, can be a query or a mutation
  • variables: Variables to pass along with the GraphQL query. Defaults to None.
  • branch_name: Name of the branch on which the query will be executed. Defaults to None.
  • at: Time when the query should be executed. Defaults to None.
  • timeout: Timeout in seconds for the query. Defaults to None.
  • raise_for_error: Flag to indicate that we need to raise an exception if the response has some errors. Defaults to True.

Raises: GraphQLError: When an error occurs during the execution of the GraphQL query or mutation.

Returns:

  • The result of the GraphQL query or mutation.

refresh_login

refresh_login(self) -> None

login

login(self, refresh: bool = False) -> None

query_gql_query

query_gql_query(self, name: str, variables: dict | None = None, update_group: bool = False, subscribers: list[str] | None = None, params: dict | None = None, branch_name: str | None = None, at: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool = True) -> dict

get_diff_summary

get_diff_summary(self, branch: str, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool = True) -> list[NodeDiff]

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaType

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaType | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool = ...) -> SchemaType

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNode

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNode | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool = ...) -> CoreNode | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType] | None = None, identifier: str | None = None, prefix_length: int | None = None, address_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool = True) -> CoreNode | SchemaType | None

Allocate a new IP address by using the provided resource pool.

Args:

  • resource_pool: Node corresponding to the pool to allocate resources from.
  • identifier: Value to perform idempotent allocation, the same resource will be returned for a given identifier.
  • prefix_length: Length of the prefix to set on the address to allocate.
  • address_type: Kind of the address to allocate.
  • data: A key/value map to use to set attributes values on the allocated address.
  • branch: Name of the branch to allocate from. Defaults to default_branch.
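  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • tracker: Optional identifier used to tag the underlying GraphQL request for tracking purposes.
  • raise_for_error: Flag to indicate whether to raise an exception if the response contains errors. Defaults to True.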

Returns: InfrahubNode: Node corresponding to the allocated resource.

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaType

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaType | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool = ...) -> SchemaType

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNode

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNode | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool = ...) -> CoreNode | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType] | None = None, identifier: str | None = None, prefix_length: int | None = None, member_type: str | None = None, prefix_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool = True) -> CoreNode | SchemaType | None

Allocate a new IP prefix by using the provided resource pool.

Args:

  • resource_pool: Node corresponding to the pool to allocate resources from.
  • identifier: Value to perform idempotent allocation, the same resource will be returned for a given identifier.
  • prefix_length: Length of the prefix to allocate.
  • member_type: Member type of the prefix to allocate.
  • prefix_type: Kind of the prefix to allocate.
  • data: A key/value map to use to set attributes values on the allocated prefix.
  • branch: Name of the branch to allocate from. Defaults to default_branch.
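  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • tracker: Optional identifier used to tag the underlying GraphQL request for tracking purposes.
  • raise_for_error: Flag to indicate whether to raise an exception if the response contains errors. Defaults to True.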

Returns: InfrahubNode: Node corresponding to the allocated resource.

create_batch

create_batch(self, return_exceptions: bool = False) -> InfrahubBatch

get_list_repositories

get_list_repositories(self, branches: dict[str, BranchData] | None = None, kind: str = 'CoreGenericRepository') -> dict[str, RepositoryData]

repository_update_commit

repository_update_commit(self, branch_name: str, repository_id: str, commit: str, is_read_only: bool = False) -> bool

InfrahubClientSync

Methods:

get_version

get_version(self) -> str

Return the Infrahub version.

get_user

get_user(self) -> dict

Return user information

get_user_permissions

get_user_permissions(self) -> dict

Return user permissions

create

create(self, kind: str, data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> InfrahubNodeSync

create

create(self, kind: type[SchemaTypeSync], data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> SchemaTypeSync

create

create(self, kind: str | type[SchemaTypeSync], data: dict | None = None, branch: str | None = None, timeout: int | None = None, **kwargs: Any) -> InfrahubNodeSync | SchemaTypeSync

delete

delete(self, kind: str | type[SchemaTypeSync], id: str, branch: str | None = None) -> None

clone

clone(self, branch: str | None = None) -> InfrahubClientSync

Return a cloned version of the client using the same configuration

execute_graphql

execute_graphql(self, query: str, variables: dict | None = None, branch_name: str | None = None, at: str | Timestamp | None = None, timeout: int | None = None, raise_for_error: bool = True, tracker: str | None = None) -> dict

Execute a GraphQL query (or mutation). If retry_on_failure is True, the query will retry until the server becomes reachable.

Args:

  • query: GraphQL Query to execute, can be a query or a mutation
  • variables: Variables to pass along with the GraphQL query. Defaults to None.
  • branch_name: Name of the branch on which the query will be executed. Defaults to None.
  • at: Time when the query should be executed. Defaults to None.
  • timeout: Timeout in seconds for the query. Defaults to None.
  • raise_for_error: Flag to indicate that we need to raise an exception if the response has some errors. Defaults to True.

Raises: GraphQLError: When an error occurs during the execution of the GraphQL query or mutation.

Returns:

  • The result of the GraphQL query or mutation.

count

count(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, partial_match: bool = False, **kwargs: Any) -> int

Return the number of nodes of a given kind.

all

all(self, kind: type[SchemaTypeSync], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ...) -> list[SchemaTypeSync]

all

all(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ...) -> list[InfrahubNodeSync]

all

all(self, kind: str | type[SchemaTypeSync], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None) -> list[InfrahubNodeSync] | list[SchemaTypeSync]

Retrieve all nodes of a given kind

Args:

  • kind: kind of the nodes to query
  • at: Time of the query. Defaults to Now.
  • branch: Name of the branch to query from. Defaults to default_branch.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
  • offset: The offset for pagination.
  • limit: The limit for pagination.
  • include: List of attributes or relationships to include in the query.
  • exclude: List of attributes or relationships to exclude from the query.
  • fragment: Flag to use GraphQL fragments for generic schemas.
  • prefetch_relationships: Flag to indicate whether to prefetch related node data.
  • parallel: Whether to use parallel processing for the query.
  • order: Ordering related options. Setting disable=True improves performance.

Returns:

  • list[InfrahubNodeSync]: List of Nodes

filters

filters(self, kind: type[SchemaTypeSync], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., **kwargs: Any) -> list[SchemaTypeSync]

filters

filters(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., **kwargs: Any) -> list[InfrahubNodeSync]

filters

filters(self, kind: str | type[SchemaTypeSync], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, partial_match: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, **kwargs: Any) -> list[InfrahubNodeSync] | list[SchemaTypeSync]

Retrieve nodes of a given kind based on provided filters.

Args:

  • kind: kind of the nodes to query
  • at: Time of the query. Defaults to Now.
  • branch: Name of the branch to query from. Defaults to default_branch.
  • timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
  • populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
  • offset: The offset for pagination.
  • limit: The limit for pagination.
  • include: List of attributes or relationships to include in the query.
  • exclude: List of attributes or relationships to exclude from the query.
  • fragment: Flag to use GraphQL fragments for generic schemas.
  • prefetch_relationships: Flag to indicate whether to prefetch related node data.
  • partial_match: Allow partial match of filter criteria for the query.
  • parallel: Whether to use parallel processing for the query.
  • order: Ordering related options. Setting disable=True improves performance.
  • **kwargs: Additional filter criteria for the query.

Returns:

  • list[InfrahubNodeSync]: List of Nodes that match the given filters.

get

get(self, kind: type[SchemaTypeSync], raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> SchemaTypeSync | None

get

get(self, kind: type[SchemaTypeSync], raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> SchemaTypeSync

get

get(self, kind: type[SchemaTypeSync], raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> SchemaTypeSync

get

get(self, kind: str, raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> InfrahubNodeSync | None

get

get(self, kind: str, raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> InfrahubNodeSync

get

get(self, kind: str, raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., **kwargs: Any) -> InfrahubNodeSync

get

get(self, kind: str | type[SchemaTypeSync], raise_when_missing: bool = True, at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, id: str | None = None, hfid: list[str] | None = None, include: list[str] | None = None, exclude: list[str] | None = None, populate_store: bool = True, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, **kwargs: Any) -> InfrahubNodeSync | SchemaTypeSync | None

create_batch

create_batch(self, return_exceptions: bool = False) -> InfrahubBatchSync

Create a batch to execute multiple queries concurrently.

Executing the batch will be performed using a thread pool, meaning it cannot guarantee the execution order. It is not recommended to use such a batch to manipulate objects that depend on each other.

get_list_repositories

get_list_repositories(self, branches: dict[str, BranchData] | None = None, kind: str = 'CoreGenericRepository') -> dict[str, RepositoryData]

query_gql_query

query_gql_query(self, name: str, variables: dict | None = None, update_group: bool = False, subscribers: list[str] | None = None, params: dict | None = None, branch_name: str | None = None, at: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool = True) -> dict

get_diff_summary

get_diff_summary(self, branch: str, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool = True) -> list[NodeDiff]

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaTypeSync

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaTypeSync | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool = ...) -> SchemaTypeSync

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNodeSync

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNodeSync | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool = ...) -> CoreNodeSync | None

allocate_next_ip_address

allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync] | None = None, identifier: str | None = None, prefix_length: int | None = None, address_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool = True) -> CoreNodeSync | SchemaTypeSync | None

Allocate a new IP address by using the provided resource pool.

Args:

  • resource_pool: Node corresponding to the pool to allocate resources from.
  • identifier: Value to perform idempotent allocation, the same resource will be returned for a given identifier.
  • prefix_length: Length of the prefix to set on the address to allocate.
  • address_type: Kind of the address to allocate.
  • data: A key/value map to use to set attributes values on the allocated address.
  • branch: Name of the branch to allocate from. Defaults to default_branch.
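  • timeout: Timeout to apply to the allocation request; when omitted, the client's default timeout is used.
  • tracker: Optional string used to tag the underlying allocation request for tracking purposes.
  • raise_for_error: Whether to raise an error when the allocation request fails. When False, None may be returned instead of a node.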

Returns: InfrahubNodeSync: Node corresponding to the allocated resource. May be None when raise_for_error is False.

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaTypeSync

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaTypeSync | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool = ...) -> SchemaTypeSync

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNodeSync

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNodeSync | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool = ...) -> CoreNodeSync | None

allocate_next_ip_prefix

allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync] | None = None, identifier: str | None = None, prefix_length: int | None = None, member_type: str | None = None, prefix_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool = True) -> CoreNodeSync | SchemaTypeSync | None

Allocate a new IP prefix by using the provided resource pool.

Args:

  • resource_pool: Node corresponding to the pool to allocate resources from.
  • identifier: Value to perform idempotent allocation, the same resource will be returned for a given identifier.
  • prefix_length: Length of the prefix to allocate.
  • member_type: Member type of the prefix to allocate.
  • prefix_type: Kind of the prefix to allocate.
  • data: A key/value map to use to set attributes values on the allocated prefix.
  • branch: Name of the branch to allocate from. Defaults to default_branch.
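  • timeout: Timeout to apply to the allocation request; when omitted, the client's default timeout is used.
  • tracker: Optional string used to tag the underlying allocation request for tracking purposes.
  • raise_for_error: Whether to raise an error when the allocation request fails. When False, None may be returned instead of a node.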

Returns: InfrahubNodeSync: Node corresponding to the allocated resource. May be None when raise_for_error is False.

repository_update_commit

repository_update_commit(self, branch_name: str, repository_id: str, commit: str, is_read_only: bool = False) -> bool

refresh_login

refresh_login(self) -> None

login

login(self, refresh: bool = False) -> None