Skip to content

Commit

Permalink
Fixing follow changes script.
Browse files Browse the repository at this point in the history
  • Loading branch information
mihxil committed Nov 10, 2023
1 parent 25a309b commit 371b9e5
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 26 deletions.
3 changes: 3 additions & 0 deletions src/npoapi/basic_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,7 @@ def xml_to_bytes(self, xml) -> bytes:
else:
raise Exception("unrecognized type " + str(t))

def __str__(self) -> str:
    """Describe this client by the URL it talks to."""
    prefix = "client for "
    return prefix + self.url


107 changes: 84 additions & 23 deletions src/npoapi/bin/npo_media_follow_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""
Simple client to get the changes feed from the NPO Frontend API
"""
import http
import json
import logging
import time
Expand All @@ -25,34 +26,87 @@ def __init__(self):
help="properties filtering")
self.client.add_argument("--raw", action='store_true', help="No attempts to stream and handle big results. Everything should fit in memory. Simpler, but less efficient.")
self.client.add_argument("--reasonFilter", type=str, default="")
self.client.add_argument("--change_to_string", type=str, help="dict to string for change. E.g. 'change.get('id', '') + title(change). Or 'CONCISE' for a default concise string.")

self.args = self.client.parse_args()

self.since = self.args.since
if self.since is None:
self.since = datetime.now().isoformat()
self.client.logger.info("No since given, using %s" % self.since)
since = self.args.since
if since is None:
since = str(datetime.now().isoformat())
self.client.logger.info("No since given, using %s" % since)

self.since_as_epoch = int(datetime.fromisoformat(self.since).timestamp() * 1000) - 60000
if since.isdigit():
self.since = int(since)
else:
self.since = int(datetime.fromisoformat(since).timestamp() * 1000) - 60000

self.since_mid = None
self.check_grow = False
if self.args.change_to_string == "CONCISE":
self.change_to_string_function = "timestamp_to_string(change.get('publishDate')) + ':' + change.get('id', '') + ':' + title(change) + ':' + reasons(change)"
self.client.logger.info("Using to string: %s" % self.change_to_string_function)
else:
self.change_to_string_function = self.args.change_to_string


def change_to_string(self, change):
    """
    Render one change as the string that is written to the output.

    When no ``--change_to_string`` expression was given, the change is dumped
    as JSON. Otherwise ``self.change_to_string_function`` is evaluated as a
    Python expression with these names available:

    * ``change``: the change itself (a dict)
    * ``timestamp_to_string(timestamp)``: see ``self.timestamp_to_string``
    * ``title(change)``: see ``self.title``
    * ``reasons(change)``: comma separated reason values, or ``<no reasons>``

    :param change: one entry of the changes feed, as a dict
    :return: the string representation of the change
    """
    if not self.args.change_to_string:
        return json.dumps(change)

    # These two names look unused, but the evaluated expression resolves them
    # from this function's local scope.
    timestamp_to_string = self.timestamp_to_string
    title = self.title

    def reasons(change):
        entries = change.get('reasons', [])
        if not entries:
            # Log through the client's logger, like every other message here.
            self.client.logger.debug("No reasons in %s" % change)
            return "<no reasons>"
        return ",".join(entry['value'] for entry in entries)

    # SECURITY: eval() executes arbitrary Python. The expression comes from the
    # command line of the person running the script; never feed it input from
    # an untrusted source.
    return str(eval(self.change_to_string_function))

def title(self, change):
    """
    Short label for a change; helpful in change_to_string.

    :param change: one entry of the changes feed, as a dict
    :return: ``TAIL`` for a tail entry, ``DELETE`` for a delete, the value of
             the first title of the media otherwise. ``<no media>`` when the
             change carries no media and ``<no title>`` when the media has no
             (usable) titles.
    """
    if change.get("tail", False):
        return "TAIL"
    if change.get("delete", False):
        return "DELETE"

    media = change.get('media')
    if not media:
        return "<no media>"

    # Titles can be absent or empty; report that instead of raising
    # KeyError/IndexError in the middle of the output loop.
    titles = media.get('titles')
    if not titles:
        return "<no title>"
    return titles[0].get('value', "<no title>")

def timestamp_to_string(self, timestamp):
    """
    Format a timestamp given in milliseconds since the epoch as an ISO-8601
    string (local time, as produced by ``datetime.fromtimestamp``).
    """
    seconds = timestamp / 1000
    moment = datetime.fromtimestamp(seconds)
    return moment.isoformat()

# Fetch one batch of changes with a single non-streamed request and write the
# result to stdout.
# NOTE(review): this listing looks like a diff rendered without +/- markers:
# `since=` is passed twice and both `self.since_as_epoch` and `self.since` are
# updated, so pre- and post-change lines are presumably interleaved. Confirm
# against the repository before relying on the exact statement sequence.
def one_call_raw(self):
response = self.client.changes_raw(
profile=self.args.profile,
since=self.since_as_epoch,
since=self.since,
since_mid = self.since_mid,
properties=self.args.properties,
deletes=self.args.deletes,
reason_filter=self.args.reasonFilter,
stream=False)
self.since_as_epoch = json.loads(response)['changes'][-1]['publishDate']
is_tail = json.loads(response)['changes'][-1]['tail']
if not is_tail or self.args.tail:
stdout.write(response + "\n")
# The last change of the batch supplies the publishDate (and the id, when
# present) that are passed as since / since_mid on the next call.
changes = json.loads(response)['changes']
self.since = changes[-1]['publishDate']
self.since_mid = changes[-1].get('id', None)
# Tail entries are only written when self.args.tail is set.
for change in changes:
is_tail = change.get('tail', False)
if not is_tail or self.args.tail:
stdout.write(self.change_to_string(change) + "\n")

def one_call(self):
response = self.client.changes_raw(
profile=self.args.profile,
since=self.since_as_epoch,
since=self.since,
since_mid = self.since_mid,
properties=self.args.properties,
deletes=self.args.deletes,
reason_filter=self.args.reasonFilter,
Expand All @@ -65,38 +119,45 @@ def one_call(self):
changes = data['changes']
new_since = None
count = 0
for change in changes:
for lazy_change in changes:
count += 1
c = json_stream.to_standard_types(change)
c = json_stream.to_standard_types(lazy_change)
new_since = c.get('publishDate')
self.since_mid = c.get('id', None)
if not new_since:
logging.error("No publishDate in %s" % c)
break
is_tail = c.get('tail', False)
if not is_tail or self.args.tail:
stdout.write(json.dumps(c) + "\n")
stdout.write(self.change_to_string(c) + "\n")
stdout.flush()
if count == 0:
raise Exception("No changes received!")
if new_since is None:
raise Exception("No tail received?")
if self.check_grow and new_since < self.since_as_epoch:
raise Exception("Since doesn't grow (%s <= %s)" % (new_since, self.since_as_epoch))
if self.check_grow and new_since < self.since:
raise Exception("Since doesn't grow (%s < %s)" % (new_since, self.since))
self.check_grow = True
self.since_as_epoch = new_since
self.since = new_since
changes.read_all()
response.close()



# Poll the changes feed in an endless loop, sleeping self.args.sleep between
# calls, until interrupted from the keyboard.
# NOTE(review): this listing looks like a diff rendered without +/- markers:
# the raw/streaming dispatch and the sleep appear twice, so pre- and
# post-change lines are presumably interleaved. Confirm against the repository.
def follow_changes(self):
self.client.logger.info("Watching %s " % (self.client.url))
try:
while True:
self.client.logger.info("since: %s (%s)" % (self.since_as_epoch, datetime.fromtimestamp(self.since_as_epoch/1000).isoformat()))
if self.args.raw:
self.one_call_raw()
else:
self.one_call()
time.sleep(self.args.sleep)
try:
self.client.logger.debug("since: %s,%s (%s)" % (self.since, self.since_mid, self.timestamp_to_string(self.since)))
# --raw selects the non-streaming call; otherwise the streaming one is used.
if self.args.raw:
self.one_call_raw()
else:
self.one_call()
time.sleep(self.args.sleep)
# An incomplete read is logged as a warning instead of propagating
# (presumably so the loop continues with the next call; confirm nesting).
# NOTE(review): the file does `import http`, which on its own does not load the
# http.client submodule; this relies on it being imported elsewhere. Confirm.
except http.client.IncompleteRead:
# NOTE(review): if this is a logging.Logger, warn() is a deprecated alias of warning().
self.client.logger.warn("Incomplete read")

# Ctrl-C is treated as a normal stop: it is logged, not raised.
except KeyboardInterrupt:
self.client.logger.info("interrupted")

Expand Down
16 changes: 13 additions & 3 deletions src/npoapi/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ def search(self, form="{}", sort="asc", offset: int = 0, limit: int = 240, profi
def changes(self, profile=None, limit=10, since=None, properties=None, deletes="ID_ONLY", tail=None) -> Union[None, ijson.items]:
    """
    Request the changes feed as a stream and parse it incrementally.

    All arguments are forwarded to ``changes_raw`` (with ``stream=True``); the
    response is handed to ``ijson.items`` with the prefix ``changes.item``.
    """
    raw_stream = self.changes_raw(
        stream=True,
        profile=profile,
        limit=limit,
        since=since,
        properties=properties,
        deletes=deletes,
        tail=tail,
    )
    return ijson.items(raw_stream, 'changes.item')

def changes_raw(self, profile=None, order="ASC", stream=False, limit=10, since:Union[str, int, datetime.datetime]=None, force_oldstyle=False, properties=None, deletes="ID_ONLY", tail=None, reason_filter="") -> Union[None, http.client.HTTPResponse, str]:
def changes_raw(self, profile=None, order="ASC", stream=False, limit=None,
since:Union[str, int, datetime.datetime]=None,
since_mid=None,
force_oldstyle=False, properties=None, deletes="ID_ONLY", tail=None, reason_filter="") -> Union[None, http.client.HTTPResponse, str]:
if isinstance(properties, list):
properties = ",".join(properties)
sinceLong = None
Expand All @@ -64,8 +67,12 @@ def changes_raw(self, profile=None, order="ASC", stream=False, limit=10, since:U
sinceLong = since

params = { "profile": profile, "order": order, "max": limit,
"since": sinceLong, "publishedSince": sinceDate, "properties": properties,
"deletes": deletes, "tail": tail, "reasonFilter": reason_filter }
"since": sinceLong,
"publishedSince": (sinceDate + ("," + since_mid if since_mid else "")),
"properties": properties,
"deletes": deletes,
"tail": tail,
"reasonFilter": reason_filter }
if stream:
return self.stream("/api/media/changes", params=params)
else:
Expand All @@ -86,3 +93,6 @@ def iterate_raw(self, form=None, profile=None, stream=True, limit=1000, timeout=
else:
return self.request("/api/media/iterate", data=form,
params={"profile": profile, "max": limit, "properties": properties})

def __str__(self) -> str:
    """
    Return the parent class's description with " (media)" appended.
    """
    # `super.__str__(self)` would resolve to object.__str__ and yield the
    # default repr; the zero-argument super() delegates to the parent class.
    return super().__str__() + " (media)"
4 changes: 4 additions & 0 deletions src/npoapi/media_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,7 @@ def upload(self, mid: str, file: str, content_type: None, **kwargs):
@override
def accept_choices(self) -> Dict[str, str]:
    """Map each supported format name to the MIME type used for it."""
    return dict(xml="application/xml", json="application/json")


def __str__(self) -> str:
    """
    Return the parent class's description with " (media)" appended.
    """
    # `super.__str__(self)` would resolve to object.__str__ and yield the
    # default repr; the zero-argument super() delegates to the parent class.
    return super().__str__() + " (media)"
3 changes: 3 additions & 0 deletions src/npoapi/npoapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,6 @@ def stream(self, path: str, params=None, accept=None, data=None, content_type: s
req.add_header("Accept", accept if accept else self._accept)
self.logger.debug("headers: %s" % str(req.headers))
return self.get_response(req, url, ignore_not_found=ignore_not_found, timeout=timeout)

def __str__(self) -> str:
    """Identify this frontend API client by its URL."""
    label = "frontend api client "
    return label + self.url

0 comments on commit 371b9e5

Please sign in to comment.