redis storm mysql_storm-redis 详解
多的不說,先來代碼分析,再貼我自己寫的代碼。如果代碼有錯誤,求更正。。
導入兩個關鍵包,其他項目需要的包,大家自己導入了,我pom下的包太多,不好一下扔上來。
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>${storm.version}</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
我是連接的linux上的redis,所以要對redis進行配置,不然會出現拒絕連接的錯誤。redis部署在linux時,java遠程連接需要修改配置:
修改redis.conf文件
1.將bind 127.0.0.1加上注釋,(#bind 127.0.0.1),允許出本機外的IP訪問redis
2.將protected-mode yes,修改為protected-mode no;不保護redis
3.將daemonize no,修改為daemonize yes;允許redis服務后臺運行
修改防火墻端口號
1.將redis默認的6379注冊到防火墻中
/sbin/iptables -I INPUT -p tcp --dport 6379 -j ACCEPT
2.保存防火墻端口號表
/etc/rc.d/init.d/iptables save
3.重啟防火墻
/etc/rc.d/init.d/iptables restart
4.查看防火墻狀態
/etc/rc.d/init.d/iptables status
使用測試類連接下看能不能連同:import java.util.Iterator;
import java.util.Set;
import redis.clients.jedis.Jedis;
/**
* @author cwc
* @date 2018年5月30日
* @description:
* @version 1.0.0
*/
public class RedisTest {
	/**
	 * Smoke test: connect to a remote Redis instance and dump all keys.
	 * Replace the IP placeholder with your Redis server's address.
	 */
	public static void main(String[] args) {
		// Connect to the (remote) Redis service.
		Jedis jedis = new Jedis("xxx.xx.xxx.xx");
		System.out.println("連接成功");
		// Verify the server is reachable (PING should answer PONG).
		System.out.println("服務正在運行: " + jedis.ping());
		// jedis.keys("*") returns Set<String>; the original used raw Set/Iterator,
		// which made `String key = it.next();` a compile error (next() returned Object).
		Set<String> keys = jedis.keys("*");
		for (String key : keys) {
			System.out.println(key);
		}
	}
}
準備就緒,先說說storm向redis寫入:
官方給的寫入API:class WordCountStoreMapper implements RedisStoreMapper {
private RedisDataTypeDescription description;
// Redis key of the hash that all tuples are written into.
private final String hashKey = "wordCount";
public WordCountStoreMapper() {
// Declare that tuples are stored into a Redis HASH rooted at hashKey.
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
@Override
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}
@Override
public String getKeyFromTuple(ITuple tuple) {
// The tuple's "word" field becomes the hash field name.
return tuple.getStringByField("word");
}
@Override
public String getValueFromTuple(ITuple tuple) {
// The tuple's "count" field becomes the hash field value.
return tuple.getStringByField("count");
}
}// Used to build a new bolt; invoked when assembling the TopologyBuilder.
// Configure a Jedis connection pool, then create the RedisStoreBolt
// that persists each tuple via the mapper above.
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(host).setPort(port).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
我反正剛剛看的時候一臉懵逼,之后研究了很久才明白,下面貼我自己的代碼:import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
* @author cwc
* @date 2018年5月29日
* @description:這是給的假的數據源
* @version 1.0.0
*/
public class RedisWriteSpout extends BaseRichSpout {
	private static final long serialVersionUID = 1L;
	// Single shared RNG; the original allocated a new Random on every nextTuple call.
	private static final Random RAND = new Random();
	private SpoutOutputCollector spoutOutputCollector;
	/**
	 * Emitted as the "word" field (becomes the Redis hash field name).
	 */
	private static final Map<Integer, String> LASTNAME = new HashMap<>();
	static {
		LASTNAME.put(0, "anderson");
		LASTNAME.put(1, "watson");
		LASTNAME.put(2, "ponting");
		LASTNAME.put(3, "dravid");
		LASTNAME.put(4, "lara");
	}
	/**
	 * Emitted as the "myValues" field (becomes the Redis hash field value).
	 */
	private static final Map<Integer, String> COMPANYNAME = new HashMap<>();
	static {
		COMPANYNAME.put(0, "abc");
		COMPANYNAME.put(1, "dfg");
		COMPANYNAME.put(2, "pqr");
		COMPANYNAME.put(3, "ecd");
		COMPANYNAME.put(4, "awe");
	}

	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector spoutOutputCollector) {
		this.spoutOutputCollector = spoutOutputCollector;
	}

	public void nextTuple() {
		int randomNumber = RAND.nextInt(5);
		try {
			// Throttle emission so the example topology does not spin at full speed.
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// Restore the interrupt flag instead of swallowing the interruption.
			Thread.currentThread().interrupt();
		}
		spoutOutputCollector.emit(new Values(LASTNAME.get(randomNumber), COMPANYNAME.get(randomNumber)));
		System.out.println("數據來襲!!!!!!");
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// These two field names are consumed downstream by RedisWriteMapper.
		declarer.declare(new Fields("word", "myValues"));
	}
}import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.tuple.ITuple;
/**
* @author cwc
* @date 2018年5月30日
* @description:
* @version 1.0.0
*/
public class RedisWriteMapper implements RedisStoreMapper {
	private static final long serialVersionUID = 1L;
	// "mykey" is the top-level Redis key; tuples become hash fields under it.
	private final String hashKey = "mykey";
	// Describes which Redis structure (a HASH) this mapper writes to.
	private RedisDataTypeDescription description;

	public RedisWriteMapper() {
		this.description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);
	}

	@Override
	public RedisDataTypeDescription getDataTypeDescription() {
		return this.description;
	}

	@Override
	public String getKeyFromTuple(ITuple ituple) {
		// The tuple's "word" field becomes the hash field name.
		return ituple.getStringByField("word");
	}

	@Override
	public String getValueFromTuple(ITuple ituple) {
		// The tuple's "myValues" field becomes the hash field value.
		return ituple.getStringByField("myValues");
	}
}
storm讀取redis數據:
官方給的API:class WordCountRedisLookupMapper implements RedisLookupMapper {
private RedisDataTypeDescription description;
// Redis key of the hash that lookups are performed against.
private final String hashKey = "wordCount";
public WordCountRedisLookupMapper() {
// Declare that this mapper reads from a Redis HASH rooted at hashKey.
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
@Override
// Packs the looked-up value together with its key into the output tuple list.
public List toTuple(ITuple input, Object value) {
String member = getKeyFromTuple(input);
List values = Lists.newArrayList();
values.add(new Values(member, value));
return values;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("wordName", "count"));
}
@Override
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}
@Override
public String getKeyFromTuple(ITuple tuple) {
// The incoming tuple's "word" field names the hash field to look up.
return tuple.getStringByField("word");
}
@Override
public String getValueFromTuple(ITuple tuple) {
// Unused for lookups: the value comes from Redis, not from the tuple.
return null;
}
}JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(host).setPort(port).build();
RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
自己代碼:import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
* @author cwc
* @date 2018年5月30日
* @description:
* @version 1.0.0
*/
public class RedisReadSpout extends BaseRichSpout {
	private static final long serialVersionUID = 1L;
	// Single shared RNG; the original allocated a new Random on every nextTuple call.
	private static final Random RAND = new Random();
	private SpoutOutputCollector spoutOutputCollector;
	/**
	 * The "word" values written earlier; emitted as lookup keys so the
	 * RedisLookupBolt can fetch the values stored for them.
	 */
	private static final Map<Integer, String> LASTNAME = new HashMap<>();
	static {
		LASTNAME.put(0, "anderson");
		LASTNAME.put(1, "watson");
		LASTNAME.put(2, "ponting");
		LASTNAME.put(3, "dravid");
		LASTNAME.put(4, "lara");
	}

	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector spoutOutputCollector) {
		this.spoutOutputCollector = spoutOutputCollector;
	}

	public void nextTuple() {
		int randomNumber = RAND.nextInt(5);
		try {
			// Throttle emission so the example topology does not spin at full speed.
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// Restore the interrupt flag instead of swallowing the interruption.
			Thread.currentThread().interrupt();
		}
		spoutOutputCollector.emit(new Values(LASTNAME.get(randomNumber)));
		System.out.println("讀數據來襲!!!!!!");
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Downstream RedisLookupBolt reads the lookup key from this field.
		declarer.declare(new Fields("word"));
	}
}import java.util.List;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;
import com.google.common.collect.Lists;
/**
* @author cwc
* @date 2018年5月30日
* @description:
* @version 1.0.0
*/
public class RedisReadMapper implements RedisLookupMapper {
private static final long serialVersionUID = 1L;
// Describes which Redis data type (here: HASH) this mapper reads from.
private RedisDataTypeDescription description;
// Key of the hash to read; must match the key used when storing ("mykey").
private final String hashKey="mykey";
/**
 * Storage layout in Redis is a hash: hashKey is the root key, and the key
 * obtained via getKeyFromTuple() selects the field whose value is fetched.
 * Effectively: lookupValue = jedisCommand.hget(additionalKey, key);
 */
public RedisReadMapper() {
description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
@Override
public String getKeyFromTuple(ITuple tuple) {
// The hash field name to look up, taken from the incoming tuple's "word" field.
return tuple.getStringByField("word");
}
@Override
public String getValueFromTuple(ITuple tuple) {
// Unused for lookups: the value comes from Redis, not from the tuple.
return null;
}
@Override
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Emits the key and its looked-up value, similar to row:cf:value addressing in HBase.
declarer.declare(new Fields("word","values"));
}
@Override
/**
 * Packs the looked-up value together with its key into the output tuple list.
 */
public List toTuple(ITuple input, Object value) {
String member =getKeyFromTuple(input);
List values =Lists.newArrayList();
// Two values are emitted downstream, matching the two declared field names.
values.add(new Values(member,value));
return values;
}
}import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
/**
* @author cwc
* @date 2018年5月30日
* @description:打印獲取的數據
* @version 1.0.0
*/
public class RedisOutBolt extends BaseRichBolt {
	private static final long serialVersionUID = 1L;
	private OutputCollector collector;

	@Override
	public void execute(Tuple tuple) {
		// Field 0 is the lookup key ("word"); field 1 is the value fetched from Redis.
		String strs = tuple.getString(1);
		System.out.println(strs);
		// Ack so Storm does not treat the tuple as failed and replay it on timeout.
		this.collector.ack(tuple);
	}

	@Override
	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Declared for completeness; this terminal bolt never emits.
		declarer.declare(new Fields("RedisOutBolt"));
	}
}
接下來是??RedisMain,測試讀寫方法:import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.redis.bolt.RedisLookupBolt;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.TopologyBuilder;
public class RedisMain {
public static void main(String[] args) throws Exception {
//writeRedis();
// Runs the read topology; uncomment writeRedis() above (and comment this out)
// to populate Redis first.
readRedis();
}
/**
 * Builds and locally runs a topology that writes spout tuples into a Redis hash.
 * Replace the host placeholder with your Redis server's IP before running.
 */
public static void writeRedis(){
	JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
			.setHost("xxx.xx.xx.xx").setPort(6379).build();
	System.out.println("連接成功!!!");
	RedisStoreMapper storeMapper = new RedisWriteMapper();
	RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
	TopologyBuilder builder = new TopologyBuilder();
	builder.setSpout("RedisWriteSpout", new RedisWriteSpout(), 2);
	builder.setBolt("to-save", storeBolt, 1).shuffleGrouping("RedisWriteSpout");
	Config conf = new Config();
	LocalCluster cluster = new LocalCluster();
	cluster.submitTopology("test", conf, builder.createTopology());
	System.err.println("寫入完成!!!!!");
	try {
		// Let the topology run for 10 seconds (original comment claimed 6 s).
		Thread.sleep(10000);
	} catch (InterruptedException e) {
		// Restore the interrupt flag rather than swallowing it.
		Thread.currentThread().interrupt();
	} finally {
		// Always stop the topology and shut the local cluster down, even if
		// the sleep was interrupted (the original skipped cleanup in that case).
		cluster.killTopology("test");
		cluster.shutdown();
	}
}
/**
 * Builds and locally runs a topology that looks up, for every key emitted by
 * the spout, the value stored in Redis and prints it via RedisOutBolt.
 * Replace the host placeholder with your Redis server's IP before running.
 */
public static void readRedis(){
	JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
			.setHost("xxx.xx.xxx.xx").setPort(6379).build();
	RedisLookupMapper lookupMapper = new RedisReadMapper();
	RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
	TopologyBuilder builder = new TopologyBuilder();
	builder.setSpout("RedisReadSpout-reader", new RedisReadSpout(), 2);
	builder.setBolt("to-lookupBolt", lookupBolt, 1).shuffleGrouping("RedisReadSpout-reader");
	builder.setBolt("to-out", new RedisOutBolt(), 1).shuffleGrouping("to-lookupBolt");
	Config conf = new Config();
	LocalCluster cluster = new LocalCluster();
	cluster.submitTopology("test", conf, builder.createTopology());
	try {
		// Let the topology run for 100 seconds (original comment claimed 6 s).
		Thread.sleep(100000);
	} catch (InterruptedException e) {
		// Restore the interrupt flag rather than swallowing it.
		Thread.currentThread().interrupt();
	} finally {
		// Always stop the topology and shut the local cluster down, even if
		// the sleep was interrupted (the original skipped cleanup in that case).
		cluster.killTopology("test");
		cluster.shutdown();
	}
}
}
很多解釋都寫在了代碼注解中,其中也有很多問題,是在寫代碼注釋的時候發現的,認真看下代碼,祝大家零BUG哦~~
總結
以上是生活随笔為你收集整理的redis storm mysql_storm-redis 详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 萧敬腾宣布常居 成都未来40天有36天降
- 下一篇: 难逃芯片短缺!分析师称iPhone 14