Skip to content

Instantly share code, notes, and snippets.

@nsoft
Created November 3, 2017 02:50
Show Gist options
  • Select an option

  • Save nsoft/d65317672f879fd7dd66536954f52320 to your computer and use it in GitHub Desktop.

Select an option

Save nsoft/d65317672f879fd7dd66536954f52320 to your computer and use it in GitHub Desktop.

Revisions

  1. nsoft created this gist Nov 3, 2017.
    1,587 changes: 1,587 additions & 0 deletions ZkStateReader.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,1587 @@
    /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements. See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    package org.apache.solr.common.cloud;

    import java.io.Closeable;
    import java.io.UnsupportedEncodingException;
    import java.lang.invoke.MethodHandles;
    import java.net.URLDecoder;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.EnumSet;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Objects;
    import java.util.Set;
    import java.util.SortedSet;
    import java.util.TreeSet;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.stream.Collectors;

    import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
    import org.apache.solr.common.Callable;
    import org.apache.solr.common.SolrException;
    import org.apache.solr.common.SolrException.ErrorCode;
    import org.apache.solr.common.params.AutoScalingParams;
    import org.apache.solr.common.params.CoreAdminParams;
    import org.apache.solr.common.util.ExecutorUtil;
    import org.apache.solr.common.util.Pair;
    import org.apache.solr.common.util.Utils;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.data.Stat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import static java.util.Arrays.asList;
    import static java.util.Collections.EMPTY_MAP;
    import static java.util.Collections.emptyMap;
    import static java.util.Collections.emptySet;
    import static java.util.Collections.emptySortedSet;
    import static java.util.Collections.unmodifiableSet;
    import static org.apache.solr.common.util.Utils.fromJSON;

    public class ZkStateReader implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    public static final String BASE_URL_PROP = "base_url";
    public static final String NODE_NAME_PROP = "node_name";
    public static final String CORE_NODE_NAME_PROP = "core_node_name";
    public static final String ROLES_PROP = "roles";
    public static final String STATE_PROP = "state";
    public static final String CORE_NAME_PROP = "core";
    public static final String COLLECTION_PROP = "collection";
    public static final String ELECTION_NODE_PROP = "election_node";
    public static final String SHARD_ID_PROP = "shard";
    public static final String REPLICA_PROP = "replica";
    public static final String SHARD_RANGE_PROP = "shard_range";
    public static final String SHARD_STATE_PROP = "shard_state";
    public static final String SHARD_PARENT_PROP = "shard_parent";
    public static final String NUM_SHARDS_PROP = "numShards";
    public static final String LEADER_PROP = "leader";
    public static final String PROPERTY_PROP = "property";
    public static final String PROPERTY_PROP_PREFIX = "property.";
    public static final String PROPERTY_VALUE_PROP = "property.value";
    public static final String MAX_AT_ONCE_PROP = "maxAtOnce";
    public static final String MAX_WAIT_SECONDS_PROP = "maxWaitSeconds";
    public static final String COLLECTIONS_ZKNODE = "/collections";
    public static final String LIVE_NODES_ZKNODE = "/live_nodes";
    public static final String ALIASES = "/aliases.json";
    public static final String CLUSTER_STATE = "/clusterstate.json";
    public static final String CLUSTER_PROPS = "/clusterprops.json";
    public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
    public static final String SOLR_SECURITY_CONF_PATH = "/security.json";
    public static final String SOLR_AUTOSCALING_CONF_PATH = "/autoscaling.json";
    public static final String SOLR_AUTOSCALING_EVENTS_PATH = "/autoscaling/events";
    public static final String SOLR_AUTOSCALING_TRIGGER_STATE_PATH = "/autoscaling/triggerState";
    public static final String SOLR_AUTOSCALING_NODE_ADDED_PATH = "/autoscaling/nodeAdded";
    public static final String SOLR_AUTOSCALING_NODE_LOST_PATH = "/autoscaling/nodeLost";

    public static final String REPLICATION_FACTOR = "replicationFactor";
    public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
    public static final String AUTO_ADD_REPLICAS = "autoAddReplicas";
    public static final String MAX_CORES_PER_NODE = "maxCoresPerNode";
    public static final String PULL_REPLICAS = "pullReplicas";
    public static final String NRT_REPLICAS = "nrtReplicas";
    public static final String TLOG_REPLICAS = "tlogReplicas";

    public static final String ROLES = "/roles.json";

    public static final String CONFIGS_ZKNODE = "/configs";
    public final static String CONFIGNAME_PROP="configName";

    public static final String LEGACY_CLOUD = "legacyCloud";

    public static final String URL_SCHEME = "urlScheme";

    public static final String REPLICA_TYPE = "type";


    /** A view of the current state of all collections; combines all the different state sources into a single view. */
    protected volatile ClusterState clusterState;

    private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
    private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = 4000;

    public static final String LEADER_ELECT_ZKNODE = "leader_elect";

    public static final String SHARD_LEADERS_ZKNODE = "leaders";
    public static final String ELECTION_NODE = "election";

    /** Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json. */
    private Map<String, ClusterState.CollectionRef> legacyCollectionStates = emptyMap();

    /** Last seen ZK version of clusterstate.json. */
    private int legacyClusterStateVersion = 0;

    /** Collections with format2 state.json, "interesting" and actively watched. */
    private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<>();

    /** Collections with format2 state.json, not "interesting" and not actively watched. */
    private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<>();

    private volatile SortedSet<String> liveNodes = emptySortedSet();

    private volatile Map<String, Object> clusterProperties = Collections.emptyMap();

    private final ZkConfigManager configManager;

    private ConfigData securityData;

    private final Runnable securityNodeListener;

    private ConcurrentHashMap<String, CollectionWatch> collectionWatches = new ConcurrentHashMap<>();

    private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");

    private final AliasWatcher aliasWatcher = new AliasWatcher();

    /**
    * Get current {@link AutoScalingConfig}.
    * @return current configuration from <code>autoscaling.json</code>. NOTE:
    * this data is retrieved from ZK on each call.
    */
    public AutoScalingConfig getAutoScalingConfig() throws KeeperException, InterruptedException {
    return getAutoScalingConfig(null);
    }

    /**
    * Get current {@link AutoScalingConfig}.
    * @param watcher optional {@link Watcher} to set on a znode to watch for config changes.
    * @return current configuration from <code>autoscaling.json</code>. NOTE:
    * this data is retrieved from ZK on each call.
    */
    public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws KeeperException, InterruptedException {
    Stat stat = new Stat();

    Map<String, Object> map = new HashMap<>();
    try {
    byte[] bytes = zkClient.getData(SOLR_AUTOSCALING_CONF_PATH, watcher, stat, true);
    if (bytes != null && bytes.length > 0) {
    map = (Map<String, Object>) fromJSON(bytes);
    }
    } catch (KeeperException.NoNodeException e) {
    // ignore
    }
    map.put(AutoScalingParams.ZK_VERSION, stat.getVersion());
    return new AutoScalingConfig(map);
    }

    /**
    * Writes an alias to zk, and waits for up to 30 seconds to see that that the new alias value
    * is available before returning.
    *
    * @param aliasName The alias to store
    * @param collections The collections that are part of the alias
    */
    public void exportAliasToZk(String aliasName, String collections) {
    Aliases aliases = getAliases().cloneWithCollectionAlias(aliasName, collections);
    try {
    // TODO: worries me that aliases can't be independently updated...
    exportAllAliases(aliases);
    checkForAlias(aliasName, collections);
    // some fudge for other nodes
    Thread.sleep(100);
    this.aliases = aliases;
    } catch (KeeperException e) {
    LOG.error("", e);
    throw new SolrException(ErrorCode.SERVER_ERROR, e);
    } catch (InterruptedException e) {
    LOG.warn("", e);
    throw new SolrException(ErrorCode.SERVER_ERROR, e);
    }
    }

    /**
    * Save the alias map and make it the current alias map.
    *
    * @param aliases the alias map to save and remember
    * @throws KeeperException if zookeeper is especially grumpy
    * @throws InterruptedException if we get interrupted while talking to zookeeper.
    */
    public void exportAllAliases(Aliases aliases) throws KeeperException, InterruptedException {
    synchronized (getUpdateLock()) { // ensure all threads can see the latest when doing this
    byte[] jsonBytes = aliases.toJSON();
    getZkClient().setData(ALIASES, jsonBytes, true);
    this.aliases = aliases;
    }
    }

    private void checkForAlias(String name, String value) throws InterruptedException {
    final boolean[] success = {false};
    AliasCondition updateVisible = new AliasCondition() {
    @Override
    public boolean test(Aliases aliases) {
    String collections = aliases.getCollectionAliasMap().get(name);
    if (Objects.equals(collections, value)) {
    success[0] = true;
    this.latch.countDown();
    return true;
    }
    return false;
    }
    };
    this.aliasWatcher.addCondition(updateVisible);
    updateVisible.await(30, TimeUnit.SECONDS);
    if (!success[0]) {
    LOG.warn("Timeout waiting to be notified of Alias change...");
    }
    }




    private static class CollectionWatch {

    int coreRefCount = 0;
    Set<CollectionStateWatcher> stateWatchers = ConcurrentHashMap.newKeySet();

    public boolean canBeRemoved() {
    return coreRefCount + stateWatchers.size() == 0;
    }

    }

    private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();

    public static final Set<String> KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList(
    LEGACY_CLOUD,
    URL_SCHEME,
    AUTO_ADD_REPLICAS,
    CoreAdminParams.BACKUP_LOCATION,
    MAX_CORES_PER_NODE)));

    /**
    * Returns config set name for collection.
    *
    * @param collection to return config set name for
    */
    public String readConfigName(String collection) {

    String configName = null;

    String path = COLLECTIONS_ZKNODE + "/" + collection;
    LOG.debug("Loading collection config from: [{}]", path);

    try {
    byte[] data = zkClient.getData(path, null, null, true);

    if (data != null) {
    ZkNodeProps props = ZkNodeProps.load(data);
    configName = props.getStr(CONFIGNAME_PROP);
    }

    if (configName != null) {
    String configPath = CONFIGS_ZKNODE + "/" + configName;
    if (!zkClient.exists(configPath, true)) {
    LOG.error("Specified config=[{}] does not exist in ZooKeeper at location=[{}]", configName, configPath);
    throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "Specified config does not exist in ZooKeeper: " + configName);
    } else {
    LOG.debug("path=[{}] [{}]=[{}] specified config exists in ZooKeeper", configPath, CONFIGNAME_PROP, configName);
    }
    } else {
    throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path);
    }
    } catch (KeeperException| InterruptedException e) {
    SolrZkClient.checkInterrupted(e);
    throw new SolrException(ErrorCode.SERVER_ERROR, "Error loading config name for collection " + collection, e);
    }

    return configName;
    }


    private final SolrZkClient zkClient;

    private final boolean closeClient;

    private volatile Aliases aliases = Aliases.EMPTY;

    private volatile boolean closed = false;

    public ZkStateReader(SolrZkClient zkClient) {
    this(zkClient, null);
    }

    public ZkStateReader(SolrZkClient zkClient, Runnable securityNodeListener) {
    this.zkClient = zkClient;
    this.configManager = new ZkConfigManager(zkClient);
    this.closeClient = false;
    this.securityNodeListener = securityNodeListener;
    }


    public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) {
    this.zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
    // on reconnect, reload cloud info
    new OnReconnect() {
    @Override
    public void command() {
    try {
    ZkStateReader.this.createClusterStateWatchersAndUpdate();
    } catch (KeeperException e) {
    LOG.error("A ZK error has occurred", e);
    throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
    } catch (InterruptedException e) {
    // Restore the interrupted status
    Thread.currentThread().interrupt();
    LOG.error("Interrupted", e);
    throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted", e);
    }
    }
    });
    this.configManager = new ZkConfigManager(zkClient);
    this.closeClient = true;
    this.securityNodeListener = null;
    }

    public ZkConfigManager getConfigManager() {
    return configManager;
    }

    /**
    * Forcibly refresh cluster state from ZK. Do this only to avoid race conditions because it's expensive.
    *
    * It is cheaper to call {@link #forceUpdateCollection(String)} on a single collection if you must.
    *
    * @lucene.internal
    */
    public void forciblyRefreshAllClusterStateSlow() throws KeeperException, InterruptedException {
    synchronized (getUpdateLock()) {
    if (clusterState == null) {
    // Never initialized, just run normal initialization.
    createClusterStateWatchersAndUpdate();
    return;
    }
    // No need to set watchers because we should already have watchers registered for everything.
    refreshCollectionList(null);
    refreshLiveNodes(null);
    refreshLegacyClusterState(null);
    // Need a copy so we don't delete from what we're iterating over.
    Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
    Set<String> updatedCollections = new HashSet<>();
    for (String coll : safeCopy) {
    DocCollection newState = fetchCollectionState(coll, null);
    if (updateWatchedCollection(coll, newState)) {
    updatedCollections.add(coll);
    }
    }
    constructState(updatedCollections);
    }
    }

    /**
    * Forcibly refresh a collection's internal state from ZK. Try to avoid having to resort to this when
    * a better design is possible.
    */
    public void forceUpdateCollection(String collection) throws KeeperException, InterruptedException {

    synchronized (getUpdateLock()) {
    if (clusterState == null) {
    LOG.warn("ClusterState watchers have not been initialized");
    return;
    }

    ClusterState.CollectionRef ref = clusterState.getCollectionRef(collection);
    if (ref == null || legacyCollectionStates.containsKey(collection)) {
    // We either don't know anything about this collection (maybe it's new?) or it's legacy.
    // First update the legacy cluster state.
    LOG.debug("Checking legacy cluster state for collection {}", collection);
    refreshLegacyClusterState(null);
    if (!legacyCollectionStates.containsKey(collection)) {
    // No dice, see if a new collection just got created.
    LazyCollectionRef tryLazyCollection = new LazyCollectionRef(collection);
    if (tryLazyCollection.get() != null) {
    // What do you know, it exists!
    LOG.debug("Adding lazily-loaded reference for collection {}", collection);
    lazyCollectionStates.putIfAbsent(collection, tryLazyCollection);
    constructState(Collections.singleton(collection));
    }
    }
    } else if (ref.isLazilyLoaded()) {
    LOG.debug("Refreshing lazily-loaded state for collection {}", collection);
    if (ref.get() != null) {
    return;
    }
    // Edge case: if there's no external collection, try refreshing legacy cluster state in case it's there.
    refreshLegacyClusterState(null);
    } else if (watchedCollectionStates.containsKey(collection)) {
    // Exists as a watched collection, force a refresh.
    LOG.debug("Forcing refresh of watched collection state for {}", collection);
    DocCollection newState = fetchCollectionState(collection, null);
    if (updateWatchedCollection(collection, newState)) {
    constructState(Collections.singleton(collection));
    }
    } else {
    LOG.error("Collection {} is not lazy or watched!", collection);
    }
    }

    }

    /** Refresh the set of live nodes. */
    public void updateLiveNodes() throws KeeperException, InterruptedException {
    refreshLiveNodes(null);
    }

    /**
    * Get an immutable copy of the present state of the aliases. References to this object should not be retained
    * in any context where it will be important to know if aliases have changed.
    *
    * @return The current aliases, Aliases.EMPTY if not solr cloud, or no aliases have existed yet. Never returns null.
    */
    public Aliases getAliases() {
    assert aliases != null;
    return aliases;
    }

    public Integer compareStateVersions(String coll, int version) {
    DocCollection collection = clusterState.getCollectionOrNull(coll);
    if (collection == null) return null;
    if (collection.getZNodeVersion() < version) {
    LOG.debug("Server older than client {}<{}", collection.getZNodeVersion(), version);
    DocCollection nu = getCollectionLive(this, coll);
    if (nu == null) return -1 ;
    if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
    if (updateWatchedCollection(coll, nu)) {
    synchronized (getUpdateLock()) {
    constructState(Collections.singleton(coll));
    }
    }
    collection = nu;
    }
    }

    if (collection.getZNodeVersion() == version) {
    return null;
    }

    LOG.debug("Wrong version from client [{}]!=[{}]", version, collection.getZNodeVersion());

    return collection.getZNodeVersion();
    }

    public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
    InterruptedException {
    // We need to fetch the current cluster state and the set of live nodes

    LOG.debug("Updating cluster state from ZooKeeper... ");

    // Sanity check ZK structure.
    if (!zkClient.exists(CLUSTER_STATE, true)) {
    throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
    "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
    }

    // on reconnect of SolrZkClient force refresh and re-add watches.
    loadClusterProperties();
    refreshLiveNodes(new LiveNodeWatcher());
    refreshLegacyClusterState(new LegacyClusterStateWatcher());
    refreshStateFormat2Collections();
    refreshCollectionList(new CollectionsChildWatcher());
    refreshAliases(this.aliasWatcher);

    if (securityNodeListener != null) {
    addSecuritynodeWatcher(pair -> {
    ConfigData cd = new ConfigData();
    cd.data = pair.first() == null || pair.first().length == 0 ? EMPTY_MAP : Utils.getDeepCopy((Map) fromJSON(pair.first()), 4, false);
    cd.version = pair.second() == null ? -1 : pair.second().getVersion();
    securityData = cd;
    securityNodeListener.run();
    });
    securityData = getSecurityProps(true);
    }
    }

    private void refreshAliases(AliasWatcher watcher) throws KeeperException, InterruptedException {
    synchronized (ZkStateReader.this.getUpdateLock()) {
    constructState(Collections.emptySet());
    zkClient.exists(ALIASES, watcher, true);
    }
    updateAliases();
    }

    private void addSecuritynodeWatcher(final Callable<Pair<byte[], Stat>> callback)
    throws KeeperException, InterruptedException {
    zkClient.exists(SOLR_SECURITY_CONF_PATH,
    new Watcher() {

    @Override
    public void process(WatchedEvent event) {
    // session events are not change events, and do not remove the watcher
    if (EventType.None.equals(event.getType())) {
    return;
    }
    try {
    synchronized (ZkStateReader.this.getUpdateLock()) {
    LOG.debug("Updating [{}] ... ", SOLR_SECURITY_CONF_PATH);

    // remake watch
    final Watcher thisWatch = this;
    final Stat stat = new Stat();
    final byte[] data = getZkClient().getData(SOLR_SECURITY_CONF_PATH, thisWatch, stat, true);
    try {
    callback.call(new Pair<>(data, stat));
    } catch (Exception e) {
    LOG.error("Error running collections node listener", e);
    }
    }
    } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
    LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
    } catch (KeeperException e) {
    LOG.error("A ZK error has occurred", e);
    throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
    } catch (InterruptedException e) {
    // Restore the interrupted status
    Thread.currentThread().interrupt();
    LOG.warn("Interrupted", e);
    }
    }

    }, true);
    }

    /**
    * Construct the total state view from all sources.
    * Must hold {@link #getUpdateLock()} before calling this.
    *
    * @param changedCollections collections that have changed since the last call,
    * and that should fire notifications
    */
    private void constructState(Set<String> changedCollections) {

    Set<String> liveNodes = this.liveNodes; // volatile read

    // Legacy clusterstate is authoritative, for backwards compatibility.
    // To move a collection's state to format2, first create the new state2 format node, then remove legacy entry.
    Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>(legacyCollectionStates);

    // Add state format2 collections, but don't override legacy collection states.
    for (Map.Entry<String, DocCollection> entry : watchedCollectionStates.entrySet()) {
    result.putIfAbsent(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
    }

    // Finally, add any lazy collections that aren't already accounted for.
    for (Map.Entry<String, LazyCollectionRef> entry : lazyCollectionStates.entrySet()) {
    result.putIfAbsent(entry.getKey(), entry.getValue());
    }

    this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);

    LOG.debug("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]",
    legacyCollectionStates.keySet().size(),
    collectionWatches.keySet().size(),
    watchedCollectionStates.keySet().size(),
    lazyCollectionStates.keySet().size(),
    clusterState.getCollectionStates().size());

    if (LOG.isTraceEnabled()) {
    LOG.trace("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]",
    legacyCollectionStates.keySet(),
    collectionWatches.keySet(),
    watchedCollectionStates.keySet(),
    lazyCollectionStates.keySet(),
    clusterState.getCollectionStates());
    }

    for (String collection : changedCollections) {
    notifyStateWatchers(liveNodes, collection, clusterState.getCollectionOrNull(collection));
    }

    }

    /**
    * Refresh legacy (shared) clusterstate.json
    */
    private void refreshLegacyClusterState(Watcher watcher) throws KeeperException, InterruptedException {
    try {
    final Stat stat = new Stat();
    final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
    final ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE);
    synchronized (getUpdateLock()) {
    if (this.legacyClusterStateVersion >= stat.getVersion()) {
    // Nothing to do, someone else updated same or newer.
    return;
    }
    Set<String> updatedCollections = new HashSet<>();
    for (String coll : this.collectionWatches.keySet()) {
    ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll);
    // legacy collections are always in-memory
    DocCollection oldState = ref == null ? null : ref.get();
    ClusterState.CollectionRef newRef = loadedData.getCollectionStates().get(coll);
    DocCollection newState = newRef == null ? null : newRef.get();
    if (newState == null) {
    // check that we haven't just migrated
    newState = watchedCollectionStates.get(coll);
    }
    if (!Objects.equals(oldState, newState)) {
    updatedCollections.add(coll);
    }
    }
    this.legacyCollectionStates = loadedData.getCollectionStates();
    this.legacyClusterStateVersion = stat.getVersion();
    constructState(updatedCollections);
    }
    } catch (KeeperException.NoNodeException e) {
    // Ignore missing legacy clusterstate.json.
    synchronized (getUpdateLock()) {
    this.legacyCollectionStates = emptyMap();
    this.legacyClusterStateVersion = 0;
    constructState(Collections.emptySet());
    }
    }
    }

    /**
    * Refresh state format2 collections.
    */
    private void refreshStateFormat2Collections() {
    for (String coll : collectionWatches.keySet()) {
    new StateWatcher(coll).refreshAndWatch();
    }
    }

    // We don't get a Stat or track versions on getChildren() calls, so force linearization.
    private final Object refreshCollectionListLock = new Object();

    /**
    * Search for any lazy-loadable state format2 collections.
    *
    * A stateFormat=1 collection which is not interesting to us can also
    * be put into the {@link #lazyCollectionStates} map here. But that is okay
    * because {@link #constructState(Set)} will give priority to collections in the
    * shared collection state over this map.
    * In fact this is a clever way to avoid doing a ZK exists check on
    * the /collections/collection_name/state.json znode
    * Such an exists check is done in {@link ClusterState#hasCollection(String)} and
    * {@link ClusterState#getCollectionsMap()} methods
    * have a safeguard against exposing wrong collection names to the users
    */
    private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
    synchronized (refreshCollectionListLock) {
    List<String> children = null;
    try {
    children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
    } catch (KeeperException.NoNodeException e) {
    LOG.warn("Error fetching collection names: [{}]", e.getMessage());
    // fall through
    }
    if (children == null || children.isEmpty()) {
    lazyCollectionStates.clear();
    return;
    }

    // Don't lock getUpdateLock() here, we don't need it and it would cause deadlock.
    // Don't mess with watchedCollections, they should self-manage.

    // First, drop any children that disappeared.
    this.lazyCollectionStates.keySet().retainAll(children);
    for (String coll : children) {
    // We will create an eager collection for any interesting collections, so don't add to lazy.
    if (!collectionWatches.containsKey(coll)) {
    // Double check contains just to avoid allocating an object.
    LazyCollectionRef existing = lazyCollectionStates.get(coll);
    if (existing == null) {
    lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
    }
    }
    }
    }
    }

    private class LazyCollectionRef extends ClusterState.CollectionRef {

    private final String collName;

    public LazyCollectionRef(String collName) {
    super(null);
    this.collName = collName;
    }

    @Override
    public DocCollection get() {
    gets.incrementAndGet();
    // TODO: consider limited caching
    return getCollectionLive(ZkStateReader.this, collName);
    }

    @Override
    public boolean isLazilyLoaded() {
    return true;
    }

    @Override
    public String toString() {
    return "LazyCollectionRef(" + collName + ")";
    }
    }

    // We don't get a Stat or track versions on getChildren() calls, so force linearization.
    private final Object refreshLiveNodesLock = new Object();
    // Ensures that only the latest getChildren fetch gets applied.
    private final AtomicReference<SortedSet<String>> lastFetchedLiveNodes = new AtomicReference<>();

    /**
    * Refresh live_nodes.
    */
    private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
    synchronized (refreshLiveNodesLock) {
    SortedSet<String> newLiveNodes;
    try {
    List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
    newLiveNodes = new TreeSet<>(nodeList);
    } catch (KeeperException.NoNodeException e) {
    newLiveNodes = emptySortedSet();
    }
    lastFetchedLiveNodes.set(newLiveNodes);
    }

    // Can't lock getUpdateLock() until we release the other, it would cause deadlock.
    SortedSet<String> oldLiveNodes, newLiveNodes;
    synchronized (getUpdateLock()) {
    newLiveNodes = lastFetchedLiveNodes.getAndSet(null);
    if (newLiveNodes == null) {
    // Someone else won the race to apply the last update, just exit.
    return;
    }

    oldLiveNodes = this.liveNodes;
    this.liveNodes = newLiveNodes;
    if (clusterState != null) {
    clusterState.setLiveNodes(newLiveNodes);
    }
    }
    if (oldLiveNodes.size() != newLiveNodes.size()) {
    LOG.info("Updated live nodes from ZooKeeper... ({}) -> ({})", oldLiveNodes.size(), newLiveNodes.size());
    }
    if (LOG.isDebugEnabled()) {
    LOG.debug("Updated live nodes from ZooKeeper... {} -> {}", oldLiveNodes, newLiveNodes);
    }
    if (!oldLiveNodes.equals(newLiveNodes)) { // fire listeners
    liveNodesListeners.forEach(listener ->
    listener.onChange(new TreeSet<>(oldLiveNodes), new TreeSet<>(newLiveNodes)));
    }
    }

    public void registerLiveNodesListener(LiveNodesListener listener) {
    liveNodesListeners.add(listener);
    }

    public void removeLiveNodesListener(LiveNodesListener listener) {
    liveNodesListeners.remove(listener);
    }

    /**
    * @return information about the cluster from ZooKeeper
    */
    public ClusterState getClusterState() {
    return clusterState;
    }

    public Object getUpdateLock() {
    return this;
    }

    public void close() {
    this.closed = true;
    notifications.shutdown();
    if (closeClient) {
    zkClient.close();
    }
    }

    public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException {
    ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, shard, timeout));
    return props.getCoreUrl();
    }

    public Replica getLeader(String collection, String shard) {
    if (clusterState != null) {
    DocCollection docCollection = clusterState.getCollectionOrNull(collection);
    Replica replica = docCollection != null ? docCollection.getLeader(shard) : null;
    if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) {
    return replica;
    }
    }
    return null;
    }

    /**
    * Get shard leader properties, with retry if none exist.
    */
    public Replica getLeaderRetry(String collection, String shard) throws InterruptedException {
    return getLeaderRetry(collection, shard, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
    }

    /**
    * Get shard leader properties, with retry if none exist.
    */
    public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
    long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
    while (true) {
    Replica leader = getLeader(collection, shard);
    if (leader != null) return leader;
    if (System.nanoTime() >= timeoutAt || closed) break;
    Thread.sleep(GET_LEADER_RETRY_INTERVAL_MS);
    }
    throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found after waiting for "
    + timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + clusterState.getCollectionOrNull(collection)
    + " with live_nodes=" + clusterState.getLiveNodes());
    }

    /**
    * Get path where shard leader properties live in zookeeper.
    */
    public static String getShardLeadersPath(String collection, String shardId) {
    return COLLECTIONS_ZKNODE + "/" + collection + "/"
    + SHARD_LEADERS_ZKNODE + (shardId != null ? ("/" + shardId)
    : "") + "/leader";
    }

    /**
    * Get path where shard leader elections ephemeral nodes are.
    */
    public static String getShardLeadersElectPath(String collection, String shardId) {
    return COLLECTIONS_ZKNODE + "/" + collection + "/"
    + LEADER_ELECT_ZKNODE + (shardId != null ? ("/" + shardId + "/" + ELECTION_NODE)
    : "");
    }


    public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName) {
    return getReplicaProps(collection, shardId, thisCoreNodeName, null);
    }

    public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
    Replica.State mustMatchStateFilter) {
    return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null);
    }

    public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
    Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter) {
    //TODO: We don't need all these getReplicaProps method overloading. Also, it's odd that the default is to return replicas of type TLOG and NRT only
    return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT));
    }

    public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
    Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter, final EnumSet<Replica.Type> acceptReplicaType) {
    assert thisCoreNodeName != null;
    ClusterState clusterState = this.clusterState;
    if (clusterState == null) {
    return null;
    }
    final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
    if (docCollection == null || docCollection.getSlicesMap() == null) {
    throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
    "Could not find collection in zk: " + collection);
    }

    Map<String,Slice> slices = docCollection.getSlicesMap();
    Slice replicas = slices.get(shardId);
    if (replicas == null) {
    throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId);
    }

    Map<String,Replica> shardMap = replicas.getReplicasMap();
    List<ZkCoreNodeProps> nodes = new ArrayList<>(shardMap.size());
    for (Entry<String,Replica> entry : shardMap.entrySet().stream().filter((e)->acceptReplicaType.contains(e.getValue().getType())).collect(Collectors.toList())) {
    ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());

    String coreNodeName = entry.getValue().getName();

    if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) {
    if (mustMatchStateFilter == null || mustMatchStateFilter == Replica.State.getState(nodeProps.getState())) {
    if (mustNotMatchStateFilter == null || mustNotMatchStateFilter != Replica.State.getState(nodeProps.getState())) {
    nodes.add(nodeProps);
    }
    }
    }
    }
    if (nodes.size() == 0) {
    // no replicas
    return null;
    }

    return nodes;
    }

    public SolrZkClient getZkClient() {
    return zkClient;
    }

    public void updateAliases() throws KeeperException, InterruptedException {
    final byte[] data = zkClient.getData(ALIASES, null, null, true);
    this.aliases = Aliases.fromJSON(data);
    }

    /**
    * Get a cluster property
    *
    * N.B. Cluster properties are updated via ZK watchers, and so may not necessarily
    * be completely up-to-date. If you need to get the latest version, then use a
    * {@link ClusterProperties} instance.
    *
    * @param key the property to read
    * @param defaultValue a default value to use if no such property exists
    * @param <T> the type of the property
    * @return the cluster property, or a default if the property is not set
    */
    @SuppressWarnings("unchecked")
    public <T> T getClusterProperty(String key, T defaultValue) {
    T value = (T) clusterProperties.get(key);
    if (value == null)
    return defaultValue;
    return value;
    }

    /**
    * Get all cluster properties for this cluster
    *
    * N.B. Cluster properties are updated via ZK watchers, and so may not necessarily
    * be completely up-to-date. If you need to get the latest version, then use a
    * {@link ClusterProperties} instance.
    *
    * @return a Map of cluster properties
    */
    public Map<String, Object> getClusterProperties() {
    return Collections.unmodifiableMap(clusterProperties);
    }

    private final Watcher clusterPropertiesWatcher = event -> {
    // session events are not change events, and do not remove the watcher
    if (Watcher.Event.EventType.None.equals(event.getType())) {
    return;
    }
    loadClusterProperties();
    };

    @SuppressWarnings("unchecked")
    private void loadClusterProperties() {
    try {
    while (true) {
    try {
    byte[] data = zkClient.getData(ZkStateReader.CLUSTER_PROPS, clusterPropertiesWatcher, new Stat(), true);
    this.clusterProperties = (Map<String, Object>) Utils.fromJSON(data);
    LOG.debug("Loaded cluster properties: {}", this.clusterProperties);
    return;
    } catch (KeeperException.NoNodeException e) {
    this.clusterProperties = Collections.emptyMap();
    LOG.debug("Loaded empty cluster properties");
    // set an exists watch, and if the node has been created since the last call,
    // read the data again
    if (zkClient.exists(ZkStateReader.CLUSTER_PROPS, clusterPropertiesWatcher, true) == null)
    return;
    }
    }
    } catch (KeeperException | InterruptedException e) {
    LOG.error("Error reading cluster properties from zookeeper", SolrZkClient.checkInterrupted(e));
    }
    }

    /**
    * Returns the content of /security.json from ZooKeeper as a Map
    * If the files doesn't exist, it returns null.
    */
    public ConfigData getSecurityProps(boolean getFresh) {
    if (!getFresh) {
    if (securityData == null) return new ConfigData(EMPTY_MAP,-1);
    return new ConfigData(securityData.data, securityData.version);
    }
    try {
    Stat stat = new Stat();
    if(getZkClient().exists(SOLR_SECURITY_CONF_PATH, true)) {
    final byte[] data = getZkClient().getData(ZkStateReader.SOLR_SECURITY_CONF_PATH, null, stat, true);
    return data != null && data.length > 0 ?
    new ConfigData((Map<String, Object>) Utils.fromJSON(data), stat.getVersion()) :
    null;
    }
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new SolrException(ErrorCode.SERVER_ERROR,"Error reading security properties", e) ;
    } catch (KeeperException e) {
    throw new SolrException(ErrorCode.SERVER_ERROR,"Error reading security properties", e) ;
    }
    return null;
    }

    /**
    * Returns the baseURL corresponding to a given node's nodeName --
    * NOTE: does not (currently) imply that the nodeName (or resulting
    * baseURL) exists in the cluster.
    * @lucene.experimental
    */
    public String getBaseUrlForNodeName(final String nodeName) {
    return getBaseUrlForNodeName(nodeName, getClusterProperty(URL_SCHEME, "http"));
    }

    public static String getBaseUrlForNodeName(final String nodeName, String urlScheme) {
    final int _offset = nodeName.indexOf("_");
    if (_offset < 0) {
    throw new IllegalArgumentException("nodeName does not contain expected '_' separator: " + nodeName);
    }
    final String hostAndPort = nodeName.substring(0,_offset);
    try {
    final String path = URLDecoder.decode(nodeName.substring(1+_offset), "UTF-8");
    return urlScheme + "://" + hostAndPort + (path.isEmpty() ? "" : ("/" + path));
    } catch (UnsupportedEncodingException e) {
    throw new IllegalStateException("JVM Does not seem to support UTF-8", e);
    }
    }

    /** Watches a single collection's format2 state.json. */
    class StateWatcher implements Watcher {
    private final String coll;

    StateWatcher(String coll) {
    this.coll = coll;
    }

    @Override
    public void process(WatchedEvent event) {
    // session events are not change events, and do not remove the watcher
    if (EventType.None.equals(event.getType())) {
    return;
    }

    if (!collectionWatches.containsKey(coll)) {
    // This collection is no longer interesting, stop watching.
    LOG.debug("Uninteresting collection {}", coll);
    return;
    }

    Set<String> liveNodes = ZkStateReader.this.liveNodes;
    LOG.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])",
    event, coll, liveNodes.size());

    refreshAndWatch();

    }

    /**
    * Refresh collection state from ZK and leave a watch for future changes.
    * As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates}
    * with the results of the refresh.
    */
    public void refreshAndWatch() {
    try {
    DocCollection newState = fetchCollectionState(coll, this);
    updateWatchedCollection(coll, newState);
    synchronized (getUpdateLock()) {
    constructState(Collections.singleton(coll));
    }

    } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
    LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
    } catch (KeeperException e) {
    LOG.error("Unwatched collection: [{}]", coll, e);
    throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.error("Unwatched collection: [{}]", coll, e);
    }
    }
    }

    /** Watches the legacy clusterstate.json. */
    class LegacyClusterStateWatcher implements Watcher {

    @Override
    public void process(WatchedEvent event) {
    // session events are not change events, and do not remove the watcher
    if (EventType.None.equals(event.getType())) {
    return;
    }
    int liveNodesSize = ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size();
    LOG.debug("A cluster state change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodesSize);
    refreshAndWatch();
    }

    /** Must hold {@link #getUpdateLock()} before calling this method. */
    public void refreshAndWatch() {
    try {
    refreshLegacyClusterState(this);
    } catch (KeeperException.NoNodeException e) {
    throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
    "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
    } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
    LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
    } catch (KeeperException e) {
    LOG.error("A ZK error has occurred", e);
    throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
    } catch (InterruptedException e) {
    // Restore the interrupted status
    Thread.currentThread().interrupt();
    LOG.warn("Interrupted", e);
    }
    }
    }

    /** Watches /collections children . */
    class CollectionsChildWatcher implements Watcher {

    @Override
    public void process(WatchedEvent event) {
    // session events are not change events, and do not remove the watcher
    if (EventType.None.equals(event.getType())) {
    return;
    }
    LOG.debug("A collections change: [{}], has occurred - updating...", event);
    refreshAndWatch();
    synchronized (getUpdateLock()) {
    constructState(Collections.emptySet());
    }
    }

    /** Must hold {@link #getUpdateLock()} before calling this method. */
    public void refreshAndWatch() {
    try {
    refreshCollectionList(this);
    } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
    LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
    } catch (KeeperException e) {
    LOG.error("A ZK error has occurred", e);
    throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
    } catch (InterruptedException e) {
    // Restore the interrupted status
    Thread.currentThread().interrupt();
    LOG.warn("Interrupted", e);
    }
    }
    }

    /** Watches the live_nodes and syncs changes. */
    class LiveNodeWatcher implements Watcher {

    @Override
    public void process(WatchedEvent event) {
    // session events are not change events, and do not remove the watcher
    if (EventType.None.equals(event.getType())) {
    return;
    }
    LOG.debug("A live node change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodes.size());
    refreshAndWatch();
    }

    public void refreshAndWatch() {
    try {
    refreshLiveNodes(this);
    } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
    LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
    } catch (KeeperException e) {
    LOG.error("A ZK error has occurred", e);
    throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
    } catch (InterruptedException e) {
    // Restore the interrupted status
    Thread.currentThread().interrupt();
    LOG.warn("Interrupted", e);
    }
    }
    }

    public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
    try {
    return zkStateReader.fetchCollectionState(coll, null);
    } catch (KeeperException e) {
    throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK: " + coll, e);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK: " + coll, e);
    }
    }

    private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
    String collectionPath = getCollectionPath(coll);
    while (true) {
    try {
    Stat stat = new Stat();
    byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
    ClusterState state = ClusterState.load(stat.getVersion(), data,
    Collections.<String>emptySet(), collectionPath);
    ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
    return collectionRef == null ? null : collectionRef.get();
    } catch (KeeperException.NoNodeException e) {
    if (watcher != null) {
    // Leave an exists watch in place in case a state.json is created later.
    Stat exists = zkClient.exists(collectionPath, watcher, true);
    if (exists != null) {
    // Rare race condition, we tried to fetch the data and couldn't find it, then we found it exists.
    // Loop and try again.
    continue;
    }
    }
    return null;
    }
    }
    }

    public static String getCollectionPathRoot(String coll) {
    return COLLECTIONS_ZKNODE+"/"+coll;
    }

    public static String getCollectionPath(String coll) {
    return getCollectionPathRoot(coll) + "/state.json";
    }

    /**
    * Notify this reader that a local Core is a member of a collection, and so that collection
    * state should be watched.
    *
    * Not a public API. This method should only be called from ZkController.
    *
    * The number of cores per-collection is tracked, and adding multiple cores from the same
    * collection does not increase the number of watches.
    *
    * @param collection the collection that the core is a member of
    *
    * @see ZkStateReader#unregisterCore(String)
    */
    public void registerCore(String collection) {
    AtomicBoolean reconstructState = new AtomicBoolean(false);
    collectionWatches.compute(collection, (k, v) -> {
    if (v == null) {
    reconstructState.set(true);
    v = new CollectionWatch();
    }
    v.coreRefCount++;
    return v;
    });
    if (reconstructState.get()) {
    new StateWatcher(collection).refreshAndWatch();
    }
    }

    /**
    * Notify this reader that a local core that is a member of a collection has been closed.
    *
    * Not a public API. This method should only be called from ZkController.
    *
    * If no cores are registered for a collection, and there are no {@link CollectionStateWatcher}s
    * for that collection either, the collection watch will be removed.
    *
    * @param collection the collection that the core belongs to
    */
    public void unregisterCore(String collection) {
    AtomicBoolean reconstructState = new AtomicBoolean(false);
    collectionWatches.compute(collection, (k, v) -> {
    if (v == null)
    return null;
    if (v.coreRefCount > 0)
    v.coreRefCount--;
    if (v.canBeRemoved()) {
    watchedCollectionStates.remove(collection);
    lazyCollectionStates.put(collection, new LazyCollectionRef(collection));
    reconstructState.set(true);
    return null;
    }
    return v;
    });
    if (reconstructState.get()) {
    synchronized (getUpdateLock()) {
    constructState(Collections.emptySet());
    }
    }
    }

    /**
    * Register a CollectionStateWatcher to be called when the state of a collection changes
    */
    public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) {
    AtomicBoolean watchSet = new AtomicBoolean(false);
    collectionWatches.compute(collection, (k, v) -> {
    if (v == null) {
    v = new CollectionWatch();
    watchSet.set(true);
    }
    v.stateWatchers.add(stateWatcher);
    return v;
    });

    if (watchSet.get()) {
    new StateWatcher(collection).refreshAndWatch();
    }

    DocCollection state = clusterState.getCollectionOrNull(collection);
    if (stateWatcher.onStateChanged(liveNodes, state) == true) {
    removeCollectionStateWatcher(collection, stateWatcher);
    }
    }

    /**
    * Block until a CollectionStatePredicate returns true, or the wait times out
    *
    * Note that the predicate may be called again even after it has returned true, so
    * implementors should avoid changing state within the predicate call itself.
    *
    * @param collection the collection to watch
    * @param wait how long to wait
    * @param unit the units of the wait parameter
    * @param predicate the predicate to call on state changes
    * @throws InterruptedException on interrupt
    * @throws TimeoutException on timeout
    */
    public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
    throws InterruptedException, TimeoutException {

    final CountDownLatch latch = new CountDownLatch(1);

    CollectionStateWatcher watcher = (n, c) -> {
    boolean matches = predicate.matches(n, c);
    if (matches)
    latch.countDown();
    return matches;
    };
    registerCollectionStateWatcher(collection, watcher);

    try {
    // wait for the watcher predicate to return true, or time out
    if (!latch.await(wait, unit))
    throw new TimeoutException();

    }
    finally {
    removeCollectionStateWatcher(collection, watcher);
    }
    }

    /**
    * Remove a watcher from a collection's watch list.
    *
    * This allows Zookeeper watches to be removed if there is no interest in the
    * collection.
    *
    * @param collection the collection
    * @param watcher the watcher
    */
    public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
    AtomicBoolean reconstructState = new AtomicBoolean(false);
    collectionWatches.compute(collection, (k, v) -> {
    if (v == null)
    return null;
    v.stateWatchers.remove(watcher);
    if (v.canBeRemoved()) {
    watchedCollectionStates.remove(collection);
    lazyCollectionStates.put(collection, new LazyCollectionRef(collection));
    reconstructState.set(true);
    return null;
    }
    return v;
    });
    if (reconstructState.get()) {
    synchronized (getUpdateLock()) {
    constructState(Collections.emptySet());
    }
    }
    }

    /* package-private for testing */
    Set<CollectionStateWatcher> getStateWatchers(String collection) {
    final Set<CollectionStateWatcher> watchers = new HashSet<>();
    collectionWatches.compute(collection, (k, v) -> {
    if (v != null) {
    watchers.addAll(v.stateWatchers);
    }
    return v;
    });
    return watchers;
    }

    // returns true if the state has changed
    private boolean updateWatchedCollection(String coll, DocCollection newState) {

    if (newState == null) {
    LOG.debug("Removing cached collection state for [{}]", coll);
    watchedCollectionStates.remove(coll);
    return true;
    }

    boolean updated = false;
    // CAS update loop
    while (true) {
    if (!collectionWatches.containsKey(coll)) {
    break;
    }
    DocCollection oldState = watchedCollectionStates.get(coll);
    if (oldState == null) {
    if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
    LOG.debug("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
    updated = true;
    break;
    }
    } else {
    if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
    // no change to state, but we might have been triggered by the addition of a
    // state watcher, so run notifications
    updated = true;
    break;
    }
    if (watchedCollectionStates.replace(coll, oldState, newState)) {
    LOG.debug("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
    updated = true;
    break;
    }
    }
    }

    // Resolve race with unregisterCore.
    if (!collectionWatches.containsKey(coll)) {
    watchedCollectionStates.remove(coll);
    LOG.debug("Removing uninteresting collection [{}]", coll);
    }

    return updated;
    }

    public static class ConfigData {
    public Map<String, Object> data;
    public int version;

    public ConfigData() {
    }

    public ConfigData(Map<String, Object> data, int version) {
    this.data = data;
    this.version = version;

    }
    }

    private void notifyStateWatchers(Set<String> liveNodes, String collection, DocCollection collectionState) {
    try {
    notifications.submit(new Notification(liveNodes, collection, collectionState));
    }
    catch (RejectedExecutionException e) {
    if (closed == false) {
    LOG.error("Couldn't run collection notifications for {}", collection, e);
    }
    }
    }

    private class Notification implements Runnable {

    final Set<String> liveNodes;
    final String collection;
    final DocCollection collectionState;

    private Notification(Set<String> liveNodes, String collection, DocCollection collectionState) {
    this.liveNodes = liveNodes;
    this.collection = collection;
    this.collectionState = collectionState;
    }

    @Override
    public void run() {
    List<CollectionStateWatcher> watchers = new ArrayList<>();
    collectionWatches.compute(collection, (k, v) -> {
    if (v == null)
    return null;
    watchers.addAll(v.stateWatchers);
    return v;
    });
    for (CollectionStateWatcher watcher : watchers) {
    if (watcher.onStateChanged(liveNodes, collectionState)) {
    removeCollectionStateWatcher(collection, watcher);
    }
    }
    }

    }

    /**
    * A class to watch for changes to aliases. There should only ever be one instance of this class
    * per instance of ZkStateReader. Normally it will not be useful to create a new instance since
    * this watcher automatically re-registers itself every time it is updated.
    */
    private class AliasWatcher implements Watcher {

    private final List<AliasCondition> conditions = new ArrayList<>();

    @Override
    public void process(WatchedEvent event) {
    // session events are not change events, and do not remove the watcher
    if (EventType.None.equals(event.getType())) {
    return;
    }
    try {
    synchronized (ZkStateReader.this.getUpdateLock()) {
    LOG.debug("Updating aliases... ");

    // re-register the watch
    final byte[] data = zkClient.getData(ALIASES, this, null, true);
    Aliases aliases = Aliases.fromJSON(data);
    ZkStateReader.this.aliases = aliases;
    LOG.debug("New alias definition is: " + ZkStateReader.this.aliases.toString());
    synchronized (conditions) {
    for (AliasCondition condition : conditions) {
    if (condition.test(aliases)) {
    conditions.remove(condition);
    }
    }
    }
    }
    } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
    LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
    } catch (KeeperException e) {
    LOG.error("A ZK error has occurred", e);
    throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
    } catch (InterruptedException e) {
    // Restore the interrupted status
    Thread.currentThread().interrupt();
    LOG.warn("Interrupted", e);
    }
    }

    void addCondition(AliasCondition condition) {
    synchronized (conditions) {
    if (!condition.test(ZkStateReader.this.aliases)) {
    conditions.add(condition);
    }
    }
    }
    }

    private abstract class AliasCondition {
    CountDownLatch latch = new CountDownLatch(1);

    public void await(long timeout, TimeUnit units) throws InterruptedException {
    latch.await(timeout, units);
    }

    /**
    * Perform some test on the aliases. Typical implementations should count down the latch and
    * return true if the test succeeds.
    *
    * @param aliases the aliases object to check.
    * @return true if the desired condition is met and the latch has been counted down.
    */
    abstract boolean test(Aliases aliases);
    }

    }