fix: check sqlalchemy_uri (#23901)

This commit is contained in:
Daniel Vaz Gaspar 2023-05-03 11:14:03 +01:00 committed by GitHub
parent d4b9c18271
commit e5f512e348
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 23 deletions

View File

@ -14,6 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
from flask_babel import lazy_gettext as _
from sqlalchemy.engine.url import URL
from sqlalchemy.exc import NoSuchModuleError
@ -24,18 +26,20 @@ from superset.exceptions import SupersetSecurityException
# list of unsafe SQLAlchemy dialects
BLOCKLIST = {
# sqlite creates a local DB, which allows mapping server's filesystem
"sqlite",
re.compile(r"sqlite(?:\+[^\s]*)?$"),
# shillelagh allows opening local files (eg, 'SELECT * FROM "csv:///etc/passwd"')
"shillelagh",
"shillelagh+apsw",
re.compile(r"shillelagh$"),
re.compile(r"shillelagh\+apsw$"),
}
def check_sqlalchemy_uri(uri: URL) -> None:
if uri.drivername in BLOCKLIST:
for blocklist_regex in BLOCKLIST:
if not re.match(blocklist_regex, uri.drivername):
continue
try:
dialect = uri.get_dialect().__name__
except NoSuchModuleError:
except (NoSuchModuleError, ValueError):
dialect = uri.drivername
raise SupersetSecurityException(

View File

@ -14,30 +14,78 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional
import pytest
from sqlalchemy.engine.url import make_url
from superset.exceptions import SupersetSecurityException
from superset.security.analytics_db_safety import check_sqlalchemy_uri
from tests.integration_tests.base_tests import SupersetTestCase
class TestDBConnections(SupersetTestCase):
def test_check_sqlalchemy_uri_ok(self):
check_sqlalchemy_uri(make_url("postgres://user:password@test.com"))
def test_check_sqlalchemy_url_sqlite(self):
@pytest.mark.parametrize(
"sqlalchemy_uri, error, error_message",
[
("postgres://user:password@test.com", False, None),
(
"sqlite:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+pysqlite:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+aiosqlite:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+pysqlcipher:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+new+driver:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+new+:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"shillelagh:///home/superset/bad.db",
True,
"shillelagh cannot be used as a data source for security reasons.",
),
(
"shillelagh+apsw:///home/superset/bad.db",
True,
"shillelagh cannot be used as a data source for security reasons.",
),
("shillelagh+:///home/superset/bad.db", False, None),
(
"shillelagh+something:///home/superset/bad.db",
False,
None,
),
],
)
def test_check_sqlalchemy_uri(
sqlalchemy_uri: str, error: bool, error_message: Optional[str]
):
if error:
with pytest.raises(SupersetSecurityException) as excinfo:
check_sqlalchemy_uri(make_url("sqlite:///home/superset/bad.db"))
assert (
str(excinfo.value)
== "SQLiteDialect_pysqlite cannot be used as a data source for security reasons."
)
with pytest.raises(SupersetSecurityException) as excinfo:
check_sqlalchemy_uri(make_url("shillelagh:///home/superset/bad.db"))
assert (
str(excinfo.value)
== "shillelagh cannot be used as a data source for security reasons."
)
check_sqlalchemy_uri(make_url(sqlalchemy_uri))
assert str(excinfo.value) == error_message
else:
check_sqlalchemy_uri(make_url(sqlalchemy_uri))