Skip to content

Instantly share code, notes, and snippets.

@markathomas
Created April 27, 2015 18:36
Show Gist options
  • Select an option

  • Save markathomas/7c152528cff2ae46af27 to your computer and use it in GitHub Desktop.

Select an option

Save markathomas/7c152528cff2ae46af27 to your computer and use it in GitHub Desktop.

Revisions

  1. markathomas created this gist Apr 27, 2015.
    213 changes: 213 additions & 0 deletions BadClusterStateTest.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,213 @@
    import com.hazelcast.config.Config;
    import com.hazelcast.config.MapConfig;
    import com.hazelcast.config.MapIndexConfig;
    import com.hazelcast.config.InterfacesConfig;
    import com.hazelcast.config.NetworkConfig;
    import com.hazelcast.config.SerializerConfig;
    import com.hazelcast.config.TcpIpConfig;
    import com.hazelcast.core.Hazelcast;
    import com.hazelcast.core.HazelcastInstance;
    import com.hazelcast.core.IMap;
    import com.hazelcast.core.LifecycleEvent;
    import com.hazelcast.core.LifecycleListener;
    import com.hazelcast.core.MemberAttributeEvent;
    import com.hazelcast.core.MembershipEvent;
    import com.hazelcast.core.MembershipListener;
    import com.hazelcast.core.PartitionAwareKey;
    import com.hazelcast.instance.GroupProperties;
    import com.hazelcast.instance.Node;
    import com.hazelcast.instance.TestUtil;
    import com.hazelcast.map.merge.LatestUpdateMapMergePolicy;
    import com.hazelcast.query.Predicates;
    import com.hazelcast.test.HazelcastTestSupport;
    import com.hazelcast.test.TestEnvironment;

    import java.util.Collection;
    import java.util.UUID;
    import java.util.concurrent.CountDownLatch;

    import org.testng.annotations.Test;

    public class BadClusterStateTest extends HazelcastTestSupport {

    @Test(timeOut = 1000 * 60 * 10)
    public void testBadState() throws CloneNotSupportedException {
    String mapName = randomMapName();
    Config config = newConfig(LatestUpdateMapMergePolicy.class.getName(), mapName);
    HazelcastInstance h1 = Hazelcast.newHazelcastInstance(config);
    HazelcastInstance h2 = Hazelcast.newHazelcastInstance(config);

    TestMembershipListener membershipListener = new TestMembershipListener(1);
    h2.getCluster().addMembershipListener(membershipListener);
    TestLifecycleListener lifecycleListener = new TestLifecycleListener(1);
    h2.getLifecycleService().addLifecycleListener(lifecycleListener);

    RealtimeCall call = new RealtimeCall();
    call.setId(UUID.randomUUID());
    call.setClusterUUID(UUID.randomUUID());
    call.setDisplayId(1);
    call.setNumber("5554447777");

    IMap<PartitionAwareKey<UUID, UUID>, RealtimeCall> map1 = h1.getMap(mapName);
    IMap<PartitionAwareKey<UUID, UUID>, RealtimeCall> map2 = h2.getMap(mapName);

    map1.put(call.getAffinityKey(), call);

    sleepMillis(1);
    assert map2.get(call.getAffinityKey()) != null;

    closeConnectionBetween(h1, h2);

    assertOpenEventually(membershipListener.latch);
    assertClusterSizeEventually(1, h1);
    assertClusterSizeEventually(1, h2);

    map1 = h1.getMap(mapName);
    map1.remove(call.getAffinityKey());

    sleepMillis(1);
    map2 = h2.getMap(mapName);
    assert map2.get(call.getAffinityKey()) != null;

    assertOpenEventually(lifecycleListener.latch);
    assertClusterSizeEventually(2, h1);
    assertClusterSizeEventually(2, h2);

    map1 = h1.getMap(mapName);
    assert map1.get(call.getAffinityKey()) != null;

    map1.remove(call.getAffinityKey());
    assert map2.get(call.getAffinityKey()) == null;

    for (int i = 0; i < 100; i++) {
    Collection<RealtimeCall> calls = map1.values(Predicates.equal("id", call.getId()));
    System.out.println("Map 1 query by uuid: " + calls.size());
    calls = map2.values(Predicates.equal("id", call.getId()));
    System.out.println("Map 2 query by uuid: " + calls.size());
    calls = map1.values(Predicates.equal("displayId", call.getDisplayId()));
    System.out.println("Map 1 query by display id: " + calls.size());
    calls = map2.values(Predicates.equal("displayId", call.getDisplayId()));
    System.out.println("Map 2 query by displayId: " + calls.size());
    RealtimeCall c = map1.get(call.getAffinityKey());
    System.out.println("Map 1 get by affinity key: " + (c == null ? "null" : "not null"));
    c = map2.get(call.getAffinityKey());
    System.out.println("Map 2 get by affinity key: " + (c == null ? "null" : "not null"));
    sleepMillis(5);
    }
    }

    private Config newConfig(String mergePolicy, String mapName) {
    Config config = new Config();
    config.setProperties(this.getCommonProperties());
    config.setProperty(GroupProperties.PROP_MERGE_FIRST_RUN_DELAY_SECONDS, "5");
    config.setProperty(GroupProperties.PROP_MERGE_NEXT_RUN_DELAY_SECONDS, "3");
    MapConfig mapConfig = config.getMapConfig(mapName);
    mapConfig.setMergePolicy(mergePolicy);
    mapConfig.setBackupCount(1);
    mapConfig.setReadBackupData(true);
    mapConfig.setStatisticsEnabled(true);
    mapConfig.setMaxIdleSeconds(0);
    mapConfig.setTimeToLiveSeconds(0);
    mapConfig.addMapIndexConfig(new MapIndexConfig("id", false));
    mapConfig.addMapIndexConfig(new MapIndexConfig("number", false));
    mapConfig.addMapIndexConfig(new MapIndexConfig("createdOn", true));
    config.setNetworkConfig(this.getLocalhostTcpIpNetworkConfig(6701));
    config.getGroupConfig().setName(mapName);
    config.getGroupConfig().setPassword(mapName);
    return config;
    }

    private void closeConnectionBetween(HazelcastInstance h1, HazelcastInstance h2) {
    if (h1 == null || h2 == null) return;
    final Node n1 = TestUtil.getNode(h1);
    final Node n2 = TestUtil.getNode(h2);
    n1.clusterService.removeAddress(n2.address);
    n2.clusterService.removeAddress(n1.address);
    }

    private class TestLifecycleListener implements LifecycleListener {

    CountDownLatch latch;

    TestLifecycleListener(int countdown) {
    latch = new CountDownLatch(countdown);
    }

    @Override
    public void stateChanged(LifecycleEvent event) {
    if (event.getState() == LifecycleEvent.LifecycleState.MERGED) {
    latch.countDown();
    }
    }
    }

    private class TestMembershipListener implements MembershipListener {

    final CountDownLatch latch;

    TestMembershipListener(int countdown) {
    latch = new CountDownLatch(countdown);
    }

    @Override
    public void memberAdded(MembershipEvent membershipEvent) {

    }

    @Override
    public void memberRemoved(MembershipEvent membershipEvent) {
    latch.countDown();
    }

    @Override
    public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {

    }

    }

    public static void main(String[] args) throws CloneNotSupportedException {
    new BadClusterStateTest().testBadState();
    }


    protected NetworkConfig getLocalhostTcpIpNetworkConfig(int port) {
    NetworkConfig networkConfig = new NetworkConfig();
    networkConfig.setPort(port);
    networkConfig.getJoin().getMulticastConfig().setEnabled(false);
    TcpIpConfig tcpIpConfig = networkConfig.getJoin().getTcpIpConfig();
    tcpIpConfig.setEnabled(true);
    tcpIpConfig.addMember("127.0.0.1");
    InterfacesConfig interfacesConfig = networkConfig.getInterfaces();
    interfacesConfig.setEnabled(true);
    interfacesConfig.setInterfaces(Collections.singleton("127.0.0.*"));
    return networkConfig;
    }

    protected Properties getCommonProperties() {
    Properties properties = new Properties();
    properties.setProperty(GroupProperties.PROP_LOGGING_TYPE, "slf4j");
    properties.setProperty(GroupProperties.PROP_VERSION_CHECK_ENABLED, "false");
    properties.setProperty("hazelcast.mancenter.enabled", "false");
    properties.setProperty(GroupProperties.PROP_WAIT_SECONDS_BEFORE_JOIN, "1");
    properties.setProperty(GroupProperties.PROP_CONNECT_ALL_WAIT_SECONDS, "5");
    properties.setProperty(GroupProperties.PROP_MAX_NO_HEARTBEAT_SECONDS, "2");
    properties.setProperty(GroupProperties.PROP_HEARTBEAT_INTERVAL_SECONDS, "1");
    properties.setProperty(GroupProperties.PROP_MASTER_CONFIRMATION_INTERVAL_SECONDS, "5");
    properties.setProperty(GroupProperties.PROP_MAX_NO_MASTER_CONFIRMATION_SECONDS, "10");
    properties.setProperty(GroupProperties.PROP_MEMBER_LIST_PUBLISH_INTERVAL_SECONDS, "5");
    properties.setProperty(GroupProperties.PROP_MAX_JOIN_MERGE_TARGET_SECONDS, "10");
    properties.setProperty("hazelcast.local.localAddress", "127.0.0.1");
    properties.setProperty("java.net.preferIPv4Stack", "true");
    properties.setProperty(TestEnvironment.HAZELCAST_TEST_USE_NETWORK, "false");

    // randomize multicast group...
    Random rand = new Random();
    int g1 = rand.nextInt(255);
    int g2 = rand.nextInt(255);
    int g3 = rand.nextInt(255);
    properties.setProperty("hazelcast.multicast.group", "224." + g1 + "." + g2 + "." + g3);

    return properties;
    }
    }
    77 changes: 77 additions & 0 deletions RealtimeCall.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,77 @@

    import com.hazelcast.core.PartitionAwareKey;
    import com.hazelcast.nio.ObjectDataInput;
    import com.hazelcast.nio.ObjectDataOutput;
    import com.hazelcast.nio.serialization.DataSerializable;

    import java.io.IOException;
    import java.util.Date;
    import java.util.UUID;

    public class RealtimeCall implements DataSerializable {
    private UUID id;
    private UUID clusterUUID;
    private Date createdOn = new Date();
    private String number;
    private long displayId;

    public UUID getId() {
    return this.id;
    }

    public void setId(UUID id) {
    this.id = id;
    }

    public UUID getClusterUUID() {
    return this.clusterUUID;
    }

    public void setClusterUUID(UUID clusterUUID) {
    this.clusterUUID = clusterUUID;
    }

    public Date getCreatedOn() {
    return this.createdOn;
    }

    public void setCreatedOn(Date createdOn) {
    this.createdOn = createdOn;
    }

    public String getNumber() {
    return this.number;
    }

    public void setNumber(String number) {
    this.number = number;
    }

    public long getDisplayId() {
    return this.displayId;
    }

    public void setDisplayId(long displayId) {
    this.displayId = displayId;
    }

    public PartitionAwareKey<UUID, UUID> getAffinityKey() {
    return new PartitionAwareKey<>(getId(), getClusterUUID());
    }

    public void writeData(ObjectDataOutput out) throws IOException {
    out.writeObject(this.id);
    out.writeObject(this.clusterUUID);
    out.writeObject(this.createdOn);
    out.writeUTF(this.number);
    out.writeLong(this.displayId);
    }

    public void readData(ObjectDataInput in) throws IOException {
    this.id = in.readObject();
    this.clusterUUID = in.readObject();
    this.createdOn = in.readObject();
    this.number = in.readUTF();
    this.displayId = in.readLong();
    }
    }