trackStream.js 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. export const streamChunk = function* (chunk, chunkSize) {
  2. let len = chunk.byteLength;
  3. if (!chunkSize || len < chunkSize) {
  4. yield chunk;
  5. return;
  6. }
  7. let pos = 0;
  8. let end;
  9. while (pos < len) {
  10. end = pos + chunkSize;
  11. yield chunk.slice(pos, end);
  12. pos = end;
  13. }
  14. };
  15. export const readBytes = async function* (iterable, chunkSize) {
  16. for await (const chunk of readStream(iterable)) {
  17. yield* streamChunk(chunk, chunkSize);
  18. }
  19. };
  20. const readStream = async function* (stream) {
  21. if (stream[Symbol.asyncIterator]) {
  22. yield* stream;
  23. return;
  24. }
  25. const reader = stream.getReader();
  26. try {
  27. for (;;) {
  28. const { done, value } = await reader.read();
  29. if (done) {
  30. break;
  31. }
  32. yield value;
  33. }
  34. } finally {
  35. await reader.cancel();
  36. }
  37. };
  38. export const trackStream = (stream, chunkSize, onProgress, onFinish) => {
  39. const iterator = readBytes(stream, chunkSize);
  40. let bytes = 0;
  41. let done;
  42. let _onFinish = (e) => {
  43. if (!done) {
  44. done = true;
  45. onFinish && onFinish(e);
  46. }
  47. };
  48. return new ReadableStream(
  49. {
  50. async pull(controller) {
  51. try {
  52. const { done, value } = await iterator.next();
  53. if (done) {
  54. _onFinish();
  55. controller.close();
  56. return;
  57. }
  58. let len = value.byteLength;
  59. if (onProgress) {
  60. let loadedBytes = (bytes += len);
  61. onProgress(loadedBytes);
  62. }
  63. controller.enqueue(new Uint8Array(value));
  64. } catch (err) {
  65. _onFinish(err);
  66. throw err;
  67. }
  68. },
  69. cancel(reason) {
  70. _onFinish(reason);
  71. return iterator.return();
  72. },
  73. },
  74. {
  75. highWaterMark: 2,
  76. }
  77. );
  78. };