Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proc::Async's streams may call &done() before last output is handled #3507

Closed
p6rt opened this issue Sep 7, 2014 · 6 comments
Closed

Proc::Async's streams may call &done() before last output is handled #3507

p6rt opened this issue Sep 7, 2014 · 6 comments
Labels

Comments

@p6rt
Copy link

p6rt commented Sep 7, 2014

Migrated from rt.perl.org#122722 (status was 'resolved')

Searchable as RT122722$

@p6rt
Copy link
Author

p6rt commented Sep 7, 2014

From @Leont

As per subject, Proc​::Async's streams may call &done() before calling their &tap/&act for the last time. Take for example this code​:

  my $num = @​*ARGS.shift // 3;
 
  my @​procs = (1..$num).map( -> $count {
  my $proc = Proc​::Async.new(​:path('echo'), :args(<Hello World>));
  my $input = $proc.stdout_chars();
  $input.act(-> $input { say "$count​: " ~ $input.chomp }, :done({ say "$count is done" }));
  { :$proc, :$input, :done($proc.start()) };
  });
 
  await @​procs.map(*.<done>);

It will give a non-deterministic output, that will typically look something like this​:

  1 is done
  1 is done
  1​: Hello World
  2​: Hello World
  2 is done
  2 is done
  3 is done
  3 is done
  3​: Hello World

(the double done's are reported as #​122714)

It sometimes also includes errors such as​:

  Unhandled exception in code scheduled on thread 140254841403136

or

  This representation (Null) does not support elems

Not sure what's going on exactly, but this is limiting the use of the output Supplies greatly.

Leon

@p6rt
Copy link
Author

p6rt commented Sep 16, 2014

From @Leont

On Sun Sep 07 15​:36​:08 2014, LeonT wrote​:

As per subject, Proc​::Async's streams may call &done() before calling
their &tap/&act for the last time. Take for example this code​:

my $num = @​*ARGS.shift // 3;

my @​procs = (1..$num).map( -> $count {
my $proc = Proc​::Async.new(​:path('echo'), :args(<Hello World>));
my $input = $proc.stdout_chars();
$input.act(-> $input { say "$count​: " ~ $input.chomp }, :done({ say
"$count is done" }));
{ :$proc, :$input, :done($proc.start()) };
});

await @​procs.map(*.<done>);

It will give a non-deterministic output, that will typically look
something like this​:

1 is done
1 is done
1​: Hello World
2​: Hello World
2 is done
2 is done
3 is done
3 is done
3​: Hello World

(the double done's are reported as #​122714)

It sometimes also includes errors such as​:

Unhandled exception in code scheduled on thread 140254841403136

or

This representation (Null) does not support elems

Not sure what's going on exactly, but this is limiting the use of the
output Supplies greatly.

The attached patch should fix this, but I'm not sure how to test this well.

Leon

@p6rt
Copy link
Author

p6rt commented Sep 16, 2014

From @Leont

0001-Only-signal-done-on-output-supply-when-really-done.patch
From 298070f9d7d01983b54b5ae09e5381e32a8cc57c Mon Sep 17 00:00:00 2001
From: Leon Timmermans <fawaka@gmail.com>
Date: Tue, 16 Sep 2014 22:23:37 +0200
Subject: [PATCH] Only signal done on output supply when really done

---
 src/core/Proc/Async.pm | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/src/core/Proc/Async.pm b/src/core/Proc/Async.pm
index dc06561..ffef91e 100644
--- a/src/core/Proc/Async.pm
+++ b/src/core/Proc/Async.pm
@@ -29,6 +29,8 @@ my class Proc::Async {
     has Bool $!stderr_supply_chars;
     has $!process_handle;
     has $!exited_promise;
+    has $!stdout_promise;
+    has $!stderr_promise;
 
     my class ProcessCancellation is repr('AsyncTask') { }
 
@@ -92,26 +94,29 @@ my class Proc::Async {
 
         my Mu $callbacks := nqp::hash();
         nqp::bindkey($callbacks, 'done', -> Mu \status {
-            $!stdout_supply.?done();
-            $!stderr_supply.?done();
             $exited_vow.keep(Proc::Status.new(:exit(status)))
         });
         nqp::bindkey($callbacks, 'error', -> Mu \err {
             $exited_vow.break(err);
         });
         if defined $!stdout_supply_chars {
+            $!stdout_promise = Promise.new;
             nqp::bindkey($callbacks,
                 $!stdout_supply_chars ?? 'stdout_chars' !! 'stdout_bytes',
                 -> Mu \seq, Mu \data, Mu \err {
                     if err {
                         $!stdout_supply.quit(err);
                     }
-                    elsif seq >= 0 {
+                    elsif seq < 0 {
+                        $!stdout_promise.keep(True);
+                    }
+                    else {
                         $!stdout_supply.more(data);
                     }
                 });
         }
         if defined $!stderr_supply_chars {
+            $!stdout_promise = Promise.new;
             nqp::bindkey($callbacks,
                 $!stderr_supply_chars ?? 'stderr_chars' !! 'stderr_bytes',
                 -> Mu \seq, Mu \data, Mu \err {
@@ -119,7 +124,7 @@ my class Proc::Async {
                         $!stderr_supply.quit(err);
                     }
                     elsif seq < 0 {
-                        $!stderr_supply.done();
+                        $!stderr_promise.keep(True);
                     }
                     else {
                         $!stderr_supply.more(data);
@@ -132,7 +137,11 @@ my class Proc::Async {
         $!process_handle := nqp::spawnprocasync($scheduler.queue,
             $args-without, $*CWD.Str, $hash-without, $callbacks);
 
-        $!exited_promise
+        Promise.allof($!exited_promise, |($!stdout_promise, $!stderr_promise).grep(*.defined)).then({
+            $!stdout_supply.?done();
+            $!stderr_supply.?done();
+            $!exited_promise.result;
+        });
     }
 
     method print($str as Str, :$scheduler = $*SCHEDULER) {
-- 
1.9.1

@p6rt
Copy link
Author

p6rt commented Jun 13, 2017

From @jnthn

On Tue, 16 Sep 2014 13​:47​:14 -0700, LeonT wrote​:

On Sun Sep 07 15​:36​:08 2014, LeonT wrote​:

As per subject, Proc​::Async's streams may call &done() before calling
their &tap/&act for the last time. Take for example this code​:

my $num = @​*ARGS.shift // 3;

my @​procs = (1..$num).map( -> $count {
my $proc = Proc​::Async.new(​:path('echo'), :args(<Hello World>));
my $input = $proc.stdout_chars();
$input.act(-> $input { say "$count​: " ~ $input.chomp }, :done({ say
"$count is done" }));
{ :$proc, :$input, :done($proc.start()) };
});

await @​procs.map(*.<done>);

It will give a non-deterministic output, that will typically look
something like this​:

1 is done
1 is done
1​: Hello World
2​: Hello World
2 is done
2 is done
3 is done
3 is done
3​: Hello World

(the double done's are reported as #​122714)

It sometimes also includes errors such as​:

Unhandled exception in code scheduled on thread 140254841403136

or

This representation (Null) does not support elems

Not sure what's going on exactly, but this is limiting the use of the
output Supplies greatly.

The attached patch should fix this, but I'm not sure how to test this well.

It seems in the meantime an equivalent patch has been applied to address this same issue​:

https://github.com/rakudo/rakudo/blob/nom/src/core/Proc/Async.pm#L258

And I've not seen anything like this issue show up in a long time, so I think this can be considered resolved now.

/jnthn

@p6rt
Copy link
Author

p6rt commented Jun 13, 2017

The RT System itself - Status changed from 'new' to 'open'

@p6rt
Copy link
Author

p6rt commented Jun 13, 2017

@jnthn - Status changed from 'open' to 'resolved'

@p6rt p6rt closed this as completed Jun 13, 2017
@p6rt p6rt added the conc label Jan 5, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant