最近在一家SaaS企业使用MQTT开发IM消息推送服务,把开发中遇到的一些问题记录下来。项目仍在商用中,完整的消息服务包括4个模块——协议(Protocol)、信令(Signal)、规则(Rule)、状态(Status),本文主要介绍协议(Protocol)部分。
主要技术涉及MongoDB、WebService、HttpClient、MQTT等。
Protocol部分分为4个模块类来实现,当然这是为了以后有更好的扩展性。
首先看一下我们的主类,主要是MQTT基础方法的一个框架:
public class MqttProtocol
{
// Logger for this class.
private static Logger logger = Logger.getLogger(MqttProtocol.class);
// Broker URI handed to the MqttClient constructor (the address shown appears to be a placeholder).
public static final String HOST = "tcp://xx.xx.xx.xx:1883";
// Client identifier handed to the MqttClient constructor.
private static final String CLIENTID = "yyyy";
// MQTT client; created in the constructor.
private MqttClient client;
// Options passed to client.connect(); the constructor replaces this with a fresh instance.
private MqttConnectOptions options = new MqttConnectOptions();
// Disabled credentials, kept for reference.
//private String userName = "admin";
//private String passWord = "public";
// Publicly writable message holder; main() assigns a new MqttMessage to it.
public MqttMessage message;
// Callback registered on the client in the constructor.
private PushCallback callback;
/**
* 用于初始化mqttclient客户端,设置回调函数,同时连接mqtt
服务器
* @throws MqttException
*/
public MqttProtocol() throws MqttException
{
//MemoryPersistence设置clientid的保存情势,默许为之内存保存
client = new MqttClient(HOST, CLIENTID, new MemoryPersistence());
callback = new PushCallback();
client.setCallback(callback);
options = new MqttConnectOptions();
options.setCleanSession(false);
options.setKeepAliveInterval(60);
connect();
}
/**
 * Connects to the MQTT broker, retrying once per second until the client
 * reports that it is connected. The retry loop is what keeps the process
 * alive through network outages and broker restarts.
 *
 * <p>If the calling thread is interrupted while waiting between attempts, the
 * interrupt flag is restored and the method returns without a connection, so
 * the thread can actually be stopped instead of retrying forever.
 */
private void connect()
{
    SimpleDateFormat sdf = new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
    System.out.println(sdf.format(System.currentTimeMillis()));
    while (true) {
        try {
            client.connect(options);
        } catch (Exception e1) {
            // getCause() is frequently null; fall back to the exception itself
            // so the message always says what went wrong.
            Throwable reason = (e1.getCause() != null) ? e1.getCause() : e1;
            System.out.println("Connection attempt failed with '" + reason +
                    "'. Retrying.");
        }
        if (client.isConnected()) {
            System.out.println("Connected.");
            return;
        }
        pause();
        if (Thread.currentThread().isInterrupted()) {
            // pause() restored the flag; without this check every further
            // sleep would fail immediately and the loop would spin.
            System.out.println("Connection retry interrupted. Giving up.");
            return;
        }
    }
}

/**
 * Waits one second between connection attempts. An interrupt ends the wait
 * early and is re-asserted on the current thread for the caller to observe.
 */
private void pause() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Subscribes the underlying client to a topic.
 *
 * @param topic topic to subscribe to
 * @param qos   quality-of-service level requested for the subscription
 * @throws MqttPersistenceException propagated from the client
 * @throws MqttException            propagated from the client
 */
public void subscribe(String topic , int qos) throws MqttPersistenceException,
        MqttException
{
    final MqttClient mqttClient = this.client;
    mqttClient.subscribe(topic, qos);
}
/**
 * Disconnects the underlying client from the broker.
 *
 * @throws MqttPersistenceException propagated from the client
 * @throws MqttException            propagated from the client
 */
public void disconnect() throws MqttPersistenceException,
        MqttException
{
    final MqttClient mqttClient = this.client;
    mqttClient.disconnect();
}
/**
*
* @author binshi
*实现mqttcallback接口,主要用于接收消息后的处理方法
*/
private class PushCallback implements MqttCallback {
/**
* 断开后 系统会自动调用这个函数,同时在这个函数里进行重连操作
*/
public void connectionLost(Throwable cause) {
// 连接丢失后,1般在这里面进行重连
System.out.println("连接断开,可以做重连");
connect();
try {
subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 消息成功传送后,系统会自动调用此函数,表明成功向topic发送消息
*/
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
// TODO Auto-generated method stub
System.out.println("deliveryComplete---------" + arg0.isComplete());
}
/**
* 连接mongo
数据库,返回关于具体collection的Mongocollection
* @param collectionname
* @return
*/
public void messageArrived(String topic, MqttMessage message) throws Exception
{
System.out.println(topic);
SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
System.out.println(sdf.format(System.currentTimeMillis()));
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
//1 抽取事件信令消息
String messagejudge=new String(message.getPayload());
System.out.println("疏忽所有robot消息和offline离线消息");
JSONObject jo=new JSONObject();
try {
jo=JSONObject.fromObject(messagejudge);
} catch (Exception e) {
e.printStackTrace();
}
String from=jo.getString("from");
System.out.println("取得from"+from);
System.out.println("肯定消息是不是包括offline,如果包括获得offline,为1就不处理");
String offline=null;
if(messagejudge.contains("offline"))
{
offline=jo.getString("offline");
}
if((offline==null)&&(!from.contains("robot")))
{
System.out.println("处理非系统消息和非离线消息");
String type=jo.getString("type");
System.out.println("取得type"+type);
if(type.equals("shakehand"))
{
System.out.println("处理shakehand消息");
String admin="doyounkowwhy";
if(jo.toString().contains("admin"))
{
admin=jo.getString("admin");
}
System.out.println("获得admin 如果为1定义为客服,否则为普通用户 admin为"+admin);
if(admin.equals("1"))
{
System.out.println("处理客服握手消息");
System.out.println("发送握手成功消息");
MqttTopic retopic=client.getTopic(topic);
MsgOperation.sendSysMsgToClient(from,"0", "1005", "握手成功", null,retopic);
System.out.println("向客户端发送离线未接收的消息");
String convid=jo.getString("convid");
String database="dolina";
String collection="messages";
MongoDBDao.getMongoDBDaoInstance().sendOfflineMsgToClient(from, convid,retopic,database,collection);
}
else
{
System.out.println("处理普通用户的握手消息");
String appid=jo.getString("appid");
String pageid=jo.getString("pageid");
String convid=jo.getString("convid");
MqttTopic retopic=client.getTopic(topic);
MsgOperation.sendShakeHandInfo(from,convid,appid,pageid,retopic);
}
}
else if(type.equals("text")||type.equals("image"))
{
System.out.println("处理图片和文字消息");
String tmpindex=jo.getString("tmpindex");
String convid=jo.getString("convid");
MqttTopic retopic=client.getTopic(topic);
MsgOperation.getTextMsg( tmpindex, from, convid, retopic);
System.out.println("保存图片文字消息");
String database="dolina";
String collection="messages";
MongoDBDao.getMongoDBDaoInstance().saveTextMsg(database,collection,jo);
}
else if(type.equals("ack"))
{
System.out.println("处理ack消息");
String tmpindex=jo.getString("tmpindex");
String convid=jo.getString("convid");
String database="dolina";
String collection="messages";
MongoDBDao.getMongoDBDaoInstance().getAck(tmpindex,convid,from,database,collection);
}
}
}
}
/**
 * Program entry point; configure this class as the main class of the
 * executable jar. Creates the protocol client (which connects in its
 * constructor), subscribes to the protocol topic with QoS 2 and prints the
 * retained flag of a freshly created message.
 *
 * @param args command-line arguments (unused)
 * @throws MqttException if the client cannot be created or the subscription fails
 */
public static void main(String[] args) throws MqttException
{
    final MqttProtocol protocol = new MqttProtocol();
    final MqttMessage emptyMessage = new MqttMessage();
    protocol.message = emptyMessage;

    // Disabled example of preparing and addressing an outgoing message:
    //   server.message.setQos(2);
    //   server.message.setRetained(false);
    //   server.message.setPayload("给客户端124推送的信息".getBytes());
    //   server.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2);

    protocol.subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);

    final boolean retained = protocol.message.isRetained();
    System.out.println(retained + "------ratained状态");
}
}
接下来是我们的远程连接模块,主要是通过给定的URL调用远程接口:
/**
 * Helper for calling remote HTTP interfaces and reading their JSON replies.
 */
public class RemoteOperation
{
    // Logs under this class's own name.
    private static final Logger logger = Logger.getLogger(RemoteOperation.class);

    /**
     * Sends an HTTP GET request to {@code url} and parses the response body as JSON.
     * The connection is released on every path, including non-200 replies and
     * exceptions.
     *
     * @param url address of the remote interface
     * @return the parsed body; an empty {@code JSONObject} when the body is not
     *         valid JSON; {@code null} when the status code is not 200
     * @throws HttpException if the HTTP exchange violates the protocol
     * @throws IOException   if the request cannot be sent or the reply cannot be read
     */
    public static JSONObject remoteCall(String url) throws HttpException, IOException
    {
        HttpClient httpClient = new HttpClient();
        GetMethod method = new GetMethod(url);
        try {
            int retcode = httpClient.executeMethod(method);
            if (retcode != HttpStatus.SC_OK) {
                // Request did not succeed.
                logger.info("远程调用出错");
                return null;
            }
            String body = method.getResponseBodyAsString();
            logger.info(body + "远程调用php成功");
            try {
                return JSONObject.fromObject(body);
            } catch (Exception e) {
                // Callers receive an empty object for an unparsable body, as before.
                logger.error("Response from " + url + " is not valid JSON", e);
                return new JSONObject();
            }
        } finally {
            method.releaseConnection();
        }
    }
}
下面是MongoDB数据库相关操作的一个封装,设计为单例模式,相当于每次都使用同一个client打开连接,类似于连接池的概念,当然业务逻辑部分可以更换:
public class MongoDBDao
{
// Logger for this class.
private static Logger logger = Logger.getLogger(MongoDBDao.class);
/**
 * A MongoClient instance represents a pool of database connections. It is
 * thread-safe and can be shared between threads, so one instance is enough
 * for a multi-threaded client.
 * Mongo is not thread-safe; the MongoDB API recommends MongoClient in its place.
 */
private MongoClient mongoClient = null;
/**
 * Private constructor: creates the {@code MongoClient} from the connection
 * settings in {@code Constant} (host, user, password, authentication
 * database) on port 27017.
 *
 * Author: shibin
 */
private MongoDBDao(){
    if (mongoClient != null) {
        return;
    }

    final int port = 27017;
    List<ServerAddress> seeds = new ArrayList<ServerAddress>();
    seeds.add(new ServerAddress(Constant.MONGO_MQTT_URL, port));

    List<MongoCredential> credentialList = new ArrayList<MongoCredential>();
    credentialList.add(MongoCredential.createCredential(
            Constant.MONGO_MQTT_USER,
            Constant.MONGO_MQTT_DATABASE,
            Constant.MONGO_MQTT_PASSWORD.toCharArray()));

    mongoClient = new MongoClient(seeds, credentialList);
    System.out.println(mongoClient);
    System.out.println("初始化client完成");
}
/* ---- Singleton: the single instance is created eagerly, during class initialization ---- */
private static final MongoDBDao INSTANCE = new MongoDBDao();

/**
 * Static factory method of the singleton.
 *
 * Author: shibin
 *
 * @return the single {@code MongoDBDao} instance
 */
public static MongoDBDao getMongoDBDaoInstance(){
    return INSTANCE;
}
/**
 * Publishes to {@code retopic} every stored message of a conversation that
 * {@code from} neither sent nor has read yet. Each message is sent with
 * {@code "offline": "1"} added, at QoS 0 and not retained.
 *
 * <p>Nothing is sent when the conversation does not exist, cannot be parsed,
 * or has no {@code msg} array.
 *
 * @param from       id of the client that should receive its unread messages
 * @param convid     conversation id, used as the document {@code _id}
 * @param retopic    topic the messages are published to
 * @param database   name of the database to read from
 * @param collection name of the collection to read from
 * @throws MqttPersistenceException propagated from publishing
 * @throws MqttException            propagated from publishing
 */
public void sendOfflineMsgToClient(String from, String convid,MqttTopic retopic,String database,String collection) throws MqttPersistenceException, MqttException
{
    System.out.println("取得message的连接");
    MongoCollection<Document> messages = mongoClient.getDatabase(database).getCollection(collection);
    System.out.println("获得convid所对应的msg列表");
    // first() runs the query, so fetch the document once and reuse it.
    Document conversation = messages.find(new BasicDBObject("_id", convid)).first();
    if (conversation == null) {
        return;
    }
    System.out.println(conversation);

    JSONObject jo;
    try {
        jo = JSONObject.fromObject(conversation.toJson());
    } catch (Exception e) {
        logger.error("Stored conversation " + convid + " could not be parsed as JSON", e);
        return;
    }
    if (!jo.has("msg")) {
        return;
    }

    JSONArray jsonArray = jo.getJSONArray("msg");
    for (int i = 0; i < jsonArray.length(); i++) {
        JSONObject msg = jsonArray.getJSONObject(i);
        System.out.println("取得msg对应的第" + i + "条记录的read信息" + msg.optString("read"));
        System.out.println("判断read是不是包括from的信息,如果不包括且这条消息不是他自己发的就给她发送这条消息");
        boolean sentByReceiver = from.equals(msg.optString("from"));
        if (!sentByReceiver && !alreadyReadBy(msg, from)) {
            System.out.println("取得这条消息的原型,然后加上offline=1并发送消息");
            msg.put("offline", "1");
            // Encode explicitly as UTF-8 rather than with the platform default charset.
            retopic.publish(msg.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8), 0, false);
        } else {
            System.out.println("no offline message for " + from);
        }
    }
}

/**
 * Tells whether {@code reader} is listed in the {@code read} field of a stored message.
 * Entries are compared for equality so that one id being a prefix or
 * substring of another (e.g. "u1" and "u12") does not count as read.
 */
private static boolean alreadyReadBy(JSONObject msg, String reader) {
    JSONArray readers = msg.optJSONArray("read");
    if (readers == null) {
        // "read" is not stored as an array here: keep the textual check.
        return msg.optString("read").contains(reader);
    }
    for (int j = 0; j < readers.length(); j++) {
        if (reader.equals(readers.getString(j))) {
            return true;
        }
    }
    return false;
}
/**
 * Appends a text or image message to the {@code msg} array of its
 * conversation document ({@code _id} = {@code convid}), creating the document
 * when it does not exist yet.
 *
 * <p>The stored entry carries {@code from}, {@code type}, {@code tmpindex},
 * {@code content}, {@code extra} and an empty {@code read} list. Storing
 * {@code tmpindex} is what lets {@code getAck} find the entry through its
 * {@code msg.tmpindex} filter.
 *
 * NOTE(review): the stored key names mirror the incoming message; confirm
 * they match what the clients expect in redelivered offline messages.
 *
 * @param database   name of the database to write to
 * @param collection name of the collection to write to
 * @param jo         incoming message; must contain {@code convid}, {@code type},
 *                   {@code from}, {@code tmpindex}, {@code content} and {@code extra}
 */
public void saveTextMsg(String database,String collection,JSONObject jo)
{
    MongoCollection<Document> messages = mongoClient.getDatabase(database).getCollection(collection);
    String convid = jo.getString("convid");
    Bson filter = Filters.eq("_id", convid);
    System.out.println("更新message之前的值" + messages.find(filter).first());

    // Text messages carry their text in content.content; any other type carries content.url.
    String type = jo.getString("type");
    JSONObject incomingContent = jo.getJSONObject("content");
    Document content = new Document();
    if ("text".equals(type)) {
        content.put("content", incomingContent.getString("content"));
    } else {
        content.put("url", incomingContent.getString("url"));
    }

    JSONObject incomingExtra = jo.getJSONObject("extra");
    Document extra = new Document();
    extra.put("admin", incomingExtra.getString("admin"));
    extra.put("headimgurl", incomingExtra.getString("headimgurl"));
    extra.put("nickname", incomingExtra.getString("nickname"));

    Document doc = new Document();
    doc.put("from", jo.getString("from"));
    doc.put("type", type);
    doc.put("tmpindex", jo.getString("tmpindex"));
    doc.put("content", content);
    doc.put("extra", extra);
    // Nobody has read the message yet.
    doc.put("read", new ArrayList<String>());

    Document update = new Document("$addToSet", new Document("msg", doc));
    messages.updateOne(filter, update, new UpdateOptions().upsert(true));
    System.out.println("更新message以后的值" + messages.find(filter).first());
}
/**
 * Records an acknowledgement: adds {@code from} to the {@code read} set of
 * the message with the given {@code tmpindex} inside conversation {@code convid}.
 *
 * <p>The update is not an upsert. When no message matches, nothing is
 * changed. The positional {@code $} operator must not be combined with
 * upsert, and the filter already guards the update, so no separate
 * existence check is needed.
 *
 * @param tmpindex   index of the acknowledged message
 * @param convid     conversation id, used as the document {@code _id}
 * @param from       id of the client that acknowledged the message
 * @param database   name of the database to update
 * @param collection name of the collection to update
 */
public void getAck(String tmpindex,String convid,String from,String database,String collection)
{
    System.out.println("接收到ack消息后更新message中的read字段");
    MongoCollection<Document> messages = mongoClient.getDatabase(database).getCollection(collection);

    BasicDBObject byConversation = new BasicDBObject("_id", convid);
    BasicDBObject byMessage = new BasicDBObject("_id", convid).append("msg.tmpindex", tmpindex);

    System.out.println("更新message满足id过滤条件之前的值" + messages.find(byConversation).first());
    System.out.println("更新message满足id和tmpindex过滤条件之前的值" + messages.find(byMessage).first());

    // "msg.$.read" addresses the array element matched by the msg.tmpindex condition.
    Document update = new Document("$addToSet", new Document("msg.$.read", from));
    messages.updateOne(byMessage, update);

    System.out.println("更新messages以后的值" + messages.find(byConversation).first());
}
}
剩下的关于业务逻辑方面的就不多说了,本文主要介绍的是MQTT高可用性(断开重连)的功能和MongoDB相关的操作。