
[PIP-165] Auto release client useless connections #15516

Closed
poorbarcode opened this issue May 10, 2022 · 8 comments · Fixed by #16165
poorbarcode commented May 10, 2022

Motivation

Currently, the Pulsar client keeps a connection open even when no producer or consumer is using it.
Suppose a client produces messages to topic A and the cluster has 3 brokers: 1, 2, and 3. Due to bundle unloading (load manager),
ownership of topic A moves from broker 1 to broker 2 and finally to broker 3. The client ends up keeping connections to all 3 brokers, even though only one is still needed.
We can reduce the number of broker-side connections by having the client close the connections it no longer uses.

So a mechanism needs to be added to release unwanted connections.

Why are there idle connections?

1. When the configuration maxConnectionsPerHosts is not set to 0, connections are never closed.
This is by design: the pool holds a fixed number of connections per host, avoiding frequent closing and re-creation.

public void releaseConnection(ClientCnx cnx) {
    if (maxConnectionsPerHosts == 0) {
        // Pooling is disabled: close the connection as soon as it is released.
        if (cnx.channel().isActive()) {
            if (log.isDebugEnabled()) {
                log.debug("close connection due to pooling disabled.");
            }
            cnx.close();
        }
    }
}
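The per-host pooling behavior above can be sketched as keying connections by (host, slot), so at most maxConnectionsPerHost connections exist per host and a released connection simply stays in the pool. This is an illustrative sketch, not the actual ConnectionPool code; the `HostPool` class and its string stand-in for a connection are assumptions.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative per-host pooling: with N slots per host, callers reuse one of
// N cached connections, and releasing a connection is a no-op. This is why
// pooled connections are never closed without an extra idle check.
public class HostPool {
    static final class Key {
        final String host; final int slot;
        Key(String host, int slot) { this.host = host; this.slot = slot; }
        @Override public boolean equals(Object o) {
            return o instanceof Key && ((Key) o).host.equals(host) && ((Key) o).slot == slot;
        }
        @Override public int hashCode() { return Objects.hash(host, slot); }
    }

    final int maxConnectionsPerHost;
    final ConcurrentHashMap<Key, String> pool = new ConcurrentHashMap<>();

    HostPool(int maxConnectionsPerHost) { this.maxConnectionsPerHost = maxConnectionsPerHost; }

    /** Returns a cached "connection" (a string stand-in) for the host. */
    String getConnection(String host) {
        int slot = ThreadLocalRandom.current().nextInt(maxConnectionsPerHost);
        return pool.computeIfAbsent(new Key(host, slot), k -> host + "#" + k.slot);
    }
}
```

With maxConnectionsPerHost = 1, repeated lookups for the same host always return the same cached connection.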

2-1. When a client receives a close command, it reconnects immediately.
This is by design, so that reconnection keeps working during rebalancing and bundle unloading.

public void connectionClosed(ClientCnx cnx) {
    lastConnectionClosedTimestamp = System.currentTimeMillis();
    state.client.getCnxPool().releaseConnection(cnx);
    if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
        if (!isValidStateForReconnection()) {
            log.info("[{}] [{}] Ignoring reconnection request (state: {})",
                    state.topic, state.getHandlerName(), state.getState());
            return;
        }
        long delayMs = backoff.next();
        state.setState(State.Connecting);
        log.info("[{}] [{}] Closed connection {} -- Will try again in {} s",
                state.topic, state.getHandlerName(), cnx.channel(),
                delayMs / 1000.0);
        state.client.timer().newTimeout(timeout -> {
            log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName());
            grabCnx();
        }, delayMs, TimeUnit.MILLISECONDS);
    }
}
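The `delayMs` in the snippet above comes from an exponential backoff. A minimal sketch of that behavior (the initial delay and cap here are assumptions for illustration, not the client's actual defaults):

```java
// Illustrative exponential backoff in the spirit of the client's reconnect
// delay: each call returns the current delay, then doubles it up to a cap.
public class Backoff {
    private long nextMs = 100;          // assumed initial delay
    private final long maxMs = 60_000;  // assumed cap

    long next() {
        long current = nextMs;
        nextMs = Math.min(nextMs * 2, maxMs);
        return current;
    }
}
```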

2-2. The broker closes client connections before writing the new ownership info to ZooKeeper, so a client that looks up the topic immediately afterwards may still receive the stale broker address.

producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
    try {
        topicPublishRateLimiter.close();
    } catch (Exception e) {
        log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
    }
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
    this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
}

Goal

Automatically release connections that are no longer used.

  • Scope
    • Pulsar client
      Covers connections used by consumers, producers, and transactions.

    • Pulsar proxy
      The connections between proxy and broker fall into two groups: those used for lookup commands, and those used for producer, consumer, and other commands.
      Connections in the second group are managed by DirectProxyHandler, which holds a connection until the client closes, so releasing them early would affect producers and consumers; these connections require no additional closing.
      For the lookup connections: when the proxy is configured with metadataStoreUrl, the lookup command selects a registered broker by round-robin and creates a connection. Unless the broker load-balancing algorithm is changed, all of these connections count as useful.
      When the cluster is large, holding so many connections becomes redundant. I will put forward separate proposals to improve this, so this proposal does not cover releasing proxy connections.

Approach

Periodically check for idle connections and close them.
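The periodic check can be sketched as a mark-and-sweep over pooled connections: a connection found unused is first only marked, and is released only if it is still unused on a later pass. This is a simplified illustration (the real design compares timestamps against connectionMaxIdleSeconds rather than counting passes; the `IdleCheckTask` and `Conn` names are hypothetical):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of the periodic idle check: connections that stay
// unused across two consecutive checks are released.
public class IdleCheckTask {
    /** Minimal stand-in for a client connection with an idle flag. */
    static class Conn {
        boolean inUse;      // true if any producer/consumer/transaction uses it
        boolean idleMarked; // set on the first check that finds it unused
        boolean released;

        void close() { released = true; }
    }

    final List<Conn> pool = new CopyOnWriteArrayList<>();

    /** One pass of the fixed-rate task. */
    void runOnce() {
        for (Conn c : pool) {
            if (c.inUse) {
                c.idleMarked = false;   // back in use: reset the mark
            } else if (!c.idleMarked) {
                c.idleMarked = true;    // first pass: only mark as idle
            } else if (!c.released) {
                c.close();              // still idle on a later pass: release
                pool.remove(c);
            }
        }
    }
}
```

The two-phase approach avoids closing a connection that went idle only momentarily between two commands.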

Changes

API changes

ClientCnx gains fields to track idle time and an idle-check method.

/** Create time. **/
private final long createTime;
/** The time when the connection was marked idle. **/
private long idleMarkTime;
/** The time when the last valid data was transmitted. **/
private long lastWorkTime;
/** Idle state. **/
private IdleState stat;
/**
 * Check whether the client connection is now free. This method may change
 * the state to idle, but it will not release the connection itself.
 */
public boolean doIdleCheck();
/** Get the idle state. **/
public IdleState getIdleStat();
/** Atomically change the idle state; succeeds only if the current state equals originalIdleStat. **/
public boolean setStat(IdleState originalIdleStat, IdleState newIdleStat);

public enum IdleState {
  /** The connection is in use. **/
  using,
  /** The connection is not in use. **/
  idle_marked,
  /** The connection will be released soon. **/
  before_release,
  /** The connection has already been released. **/
  released;
}
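The state transitions are intended to be atomic, so that only one caller can move a connection from one idle state to the next. A sketch of `setStat` as a compare-and-set over the enum (illustrative only, not the actual ClientCnx code; the `IdleStateMachine` class is hypothetical):

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch: the IdleState transitions are guarded by
// compare-and-set, so e.g. only one caller can move a connection from
// IDLE_MARKED to BEFORE_RELEASE.
public class IdleStateMachine {
    enum IdleState { USING, IDLE_MARKED, BEFORE_RELEASE, RELEASED }

    private final AtomicReference<IdleState> state =
            new AtomicReference<>(IdleState.USING);

    /** Mirrors the proposed setStat(original, new): succeeds only if the
     *  current state matches the expected original state. */
    public boolean setStat(IdleState original, IdleState next) {
        return state.compareAndSet(original, next);
    }

    public IdleState getIdleStat() {
        return state.get();
    }
}
```

If a producer starts using the connection between the mark and the release, a CAS from idle_marked back to using makes the pending release attempt fail safely.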

Configuration changes

We can set the check frequency and release rule for idle connections at ClientConfigurationData.

@ApiModelProperty(
        name = "connectionMaxIdleSeconds",
        value = "Release a connection if it is not used for more than [connectionMaxIdleSeconds] seconds. "
                + "If [connectionMaxIdleSeconds] < 0, the automatic release of idle connections is disabled."
)
private int connectionMaxIdleSeconds = 180;

@ApiModelProperty(
        name = "connectionIdleDetectionIntervalSeconds",
        value = "How often to check for idle connections (in seconds)"
)
private int connectionIdleDetectionIntervalSeconds = 60;
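The two settings combine into a simple release rule: a checker that runs every connectionIdleDetectionIntervalSeconds releases any connection whose last activity is older than connectionMaxIdleSeconds, with a negative value disabling the feature. A sketch of that rule (the `IdleRule` class is hypothetical):

```java
// Sketch of the release rule implied by the two settings: release a
// connection whose last activity is older than connectionMaxIdleSeconds;
// a negative value disables the feature entirely.
public class IdleRule {
    final int connectionMaxIdleSeconds;

    IdleRule(int connectionMaxIdleSeconds) {
        this.connectionMaxIdleSeconds = connectionMaxIdleSeconds;
    }

    /** True if a connection last used at lastWorkTimeMs should be released at nowMs. */
    boolean shouldRelease(long lastWorkTimeMs, long nowMs) {
        if (connectionMaxIdleSeconds < 0) {
            return false; // feature disabled
        }
        return nowMs - lastWorkTimeMs > connectionMaxIdleSeconds * 1000L;
    }
}
```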

Implementation

  • Pulsar client
    If no consumer, producer, or transaction uses the current connection, release it.
    • A fixed-rate task in ConnectionPool does two things: 1) mark the connection as idle; 2) release it once it has been idle for the max idle time.
    • ClientCnx holds the consumers, producers, and transaction clients, so we can rely on this information to determine whether the connection is still in use.
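The in-use test from the second bullet can be sketched as reference counting on the connection: the connection is idle only when no producer, consumer, or transaction handler is registered on it. The `CnxUsage` class and its method names below are illustrative, not the actual ClientCnx API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the "still in use?" test: a ClientCnx-like object tracks how
// many producers, consumers, and transaction handlers are registered on it.
public class CnxUsage {
    private final AtomicInteger producers = new AtomicInteger();
    private final AtomicInteger consumers = new AtomicInteger();
    private final AtomicInteger transactions = new AtomicInteger();

    void registerProducer()    { producers.incrementAndGet(); }
    void removeProducer()      { producers.decrementAndGet(); }
    void registerConsumer()    { consumers.incrementAndGet(); }
    void removeConsumer()      { consumers.decrementAndGet(); }
    void registerTransaction() { transactions.incrementAndGet(); }
    void removeTransaction()   { transactions.decrementAndGet(); }

    /** The connection is idle only when nothing is registered on it. */
    boolean isInUse() {
        return producers.get() > 0 || consumers.get() > 0 || transactions.get() > 0;
    }
}
```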
poorbarcode commented May 26, 2022

gaoran10 commented Jun 3, 2022

Left a comment in the discussion email list.

gaoran10 commented Jun 3, 2022

Maybe the config connectionMaxIdleSeconds could act as the switch: disable this feature if connectionMaxIdleSeconds is a negative value.

@poorbarcode
> Maybe the config connectionMaxIdleSeconds could act as the switch: disable this feature if connectionMaxIdleSeconds is a negative value.

Good idea. I changed it. Thanks @gaoran10

@liangyepianzhou

Left a comment in the discussion email list :)

poorbarcode commented Jun 13, 2022

@github-actions

The issue had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Jul 21, 2022
@Technoboy- Technoboy- added this to the 2.11.0 milestone Jul 27, 2022
@Technoboy- Technoboy- removed the Stale label Jul 27, 2022
RobertIndie added a commit to apache/pulsar-client-go that referenced this issue Mar 3, 2023
### Motivation

The Go implementation of PIP-165: apache/pulsar#15516


### Modifications

* Add new configuration `ConnectionMaxIdleTime` to `ClientOptions`
* Add a goroutine to `ConnectionPool` to periodically check and release idle connections.