Getting Started with Dyno Queues
Dyno-Queues is an interesting queue solution built on top of Dynomite. It uses the Dynomite Java driver, a.k.a. Dyno. Dyno-Queues gets all the benefits of Dynomite, such as strong consistency, high availability, stability, high throughput, and low latency, and extends them with queue semantics.
I highly recommend reading the Netflix post about Dyno-Queues.
Dyno-Queues
Dyno-Queues also has extra benefits and properties, such as:
* Distributed
* No external locks (e.g. Zookeeper locks)
* Highly concurrent
* At-least-once delivery semantics
* No strict FIFO
* Delayed queue (message is not taken out of the queue until some time in the future)
* Priorities within the shard
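To make the delayed-queue and at-least-once properties more concrete, here is a minimal in-memory sketch in Java. It is not how Dyno-Queues is implemented (the real library keeps its state in Dynomite). The class `DelayedQueueSketch`, its method names, and the explicit `nowMs` clock parameter are all assumptions made for illustration. A message becomes visible only after its delay has elapsed, and a popped message that is not acked within the unack timeout is delivered again.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory illustration only; NOT the dyno-queues implementation. */
public class DelayedQueueSketch {
    // message id -> time (ms) at which the message becomes visible to pop()
    private final Map<String, Long> visibleAt = new LinkedHashMap<>();
    // message id -> time (ms) at which an un-acked message is re-queued
    private final Map<String, Long> unacked = new LinkedHashMap<>();
    private final long unackTimeoutMs;

    public DelayedQueueSketch(long unackTimeoutMs) {
        this.unackTimeoutMs = unackTimeoutMs;
    }

    /** Enqueue a message that stays hidden until nowMs + delayMs. */
    public void push(String id, long nowMs, long delayMs) {
        visibleAt.put(id, nowMs + delayMs);
    }

    /** Return every message visible at nowMs and mark it as un-acked. */
    public List<String> pop(long nowMs) {
        // Step 1: messages popped earlier but never acked go back to the queue.
        Iterator<Map.Entry<String, Long>> expired = unacked.entrySet().iterator();
        while (expired.hasNext()) {
            Map.Entry<String, Long> e = expired.next();
            if (e.getValue() <= nowMs) {
                visibleAt.put(e.getKey(), nowMs);
                expired.remove();
            }
        }
        // Step 2: hand out messages whose delay has elapsed.
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> ready = visibleAt.entrySet().iterator();
        while (ready.hasNext()) {
            Map.Entry<String, Long> e = ready.next();
            if (e.getValue() <= nowMs) {
                out.add(e.getKey());
                unacked.put(e.getKey(), nowMs + unackTimeoutMs);
                ready.remove();
            }
        }
        return out;
    }

    /** Acknowledge a popped message so it is not delivered again. */
    public boolean ack(String id) {
        return unacked.remove(id) != null;
    }

    public static void main(String[] args) {
        DelayedQueueSketch queue = new DelayedQueueSketch(1000);
        queue.push("id1", 0, 500);
        System.out.println(queue.pop(100));   // delay has not elapsed yet
        System.out.println(queue.pop(500));   // first delivery
        System.out.println(queue.pop(1500));  // not acked in time, delivered again
        System.out.println(queue.ack("id1"));
        System.out.println(queue.pop(5000));  // acked, so nothing comes back
    }
}
```

Passing the clock in as `nowMs` keeps the sketch deterministic; a real consumer would rely on the queue's own notion of time.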
Setting up a Dynomite Cluster with Docker
For development and experimentation, we will use a side project I created called dynomite-docker, which makes it very easy to set up a Dynomite cluster. So let's get started and set up the cluster.
If you are running on a Mac, you need to use the dynomite-docker-mac.sh script instead, since dynomite-docker.sh is for Linux. After running $ ./dynomite-docker.sh run_single 0.6.0 you will have a 3-node Dynomite cluster up and running, and the console will show the cluster topology and the seeds string.
Now that we have a Dynomite cluster up and running with Docker, we can move on and create the Dyno connection.
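Before writing any Dyno code, it can help to confirm that the nodes accept TCP connections. The sketch below is a plain-Java reachability check and not part of dynomite-docker or Dyno. The class name `NodeReachability` and the 1-second timeout are assumptions. The IPs and port 8102 are the ones used for this cluster in this post, so adjust them if your setup differs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks that a node accepts TCP connections. */
public class NodeReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or no route to the host.
            return false;
        }
    }

    public static void main(String[] args) {
        // IPs and port of the 3-node cluster used in this post.
        String[] hosts = {"179.18.0.101", "179.18.0.102", "179.18.0.103"};
        for (String host : hosts) {
            System.out.println(host + ":8102 reachable=" + isReachable(host, 8102, 1000));
        }
    }
}
```

A successful connection only shows that the port is open; it does not prove that Dynomite itself is healthy.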
Setting up a Dyno Connection
To set up a Dyno connection, we need to create 4 classes:
* DynoConnectionManager: Connects to Dynomite and returns a synchronous client.
* DynomiteNodeInfo: POJO that represents Dynomite node information.
* DynomiteSeedsParser: Parses the seeds string into DynomiteNodeInfo objects.
* TokenMapSupplierHelper: Provides topology information about the cluster.
This is generic, so we could use it to connect to other clusters. For the sake of simplicity, DynoConnectionManager does not take parameters, but as you can see the seeds come from Archaius, so we could change them if needed.
All right, now we can move on and use Dyno-Queues.
Using Dyno Queues
We need just 1 class to use Dyno-Queues. It's pretty simple, so let's go.
All right, so here we create a Dyno connection to Dynomite using the DynoConnectionManager.build() method and then define the Dyno-Queues configuration. We need to define a prefix and a local rack, which will be part of the key inside Redis. If you are running on AWS, the rack should be an AZ, and local rack means the same AZ your application is running in, so you can reduce latency.
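As a rough illustration of how a prefix and a rack can end up in a Redis key, here is a small Java sketch. The exact key layout is internal to dyno-queues and may differ between versions, so both the `<prefix>.QUEUE.<queue>.<shard>` format and the rule "shard = rack with the region prefix removed" are assumptions here. The class and method names are hypothetical too.

```java
/** Hypothetical illustration of per-shard queue keys; not the dyno-queues code. */
public class QueueKeySketch {

    /** Assumption: the shard name is the rack (AZ) with the region prefix stripped. */
    static String shardName(String region, String localRack) {
        return localRack.startsWith(region)
                ? localRack.substring(region.length())
                : localRack;
    }

    /** Assumed key layout: prefix, queue name, and shard joined into one Redis key. */
    static String queueKey(String prefix, String queueName, String shard) {
        return prefix + ".QUEUE." + queueName + "." + shard;
    }

    public static void main(String[] args) {
        // AWS-style names: region "us-east-1", AZ "us-east-1d".
        String awsShard = shardName("us-east-1", "us-east-1d");
        System.out.println(queueKey("dynoQueue_", "msg_queue", awsShard));

        // Names used by the local Docker cluster in this post.
        String localShard = shardName("dc", "rack1");
        System.out.println(queueKey("dynoQueue_", "msg_queue", localShard));
    }
}
```

The point is only that each rack gets its own key, so an application can prefer the queue shard that lives in its own AZ.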
Then we can create a RedisQueues object and push messages to a queue. We can also poll messages from the queue and ack them. As you can see, the API is pretty simple and cool. If you want, you can get the full code on my GitHub.
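Because delivery is at-least-once, a consumer may receive the same message more than once, for example when an ack arrives too late. One common way to cope with that is to make processing idempotent. The sketch below is plain Java and independent of the Dyno-Queues API. The class `IdempotentConsumerSketch` and the in-memory set of processed ids are assumptions; a real service would likely keep that state somewhere durable.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical helper that runs a handler at most once per message id. */
public class IdempotentConsumerSketch {
    // Ids of messages whose handler already completed successfully.
    private final Set<String> processedIds = new HashSet<>();
    private final Consumer<String> handler;

    public IdempotentConsumerSketch(Consumer<String> handler) {
        this.handler = handler;
    }

    /**
     * Processes the payload unless this id was already handled.
     * Returns true on first processing and false for a duplicate delivery.
     * In both cases the caller can safely ack the message afterwards.
     */
    public boolean handle(String id, String payload) {
        if (processedIds.contains(id)) {
            return false;
        }
        handler.accept(payload);
        // Record the id only after the handler succeeded, so that a failure
        // leaves the message eligible for processing on redelivery.
        processedIds.add(id);
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer =
                new IdempotentConsumerSketch(p -> System.out.println("processing: " + p));
        System.out.println(consumer.handle("id1", "message payload"));
        System.out.println(consumer.handle("id1", "message payload"));
    }
}
```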
Cheers,
Diego Pacheco
    $ ./dynomite-docker.sh
    [ASCII-art banner]
    dynomite-docker: easy setup for dynomite clusters for development. Created by: Diego Pacheco.
    functions:
      bake        : Bakes docker image
      run         : Run Dynomite docker 2 clusters for dual write
      run_single  : Run Dynomite docker Single cluster
      run_shard   : Run Dynomite docker Shard cluster
      dcc         : Run Dynomite Cluster Checker for 2 clusters
      dcc_single  : Run Dynomite Cluster Checker for single cluster
      dcc_shard   : Run Dynomite Cluster Checker for shard cluster
      info        : Get Seeds, IPs and topologies(all 3 possible clusters)
      log         : Print dynomite logs, you need pass the node number. i.e: ./dynomite-docker log 1
      cli         : Enters redis-cli on dynomite port. i.e: ./dynomite-docker cli 1
      keys_single : Runs KEYS * command in all nodes(Single Cluster)
      keys_shard  : Runs KEYS * command in all nodes(Shard Cluster)
      stop        : Stop and clean up all docker running images
      help        : help documentation
    $ ./dynomite-docker.sh run_single 0.6.0
    0.6.0
    Docker images and Network clean up DONE.
    c6877e61e4facd8508b461379fdc08a29ffbf28b18d6ec75fb01484381fb61e9
    NETWORK ID     NAME                           DRIVER   SCOPE
    f2727f2123ad   akka256clusterdocker_default   bridge   local
    373f32a8bcdf   bigchaindb_default             bridge   local
    c8e017a8cf44   bridge                         bridge   local
    aec7394fbc71   host                           host     local
    c6877e61e4fa   myDockerNetDynomite            bridge   local
    662f75d552fc   none                           null     local
    70c097636244   redis_default                  bridge   local
    3a5beaf24f5361188c37afe10eb25e23dcc9a9a90876af3a9f5292d7e710e22c
    f722712b149c6d286e3fe6fca11b7153c3487c297311c92b599c90d95e21c763
    c248f22c89de335be2d100cef91937675b2b886ce572a721c3d617b497f37944
    Cluster 2 Single - Topology :
    token: 100 dc: dc
    rack1 - 179.18.0.101
    rack2 - 179.18.0.102
    rack3 - 179.18.0.103
    Seeds: 179.18.0.101:8102:rack1:dc:100|179.18.0.102:8102:rack2:dc:100|179.18.0.103:8102:rack3:dc:100
    Avaliable Dynomite version: v0.5.7, v0.5.8, v0.5.9, v0.6.0
    docker-ps
    CONTAINER ID   IMAGE                         COMMAND                  CREATED          STATUS          PORTS                                NAMES
    c248f22c89de   diegopacheco/dynomitedocker   "/usr/local/dynomi..."   13 seconds ago   Up 11 seconds   6379/tcp, 8101-8102/tcp, 22222/tcp   dynomite3
    f722712b149c   diegopacheco/dynomitedocker   "/usr/local/dynomi..."   13 seconds ago   Up 12 seconds   6379/tcp, 8101-8102/tcp, 22222/tcp   dynomite2
    3a5beaf24f53   diegopacheco/dynomitedocker   "/usr/local/dynomi..."   14 seconds ago   Up 12 seconds   6379/tcp, 8101-8102/tcp, 22222/tcp   dynomite1
    import java.util.List;

    import com.netflix.config.ConfigurationManager;
    import com.netflix.dyno.connectionpool.impl.RetryNTimes;
    import com.netflix.dyno.contrib.ArchaiusConnectionPoolConfiguration;
    import com.netflix.dyno.jedis.DynoJedisClient;

    /**
     * Builds Dyno Connection
     *
     * @author diegopacheco
     * @since 01/07/2016
     * @version 1.0
     */
    public class DynoConnectionManager {

        public static DynoJedisClient build() {
            // Seeds format: host:port:rack:dc:token, with nodes separated by '|'.
            ConfigurationManager.getConfigInstance().setProperty("dynomite.driver.seeds", "179.18.0.101:8102:rack1:dc:100|179.18.0.102:8102:rack2:dc:100|179.18.0.103:8102:rack3:dc:100");
            ConfigurationManager.getConfigInstance().setProperty("dynomite.driver.cluster.name", "dynomiteCluster");
            ConfigurationManager.getConfigInstance().setProperty("dynomite.driver.client.name", "dynomiteCluster");
            ConfigurationManager.getConfigInstance().setProperty("dyno.dyn_o_mite.retryPolicy", "RetryNTimes:3:true");

            // Read the settings back from Archaius so they can be overridden elsewhere.
            String client = ConfigurationManager.getConfigInstance().getString("dynomite.driver.client.name", "");
            String cluster = ConfigurationManager.getConfigInstance().getString("dynomite.driver.cluster.name", "");
            String seeds = ConfigurationManager.getConfigInstance().getString("dynomite.driver.seeds", "");

            List<DynomiteNodeInfo> nodes = DynomiteSeedsParser.parse(seeds);

            DynoJedisClient dynoClient = new DynoJedisClient.Builder()
                    .withApplicationName(client)
                    .withDynomiteClusterName(cluster)
                    .withCPConfig(new ArchaiusConnectionPoolConfiguration(client)
                            .withTokenSupplier(TokenMapSupplierHelper.toTokenMapSupplier(nodes))
                            .setMaxConnsPerHost(1)
                            .setConnectTimeout(2000)
                            .setRetryPolicyFactory(new RetryNTimes.RetryFactory(3, true))
                    )
                    .withHostSupplier(TokenMapSupplierHelper.toHostSupplier(nodes))
                    .build();

            return dynoClient;
        }
    }
    /**
     * Represents a Dynomite Seed Node Configuration
     *
     * @author diegopacheco
     * @since 01/07/2016
     * @version 1.0
     *
     */
    public class DynomiteNodeInfo {

        private String server;
        private String port;
        private String rack;
        private String dc;
        private String tokens;

        public DynomiteNodeInfo() {}

        public DynomiteNodeInfo(String server, String port, String rack, String dc, String tokens) {
            super();
            this.server = server;
            this.port = port;
            this.rack = rack;
            this.dc = dc;
            this.tokens = tokens;
        }

        public String getServer() {
            return server;
        }
        public void setServer(String server) {
            this.server = server;
        }
        public String getPort() {
            return port;
        }
        public void setPort(String port) {
            this.port = port;
        }
        public String getRack() {
            return rack;
        }
        public void setRack(String rack) {
            this.rack = rack;
        }
        public String getDc() {
            return dc;
        }
        public void setDc(String dc) {
            this.dc = dc;
        }
        public String getTokens() {
            return tokens;
        }
        public void setTokens(String tokens) {
            this.tokens = tokens;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((server == null) ? 0 : server.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            DynomiteNodeInfo other = (DynomiteNodeInfo) obj;
            if (server == null) {
                if (other.server != null)
                    return false;
            } else if (!server.equals(other.server))
                return false;
            return true;
        }

        @Override
        public String toString() {
            return server + ":" + port + ":" + rack + ":" + dc + ":" + tokens;
        }
    }
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    /**
     * Parses Dynomite seeds for Dyno.
     *
     * @author diegopacheco
     * @since 01/07/2016
     * @version 1.0
     */
    public class DynomiteSeedsParser {

        public static List<DynomiteNodeInfo> parse(String seeds) {
            List<DynomiteNodeInfo> result = new ArrayList<>();

            // Shortcut for a single local node.
            if ("local".equals(seeds))
                return Arrays.asList(new DynomiteNodeInfo("127.0.0.1", "8102", "rack1", "localdc", "1383429731"));

            if (seeds == null || "".equals(seeds))
                throw new IllegalArgumentException("Seeds is blank or null. Invalid Seeds! ");

            // Nodes are separated by '|'.
            String[] seedsArray = (seeds.contains("|")) ? seeds.split("\\|") : new String[] { seeds };
            if (seedsArray == null || seedsArray.length == 0)
                throw new IllegalArgumentException("Invalid Seeds! Seeds: " + seeds);

            for (String s : seedsArray) {
                // Each node has exactly 5 fields: host:port:rack:dc:token
                String[] items = s.split(":");
                if (items.length != 5)
                    throw new IllegalArgumentException("Invalid Seeds! Seeds: " + seeds);

                DynomiteNodeInfo node = new DynomiteNodeInfo();
                node.setServer(items[0]);
                node.setPort(items[1]);
                node.setRack(items[2]);
                node.setDc(items[3]);
                node.setTokens(items[4]);
                result.add(node);
            }
            return result;
        }
    }
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    import com.netflix.dyno.connectionpool.Host;
    import com.netflix.dyno.connectionpool.Host.Status;
    import com.netflix.dyno.connectionpool.HostSupplier;
    import com.netflix.dyno.connectionpool.TokenMapSupplier;
    import com.netflix.dyno.connectionpool.impl.lb.AbstractTokenMapSupplier;

    /**
     * TokenMapSupplierHelper helps to build TokenMapSupplier.
     *
     * @author diegopacheco
     * @since 01/07/2016
     * @version 1.0
     */
    public class TokenMapSupplierHelper {

        public static TokenMapSupplier toTokenMapSupplier(List<DynomiteNodeInfo> nodes){
            // Build the topology as a JSON array with one object per node.
            StringBuilder jsonSB = new StringBuilder("[");
            int count = 0;
            for(DynomiteNodeInfo node: nodes){
                jsonSB.append(" {\"token\":\""+ node.getTokens()
                        + "\",\"hostname\":\"" + node.getServer()
                        + "\",\"ip\":\"" + node.getServer()
                        + "\",\"zone\":\"" + node.getRack()
                        + "\",\"rack\":\"" + node.getRack()
                        + "\",\"dc\":\"" + node.getDc()
                        + "\"} ");
                count++;
                if (count < nodes.size())
                    jsonSB.append(" , ");
            }
            // Close the JSON array.
            jsonSB.append(" ]");

            final String json = jsonSB.toString();
            // The same static topology is returned for every lookup.
            TokenMapSupplier testTokenMapSupplier = new AbstractTokenMapSupplier() {
                @Override
                public String getTopologyJsonPayload(String hostname) {
                    return json;
                }
                @Override
                public String getTopologyJsonPayload(java.util.Set<Host> activeHosts) {
                    return json;
                }
            };
            return testTokenMapSupplier;
        }

        public static HostSupplier toHostSupplier(List<DynomiteNodeInfo> nodes){
            final List<Host> hosts = new ArrayList<Host>();

            for(DynomiteNodeInfo node: nodes){
                hosts.add(buildHost(node));
            }

            final HostSupplier customHostSupplier = new HostSupplier() {
                @Override
                public Collection<Host> getHosts() {
                    return hosts;
                }
            };
            return customHostSupplier;
        }

        public static Host buildHost(DynomiteNodeInfo node){
            // Note: the port is hardcoded to 8102; node.getPort() is not used here.
            Host host = new Host(node.getServer(), node.getServer(), 8102, node.getRack(), node.getDc(), Status.Up);
            return host;
        }
    }
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import com.github.diegopacheco.netflixoss.pocs.dyno.DynoConnectionManager;
    import com.netflix.dyno.jedis.DynoJedisClient;
    import com.netflix.dyno.queues.DynoQueue;
    import com.netflix.dyno.queues.Message;
    import com.netflix.dyno.queues.redis.DynoShardSupplier;
    import com.netflix.dyno.queues.redis.RedisQueues;

    public class DynoQueuesMain {

        public static void main(String[] args) {
            // Values match the dc and rack names of the local Docker cluster.
            String region = "dc";
            String localDC = "rack1";
            String prefix = "dynoQueue_";
            int dynoThreadCount = 20;

            // Connect to Dynomite and describe the shards of the queue.
            DynoJedisClient dyno = DynoConnectionManager.build();
            DynoShardSupplier ss = new DynoShardSupplier(dyno.getConnPool().getConfiguration().getHostSupplier(), region, localDC);
            RedisQueues queues = new RedisQueues(dyno, dyno, prefix, ss, 60_000, 60_000, dynoThreadCount);

            String queueName = "msg_queue";
            DynoQueue queue = queues.get(queueName);

            // Push one message.
            Message msg = new Message("id1", "message payload");
            queue.push(Arrays.asList(msg));

            // Poll up to 10 messages, waiting at most 1 second.
            int count = 10;
            List<Message> polled = queue.pop(count, 1, TimeUnit.SECONDS);
            System.out.println(polled);

            // Ack the message so it is not delivered again.
            queue.ack("id1");
        }
    }