미래 목록 대기 중
A를 반환하는 메서드가 있습니다.List
장래의
List<Future<O>> futures = getFutures();
이제 모든 미래 처리가 정상적으로 완료되거나 미래에 의해 출력이 반환된 작업 중 하나가 예외를 발생시킬 때까지 기다립니다.하나의 작업이 예외를 던져도 다른 미래를 기다리는 것은 의미가 없다.
간단한 접근법은 다음과 같습니다.
wait() {
For(Future f : futures) {
try {
f.get();
} catch(Exception e) {
//TODO catch specific exception
// this future threw exception , means somone could not do its task
return;
}
}
}
하지만 여기서 문제는 예를 들어 4번째 미래가 예외를 발생시킨다면 처음 3가지 미래가 나올 때까지 불필요하게 기다려야 한다는 것입니다.
어떻게 해결할까요?래치 도움말이 카운트다운됩니까?Future를 사용할 수 없습니다.isDone
java doc에 따르면
boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
Completion Service를 사용하여 선물 준비가 완료되는 대로 선물을 받을 수 있으며, 이들 중 하나가 예외를 발생시킬 경우 처리를 취소할 수 있습니다.다음과 같은 경우:
Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService =
new ExecutorCompletionService<SomeResult>(executor);
//4 tasks
for(int i = 0; i < 4; i++) {
completionService.submit(new Callable<SomeResult>() {
public SomeResult call() {
...
return result;
}
});
}
int received = 0;
boolean errors = false;
while(received < 4 && !errors) {
Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
try {
SomeResult result = resultFuture.get();
received ++;
... // do something with the result
}
catch(Exception e) {
//log
errors = true;
}
}
또한 작업 중 하나가 오류를 발생시킨 경우 작업을 취소할 수 있도록 개선할 수 있다고 생각합니다.
Java 8을 사용하는 경우 Completetable을 사용하여 이 작업을 쉽게 수행할 수 있습니다.미래와 완성도Future.allOf: 모든 Complete 테이블이 제공된 후에만 콜백을 적용합니다.선물은 끝났다.
// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.
public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);
return CompletableFuture.allOf(cfs)
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
Java 8에서의 사용
// Kick of multiple, asynchronous lookups
CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");
// Wait until they are all done
CompletableFuture.allOf(page1,page2,page3).join();
logger.info("--> " + page1.get());
Executor Completion Service를 사용할 수 있습니다.이 문서에는 정확한 사용 사례에 대한 예도 나와 있습니다.
대신 작업 집합의 첫 번째 null이 아닌 결과를 사용하여 발생한 예외를 모두 무시하고 첫 번째 작업이 준비되었을 때 다른 모든 작업을 취소한다고 가정합니다.
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {
}
}
} finally {
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}
여기서 주의할 점은 ecs.take()는 첫 번째 전송된 작업뿐만 아니라 첫 번째 완료된 작업도 가져옵니다.따라서 실행이 종료된(또는 예외가 발생한) 순서대로 가져와야 합니다.
Java 8을 사용하고 있으며 조작을 원하지 않는 경우CompletableFuture
s, 저는 그 결과를 취득하기 위한 툴을 작성했습니다.List<Future<T>>
스트리밍을 사용합니다.중요한 것은 네가 하는 것이 금지되어 있다는 거야map(Future::get)
던질 때.
public final class Futures
{
private Futures()
{}
public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
{
return new FutureCollector<>();
}
private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
{
private final List<Throwable> exceptions = new LinkedList<>();
@Override
public Supplier<Collection<T>> supplier()
{
return LinkedList::new;
}
@Override
public BiConsumer<Collection<T>, Future<T>> accumulator()
{
return (r, f) -> {
try
{
r.add(f.get());
}
catch (InterruptedException e)
{}
catch (ExecutionException e)
{
exceptions.add(e.getCause());
}
};
}
@Override
public BinaryOperator<Collection<T>> combiner()
{
return (l1, l2) -> {
l1.addAll(l2);
return l1;
};
}
@Override
public Function<Collection<T>, List<T>> finisher()
{
return l -> {
List<T> ret = new ArrayList<>(l);
if (!exceptions.isEmpty())
throw new AggregateException(exceptions, ret);
return ret;
};
}
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics()
{
return java.util.Collections.emptySet();
}
}
이거는...AggregateException
C#과 같이 동작합니다.
public class AggregateException extends RuntimeException
{
/**
*
*/
private static final long serialVersionUID = -4477649337710077094L;
private final List<Throwable> causes;
private List<?> successfulElements;
public AggregateException(List<Throwable> causes, List<?> l)
{
this.causes = causes;
successfulElements = l;
}
public AggregateException(List<Throwable> causes)
{
this.causes = causes;
}
@Override
public synchronized Throwable getCause()
{
return this;
}
public List<Throwable> getCauses()
{
return causes;
}
public List<?> getSuccessfulElements()
{
return successfulElements;
}
public void setSuccessfulElements(List<?> successfulElements)
{
this.successfulElements = successfulElements;
}
}
이 컴포넌트는 C#의 태스크와 동일하게 동작합니다.Wait All(Wait All (모두 대기)같은 기능을 하는 변종을 연구하고 있습니다.CompletableFuture.allOf
(해당 사항)Task.WhenAll
)
제가 이걸 한 이유는 Spring's를 쓰고 있기 때문입니다.ListenableFuture
포팅하고 싶지 않습니다.CompletableFuture
그것이 보다 표준적인 방법임에도 불구하고
Complete table 목록을 결합하는 경우Futures는 다음과 같이 할 수 있습니다.
List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures
// CompletableFuture.allOf() method demand a variadic arguments
// You can use this syntax to pass a List instead
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()]));
// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();
Future & 에 대한 자세한 은 Future, 유용한 링크:
1. 미래 : https://www.baeldung.com/java-future
2. 완성표미래: https://www.baeldung.com/java-completablefuture
3. 완성표미래: https://www.callicoder.com/java-8-completablefuture-tutorial/
다음과 같은 유틸리티 클래스가 있습니다.
@FunctionalInterface
public interface CheckedSupplier<X> {
X get() throws Throwable;
}
public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
return () -> {
try {
return supplier.get();
} catch (final Throwable checkedException) {
throw new IllegalStateException(checkedException);
}
};
}
그런 다음 정적 Import를 사용하여 다음과 같이 모든 미래를 단순하게 기다릴 수 있습니다.
futures.stream().forEach(future -> uncheckedSupplier(future::get).get());
다음과 같이 모든 결과를 수집할 수도 있습니다.
List<MyResultType> results = futures.stream()
.map(future -> uncheckedSupplier(future::get).get())
.collect(Collectors.toList());
내 옛 게시물을 다시 찾았을 뿐인데 또 다른 슬픔이 있으시군요
하지만 여기서 문제는 예를 들어 4번째 미래가 예외를 발생시킨다면 처음 3가지 미래가 나올 때까지 불필요하게 기다려야 한다는 것입니다.
이 경우 간단한 해결책은 이 작업을 병행하는 것입니다.
futures.stream().parallel()
.forEach(future -> uncheckedSupplier(future::get).get());
이렇게 하면 첫 번째 예외는 미래를 정지시키지는 않지만 시리얼 예시와 같이 forEach 스테이트먼트가 해제되지만 모두 병렬로 대기하기 때문에 처음 3개가 완료될 때까지 기다릴 필요가 없습니다.
아마 이것이 도움이 될 것이다(날실로 대체되는 것은 아무것도 없다, 응!).해 보는 것이 .Future
), 매니저에게 Handler
★★★★★★★★★★★★★★★★★★」
class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
thisThread=Thread.currentThread();
List<Future<Object>> futures = getFutures();
trds=new Thread[futures.size()];
for (int i = 0; i < trds.length; i++) {
RunTask rt=new RunTask(futures.get(i), this);
trds[i]=new Thread(rt);
}
synchronized (this) {
for(Thread tx:trds){
tx.start();
}
}
for(Thread tx:trds){
try {tx.join();
} catch (InterruptedException e) {
System.out.println("Job failed!");break;
}
}if(!failed){System.out.println("Job Done");}
}
private List<Future<Object>> getFutures() {
return null;
}
public synchronized void cancelOther(){if(failed){return;}
failed=true;
for(Thread tx:trds){
tx.stop();//Deprecated but works here like a boss
}thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}
위 코드는 에러(체크하지 않음)라고 말씀드려야 하는데, 제가 해결책을 설명해 드릴 수 있으면 좋겠습니다.한번 시도해 보세요.
/**
* execute suppliers as future tasks then wait / join for getting results
* @param functors a supplier(s) to execute
* @return a list of results
*/
private List getResultsInFuture(Supplier<?>... functors) {
CompletableFuture[] futures = stream(functors)
.map(CompletableFuture::supplyAsync)
.collect(Collectors.toList())
.toArray(new CompletableFuture[functors.length]);
CompletableFuture.allOf(futures).join();
return stream(futures).map(a-> {
try {
return a.get();
} catch (InterruptedException | ExecutionException e) {
//logger.error("an error occurred during runtime execution a function",e);
return null;
}
}).collect(Collectors.toList());
};
Completion Service는 콜러블을 .submit() 메서드로 취득하고 계산된 선물은 .take() 메서드로 취득할 수 있습니다.
잊지 말아야 할 것은 .shutdown() 메서드를 호출하여 Executor Service를 종료하는 것입니다.또한 이 메서드는 실행자 서비스에 대한 참조를 저장한 경우에만 호출할 수 있으므로 반드시 보관하십시오.
코드 예 - 병렬로 작업하는 고정 수 작업 항목의 경우:
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<YourCallableImplementor> completionService =
new ExecutorCompletionService<YourCallableImplementor>(service);
ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();
for (String computeMe : elementsToCompute) {
futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;
while(received < elementsToCompute.size()) {
Future<YourCallableImplementor> resultFuture = completionService.take();
YourCallableImplementor result = resultFuture.get();
received ++;
}
//important: shutdown your ExecutorService
service.shutdown();
코드 예 - 병렬로 작업하는 동적 작업 항목의 경우:
public void runIt(){
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();
//Initial workload is 8 threads
for (int i = 0; i < 9; i++) {
futures.add(completionService.submit(write.new CallableImplementor()));
}
boolean finished = false;
while (!finished) {
try {
Future<CallableImplementor> resultFuture;
resultFuture = completionService.take();
CallableImplementor result = resultFuture.get();
finished = doSomethingWith(result.getResult());
result.setResult(null);
result = null;
resultFuture = null;
//After work package has been finished create new work package and add it to futures
futures.add(completionService.submit(write.new CallableImplementor()));
} catch (InterruptedException | ExecutionException e) {
//handle interrupted and assert correct thread / work packet count
}
}
//important: shutdown your ExecutorService
service.shutdown();
}
public class CallableImplementor implements Callable{
boolean result;
@Override
public CallableImplementor call() throws Exception {
//business logic goes here
return this;
}
public boolean getResult() {
return result;
}
public void setResult(boolean result) {
this.result = result;
}
}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Stack2 {
public static void waitFor(List<Future<?>> futures) {
List<Future<?>> futureCopies = new ArrayList<Future<?>>(futures);//contains features for which status has not been completed
while (!futureCopies.isEmpty()) {//worst case :all task worked without exception, then this method should wait for all tasks
Iterator<Future<?>> futureCopiesIterator = futureCopies.iterator();
while (futureCopiesIterator.hasNext()) {
Future<?> future = futureCopiesIterator.next();
if (future.isDone()) {//already done
futureCopiesIterator.remove();
try {
future.get();// no longer waiting
} catch (InterruptedException e) {
//ignore
//only happen when current Thread interrupted
} catch (ExecutionException e) {
Throwable throwable = e.getCause();// real cause of exception
futureCopies.forEach(f -> f.cancel(true));//cancel other tasks that not completed
return;
}
}
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
Runnable runnable1 = new Runnable (){
public void run(){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
}
};
Runnable runnable2 = new Runnable (){
public void run(){
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
}
}
};
Runnable fail = new Runnable (){
public void run(){
try {
Thread.sleep(1000);
throw new RuntimeException("bla bla bla");
} catch (InterruptedException e) {
}
}
};
List<Future<?>> futures = Stream.of(runnable1,fail,runnable2)
.map(executorService::submit)
.collect(Collectors.toList());
double start = System.nanoTime();
waitFor(futures);
double end = (System.nanoTime()-start)/1e9;
System.out.println(end +" seconds");
}
}
언급URL : https://stackoverflow.com/questions/19348248/waiting-on-a-list-of-future
'programing' 카테고리의 다른 글
예외조항 중 어떤 부분이 비용이 많이 드나요? (0) | 2022.07.11 |
---|---|
포인터 '디레퍼런스'란 무슨 뜻입니까? (0) | 2022.07.11 |
테스트 폴더에 상대 프로젝트 루트 경로를 사용하여 Vue 가져오기 (0) | 2022.07.11 |
SLF4J: "org.slf4j.impl" 클래스를 로드하지 못했습니다.Static Logger Binder" (0) | 2022.07.11 |
jQuery 약속을 반환하는 Vuex 작업이 작동하지 않습니다. .fail은 함수가 아닙니다. (0) | 2022.07.11 |