Source code for kernel
#!/usr/bin/env python3
# MetaCall Jupyter Kernel by Parra Studios
# A Jupyter Kernel for MetaCall.
#
# Copyright (C) 2016 - 2021 Vicente Eduardo Ferrer Garcia <vic798@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess
import tempfile
import re
import sys
import os
import json
import logging
import sqlite3
import traceback
import nest_asyncio
from traitlets import List
from ipykernel.kernelbase import Kernel
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
HISTORY_DB = None
nest_asyncio.apply()
[docs]class metacall_jupyter(Kernel):
"""Defines the Jupyter Kernel declaration for MetaCall Core
"""
implementation = "Jupyter Kernel for MetaCall Core"
implementation_version = "0.1"
language = "MetaCall Core"
language_version = "0.4.12"
language_info = {
"name": "MetaCall Core",
"mimetype": "text/plain",
"file_extension": ".txt",
}
banner = "Wrapper Kernel for MetaCall Core Library leveraging IPython and Jupyter"
help_links = List(
[
{"text": "MetaCall Core", "url": "https://github.com/metacall/core"},
{
"text": "MetaCall Polyglot REPL",
"url": "https://github.com/metacall/polyglot-repl",
},
{
"text": "MetaCall Jupyter Kernel",
"url": "https://github.com/metacall/jupyter-kernel",
},
]
)
history = {}
results = {}
history_db_ready = False
def __init__(self, **kwargs):
"""init method for the Kernel"""
Kernel.__init__(self, **kwargs)
self._start_metacall()
def _start_metacall(self):
"""Starts the MetaCall REPL Subprocess to take user input and execute the user code.
"""
try:
self.metacall_subprocess = subprocess.Popen(
["metacall", "repl.js"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
self.metacall_subprocess.stdout.readline()
except Exception as e: # noqa: F841
logger.exception("MetaCall Subprocess failed to start")
[docs] def start_history(self):
"""Starts the MetaCall Kernel History Database
"""
db_name = HISTORY_DB
try:
self.history_db = sqlite3.connect(db_name)
c = self.history_db.cursor()
c.execute(
"create table if not exists history (session text, execution_count int, code text, result text)" # noqa: E501
)
self.history_db_ready = True
except: # noqa: E722
print(sys.exc_info()[0])
traceback.print_tb(sys.exc_info()[2])
[docs] def record_history(self, session, count, code, data):
"""Record the user code and the result of the user code to the history database.
"""
try:
result = data._repr_text_()
if type(result) == list:
result = str(result)
c = self.history_db.cursor()
c.execute(
"insert into history values (?,?,?,?)", (session, count, code, result)
)
self.history_db.commit()
except: # noqa: E722
print(sys.exc_info()[0])
traceback.print_tb(sys.exc_info()[2])
[docs] def byte_to_string(self, code):
"""Function to convert the result of the execution to string"""
return code.decode("UTF-8")
[docs] def newfile_magic(self, code):
"""Function to save a new file using the `$newfile` magic.
Args:
code (str): The code to be executed
"""
code = code + "\n"
magic_argument = code.split("\n")[0]
file_name = magic_argument.lstrip("$newfile ")
file_input = code.split("\n", 1)[1]
with open(file_name, "a", encoding="utf-8") as file:
file.write(file_input)
return "File " + file_name + " is saved."
[docs] def metacall_execute(self, code, extension):
"""Executes the Code passed by creating a temporary file using a MetaCall Subprocess
Args:
code: Code to executed by the MetaCall subprocess
extension: The extension of the code to create a temporary file from
Returns:
logger_output: The log output generated by the subprocess after a successful execution
"""
with tempfile.NamedTemporaryFile(suffix=extension) as temp:
temp.write(code.encode())
temp.flush()
result = subprocess.Popen(
["metacall", str(temp.name)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout_value, stderr_value = result.communicate()
std_output = repr(stdout_value)
std_error = repr(stderr_value)
full_output = std_output + "\n" + std_error
exact_output = full_output[2:-5]
split_output = exact_output.split("\\n")
logger_output = ""
for item in split_output:
logger_output += item + "\n"
temp.close()
return logger_output
[docs] def shell_execute(self, code, shcmd):
"""Executes the Shell Commands using a Subprocess.
Args:
code: Shell Command to executed by the subprocess
shcmd: Configuration to call Shell Commands
Returns:
logger_output: The log output generated by the subprocess after a successful execution
"""
from subprocess import run, PIPE, STDOUT
cmd = str(code[len(shcmd):].lstrip())
exact_output = run(cmd, stdout=PIPE, stderr=STDOUT, shell=True)
if exact_output.returncode == 0:
logger_output = exact_output.stdout.decode()
else:
logger_output = self.error_message(exact_output.stdout.decode())
return logger_output
[docs] def error_message(self, code):
"""Highlights the error message in red color"""
code = "\033[0;31m" + code + "\033[0m"
return code
[docs] def metacall_repl(self, code):
"""Function to execute the user code and return the result through MetaCall subprocess.
Args:
code: The code to be executed
Returns:
result: The result of the execution
"""
code = code.lstrip() + "\n"
arr = bytes(code, "utf-8")
self.metacall_subprocess.stdin.write(arr)
self.metacall_subprocess.stdin.flush()
output = self.metacall_subprocess.stdout.readline()
return output
[docs] def available_repl(self):
"""Function to check for available REPLs on the MetaCall Kernel
Args:
None
Returns:
available_repl: List of REPLs available on the Kernel
"""
code = self.byte_to_string(self.metacall_repl("%available"))
lang = code.split(":")[1].lstrip().rstrip()
lang = list(lang.split(" "))
repl_languages = {
"py": "Python: Use `%repl py` to activate",
"node": "JavaScript: Use `%repl node` to activate",
}
available_repl = ""
for key, value in repl_languages.items():
for i in lang:
if key == i:
available_repl = available_repl + value + "\n"
return available_repl.lstrip()
[docs] def do_execute( # noqa: C901
self, code, silent, store_history=True, user_expressions=None, allow_stdin=False
):
"""Executes the User Code
Args:
code: The code to be executed
silent: Whether to display output
store_history: Whether to record this code in history and increase the execution count
user_expressions: Mapping of names to expressions to evaluate after the code has run
allow_stdin: Whether the frontend can provide input on request
Returns:
history (str): The history of the session.
"""
if store_history:
self.history[self.execution_count] = code
if not silent:
try:
def metacall_loadcell(code, extension):
"""Loads a function on MetaCall's REPL subprocess enabling cross-language function calls.
Args:
code: Code to executed by the MetaCall subprocess
extension: The extension of the code to create a temporary file from
Returns:
logger_output: The log output generated by the subprocess
after a successful save and load.
"""
with tempfile.NamedTemporaryFile(suffix=extension, dir="") as temp:
temp.write(code.encode())
temp.flush()
filename = str(temp.name)[1:]
filepath, filename = os.path.split(filename)
code = "%load " + extension[1:] + " " + filename
code = code.lstrip() + "\n"
code_bytes = bytes(code, "utf-8")
self.metacall_subprocess.stdin.write(code_bytes)
self.metacall_subprocess.stdin.flush()
self.metacall_subprocess.stdout.readline()
return "The code has been successfully loaded on Kernel"
def loadcell_extension(code):
"""Parses the code to extract the code and file extension from the user input.
Args:
code: User input consisting the code and file extension
Returns:
code: The code to be saved in the temporary file.
extension: The temporary file extension.
"""
code = code + "\n"
magic_argument = code.split("\n")[0]
extension = magic_argument.lstrip("$loadcell ")
code = code.split("\n", 1)[1]
extension = "." + extension
return code, extension
def split_magics(code):
"""Grabs the langage name passed in the magic and returns the magic and the code
Args:
code: Code to be parsed from and the magic extracted
Returns:
magics: The Language Name passed through Magic
code: The parsed code with the magic extracted from the same
"""
code_lines = []
magics = []
lines = code.split("\n")
state = "magics"
for line in lines:
if state == "magics":
if line.startswith(">"):
magics.append(line.lstrip(">"))
continue
elif not line:
continue
state = "code"
code_lines.append(line)
return (magics, "\n".join(code_lines))
def metacall_inspect():
"""Executes the %inspect on the REPL subprocess to check all loaded functions
Returns:
inspect: A dictionary showing all available functions across the REPL state
"""
code = "%inspect"
code = code.lstrip() + "\n"
code_bytes = bytes(code, "utf-8")
self.metacall_subprocess.stdin.write(code_bytes)
self.metacall_subprocess.stdin.flush()
inspect = ""
while True:
line = self.metacall_subprocess.stdout.readline()
inspect = inspect + line.decode("utf-8")
if line == b"\n":
break
return json.loads(inspect)
def metacall_load(code):
"""Loads a function through the Kernel on the REPL Subprocess for inter-language function calls
Args:
code: Load command in the format: `$load <tag> <file_0>... <file_N>`
"""
try:
code = code.lstrip() + "\n"
logger.info(code)
code_bytes = bytes(code, "utf-8")
self.metacall_subprocess.stdin.write(code_bytes)
self.metacall_subprocess.stdin.flush()
self.metacall_subprocess.stdout.readline()
return "The file has been successfully loaded"
except: # noqa: E722
return "The file was not loaded onto the Kernel"
def delete_line_from_string(code):
"""Delete the Script loading message from the execution"""
regex = re.compile(r"Script \(.+\) loaded correctly")
match = regex.search(code)
if match:
code = regex.sub("", code)
return code
def trim_empty_lines(text):
"""Trim the empty lines from the logger output for better formatting"""
text = os.linesep.join([s for s in text.splitlines() if s])
return text
extensions = {"python": ".py", "javascript": ".js"}
(magics, code) = split_magics(code)
shcmd = "!"
shutd = "$shutdown"
newfile = "$newfile"
inspect_command = "$inspect"
load_command = "$loadfile"
help_command = "$help"
logger_output = ""
loadcell = "$loadcell"
available = "$available"
if code.startswith(help_command):
logger_output = (
"1. ! : Run a Shell Command on the MetaCall Jupyter Kernel\n"
+ "2. $shutdown : Shutdown the MetaCall Jupyter Kernel\n"
+ "3. $inspect : Inspects the MetaCall to check all loaded functions\n"
+ "4. $loadfile: Loads a file onto the MetaCall which can be evaluated\n"
+ "5. $newfile: Creates a new file and appends the code mentioned below\n"
+ "6. %repl <tag>: Switch from different REPL (available tags: node, py)\n"
+ "7. >lang: Execute scripts using the MetaCall exec by saving them in a "
+ "temporary file (available languages: python, javascript)\n"
+ "8. $loadcell <tag>: Loads a function onto the MetaCall to be evaluated\n"
+ "9. $help: Check all the commands and tags you can use while accessing "
+ "the MetaCall Kernel\n"
+ "10. $available: Checks all the available REPLs on the Kernel"
)
elif code.startswith(available):
logger_output = self.available_repl()
elif code.startswith(loadcell):
code, extension = loadcell_extension(code)
logger_output = metacall_loadcell(code, extension)
elif code.startswith(shcmd):
logger_output = self.shell_execute(code, shcmd)
elif code.startswith(inspect_command):
logger_output = json.dumps(metacall_inspect())
elif code.startswith(load_command):
code = code.split(" ", 1)[1]
code = "%load " + code
logger_output = metacall_load(code)
elif code.startswith(newfile):
logger_output = self.newfile_magic(code)
elif code.startswith(shutd):
self.do_shutdown(False)
elif magics:
magic_lang = "".join(map(str, magics))
magic_lang = magic_lang.lower()
if magic_lang in extensions:
extension = extensions[magic_lang]
logger_output = self.metacall_execute(code, extension)
else:
logger_output = (
"We don't suppport "
+ magic_lang
+ " language, yet.\nPlease try another language or add support for "
+ magic_lang
+ " language.\n"
)
else:
output = self.metacall_repl(code)
logger_output = self.byte_to_string(output)
except Exception as e:
logger_output = self.error_message(str(e))
if store_history:
self.results[self.execution_count] = logger_output
self.record_history(
self.session.session, self.execution_count, code, logger_output
)
if "error" in logger_output:
logger_output = self.error_message(logger_output)
stream_content = {
"name": "stdout",
"text": trim_empty_lines(delete_line_from_string(logger_output)),
}
self.send_response(self.iopub_socket, "stream", stream_content)
return {
"status": "ok",
"execution_count": self.execution_count,
"payload": [],
"user_expressions": {},
}
[docs] def do_history(
self,
hist_access_type,
output,
raw,
session=None,
start=None,
stop=None,
n=None,
pattern=None,
unique=False,
):
"""Get the history of a session
Args:
hist_access_type (str): 'tail' or 'range'
output (bool): If True, then the history is printed to stdout.
Otherwise, it is returned as a string.
raw (bool): If True, then the history is not formatted in any way.
session (str): The name of the session.
start (int): The first execution count from the history to get.
stop (int): The last execution count from the history to get.
n (int): The number of executions to get.
pattern (str): The pattern to search the history with.
unique (bool): If True, then only unique history items are shown.
Returns:
history (str): The history of the session.
"""
if hist_access_type == "tail":
hist = self.get_tail(n, raw=raw, output=output, include_latest=True)
elif hist_access_type == "range":
hist = self.get_range(session, start, stop, raw=raw, output=output)
elif hist_access_type == "search":
hist = self.search(pattern, raw=raw, output=output, n=n, unique=unique)
else:
hist = []
return {"history": list(hist)}
[docs] def get_tail(self, n, raw, output, include_latest):
"""Gets the last n lines of history, formatted nicely
Args:
n (int): The number of lines to be fetched
raw (bool): Whether to include raw_history
output (bool): Whether to include output
include_latest (bool): Whether to include the latest
Returns:
list: A list of lines
"""
n = n or self.history.__len__()
key_range = list(self.history.keys())[-n:]
result = []
for key in key_range:
r = (key + 1, self.history[key], self.results[key]._repr_text_())
result.append(r)
return result
[docs] def get_range(self, session, start, stop, raw, output):
"""Gets the range of history items for a session
Args:
session: session name
start: start line number
stop: stop line number
raw: True to get the raw input
output: True to get the formatted output
Returns:
result: A list of tuples (source, input, output)
"""
start = start or 0
stop = stop or self.history.__len__()
start = start if start == 0 else start - 1
key_range = list(self.history.keys())[start:stop]
result = []
for key in key_range:
r = (key + 1, self.history[key], self.results[key]._repr_text_())
result.append(r)
return result
[docs] def do_shutdown(self, restart):
"""Shuts down the Kernel
Args:
restart: Boolean value to determine the kernel is shutdown or restarted
Returns:
restart: Boolean value to signal the kernel shutdown
"""
code = "%exit"
code = code.lstrip() + "\n"
arr = bytes(code, "utf-8")
self.metacall_subprocess.stdin.write(arr)
self.metacall_subprocess.stdin.flush()
self.metacall_subprocess.stdout.readline()
logger_output = "Kernel Shutdown!"
stream_content = {"name": "stdout", "text": logger_output}
self.send_response(self.iopub_socket, "stream", stream_content)
return {"restart": False}