rtu
RTUClient is a high-level client for establishing a websocket connection, authenticating with a jwt, subscribing to a file by version or last delta id, "squashing" Deltas into an in-memory Notebook model, and registering callbacks for incoming RTU events by event_name and channel or incoming Deltas by delta type and delta action.
DeltaRequestCallbackManager
#
Don't use this directly. See RTUClient.new_delta_request, which builds an instance of this class and returns its .result, a Future that resolves to a bool or raises DeltaRejected.
- Sends over websocket to Gate
- Registers RTU and Delta squashing callbacks to resolve the Future either when the Delta was successful and squashed into Notebook or when there was an error (Rejected / Invalid Delta)
- Deregisters RTU and Delta callbacks when Future is resolved
Use case:
    delta_squashed: asyncio.Future[bool] = await rtu_client.new_delta_request(...)
    try:
        await delta_squashed
    except DeltaRejected:
        ...
    # Delta is guaranteed to be in rtu_client.builder at this point
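The Future-resolution pattern described above can be sketched without a websocket. This is a minimal illustration, not the library's implementation: FakeDeltaRequest, on_squashed, and on_rejected are hypothetical names, and the "replies" are simulated with loop.call_soon.

```python
import asyncio


class DeltaRejected(Exception):
    """Stand-in for the DeltaRejected error named in the docs above."""


class FakeDeltaRequest:
    """Hypothetical stand-in for DeltaRequestCallbackManager."""

    def __init__(self) -> None:
        # The Future the caller eventually awaits (the ".result" described above).
        self.result: asyncio.Future = asyncio.get_running_loop().create_future()

    def on_squashed(self) -> None:
        # Would run when the Delta was accepted and squashed into the Notebook.
        if not self.result.done():
            self.result.set_result(True)

    def on_rejected(self, reason: str) -> None:
        # Would run when the server reports a rejected or invalid Delta.
        if not self.result.done():
            self.result.set_exception(DeltaRejected(reason))


async def demo() -> tuple:
    accepted = FakeDeltaRequest()
    rejected = FakeDeltaRequest()
    loop = asyncio.get_running_loop()
    # Simulate replies arriving later over the websocket.
    loop.call_soon(accepted.on_squashed)
    loop.call_soon(rejected.on_rejected, "permission denied")

    ok = await accepted.result
    try:
        await rejected.result
        error = None
    except DeltaRejected as exc:
        error = str(exc)
    return ok, error


outcome = asyncio.run(demo())
print(outcome)
```

Guarding each resolution with .done() keeps a late or duplicate reply from raising InvalidStateError on an already-resolved Future.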
Source code in origami/clients/rtu.py
RTUClient
#
Source code in origami/clients/rtu.py
cell_ids
property
#
Return the list of cell ids, in order, from the NotebookBuilder in-memory model
kernel_pod_name: str
property
#
Transform the file_id into the Pod name used to build the kernels/ RTU channel
__init__(api_client, file_id, file_subscribe_timeout=10)
#
High-level client over the Sending websocket backend / RTUManager (serialize websocket msgs to/from RTU models) that allows you to add callbacks by RTU event type or Delta type/action.
- On .initialize(), will make a websocket connection to Gate
  - RTUManager / Sending websocket backend handles reconnection
  - RTUClient sets .manager.auth_hook to kick off the auth request, don't override that
- awaits .on_websocket_connect() hook that you can override in application code
- After websocket connection is established, sends authenticate_request on system channel
- Has a callback registered for 'authenticate_reply' on system channel which will await .on_auth (hook to define in application code) then send file subscribe request
- After authentication, sends subscribe_request to files/{file_id} channel
- awaits .on_file_subscribe() hook that you can override in application code
- Use .register_rtu_event_callback to register callbacks that are run against RTU messages
- Use .register_delta_callback to register callbacks that are run against Deltas
  - May not run when message is initially received if the Delta is "out of order", RTUClient handles queueing and replaying out of order deltas
  - Callbacks run after the Delta is "squashed" into {builder}
Source code in origami/clients/rtu.py
add_cell(source='', cell=None, before_id=None, after_id=None)
async
#
Adds a Cell to the Notebook.
- If a cell is passed in, that cell is used; otherwise a CodeCell is made from the source value
- If before_id and after_id are unspecified, the new cell is added at the bottom of the notebook
Source code in origami/clients/rtu.py
apply_delta(delta)
async
#
Squash a Delta into the NotebookBuilder and run applicable callbacks
- If squashing a Delta into the in-memory Notebook representation fails for some reason, then PA basically needs to crash because all follow-on Delta application is very suspect (e.g. future deltas think a cell or some content exists when it doesn't)
- If callbacks are triggered, it is okay for them to fail and we just log it because those are generally just side-effects, not core to applying future deltas
Note on alternative approach to handling delta squashing failures: @Seal suggested redownloading Notebook and starting from latest delta rather than killing Kernel Pod but we don't have great comm mechanisms for PA to tell Gate to squash the problematic Delta or to figure out the most recent version in Cockroach / S3. For now, killing Kernel Pod on NotebookBuilder apply and logging errors on side-effect callbacks is the best we can do.
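The two failure policies above (squash errors are fatal, callback errors are only logged) can be sketched as follows. This is a simplified, synchronous illustration under stated assumptions: the notebook is a plain dict, and SquashError, squash, and the delta shape are hypothetical rather than taken from origami.

```python
import logging
from typing import Callable, Dict, List

logger = logging.getLogger("rtu_sketch")


class SquashError(Exception):
    """Hypothetical error for a Delta that cannot be applied to the model."""


def squash(notebook: Dict[str, str], delta: dict) -> None:
    # Toy "squash": a replace delta must target a cell that already exists.
    if delta["cell_id"] not in notebook:
        raise SquashError(f"unknown cell {delta['cell_id']}")
    notebook[delta["cell_id"]] = delta["source"]


def apply_delta(
    notebook: Dict[str, str], delta: dict, callbacks: List[Callable[[dict], None]]
) -> int:
    """Return the number of callbacks that failed; squash errors propagate."""
    squash(notebook, delta)  # a failure here is fatal, so it is not caught
    failures = 0
    for fn in callbacks:
        try:
            fn(delta)
        except Exception:
            # Side-effect callbacks may fail; log and keep going.
            logger.exception("delta callback failed")
            failures += 1
    return failures


def bad_callback(delta: dict) -> None:
    raise RuntimeError("side effect broke")


seen: List[dict] = []
notebook = {"a": "print(1)"}
failed = apply_delta(
    notebook, {"cell_id": "a", "source": "print(2)"}, [bad_callback, seen.append]
)
print(failed, notebook, len(seen))
```

In this sketch the model update still lands and the second callback still runs even though the first callback raised, which mirrors the "log it and move on" policy for side effects.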
Source code in origami/clients/rtu.py
auth_hook(*args, **kwargs)
async
#
Called after the websocket connection is established. This also implicitly makes it so .send() / ._publish will effectively suspend sending messages over the websocket until we've observed an authenticate_reply event
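One common way to get this "hold messages until authenticated" behaviour is an asyncio.Event that the publish path waits on. The sketch below assumes that approach; GatedSender and its methods are hypothetical, and whether the library gates sending this way is not confirmed by the text above.

```python
import asyncio


class GatedSender:
    """Hypothetical sender that holds messages until authentication completes."""

    def __init__(self) -> None:
        self.authenticated = asyncio.Event()
        self.wire: list = []  # stands in for the websocket

    async def publish(self, msg: str) -> None:
        # Suspend here until an authenticate_reply has been observed.
        await self.authenticated.wait()
        self.wire.append(msg)

    def on_authenticate_reply(self) -> None:
        self.authenticated.set()


async def demo() -> tuple:
    sender = GatedSender()
    task = asyncio.create_task(sender.publish("subscribe_request"))
    await asyncio.sleep(0)  # let the task start; it should block on the Event
    before = list(sender.wire)
    sender.on_authenticate_reply()
    await task
    return before, sender.wire


gate_result = asyncio.run(demo())
print(gate_result)
```

A real client would need the authenticate request itself to bypass the gate; the sketch leaves that out.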
Source code in origami/clients/rtu.py
catastrophic_failure()
async
#
A hook for applications like PA to override so they can handle things like Pod shutdown in cases where the RTUClient cannot recover. Examples are when reloading Notebook state after inconsistent_state_event and not getting a current_version_id to subscribe by or getting Deltas that cannot be squashed into the builder
Source code in origami/clients/rtu.py
change_cell_type(cell_id, cell_type, code_language='python', db_connection='@noteable', assign_results_to=None)
async
#
Switch a cell between code, markdown, or SQL cell.
- code_language only relevant when switching to code cell
- db_connection and assign_results_to only relevant when switching to SQL cell
Source code in origami/clients/rtu.py
failed_to_squash_delta(delta, exc)
async
#
Hook for Application code to override when a Delta fails to "squash" into the in-memory Notebook representation.
file_unsubscribe()
async
#
Send file unsubscribe request to Gate. This is called when the RTUClient is shutting down.
load_seed_notebook()
async
#
Pull in the seed notebook that will be the base document model of the NotebookBuilder, which can then squash Deltas that update the Notebook, including deltas_to_apply on file subscribe which represents changes that may have happened since the last "save" to S3.
- Get current file version and presigned url from /v1/files endpoint
- Download and parse seed notebook into Notebook / NotebookBuilder
Source code in origami/clients/rtu.py
new_delta_request(delta=FileDelta)
async
#
Send a new delta request to the server and wait for it to have been accepted and propagated to other clients, as well as squashed into our own in-memory Notebook. Raises errors if the Delta was rejected for any reason.
Source code in origami/clients/rtu.py
on_bulk_cell_state_update(msg)
async
#
Called when we receive a bulk_cell_state_update_event on kernels/ channel
Source code in origami/clients/rtu.py
on_file_subscribe_timeout()
async
#
Hook for Application code to override if we don't get the expected file subscribe reply after some amount of seconds. Without a timeout, RTU Clients can easily get stuck forever awaiting the .deltas_to_apply event that is resolved in file subscribe reply.
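A timeout of this kind is commonly built with asyncio.wait_for. The sketch below assumes that approach; wait_for_subscribe and the reply Future are hypothetical stand-ins, and only on_file_subscribe_timeout is a name from the docs above.

```python
import asyncio


async def wait_for_subscribe(reply: asyncio.Future, timeout: float, on_timeout) -> bool:
    """Return True if the reply arrived in time, else await the hook and return False."""
    try:
        await asyncio.wait_for(reply, timeout=timeout)
        return True
    except asyncio.TimeoutError:
        await on_timeout()
        return False


async def demo() -> tuple:
    calls = []

    async def on_file_subscribe_timeout() -> None:
        # Application code would decide here whether to retry or shut down.
        calls.append("timeout")

    loop = asyncio.get_running_loop()

    never = loop.create_future()  # a reply that never arrives
    late = await wait_for_subscribe(never, 0.05, on_file_subscribe_timeout)

    ready = loop.create_future()  # a reply that is already available
    ready.set_result({"deltas_to_apply": []})
    on_time = await wait_for_subscribe(ready, 0.05, on_file_subscribe_timeout)
    return late, on_time, calls


timeout_result = asyncio.run(demo())
print(timeout_result)
```

Note that asyncio.wait_for cancels the awaited Future when the timeout expires, so a retry needs a fresh Future.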
Source code in origami/clients/rtu.py
on_inconsistent_state_event(msg)
async
#
To "reset" our internal document model, we need to unsubscribe from the files channel at the least, to stop getting new deltas in. Then we need to figure out what the new current version id is, and pull down seed notebook, and then resubscribe to file channel.
Source code in origami/clients/rtu.py
on_kernel_status_update(msg)
async
#
Called when we receive a kernel_status_update_event on kernels/ channel
Source code in origami/clients/rtu.py
post_queue_delta(delta)
async
#
Hook for Application code to override if it wants to do something special when queueing "out of order" Deltas.
pre_apply_delta(delta)
async
#
Hook for Application code to override if it wants to do something special before "squashing" a Delta into the NotebookBuilder and running applicable callbacks.
queue_execution(cell_id=None, before_id=None, after_id=None, run_all=False)
async
#
Execute an individual cell or multiple cells in the Notebook. The return value is a dict of {future: cell_id}, even in the case of executing a single cell.
- Only code Cells can be executed. When running multiple cells with before_id / after_id / run_all, non-code cells are excluded automatically
- Code cells with no source are not executed on Noteable backend, so they'll be skipped
- Outputs should be available from the cell.output_collection_id property
Use:
    queued_execute = await rtu_client.queue_execution(run_all=True)
    done, pending = await asyncio.wait(queued_execute, timeout=5)
    still_running_cell_ids = [queued_execute[f] for f in pending]
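The {future: cell_id} shape can be exercised without a Noteable backend. In the sketch below, fake_cell_run and the cell ids are made up, and the dict stands in for what queue_execution returns. asyncio.wait takes a single iterable of awaitables, so the dict is passed directly and its keys (the futures) are what get waited on.

```python
import asyncio


async def fake_cell_run(delay: float) -> str:
    # Hypothetical stand-in for a cell execution finishing on the backend.
    await asyncio.sleep(delay)
    return "done"


async def demo() -> list:
    # Stand-in for the {future: cell_id} dict described above.
    queued_execute = {
        asyncio.ensure_future(fake_cell_run(0.01)): "cell-fast",
        asyncio.ensure_future(fake_cell_run(5.0)): "cell-slow",
    }
    done, pending = await asyncio.wait(queued_execute, timeout=0.2)
    still_running_cell_ids = [queued_execute[f] for f in pending]

    # Clean up so the demo exits promptly instead of waiting on the slow cell.
    for f in pending:
        f.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return still_running_cell_ids


still_running = asyncio.run(demo())
print(still_running)
```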
Source code in origami/clients/rtu.py
queue_or_apply_delta(delta)
async
#
Checks whether we're able to apply the Delta by comparing its parent_delta_id with the last_applied_delta_id in the NBBuilder. If it is not a match, we may have received out of order deltas and we queue it to be replayed later
Source code in origami/clients/rtu.py
register_delta_callback(delta_class, fn)
#
Register a callback that may be triggered when we (eventually) apply an in-order Delta.
RTUClient has a separate mechanism for registering delta callbacks from the vanilla Sending .register_callback flow because we don't necessarily want to run callbacks immediately when we observe a Delta come over the RTU websocket. We may be dealing with out-of-order deltas that are queued up and applied later on.
These callbacks are triggered by .apply_delta() and stored in a separate callback list from vanilla Sending callbacks (manager.register_callback's)
Source code in origami/clients/rtu.py
register_rtu_event_callback(rtu_event, fn)
#
Register a callback that will be awaited whenever an RTU event is received that matches the other arguments passed in (event, channel, channel_prefix, transaction_id).
Source code in origami/clients/rtu.py
register_transaction_id_callback(transaction_id, fn)
#
Register a callback that will be triggered whenever an RTU message comes in with a given transaction id. Useful for doing things like waiting for a reply / event or error to be propagated, e.g. for new delta requests.
Source code in origami/clients/rtu.py
replace_cell_content(cell_id, source)
async
#
Replace cell content with a string
Source code in origami/clients/rtu.py
replay_unapplied_deltas()
async
#
Attempt to apply any previously unapplied Deltas that were received out of order. Calls itself recursively in case replaying unapplied deltas resulted in multiple Deltas now being able to be applied. E.g. we received in order:
- {'id': 2, 'parent_id': 1} # applied because NBBuilder had no last_applied_delta_id
- {'id': 5, 'parent_id': 4} # queued because parent_id doesn't match builder
- {'id': 4, 'parent_id': 3} # queued because parent_id doesn't match builder
- {'id': 3, 'parent_id': 2} # applied, then needs to replay queued deltas
Replaying would make the third received delta be applied, which would let replaying again also apply the second delta.
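The queue-and-replay behaviour in this example can be traced with a small synchronous sketch. TinyBuilder and its method names are hypothetical stand-ins for the NotebookBuilder bookkeeping, and deltas are plain dicts with the id / parent_id keys used above.

```python
from typing import List, Optional


class TinyBuilder:
    """Hypothetical stand-in for the NotebookBuilder's delta bookkeeping."""

    def __init__(self) -> None:
        self.last_applied_delta_id: Optional[int] = None
        self.applied: List[int] = []
        self.unapplied: List[dict] = []

    def can_apply(self, delta: dict) -> bool:
        # With no history yet, accept the first delta we see.
        return (
            self.last_applied_delta_id is None
            or delta["parent_id"] == self.last_applied_delta_id
        )

    def apply(self, delta: dict) -> None:
        self.applied.append(delta["id"])
        self.last_applied_delta_id = delta["id"]

    def queue_or_apply(self, delta: dict) -> None:
        if self.can_apply(delta):
            self.apply(delta)
            self.replay_unapplied()
        else:
            self.unapplied.append(delta)

    def replay_unapplied(self) -> None:
        for delta in list(self.unapplied):
            if self.can_apply(delta):
                self.unapplied.remove(delta)
                self.apply(delta)
                # Applying one delta may unblock another, so replay again.
                self.replay_unapplied()
                return


builder = TinyBuilder()
for received in (
    {"id": 2, "parent_id": 1},
    {"id": 5, "parent_id": 4},
    {"id": 4, "parent_id": 3},
    {"id": 3, "parent_id": 2},
):
    builder.queue_or_apply(received)

print(builder.applied)    # [2, 3, 4, 5]
print(builder.unapplied)  # []
```

The recursion restarts the scan after every successful apply because the queue is not sorted: delta 5 sits ahead of delta 4 in the queue but only becomes applicable after 4 has been applied.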
Source code in origami/clients/rtu.py
send(msg)
#
Send an RTU message to Noteable. This is not async because what's happening behind the scenes is that RTUManager.send drops the RTU pydantic model onto an "outbound" asyncio.Queue then the "outbound worker" picks it up off the queue, serializes it to JSON, and sends it out over the wire.
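The synchronous-send, asynchronous-worker split described above can be sketched with asyncio.Queue. This is an illustration only: OutboundSketch is hypothetical, a plain dict stands in for the RTU pydantic model, the payload is made up, and a list stands in for the websocket.

```python
import asyncio
import json


class OutboundSketch:
    """Hypothetical illustration of an outbound queue plus worker."""

    def __init__(self) -> None:
        self.outbound: asyncio.Queue = asyncio.Queue()
        self.wire: list = []  # stands in for the websocket

    def send(self, msg: dict) -> None:
        # Synchronous: only enqueues, never touches the network.
        self.outbound.put_nowait(msg)

    async def outbound_worker(self) -> None:
        while True:
            msg = await self.outbound.get()
            self.wire.append(json.dumps(msg))  # serialize, then "send"
            self.outbound.task_done()


async def demo() -> list:
    client = OutboundSketch()
    worker = asyncio.create_task(client.outbound_worker())
    client.send({"event": "example_request", "channel": "system"})  # made-up payload
    await client.outbound.join()  # wait until the worker has drained the queue
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return client.wire


sent = asyncio.run(demo())
print(sent)
```

Because send only calls put_nowait, it can be used from synchronous callbacks without awaiting, at the cost of not knowing when the message actually reaches the wire.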
Source code in origami/clients/rtu.py
send_file_subscribe()
async
#
Once authenticate_reply is observed, we should send the File subscription request.
Source code in origami/clients/rtu.py
update_cell_content(cell_id, patch)
async
#
Update cell content with a diff-match-patch patch string
Source code in origami/clients/rtu.py
wait_for_kernel_idle()
async
#
Wait for the kernel to be idle
RTUManager
#
Bases: WebsocketManager
- Makes a connection to the RTU validation server
- Handles reconnection if the validation server crashes
- Serializes inbound messages to rtu.GenericRTUReply and outbound to rtu.GenericRTURequest
- Adds extra logging kwargs for RTU event type and optional Delta type/action
- Other classes that use this should add appropriate .auth_hook and .init_hook, and register callbacks to do something with RTU events (see RTUClient)
Source code in origami/clients/rtu.py
inbound_message_hook(contents)
async
#
Hook applied to every message coming in to us over the websocket before the message is passed to registered callback functions.
- The validation server receives RTU Requests and emits RTU Replies
- We're an RTU client, every message we get should parse into an RTU Reply
- Registered callback functions should expect to take in an RTU Reply pydantic model
Source code in origami/clients/rtu.py
on_exception(exc)
async
#
Add a naive delay in reconnecting if we broke the websocket connection because there was a raised Exception in our _poll_loop, e.g. unserializable messages or syntax errors somewhere in our code.
TODO: Make this elegant, perhaps a backoff strategy in Sending base.py
Source code in origami/clients/rtu.py
outbound_message_hook(contents)
async
#
Hook applied to every message we send out over the websocket.
- Anything calling .send() should pass in an RTU Request pydantic model
Source code in origami/clients/rtu.py
send(message)
#
Override WebsocketManager-defined method for type hinting and logging.