Skip to content
GitLab
Explore
Sign in
Hide whitespace changes
Inline
Side-by-side
Some changes are not shown.
For a faster browsing experience, only
20 of 201+
files are shown.
src/main/java/gui_server/main.css
deleted
100644 → 0
View file @
8a65e137
table
,
th
,
td
{
border
:
1px
solid
black
;
border-style
:
dotted
;
border-collapse
:
collapse
;
padding
:
10px
;
}
#canvasWrapper
{
position
:
relative
;
padding
:
10px
;
margin
:
10px
;
}
canvas
{
position
:
absolute
;
}
src/main/java/gui_server/server-temp/main.py
deleted
100644 → 0
View file @
8a65e137
from
flask
import
Flask
import
json
app
=
Flask
(
__name__
)
@app.route
(
"
/
"
)
def
hello_world
():
return
"
<p>Hello, World!</p>
"
@app.route
(
"
/getJSON
"
)
def
getJSON
():
data
=
{
"
A
"
:
{
"
0
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
1
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
2
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
3
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
4
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
},
"
B
"
:
{
"
0
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
1
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
2
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
3
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
4
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
5
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
},
"
C
"
:
{
"
0
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
1
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
2
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
},
"
D
"
:
{
"
0
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
1
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
2
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
3
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
4
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
5
"
:
[
"
x
"
,
"
y
"
,
"
z
"
],
"
6
"
:
[
"
x
"
,
"
y
"
,
"
z
"
]
}
}
return
json
.
dumps
(
data
)
if
__name__
==
"
__main__
"
:
app
.
run
()
\ No newline at end of file
src/main/java/gui_server/server-temp/venv/bin/Activate.ps1
deleted
100644 → 0
View file @
8a65e137
<#
.Synopsis
Activate a Python virtual environment for the current PowerShell session.
.Description
Pushes the python executable for a virtual environment to the front of the
$Env:PATH environment variable and sets the prompt to signify that you are
in a Python virtual environment. Makes use of the command line switches as
well as the `pyvenv.cfg` file values present in the virtual environment.
.Parameter
VenvDir
Path to the directory that contains the virtual environment to activate. The
default value for this is the parent of the directory that the Activate.ps1
script is located within.
.Parameter
Prompt
The prompt prefix to display when this virtual environment is activated. By
default, this prompt is the name of the virtual environment folder (VenvDir)
surrounded by parentheses and followed by a single space (ie. '(.venv) ').
.Example
Activate.ps1
Activates the Python virtual environment that contains the Activate.ps1 script.
.Example
Activate.ps1 -Verbose
Activates the Python virtual environment that contains the Activate.ps1 script,
and shows extra information about the activation as it executes.
.Example
Activate.ps1 -VenvDir C:\Users\MyUser\Common\.venv
Activates the Python virtual environment located in the specified location.
.Example
Activate.ps1 -Prompt "MyPython"
Activates the Python virtual environment that contains the Activate.ps1 script,
and prefixes the current prompt with the specified string (surrounded in
parentheses) while the virtual environment is active.
.Notes
On Windows, it may be required to enable this Activate.ps1 script by setting the
execution policy for the user. You can do this by issuing the following PowerShell
command:
PS C:\> Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser
For more information on Execution Policies:
https://go.microsoft.com/fwlink/?LinkID=135170
#>
Param
(
[
Parameter
(
Mandatory
=
$false
)]
[
String
]
$VenvDir
,
[
Parameter
(
Mandatory
=
$false
)]
[
String
]
$Prompt
)
<# Function declarations --------------------------------------------------- #>
<#
.Synopsis
Remove all shell session elements added by the Activate script, including the
addition of the virtual environment's Python executable from the beginning of
the PATH variable.
.Parameter
NonDestructive
If present, do not remove this function from the global namespace for the
session.
#>
function
global
:
deactivate
([
switch
]
$NonDestructive
)
{
# Revert to original values
# The prior prompt:
if
(
Test-Path
-Path
Function:_OLD_VIRTUAL_PROMPT
)
{
Copy-Item
-Path
Function:_OLD_VIRTUAL_PROMPT
-Destination
Function:prompt
Remove-Item
-Path
Function:_OLD_VIRTUAL_PROMPT
}
# The prior PYTHONHOME:
if
(
Test-Path
-Path
Env:_OLD_VIRTUAL_PYTHONHOME
)
{
Copy-Item
-Path
Env:_OLD_VIRTUAL_PYTHONHOME
-Destination
Env:PYTHONHOME
Remove-Item
-Path
Env:_OLD_VIRTUAL_PYTHONHOME
}
# The prior PATH:
if
(
Test-Path
-Path
Env:_OLD_VIRTUAL_PATH
)
{
Copy-Item
-Path
Env:_OLD_VIRTUAL_PATH
-Destination
Env:PATH
Remove-Item
-Path
Env:_OLD_VIRTUAL_PATH
}
# Just remove the VIRTUAL_ENV altogether:
if
(
Test-Path
-Path
Env:VIRTUAL_ENV
)
{
Remove-Item
-Path
env:VIRTUAL_ENV
}
# Just remove VIRTUAL_ENV_PROMPT altogether.
if
(
Test-Path
-Path
Env:VIRTUAL_ENV_PROMPT
)
{
Remove-Item
-Path
env:VIRTUAL_ENV_PROMPT
}
# Just remove the _PYTHON_VENV_PROMPT_PREFIX altogether:
if
(
Get-Variable
-Name
"_PYTHON_VENV_PROMPT_PREFIX"
-ErrorAction
SilentlyContinue
)
{
Remove-Variable
-Name
_PYTHON_VENV_PROMPT_PREFIX
-Scope
Global
-Force
}
# Leave deactivate function in the global namespace if requested:
if
(
-not
$NonDestructive
)
{
Remove-Item
-Path
function:deactivate
}
}
<#
.Description
Get-PyVenvConfig parses the values from the pyvenv.cfg file located in the
given folder, and returns them in a map.
For each line in the pyvenv.cfg file, if that line can be parsed into exactly
two strings separated by `=` (with any amount of whitespace surrounding the =)
then it is considered a `key = value` line. The left hand string is the key,
the right hand is the value.
If the value starts with a `'` or a `"` then the first and last character is
stripped from the value before being captured.
.Parameter
ConfigDir
Path to the directory that contains the `pyvenv.cfg` file.
#>
function
Get-PyVenvConfig
(
[
String
]
$ConfigDir
)
{
Write-Verbose
"Given ConfigDir=
$ConfigDir
, obtain values in pyvenv.cfg"
# Ensure the file exists, and issue a warning if it doesn't (but still allow the function to continue).
$pyvenvConfigPath
=
Join-Path
-Resolve
-Path
$ConfigDir
-ChildPath
'pyvenv.cfg'
-ErrorAction
Continue
# An empty map will be returned if no config file is found.
$pyvenvConfig
=
@{
}
if
(
$pyvenvConfigPath
)
{
Write-Verbose
"File exists, parse
`k
ey = value
`
lines"
$pyvenvConfigContent
=
Get-Content
-Path
$pyvenvConfigPath
$pyvenvConfigContent
|
ForEach-Object
{
$keyval
=
$PSItem
-split
"\s*=\s*"
,
2
if
(
$keyval
[
0
]
-and
$keyval
[
1
])
{
$val
=
$keyval
[
1
]
# Remove extraneous quotations around a string value.
if
(
"'"""
.
Contains
(
$val
.
Substring
(
0
,
1
)))
{
$val
=
$val
.
Substring
(
1
,
$val
.
Length
-
2
)
}
$pyvenvConfig
[
$keyval
[
0
]]
=
$val
Write-Verbose
"Adding Key: '
$(
$keyval
[
0
]
)
'='
$val
'"
}
}
}
return
$pyvenvConfig
}
<# Begin Activate script --------------------------------------------------- #>
# Determine the containing directory of this script
$VenvExecPath
=
Split-Path
-Parent
$MyInvocation
.
MyCommand
.
Definition
$VenvExecDir
=
Get-Item
-Path
$VenvExecPath
Write-Verbose
"Activation script is located in path: '
$VenvExecPath
'"
Write-Verbose
"VenvExecDir Fullname: '
$(
$VenvExecDir
.
FullName
)
"
Write-Verbose
"VenvExecDir Name: '
$(
$VenvExecDir
.
Name
)
"
# Set values required in priority: CmdLine, ConfigFile, Default
# First, get the location of the virtual environment, it might not be
# VenvExecDir if specified on the command line.
if
(
$VenvDir
)
{
Write-Verbose
"VenvDir given as parameter, using '
$VenvDir
' to determine values"
}
else
{
Write-Verbose
"VenvDir not given as a parameter, using parent directory name as VenvDir."
$VenvDir
=
$VenvExecDir
.
Parent
.
FullName
.
TrimEnd
(
"\\/"
)
Write-Verbose
"VenvDir=
$VenvDir
"
}
# Next, read the `pyvenv.cfg` file to determine any required value such
# as `prompt`.
$pyvenvCfg
=
Get-PyVenvConfig
-ConfigDir
$VenvDir
# Next, set the prompt from the command line, or the config file, or
# just use the name of the virtual environment folder.
if
(
$Prompt
)
{
Write-Verbose
"Prompt specified as argument, using '
$Prompt
'"
}
else
{
Write-Verbose
"Prompt not specified as argument to script, checking pyvenv.cfg value"
if
(
$pyvenvCfg
-and
$pyvenvCfg
[
'prompt'
])
{
Write-Verbose
" Setting based on value in pyvenv.cfg='
$(
$pyvenvCfg
[
'prompt'
]
)
'"
$Prompt
=
$pyvenvCfg
[
'prompt'
];
}
else
{
Write-Verbose
" Setting prompt based on parent's directory's name. (Is the directory name passed to venv module when creating the virtual environment)"
Write-Verbose
" Got leaf-name of
$VenvDir
='
$(
Split-Path
-Path
$venvDir
-Leaf
)
'"
$Prompt
=
Split-Path
-Path
$venvDir
-Leaf
}
}
Write-Verbose
"Prompt = '
$Prompt
'"
Write-Verbose
"VenvDir='
$VenvDir
'"
# Deactivate any currently active virtual environment, but leave the
# deactivate function in place.
deactivate
-nondestructive
# Now set the environment variable VIRTUAL_ENV, used by many tools to determine
# that there is an activated venv.
$
env
:
VIRTUAL_ENV
=
$VenvDir
if
(
-not
$
Env
:
VIRTUAL_ENV_DISABLE_PROMPT
)
{
Write-Verbose
"Setting prompt to '
$Prompt
'"
# Set the prompt to include the env name
# Make sure _OLD_VIRTUAL_PROMPT is global
function
global
:
_OLD_VIRTUAL_PROMPT
{
""
}
Copy-Item
-Path
function:prompt
-Destination
function:_OLD_VIRTUAL_PROMPT
New-Variable
-Name
_PYTHON_VENV_PROMPT_PREFIX
-Description
"Python virtual environment prompt prefix"
-Scope
Global
-Option
ReadOnly
-Visibility
Public
-Value
$Prompt
function
global
:
prompt
{
Write-Host
-NoNewline
-ForegroundColor
Green
"(
$_
PYTHON_VENV_PROMPT_PREFIX) "
_
OLD_VIRTUAL_PROMPT
}
$
env
:
VIRTUAL_ENV_PROMPT
=
$Prompt
}
# Clear PYTHONHOME
if
(
Test-Path
-Path
Env:PYTHONHOME
)
{
Copy-Item
-Path
Env:PYTHONHOME
-Destination
Env:_OLD_VIRTUAL_PYTHONHOME
Remove-Item
-Path
Env:PYTHONHOME
}
# Add the venv to the PATH
Copy-Item
-Path
Env:PATH
-Destination
Env:_OLD_VIRTUAL_PATH
$
Env
:
PATH
=
"
$VenvExecDir
$(
[
System.IO.Path
]::
PathSeparator
)
$
Env
:
PATH
"
src/main/java/gui_server/server-temp/venv/bin/activate
deleted
100644 → 0
View file @
8a65e137
# This file must be used with "source bin/activate" *from bash*
# you cannot run it directly
deactivate () {
# reset old environment variables
if [ -n "${_OLD_VIRTUAL_PATH:-}" ] ; then
PATH="${_OLD_VIRTUAL_PATH:-}"
export PATH
unset _OLD_VIRTUAL_PATH
fi
if [ -n "${_OLD_VIRTUAL_PYTHONHOME:-}" ] ; then
PYTHONHOME="${_OLD_VIRTUAL_PYTHONHOME:-}"
export PYTHONHOME
unset _OLD_VIRTUAL_PYTHONHOME
fi
# This should detect bash and zsh, which have a hash command that must
# be called to get it to forget past commands. Without forgetting
# past commands the $PATH changes we made may not be respected
if [ -n "${BASH:-}" -o -n "${ZSH_VERSION:-}" ] ; then
hash -r 2> /dev/null
fi
if [ -n "${_OLD_VIRTUAL_PS1:-}" ] ; then
PS1="${_OLD_VIRTUAL_PS1:-}"
export PS1
unset _OLD_VIRTUAL_PS1
fi
unset VIRTUAL_ENV
unset VIRTUAL_ENV_PROMPT
if [ ! "${1:-}" = "nondestructive" ] ; then
# Self destruct!
unset -f deactivate
fi
}
# unset irrelevant variables
deactivate nondestructive
VIRTUAL_ENV="/Users/sam/Documents/Documents/BU/EC504/FinalProjFrontEnd/server/venv"
export VIRTUAL_ENV
_OLD_VIRTUAL_PATH="$PATH"
PATH="$VIRTUAL_ENV/bin:$PATH"
export PATH
# unset PYTHONHOME if set
# this will fail if PYTHONHOME is set to the empty string (which is bad anyway)
# could use `if (set -u; : $PYTHONHOME) ;` in bash
if [ -n "${PYTHONHOME:-}" ] ; then
_OLD_VIRTUAL_PYTHONHOME="${PYTHONHOME:-}"
unset PYTHONHOME
fi
if [ -z "${VIRTUAL_ENV_DISABLE_PROMPT:-}" ] ; then
_OLD_VIRTUAL_PS1="${PS1:-}"
PS1="(venv) ${PS1:-}"
export PS1
VIRTUAL_ENV_PROMPT="(venv) "
export VIRTUAL_ENV_PROMPT
fi
# This should detect bash and zsh, which have a hash command that must
# be called to get it to forget past commands. Without forgetting
# past commands the $PATH changes we made may not be respected
if [ -n "${BASH:-}" -o -n "${ZSH_VERSION:-}" ] ; then
hash -r 2> /dev/null
fi
src/main/java/gui_server/server-temp/venv/bin/activate.csh
deleted
100644 → 0
View file @
8a65e137
# This file must be used with "source bin/activate.csh" *from csh*.
# You cannot run it directly.
# Created by Davide Di Blasi <davidedb@gmail.com>.
# Ported to Python 3.3 venv by Andrew Svetlov <andrew.svetlov@gmail.com>
alias deactivate 'test $?_OLD_VIRTUAL_PATH != 0 && setenv PATH "$_OLD_VIRTUAL_PATH" && unset _OLD_VIRTUAL_PATH; rehash; test $?_OLD_VIRTUAL_PROMPT != 0 && set prompt="$_OLD_VIRTUAL_PROMPT" && unset _OLD_VIRTUAL_PROMPT; unsetenv VIRTUAL_ENV; unsetenv VIRTUAL_ENV_PROMPT; test "\!:*" != "nondestructive" && unalias deactivate'
# Unset irrelevant variables.
deactivate nondestructive
setenv VIRTUAL_ENV "/Users/sam/Documents/Documents/BU/EC504/FinalProjFrontEnd/server/venv"
set _OLD_VIRTUAL_PATH="$PATH"
setenv PATH "$VIRTUAL_ENV/bin:$PATH"
set _OLD_VIRTUAL_PROMPT="$prompt"
if (! "$?VIRTUAL_ENV_DISABLE_PROMPT") then
set prompt = "(venv) $prompt"
setenv VIRTUAL_ENV_PROMPT "(venv) "
endif
alias pydoc python -m pydoc
rehash
src/main/java/gui_server/server-temp/venv/bin/activate.fish
deleted
100644 → 0
View file @
8a65e137
# This file must be used with "source <venv>/bin/activate.fish" *from fish*
# (https://fishshell.com/); you cannot run it directly.
function deactivate -d "Exit virtual environment and return to normal shell environment"
# reset old environment variables
if test -n "$_OLD_VIRTUAL_PATH"
set -gx PATH $_OLD_VIRTUAL_PATH
set -e _OLD_VIRTUAL_PATH
end
if test -n "$_OLD_VIRTUAL_PYTHONHOME"
set -gx PYTHONHOME $_OLD_VIRTUAL_PYTHONHOME
set -e _OLD_VIRTUAL_PYTHONHOME
end
if test -n "$_OLD_FISH_PROMPT_OVERRIDE"
set -e _OLD_FISH_PROMPT_OVERRIDE
# prevents error when using nested fish instances (Issue #93858)
if functions -q _old_fish_prompt
functions -e fish_prompt
functions -c _old_fish_prompt fish_prompt
functions -e _old_fish_prompt
end
end
set -e VIRTUAL_ENV
set -e VIRTUAL_ENV_PROMPT
if test "$argv[1]" != "nondestructive"
# Self-destruct!
functions -e deactivate
end
end
# Unset irrelevant variables.
deactivate nondestructive
set -gx VIRTUAL_ENV "/Users/sam/Documents/Documents/BU/EC504/FinalProjFrontEnd/server/venv"
set -gx _OLD_VIRTUAL_PATH $PATH
set -gx PATH "$VIRTUAL_ENV/bin" $PATH
# Unset PYTHONHOME if set.
if set -q PYTHONHOME
set -gx _OLD_VIRTUAL_PYTHONHOME $PYTHONHOME
set -e PYTHONHOME
end
if test -z "$VIRTUAL_ENV_DISABLE_PROMPT"
# fish uses a function instead of an env var to generate the prompt.
# Save the current fish_prompt function as the function _old_fish_prompt.
functions -c fish_prompt _old_fish_prompt
# With the original prompt function renamed, we can override with our own.
function fish_prompt
# Save the return status of the last command.
set -l old_status $status
# Output the venv prompt; color taken from the blue of the Python logo.
printf "%s%s%s" (set_color 4B8BBE) "(venv) " (set_color normal)
# Restore the return status of the previous command.
echo "exit $old_status" | .
# Output the original/"old" prompt.
_old_fish_prompt
end
set -gx _OLD_FISH_PROMPT_OVERRIDE "$VIRTUAL_ENV"
set -gx VIRTUAL_ENV_PROMPT "(venv) "
end
src/main/java/gui_server/server-temp/venv/bin/flask
deleted
100755 → 0
View file @
8a65e137
#!/Users/sam/Documents/Documents/BU/EC504/FinalProjFrontEnd/server/venv/bin/python3
# -*- coding: utf-8 -*-
import
re
import
sys
from
flask.cli
import
main
if
__name__
==
'
__main__
'
:
sys
.
argv
[
0
]
=
re
.
sub
(
r
'
(-script\.pyw|\.exe)?$
'
,
''
,
sys
.
argv
[
0
])
sys
.
exit
(
main
())
src/main/java/gui_server/server-temp/venv/bin/pip
deleted
100755 → 0
View file @
8a65e137
#!/Users/sam/Documents/Documents/BU/EC504/FinalProjFrontEnd/server/venv/bin/python3
# -*- coding: utf-8 -*-
import
re
import
sys
from
pip._internal.cli.main
import
main
if
__name__
==
'
__main__
'
:
sys
.
argv
[
0
]
=
re
.
sub
(
r
'
(-script\.pyw|\.exe)?$
'
,
''
,
sys
.
argv
[
0
])
sys
.
exit
(
main
())
src/main/java/gui_server/server-temp/venv/bin/pip3
deleted
100755 → 0
View file @
8a65e137
#!/Users/sam/Documents/Documents/BU/EC504/FinalProjFrontEnd/server/venv/bin/python3
# -*- coding: utf-8 -*-
import
re
import
sys
from
pip._internal.cli.main
import
main
if
__name__
==
'
__main__
'
:
sys
.
argv
[
0
]
=
re
.
sub
(
r
'
(-script\.pyw|\.exe)?$
'
,
''
,
sys
.
argv
[
0
])
sys
.
exit
(
main
())
src/main/java/gui_server/server-temp/venv/bin/pip3.10
deleted
100755 → 0
View file @
8a65e137
#!/Users/sam/Documents/Documents/BU/EC504/FinalProjFrontEnd/server/venv/bin/python3
# -*- coding: utf-8 -*-
import
re
import
sys
from
pip._internal.cli.main
import
main
if
__name__
==
'
__main__
'
:
sys
.
argv
[
0
]
=
re
.
sub
(
r
'
(-script\.pyw|\.exe)?$
'
,
''
,
sys
.
argv
[
0
])
sys
.
exit
(
main
())
src/main/java/gui_server/server-temp/venv/bin/python
deleted
120000 → 0
View file @
8a65e137
python3
\ No newline at end of file
src/main/java/gui_server/server-temp/venv/bin/python3
deleted
120000 → 0
View file @
8a65e137
/Library/Frameworks/Python.framework/Versions/3.10/bin/python3
\ No newline at end of file
src/main/java/gui_server/server-temp/venv/bin/python3.10
deleted
120000 → 0
View file @
8a65e137
python3
\ No newline at end of file
src/main/java/gui_server/server-temp/venv/lib/python3.10/site-packages/blinker/__init__.py
deleted
100644 → 0
View file @
8a65e137
from
blinker.base
import
ANY
from
blinker.base
import
NamedSignal
from
blinker.base
import
Namespace
from
blinker.base
import
receiver_connected
from
blinker.base
import
Signal
from
blinker.base
import
signal
from
blinker.base
import
WeakNamespace
__all__
=
[
"
ANY
"
,
"
NamedSignal
"
,
"
Namespace
"
,
"
Signal
"
,
"
WeakNamespace
"
,
"
receiver_connected
"
,
"
signal
"
,
]
__version__
=
"
1.7.0
"
src/main/java/gui_server/server-temp/venv/lib/python3.10/site-packages/blinker/_saferef.py
deleted
100644 → 0
View file @
8a65e137
# extracted from Louie, http://pylouie.org/
# updated for Python 3
#
# Copyright (c) 2006 Patrick K. O'Brien, Mike C. Fletcher,
# Matthew R. Scott
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# * Neither the name of the <ORGANIZATION> nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
"""
Refactored
'
safe reference from dispatcher.py
"""
import
operator
import
sys
import
traceback
import
weakref
get_self
=
operator
.
attrgetter
(
"
__self__
"
)
get_func
=
operator
.
attrgetter
(
"
__func__
"
)
def
safe_ref
(
target
,
on_delete
=
None
):
"""
Return a *safe* weak reference to a callable target.
- ``target``: The object to be weakly referenced, if it
'
s a bound
method reference, will create a BoundMethodWeakref, otherwise
creates a simple weakref.
- ``on_delete``: If provided, will have a hard reference stored to
the callable to be called after the safe reference goes out of
scope with the reference object, (either a weakref or a
BoundMethodWeakref) as argument.
"""
try
:
im_self
=
get_self
(
target
)
except
AttributeError
:
if
callable
(
on_delete
):
return
weakref
.
ref
(
target
,
on_delete
)
else
:
return
weakref
.
ref
(
target
)
else
:
if
im_self
is
not
None
:
# Turn a bound method into a BoundMethodWeakref instance.
# Keep track of these instances for lookup by disconnect().
assert
hasattr
(
target
,
"
im_func
"
)
or
hasattr
(
target
,
"
__func__
"
),
(
f
"
safe_ref target
{
target
!r}
has im_self, but no im_func,
"
"
don
'
t know how to create reference
"
)
reference
=
BoundMethodWeakref
(
target
=
target
,
on_delete
=
on_delete
)
return
reference
class
BoundMethodWeakref
:
"""'
Safe
'
and reusable weak references to instance methods.
BoundMethodWeakref objects provide a mechanism for referencing a
bound method without requiring that the method object itself
(which is normally a transient object) is kept alive. Instead,
the BoundMethodWeakref object keeps weak references to both the
object and the function which together define the instance method.
Attributes:
- ``key``: The identity key for the reference, calculated by the
class
'
s calculate_key method applied to the target instance method.
- ``deletion_methods``: Sequence of callable objects taking single
argument, a reference to this object which will be called when
*either* the target object or target function is garbage
collected (i.e. when this object becomes invalid). These are
specified as the on_delete parameters of safe_ref calls.
- ``weak_self``: Weak reference to the target object.
- ``weak_func``: Weak reference to the target function.
Class Attributes:
- ``_all_instances``: Class attribute pointing to all live
BoundMethodWeakref objects indexed by the class
'
s
calculate_key(target) method applied to the target objects.
This weak value dictionary is used to short-circuit creation so
that multiple references to the same (object, function) pair
produce the same BoundMethodWeakref instance.
"""
_all_instances
=
weakref
.
WeakValueDictionary
()
# type: ignore[var-annotated]
def
__new__
(
cls
,
target
,
on_delete
=
None
,
*
arguments
,
**
named
):
"""
Create new instance or return current instance.
Basically this method of construction allows us to
short-circuit creation of references to already-referenced
instance methods. The key corresponding to the target is
calculated, and if there is already an existing reference,
that is returned, with its deletion_methods attribute updated.
Otherwise the new instance is created and registered in the
table of already-referenced methods.
"""
key
=
cls
.
calculate_key
(
target
)
current
=
cls
.
_all_instances
.
get
(
key
)
if
current
is
not
None
:
current
.
deletion_methods
.
append
(
on_delete
)
return
current
else
:
base
=
super
().
__new__
(
cls
)
cls
.
_all_instances
[
key
]
=
base
base
.
__init__
(
target
,
on_delete
,
*
arguments
,
**
named
)
return
base
def
__init__
(
self
,
target
,
on_delete
=
None
):
"""
Return a weak-reference-like instance for a bound method.
- ``target``: The instance-method target for the weak reference,
must have im_self and im_func attributes and be
reconstructable via the following, which is true of built-in
instance methods::
target.im_func.__get__( target.im_self )
- ``on_delete``: Optional callback which will be called when
this weak reference ceases to be valid (i.e. either the
object or the function is garbage collected). Should take a
single argument, which will be passed a pointer to this
object.
"""
def
remove
(
weak
,
self
=
self
):
"""
Set self.isDead to True when method or instance is destroyed.
"""
methods
=
self
.
deletion_methods
[:]
del
self
.
deletion_methods
[:]
try
:
del
self
.
__class__
.
_all_instances
[
self
.
key
]
except
KeyError
:
pass
for
function
in
methods
:
try
:
if
callable
(
function
):
function
(
self
)
except
Exception
:
try
:
traceback
.
print_exc
()
except
AttributeError
:
e
=
sys
.
exc_info
()[
1
]
print
(
f
"
Exception during saferef
{
self
}
"
f
"
cleanup function
{
function
}
:
{
e
}
"
)
self
.
deletion_methods
=
[
on_delete
]
self
.
key
=
self
.
calculate_key
(
target
)
im_self
=
get_self
(
target
)
im_func
=
get_func
(
target
)
self
.
weak_self
=
weakref
.
ref
(
im_self
,
remove
)
self
.
weak_func
=
weakref
.
ref
(
im_func
,
remove
)
self
.
self_name
=
str
(
im_self
)
self
.
func_name
=
str
(
im_func
.
__name__
)
@classmethod
def
calculate_key
(
cls
,
target
):
"""
Calculate the reference key for this reference.
Currently this is a two-tuple of the id()
'
s of the target
object and the target function respectively.
"""
return
(
id
(
get_self
(
target
)),
id
(
get_func
(
target
)))
def
__str__
(
self
):
"""
Give a friendly representation of the object.
"""
return
"
{}({}.{})
"
.
format
(
self
.
__class__
.
__name__
,
self
.
self_name
,
self
.
func_name
,
)
__repr__
=
__str__
def
__hash__
(
self
):
return
hash
((
self
.
self_name
,
self
.
key
))
def
__nonzero__
(
self
):
"""
Whether we are still a valid reference.
"""
return
self
()
is
not
None
def
__eq__
(
self
,
other
):
"""
Compare with another reference.
"""
if
not
isinstance
(
other
,
self
.
__class__
):
return
operator
.
eq
(
self
.
__class__
,
type
(
other
))
return
operator
.
eq
(
self
.
key
,
other
.
key
)
def
__call__
(
self
):
"""
Return a strong reference to the bound method.
If the target cannot be retrieved, then will return None,
otherwise returns a bound instance method for our object and
function.
Note: You may call this method any number of times, as it does
not invalidate the reference.
"""
target
=
self
.
weak_self
()
if
target
is
not
None
:
function
=
self
.
weak_func
()
if
function
is
not
None
:
return
function
.
__get__
(
target
)
return
None
src/main/java/gui_server/server-temp/venv/lib/python3.10/site-packages/blinker/_utilities.py
deleted
100644 → 0
View file @
8a65e137
from
__future__
import
annotations
import
typing
as
t
from
weakref
import
ref
from
blinker._saferef
import
BoundMethodWeakref
IdentityType
=
t
.
Union
[
t
.
Tuple
[
int
,
int
],
str
,
int
]
class
_symbol
:
def
__init__
(
self
,
name
):
"""
Construct a new named symbol.
"""
self
.
__name__
=
self
.
name
=
name
def
__reduce__
(
self
):
return
symbol
,
(
self
.
name
,)
def
__repr__
(
self
):
return
self
.
name
_symbol
.
__name__
=
"
symbol
"
class
symbol
:
"""
A constant symbol.
>>>
symbol
(
'
foo
'
)
is
symbol
(
'
foo
'
)
True
>>>
symbol
(
'
foo
'
)
foo
A slight refinement of the MAGICCOOKIE=object() pattern. The primary
advantage of symbol() is its repr(). They are also singletons.
Repeated calls of symbol(
'
name
'
) will all return the same instance.
"""
symbols
=
{}
# type: ignore[var-annotated]
def
__new__
(
cls
,
name
):
try
:
return
cls
.
symbols
[
name
]
except
KeyError
:
return
cls
.
symbols
.
setdefault
(
name
,
_symbol
(
name
))
def
hashable_identity
(
obj
:
object
)
->
IdentityType
:
if
hasattr
(
obj
,
"
__func__
"
):
return
(
id
(
obj
.
__func__
),
id
(
obj
.
__self__
))
# type: ignore[attr-defined]
elif
hasattr
(
obj
,
"
im_func
"
):
return
(
id
(
obj
.
im_func
),
id
(
obj
.
im_self
))
# type: ignore[attr-defined]
elif
isinstance
(
obj
,
(
int
,
str
)):
return
obj
else
:
return
id
(
obj
)
WeakTypes
=
(
ref
,
BoundMethodWeakref
)
class
annotatable_weakref
(
ref
):
"""
A weakref.ref that supports custom instance attributes.
"""
receiver_id
:
t
.
Optional
[
IdentityType
]
sender_id
:
t
.
Optional
[
IdentityType
]
def
reference
(
# type: ignore[no-untyped-def]
object
,
callback
=
None
,
**
annotations
)
->
annotatable_weakref
:
"""
Return an annotated weak ref.
"""
if
callable
(
object
):
weak
=
callable_reference
(
object
,
callback
)
else
:
weak
=
annotatable_weakref
(
object
,
callback
)
for
key
,
value
in
annotations
.
items
():
setattr
(
weak
,
key
,
value
)
return
weak
# type: ignore[no-any-return]
def
callable_reference
(
object
,
callback
=
None
):
"""
Return an annotated weak ref, supporting bound instance methods.
"""
if
hasattr
(
object
,
"
im_self
"
)
and
object
.
im_self
is
not
None
:
return
BoundMethodWeakref
(
target
=
object
,
on_delete
=
callback
)
elif
hasattr
(
object
,
"
__self__
"
)
and
object
.
__self__
is
not
None
:
return
BoundMethodWeakref
(
target
=
object
,
on_delete
=
callback
)
return
annotatable_weakref
(
object
,
callback
)
class
lazy_property
:
"""
A @property that is only evaluated once.
"""
def
__init__
(
self
,
deferred
):
self
.
_deferred
=
deferred
self
.
__doc__
=
deferred
.
__doc__
def
__get__
(
self
,
obj
,
cls
):
if
obj
is
None
:
return
self
value
=
self
.
_deferred
(
obj
)
setattr
(
obj
,
self
.
_deferred
.
__name__
,
value
)
return
value
src/main/java/gui_server/server-temp/venv/lib/python3.10/site-packages/blinker/base.py
deleted
100644 → 0
View file @
8a65e137
"""
Signals and events.
A small implementation of signals, inspired by a snippet of Django signal
API client code seen in a blog post. Signals are first-class objects and
each manages its own receivers and message emission.
The :func:`signal` function provides singleton behavior for named signals.
"""
from
__future__
import
annotations
import
typing
as
t
from
collections
import
defaultdict
from
contextlib
import
contextmanager
from
inspect
import
iscoroutinefunction
from
warnings
import
warn
from
weakref
import
WeakValueDictionary
from
blinker._utilities
import
annotatable_weakref
from
blinker._utilities
import
hashable_identity
from
blinker._utilities
import
IdentityType
from
blinker._utilities
import
lazy_property
from
blinker._utilities
import
reference
from
blinker._utilities
import
symbol
from
blinker._utilities
import
WeakTypes
if
t
.
TYPE_CHECKING
:
import
typing_extensions
as
te
T_callable
=
t
.
TypeVar
(
"
T_callable
"
,
bound
=
t
.
Callable
[...,
t
.
Any
])
T
=
t
.
TypeVar
(
"
T
"
)
P
=
te
.
ParamSpec
(
"
P
"
)
AsyncWrapperType
=
t
.
Callable
[[
t
.
Callable
[
P
,
t
.
Awaitable
[
T
]]],
t
.
Callable
[
P
,
T
]]
SyncWrapperType
=
t
.
Callable
[[
t
.
Callable
[
P
,
T
]],
t
.
Callable
[
P
,
t
.
Awaitable
[
T
]]]
ANY
=
symbol
(
"
ANY
"
)
ANY
.
__doc__
=
'
Token for
"
any sender
"
.
'
ANY_ID
=
0
# NOTE: We need a reference to cast for use in weakref callbacks otherwise
# t.cast may have already been set to None during finalization.
cast
=
t
.
cast
class
Signal
:
"""
A notification emitter.
"""
#: An :obj:`ANY` convenience synonym, allows ``Signal.ANY``
#: without an additional import.
ANY
=
ANY
set_class
:
type
[
set
]
=
set
@lazy_property
def
receiver_connected
(
self
)
->
Signal
:
"""
Emitted after each :meth:`connect`.
The signal sender is the signal instance, and the :meth:`connect`
arguments are passed through: *receiver*, *sender*, and *weak*.
.. versionadded:: 1.2
"""
return
Signal
(
doc
=
"
Emitted after a receiver connects.
"
)
@lazy_property
def
receiver_disconnected
(
self
)
->
Signal
:
"""
Emitted after :meth:`disconnect`.
The sender is the signal instance, and the :meth:`disconnect` arguments
are passed through: *receiver* and *sender*.
Note, this signal is emitted **only** when :meth:`disconnect` is
called explicitly.
The disconnect signal can not be emitted by an automatic disconnect
(due to a weakly referenced receiver or sender going out of scope),
as the receiver and/or sender instances are no longer available for
use at the time this signal would be emitted.
An alternative approach is available by subscribing to
:attr:`receiver_connected` and setting up a custom weakref cleanup
callback on weak receivers and senders.
.. versionadded:: 1.2
"""
return
Signal
(
doc
=
"
Emitted after a receiver disconnects.
"
)
def
__init__
(
self
,
doc
:
str
|
None
=
None
)
->
None
:
"""
:param doc: optional. If provided, will be assigned to the signal
'
s
__doc__ attribute.
"""
if
doc
:
self
.
__doc__
=
doc
#: A mapping of connected receivers.
#:
#: The values of this mapping are not meaningful outside of the
#: internal :class:`Signal` implementation, however the boolean value
#: of the mapping is useful as an extremely efficient check to see if
#: any receivers are connected to the signal.
self
.
receivers
:
dict
[
IdentityType
,
t
.
Callable
|
annotatable_weakref
]
=
{}
self
.
is_muted
=
False
self
.
_by_receiver
:
dict
[
IdentityType
,
set
[
IdentityType
]]
=
defaultdict
(
self
.
set_class
)
self
.
_by_sender
:
dict
[
IdentityType
,
set
[
IdentityType
]]
=
defaultdict
(
self
.
set_class
)
self
.
_weak_senders
:
dict
[
IdentityType
,
annotatable_weakref
]
=
{}
def
connect
(
self
,
receiver
:
T_callable
,
sender
:
t
.
Any
=
ANY
,
weak
:
bool
=
True
)
->
T_callable
:
"""
Connect *receiver* to signal events sent by *sender*.
:param receiver: A callable. Will be invoked by :meth:`send` with
`sender=` as a single positional argument and any ``kwargs`` that
were provided to a call to :meth:`send`.
:param sender: Any object or :obj:`ANY`, defaults to ``ANY``.
Restricts notifications delivered to *receiver* to only those
:meth:`send` emissions sent by *sender*. If ``ANY``, the receiver
will always be notified. A *receiver* may be connected to
multiple *sender* values on the same Signal through multiple calls
to :meth:`connect`.
:param weak: If true, the Signal will hold a weakref to *receiver*
and automatically disconnect when *receiver* goes out of scope or
is garbage collected. Defaults to True.
"""
receiver_id
=
hashable_identity
(
receiver
)
receiver_ref
:
T_callable
|
annotatable_weakref
if
weak
:
receiver_ref
=
reference
(
receiver
,
self
.
_cleanup_receiver
)
receiver_ref
.
receiver_id
=
receiver_id
else
:
receiver_ref
=
receiver
sender_id
:
IdentityType
if
sender
is
ANY
:
sender_id
=
ANY_ID
else
:
sender_id
=
hashable_identity
(
sender
)
self
.
receivers
.
setdefault
(
receiver_id
,
receiver_ref
)
self
.
_by_sender
[
sender_id
].
add
(
receiver_id
)
self
.
_by_receiver
[
receiver_id
].
add
(
sender_id
)
del
receiver_ref
if
sender
is
not
ANY
and
sender_id
not
in
self
.
_weak_senders
:
# wire together a cleanup for weakref-able senders
try
:
sender_ref
=
reference
(
sender
,
self
.
_cleanup_sender
)
sender_ref
.
sender_id
=
sender_id
except
TypeError
:
pass
else
:
self
.
_weak_senders
.
setdefault
(
sender_id
,
sender_ref
)
del
sender_ref
# broadcast this connection. if receivers raise, disconnect.
if
"
receiver_connected
"
in
self
.
__dict__
and
self
.
receiver_connected
.
receivers
:
try
:
self
.
receiver_connected
.
send
(
self
,
receiver
=
receiver
,
sender
=
sender
,
weak
=
weak
)
except
TypeError
as
e
:
self
.
disconnect
(
receiver
,
sender
)
raise
e
if
receiver_connected
.
receivers
and
self
is
not
receiver_connected
:
try
:
receiver_connected
.
send
(
self
,
receiver_arg
=
receiver
,
sender_arg
=
sender
,
weak_arg
=
weak
)
except
TypeError
as
e
:
self
.
disconnect
(
receiver
,
sender
)
raise
e
return
receiver
def
connect_via
(
self
,
sender
:
t
.
Any
,
weak
:
bool
=
False
)
->
t
.
Callable
[[
T_callable
],
T_callable
]:
"""
Connect the decorated function as a receiver for *sender*.
:param sender: Any object or :obj:`ANY`. The decorated function
will only receive :meth:`send` emissions sent by *sender*. If
``ANY``, the receiver will always be notified. A function may be
decorated multiple times with differing *sender* values.
:param weak: If true, the Signal will hold a weakref to the
decorated function and automatically disconnect when *receiver*
goes out of scope or is garbage collected. Unlike
:meth:`connect`, this defaults to False.
The decorated function will be invoked by :meth:`send` with
`sender=` as a single positional argument and any ``kwargs`` that
were provided to the call to :meth:`send`.
.. versionadded:: 1.1
"""
def
decorator
(
fn
:
T_callable
)
->
T_callable
:
self
.
connect
(
fn
,
sender
,
weak
)
return
fn
return
decorator
@contextmanager
def
connected_to
(
self
,
receiver
:
t
.
Callable
,
sender
:
t
.
Any
=
ANY
)
->
t
.
Generator
[
None
,
None
,
None
]:
"""
Execute a block with the signal temporarily connected to *receiver*.
:param receiver: a receiver callable
:param sender: optional, a sender to filter on
This is a context manager for use in the ``with`` statement. It can
be useful in unit tests. *receiver* is connected to the signal for
the duration of the ``with`` block, and will be disconnected
automatically when exiting the block:
.. code-block:: python
with on_ready.connected_to(receiver):
# do stuff
on_ready.send(123)
.. versionadded:: 1.1
"""
self
.
connect
(
receiver
,
sender
=
sender
,
weak
=
False
)
try
:
yield
None
finally
:
self
.
disconnect
(
receiver
)
@contextmanager
def
muted
(
self
)
->
t
.
Generator
[
None
,
None
,
None
]:
"""
Context manager for temporarily disabling signal.
Useful for test purposes.
"""
self
.
is_muted
=
True
try
:
yield
None
except
Exception
as
e
:
raise
e
finally
:
self
.
is_muted
=
False
def
temporarily_connected_to
(
self
,
receiver
:
t
.
Callable
,
sender
:
t
.
Any
=
ANY
)
->
t
.
ContextManager
[
None
]:
"""
An alias for :meth:`connected_to`.
:param receiver: a receiver callable
:param sender: optional, a sender to filter on
.. versionadded:: 0.9
.. versionchanged:: 1.1
Renamed to :meth:`connected_to`. ``temporarily_connected_to`` was
deprecated in 1.2 and will be removed in a subsequent version.
"""
warn
(
"
temporarily_connected_to is deprecated; use connected_to instead.
"
,
DeprecationWarning
,
)
return
self
.
connected_to
(
receiver
,
sender
)
def
send
(
self
,
*
sender
:
t
.
Any
,
_async_wrapper
:
AsyncWrapperType
|
None
=
None
,
**
kwargs
:
t
.
Any
,
)
->
list
[
tuple
[
t
.
Callable
,
t
.
Any
]]:
"""
Emit this signal on behalf of *sender*, passing on ``kwargs``.
Returns a list of 2-tuples, pairing receivers with their return
value. The ordering of receiver notification is undefined.
:param sender: Any object or ``None``. If omitted, synonymous
with ``None``. Only accepts one positional argument.
:param _async_wrapper: A callable that should wrap a coroutine
receiver and run it when called synchronously.
:param kwargs: Data to be sent to receivers.
"""
if
self
.
is_muted
:
return
[]
sender
=
self
.
_extract_sender
(
sender
)
results
=
[]
for
receiver
in
self
.
receivers_for
(
sender
):
if
iscoroutinefunction
(
receiver
):
if
_async_wrapper
is
None
:
raise
RuntimeError
(
"
Cannot send to a coroutine function
"
)
receiver
=
_async_wrapper
(
receiver
)
result
=
receiver
(
sender
,
**
kwargs
)
results
.
append
((
receiver
,
result
))
return
results
async
def
send_async
(
self
,
*
sender
:
t
.
Any
,
_sync_wrapper
:
SyncWrapperType
|
None
=
None
,
**
kwargs
:
t
.
Any
,
)
->
list
[
tuple
[
t
.
Callable
,
t
.
Any
]]:
"""
Emit this signal on behalf of *sender*, passing on ``kwargs``.
Returns a list of 2-tuples, pairing receivers with their return
value. The ordering of receiver notification is undefined.
:param sender: Any object or ``None``. If omitted, synonymous
with ``None``. Only accepts one positional argument.
:param _sync_wrapper: A callable that should wrap a synchronous
receiver and run it when awaited.
:param kwargs: Data to be sent to receivers.
"""
if
self
.
is_muted
:
return
[]
sender
=
self
.
_extract_sender
(
sender
)
results
=
[]
for
receiver
in
self
.
receivers_for
(
sender
):
if
not
iscoroutinefunction
(
receiver
):
if
_sync_wrapper
is
None
:
raise
RuntimeError
(
"
Cannot send to a non-coroutine function
"
)
receiver
=
_sync_wrapper
(
receiver
)
result
=
await
receiver
(
sender
,
**
kwargs
)
results
.
append
((
receiver
,
result
))
return
results
def
_extract_sender
(
self
,
sender
:
t
.
Any
)
->
t
.
Any
:
if
not
self
.
receivers
:
# Ensure correct signature even on no-op sends, disable with -O
# for lowest possible cost.
if
__debug__
and
sender
and
len
(
sender
)
>
1
:
raise
TypeError
(
f
"
send() accepts only one positional argument,
{
len
(
sender
)
}
given
"
)
return
[]
# Using '*sender' rather than 'sender=None' allows 'sender' to be
# used as a keyword argument- i.e. it's an invisible name in the
# function signature.
if
len
(
sender
)
==
0
:
sender
=
None
elif
len
(
sender
)
>
1
:
raise
TypeError
(
f
"
send() accepts only one positional argument,
{
len
(
sender
)
}
given
"
)
else
:
sender
=
sender
[
0
]
return
sender
def
has_receivers_for
(
self
,
sender
:
t
.
Any
)
->
bool
:
"""
True if there is probably a receiver for *sender*.
Performs an optimistic check only. Does not guarantee that all
weakly referenced receivers are still alive. See
:meth:`receivers_for` for a stronger search.
"""
if
not
self
.
receivers
:
return
False
if
self
.
_by_sender
[
ANY_ID
]:
return
True
if
sender
is
ANY
:
return
False
return
hashable_identity
(
sender
)
in
self
.
_by_sender
def
receivers_for
(
self
,
sender
:
t
.
Any
)
->
t
.
Generator
[
t
.
Callable
[[
t
.
Any
],
t
.
Any
],
None
,
None
]:
"""
Iterate all live receivers listening for *sender*.
"""
# TODO: test receivers_for(ANY)
if
self
.
receivers
:
sender_id
=
hashable_identity
(
sender
)
if
sender_id
in
self
.
_by_sender
:
ids
=
self
.
_by_sender
[
ANY_ID
]
|
self
.
_by_sender
[
sender_id
]
else
:
ids
=
self
.
_by_sender
[
ANY_ID
].
copy
()
for
receiver_id
in
ids
:
receiver
=
self
.
receivers
.
get
(
receiver_id
)
if
receiver
is
None
:
continue
if
isinstance
(
receiver
,
WeakTypes
):
strong
=
receiver
()
if
strong
is
None
:
self
.
_disconnect
(
receiver_id
,
ANY_ID
)
continue
receiver
=
strong
yield
receiver
# type: ignore[misc]
def
disconnect
(
self
,
receiver
:
t
.
Callable
,
sender
:
t
.
Any
=
ANY
)
->
None
:
"""
Disconnect *receiver* from this signal
'
s events.
:param receiver: a previously :meth:`connected<connect>` callable
:param sender: a specific sender to disconnect from, or :obj:`ANY`
to disconnect from all senders. Defaults to ``ANY``.
"""
sender_id
:
IdentityType
if
sender
is
ANY
:
sender_id
=
ANY_ID
else
:
sender_id
=
hashable_identity
(
sender
)
receiver_id
=
hashable_identity
(
receiver
)
self
.
_disconnect
(
receiver_id
,
sender_id
)
if
(
"
receiver_disconnected
"
in
self
.
__dict__
and
self
.
receiver_disconnected
.
receivers
):
self
.
receiver_disconnected
.
send
(
self
,
receiver
=
receiver
,
sender
=
sender
)
def
_disconnect
(
self
,
receiver_id
:
IdentityType
,
sender_id
:
IdentityType
)
->
None
:
if
sender_id
==
ANY_ID
:
if
self
.
_by_receiver
.
pop
(
receiver_id
,
False
):
for
bucket
in
self
.
_by_sender
.
values
():
bucket
.
discard
(
receiver_id
)
self
.
receivers
.
pop
(
receiver_id
,
None
)
else
:
self
.
_by_sender
[
sender_id
].
discard
(
receiver_id
)
self
.
_by_receiver
[
receiver_id
].
discard
(
sender_id
)
def
_cleanup_receiver
(
self
,
receiver_ref
:
annotatable_weakref
)
->
None
:
"""
Disconnect a receiver from all senders.
"""
self
.
_disconnect
(
cast
(
IdentityType
,
receiver_ref
.
receiver_id
),
ANY_ID
)
def
_cleanup_sender
(
self
,
sender_ref
:
annotatable_weakref
)
->
None
:
"""
Disconnect all receivers from a sender.
"""
sender_id
=
cast
(
IdentityType
,
sender_ref
.
sender_id
)
assert
sender_id
!=
ANY_ID
self
.
_weak_senders
.
pop
(
sender_id
,
None
)
for
receiver_id
in
self
.
_by_sender
.
pop
(
sender_id
,
()):
self
.
_by_receiver
[
receiver_id
].
discard
(
sender_id
)
def
_cleanup_bookkeeping
(
self
)
->
None
:
"""
Prune unused sender/receiver bookkeeping. Not threadsafe.
Connecting & disconnecting leave behind a small amount of bookkeeping
for the receiver and sender values. Typical workloads using Blinker,
for example in most web apps, Flask, CLI scripts, etc., are not
adversely affected by this bookkeeping.
With a long-running Python process performing dynamic signal routing
with high volume- e.g. connecting to function closures,
"
senders
"
are
all unique object instances, and doing all of this over and over- you
may see memory usage will grow due to extraneous bookkeeping. (An empty
set() for each stale sender/receiver pair.)
This method will prune that bookkeeping away, with the caveat that such
pruning is not threadsafe. The risk is that cleanup of a fully
disconnected receiver/sender pair occurs while another thread is
connecting that same pair. If you are in the highly dynamic, unique
receiver/sender situation that has lead you to this method, that
failure mode is perhaps not a big deal for you.
"""
for
mapping
in
(
self
.
_by_sender
,
self
.
_by_receiver
):
for
_id
,
bucket
in
list
(
mapping
.
items
()):
if
not
bucket
:
mapping
.
pop
(
_id
,
None
)
def
_clear_state
(
self
)
->
None
:
"""
Throw away all signal state. Useful for unit tests.
"""
self
.
_weak_senders
.
clear
()
self
.
receivers
.
clear
()
self
.
_by_sender
.
clear
()
self
.
_by_receiver
.
clear
()
receiver_connected
=
Signal
(
"""
\
Sent by a :class:`Signal` after a receiver connects.
:argument: the Signal that was connected to
:keyword receiver_arg: the connected receiver
:keyword sender_arg: the sender to connect to
:keyword weak_arg: true if the connection to receiver_arg is a weak reference
.. deprecated:: 1.2
As of 1.2, individual signals have their own private
:attr:`~Signal.receiver_connected` and
:attr:`~Signal.receiver_disconnected` signals with a slightly simplified
call signature. This global signal is planned to be removed in 1.6.
"""
)
class
NamedSignal
(
Signal
):
"""
A named generic notification emitter.
"""
def
__init__
(
self
,
name
:
str
,
doc
:
str
|
None
=
None
)
->
None
:
Signal
.
__init__
(
self
,
doc
)
#: The name of this signal.
self
.
name
=
name
def
__repr__
(
self
)
->
str
:
base
=
Signal
.
__repr__
(
self
)
return
f
"
{
base
[
:
-
1
]
}
;
{
self
.
name
!r}
>
"
# noqa: E702
class
Namespace
(
dict
):
"""
A mapping of signal names to signals.
"""
def
signal
(
self
,
name
:
str
,
doc
:
str
|
None
=
None
)
->
NamedSignal
:
"""
Return the :class:`NamedSignal` *name*, creating it if required.
Repeated calls to this function will return the same signal object.
"""
try
:
return
self
[
name
]
# type: ignore[no-any-return]
except
KeyError
:
result
=
self
.
setdefault
(
name
,
NamedSignal
(
name
,
doc
))
return
result
# type: ignore[no-any-return]
class
WeakNamespace
(
WeakValueDictionary
):
"""
A weak mapping of signal names to signals.
Automatically cleans up unused Signals when the last reference goes out
of scope. This namespace implementation exists for a measure of legacy
compatibility with Blinker <= 1.2, and may be dropped in the future.
.. versionadded:: 1.3
"""
def
signal
(
self
,
name
:
str
,
doc
:
str
|
None
=
None
)
->
NamedSignal
:
"""
Return the :class:`NamedSignal` *name*, creating it if required.
Repeated calls to this function will return the same signal object.
"""
try
:
return
self
[
name
]
# type: ignore[no-any-return]
except
KeyError
:
result
=
self
.
setdefault
(
name
,
NamedSignal
(
name
,
doc
))
return
result
# type: ignore[no-any-return]
signal
=
Namespace
().
signal
src/main/java/gui_server/server-temp/venv/lib/python3.10/site-packages/click/__init__.py
deleted
100644 → 0
View file @
8a65e137
"""
Click is a simple Python module inspired by the stdlib optparse to make
writing command line scripts fun. Unlike other modules, it
'
s based
around a simple API that does not come with too much magic and is
composable.
"""
from
.core
import
Argument
as
Argument
from
.core
import
BaseCommand
as
BaseCommand
from
.core
import
Command
as
Command
from
.core
import
CommandCollection
as
CommandCollection
from
.core
import
Context
as
Context
from
.core
import
Group
as
Group
from
.core
import
MultiCommand
as
MultiCommand
from
.core
import
Option
as
Option
from
.core
import
Parameter
as
Parameter
from
.decorators
import
argument
as
argument
from
.decorators
import
command
as
command
from
.decorators
import
confirmation_option
as
confirmation_option
from
.decorators
import
group
as
group
from
.decorators
import
help_option
as
help_option
from
.decorators
import
make_pass_decorator
as
make_pass_decorator
from
.decorators
import
option
as
option
from
.decorators
import
pass_context
as
pass_context
from
.decorators
import
pass_obj
as
pass_obj
from
.decorators
import
password_option
as
password_option
from
.decorators
import
version_option
as
version_option
from
.exceptions
import
Abort
as
Abort
from
.exceptions
import
BadArgumentUsage
as
BadArgumentUsage
from
.exceptions
import
BadOptionUsage
as
BadOptionUsage
from
.exceptions
import
BadParameter
as
BadParameter
from
.exceptions
import
ClickException
as
ClickException
from
.exceptions
import
FileError
as
FileError
from
.exceptions
import
MissingParameter
as
MissingParameter
from
.exceptions
import
NoSuchOption
as
NoSuchOption
from
.exceptions
import
UsageError
as
UsageError
from
.formatting
import
HelpFormatter
as
HelpFormatter
from
.formatting
import
wrap_text
as
wrap_text
from
.globals
import
get_current_context
as
get_current_context
from
.parser
import
OptionParser
as
OptionParser
from
.termui
import
clear
as
clear
from
.termui
import
confirm
as
confirm
from
.termui
import
echo_via_pager
as
echo_via_pager
from
.termui
import
edit
as
edit
from
.termui
import
getchar
as
getchar
from
.termui
import
launch
as
launch
from
.termui
import
pause
as
pause
from
.termui
import
progressbar
as
progressbar
from
.termui
import
prompt
as
prompt
from
.termui
import
secho
as
secho
from
.termui
import
style
as
style
from
.termui
import
unstyle
as
unstyle
from
.types
import
BOOL
as
BOOL
from
.types
import
Choice
as
Choice
from
.types
import
DateTime
as
DateTime
from
.types
import
File
as
File
from
.types
import
FLOAT
as
FLOAT
from
.types
import
FloatRange
as
FloatRange
from
.types
import
INT
as
INT
from
.types
import
IntRange
as
IntRange
from
.types
import
ParamType
as
ParamType
from
.types
import
Path
as
Path
from
.types
import
STRING
as
STRING
from
.types
import
Tuple
as
Tuple
from
.types
import
UNPROCESSED
as
UNPROCESSED
from
.types
import
UUID
as
UUID
from
.utils
import
echo
as
echo
from
.utils
import
format_filename
as
format_filename
from
.utils
import
get_app_dir
as
get_app_dir
from
.utils
import
get_binary_stream
as
get_binary_stream
from
.utils
import
get_text_stream
as
get_text_stream
from
.utils
import
open_file
as
open_file
__version__
=
"
8.1.7
"
src/main/java/gui_server/server-temp/venv/lib/python3.10/site-packages/click/_compat.py
deleted
100644 → 0
View file @
8a65e137
import
codecs
import
io
import
os
import
re
import
sys
import
typing
as
t
from
weakref
import
WeakKeyDictionary
CYGWIN
=
sys
.
platform
.
startswith
(
"
cygwin
"
)
WIN
=
sys
.
platform
.
startswith
(
"
win
"
)
auto_wrap_for_ansi
:
t
.
Optional
[
t
.
Callable
[[
t
.
TextIO
],
t
.
TextIO
]]
=
None
_ansi_re
=
re
.
compile
(
r
"
\033\[[;?0-9]*[a-zA-Z]
"
)
def
_make_text_stream
(
stream
:
t
.
BinaryIO
,
encoding
:
t
.
Optional
[
str
],
errors
:
t
.
Optional
[
str
],
force_readable
:
bool
=
False
,
force_writable
:
bool
=
False
,
)
->
t
.
TextIO
:
if
encoding
is
None
:
encoding
=
get_best_encoding
(
stream
)
if
errors
is
None
:
errors
=
"
replace
"
return
_NonClosingTextIOWrapper
(
stream
,
encoding
,
errors
,
line_buffering
=
True
,
force_readable
=
force_readable
,
force_writable
=
force_writable
,
)
def
is_ascii_encoding
(
encoding
:
str
)
->
bool
:
"""
Checks if a given encoding is ascii.
"""
try
:
return
codecs
.
lookup
(
encoding
).
name
==
"
ascii
"
except
LookupError
:
return
False
def
get_best_encoding
(
stream
:
t
.
IO
[
t
.
Any
])
->
str
:
"""
Returns the default stream encoding if not found.
"""
rv
=
getattr
(
stream
,
"
encoding
"
,
None
)
or
sys
.
getdefaultencoding
()
if
is_ascii_encoding
(
rv
):
return
"
utf-8
"
return
rv
class
_NonClosingTextIOWrapper
(
io
.
TextIOWrapper
):
def
__init__
(
self
,
stream
:
t
.
BinaryIO
,
encoding
:
t
.
Optional
[
str
],
errors
:
t
.
Optional
[
str
],
force_readable
:
bool
=
False
,
force_writable
:
bool
=
False
,
**
extra
:
t
.
Any
,
)
->
None
:
self
.
_stream
=
stream
=
t
.
cast
(
t
.
BinaryIO
,
_FixupStream
(
stream
,
force_readable
,
force_writable
)
)
super
().
__init__
(
stream
,
encoding
,
errors
,
**
extra
)
def
__del__
(
self
)
->
None
:
try
:
self
.
detach
()
except
Exception
:
pass
def
isatty
(
self
)
->
bool
:
# https://bitbucket.org/pypy/pypy/issue/1803
return
self
.
_stream
.
isatty
()
class
_FixupStream
:
"""
The new io interface needs more from streams than streams
traditionally implement. As such, this fix-up code is necessary in
some circumstances.
The forcing of readable and writable flags are there because some tools
put badly patched objects on sys (one such offender are certain version
of jupyter notebook).
"""
def
__init__
(
self
,
stream
:
t
.
BinaryIO
,
force_readable
:
bool
=
False
,
force_writable
:
bool
=
False
,
):
self
.
_stream
=
stream
self
.
_force_readable
=
force_readable
self
.
_force_writable
=
force_writable
def
__getattr__
(
self
,
name
:
str
)
->
t
.
Any
:
return
getattr
(
self
.
_stream
,
name
)
def
read1
(
self
,
size
:
int
)
->
bytes
:
f
=
getattr
(
self
.
_stream
,
"
read1
"
,
None
)
if
f
is
not
None
:
return
t
.
cast
(
bytes
,
f
(
size
))
return
self
.
_stream
.
read
(
size
)
def
readable
(
self
)
->
bool
:
if
self
.
_force_readable
:
return
True
x
=
getattr
(
self
.
_stream
,
"
readable
"
,
None
)
if
x
is
not
None
:
return
t
.
cast
(
bool
,
x
())
try
:
self
.
_stream
.
read
(
0
)
except
Exception
:
return
False
return
True
def
writable
(
self
)
->
bool
:
if
self
.
_force_writable
:
return
True
x
=
getattr
(
self
.
_stream
,
"
writable
"
,
None
)
if
x
is
not
None
:
return
t
.
cast
(
bool
,
x
())
try
:
self
.
_stream
.
write
(
""
)
# type: ignore
except
Exception
:
try
:
self
.
_stream
.
write
(
b
""
)
except
Exception
:
return
False
return
True
def
seekable
(
self
)
->
bool
:
x
=
getattr
(
self
.
_stream
,
"
seekable
"
,
None
)
if
x
is
not
None
:
return
t
.
cast
(
bool
,
x
())
try
:
self
.
_stream
.
seek
(
self
.
_stream
.
tell
())
except
Exception
:
return
False
return
True
def
_is_binary_reader
(
stream
:
t
.
IO
[
t
.
Any
],
default
:
bool
=
False
)
->
bool
:
try
:
return
isinstance
(
stream
.
read
(
0
),
bytes
)
except
Exception
:
return
default
# This happens in some cases where the stream was already
# closed. In this case, we assume the default.
def
_is_binary_writer
(
stream
:
t
.
IO
[
t
.
Any
],
default
:
bool
=
False
)
->
bool
:
try
:
stream
.
write
(
b
""
)
except
Exception
:
try
:
stream
.
write
(
""
)
return
False
except
Exception
:
pass
return
default
return
True
def
_find_binary_reader
(
stream
:
t
.
IO
[
t
.
Any
])
->
t
.
Optional
[
t
.
BinaryIO
]:
# We need to figure out if the given stream is already binary.
# This can happen because the official docs recommend detaching
# the streams to get binary streams. Some code might do this, so
# we need to deal with this case explicitly.
if
_is_binary_reader
(
stream
,
False
):
return
t
.
cast
(
t
.
BinaryIO
,
stream
)
buf
=
getattr
(
stream
,
"
buffer
"
,
None
)
# Same situation here; this time we assume that the buffer is
# actually binary in case it's closed.
if
buf
is
not
None
and
_is_binary_reader
(
buf
,
True
):
return
t
.
cast
(
t
.
BinaryIO
,
buf
)
return
None
def
_find_binary_writer
(
stream
:
t
.
IO
[
t
.
Any
])
->
t
.
Optional
[
t
.
BinaryIO
]:
# We need to figure out if the given stream is already binary.
# This can happen because the official docs recommend detaching
# the streams to get binary streams. Some code might do this, so
# we need to deal with this case explicitly.
if
_is_binary_writer
(
stream
,
False
):
return
t
.
cast
(
t
.
BinaryIO
,
stream
)
buf
=
getattr
(
stream
,
"
buffer
"
,
None
)
# Same situation here; this time we assume that the buffer is
# actually binary in case it's closed.
if
buf
is
not
None
and
_is_binary_writer
(
buf
,
True
):
return
t
.
cast
(
t
.
BinaryIO
,
buf
)
return
None
def
_stream_is_misconfigured
(
stream
:
t
.
TextIO
)
->
bool
:
"""
A stream is misconfigured if its encoding is ASCII.
"""
# If the stream does not have an encoding set, we assume it's set
# to ASCII. This appears to happen in certain unittest
# environments. It's not quite clear what the correct behavior is
# but this at least will force Click to recover somehow.
return
is_ascii_encoding
(
getattr
(
stream
,
"
encoding
"
,
None
)
or
"
ascii
"
)
def
_is_compat_stream_attr
(
stream
:
t
.
TextIO
,
attr
:
str
,
value
:
t
.
Optional
[
str
])
->
bool
:
"""
A stream attribute is compatible if it is equal to the
desired value or the desired value is unset and the attribute
has a value.
"""
stream_value
=
getattr
(
stream
,
attr
,
None
)
return
stream_value
==
value
or
(
value
is
None
and
stream_value
is
not
None
)
def
_is_compatible_text_stream
(
stream
:
t
.
TextIO
,
encoding
:
t
.
Optional
[
str
],
errors
:
t
.
Optional
[
str
]
)
->
bool
:
"""
Check if a stream
'
s encoding and errors attributes are
compatible with the desired values.
"""
return
_is_compat_stream_attr
(
stream
,
"
encoding
"
,
encoding
)
and
_is_compat_stream_attr
(
stream
,
"
errors
"
,
errors
)
def
_force_correct_text_stream
(
text_stream
:
t
.
IO
[
t
.
Any
],
encoding
:
t
.
Optional
[
str
],
errors
:
t
.
Optional
[
str
],
is_binary
:
t
.
Callable
[[
t
.
IO
[
t
.
Any
],
bool
],
bool
],
find_binary
:
t
.
Callable
[[
t
.
IO
[
t
.
Any
]],
t
.
Optional
[
t
.
BinaryIO
]],
force_readable
:
bool
=
False
,
force_writable
:
bool
=
False
,
)
->
t
.
TextIO
:
if
is_binary
(
text_stream
,
False
):
binary_reader
=
t
.
cast
(
t
.
BinaryIO
,
text_stream
)
else
:
text_stream
=
t
.
cast
(
t
.
TextIO
,
text_stream
)
# If the stream looks compatible, and won't default to a
# misconfigured ascii encoding, return it as-is.
if
_is_compatible_text_stream
(
text_stream
,
encoding
,
errors
)
and
not
(
encoding
is
None
and
_stream_is_misconfigured
(
text_stream
)
):
return
text_stream
# Otherwise, get the underlying binary reader.
possible_binary_reader
=
find_binary
(
text_stream
)
# If that's not possible, silently use the original reader
# and get mojibake instead of exceptions.
if
possible_binary_reader
is
None
:
return
text_stream
binary_reader
=
possible_binary_reader
# Default errors to replace instead of strict in order to get
# something that works.
if
errors
is
None
:
errors
=
"
replace
"
# Wrap the binary stream in a text stream with the correct
# encoding parameters.
return
_make_text_stream
(
binary_reader
,
encoding
,
errors
,
force_readable
=
force_readable
,
force_writable
=
force_writable
,
)
def
_force_correct_text_reader
(
text_reader
:
t
.
IO
[
t
.
Any
],
encoding
:
t
.
Optional
[
str
],
errors
:
t
.
Optional
[
str
],
force_readable
:
bool
=
False
,
)
->
t
.
TextIO
:
return
_force_correct_text_stream
(
text_reader
,
encoding
,
errors
,
_is_binary_reader
,
_find_binary_reader
,
force_readable
=
force_readable
,
)
def
_force_correct_text_writer
(
text_writer
:
t
.
IO
[
t
.
Any
],
encoding
:
t
.
Optional
[
str
],
errors
:
t
.
Optional
[
str
],
force_writable
:
bool
=
False
,
)
->
t
.
TextIO
:
return
_force_correct_text_stream
(
text_writer
,
encoding
,
errors
,
_is_binary_writer
,
_find_binary_writer
,
force_writable
=
force_writable
,
)
def
get_binary_stdin
()
->
t
.
BinaryIO
:
reader
=
_find_binary_reader
(
sys
.
stdin
)
if
reader
is
None
:
raise
RuntimeError
(
"
Was not able to determine binary stream for sys.stdin.
"
)
return
reader
def
get_binary_stdout
()
->
t
.
BinaryIO
:
writer
=
_find_binary_writer
(
sys
.
stdout
)
if
writer
is
None
:
raise
RuntimeError
(
"
Was not able to determine binary stream for sys.stdout.
"
)
return
writer
def
get_binary_stderr
()
->
t
.
BinaryIO
:
writer
=
_find_binary_writer
(
sys
.
stderr
)
if
writer
is
None
:
raise
RuntimeError
(
"
Was not able to determine binary stream for sys.stderr.
"
)
return
writer
def
get_text_stdin
(
encoding
:
t
.
Optional
[
str
]
=
None
,
errors
:
t
.
Optional
[
str
]
=
None
)
->
t
.
TextIO
:
rv
=
_get_windows_console_stream
(
sys
.
stdin
,
encoding
,
errors
)
if
rv
is
not
None
:
return
rv
return
_force_correct_text_reader
(
sys
.
stdin
,
encoding
,
errors
,
force_readable
=
True
)
def
get_text_stdout
(
encoding
:
t
.
Optional
[
str
]
=
None
,
errors
:
t
.
Optional
[
str
]
=
None
)
->
t
.
TextIO
:
rv
=
_get_windows_console_stream
(
sys
.
stdout
,
encoding
,
errors
)
if
rv
is
not
None
:
return
rv
return
_force_correct_text_writer
(
sys
.
stdout
,
encoding
,
errors
,
force_writable
=
True
)
def
get_text_stderr
(
encoding
:
t
.
Optional
[
str
]
=
None
,
errors
:
t
.
Optional
[
str
]
=
None
)
->
t
.
TextIO
:
rv
=
_get_windows_console_stream
(
sys
.
stderr
,
encoding
,
errors
)
if
rv
is
not
None
:
return
rv
return
_force_correct_text_writer
(
sys
.
stderr
,
encoding
,
errors
,
force_writable
=
True
)
def
_wrap_io_open
(
file
:
t
.
Union
[
str
,
"
os.PathLike[str]
"
,
int
],
mode
:
str
,
encoding
:
t
.
Optional
[
str
],
errors
:
t
.
Optional
[
str
],
)
->
t
.
IO
[
t
.
Any
]:
"""
Handles not passing ``encoding`` and ``errors`` in binary mode.
"""
if
"
b
"
in
mode
:
return
open
(
file
,
mode
)
return
open
(
file
,
mode
,
encoding
=
encoding
,
errors
=
errors
)
def
open_stream
(
filename
:
"
t.Union[str, os.PathLike[str]]
"
,
mode
:
str
=
"
r
"
,
encoding
:
t
.
Optional
[
str
]
=
None
,
errors
:
t
.
Optional
[
str
]
=
"
strict
"
,
atomic
:
bool
=
False
,
)
->
t
.
Tuple
[
t
.
IO
[
t
.
Any
],
bool
]:
binary
=
"
b
"
in
mode
filename
=
os
.
fspath
(
filename
)
# Standard streams first. These are simple because they ignore the
# atomic flag. Use fsdecode to handle Path("-").
if
os
.
fsdecode
(
filename
)
==
"
-
"
:
if
any
(
m
in
mode
for
m
in
[
"
w
"
,
"
a
"
,
"
x
"
]):
if
binary
:
return
get_binary_stdout
(),
False
return
get_text_stdout
(
encoding
=
encoding
,
errors
=
errors
),
False
if
binary
:
return
get_binary_stdin
(),
False
return
get_text_stdin
(
encoding
=
encoding
,
errors
=
errors
),
False
# Non-atomic writes directly go out through the regular open functions.
if
not
atomic
:
return
_wrap_io_open
(
filename
,
mode
,
encoding
,
errors
),
True
# Some usability stuff for atomic writes
if
"
a
"
in
mode
:
raise
ValueError
(
"
Appending to an existing file is not supported, because that
"
"
would involve an expensive `copy`-operation to a temporary
"
"
file. Open the file in normal `w`-mode and copy explicitly
"
"
if that
'
s what you
'
re after.
"
)
if
"
x
"
in
mode
:
raise
ValueError
(
"
Use the `overwrite`-parameter instead.
"
)
if
"
w
"
not
in
mode
:
raise
ValueError
(
"
Atomic writes only make sense with `w`-mode.
"
)
# Atomic writes are more complicated. They work by opening a file
# as a proxy in the same folder and then using the fdopen
# functionality to wrap it in a Python file. Then we wrap it in an
# atomic file that moves the file over on close.
import
errno
import
random
try
:
perm
:
t
.
Optional
[
int
]
=
os
.
stat
(
filename
).
st_mode
except
OSError
:
perm
=
None
flags
=
os
.
O_RDWR
|
os
.
O_CREAT
|
os
.
O_EXCL
if
binary
:
flags
|=
getattr
(
os
,
"
O_BINARY
"
,
0
)
while
True
:
tmp_filename
=
os
.
path
.
join
(
os
.
path
.
dirname
(
filename
),
f
"
.__atomic-write
{
random
.
randrange
(
1
<<
32
)
:
08
x
}
"
,
)
try
:
fd
=
os
.
open
(
tmp_filename
,
flags
,
0o666
if
perm
is
None
else
perm
)
break
except
OSError
as
e
:
if
e
.
errno
==
errno
.
EEXIST
or
(
os
.
name
==
"
nt
"
and
e
.
errno
==
errno
.
EACCES
and
os
.
path
.
isdir
(
e
.
filename
)
and
os
.
access
(
e
.
filename
,
os
.
W_OK
)
):
continue
raise
if
perm
is
not
None
:
os
.
chmod
(
tmp_filename
,
perm
)
# in case perm includes bits in umask
f
=
_wrap_io_open
(
fd
,
mode
,
encoding
,
errors
)
af
=
_AtomicFile
(
f
,
tmp_filename
,
os
.
path
.
realpath
(
filename
))
return
t
.
cast
(
t
.
IO
[
t
.
Any
],
af
),
True
class
_AtomicFile
:
def
__init__
(
self
,
f
:
t
.
IO
[
t
.
Any
],
tmp_filename
:
str
,
real_filename
:
str
)
->
None
:
self
.
_f
=
f
self
.
_tmp_filename
=
tmp_filename
self
.
_real_filename
=
real_filename
self
.
closed
=
False
@property
def
name
(
self
)
->
str
:
return
self
.
_real_filename
def
close
(
self
,
delete
:
bool
=
False
)
->
None
:
if
self
.
closed
:
return
self
.
_f
.
close
()
os
.
replace
(
self
.
_tmp_filename
,
self
.
_real_filename
)
self
.
closed
=
True
def
__getattr__
(
self
,
name
:
str
)
->
t
.
Any
:
return
getattr
(
self
.
_f
,
name
)
def
__enter__
(
self
)
->
"
_AtomicFile
"
:
return
self
def
__exit__
(
self
,
exc_type
:
t
.
Optional
[
t
.
Type
[
BaseException
]],
*
_
:
t
.
Any
)
->
None
:
self
.
close
(
delete
=
exc_type
is
not
None
)
def
__repr__
(
self
)
->
str
:
return
repr
(
self
.
_f
)
def
strip_ansi
(
value
:
str
)
->
str
:
return
_ansi_re
.
sub
(
""
,
value
)
def
_is_jupyter_kernel_output
(
stream
:
t
.
IO
[
t
.
Any
])
->
bool
:
while
isinstance
(
stream
,
(
_FixupStream
,
_NonClosingTextIOWrapper
)):
stream
=
stream
.
_stream
return
stream
.
__class__
.
__module__
.
startswith
(
"
ipykernel.
"
)
def
should_strip_ansi
(
stream
:
t
.
Optional
[
t
.
IO
[
t
.
Any
]]
=
None
,
color
:
t
.
Optional
[
bool
]
=
None
)
->
bool
:
if
color
is
None
:
if
stream
is
None
:
stream
=
sys
.
stdin
return
not
isatty
(
stream
)
and
not
_is_jupyter_kernel_output
(
stream
)
return
not
color
# On Windows, wrap the output streams with colorama to support ANSI
# color codes.
# NOTE: double check is needed so mypy does not analyze this on Linux
if
sys
.
platform
.
startswith
(
"
win
"
)
and
WIN
:
from
._winconsole
import
_get_windows_console_stream
def
_get_argv_encoding
()
->
str
:
import
locale
return
locale
.
getpreferredencoding
()
_ansi_stream_wrappers
:
t
.
MutableMapping
[
t
.
TextIO
,
t
.
TextIO
]
=
WeakKeyDictionary
()
def
auto_wrap_for_ansi
(
# noqa: F811
stream
:
t
.
TextIO
,
color
:
t
.
Optional
[
bool
]
=
None
)
->
t
.
TextIO
:
"""
Support ANSI color and style codes on Windows by wrapping a
stream with colorama.
"""
try
:
cached
=
_ansi_stream_wrappers
.
get
(
stream
)
except
Exception
:
cached
=
None
if
cached
is
not
None
:
return
cached
import
colorama
strip
=
should_strip_ansi
(
stream
,
color
)
ansi_wrapper
=
colorama
.
AnsiToWin32
(
stream
,
strip
=
strip
)
rv
=
t
.
cast
(
t
.
TextIO
,
ansi_wrapper
.
stream
)
_write
=
rv
.
write
def
_safe_write
(
s
):
try
:
return
_write
(
s
)
except
BaseException
:
ansi_wrapper
.
reset_all
()
raise
rv
.
write
=
_safe_write
try
:
_ansi_stream_wrappers
[
stream
]
=
rv
except
Exception
:
pass
return
rv
else
:
def
_get_argv_encoding
()
->
str
:
return
getattr
(
sys
.
stdin
,
"
encoding
"
,
None
)
or
sys
.
getfilesystemencoding
()
def
_get_windows_console_stream
(
f
:
t
.
TextIO
,
encoding
:
t
.
Optional
[
str
],
errors
:
t
.
Optional
[
str
]
)
->
t
.
Optional
[
t
.
TextIO
]:
return
None
def
term_len
(
x
:
str
)
->
int
:
return
len
(
strip_ansi
(
x
))
def
isatty
(
stream
:
t
.
IO
[
t
.
Any
])
->
bool
:
try
:
return
stream
.
isatty
()
except
Exception
:
return
False
def
_make_cached_stream_func
(
src_func
:
t
.
Callable
[[],
t
.
Optional
[
t
.
TextIO
]],
wrapper_func
:
t
.
Callable
[[],
t
.
TextIO
],
)
->
t
.
Callable
[[],
t
.
Optional
[
t
.
TextIO
]]:
cache
:
t
.
MutableMapping
[
t
.
TextIO
,
t
.
TextIO
]
=
WeakKeyDictionary
()
def
func
()
->
t
.
Optional
[
t
.
TextIO
]:
stream
=
src_func
()
if
stream
is
None
:
return
None
try
:
rv
=
cache
.
get
(
stream
)
except
Exception
:
rv
=
None
if
rv
is
not
None
:
return
rv
rv
=
wrapper_func
()
try
:
cache
[
stream
]
=
rv
except
Exception
:
pass
return
rv
return
func
_default_text_stdin
=
_make_cached_stream_func
(
lambda
:
sys
.
stdin
,
get_text_stdin
)
_default_text_stdout
=
_make_cached_stream_func
(
lambda
:
sys
.
stdout
,
get_text_stdout
)
_default_text_stderr
=
_make_cached_stream_func
(
lambda
:
sys
.
stderr
,
get_text_stderr
)
binary_streams
:
t
.
Mapping
[
str
,
t
.
Callable
[[],
t
.
BinaryIO
]]
=
{
"
stdin
"
:
get_binary_stdin
,
"
stdout
"
:
get_binary_stdout
,
"
stderr
"
:
get_binary_stderr
,
}
text_streams
:
t
.
Mapping
[
str
,
t
.
Callable
[[
t
.
Optional
[
str
],
t
.
Optional
[
str
]],
t
.
TextIO
]
]
=
{
"
stdin
"
:
get_text_stdin
,
"
stdout
"
:
get_text_stdout
,
"
stderr
"
:
get_text_stderr
,
}
src/main/java/gui_server/server-temp/venv/lib/python3.10/site-packages/click/_termui_impl.py
deleted
100644 → 0
View file @
8a65e137
"""
This module contains implementations for the termui module. To keep the
import time of Click down, some infrequently used functionality is
placed in this module and only imported as needed.
"""
import
contextlib
import
math
import
os
import
sys
import
time
import
typing
as
t
from
gettext
import
gettext
as
_
from
io
import
StringIO
from
types
import
TracebackType
from
._compat
import
_default_text_stdout
from
._compat
import
CYGWIN
from
._compat
import
get_best_encoding
from
._compat
import
isatty
from
._compat
import
open_stream
from
._compat
import
strip_ansi
from
._compat
import
term_len
from
._compat
import
WIN
from
.exceptions
import
ClickException
from
.utils
import
echo
V
=
t
.
TypeVar
(
"
V
"
)
if
os
.
name
==
"
nt
"
:
BEFORE_BAR
=
"
\r
"
AFTER_BAR
=
"
\n
"
else
:
BEFORE_BAR
=
"
\r\033
[?25l
"
AFTER_BAR
=
"
\033
[?25h
\n
"
class
ProgressBar
(
t
.
Generic
[
V
]):
def
__init__
(
self
,
iterable
:
t
.
Optional
[
t
.
Iterable
[
V
]],
length
:
t
.
Optional
[
int
]
=
None
,
fill_char
:
str
=
"
#
"
,
empty_char
:
str
=
"
"
,
bar_template
:
str
=
"
%(bar)s
"
,
info_sep
:
str
=
"
"
,
show_eta
:
bool
=
True
,
show_percent
:
t
.
Optional
[
bool
]
=
None
,
show_pos
:
bool
=
False
,
item_show_func
:
t
.
Optional
[
t
.
Callable
[[
t
.
Optional
[
V
]],
t
.
Optional
[
str
]]]
=
None
,
label
:
t
.
Optional
[
str
]
=
None
,
file
:
t
.
Optional
[
t
.
TextIO
]
=
None
,
color
:
t
.
Optional
[
bool
]
=
None
,
update_min_steps
:
int
=
1
,
width
:
int
=
30
,
)
->
None
:
self
.
fill_char
=
fill_char
self
.
empty_char
=
empty_char
self
.
bar_template
=
bar_template
self
.
info_sep
=
info_sep
self
.
show_eta
=
show_eta
self
.
show_percent
=
show_percent
self
.
show_pos
=
show_pos
self
.
item_show_func
=
item_show_func
self
.
label
:
str
=
label
or
""
if
file
is
None
:
file
=
_default_text_stdout
()
# There are no standard streams attached to write to. For example,
# pythonw on Windows.
if
file
is
None
:
file
=
StringIO
()
self
.
file
=
file
self
.
color
=
color
self
.
update_min_steps
=
update_min_steps
self
.
_completed_intervals
=
0
self
.
width
:
int
=
width
self
.
autowidth
:
bool
=
width
==
0
if
length
is
None
:
from
operator
import
length_hint
length
=
length_hint
(
iterable
,
-
1
)
if
length
==
-
1
:
length
=
None
if
iterable
is
None
:
if
length
is
None
:
raise
TypeError
(
"
iterable or length is required
"
)
iterable
=
t
.
cast
(
t
.
Iterable
[
V
],
range
(
length
))
self
.
iter
:
t
.
Iterable
[
V
]
=
iter
(
iterable
)
self
.
length
=
length
self
.
pos
=
0
self
.
avg
:
t
.
List
[
float
]
=
[]
self
.
last_eta
:
float
self
.
start
:
float
self
.
start
=
self
.
last_eta
=
time
.
time
()
self
.
eta_known
:
bool
=
False
self
.
finished
:
bool
=
False
self
.
max_width
:
t
.
Optional
[
int
]
=
None
self
.
entered
:
bool
=
False
self
.
current_item
:
t
.
Optional
[
V
]
=
None
self
.
is_hidden
:
bool
=
not
isatty
(
self
.
file
)
self
.
_last_line
:
t
.
Optional
[
str
]
=
None
def
__enter__
(
self
)
->
"
ProgressBar[V]
"
:
self
.
entered
=
True
self
.
render_progress
()
return
self
def
__exit__
(
self
,
exc_type
:
t
.
Optional
[
t
.
Type
[
BaseException
]],
exc_value
:
t
.
Optional
[
BaseException
],
tb
:
t
.
Optional
[
TracebackType
],
)
->
None
:
self
.
render_finish
()
def
__iter__
(
self
)
->
t
.
Iterator
[
V
]:
if
not
self
.
entered
:
raise
RuntimeError
(
"
You need to use progress bars in a with block.
"
)
self
.
render_progress
()
return
self
.
generator
()
def
__next__
(
self
)
->
V
:
# Iteration is defined in terms of a generator function,
# returned by iter(self); use that to define next(). This works
# because `self.iter` is an iterable consumed by that generator,
# so it is re-entry safe. Calling `next(self.generator())`
# twice works and does "what you want".
return
next
(
iter
(
self
))
def
render_finish
(
self
)
->
None
:
if
self
.
is_hidden
:
return
self
.
file
.
write
(
AFTER_BAR
)
self
.
file
.
flush
()
@property
def
pct
(
self
)
->
float
:
if
self
.
finished
:
return
1.0
return
min
(
self
.
pos
/
(
float
(
self
.
length
or
1
)
or
1
),
1.0
)
@property
def
time_per_iteration
(
self
)
->
float
:
if
not
self
.
avg
:
return
0.0
return
sum
(
self
.
avg
)
/
float
(
len
(
self
.
avg
))
@property
def
eta
(
self
)
->
float
:
if
self
.
length
is
not
None
and
not
self
.
finished
:
return
self
.
time_per_iteration
*
(
self
.
length
-
self
.
pos
)
return
0.0
def
format_eta
(
self
)
->
str
:
if
self
.
eta_known
:
t
=
int
(
self
.
eta
)
seconds
=
t
%
60
t
//=
60
minutes
=
t
%
60
t
//=
60
hours
=
t
%
24
t
//=
24
if
t
>
0
:
return
f
"
{
t
}
d
{
hours
:
02
}
:
{
minutes
:
02
}
:
{
seconds
:
02
}
"
else
:
return
f
"
{
hours
:
02
}
:
{
minutes
:
02
}
:
{
seconds
:
02
}
"
return
""
def
format_pos
(
self
)
->
str
:
pos
=
str
(
self
.
pos
)
if
self
.
length
is
not
None
:
pos
+=
f
"
/
{
self
.
length
}
"
return
pos
def
format_pct
(
self
)
->
str
:
return
f
"
{
int
(
self
.
pct
*
100
)
:
4
}
%
"
[
1
:]
def
format_bar
(
self
)
->
str
:
if
self
.
length
is
not
None
:
bar_length
=
int
(
self
.
pct
*
self
.
width
)
bar
=
self
.
fill_char
*
bar_length
bar
+=
self
.
empty_char
*
(
self
.
width
-
bar_length
)
elif
self
.
finished
:
bar
=
self
.
fill_char
*
self
.
width
else
:
chars
=
list
(
self
.
empty_char
*
(
self
.
width
or
1
))
if
self
.
time_per_iteration
!=
0
:
chars
[
int
(
(
math
.
cos
(
self
.
pos
*
self
.
time_per_iteration
)
/
2.0
+
0.5
)
*
self
.
width
)
]
=
self
.
fill_char
bar
=
""
.
join
(
chars
)
return
bar
def
format_progress_line
(
self
)
->
str
:
show_percent
=
self
.
show_percent
info_bits
=
[]
if
self
.
length
is
not
None
and
show_percent
is
None
:
show_percent
=
not
self
.
show_pos
if
self
.
show_pos
:
info_bits
.
append
(
self
.
format_pos
())
if
show_percent
:
info_bits
.
append
(
self
.
format_pct
())
if
self
.
show_eta
and
self
.
eta_known
and
not
self
.
finished
:
info_bits
.
append
(
self
.
format_eta
())
if
self
.
item_show_func
is
not
None
:
item_info
=
self
.
item_show_func
(
self
.
current_item
)
if
item_info
is
not
None
:
info_bits
.
append
(
item_info
)
return
(
self
.
bar_template
%
{
"
label
"
:
self
.
label
,
"
bar
"
:
self
.
format_bar
(),
"
info
"
:
self
.
info_sep
.
join
(
info_bits
),
}
).
rstrip
()
def
render_progress
(
self
)
->
None
:
import
shutil
if
self
.
is_hidden
:
# Only output the label as it changes if the output is not a
# TTY. Use file=stderr if you expect to be piping stdout.
if
self
.
_last_line
!=
self
.
label
:
self
.
_last_line
=
self
.
label
echo
(
self
.
label
,
file
=
self
.
file
,
color
=
self
.
color
)
return
buf
=
[]
# Update width in case the terminal has been resized
if
self
.
autowidth
:
old_width
=
self
.
width
self
.
width
=
0
clutter_length
=
term_len
(
self
.
format_progress_line
())
new_width
=
max
(
0
,
shutil
.
get_terminal_size
().
columns
-
clutter_length
)
if
new_width
<
old_width
:
buf
.
append
(
BEFORE_BAR
)
buf
.
append
(
"
"
*
self
.
max_width
)
# type: ignore
self
.
max_width
=
new_width
self
.
width
=
new_width
clear_width
=
self
.
width
if
self
.
max_width
is
not
None
:
clear_width
=
self
.
max_width
buf
.
append
(
BEFORE_BAR
)
line
=
self
.
format_progress_line
()
line_len
=
term_len
(
line
)
if
self
.
max_width
is
None
or
self
.
max_width
<
line_len
:
self
.
max_width
=
line_len
buf
.
append
(
line
)
buf
.
append
(
"
"
*
(
clear_width
-
line_len
))
line
=
""
.
join
(
buf
)
# Render the line only if it changed.
if
line
!=
self
.
_last_line
:
self
.
_last_line
=
line
echo
(
line
,
file
=
self
.
file
,
color
=
self
.
color
,
nl
=
False
)
self
.
file
.
flush
()
def
make_step
(
self
,
n_steps
:
int
)
->
None
:
self
.
pos
+=
n_steps
if
self
.
length
is
not
None
and
self
.
pos
>=
self
.
length
:
self
.
finished
=
True
if
(
time
.
time
()
-
self
.
last_eta
)
<
1.0
:
return
self
.
last_eta
=
time
.
time
()
# self.avg is a rolling list of length <= 7 of steps where steps are
# defined as time elapsed divided by the total progress through
# self.length.
if
self
.
pos
:
step
=
(
time
.
time
()
-
self
.
start
)
/
self
.
pos
else
:
step
=
time
.
time
()
-
self
.
start
self
.
avg
=
self
.
avg
[
-
6
:]
+
[
step
]
self
.
eta_known
=
self
.
length
is
not
None
def
update
(
self
,
n_steps
:
int
,
current_item
:
t
.
Optional
[
V
]
=
None
)
->
None
:
"""
Update the progress bar by advancing a specified number of
steps, and optionally set the ``current_item`` for this new
position.
:param n_steps: Number of steps to advance.
:param current_item: Optional item to set as ``current_item``
for the updated position.
.. versionchanged:: 8.0
Added the ``current_item`` optional parameter.
.. versionchanged:: 8.0
Only render when the number of steps meets the
``update_min_steps`` threshold.
"""
if
current_item
is
not
None
:
self
.
current_item
=
current_item
self
.
_completed_intervals
+=
n_steps
if
self
.
_completed_intervals
>=
self
.
update_min_steps
:
self
.
make_step
(
self
.
_completed_intervals
)
self
.
render_progress
()
self
.
_completed_intervals
=
0
def
finish
(
self
)
->
None
:
self
.
eta_known
=
False
self
.
current_item
=
None
self
.
finished
=
True
def
generator
(
self
)
->
t
.
Iterator
[
V
]:
"""
Return a generator which yields the items added to the bar
during construction, and updates the progress bar *after* the
yielded block returns.
"""
# WARNING: the iterator interface for `ProgressBar` relies on
# this and only works because this is a simple generator which
# doesn't create or manage additional state. If this function
# changes, the impact should be evaluated both against
# `iter(bar)` and `next(bar)`. `next()` in particular may call
# `self.generator()` repeatedly, and this must remain safe in
# order for that interface to work.
if
not
self
.
entered
:
raise
RuntimeError
(
"
You need to use progress bars in a with block.
"
)
if
self
.
is_hidden
:
yield
from
self
.
iter
else
:
for
rv
in
self
.
iter
:
self
.
current_item
=
rv
# This allows show_item_func to be updated before the
# item is processed. Only trigger at the beginning of
# the update interval.
if
self
.
_completed_intervals
==
0
:
self
.
render_progress
()
yield
rv
self
.
update
(
1
)
self
.
finish
()
self
.
render_progress
()
def
pager
(
generator
:
t
.
Iterable
[
str
],
color
:
t
.
Optional
[
bool
]
=
None
)
->
None
:
"""
Decide what method to use for paging through text.
"""
stdout
=
_default_text_stdout
()
# There are no standard streams attached to write to. For example,
# pythonw on Windows.
if
stdout
is
None
:
stdout
=
StringIO
()
if
not
isatty
(
sys
.
stdin
)
or
not
isatty
(
stdout
):
return
_nullpager
(
stdout
,
generator
,
color
)
pager_cmd
=
(
os
.
environ
.
get
(
"
PAGER
"
,
None
)
or
""
).
strip
()
if
pager_cmd
:
if
WIN
:
return
_tempfilepager
(
generator
,
pager_cmd
,
color
)
return
_pipepager
(
generator
,
pager_cmd
,
color
)
if
os
.
environ
.
get
(
"
TERM
"
)
in
(
"
dumb
"
,
"
emacs
"
):
return
_nullpager
(
stdout
,
generator
,
color
)
if
WIN
or
sys
.
platform
.
startswith
(
"
os2
"
):
return
_tempfilepager
(
generator
,
"
more <
"
,
color
)
if
hasattr
(
os
,
"
system
"
)
and
os
.
system
(
"
(less) 2>/dev/null
"
)
==
0
:
return
_pipepager
(
generator
,
"
less
"
,
color
)
import
tempfile
fd
,
filename
=
tempfile
.
mkstemp
()
os
.
close
(
fd
)
try
:
if
hasattr
(
os
,
"
system
"
)
and
os
.
system
(
f
'
more
"
{
filename
}
"'
)
==
0
:
return
_pipepager
(
generator
,
"
more
"
,
color
)
return
_nullpager
(
stdout
,
generator
,
color
)
finally
:
os
.
unlink
(
filename
)
def
_pipepager
(
generator
:
t
.
Iterable
[
str
],
cmd
:
str
,
color
:
t
.
Optional
[
bool
])
->
None
:
"""
Page through text by feeding it to another program. Invoking a
pager through this might support colors.
"""
import
subprocess
env
=
dict
(
os
.
environ
)
# If we're piping to less we might support colors under the
# condition that
cmd_detail
=
cmd
.
rsplit
(
"
/
"
,
1
)[
-
1
].
split
()
if
color
is
None
and
cmd_detail
[
0
]
==
"
less
"
:
less_flags
=
f
"
{
os
.
environ
.
get
(
'
LESS
'
,
''
)
}{
'
'
.
join
(
cmd_detail
[
1
:
])
}
"
if
not
less_flags
:
env
[
"
LESS
"
]
=
"
-R
"
color
=
True
elif
"
r
"
in
less_flags
or
"
R
"
in
less_flags
:
color
=
True
c
=
subprocess
.
Popen
(
cmd
,
shell
=
True
,
stdin
=
subprocess
.
PIPE
,
env
=
env
)
stdin
=
t
.
cast
(
t
.
BinaryIO
,
c
.
stdin
)
encoding
=
get_best_encoding
(
stdin
)
try
:
for
text
in
generator
:
if
not
color
:
text
=
strip_ansi
(
text
)
stdin
.
write
(
text
.
encode
(
encoding
,
"
replace
"
))
except
(
OSError
,
KeyboardInterrupt
):
pass
else
:
stdin
.
close
()
# Less doesn't respect ^C, but catches it for its own UI purposes (aborting
# search or other commands inside less).
#
# That means when the user hits ^C, the parent process (click) terminates,
# but less is still alive, paging the output and messing up the terminal.
#
# If the user wants to make the pager exit on ^C, they should set
# `LESS='-K'`. It's not our decision to make.
while
True
:
try
:
c
.
wait
()
except
KeyboardInterrupt
:
pass
else
:
break
def
_tempfilepager
(
generator
:
t
.
Iterable
[
str
],
cmd
:
str
,
color
:
t
.
Optional
[
bool
]
)
->
None
:
"""
Page through text by invoking a program on a temporary file.
"""
import
tempfile
fd
,
filename
=
tempfile
.
mkstemp
()
# TODO: This never terminates if the passed generator never terminates.
text
=
""
.
join
(
generator
)
if
not
color
:
text
=
strip_ansi
(
text
)
encoding
=
get_best_encoding
(
sys
.
stdout
)
with
open_stream
(
filename
,
"
wb
"
)[
0
]
as
f
:
f
.
write
(
text
.
encode
(
encoding
))
try
:
os
.
system
(
f
'
{
cmd
}
"
{
filename
}
"'
)
finally
:
os
.
close
(
fd
)
os
.
unlink
(
filename
)
def
_nullpager
(
stream
:
t
.
TextIO
,
generator
:
t
.
Iterable
[
str
],
color
:
t
.
Optional
[
bool
]
)
->
None
:
"""
Simply print unformatted text. This is the ultimate fallback.
"""
for
text
in
generator
:
if
not
color
:
text
=
strip_ansi
(
text
)
stream
.
write
(
text
)
class
Editor
:
def
__init__
(
self
,
editor
:
t
.
Optional
[
str
]
=
None
,
env
:
t
.
Optional
[
t
.
Mapping
[
str
,
str
]]
=
None
,
require_save
:
bool
=
True
,
extension
:
str
=
"
.txt
"
,
)
->
None
:
self
.
editor
=
editor
self
.
env
=
env
self
.
require_save
=
require_save
self
.
extension
=
extension
def
get_editor
(
self
)
->
str
:
if
self
.
editor
is
not
None
:
return
self
.
editor
for
key
in
"
VISUAL
"
,
"
EDITOR
"
:
rv
=
os
.
environ
.
get
(
key
)
if
rv
:
return
rv
if
WIN
:
return
"
notepad
"
for
editor
in
"
sensible-editor
"
,
"
vim
"
,
"
nano
"
:
if
os
.
system
(
f
"
which
{
editor
}
>/dev/null 2>&1
"
)
==
0
:
return
editor
return
"
vi
"
def
edit_file
(
self
,
filename
:
str
)
->
None
:
import
subprocess
editor
=
self
.
get_editor
()
environ
:
t
.
Optional
[
t
.
Dict
[
str
,
str
]]
=
None
if
self
.
env
:
environ
=
os
.
environ
.
copy
()
environ
.
update
(
self
.
env
)
try
:
c
=
subprocess
.
Popen
(
f
'
{
editor
}
"
{
filename
}
"'
,
env
=
environ
,
shell
=
True
)
exit_code
=
c
.
wait
()
if
exit_code
!=
0
:
raise
ClickException
(
_
(
"
{editor}: Editing failed
"
).
format
(
editor
=
editor
)
)
except
OSError
as
e
:
raise
ClickException
(
_
(
"
{editor}: Editing failed: {e}
"
).
format
(
editor
=
editor
,
e
=
e
)
)
from
e
def
edit
(
self
,
text
:
t
.
Optional
[
t
.
AnyStr
])
->
t
.
Optional
[
t
.
AnyStr
]:
import
tempfile
if
not
text
:
data
=
b
""
elif
isinstance
(
text
,
(
bytes
,
bytearray
)):
data
=
text
else
:
if
text
and
not
text
.
endswith
(
"
\n
"
):
text
+=
"
\n
"
if
WIN
:
data
=
text
.
replace
(
"
\n
"
,
"
\r\n
"
).
encode
(
"
utf-8-sig
"
)
else
:
data
=
text
.
encode
(
"
utf-8
"
)
fd
,
name
=
tempfile
.
mkstemp
(
prefix
=
"
editor-
"
,
suffix
=
self
.
extension
)
f
:
t
.
BinaryIO
try
:
with
os
.
fdopen
(
fd
,
"
wb
"
)
as
f
:
f
.
write
(
data
)
# If the filesystem resolution is 1 second, like Mac OS
# 10.12 Extended, or 2 seconds, like FAT32, and the editor
# closes very fast, require_save can fail. Set the modified
# time to be 2 seconds in the past to work around this.
os
.
utime
(
name
,
(
os
.
path
.
getatime
(
name
),
os
.
path
.
getmtime
(
name
)
-
2
))
# Depending on the resolution, the exact value might not be
# recorded, so get the new recorded value.
timestamp
=
os
.
path
.
getmtime
(
name
)
self
.
edit_file
(
name
)
if
self
.
require_save
and
os
.
path
.
getmtime
(
name
)
==
timestamp
:
return
None
with
open
(
name
,
"
rb
"
)
as
f
:
rv
=
f
.
read
()
if
isinstance
(
text
,
(
bytes
,
bytearray
)):
return
rv
return
rv
.
decode
(
"
utf-8-sig
"
).
replace
(
"
\r\n
"
,
"
\n
"
)
# type: ignore
finally
:
os
.
unlink
(
name
)
def
open_url
(
url
:
str
,
wait
:
bool
=
False
,
locate
:
bool
=
False
)
->
int
:
import
subprocess
def
_unquote_file
(
url
:
str
)
->
str
:
from
urllib.parse
import
unquote
if
url
.
startswith
(
"
file://
"
):
url
=
unquote
(
url
[
7
:])
return
url
if
sys
.
platform
==
"
darwin
"
:
args
=
[
"
open
"
]
if
wait
:
args
.
append
(
"
-W
"
)
if
locate
:
args
.
append
(
"
-R
"
)
args
.
append
(
_unquote_file
(
url
))
null
=
open
(
"
/dev/null
"
,
"
w
"
)
try
:
return
subprocess
.
Popen
(
args
,
stderr
=
null
).
wait
()
finally
:
null
.
close
()
elif
WIN
:
if
locate
:
url
=
_unquote_file
(
url
.
replace
(
'"'
,
""
))
args
=
f
'
explorer /select,
"
{
url
}
"'
else
:
url
=
url
.
replace
(
'"'
,
""
)
wait_str
=
"
/WAIT
"
if
wait
else
""
args
=
f
'
start
{
wait_str
}
""
"
{
url
}
"'
return
os
.
system
(
args
)
elif
CYGWIN
:
if
locate
:
url
=
os
.
path
.
dirname
(
_unquote_file
(
url
).
replace
(
'"'
,
""
))
args
=
f
'
cygstart
"
{
url
}
"'
else
:
url
=
url
.
replace
(
'"'
,
""
)
wait_str
=
"
-w
"
if
wait
else
""
args
=
f
'
cygstart
{
wait_str
}
"
{
url
}
"'
return
os
.
system
(
args
)
try
:
if
locate
:
url
=
os
.
path
.
dirname
(
_unquote_file
(
url
))
or
"
.
"
else
:
url
=
_unquote_file
(
url
)
c
=
subprocess
.
Popen
([
"
xdg-open
"
,
url
])
if
wait
:
return
c
.
wait
()
return
0
except
OSError
:
if
url
.
startswith
((
"
http://
"
,
"
https://
"
))
and
not
locate
and
not
wait
:
import
webbrowser
webbrowser
.
open
(
url
)
return
0
return
1
def
_translate_ch_to_exc
(
ch
:
str
)
->
t
.
Optional
[
BaseException
]:
if
ch
==
"
\x03
"
:
raise
KeyboardInterrupt
()
if
ch
==
"
\x04
"
and
not
WIN
:
# Unix-like, Ctrl+D
raise
EOFError
()
if
ch
==
"
\x1a
"
and
WIN
:
# Windows, Ctrl+Z
raise
EOFError
()
return
None
if
WIN
:
import
msvcrt
@contextlib.contextmanager
def
raw_terminal
()
->
t
.
Iterator
[
int
]:
yield
-
1
def
getchar
(
echo
:
bool
)
->
str
:
# The function `getch` will return a bytes object corresponding to
# the pressed character. Since Windows 10 build 1803, it will also
# return \x00 when called a second time after pressing a regular key.
#
# `getwch` does not share this probably-bugged behavior. Moreover, it
# returns a Unicode object by default, which is what we want.
#
# Either of these functions will return \x00 or \xe0 to indicate
# a special key, and you need to call the same function again to get
# the "rest" of the code. The fun part is that \u00e0 is
# "latin small letter a with grave", so if you type that on a French
# keyboard, you _also_ get a \xe0.
# E.g., consider the Up arrow. This returns \xe0 and then \x48. The
# resulting Unicode string reads as "a with grave" + "capital H".
# This is indistinguishable from when the user actually types
# "a with grave" and then "capital H".
#
# When \xe0 is returned, we assume it's part of a special-key sequence
# and call `getwch` again, but that means that when the user types
# the \u00e0 character, `getchar` doesn't return until a second
# character is typed.
# The alternative is returning immediately, but that would mess up
# cross-platform handling of arrow keys and others that start with
# \xe0. Another option is using `getch`, but then we can't reliably
# read non-ASCII characters, because return values of `getch` are
# limited to the current 8-bit codepage.
#
# Anyway, Click doesn't claim to do this Right(tm), and using `getwch`
# is doing the right thing in more situations than with `getch`.
func
:
t
.
Callable
[[],
str
]
if
echo
:
func
=
msvcrt
.
getwche
# type: ignore
else
:
func
=
msvcrt
.
getwch
# type: ignore
rv
=
func
()
if
rv
in
(
"
\x00
"
,
"
\xe0
"
):
# \x00 and \xe0 are control characters that indicate special key,
# see above.
rv
+=
func
()
_translate_ch_to_exc
(
rv
)
return
rv
else
:
import
tty
import
termios
@contextlib.contextmanager
def
raw_terminal
()
->
t
.
Iterator
[
int
]:
f
:
t
.
Optional
[
t
.
TextIO
]
fd
:
int
if
not
isatty
(
sys
.
stdin
):
f
=
open
(
"
/dev/tty
"
)
fd
=
f
.
fileno
()
else
:
fd
=
sys
.
stdin
.
fileno
()
f
=
None
try
:
old_settings
=
termios
.
tcgetattr
(
fd
)
try
:
tty
.
setraw
(
fd
)
yield
fd
finally
:
termios
.
tcsetattr
(
fd
,
termios
.
TCSADRAIN
,
old_settings
)
sys
.
stdout
.
flush
()
if
f
is
not
None
:
f
.
close
()
except
termios
.
error
:
pass
def
getchar
(
echo
:
bool
)
->
str
:
with
raw_terminal
()
as
fd
:
ch
=
os
.
read
(
fd
,
32
).
decode
(
get_best_encoding
(
sys
.
stdin
),
"
replace
"
)
if
echo
and
isatty
(
sys
.
stdout
):
sys
.
stdout
.
write
(
ch
)
_translate_ch_to_exc
(
ch
)
return
ch
Prev
1
2
3
4
5
6
7
8
…
11
Next