松花皮蛋的黑板報
  • 分享在京東工作的技術感悟,還有JAVA技術和業內最佳實踐,大部分都是務實的、能看懂的、可復現的

掃一掃
關注公眾號

Flux反應式編程結合多線程實現任務編排

博客首頁文章列表 松花皮蛋me 2019-03-12 18:51

一、任務編排

如果僅僅是批量執行一段命令、腳本,是不能稱之為自動化運維的。運維過程中,涉及到一些復雜、需要日常重復性的工作,則可以通過任務編排來處理。任務編排可以將復雜的作業節點編排成任務,設定觸發條件和時間,滿足更為靈活的應用場景

二、反應式編程

任務實體

import lombok.Data;
import java.util.List;

@Data
public class Task {
    private Integer id;
    private Integer jobId;
    private String name;
    private String command;
    private Integer order;
    private String failHandle = "break";
    private List<String> requires;
    private Boolean preArgs = true;
}

假設一個job中需要運行多個task,task間有order、requires依賴關系,傳統編程模式中,最容易想到的就是回調編程,但是如果要達到效果將導致多層回調內嵌,維護成本和可讀性極差

ExecutorService executor = Executors.newFixedThreadPool(1);
    ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
    ListenableFuture<String> lf = listeningExecutor.submit(new Callable<String>()  {
        @Override
        public String call() {

        }
    });
    Futures.addCallback(lf, new FutureCallback<String>()  {
        @Override
        public void onFailure(Throwable t) {

        }
        @Override
        public void onSuccess(String s) {
        }
    });

隨著反應式編程(Reactive Programming)這種新的編程范式的出現,有了新的解決方案。

我們一般通過迭代器(Iterator)模式來遍歷一個序列,這種遍歷方式是由調用者來控制節奏的,采用的是拉的方式,每次由調用者通過 next()方法來獲取序列中的下一個值。使用反應式流時采用的則是推的方式,即常見的發布者-訂閱者模式。當發布者有新的數據產生時,這些數據會被推送到訂閱者來進行處理。在反應式流上可以添加各種不同的操作來對數據進行處理,形成數據處理鏈。這個以聲明式的方式添加的處理鏈只在訂閱者進行訂閱操作時才會真正執行

官方的例子

userService.getFavorites(userId) ?
           .flatMap(favoriteService::getDetails)  ?
           .switchIfEmpty(suggestionService.getSuggestions())  ?
           .take(5)  ?
           .publishOn(UiUtils.uiThreadScheduler())  ?
           .subscribe(uiList::show, UiUtils::errorPopup);  ?

? 根據用戶ID獲得喜歡的信息(打開一個 Publisher)
? 使用 flatMap 操作獲得詳情信息
? 使用 switchIfEmpty 操作,在沒有喜歡數據的情況下,采用系統推薦的方式獲得
? 取前五個
? 在 uiThread 上進行發布
? 最終的消費行為

三、代碼實現

主方法:

@SpringBootApplication
public class WorkflowApplication implements CommandLineRunner {

public static void main(String[] args) {
    SpringApplication.run(WorkflowApplication.class, args);

}

static final Logger logger = LoggerFactory.getLogger(WorkflowApplication.class);

@Autowired
private TaskSubcriber taskSubcriber;


@Override
public void run(String... args) throws Exception {
    List taskList = new ArrayList();
    Task task1 = new Task();
    task1.setName("task1");
    task1.setCommand("command1");
    task1.setOrder(1);

    Task task2 = new Task();
    task2.setName("task2");
    task2.setCommand("command2");
    task2.setRequires(Arrays.asList("task1"));
    task2.setOrder(2);

    Task task3 = new Task();
    task3.setName("task3");
    task3.setCommand("command3");
    task3.setOrder(3);

    taskList.add(task1);
    taskList.add(task2);
    taskList.add(task3);

    BaseService baseService = new BaseService();
    ExecutorService executor = Executors.newFixedThreadPool(3);
    executor.execute(new Runnable() {
        @Override
        public void run() {
            Flux.fromArray(taskList.toArray()).filter(o -> {
                return baseService.requireFilter((Task) o,true);
            }).subscribe(taskSubcriber);
        }
    });
    executor.execute(new Runnable() {
        @Override
        public void run() {
            Flux.interval(Duration.ofSeconds(0),Duration.ofSeconds(0)).fromArray(taskList.toArray()).filter(o -> {
                return baseService.requireFilter((Task) o,false);
            }).subscribe(taskSubcriber);
        }
    });


}


}

消費層

@Component
public class TaskSubcriber implements Subscriber<Object> {

private static final Logger logger = LoggerFactory.getLogger(TaskSubcriber.class);

private static Object lock = new Object();

@Override
public void onSubscribe(Subscription subscription) {
    logger.info("onSubscribe subscription:{}",subscription);
    subscription.request(Integer.MAX_VALUE);
}

@Override
public void onNext(Object object) {
    logger.info("onNext list:{}",object);
    Task task = (Task) object;
    if(task.getRequires()==null) {
        NoRelyTaskService noRelyTaskService = new NoRelyTaskService(lock,task);
        Thread thread = new Thread(noRelyTaskService);
        thread.start();
      } else {
        RelyTaskService relyTaskService = new RelyTaskService(lock,task);
        Thread thread = new Thread(relyTaskService);
        thread.start();
    }
}

@Override
public void onError(Throwable throwable) {
    throwable.printStackTrace();
}

@Override
public void onComplete() {
    logger.info("onComplete :{}");
}
}

不依賴其他任務的任務執行層

public class NoRelyTaskService extends BaseService implements   Runnable {

private  Object lock = null;
private Task task;

public NoRelyTaskService(Object l,Task t)
{
    lock = l;
    task = t;
}

@Override
public void run()
{
    synchronized (lock) {
        System.out.println("線程:"+Thread.currentThread().getName()+",命令:"+task.getCommand());
        lock.notifyAll();
    }

}
}   

依賴其他任務的任務執行層

public class RelyTaskService extends BaseService implements     Runnable{

private  Object lock = null;
private Task task;

public RelyTaskService(Object l,Task t)
{
    lock = l;
    task = t;
}

@Override
public void  run()
{
    synchronized (lock) {
        try {
            List<String> requires = task.getRequires();
            while (state(requires)==false) {
                System.out.println("線程"+Thread.currentThread().getName()+"開始等待");
                lock.wait();
            }
            System.out.println("線程:"+Thread.currentThread().getName()+",命令:"+task.getCommand());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


}

四、多線程需要注意的地方

1、初學者理解wait()的時候都認為是將當前線程阻塞,所以Thread.currentThread().wait();視乎很有道理。但是不知道大家有沒有發現,在JDK類庫中wait()和notify()方法并不是Thread類的,而是Object()中的。在其他線程調用此對象的 notify() 方法或 notifyAll() 方法前,當前線程等待

2、始終使用while循環來調用wait方法,永遠不要在循環外調用wait方法,這樣做的原因是盡管并不滿足條件,但是由于其他線程調用notifyAll方法會導致被阻塞線程意外喚醒,此時執行條件不滿足,它會導致約束失效

3、喚醒線程,應該使用notify還是notifyAll?notify會隨機通知等待隊列中的一個線程,而notifyAll會通知等待隊列中所有線程,可知notify是有風險的 ,可能導致某些線程永遠不會被通知到

4、當前線程必須擁有此對象監視器,然后才可以放棄對此監視器的所有權并等待 ,直到其他線程通過調用notify方法或notifyAll方法通知在此對象的監視器上等待的線程醒來,然后該線程將等到重新獲得對監視器的所有權后才能繼續執行。否則會報IllegalMonitorStateException 錯誤

黑龙江6+1开奖结果查询