Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check internet connection using api_endpoint if present in CLIENT_OPTIONS settings #275

Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions rele/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ class NotConnectionError(BaseException):
pass


def check_internet_connection():
def check_internet_connection(remote_server):
logger.debug("Checking connection")
remote_server = "www.google.com"
port = 80
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
Expand Down Expand Up @@ -67,6 +66,15 @@ def __init__(
self._futures: Dict[str, Future] = {}
self._subscriptions = subscriptions
self.threads_per_subscription = threads_per_subscription
self.internet_check_endpoint = self._get_internet_check_endpoint(client_options)

def _get_internet_check_endpoint(self, client_options):
if (
client_options is not None
and client_options.get("api_endpoint") is not None
):
return client_options.get("api_endpoint")
return "www.google.com"

def setup(self):
"""Create the subscriptions on a Google PubSub topic.
Expand Down Expand Up @@ -152,7 +160,7 @@ def _boostrap_consumption(self, subscription):
"future cancelled and result"
)

if not check_internet_connection():
if not check_internet_connection(self.internet_check_endpoint):
logger.debug(
f"Not internet "
f"connection when boostrap a consumption for {subscription}"
Expand Down Expand Up @@ -181,7 +189,9 @@ def _wait_forever(self, sleep_interval):
while True:
logger.debug(f"[_wait_forever][0] Futures: {self._futures.values()}")

if datetime.now().timestamp() % 50 < 1 and not check_internet_connection():
if datetime.now().timestamp() % 50 < 1 and not check_internet_connection(
self.internet_check_endpoint
):
logger.debug("Not internet connection, raising an Exception")
raise NotConnectionError

Expand Down
49 changes: 48 additions & 1 deletion tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ def worker(config):
)


@pytest.fixture
def worker_without_client_options(config):
subscriptions = (sub_stub,)
return Worker(
subscriptions,
None,
config.gc_project_id,
config.credentials,
config.gc_storage_region,
default_ack_deadline=60,
threads_per_subscription=10,
default_retry_policy=config.retry_policy,
)


@pytest.fixture
def mock_consume(config):
with patch.object(Subscriber, "consume") as m:
Expand All @@ -52,7 +67,7 @@ def mock_create_subscription():

@pytest.fixture(autouse=True)
def mock_internet_connection():
with patch("rele.worker.check_internet_connection") as m:
with patch("rele.worker.check_internet_connection", autospec=True) as m:
m.return_value = True
yield m

Expand Down Expand Up @@ -129,12 +144,44 @@ def test_creates_subscription_with_custom_ack_deadline_from_environment(
assert worker._subscriber._gc_project_id == "rele-test"

def test_raises_not_connection_error_during_start(
self, worker_without_client_options, mock_internet_connection
):
mock_internet_connection.return_value = False

with pytest.raises(NotConnectionError):
worker_without_client_options.start()
mock_internet_connection.assert_called_once_with("www.google.com")

def test_check_internet_connection_with_default_endpoint_if_client_options_do_not_have_api_endpoint(
self, config, mock_internet_connection
):
mock_internet_connection.return_value = False
subscriptions = (sub_stub,)
worker = Worker(
subscriptions,
{},
config.gc_project_id,
config.credentials,
config.gc_storage_region,
default_ack_deadline=60,
threads_per_subscription=10,
default_retry_policy=config.retry_policy,
)

with pytest.raises(NotConnectionError):
worker.start()
mock_internet_connection.assert_called_once_with("www.google.com")

def test_check_internet_connection_uses_api_endpoint_setting_when_present(
self, worker, mock_internet_connection
):
mock_internet_connection.return_value = False

with pytest.raises(NotConnectionError):
worker.start()
mock_internet_connection.assert_called_once_with(
"custom-api.interconnect.example.com"
)


@pytest.mark.usefixtures("mock_create_subscription")
Expand Down
Loading