Atomic API example with multiprocessing and multiple threads:
Follow these steps to use atomic_int, atomic_uint, atomic_bool, atomic_float, and atomic_bytearray:
Create a shared variable that any process can use; refer to UIntAPIs, IntAPIs, BytearrayAPIs, and BoolFloatAPIs.
from shared_atomic.atomic_int import atomic_int
# Create the shared integer under the name b'1'; the second argument is 0
# (presumably the initial value — confirm against IntAPIs).
a = atomic_int(b'1',0)
Create a function that changes the shared variable.
from shared_atomic.atomic_int import atomic_int, int_add_and_fetch
def process1_run():
    """Open the shared integer named b'1' and add 100 to it.

    Takes no arguments and returns None; the addition is performed through
    int_add_and_fetch and its return value is discarded.
    """
    shared_counter = atomic_int(b'1')
    int_add_and_fetch(shared_counter, 100)
Start the concurrent processes from any process.
from multiprocessing import Pool
import os
# Run process1_run once per worker in a process pool and collect every failure.
# NOTE: on platforms whose multiprocessing start method is "spawn", this code
# has to run under an `if __name__ == "__main__":` guard in the main module.
async_result: list
result: list
result = []
# os.cpu_count() may return None; fall back to a single worker in that case.
parallelism = os.cpu_count() or 1
pool = Pool(parallelism)
try:
    async_result = [pool.apply_async(process1_run) for _ in range(parallelism)]
    for pending in async_result:
        # Guard each get() separately so one failed worker does not hide
        # the outcome of the remaining ones.
        try:
            pending.get()
        except Exception as e:
            result.append(e)
finally:
    # Always release the worker processes, even if submission itself failed.
    pool.close()
    pool.join()
if result:
    raise RuntimeError("Subprocess finished with error{}".format(result))
Remove the shared variable from any process.
# Remove the shared variable that was created under the name b'1'.
# NOTE(review): none of the snippets shown here imports atomic_object_remove —
# confirm which module exports it and add the import.
atomic_object_remove(b'1')
Follow these steps to use shared_dict:
Create a shared dict that any process can use; refer to SharedDictAPIs.
from shared_atomic.shared_dict import shared_dict
# No path is given here (first argument is None); presumably shared_dict then
# creates its own backing file — confirm against SharedDictAPIs.
a = shared_dict(
    None,
    bucket_chunk_size_exponent=18,
    bucket_chunk_number=100,
    size=5 * 1024**3,
)
# Show the file name so that other processes can open the same dict with it.
print(a.file_name)
Use the shared dict from any process by opening it with the same file_name.
import random
# Open the existing shared dict through its backing file.
a = shared_dict(b'/private/var/tmp/mm29273eOYE9z', bucket_chunk_size_exponent=18, bucket_chunk_number=100, size=1024**3*5)
# 333333 distinct keys drawn without replacement.
random_int_list = random.sample(range(-2**62, 2**62 - 1), k=333333)
# NOTE(review): dict_insert and dict_get are not imported by the snippets shown
# here — presumably they come from shared_atomic.shared_dict; confirm.
# Store the square of every key ...
for key in random_int_list:
    dict_insert(a, key, key * key)
# ... then read each key back and verify the stored value.
for key in random_int_list:
    result = dict_get(a, key)
    # result[1] is compared with the inserted value; result[0] is expected
    # to be True (presumably a "found" flag).
    assert key * key == result[1]
    assert True == result[0]
The values persist and remain visible to any other process unless they are deleted manually.
Remove the content of the shared_dict permanently.
import os
# Delete the file at the same path that was passed to shared_dict above.
os.remove(b'/private/var/tmp/mm29273eOYE9z')
The following steps are an example of using atomic_shared_memory to calculate the page views per second during a promotion.
Create the shared variables used by any process; refer to atomic_shared_memory and IntAPIs.
import shutil
from shared_atomic_host.common import promotion_data_memory_path # file path of the atomic_shared_memory
from shared_atomic_host.common import promotion_time_name # name of the start of promotion time atomic_int
from datetime import datetime
from shared_atomic.atomic_shared_memory import atomic_shared_memory
from shared_atomic.atomic_int import atomic_int
# Allocate a zero-filled segment of 57600 bytes (7200 * 8: the later steps
# address it as 7200 slots of 8 bytes each).
data = atomic_shared_memory(bytes(57600))
temp_file_name = data.f.name
# Drop the object before relocating its file
# (presumably so that the segment is released first — confirm).
del data
shutil.move(src=temp_file_name, dst=promotion_data_memory_path)
# use a large value to stop the statistics
start_time = atomic_int(promotion_time_name, 2**63 - 1)
Update the shared memory segment in the FastAPI backend code to count page views.
from shared_atomic_host.common import promotion_data_memory_path, promotion_time_name
from shared_atomic.atomic_shared_memory import atomic_shared_memory
from datetime import datetime
from shared_atomic.atomic_int import atomic_int, int_get
# Open the shared memory segment from the file at promotion_data_memory_path.
# NOTE(review): the meaning of standalone=True is not visible here — confirm
# against the atomic_shared_memory documentation.
data=atomic_shared_memory(source='f',
previous_shared_memory_path=promotion_data_memory_path,
standalone=True)
# Open the shared integer that holds the promotion start time.
start_time=atomic_int(promotion_time_name)
@payment_router.get("/count_visit")
async def count_visit():
    """Count one page view in the slot of the current promotion second.

    The slot index is the number of whole seconds elapsed since the value
    stored in start_time; only seconds 0..7199 are counted, and slot i
    occupies the 8 bytes starting at offset i*8.

    Returns:
        An empty string, whether or not the visit was counted.
    """
    elapsed = datetime.now().timestamp() - int_get(start_time)
    # Range-check before truncating: int() rounds toward zero, so an elapsed
    # time in (-1, 0) would otherwise become second 0 and be counted as a
    # visit in the first slot although the promotion has not started yet.
    if 0 <= elapsed < 7200:
        data.offset_add_and_fetch(b'\0\0\0\0\0\0\0\1', int(elapsed) * 8)
    return ""
When the promotion starts, update the start_time atomic_int.
from datetime import datetime  # needed below; this snippet did not import it

from shared_atomic_host.common import promotion_time_name # name of the start of promotion time atomic_int
from shared_atomic.atomic_int import atomic_int, int_get_and_set

# Open the shared integer that holds the promotion start time.
start_time = atomic_int(promotion_time_name)
# Store the current POSIX time, truncated to whole seconds, as the start time;
# the value returned by int_get_and_set is discarded.
int_get_and_set(start_time, int(datetime.now().timestamp()))
Summarize the resulting shared memory segment.
from datetime import timezone, timedelta, datetime
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd
from shared_atomic.atomic_shared_memory import atomic_shared_memory
from array import array
from shared_atomic_host.common import promotion_data_memory_path, promotion_time_name
from shared_atomic.atomic_int import atomic_int, int_get
# Open the shared memory segment from the file at promotion_data_memory_path.
data = atomic_shared_memory(source='f',
                            previous_shared_memory_path=promotion_data_memory_path)
start_time = atomic_int(promotion_time_name)
timestamp_start_posix = int_get(start_time)  # mark start time
# Build the start time as an aware UTC+8 datetime. A naive fromtimestamp()
# uses the host's local zone, and localizing that wall-clock value as
# Asia/Hong_Kong shifts every timestamp on hosts that are not in UTC+8.
timestamp_start = datetime.fromtimestamp(timestamp_start_posix,
                                         tz=timezone(timedelta(hours=8)))
# One POSIX timestamp (float) per promotion second, derived directly from the
# start value so the result does not depend on any time zone.
time_index = [float(timestamp_start_posix + second) for second in range(7200)]
# Slot i occupies the 8 bytes starting at offset i*8.
offsets = array('Q', range(0, 7200*8, 8))
lengths = array('b', [8]*7200)
count_series = data.offset_gets(offsets, lengths)
# Decode every slot as a big-endian integer, using only its declared length.
count_series = [int.from_bytes(count_series[i][:length], byteorder='big')
                for i, length in enumerate(lengths)]
# Fixed UTC+8 offset used for the tick labels; built once instead of per tick.
_LABEL_TZ = timezone(timedelta(hours=8))


def minute_formatter(x, pos):
    """Format a POSIX timestamp as an "HH:MM" tick label in UTC+8.

    Args:
        x: tick value, a POSIX timestamp in seconds.
        pos: tick position; accepted but not used.

    Returns:
        The hour and minute of x at UTC+8, e.g. '08:30'.
    """
    # only needs the hours and minutes value after start time of the promotion
    return datetime.fromtimestamp(x, tz=_LABEL_TZ).strftime('%H:%M')
if __name__ == "__main__":
    # Plot the per-second request counts and save the figure to disk.
    fig, ax = plt.subplots()
    fig.set_dpi(800)
    fig.set_size_inches((8, 6))

    ax.set(xlabel='time', ylabel='request per second')
    ax.grid(linestyle='dotted')

    # Show only the first 1200 points on the x axis.
    ax.set_xlim(left=time_index[0], right=time_index[1199])
    ax.set_ylim(bottom=-1, top=35)

    # One major tick every 600 seconds, labelled by minute_formatter.
    x_axis = ax.xaxis
    x_axis.set_major_locator(ticker.MultipleLocator(600))
    x_axis.set_major_formatter(minute_formatter)

    ax.plot(
        time_index,
        count_series,
        label='website traffic',
        marker='o',
        markersize=2,
        color='tab:blue',
    )
    ax.legend(loc=(0.65, 0.8), numpoints=2, fontsize=10)
    plt.savefig("/.../figure17.eps")