initial import
This commit is contained in:
11
src/id/iptek/utms/agent/queue/ConsumerMode.java
Normal file
11
src/id/iptek/utms/agent/queue/ConsumerMode.java
Normal file
@ -0,0 +1,11 @@
|
||||
package id.iptek.utms.agent.queue;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author jakar
|
||||
*/
|
||||
public enum ConsumerMode {
|
||||
|
||||
SINGLE, BATCH;
|
||||
|
||||
}
|
||||
212
src/id/iptek/utms/agent/queue/DelayMessageQueue.java
Normal file
212
src/id/iptek/utms/agent/queue/DelayMessageQueue.java
Normal file
@ -0,0 +1,212 @@
|
||||
package id.iptek.utms.agent.queue;
|
||||
|
||||
import id.iptek.utms.agent.db.DatabaseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author jakar
|
||||
* @param <T>
|
||||
*/
|
||||
public class DelayMessageQueue<T> {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
private ExecutorService executorservice;
|
||||
private int capacity = 10;
|
||||
private ConsumerMode mode = ConsumerMode.SINGLE;
|
||||
private QueueMessageHandler<T> handler;
|
||||
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
|
||||
private boolean running = false;
|
||||
private int totalThread = 1;
|
||||
private ExecutorService internalExec = null;
|
||||
private Future internalWorker = null;
|
||||
private long batchFlushIntervalMillis = 1000L;
|
||||
|
||||
public DelayMessageQueue() {
|
||||
}
|
||||
|
||||
public DelayMessageQueue(ExecutorService executorservice, QueueMessageHandler<T> handler) {
|
||||
this(executorservice, handler, 10);
|
||||
}
|
||||
|
||||
public DelayMessageQueue(ExecutorService executorservice, QueueMessageHandler<T> handler, int capacity) {
|
||||
this.executorservice = executorservice;
|
||||
this.handler = handler;
|
||||
this.capacity = capacity;
|
||||
}
|
||||
|
||||
public void setExecutorservice(ExecutorService executorservice) {
|
||||
this.executorservice = executorservice;
|
||||
}
|
||||
|
||||
public void setCapacity(int capacity) {
|
||||
this.capacity = capacity;
|
||||
}
|
||||
|
||||
public int getCapacity() {
|
||||
return capacity;
|
||||
}
|
||||
|
||||
public void setMode(ConsumerMode mode) {
|
||||
this.mode = mode;
|
||||
}
|
||||
|
||||
public void setHandler(QueueMessageHandler<T> handler) {
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
public void setTotalThread(int totalThread) {
|
||||
this.totalThread = totalThread;
|
||||
}
|
||||
|
||||
public void setBatchFlushIntervalMillis(long batchFlushIntervalMillis) {
|
||||
this.batchFlushIntervalMillis = batchFlushIntervalMillis;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
logger.info("Run queue worker ...");
|
||||
this.running = true;
|
||||
|
||||
internalExec = Executors.newFixedThreadPool(totalThread);
|
||||
internalWorker = this.executorservice.submit(new QueueRunnable());
|
||||
}
|
||||
|
||||
public void stop(boolean interrupt) {
|
||||
logger.info("Stopping queue worker ...");
|
||||
stop(interrupt, 0L, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public boolean stop(boolean interrupt, long timeout, TimeUnit unit) {
|
||||
logger.info("Stopping queue worker ...");
|
||||
this.running = false;
|
||||
if(internalWorker == null) {
|
||||
logger.warn("Queue worker has not been run!");
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
if(interrupt) {
|
||||
internalWorker.cancel(true);
|
||||
} else if(!internalWorker.isDone()) {
|
||||
if(timeout > 0L) {
|
||||
internalWorker.get(timeout, unit);
|
||||
} else {
|
||||
internalWorker.get();
|
||||
}
|
||||
}
|
||||
} catch(Exception ex) {
|
||||
logger.warn("Queue worker did not stop gracefully: {}", ex.getMessage());
|
||||
}
|
||||
|
||||
internalExec.shutdown();
|
||||
try {
|
||||
boolean terminated = timeout > 0L ? internalExec.awaitTermination(timeout, unit) : internalExec.awaitTermination(0L, TimeUnit.MILLISECONDS);
|
||||
if(!terminated) {
|
||||
logger.warn("Queue handler tasks did not finish within {} {}", timeout, unit);
|
||||
internalExec.shutdownNow();
|
||||
return false;
|
||||
}
|
||||
} catch(InterruptedException ex) {
|
||||
logger.warn("Interrupted while waiting queue handler tasks: {}", ex.getMessage());
|
||||
internalExec.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean add(T message) {
|
||||
return queue.add(message);
|
||||
}
|
||||
|
||||
class QueueRunnable implements Runnable {
|
||||
|
||||
private final List<T> internalList = new ArrayList<>();
|
||||
private long lastFlushTime = System.currentTimeMillis();
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while(running || !queue.isEmpty()) {
|
||||
try {
|
||||
T message = queue.poll();
|
||||
if(message != null) {
|
||||
if(mode == ConsumerMode.BATCH) {
|
||||
// handled batches
|
||||
internalList.add(message);
|
||||
|
||||
// check capacity
|
||||
if(internalList.size() >= capacity) {
|
||||
flushBatch();
|
||||
}
|
||||
} else if(mode == ConsumerMode.SINGLE) {
|
||||
// run job using separate runnable
|
||||
Runnable run = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// handle single
|
||||
boolean handled = handler.handleMessage(message);
|
||||
logger.debug("Message handled? {}", handled);
|
||||
} catch(DatabaseException ex) {
|
||||
logger.error("Error saving heartbeats: {}", ex.getMessage(), ex);
|
||||
}
|
||||
}
|
||||
};
|
||||
internalExec.execute(run);
|
||||
}
|
||||
} else {
|
||||
if(mode == ConsumerMode.BATCH && !internalList.isEmpty()
|
||||
&& System.currentTimeMillis() - lastFlushTime >= batchFlushIntervalMillis) {
|
||||
flushBatch();
|
||||
}
|
||||
TimeUnit.MILLISECONDS.sleep(100L);
|
||||
}
|
||||
if(mode == ConsumerMode.BATCH && !internalList.isEmpty()
|
||||
&& System.currentTimeMillis() - lastFlushTime >= batchFlushIntervalMillis) {
|
||||
flushBatch();
|
||||
}
|
||||
} catch(InterruptedException ex) {
|
||||
logger.debug("Queue worker interrupted: {}", ex.getMessage());
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
} catch(Exception ex) {
|
||||
logger.error("Error consuming queue : {}", ex.getMessage(), ex);
|
||||
}
|
||||
}
|
||||
if(mode == ConsumerMode.BATCH && !internalList.isEmpty()) {
|
||||
flushBatch();
|
||||
}
|
||||
}
|
||||
|
||||
private void flushBatch() {
|
||||
List<T> workingList = new ArrayList<>();
|
||||
workingList.addAll(internalList);
|
||||
internalList.clear();
|
||||
lastFlushTime = System.currentTimeMillis();
|
||||
|
||||
// run job using separate runnable
|
||||
Runnable run = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
boolean handled = handler.handleMessages(workingList);
|
||||
logger.debug("Batch messages handled? {}", handled);
|
||||
} catch(DatabaseException ex) {
|
||||
logger.error("Error saving heartbeats: {}", ex.getMessage(), ex);
|
||||
}
|
||||
}
|
||||
};
|
||||
internalExec.execute(run);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
134
src/id/iptek/utms/agent/queue/DeviceInitQueueMessageHandler.java
Normal file
134
src/id/iptek/utms/agent/queue/DeviceInitQueueMessageHandler.java
Normal file
@ -0,0 +1,134 @@
|
||||
package id.iptek.utms.agent.queue;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import id.iptek.utms.agent.dao.ProfileDao;
|
||||
import id.iptek.utms.agent.model.Profile;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author jakar
|
||||
*/
|
||||
public class DeviceInitQueueMessageHandler implements QueueMessageHandler<Map> {
|
||||
|
||||
public final static String NAME = "DEV_INIT_QUEUE_HANDLER";
|
||||
private Logger logger = LoggerFactory.getLogger(getClass());
|
||||
private ProfileDao profileDao = new ProfileDao();
|
||||
private SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
|
||||
private SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm");
|
||||
|
||||
public DeviceInitQueueMessageHandler() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean handleMessage(Map message) {
|
||||
boolean processed = false;
|
||||
try {
|
||||
logger.debug("Handling init request: {}", message);
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
MqttClient mqttClient = (MqttClient) message.get("MQTT");
|
||||
String terminalSN = (String) message.get("device_sn");
|
||||
Profile profile = profileDao.getProfileForTerminal(terminalSN);
|
||||
if (profile != null) {
|
||||
broadcastProfile(mqttClient, profile);
|
||||
} else {
|
||||
logger.warn("No profile for SN: {}", terminalSN);
|
||||
}
|
||||
processed = true;
|
||||
long end = System.currentTimeMillis();
|
||||
logger.debug("Saved in {}ms", (end-start));
|
||||
return processed;
|
||||
} catch(Exception ex) {
|
||||
logger.error("Error handling init: {}", ex.getMessage(), ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean handleMessages(List<Map> messages) {
|
||||
try {
|
||||
logger.debug("Handling list of init requests: {}", messages.size());
|
||||
long start = System.currentTimeMillis();
|
||||
boolean processed = false;
|
||||
for(Map message : messages) {
|
||||
MqttClient mqttClient = (MqttClient) message.get("MQTT");
|
||||
String terminalSN = (String) message.get("device_sn");
|
||||
Profile profile = profileDao.getProfileForTerminal(terminalSN);
|
||||
if (profile != null) {
|
||||
broadcastProfile(mqttClient, profile);
|
||||
} else {
|
||||
logger.warn("No profile for SN: {}", terminalSN);
|
||||
}
|
||||
processed |= true;
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
logger.debug("Saved in {}ms", (end-start));
|
||||
return processed;
|
||||
} catch(Exception ex) {
|
||||
logger.error("Error handling list of init requests: {}", ex.getMessage(), ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// send profile to device topic via mqtt
|
||||
public void broadcastProfile(MqttClient mqttClient,
|
||||
Profile profile) throws Exception {
|
||||
logger.debug("Publish profile ...");
|
||||
try {
|
||||
try {
|
||||
Gson gson = new Gson();
|
||||
int qos = 2;
|
||||
String topicName = profile.getTerminalSN().toUpperCase() + "_IN";
|
||||
// generate json
|
||||
Map messageMap = new HashMap<>();
|
||||
messageMap.put("req_id", UUID.randomUUID().toString());
|
||||
messageMap.put("req_time", dateTimeFormat.format(new Date()));
|
||||
messageMap.put("req_type", "UPDATE_PARAM");
|
||||
// profile map
|
||||
Map profileMap = new HashMap<>();
|
||||
profileMap.put("id", profile.getId());
|
||||
profileMap.put("name", profile.getName());
|
||||
profileMap.put("hearbeat_interval", profile.getHeartbeatInterval());
|
||||
profileMap.put("diagnostic_interval", profile.getDiagnosticInterval());
|
||||
profileMap.put("mask_home_button", profile.isMaskHomeButton());
|
||||
profileMap.put("mask_status_button", profile.isMaskStatusButton());
|
||||
profileMap.put("schedule_reboot", profile.isScheduleReboot());
|
||||
if (profile.isScheduleReboot()) {
|
||||
profileMap.put("schedule_reboot_time", timeFormat.format(profile.getScheduleRebootTime()));
|
||||
}
|
||||
profileMap.put("relocation_alert", profile.isRelocationAlert());
|
||||
profileMap.put("moving_threshold", profile.getMovingThreshold());
|
||||
profileMap.put("admin_password", profile.getAdminPassword());
|
||||
|
||||
// additional
|
||||
profileMap.put("front_app", profile.getFrontApp());
|
||||
profileMap.put("apps_home_list", profile.getApps());
|
||||
profileMap.put("group_list", profile.getGroupIds());
|
||||
|
||||
messageMap.put("profile", profileMap);
|
||||
|
||||
String messageData = gson.toJson(messageMap);
|
||||
|
||||
logger.debug("Try publish message to: {}", topicName);
|
||||
mqttClient.publish(topicName, messageData.getBytes(), qos, false);
|
||||
logger.debug("Message published!");
|
||||
} catch (Exception ex) {
|
||||
logger.error("Error publish to device: {}", ex.getMessage(), ex);
|
||||
} finally {
|
||||
}
|
||||
} finally {
|
||||
logger.debug("Publish profile DONE");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,52 @@
|
||||
package id.iptek.utms.agent.queue;
|
||||
|
||||
import id.iptek.utms.agent.dao.DiagnosticDao;
|
||||
import id.iptek.utms.agent.model.Diagnostic;
|
||||
import java.util.List;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author jakar
|
||||
*/
|
||||
public class DiagnosticInfoQueueMessageHandler implements QueueMessageHandler<Diagnostic> {
|
||||
|
||||
public final static String NAME = "DIAGNOSTIC_QUEUE_HANDLER";
|
||||
private Logger logger = LoggerFactory.getLogger(getClass());
|
||||
private DiagnosticDao dao = new DiagnosticDao();
|
||||
|
||||
public DiagnosticInfoQueueMessageHandler() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean handleMessage(Diagnostic message) {
|
||||
try {
|
||||
logger.debug("Saving diagnostic: {}", message);
|
||||
long start = System.currentTimeMillis();
|
||||
boolean saved = dao.save(message);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.debug("Saved in {}ms", (end-start));
|
||||
return saved;
|
||||
} catch(Exception ex) {
|
||||
logger.error("Error handling diagnostic: {}", ex.getMessage(), ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean handleMessages(List<Diagnostic> messages) {
|
||||
try {
|
||||
logger.debug("Saving list of diagnostics: {}", messages.size());
|
||||
long start = System.currentTimeMillis();
|
||||
boolean saved = dao.saveAll(messages);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.debug("Saved in {}ms", (end-start));
|
||||
return saved;
|
||||
} catch(Exception ex) {
|
||||
logger.error("Error handling list of diagnostics: {}", ex.getMessage(), ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,52 @@
|
||||
package id.iptek.utms.agent.queue;
|
||||
|
||||
import id.iptek.utms.agent.dao.HeartbeatDao;
|
||||
import id.iptek.utms.agent.model.HeartBeat;
|
||||
import java.util.List;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author jakar
|
||||
*/
|
||||
public class HeartBeatQueueMessageHandler implements QueueMessageHandler<HeartBeat> {
|
||||
|
||||
public final static String NAME = "HEARTBEAT_QUEUE_HANDLER";
|
||||
private Logger logger = LoggerFactory.getLogger(getClass());
|
||||
private HeartbeatDao dao = new HeartbeatDao();
|
||||
|
||||
public HeartBeatQueueMessageHandler() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean handleMessage(HeartBeat message) {
|
||||
try {
|
||||
logger.debug("Saving heartbeat: {}", message);
|
||||
long start = System.currentTimeMillis();
|
||||
boolean saved = dao.save(message);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.debug("Saved in {}ms", (end-start));
|
||||
return saved;
|
||||
} catch(Exception ex) {
|
||||
logger.error("Error handling heartbeat: {}", ex.getMessage(), ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean handleMessages(List<HeartBeat> messages) {
|
||||
try {
|
||||
logger.debug("Saving list of heartbeats: {}", messages.size());
|
||||
long start = System.currentTimeMillis();
|
||||
boolean saved = dao.saveAll(messages);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.debug("Saved in {}ms", (end-start));
|
||||
return saved;
|
||||
} catch(Exception ex) {
|
||||
logger.error("Error handling list of heartbeats: {}", ex.getMessage(), ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
25
src/id/iptek/utms/agent/queue/QueueMessageHandler.java
Normal file
25
src/id/iptek/utms/agent/queue/QueueMessageHandler.java
Normal file
@ -0,0 +1,25 @@
|
||||
package id.iptek.utms.agent.queue;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author jakar
|
||||
*/
|
||||
public interface QueueMessageHandler<T> {
|
||||
|
||||
/**
|
||||
* Handle message
|
||||
* @param message
|
||||
* @return
|
||||
*/
|
||||
public boolean handleMessage(T message);
|
||||
|
||||
/**
|
||||
* Handle batch of messages at same time
|
||||
* @param messages
|
||||
* @return
|
||||
*/
|
||||
public boolean handleMessages(List<T> messages);
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user