module tarantool.connection_pool

This module provides an API for interacting with a cluster of Tarantool servers.

class tarantool.connection_pool.ConnectionPool(addrs, user=None, password=None, socket_timeout=None, reconnect_max_attempts=0, reconnect_delay=0, connect_now=True, encoding='utf-8', call_16=False, connection_timeout=None, strategy_class=<class 'tarantool.connection_pool.RoundRobinStrategy'>, refresh_delay=1)

Represents the pool of connections to a cluster of Tarantool servers.

To work with ConnectionPool, box.info must be callable for the user on each server.

ConnectionPool is best suited to work with a single replicaset. Its API is the same as that of a single-server Connection, but requests support a mode parameter (a tarantool.Mode value) to choose between read-write and read-only pool instances:

>>> resp = conn.select('demo', 'AAAA', mode=tarantool.Mode.PREFER_RO)
>>> resp
- ['AAAA', 'Alpha']
Parameters
  • addrs (list) –

    List of dictionaries describing server addresses:

    {
        "host": "str" or None,      # mandatory
        "port": int or "str",       # mandatory
        "transport": "str",         # optional
        "ssl_key_file": "str",      # optional
        "ssl_cert_file": "str",     # optional
        "ssl_ca_file": "str",       # optional
        "ssl_ciphers": "str",       # optional
        "ssl_password": "str",      # optional
        "ssl_password_file": "str", # optional
        "auth_type": "str"          # optional
    }
    

    Refer to the corresponding Connection parameters.

  • user – Refer to user. The value is used for each connection in the pool.

  • password – Refer to password. The value is used for each connection in the pool.

  • socket_timeout – Refer to socket_timeout. The value is used for each connection in the pool.

  • reconnect_max_attempts – Refer to reconnect_max_attempts. The value is used for each connection in the pool. Note that this is the internal Connection reconnect, which is unrelated to the pool reconnect mechanisms.

  • reconnect_delay – Refer to reconnect_delay. The value is used for each connection in the pool. Note that this is the internal Connection reconnect, which is unrelated to the pool reconnect mechanisms.

  • connect_now (bool, optional) – If True, connect to all pool servers on initialization. Otherwise, you have to call connect() manually after initialization.

  • encoding – Refer to encoding. The value is used for each connection in the pool.

  • call_16 – Refer to call_16. The value is used for each connection in the pool.

  • connection_timeout (float, optional) – Refer to connection_timeout. The value is used for each connection in the pool.

  • strategy_class (StrategyInterface, optional) – Strategy for choosing a server based on a request mode. Defaults to the round-robin strategy.

  • refresh_delay – Minimum time, in seconds, between background refreshes of each pool server's box.info.ro status.

Raise

ConfigurationError, Connection exceptions
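
As an illustration of the addrs format described above, here is a minimal sketch that builds the address list from "host:port" strings and passes it to the pool. The helper names build_addrs and open_pool are hypothetical and not part of the library; only the mandatory host and port keys are filled in.

```python
def build_addrs(endpoints):
    """Turn 'host:port' strings into the address dictionaries described above."""
    addrs = []
    for endpoint in endpoints:
        # Split on the last colon so the port is always the final component.
        host, _, port = endpoint.rpartition(":")
        addrs.append({"host": host, "port": int(port)})
    return addrs


def open_pool(endpoints, user, password):
    """Hypothetical helper: create a pool that connects on initialization."""
    # Imported lazily so build_addrs can be used without the driver installed.
    from tarantool.connection_pool import ConnectionPool

    return ConnectionPool(
        addrs=build_addrs(endpoints),
        user=user,
        password=password,
    )
```

With the default connect_now=True, the returned pool starts connecting to every listed server immediately.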

call(func_name, *args, mode=None, on_push=None, on_push_ctx=None)

Execute a CALL request on the pool server: call a stored Lua function. Refer to call().

Parameters
Return type

Response

Raise

ValueError, call() exceptions

close()

Stop request processing, close each connection in the pool.

connect()

Create a connection to each address specified on initialization and start background process threads for them. There is no need to call this method explicitly unless you have set connect_now=False on initialization.

If some connections fail to connect or to provide box.info status (including the case when all of them fail), no exceptions are raised. Attempts to reconnect and refresh the info are made in the background.
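
Because connect() does not raise when servers are unreachable, a caller may want to wait until the pool can actually serve requests. The sketch below polls ping() until it succeeds or a deadline passes. The function name wait_until_ready and its timeout and interval parameters are assumptions for illustration, and the broad except clause stands in for whichever ping() exceptions the pool raises.

```python
import time


def wait_until_ready(pool, mode, timeout=5.0, interval=0.1):
    """Return True once pool.ping(mode=...) succeeds, False after the timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            pool.ping(mode=mode)
            return True
        except Exception:
            # Broad on purpose in this sketch: any failure means "not ready yet".
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

A typical call would pass a tarantool.Mode value, for example the mode that later requests are going to use.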

crud_count(space_name, conditions=[], opts={}, *, mode=Mode.ANY)

Execute a crud_count request on the pool server: get the row count through crud. Refer to crud_count().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_delete(space_name, key, opts={}, *, mode=Mode.ANY)

Execute a crud_delete request on the pool server: delete a row through crud. Refer to crud_delete().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_get(space_name, key, opts={}, *, mode=Mode.ANY)

Execute a crud_get request on the pool server: get a row through crud. Refer to crud_get().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_insert(space_name, values, opts={}, *, mode=Mode.ANY)

Execute a crud_insert request on the pool server: insert a row through crud. Refer to crud_insert().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_insert_many(space_name, values, opts={}, *, mode=Mode.ANY)

Execute a crud_insert_many request on the pool server: insert a batch of rows through crud. Refer to crud_insert_many().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_insert_object(space_name, values, opts={}, *, mode=Mode.ANY)

Execute a crud_insert_object request on the pool server: insert an object row through crud. Refer to crud_insert_object().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_insert_object_many(space_name, values, opts={}, *, mode=Mode.ANY)

Execute a crud_insert_object_many request on the pool server: insert a batch of object rows through crud. Refer to crud_insert_object_many().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_len(space_name, opts={}, *, mode=Mode.ANY)

Execute a crud_len request on the pool server: get the number of tuples in the space through crud. Refer to crud_len().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_max(space_name, index_name, opts={}, *, mode=Mode.ANY)

Execute a crud_max request on the pool server: get the rows with the maximum value in the specified index through crud. Refer to crud_max().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_min(space_name, index_name, opts={}, *, mode=Mode.ANY)

Execute a crud_min request on the pool server: get the rows with the minimum value in the specified index through crud. Refer to crud_min().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_replace(space_name, values, opts={}, *, mode=Mode.ANY)

Execute a crud_replace request on the pool server: replace a row through crud. Refer to crud_replace().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_replace_many(space_name, values, opts={}, *, mode=Mode.ANY)

Execute a crud_replace_many request on the pool server: replace a batch of rows through crud. Refer to crud_replace_many().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_replace_object(space_name, values, opts={}, *, mode=Mode.ANY)

Execute a crud_replace_object request on the pool server: replace an object row through crud. Refer to crud_replace_object().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_replace_object_many(space_name, values, opts={}, *, mode=Mode.ANY)

Execute a crud_replace_object_many request on the pool server: replace a batch of object rows through crud. Refer to crud_replace_object_many().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_select(space_name, conditions=[], opts={}, *, mode=Mode.ANY)

Execute a crud_select request on the pool server: select rows through crud. Refer to crud_select().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_stats(space_name=None, *, mode=Mode.ANY)

Execute a crud_stats request on the pool server: get statistics through crud. Refer to crud_stats().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_storage_info(opts={}, *, mode=Mode.ANY)

Execute a crud_storage_info request on the pool server: get the storages status through crud. Refer to crud_storage_info().

Parameters
  • opts – Refer to opts.

  • mode (Mode, optional) – Request mode.

Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_truncate(space_name, opts={}, *, mode=Mode.ANY)

Execute a crud_truncate request on the pool server: truncate rows through crud. Refer to crud_truncate().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_unflatten_rows(rows, metadata, *, mode=Mode.ANY)

Unflatten rows through crud. Refer to crud_unflatten_rows().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_update(space_name, key, operations=[], opts={}, *, mode=Mode.ANY)

Execute a crud_update request on the pool server: update a row through crud. Refer to crud_update().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_upsert(space_name, values, operations=[], opts={}, *, mode=Mode.ANY)

Execute a crud_upsert request on the pool server: upsert a row through crud. Refer to crud_upsert().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_upsert_many(space_name, values_operation, opts={}, *, mode=Mode.ANY)

Execute a crud_upsert_many request on the pool server: upsert a batch of rows through crud. Refer to crud_upsert_many().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_upsert_object(space_name, values, operations=[], opts={}, *, mode=Mode.ANY)

Execute a crud_upsert_object request on the pool server: upsert an object row through crud. Refer to crud_upsert_object().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

crud_upsert_object_many(space_name, values_operation, opts={}, *, mode=Mode.ANY)

Execute a crud_upsert_object_many request on the pool server: upsert a batch of object rows through crud. Refer to crud_upsert_object_many().

Parameters
Return type

CrudResult

Raise

CrudModuleError, DatabaseError

delete(space_name, key, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None)

Execute a DELETE request on the pool server: delete a tuple in the space. Refer to delete().

Parameters
Return type

Response

Raise

delete() exceptions

eval(expr, *args, mode=None, on_push=None, on_push_ctx=None)

Execute an EVAL request on the pool server: evaluate a Lua expression. Refer to eval().

Parameters
Return type

Response

Raise

ValueError, eval() exceptions

execute(query, params=None, *, mode=None)

Execute an SQL request on the pool server. Refer to execute().

Parameters
Return type

Response

Raise

ValueError, execute() exceptions

insert(space_name, values, *, mode=Mode.RW, on_push=None, on_push_ctx=None)

Execute an INSERT request on the pool server: insert a tuple into the space. Refer to insert().

Parameters
Return type

Response

Raise

insert() exceptions

is_closed()

Returns False if at least one connection is not closed and is ready to process requests. Otherwise, returns True.

Return type

bool

ping(notime=False, *, mode=None)

Execute a PING request on the pool server: send an empty request and receive an empty response from the server. Refer to ping().

Parameters
Returns

Refer to ping().

Raise

ValueError, ping() exceptions

replace(space_name, values, *, mode=Mode.RW, on_push=None, on_push_ctx=None)

Execute a REPLACE request on the pool server: replace a tuple in the space. Refer to replace().

Parameters
Return type

Response

Raise

replace() exceptions

select(space_name, key, *, offset=0, limit=4294967295, index=0, iterator=None, mode=Mode.ANY, on_push=None, on_push_ctx=None)

Execute a SELECT request on the pool server: select tuples from the space. Refer to select().

Parameters
Return type

Response

Raise

select() exceptions
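
The default modes above (Mode.RW for writes, Mode.ANY for select) can be overridden per request. The following sketch writes a tuple and then reads it back with explicitly chosen modes. The helper name put_and_get is hypothetical, and the 'demo' space is the one from the example at the top of this page.

```python
def put_and_get(pool, write_mode, read_mode, key, value):
    """Write (key, value) to the 'demo' space, then read it back."""
    # Writes have to reach a read-write instance.
    pool.replace("demo", (key, value), mode=write_mode)
    # Reads are served by whichever instance the read mode allows.
    return pool.select("demo", key, mode=read_mode)
```

With a real pool this could be called as put_and_get(pool, tarantool.Mode.RW, tarantool.Mode.PREFER_RO, 'AAAA', 'Alpha').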

update(space_name, key, op_list, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None)

Execute an UPDATE request on the pool server: update a tuple in the space. Refer to update().

Parameters
Return type

Response

Raise

update() exceptions

upsert(space_name, tuple_value, op_list, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None)

Execute an UPSERT request on the pool server: upsert a tuple into the space. Refer to upsert().

Parameters
Return type

Response

Raise

upsert() exceptions

class tarantool.connection_pool.InstanceState(status: Status = Status.UNHEALTHY, ro: Optional[bool] = None)

State of a single server in the cluster.

ro: Optional[bool] = None
Type

bool, optional

status: Status = 2
Type

Status

class tarantool.connection_pool.Mode(value)

Request mode.

ANY = 1

Send a request to any server.

PREFER_RO = 5

Send a request to an RO server if possible, otherwise to an RW server.

PREFER_RW = 4

Send a request to an RW server if possible, otherwise to an RO server.

RO = 3

Send a request to an RO server.

RW = 2

Send a request to an RW server.
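
The fallback rules of the five modes can be summarized in a few lines. The sketch below is not the library code: it redefines a local Mode enum with the values listed above so that it runs on its own, and the function candidates is a hypothetical name for "which servers may receive this request".

```python
from enum import Enum


class Mode(Enum):
    """Local stand-in mirroring the values documented above."""
    ANY = 1
    RW = 2
    RO = 3
    PREFER_RW = 4
    PREFER_RO = 5


def candidates(mode, rw_servers, ro_servers):
    """Return the servers a request with the given mode may be sent to."""
    if mode is Mode.ANY:
        return rw_servers + ro_servers
    if mode is Mode.RW:
        return rw_servers
    if mode is Mode.RO:
        return ro_servers
    if mode is Mode.PREFER_RW:
        # Fall back to RO servers only when no RW server is available.
        return rw_servers or ro_servers
    if mode is Mode.PREFER_RO:
        # Fall back to RW servers only when no RO server is available.
        return ro_servers or rw_servers
    raise ValueError(f"unknown mode: {mode!r}")
```

An empty result corresponds to the situation where no suitable server exists for the requested mode.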

class tarantool.connection_pool.PoolTask(method_name: str, args: tuple, kwargs: dict)

Stores the request type and arguments to pass to a server thread.

args: tuple

Connection method args.

Type

tuple

kwargs: dict

Connection method kwargs.

Type

dict

method_name: str

Connection method name.

Type

str

class tarantool.connection_pool.PoolUnit(addr: dict, conn: ~tarantool.connection.Connection, input_queue: ~queue.Queue = <factory>, output_queue: ~queue.Queue = <factory>, thread: ~typing.Optional[~threading.Thread] = None, state: ~tarantool.connection_pool.InstanceState = <factory>, request_processing_enabled: bool = False)

Stores metadata about a Tarantool server and works with it as part of a connection pool.

addr: dict

{"host": host, "port": port} info.

Type

dict

conn: Connection
Type

Connection

input_queue: Queue

Channel to pass requests for the server thread.

Type

queue.Queue

output_queue: Queue

Channel to receive responses from the server thread.

Type

queue.Queue

request_processing_enabled: bool = False

Flag used to stop request processing in the background thread on connection close or destruction.

Type

bool

state: InstanceState

Current server state.

Type

InstanceState

thread: Optional[Thread] = None

Background thread to process requests for the server.

Type

threading.Thread
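
To show how the fields above could fit together, here is a simplified, standard-library-only model of one server's request loop: a task names a connection method, the background thread calls it, and the result travels back through the output queue. The classes Task and Unit and the functions worker and submit are illustrative stand-ins, not the library's PoolTask and PoolUnit or its internal thread function.

```python
import queue
import threading
from dataclasses import dataclass, field


@dataclass
class Task:
    method_name: str
    args: tuple
    kwargs: dict


@dataclass
class Unit:
    conn: object
    input_queue: queue.Queue = field(default_factory=queue.Queue)
    output_queue: queue.Queue = field(default_factory=queue.Queue)
    request_processing_enabled: bool = False


def worker(unit):
    """Process tasks for one server until the flag is cleared."""
    while unit.request_processing_enabled:
        try:
            task = unit.input_queue.get(timeout=0.05)
        except queue.Empty:
            continue  # re-check the flag so the thread can stop promptly
        method = getattr(unit.conn, task.method_name)
        try:
            result = method(*task.args, **task.kwargs)
        except Exception as exc:
            result = exc  # hand the error back to the caller
        unit.output_queue.put(result)


def submit(unit, method_name, *args, **kwargs):
    """Send one request to the server thread and wait for its answer."""
    unit.input_queue.put(Task(method_name, args, kwargs))
    result = unit.output_queue.get()
    if isinstance(result, Exception):
        raise result
    return result
```

Clearing request_processing_enabled lets the loop exit after its current wait, which is the role the flag plays in the description above.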

tarantool.connection_pool.QueueFactory()

Build a queue-based channel.

class tarantool.connection_pool.RoundRobinStrategy(pool)

Simple round-robin rotation of pool servers.

Type

list of PoolUnit objects

build()

Initialize (or re-initialize) internal pools to rotate servers based on box.info.ro state.

getnext(mode)

Get server based on the request mode.

Parameters

mode (Mode) – Request mode

Return type

PoolUnit

Raise

PoolTolopogyError

update()

Set flag to re-initialize internal pools on next getnext() call.
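
The interplay of build(), update() and getnext() can be pictured as lazy rebuilding: update() only marks the internal pools as stale, and the next getnext() call rebuilds them before rotating. The sketch below models this with plain dictionaries instead of PoolUnit objects and with string keys instead of Mode values. The class name RoundRobinSketch and the raised LookupError are assumptions made for the example.

```python
class RoundRobinSketch:
    def __init__(self, servers):
        self.servers = servers        # stand-in for the pool's server list
        self.rebuild_needed = True
        self.pools = {}
        self.counters = {}

    def update(self):
        """Mark the internal pools as stale; they are rebuilt lazily."""
        self.rebuild_needed = True

    def build(self):
        """(Re)build per-kind pools from the current server states."""
        healthy = [s for s in self.servers if s["healthy"]]
        self.pools = {
            "any": healthy,
            "rw": [s for s in healthy if not s["ro"]],
            "ro": [s for s in healthy if s["ro"]],
        }
        self.counters = {kind: 0 for kind in self.pools}
        self.rebuild_needed = False

    def getnext(self, kind):
        """Return the next server of the requested kind in rotation."""
        if self.rebuild_needed:
            self.build()
        pool = self.pools[kind]
        if not pool:
            raise LookupError(f"no healthy {kind} server")
        server = pool[self.counters[kind] % len(pool)]
        self.counters[kind] += 1
        return server
```

Deferring the rebuild keeps update() cheap, so it can be called on every status change without recomputing the pools each time.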

class tarantool.connection_pool.Status(value)

Status of a single server in the cluster.

HEALTHY = 1

Server is healthy: the connection is successful, box.info.ro can be extracted, and box.info.status is “running”.

UNHEALTHY = 2

Server is unhealthy: the connection has failed, box.info cannot be extracted, or box.info.status is not “running”.
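
The two status definitions amount to a small decision function. The sketch below expresses it with a local Status enum mirroring the values above. The function classify and its arguments (a connected flag and a box.info-like dictionary) are hypothetical and only illustrate the rule, not how the pool obtains the data.

```python
from enum import Enum


class Status(Enum):
    """Local stand-in mirroring the values documented above."""
    HEALTHY = 1
    UNHEALTHY = 2


def classify(connected, info):
    """Return (status, ro) for one server from its connection state and box.info."""
    if not connected or not isinstance(info, dict):
        return Status.UNHEALTHY, None
    ro = info.get("ro")
    if not isinstance(ro, bool):
        # box.info.ro could not be extracted.
        return Status.UNHEALTHY, None
    if info.get("status") != "running":
        return Status.UNHEALTHY, None
    return Status.HEALTHY, ro
```

The (Status.UNHEALTHY, None) result matches the defaults of InstanceState, where status is UNHEALTHY and ro is None.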

class tarantool.connection_pool.StrategyInterface(pool)

Defines strategy to choose a pool server based on a request mode.

Type

list of PoolUnit objects

abstract getnext(mode)

Get a pool server based on a request mode.

Parameters

mode (Mode) – Request mode.

abstract update()

Refresh the strategy state.