Getting Started with Dyno Queues

Dyno-Queues is an interesting queue solution built on top of Dynomite. It uses Dyno, the Dynomite Java driver. Dyno-Queues inherits the benefits of Dynomite, such as strong consistency, high availability, stability, high throughput, and low latency, and extends them with queue semantics.

I highly recommend reading the Netflix post about dyno-queues.



Dyno-Queues 

Dyno-queues also has extra benefits and properties, such as:
* Distributed
* No external locks (e.g. Zookeeper locks)
* Highly concurrent
* At-least-once delivery semantics
* No strict FIFO
* Delayed queue (message is not taken out of the queue until some time in the future)
* Priorities within the shard
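
To make at-least-once delivery and delayed messages more concrete, here is a minimal in-memory sketch. It is a toy model, not how dyno-queues is implemented: the class AtLeastOnceSketch, its methods, and the 60-second unack timeout are all hypothetical names and values I picked for illustration. A popped message that is never acked becomes visible again once its timeout expires, and a delayed message is not delivered until its time arrives.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy in-memory model of at-least-once delivery with delayed messages.
 * NOT the dyno-queues implementation; every name here is hypothetical.
 */
class AtLeastOnceSketch {
    private final long unackTimeoutMs;
    // id -> earliest time (ms) at which the message may be delivered
    private final Map<String, Long> ready = new LinkedHashMap<>();
    // id -> time (ms) at which an unacked message becomes visible again
    private final Map<String, Long> unacked = new LinkedHashMap<>();

    AtLeastOnceSketch(long unackTimeoutMs) {
        this.unackTimeoutMs = unackTimeoutMs;
    }

    /** Enqueues a message that becomes visible after delayMs. */
    void push(String id, long nowMs, long delayMs) {
        ready.put(id, nowMs + delayMs);
    }

    /** Delivers up to max visible messages and tracks them as unacked. */
    List<String> pop(int max, long nowMs) {
        requeueExpired(nowMs);
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = ready.entrySet().iterator();
        while (it.hasNext() && out.size() < max) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= nowMs) {
                out.add(e.getKey());
                unacked.put(e.getKey(), nowMs + unackTimeoutMs);
                it.remove();
            }
        }
        return out;
    }

    /** Acknowledges a delivered message so it is not redelivered. */
    boolean ack(String id) {
        return unacked.remove(id) != null;
    }

    // Moves messages whose ack deadline has passed back to the ready set.
    private void requeueExpired(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = unacked.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= nowMs) {
                ready.put(e.getKey(), nowMs);
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        AtLeastOnceSketch q = new AtLeastOnceSketch(60_000);
        q.push("id1", 0, 0);      // visible immediately
        q.push("id2", 0, 5_000);  // delayed by 5 seconds
        System.out.println(q.pop(10, 1_000));   // only id1 is visible
        System.out.println(q.pop(10, 6_000));   // id2 is visible now
        q.ack("id2");
        System.out.println(q.pop(10, 70_000));  // id1 was never acked, so it comes back
    }
}
```

The practical consequence is that consumers should be idempotent: the same message can show up more than once if an ack is late or lost.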

Setting up a Dynomite Cluster with Docker

For development and experimentation, we will use a side project I created called dynomite-docker, which makes it very easy to set up a Dynomite cluster. So let's get started and set up the cluster.

Prerequisites

  • Install Java JDK 8
  • Install Docker
  • Install Git

Setting up Dynomite Cluster

git clone https://github.com/diegopacheco/dynomite-docker.git
cd dynomite-docker/
./dynomite-docker.sh bake
./dynomite-docker.sh run_single 0.6.0

If you are running on a Mac, you need to use the dynomite-docker-mac.sh script instead, since dynomite-docker.sh is for Linux. After running ./dynomite-docker.sh run_single 0.6.0 you will have a 3-node Dynomite cluster up and running, and you should see something like this on the console:

./dynomite-docker.sh
 
    #                                "      m                      #                #                             
mmmm#  m   m  m mm    mmm   mmmmm  mmm    mm#mm   mmm           mmm#   mmm    mmm   #  m   mmm     m mm         
#" "#  "m m"  #"  #  #" "#  # # #    #      #    #"  #         #" "#  #" "#  #"  "  # m"   #"  #   #"
#   #   #m#   #   #  #   #  # # #    #      #    #""""   """   #   #  #   #  #      #"#    #""""   #     
"#m##   "#    #   #  "#m#"  # # #  mm#mm    "mm  "#mm"         "#m##  "#m#"  "#mm"  #  "m  "#mm"   #  
 
dynomite-docker: easy setup for dynomite clusters for development. Created by: Diego Pacheco.
functions: 

bake        : Bakes docker image
run         : Run Dynomite docker 2 clusters for dual write
run_single  : Run Dynomite docker Single cluster
run_shard   : Run Dynomite docker Shard cluster
dcc         : Run Dynomite Cluster Checker for 2 clusters
dcc_single  : Run Dynomite Cluster Checker for single cluster
dcc_shard   : Run Dynomite Cluster Checker for shard cluster
info        : Get Seeds, IPs and topologies(all 3 possible clusters)
log         : Print dynomite logs, you need pass the node number. i.e: ./dynomite-docker log 1
cli         : Enters redis-cli on dynomite port. i.e: ./dynomite-docker cli 1
keys_single : Runs KEYS * command in all nodes(Single Cluster)
keys_shard  : Runs KEYS * command in all nodes(Shard Cluster)
stop        : Stop and clean up all docker running images
help        : help documentation

./dynomite-docker.sh run_single 0.6.0

0.6.0
Docker images and Network clean up DONE.
c6877e61e4facd8508b461379fdc08a29ffbf28b18d6ec75fb01484381fb61e9
NETWORK ID          NAME                           DRIVER              SCOPE
f2727f2123ad        akka256clusterdocker_default   bridge              local
373f32a8bcdf        bigchaindb_default             bridge              local
c8e017a8cf44        bridge                         bridge              local
aec7394fbc71        host                           host                local
c6877e61e4fa        myDockerNetDynomite            bridge              local
662f75d552fc        none                           null                local
70c097636244        redis_default                  bridge              local
3a5beaf24f5361188c37afe10eb25e23dcc9a9a90876af3a9f5292d7e710e22c
f722712b149c6d286e3fe6fca11b7153c3487c297311c92b599c90d95e21c763
c248f22c89de335be2d100cef91937675b2b886ce572a721c3d617b497f37944
Cluster 2 Single - Topology :
token: 100 dc: dc
  rack1 - 179.18.0.101
  rack2 - 179.18.0.102
  rack3 - 179.18.0.103
Seeds: 179.18.0.101:8102:rack1:dc:100|179.18.0.102:8102:rack2:dc:100|179.18.0.103:8102:rack3:dc:100

Avaliable Dynomite version: v0.5.7, v0.5.8, v0.5.9, v0.6.0

docker-ps

CONTAINER ID        IMAGE                         COMMAND                  CREATED             STATUS              PORTS                                NAMES
c248f22c89de        diegopacheco/dynomitedocker   "/usr/local/dynomi..."   13 seconds ago      Up 11 seconds       6379/tcp, 8101-8102/tcp, 22222/tcp   dynomite3
f722712b149c        diegopacheco/dynomitedocker   "/usr/local/dynomi..."   13 seconds ago      Up 12 seconds       6379/tcp, 8101-8102/tcp, 22222/tcp   dynomite2
3a5beaf24f53        diegopacheco/dynomitedocker   "/usr/local/dynomi..."   14 seconds ago      Up 12 seconds       6379/tcp, 8101-8102/tcp, 22222/tcp   dynomite1

Now that we have a Dynomite cluster up and running with Docker, we can move on and create the Dyno connection.
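
Before writing any Dyno code, it can help to confirm that the nodes accept TCP connections from your machine. The sketch below is only a plain socket check, not part of dynomite-docker or Dyno. The class name DynomitePortCheck and the 2-second timeout are my own choices, and the IPs and port come from the Seeds line printed above. On a Mac the container IPs may not be routable from the host, so a false result there does not necessarily mean the cluster is down.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
class DynomitePortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host
            return false;
        }
    }

    public static void main(String[] args) {
        // IPs and port taken from the "Seeds:" line printed by run_single
        String[] nodes = {"179.18.0.101", "179.18.0.102", "179.18.0.103"};
        for (String node : nodes) {
            System.out.println(node + ":8102 reachable = " + isReachable(node, 8102, 2000));
        }
    }
}
```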

Setting up a Dyno Connection

To set up a Dyno connection, we need to create 4 classes:

* DynoConnectionManager: Connects to Dynomite and returns a client synchronously.
* DynomiteNodeInfo: POJO that represents a Dynomite node's information.
* DynomiteSeedsParser: Parses the seeds string into DynomiteNodeInfo objects.
* TokenMapSupplierHelper: Provides topology information about the cluster.

This is generic, so we could use it to connect to other clusters. For the sake of simplicity, DynoConnectionManager does not take parameters, but as you can see the seeds come from Archaius, so we can change them if we need to.

import java.util.List;

import com.netflix.config.ConfigurationManager;
import com.netflix.dyno.connectionpool.impl.RetryNTimes;
import com.netflix.dyno.contrib.ArchaiusConnectionPoolConfiguration;
import com.netflix.dyno.jedis.DynoJedisClient;

/**
 * Builds Dyno Connection
 *
 * @author diegopacheco
 * @since 01/07/2016
 * @version 1.0
 */
public class DynoConnectionManager {

    public static DynoJedisClient build() {
        ConfigurationManager.getConfigInstance().setProperty("dynomite.driver.seeds", "179.18.0.101:8102:rack1:dc:100|179.18.0.102:8102:rack2:dc:100|179.18.0.103:8102:rack3:dc:100");
        ConfigurationManager.getConfigInstance().setProperty("dynomite.driver.cluster.name", "dynomiteCluster");
        ConfigurationManager.getConfigInstance().setProperty("dynomite.driver.client.name", "dynomiteCluster");
        ConfigurationManager.getConfigInstance().setProperty("dyno.dyn_o_mite.retryPolicy", "RetryNTimes:3:true");

        String client = ConfigurationManager.getConfigInstance().getString("dynomite.driver.client.name", "");
        String cluster = ConfigurationManager.getConfigInstance().getString("dynomite.driver.cluster.name", "");
        String seeds = ConfigurationManager.getConfigInstance().getString("dynomite.driver.seeds", "");

        List<DynomiteNodeInfo> nodes = DynomiteSeedsParser.parse(seeds);

        DynoJedisClient dynoClient = new DynoJedisClient.Builder()
            .withApplicationName(client)
            .withDynomiteClusterName(cluster)
            .withCPConfig(new ArchaiusConnectionPoolConfiguration(client)
                .withTokenSupplier(TokenMapSupplierHelper.toTokenMapSupplier(nodes))
                .setMaxConnsPerHost(1)
                .setConnectTimeout(2000)
                .setRetryPolicyFactory(new RetryNTimes.RetryFactory(3, true)))
            .withHostSupplier(TokenMapSupplierHelper.toHostSupplier(nodes))
            .build();
        return dynoClient;
    }
}

/**
 * Represents a Dynomite Seed Node Configuration
 *
 * @author diegopacheco
 * @since 01/07/2016
 * @version 1.0
 *
 */
public class DynomiteNodeInfo {

    private String server;
    private String port;
    private String rack;
    private String dc;
    private String tokens;

    public DynomiteNodeInfo() {}

    public DynomiteNodeInfo(String server, String port, String rack, String dc, String tokens) {
        super();
        this.server = server;
        this.port = port;
        this.rack = rack;
        this.dc = dc;
        this.tokens = tokens;
    }

    public String getServer() {
        return server;
    }
    public void setServer(String server) {
        this.server = server;
    }
    public String getPort() {
        return port;
    }
    public void setPort(String port) {
        this.port = port;
    }
    public String getRack() {
        return rack;
    }
    public void setRack(String rack) {
        this.rack = rack;
    }
    public String getDc() {
        return dc;
    }
    public void setDc(String dc) {
        this.dc = dc;
    }
    public String getTokens() {
        return tokens;
    }
    public void setTokens(String tokens) {
        this.tokens = tokens;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((server == null) ? 0 : server.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        DynomiteNodeInfo other = (DynomiteNodeInfo) obj;
        if (server == null) {
            if (other.server != null)
                return false;
        } else if (!server.equals(other.server))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return server + ":" + port + ":" + rack + ":" + dc + ":" + tokens;
    }
}

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Parses dynomite seeds for dyno.
 *
 * @author diegopacheco
 * @since 01/07/2016
 * @version 1.0
 */
public class DynomiteSeedsParser {

    public static List<DynomiteNodeInfo> parse(String seeds) {
        List<DynomiteNodeInfo> result = new ArrayList<>();
        if ("local".equals(seeds))
            return Arrays.asList(new DynomiteNodeInfo("127.0.0.1", "8102", "rack1", "localdc", "1383429731"));
        if (seeds == null || "".equals(seeds))
            throw new IllegalArgumentException("Seeds is blank or null. Invalid Seeds! ");

        // Seeds are separated by "|"
        String[] seedsArray = (seeds.contains("|")) ? seeds.split("\\|") : new String[] { seeds };
        if (seedsArray == null || seedsArray.length == 0)
            throw new IllegalArgumentException("Invalid Seeds! Seeds: " + seeds);

        for (String s : seedsArray) {
            // Each seed needs 5 fields: server:port:rack:dc:tokens
            String[] items = s.split(":");
            if (items.length < 5)
                throw new IllegalArgumentException("Invalid Seeds! Seeds: " + seeds);
            DynomiteNodeInfo node = new DynomiteNodeInfo();
            node.setServer(items[0]);
            node.setPort(items[1]);
            node.setRack(items[2]);
            node.setDc(items[3]);
            node.setTokens(items[4]);
            result.add(node);
        }
        return result;
    }
}

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import com.netflix.dyno.connectionpool.Host;
import com.netflix.dyno.connectionpool.Host.Status;
import com.netflix.dyno.connectionpool.HostSupplier;
import com.netflix.dyno.connectionpool.TokenMapSupplier;
import com.netflix.dyno.connectionpool.impl.lb.AbstractTokenMapSupplier;

/**
 * TokenMapSupplierHelper helps to build TokenMapSupplier.
 *
 * @author diegopacheco
 * @since 01/07/2016
 * @version 1.0
 */
public class TokenMapSupplierHelper {

    public static TokenMapSupplier toTokenMapSupplier(List<DynomiteNodeInfo> nodes){
        // Builds the topology as a JSON array with one object per node
        StringBuilder jsonSB = new StringBuilder("[");
        int count = 0;
        for(DynomiteNodeInfo node: nodes){
            jsonSB.append(" {\"token\":\""+ node.getTokens()
                + "\",\"hostname\":\"" + node.getServer()
                + "\",\"ip\":\"" + node.getServer()
                + "\",\"zone\":\"" + node.getRack()
                + "\",\"rack\":\"" + node.getRack()
                + "\",\"dc\":\"" + node.getDc()
                + "\"} ");
            count++;
            if (count < nodes.size())
                jsonSB.append(" , ");
        }
        // Close the array (the stray trailing quote would make the JSON invalid)
        jsonSB.append(" ]");
        final String json = jsonSB.toString();

        TokenMapSupplier testTokenMapSupplier = new AbstractTokenMapSupplier() {
            @Override
            public String getTopologyJsonPayload(String hostname) {
                return json;
            }
            @Override
            public String getTopologyJsonPayload(java.util.Set<Host> activeHosts) {
                return json;
            }
        };
        return testTokenMapSupplier;
    }

    public static HostSupplier toHostSupplier(List<DynomiteNodeInfo> nodes){
        final List<Host> hosts = new ArrayList<Host>();
        for(DynomiteNodeInfo node: nodes){
            hosts.add(buildHost(node));
        }
        final HostSupplier customHostSupplier = new HostSupplier() {
            @Override
            public Collection<Host> getHosts() {
                return hosts;
            }
        };
        return customHostSupplier;
    }

    public static Host buildHost(DynomiteNodeInfo node){
        // Note: the port is hardcoded to 8102; node.getPort() is not used here
        Host host = new Host(node.getServer(), node.getServer(), 8102, node.getRack(), node.getDc(), Status.Up);
        return host;
    }
}

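
To see what the token map supplier actually hands to Dyno, it helps to print the topology JSON for a given seeds string. The sketch below is a standalone approximation of what TokenMapSupplierHelper builds, without the extra whitespace. The class TopologyJsonSketch and the method toJson are hypothetical names I made up for this example, and it has no Dyno dependency.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: turns a seeds string into the topology JSON shape. */
class TopologyJsonSketch {

    /** Converts "host:port:rack:dc:token|..." seeds into a JSON array. */
    static String toJson(String seeds) {
        List<String> entries = new ArrayList<>();
        for (String seed : seeds.split("\\|")) {
            String[] p = seed.split(":");
            if (p.length != 5) {
                throw new IllegalArgumentException("Expected host:port:rack:dc:token but got: " + seed);
            }
            // The host doubles as hostname and ip; the rack doubles as zone and rack
            entries.add(String.format(
                "{\"token\":\"%s\",\"hostname\":\"%s\",\"ip\":\"%s\",\"zone\":\"%s\",\"rack\":\"%s\",\"dc\":\"%s\"}",
                p[4], p[0], p[0], p[2], p[2], p[3]));
        }
        return "[" + String.join(",", entries) + "]";
    }

    public static void main(String[] args) {
        System.out.println(toJson("179.18.0.101:8102:rack1:dc:100|179.18.0.102:8102:rack2:dc:100"));
    }
}
```

Note that the seed's port is not part of the JSON: in the helper above the port only matters when the Host objects are built.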
All right, now we can move on and use dyno-queues.

Using Dyno Queues

We need just 1 class to use dyno-queues. It's pretty simple, so let's go.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.github.diegopacheco.netflixoss.pocs.dyno.DynoConnectionManager;
import com.netflix.dyno.jedis.DynoJedisClient;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.dyno.queues.redis.DynoShardSupplier;
import com.netflix.dyno.queues.redis.RedisQueues;

public class DynoQueuesMain {

    public static void main(String[] args) {
        // Must match the dc and rack names of the Dynomite topology
        String region = "dc";
        String localRack = "rack1";
        String prefix = "dynoQueue_";
        int dynoThreadCount = 20;

        DynoJedisClient dyno = DynoConnectionManager.build();
        DynoShardSupplier ss = new DynoShardSupplier(dyno.getConnPool().getConfiguration().getHostSupplier(), region, localRack);
        RedisQueues queues = new RedisQueues(dyno, dyno, prefix, ss, 60_000, 60_000, dynoThreadCount);

        String queueName = "msg_queue";
        DynoQueue queue = queues.get(queueName);

        // Push one message
        Message msg = new Message("id1", "message payload");
        queue.push(Arrays.asList(msg));

        // Pop up to 10 messages, waiting at most 1 second
        int count = 10;
        List<Message> polled = queue.pop(count, 1, TimeUnit.SECONDS);
        System.out.println(polled);

        // Ack the message so it is not delivered again
        queue.ack("id1");
    }
}

Here we create a Dyno connection to Dynomite using the DynoConnectionManager.build() method, and then we define the dyno-queues configuration. We need to define a prefix and a local rack, which will be part of the key inside Redis. If you are running on AWS, the rack should be an AZ, and the local rack means the same AZ your application is running in, so you can reduce latency.
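
To give an idea of how the prefix and the local rack can end up in the Redis key, here is a small sketch. Please treat it as an assumption based on my reading of the dyno-queues source, not as documented behavior: I assume the shard name is the rack name with the region stripped out, and that the queue key looks like prefix.QUEUE.queueName.shard. The class QueueKeySketch and its methods are hypothetical. You can check the real key names in your cluster with ./dynomite-docker.sh keys_single.

```java
/** Hypothetical sketch of how a queue key might be derived; verify against your cluster. */
class QueueKeySketch {

    /** Assumed shard rule: the rack name with the region removed. */
    static String shardFor(String region, String localRack) {
        return localRack.replace(region, "");
    }

    /** Assumed key layout: prefix.QUEUE.queueName.shard */
    static String queueKey(String prefix, String queueName, String shard) {
        return prefix + ".QUEUE." + queueName + "." + shard;
    }

    public static void main(String[] args) {
        // Values from the example above: region "dc", local rack "rack1"
        String shard = shardFor("dc", "rack1");
        System.out.println(queueKey("dynoQueue_", "msg_queue", shard));

        // On AWS, a rack such as us-east-1c in region us-east-1 would give shard "c"
        System.out.println(shardFor("us-east-1", "us-east-1c"));
    }
}
```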

Then we can create a RedisQueues object and push messages to a queue. We can also poll messages from the queue and ack them. As you can see, the API is pretty simple and cool. If you want, you can get the full code on my GitHub.

Cheers,
Diego Pacheco
